I have a Flink pipeline with two sinks: one publishes messages to Kafka and the other updates a status in a database. I am using exactly-once semantics via Kafka transactions and XA JDBC transactions. The issue I am facing is that when the database is down and the Kafka broker is up, Flink still commits the messages to the broker (I verified that my consumer reads only committed messages), even when the DB does not come back up after the configured number of Flink retries. My requirement is to send each message and update the corresponding record exactly once. How can I achieve that?
The keyedEventLog stream is read from a JDBC source and then transformed into kafkaProducerRecordStream, which is consumed by the Kafka sink.
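For context, the transformation is roughly of the following shape. This is a minimal sketch only: the KafkaProducerRecord fields (topic, key, value) are inferred from the serializer in CustomKafkaUtil below, and the mapping helpers (eventLog.getId(), eventLog.toJson()) are hypothetical.

DataStream<KafkaProducerRecord<byte[], String>> kafkaProducerRecordStream = keyedEventLog
        .map(eventLog -> {
            // Hypothetical mapping: build a KafkaProducerRecord from one event-log row
            KafkaProducerRecord<byte[], String> record = new KafkaProducerRecord<>();
            record.topic = writeConfig.topic;                                // assumed: topic taken from the write config
            record.key = eventLog.getId().getBytes(StandardCharsets.UTF_8); // assumed: record id used as key
            record.value = eventLog.toJson();                                // assumed: event log serialized to JSON
            return record;
        })
        .name("event-log-to-kafka-record")
        .uid("event-log-to-kafka-record");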
Kafka Sink
CustomKafkaUtil.writeRecords(kafkaProducerRecordStream, writeConfig, StringSerializer::new, "oiwp-event-log");
Custom Kafka Util class
public class CustomKafkaUtil extends KafkaUtil {

    public static <V> void writeRecords(DataStream<KafkaProducerRecord<byte[], V>> producerRecords,
                                        EventbusWriteRuntimeConf writeConfig,
                                        SerializableSupplier<Serializer<V>> serializerSupplier,
                                        String uidSuffix) {
        uidSuffix = uidSuffix != null ? uidSuffix : writeConfig.topic;
        producerRecords
                .sinkTo(getCustomSink(writeConfig))
                .name("kafka-sink-" + uidSuffix)
                .uid("kafka-sink-" + uidSuffix);
    }

    private static KafkaSink<Object> getCustomSink(EventbusWriteRuntimeConf writeConfig) {
        String topic = writeConfig.topic;
        Properties producerProperties = SppConfigUtil.instance.getKafkaProducerConfig(topic, writeConfig.cluster, writeConfig.region);
        if (!writeConfig.kafkaProperties.isEmpty()) {
            producerProperties.putAll(writeConfig.kafkaProperties);
        }
        log.info("producer properties: {}", producerProperties);
        return getSinkBuilder(producerProperties)
                .setRecordSerializer(new KafkaRecordSerializationSchema<Object>() {
                    private static final long serialVersionUID = 5597069351310493251L;

                    @Override
                    public ProducerRecord<byte[], byte[]> serialize(Object element, KafkaRecordSerializationSchema.KafkaSinkContext context, Long timestamp) {
                        KafkaProducerRecord<byte[], byte[]> kafkaRecord = (KafkaProducerRecord<byte[], byte[]>) element;
                        return new ProducerRecord<>(kafkaRecord.topic, kafkaRecord.partition, kafkaRecord.timestamp,
                                kafkaRecord.key, kafkaRecord.value, KafkaUtil.getRecordHeaders(kafkaRecord.headers));
                    }
                })
                // Kafka transactions: records become visible to read_committed consumers once the checkpoint commits
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix("oiwp-kafka-sink-random")
                .build();
    }
}
JDBC Sink
keyedEventLogRecord.addSink(
        JdbcSink.exactlyOnceSink(
                "update " + applicationConfig.getDbConfig().getSchema() + ".event_log set event_status = 'SUCCESS' where id = ?",
                (preparedStatement, row) -> {
                    // Parse the record id out of the JSON payload (row.f0) and bind it to the statement
                    ObjectMapper objectMapper = new ObjectMapper();
                    JsonNode jsonNode = null;
                    try {
                        jsonNode = objectMapper.readTree(row.f0);
                    } catch (JsonProcessingException e) {
                        throw new RuntimeException(e);
                    }
                    preparedStatement.setString(1, jsonNode.get("id").asText());
                },
                JdbcExecutionOptions.builder()
                        .withMaxRetries(0) // must stay 0 for the exactly-once (XA) sink
                        .build(),
                JdbcExactlyOnceOptions.builder()
                        .withAllowOutOfOrderCommits(false)
                        .withTransactionPerConnection(true)
                        .build(),
                new PGXADataSourceProvider(
                        applicationConfig.getDbConfig().getJdbcUrl() + "/" + applicationConfig.getDbConfig().getDatabase(),
                        applicationConfig.getDbConfig().getUsername(),
                        applicationConfig.getDbConfig().getPassword()
                )))
        .name("oiwp-event-log-jdbc-sink")
        .uid("oiwp-event-log-jdbc-sink");
If the JDBC sink succeeds during the "pre-commit" phase but then fails during the commit phase, the job should fail, and the JDBC commit is retried when the job is restarted. If the commit continues to fail, you can wind up with data written to Kafka (assuming its transaction commit succeeded) but without the JDBC transaction being applied to the DB. The logs should show the timing of succeeding and failing commits, which would help confirm or disprove this.
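A sketch of the knobs that control how long those retries keep going (values are illustrative assumptions to adapt): the job's restart strategy determines how many times the pipeline restarts and re-attempts the prepared XA commit, and JdbcExactlyOnceOptions exposes a per-transaction commit attempt limit in the flink-connector-jdbc versions I am aware of.

// Keep the job restarting long enough for the database to come back,
// so the prepared XA transaction can still be committed on recovery.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
        Integer.MAX_VALUE,                 // effectively "retry until the DB is back" (assumed policy)
        Time.of(30, TimeUnit.SECONDS)));   // assumed delay between restarts

// If your connector version exposes it, the commit attempt budget can also be raised:
JdbcExactlyOnceOptions exactlyOnceOptions = JdbcExactlyOnceOptions.builder()
        .withAllowOutOfOrderCommits(false)
        .withTransactionPerConnection(true)
        .withMaxCommitAttempts(10)         // assumed to be available in your flink-connector-jdbc version
        .build();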
Below is the relevant snippet from Flink’s End-to-End Exactly-Once document:
After a successful pre-commit, the commit must be guaranteed to eventually succeed – both our operators and our external system need to make this guarantee. If a commit fails (for example, due to an intermittent network issue), the entire Flink application fails, restarts according to the user’s restart strategy, and there is another commit attempt. This process is critical because if the commit does not eventually succeed, data loss occurs.
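On the Kafka side, "eventually succeed" only holds while the transaction is still open on the broker. A minimal sketch of the settings that matter, with illustrative values (the checkpoint interval and timeout are assumptions to adapt, and the property would need to reach your producer config, e.g. via writeConfig.kafkaProperties):

// Exactly-once requires checkpointing, since both transactional sinks commit on checkpoint completion.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);   // interval is an assumption

// The Kafka transaction must outlive checkpointing plus any restart/downtime,
// while staying below the broker's transaction.max.timeout.ms; otherwise the broker
// aborts the transaction and the pending commit can never succeed.
Properties extraKafkaProperties = new Properties();
extraKafkaProperties.setProperty("transaction.timeout.ms", String.valueOf(15 * 60 * 1000));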