I’ve set up an HTTP Sink Connector with URL key substitution, but the key isn’t substituting correctly. Instead of substituting “14Y0354214”, it is being substituted as “%5BB@5a95cb0c”.
Stream:
CREATE OR REPLACE STREAM `delete_sync`(`id` VARCHAR KEY, `value` VARCHAR)
WITH (
KAFKA_TOPIC = 'delete.sync',
KEY_FORMAT='JSON',
VALUE_FORMAT='KAFKA'
);
INSERT INTO `delete_sync`
SELECT
CAST(`Member`['id'] AS STRING) AS `id`,
CAST(NULL AS VARCHAR) AS `value`
FROM
`sync_transform`
WHERE
`EventType` = 'MemberDeleted'
PARTITION BY CAST(`Member`['id'] AS STRING);
Connector Config:
{
"connector.class": "HttpSink",
"header.separator": "|",
"headers": "apikey:xxxxxxxxxx",
"http.api.url": "https://domain/api/${key}",
"input.data.format": "JSON",
"kafka.auth.mode": "SERVICE_ACCOUNT",
"kafka.service.account.id": "xxxxxxxxxxxxx",
"name": "delete.sync",
"request.body.format": "json",
"retry.on.status.codes": "500-",
"tasks.max": "6",
"topics": "delete.sync",
"max.poll.records": "100",
"behavior.on.null.values": "delete",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false"
}
Key as to get substitute correctly