Here’s the background:
StarRocks 3.1.2, Flink 1.16, flink-sql-connector-mysql-cdc-2.2.0.jar, flink-connector-starrocks-1.2.9_flink-1.16.jar
MySQL 5.7.4 with binlog enabled
The Flink SQL is as follows:
CREATE TABLE IF NOT EXISTS cvm_price_factor_sink (
  Id BIGINT,
  Region STRING,
  SelfVersionNo BIGINT,
  NextVersionNo BIGINT,
  ChargeDetailType INT,
  Multi DECIMAL,
  Div DECIMAL,
  PRIMARY KEY (Id, Region) NOT ENFORCED
) WITH (
  'connector' = 'starrocks',
  'sink.max-retries' = '10',
  'sink.properties.strip_outer_array' = 'true',
  'sink.properties.format' = 'json',
  'sink.buffer-flush.interval-ms' = '5000',
  'sink.properties.transaction.enabled' = 'false',
  'jdbc-url' = 'jdbc:mysql://xxx:9030',
  'username' = 'root',
  'password' = 'xxx',
  'load-url' = 'xxx:8030',
  'database-name' = 'dev_int1_brp_oss',
  'table-name' = 'cvm_price_factor'
);
CREATE TABLE IF NOT EXISTS cvm_price_factor_src_dev (
  Id BIGINT,
  Region STRING,
  SelfVersionNo BIGINT,
  NextVersionNo BIGINT,
  ChargeDetailType INT,
  Multi DECIMAL,
  Div DECIMAL,
  PRIMARY KEY (Id, Region) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'server-time-zone' = 'Asia/Shanghai',
  'hostname' = 'xxx',
  'port' = '3306',
  'username' = 'root',
  'password' = 'xxx',
  'database-name' = 'dev_int1_brp_oss',
  'table-name' = 'cvm_price_factor'
);
INSERT INTO cvm_price_factor_sink SELECT * FROM cvm_price_factor_src_dev;
The error is as follows:
Caused by: com.starrocks.data.load.stream.exception.StreamLoadFailException: Can't find 'Status' in the response of transaction begin request. Transaction load is supported since StarRocks 2.4, and please make sure your StarRocks version support transaction load first. db: dev_int1_brp_oss, table: cvm_price_factor, label: flink-8fc21496-e705-4b29-9fa0-710442e0f250, response:
  at com.starrocks.data.load.stream.TransactionStreamLoader.doBegin(TransactionStreamLoader.java:144)
  at com.starrocks.data.load.stream.TransactionStreamLoader.begin(TransactionStreamLoader.java:99)
  at com.starrocks.data.load.stream.DefaultStreamLoader.send(DefaultStreamLoader.java:169)
  at com.starrocks.data.load.stream.v2.TransactionTableRegion.streamLoad(TransactionTableRegion.java:331)
  at com.starrocks.data.load.stream.v2.TransactionTableRegion.flush(TransactionTableRegion.java:228)
  at com.starrocks.data.load.stream.v2.StreamLoadManagerV2.lambda$init$0(StreamLoadManagerV2.java:220)
  ... 1 more
I want to synchronize the data from the specified MySQL database table to the corresponding database table in StarRocks.