I encountered an issue while using Flink CDC to sync data from MySQL to Elasticsearch. There’s a one-to-many relationship between two MySQL tables, “matcher” and “matcher_task”.
Tables:
CREATE TABLE `matcher` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `position_code` varchar(100),
  `talent_code` varchar(60),
  `talent_flag` tinyint(1),
  PRIMARY KEY (`id`)
);
CREATE TABLE `matcher_task` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `matcher_id` bigint,
  `employee_code` varchar(60),
  `read_flag` tinyint(1) DEFAULT '0',
  `step` tinyint(1) DEFAULT NULL,
  PRIMARY KEY (`id`)
);
I want to store the "matcher_task" rows in the Elasticsearch index inside a field "taskJson" of type "nested".
Index:
"matcher_index": {
"mappings": {
"_doc": {
"properties": {
"id": {
"type": "long"
},
"positionCode": {
"type": "keyword"
},
"taskJson": {
"type": "nested",
"properties": {
"id": {
"type": "long"
},
"matcherId": {
"type": "long"
},
"employeeCode": {
"type": "keyword"
},
"readFlag": {
"type": "keyword"
},
"step": {
"type": "keyword"
}
}
}
}
}
}
}
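For clarity, this is the document shape I am trying to produce. The values below are made-up examples:
{
  "id": 1,
  "positionCode": "P100",
  "taskJson": [
    { "id": 11, "matcherId": 1, "employeeCode": "E01", "readFlag": "0", "step": "1" },
    { "id": 12, "matcherId": 1, "employeeCode": "E02", "readFlag": "1", "step": "2" }
  ]
}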
Versions:
- Elasticsearch: 6
- Flink: 1.14
Initially, I wanted to solve this with the Flink SQL JSON aggregate functions JSON_OBJECT and JSON_ARRAYAGG, but Flink 1.14 does not support them (they were only introduced in Flink 1.15).
SOURCE:
<entry key="SOURCE">
CREATE TABLE matcher_task_source (
`id` BIGINT NOT NULL,
`matcher_id` BIGINT,
`employee_code` STRING,
`read_flag` INT,
`step` INT,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '${mysql_host}',
'port' = '${mysql_port}',
'username' = '${mysql_username}',
'password' = '${mysql_password}',
'database-name' = '${mysql_database}',
'table-name' = 'matcher_task',
)
</entry>
SINK:
<entry key="SINK">
CREATE TABLE matcher_index (
`id` BIGINT NOT NULL,
`taskJson` ARRAY<ROW<`id` STRING, `matcherId` STRING, `employeeCode` STRING, `readFlag` STRING, `step` STRING>>,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-6',
'hosts' = '${es_url}',
${es_auth}
'index' = 'matcher_index',
'document-type' = '_doc'
)
</entry>
<!-- -->
<entry key="SINK">
INSERT INTO matcher_index
select mt.resume_id as id,
JSON_ARRAYAGG(
JSON_OBJECT(
'id',mt.id,
'matcherId',mt.matcher_id,
'employeeCode',mt.employee_code,
'readFlag',mt.read_flag,
'step',mt.step
)
) AS taskJson
from matcher_task_source mt
GROUP BY mt.resume_id
</entry>
I then attempted to solve it with a Flink UDF (a user-defined aggregate function, "JsonConcatAgg"), but it still didn't work.
SINK:
<entry key="SINK">
INSERT INTO matcher_index
select mt.resume_id as id,
JsonConcatAgg(
mt.id,
mt.matcherId,
mt.read_flag,
mt.employee_code,
mt.step
) AS taskJson
from matcher_task_source mt
GROUP BY mt.resume_id
</entry>
Java:
import com.google.gson.JsonObject;
import org.apache.flink.table.functions.AggregateFunction;

import java.util.ArrayList;
import java.util.List;

public class JsonConcatAggFunction extends AggregateFunction<String, JsonConcatAggFunction.Accumulator> {

    // Holds one serialized JSON object per matcher_task row in the group.
    public static class Accumulator {
        public List<String> jsonObjects = new ArrayList<>();
    }

    @Override
    public Accumulator createAccumulator() {
        // Must return a fresh accumulator; returning null causes a NullPointerException in accumulate().
        return new Accumulator();
    }

    @Override
    public String getValue(Accumulator acc) {
        // Build the result on each call instead of mutating the accumulator,
        // because getValue() can be invoked more than once for the same group.
        return "[" + String.join(",", acc.jsonObjects) + "]";
    }

    public void accumulate(Accumulator acc, String id, String matcherId, String readFlag, String employeeCode, String step) {
        acc.jsonObjects.add(toJson(id, matcherId, readFlag, employeeCode, step));
    }

    // The CDC source emits updates and deletes, so Flink requires a retract()
    // method when the aggregate runs on such a changelog input.
    public void retract(Accumulator acc, String id, String matcherId, String readFlag, String employeeCode, String step) {
        acc.jsonObjects.remove(toJson(id, matcherId, readFlag, employeeCode, step));
    }

    private static String toJson(String id, String matcherId, String readFlag, String employeeCode, String step) {
        JsonObject jsonObject = new JsonObject();
        jsonObject.addProperty("id", id);
        jsonObject.addProperty("matcherId", matcherId);
        jsonObject.addProperty("readFlag", readFlag);
        jsonObject.addProperty("employeeCode", employeeCode);
        jsonObject.addProperty("step", step);
        return jsonObject.toString();
    }
}
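For completeness, this is roughly how I register the function before executing the SQL above (a minimal sketch; "JobMain" is just a placeholder class name and the environment setup is simplified):
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class JobMain {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Register the aggregate function under the name used in the SINK SQL.
        tEnv.createTemporarySystemFunction("JsonConcatAgg", JsonConcatAggFunction.class);

        // The SOURCE / SINK DDL and the INSERT statement are then run with tEnv.executeSql(...).
    }
}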