In Apache Flink, how can I catch the exceptions thrown by Kafka Source Connector?
I have a use case where one of the Kafka clusters my Flink application connects to becomes unavailable. If I can catch the exception, I can handle it with some other business logic.
However, it looks like the DataStream constructed from the KafkaSource is lazily evaluated: no error is thrown when the source is created, and it only surfaces later, once downstream operators such as keyBy and process actually run.
Is there a way to catch the exception when I create the source like this?
KafkaSource<List<Event>> kafkaSource =
        KafkaSourceFactory.createKafkaSource(kafkaTopic, kafkaConfig, deserializationSchema);
return env.fromSource(kafkaSource, watermarkStrategy, uniqueSourceName + "_input")
        .uid(uniqueSourceName + "_input");
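For context, here is a minimal, self-contained sketch of the behavior I am describing (the broker address, topic, and job name are made up; `KafkaSourceFactory` from my code is replaced with the standard `KafkaSource` builder). Constructing the source and wiring it into the pipeline raises no exception even though the broker is unreachable; the only place I have found where the failure can be caught is around `env.execute()`, which is too late for my purposes:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaAvailabilitySketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Deliberately unreachable broker (hypothetical address).
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("unreachable-broker:9092")
                .setTopics("events")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // No connection attempt happens here: the source and the
        // operators below are only a job description at this point.
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "events_input")
                .uid("events_input")
                .print();

        try {
            // The broker is first contacted when the job runs, so this
            // is the earliest point at which the failure is observable.
            env.execute("kafka-availability-sketch");
        } catch (Exception e) {
            // Fallback business logic would have to live here.
            System.err.println("Job failed, Kafka possibly unavailable: " + e.getMessage());
        }
    }
}
```

Is there any hook earlier than `env.execute()` where such a connection failure can be detected?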