We are using PyFlink for our data stream processing pipeline.
Looking for a way to handle failures globally. There is a FailureEnricher in the Java implementation.
I came across this code in an example
<code>from pyflink.util.failure_enricher import FailureEnricher
class MyCustomFailureEnricher(FailureEnricher):
def enrich(self, failure):
# Custom logic to handle the failure
print("A failure occurred:", failure)
env = StreamExecutionEnvironment.get_execution_environment()
env.add_failure_enricher(MyCustomFailureEnricher())
</code>
<code>from pyflink.util.failure_enricher import FailureEnricher
class MyCustomFailureEnricher(FailureEnricher):
def enrich(self, failure):
# Custom logic to handle the failure
print("A failure occurred:", failure)
env = StreamExecutionEnvironment.get_execution_environment()
env.add_failure_enricher(MyCustomFailureEnricher())
</code>
from pyflink.util.failure_enricher import FailureEnricher
class MyCustomFailureEnricher(FailureEnricher):
def enrich(self, failure):
# Custom logic to handle the failure
print("A failure occurred:", failure)
env = StreamExecutionEnvironment.get_execution_environment()
env.add_failure_enricher(MyCustomFailureEnricher())
However this class from pyflink.util.failure_enricher import FailureEnricher
is not available in the apache-flink version 1.19 of pyflink.
What are we missing and which version of pyflink has support for FailureEnrichers?