I have the following Pipeline:
Source: MySQL Source
-> Process
-> Process
-> (
Map -> Sink: Writer -> Sink: Committer,
Map -> Sink: Writer -> Sink: Committer,
Map -> Sink: Writer -> Sink: Committer,
Sink: Print to Std. Out
)
In the first Process I check for a condition; if it is met, I call out.collect() on the message and then call out.collect() again to send a STOP signal message.
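Stripped of Flink, the first Process does roughly this (a simplified sketch, not my actual code; the class name, the STOP constant, and the use of a plain Consumer in place of Flink's Collector are all illustrative):

```java
import java.util.function.Consumer;

// Sketch of the first Process step: when the condition is met,
// emit the message itself and then a separate STOP marker downstream.
public class StopSignalEmitter {
    public static final String STOP = "STOP";

    // 'out' stands in for Flink's Collector<String>.
    public static void process(String message, boolean conditionMet, Consumer<String> out) {
        if (conditionMet) {
            out.accept(message); // forward the message itself
            out.accept(STOP);    // then emit the STOP signal message
        }
    }
}
```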
In the second Process I check for the STOP signal message and throw a RuntimeException.
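The second Process, again as a Flink-free sketch (names illustrative), fails hard when the STOP marker arrives and otherwise passes the message through:

```java
// Sketch of the second Process step: throw on the STOP signal,
// pass everything else through unchanged.
public class StopSignalGuard {
    public static final String STOP = "STOP";

    public static String process(String message) {
        if (STOP.equals(message)) {
            // In the real job this is thrown inside a ProcessFunction.
            throw new RuntimeException("STOP signal received");
        }
        return message;
    }
}
```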
My intention was to sink the last message and then bring the whole pipeline down, stopping or pausing it indefinitely until a human intervenes. I can see the exception being thrown, but the stream just continues; it seems the STOP signal message is simply skipped or ignored. This puzzles me, as the parallelism is 1, so I'd expect the message to be either retried or failed, but neither happens.
The restart strategy was set to NoRestart, which had no effect. I could call Thread.sleep() or System.exit(), but those also stop checkpointing, meaning the last message before the STOP signal would never reach the sink.
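For completeness, the restart strategy is configured through Flink's RestartStrategies API, roughly like this (a fragment, not a full job):

```java
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Disable restarts entirely: a single task failure should fail the whole job.
env.setRestartStrategy(RestartStrategies.noRestart());
```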
I tried the same thing in a Map function, throwing the exception there instead, with the same result.
My question: isn't Flink supposed to stop and retry an operator when there is a failure? Is there some hidden configuration I have missed?