I’m encountering an issue while using Apache Flink’s Kafka connector. My requirement is to consume data from a Kafka topic starting at a specific timestamp. However, I’m unsure whether there is data available at that timestamp in the topic. If no data exists, the connector throws an error. I would like the connector to reset the offset to the earliest available offset instead of failing when no data is found at the specified timestamp.
Configuration:
Below is my current table creation SQL:
CREATE TABLE TEST_TABLE (
    code_type_id STRING,
    create_user STRING,
    create_time STRING,
    create_name STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'TEST_TOPIC',
    'properties.bootstrap.servers' = '10.xx.xx.xx:9092',
    'properties.group.id' = 'test_group',
    'scan.startup.mode' = 'timestamp',
    'scan.startup.timestamp-millis' = '1734074100000',
    'properties.auto.offset.reset' = 'earliest',
    'format' = 'csv',
    'csv.ignore-parse-errors' = 'true',
    'csv.allow-comments' = 'true',
    'csv.field-delimiter' = '\u0001',
    'properties.security.protocol' = 'SASL_PLAINTEXT',
    'properties.sasl.mechanism' = 'SCRAM-SHA-256',
    'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="xxx" password="xxxxxxx";',
    'properties.key.deserializer' = 'org.apache.kafka.common.serialization.StringDeserializer',
    'properties.value.deserializer' = 'org.apache.kafka.common.serialization.StringDeserializer'
);
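For reference, the value of scan.startup.timestamp-millis is in epoch milliseconds. Here is a minimal sketch in plain Python (not part of the Flink job; the helper name millis_to_utc is made up for illustration) to see which instant it corresponds to:

```python
from datetime import datetime, timezone


def millis_to_utc(millis: int) -> str:
    """Convert epoch milliseconds to an ISO-8601 string in UTC."""
    return datetime.fromtimestamp(millis / 1000, tz=timezone.utc).isoformat()


# Value of scan.startup.timestamp-millis from the table definition.
startup_millis = 1734074100000
print(millis_to_utc(startup_millis))  # 2024-12-13T07:15:00+00:00
```

Comparing this instant with the time range of the records in the topic shows whether the requested timestamp falls inside it.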
Error log:
Code:
Flink Version: 1.16
What I’ve Tried:
I’m using Flink’s Kafka connector with the following configurations:
scan.startup.mode set to timestamp, to start reading from a specific timestamp.
properties.auto.offset.reset set to earliest, to reset the offset to the earliest available one when no data is found.
Issue:
Despite these settings, when there is no data at the specified timestamp, the connector throws an error instead of resetting the offset to the earliest position as expected. It seems that properties.auto.offset.reset is not taking effect in this scenario.
Expected Behavior:
When no data exists at the specified scan.startup.timestamp-millis, the Kafka connector should automatically reset the offset to the earliest available position without throwing an error.
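To make the behavior I’m after concrete, here is a minimal sketch in plain Python. It is not Flink or Kafka code; the function name resolve_start_offset and the sample records are hypothetical and only model the fallback I would like to see for a single partition:

```python
from typing import List, Optional, Tuple

# One record is modeled as (offset, timestamp in epoch millis).
Record = Tuple[int, int]


def resolve_start_offset(records: List[Record], target_millis: int) -> Optional[int]:
    """Pick the starting offset for one partition.

    Hypothetical model of the desired behavior, not the connector's logic.
    Records are assumed to be ordered by offset.
    """
    if not records:
        return None  # empty partition: nothing to start from
    for offset, ts in records:
        if ts >= target_millis:
            return offset  # first record at or after the timestamp
    return records[0][0]  # nothing at or after the timestamp: use earliest


sample = [(10, 1734000000000), (11, 1734000060000)]
print(resolve_start_offset(sample, 1734074100000))  # 10 (falls back to earliest)
print(resolve_start_offset(sample, 1734000030000))  # 11
```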
What Happens Instead:
An error is thrown when there is no data at the specified timestamp, and properties.auto.offset.reset = 'earliest' does not seem to take effect.
Additional Information:
I’ve set csv.ignore-parse-errors and csv.allow-comments to true to handle potential data issues.
The field delimiter is set to \u0001.
Security is configured using SASL with SCRAM-SHA-256 mechanism.
Question:
How can I configure the Flink Kafka connector to reset the offset to the earliest available when no data exists at the specified startup timestamp, instead of throwing an error?
Thanks!