I’m running PyFlink 1.18 in Kubernetes pods and want checkpointing enabled so that if a pod restarts, the job resumes from the last checkpoint. Checkpointing is configured to write to an AWS S3 bucket, and checkpoints do show up there. However, after killing a pod, the job does not resume from the last checkpoint, which leaves a gap in the data consumed from the Kafka stream after the pod restarts.
How can I resolve this? My code is below, followed by a sketch of what I suspect might be missing.
Code:
# Imports used by this snippet
from pyflink.common import Types, WatermarkStrategy
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.time import Time
from pyflink.datastream import CheckpointingMode, StreamExecutionEnvironment, TimeCharacteristic
from pyflink.datastream.checkpoint_storage import CheckpointStorage
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
from pyflink.datastream.window import TumblingEventTimeWindows

# Set up the streaming execution environment
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)

# Enable checkpointing every 60 seconds with at-least-once guarantees
env.enable_checkpointing(60 * 1000, CheckpointingMode.AT_LEAST_ONCE)
# Configure the checkpoint storage to S3
s3_checkpoint_path = CheckpointStorage("s3://xxx/flink-checkpoints")
env.get_checkpoint_config().set_checkpoint_storage(s3_checkpoint_path)
# Define Kafka consumer properties
kafka_consumer_properties = {
    "bootstrap.servers": self.kafka_host,
    "group.id": "xxx_group",
    "security.protocol": "PLAINTEXT",
}
# Create a Kafka consumer
kafka_consumer = FlinkKafkaConsumer(
    topics=["topic-1", "topic-2"],
    deserialization_schema=SimpleStringSchema(),
    properties=kafka_consumer_properties,
)
kafka_consumer.set_commit_offsets_on_checkpoints(True)
kafka_consumer.set_start_from_latest()
watermark_strategy = WatermarkStrategy.for_monotonous_timestamps().with_timestamp_assigner(
    self.CustomTimestampAssigner()
)
# Add the Kafka consumer as a source to the Flink job
stream = (
    env.add_source(kafka_consumer)
    .map(self._decrypt_and_unpickle_message)
    .filter(lambda x: x is not None)
    .map(self.NormalizeMap(self))
    .assign_timestamps_and_watermarks(watermark_strategy)
    .key_by(lambda record: record.get("_dwh_table_name"), key_type=Types.STRING())
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .process(self.Sink(self))
)
stream.print()
# Execute the Flink job
env.execute("Kafka Flink Streaming Job")