I am new to Flink and have a use case: consume data from Topic1, insert/update it in a DB, and also push the same data to Topic2, which is consumed by other services. The code I have now looks like this:
DataStream<FooModel> stream =
        env.fromSource(axon.source(Constants.CONSUMER_TOPIC_NAME, Constants.CONSUMER_GROUP_ID),
                       WatermarkStrategy.noWatermarks(), "foo-kafka-source")
           .map(axonMessage -> (FooModel) axonMessage.getPayload());

stream.addSink(jdbc.exactlyOnceSink(new FooJdbcSink()))
      .name("data-db-sink")
      .uid("data-db-sink");

stream.sinkTo(kafka.exactlyOnceSink(fooSchema))
      .name("data-kafka-sink")
      .uid("data-kafka-sink");
The requirement is to complete both sink operations as a single transaction, i.e. if the data is not inserted into the DB (because of any error), it should not be pushed to Topic2 either.
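For context on what "one transaction" means here: Flink's exactly-once sinks are built on a two-phase commit protocol driven by checkpoints. Each sink first *pre-commits* (stages) its data, and only when the checkpoint completes does the JobManager tell all sinks to *commit*; if anything fails in between, all staged data is aborted. The sketch below is a plain-Java illustration of that idea, not Flink API; the `TxnSink` interface and `InMemorySink` class are hypothetical names I made up for the example.

```java
import java.util.List;

// Hypothetical illustration of two-phase commit (NOT the Flink API):
// every participant first "prepares" (stages the record); only if all
// prepares succeed does the coordinator "commit" them all, otherwise
// it aborts them all. Flink's exactly-once sinks follow the same
// protocol, with the checkpoint acting as the coordinator.
interface TxnSink {
    boolean prepare(String record); // stage the record; false on failure
    void commit();                  // make the staged record visible
    void abort();                   // discard the staged record
}

class InMemorySink implements TxnSink {
    private final boolean failOnPrepare;
    private String staged;
    String committed; // visible only after commit()

    InMemorySink(boolean failOnPrepare) { this.failOnPrepare = failOnPrepare; }

    @Override public boolean prepare(String record) {
        if (failOnPrepare) return false;
        staged = record;
        return true;
    }
    @Override public void commit() { committed = staged; }
    @Override public void abort()  { staged = null; }
}

public class TwoPhaseCommitDemo {
    // Commit the record to every sink, or to none of them.
    static boolean write(String record, List<TxnSink> sinks) {
        for (TxnSink s : sinks) {
            if (!s.prepare(record)) {          // any failure aborts everything
                sinks.forEach(TxnSink::abort);
                return false;
            }
        }
        sinks.forEach(TxnSink::commit);
        return true;
    }

    public static void main(String[] args) {
        InMemorySink db = new InMemorySink(false);
        InMemorySink kafka = new InMemorySink(false);
        System.out.println(write("order-1", List.of(db, kafka)));   // true
        System.out.println(db.committed + " / " + kafka.committed); // order-1 / order-1

        InMemorySink badDb = new InMemorySink(true);                // DB write fails
        InMemorySink kafka2 = new InMemorySink(false);
        System.out.println(write("order-2", List.of(badDb, kafka2))); // false
        System.out.println(kafka2.committed);                          // null, nothing leaked to Topic2
    }
}
```

One caveat the sketch glosses over: in Flink the two sinks commit independently once the checkpoint succeeds, so there is a small window where one commit can land before the other; the guarantee is that after recovery both sinks converge to the same checkpointed state, not that the two external systems commit at the exact same instant.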
Will the sample code above be enough (I referred to this article as well), and if not, how should transactions be managed in Flink?
Any help will be appreciated.
Thanks!!!