UPDATE:
I am now using these streams:
CREATE OR REPLACE STREAM INPUT_STREAM_MSD_HEADER_DATA (
    header ARRAY<STRUCT<key STRING, value BYTES>> HEADERS
) WITH (
    KAFKA_TOPIC='msd.avro',
    VALUE_FORMAT='AVRO'
);
CREATE OR REPLACE STREAM OUTPUT_STREAM_MSD_HEADER_DATA
WITH (KAFKA_TOPIC='splunk_observability.json', VALUE_FORMAT='JSON', PARTITIONS=1) AS
SELECT
    AS_MAP(
        TRANSFORM(header, v => v->key),
        TRANSFORM(header, v => FROM_BYTES(v->value, 'ascii'))
    ) AS `headers`,
    'Dev' AS `Env` -- hardcoded environment value; change to 'QA' or 'PROD' as needed
FROM INPUT_STREAM_MSD_HEADER_DATA
EMIT CHANGES;
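For reference, this is how I inspect the result: a simple push query in the ksqlDB editor (this assumes `SET 'auto.offset.reset'='earliest';` so that existing records show up):
-- Quick check of the output stream (push query; LIMIT stops it after 5 rows)
SELECT `headers`, `Env`
FROM OUTPUT_STREAM_MSD_HEADER_DATA
EMIT CHANGES
LIMIT 5;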
Now my output looks like this:
{
    "HEADERS": {
        "h1": "v1",
        "h2": "v2",
        "h3": "v3"
    }
}
But I want my output to look like this:
{
    "h1": "v1",
    "h2": "v2",
    "h3": "v3"
}
So I am looking for any kind of function or lambda expression to achieve this flattening without accessing the fields by index.
I am fine with adding more ksqlDB streams or Flink SQL statements.
Unfortunately, UDFs are not an option, as they are not supported.
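For completeness: the closest index-free alternative I found is EXPLODE (a ksqlDB table function), sketched below with placeholder stream names. It avoids the indices entirely, but it emits one row per header with fixed key/value columns instead of one flat record, so it does not give me the shape above:
-- Sketch only (stream names are placeholders): one output row per header.
-- Step 1: explode the header array into individual STRUCT rows.
CREATE OR REPLACE STREAM MSD_HEADER_EXPLODED AS
    SELECT EXPLODE(header) AS h
    FROM INPUT_STREAM_MSD_HEADER_DATA
    EMIT CHANGES;

-- Step 2: dereference the struct into key/value columns.
CREATE OR REPLACE STREAM MSD_HEADER_KV AS
    SELECT h->key AS header_key,
           FROM_BYTES(h->value, 'ascii') AS header_value
    FROM MSD_HEADER_EXPLODED
    EMIT CHANGES;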
VERSION 1:
I am using Confluent Cloud Kafka and want more observability, so I want to monitor it using Splunk. I have several topics and want to extract the header information from their messages and send it, using ksqlDB, to Topic_HEADERS, which then holds the header information. Topic_HEADERS should then be attached to a Splunk_Sink_Connector to get the data into Splunk. Confluent Cloud Kafka is already up and running, as are the topics and the Splunk_Sink_Connector. I also have the data in Splunk; my problem is that I need to flatten the records.
So these are my example ksqlDB streams:
CREATE OR REPLACE STREAM INPUT_STREAM_MSD_HEADER_DATA (
    header ARRAY<STRUCT<key STRING, value BYTES>> HEADERS
) WITH (
    KAFKA_TOPIC='msd.avro',
    VALUE_FORMAT='AVRO'
);
CREATE OR REPLACE STREAM OUTPUT_STREAM_MSD_HEADER_DATA
WITH (KAFKA_TOPIC='splunk_observability.json', VALUE_FORMAT='JSON', PARTITIONS=1) AS
SELECT
    FROM_BYTES(header[1]->value, 'ascii') AS `Timestamp`,
    FROM_BYTES(header[2]->value, 'ascii') AS `serialnumber`,
    FROM_BYTES(header[3]->value, 'ascii') AS `SerialNumber`,
    FROM_BYTES(header[4]->value, 'ascii') AS `customerid`,
    'msd.avro' AS `topicName`,
    'DEV' AS `environment` -- hardcoded environment value; change to 'QA' or 'PROD' as needed
FROM INPUT_STREAM_MSD_HEADER_DATA
EMIT CHANGES;
Note that I want to read from different topics but write to the same topic. The header fields are not the same across the different topics being read.
And now I have two requests:
- Make it more dynamic: if the number of headers changes, I have to change the streams every time, because the indices are no longer correct.
- Use the array from the "left side" also on the "right side": something like ... AS HEADER[1]->key, mirroring the left side, but this does not work. Is there any other function/way to do it?
So the perfect solution, in pseudocode, would look something like this inside the SELECT:
SELECT
    for (i = 1; i <= ARRAY_LENGTH(header); i++) {
        FROM_BYTES(header[i]->value, 'ascii') AS header[i]->key,
    }
FROM ...
Are there any functions to get functionality like that in ksqlDB, or will this be added soon?
At the moment, user-defined functions (UDFs) should not be used.
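The closest index-free building block I found is TRANSFORM, which maps a lambda over the whole array (the update above builds on it). It addresses the first request, no hardcoded indices, but not the second, since the keys still cannot become column aliases. A minimal sketch:
-- Sketch: TRANSFORM removes the hardcoded indices, but keys and values
-- come out as two parallel arrays, not as top-level fields.
SELECT
    TRANSFORM(header, h => h->key) AS header_keys,
    TRANSFORM(header, h => FROM_BYTES(h->value, 'ascii')) AS header_values
FROM INPUT_STREAM_MSD_HEADER_DATA
EMIT CHANGES;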
I also had a look at https://support.confluent.io/hc/en-us/articles/14541735851284-How-to-query-record-headers-using-ksqlDB and I am aware that this would help with the problem. But I need the data extracted directly, so my output has to look like:
{
    "keyname1": "value1",
    "keyname2": "value2",
    "keyname3": "value3",
    ...
}
Achieving this with a Flink SQL statement would also be possible.
I already tried a lot of different things but did not achieve the wanted functionality.
As said, something like the pseudocode above would be optimal.