I need use flinkcdc 2.4 capture data from mysql , I need write a backfill program, I want to do a initial mode just capture the subset of the table, I find the property snapshot.select.statement.overrides, but it does not work, I find a issue in github
https://github.com/apache/flink-cdc/discussions/1131
it said the scan.incremental.snapshot.enabled should be false, but how to config this property to false in datastream api? thanks
String dbName="db1";
String tableName="table11";
String query = "select * from db1.table11 where id='1' ";
Properties debeziumProperties = new Properties();
// debeziumProperties.put("scan.incremental.snapshot.enabled", "false");
// debeziumProperties.put("snapshot.select.statement.overrides",String.format("%s.%s",dbName,tableName));
// debeziumProperties.put(String.format("snapshot.select.statement.overrides.%s.%s", dbName,tableName), query);
debeziumProperties.put("snapshot.select.statement.overrides","db1.table11");
debeziumProperties.put("snapshot.select.statement.overrides.db1.table11", query);
String configFile = "application-batch.conf";
ConfigParser.init(configFile);
ConfigParser.MySqlConfig config = ConfigParser.getMysqlConfig();
Map<String, Object> jsonConvertConfig = new HashMap<>();
jsonConvertConfig.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, DecimalFormat.NUMERIC.name());
MySqlSourceBuilder<String> builder = MySqlSource.<String>builder()
.hostname(config.getHost())
.port(config.getPort())
.databaseList(dbName)
.tableList(String.format("%s.%s",dbName,tableName))
.username(config.getUserName())
.password(config.getPassword())
.debeziumProperties(debeziumProperties)
.deserializer(new JsonDebeziumDeserializationSchema(true, jsonConvertConfig))
.startupOptions(StartupOptions.initial());
Many thanks.