I’m currently configuring Vector sinks to send data to an AWS Kinesis Data Stream (and, similarly, to Kafka). My input data consists of JSON objects with nested fields, and I need to use one of these nested fields (specifically _session_id) as the partition key for the Kinesis stream. Here is a sample of the data:
{
  "timestamp": "2024-04-12T14:48:13.846548459Z",
  "other fields": "xyz",
  "data": [
    {
      "event_type": "some_event",
      "attributes": {
        "_session_id": "3bcf74c1-20240412-132033990",
        "_session_start_timestamp": 1712928033990
      }
    }
  ],
  ...
}
Previously, I had partition_key_field set to "timestamp", and the configuration worked fine: my data was delivered to Kinesis correctly. However, I now need to use a nested field, specifically "data[0].attributes._session_id", as the partition key.
In my scenario, the data field arrives as a list that only ever contains a single item, so I can safely use data[0] to reference that record. When I made this change, the sink failed with an error indicating that the partition key does not exist:
2024-04-12T15:32:21.306393Z ERROR sink{component_kind="sink" component_id=kinesis_sink component_type=aws_kinesis_streams}: vector::internal_events::aws_kinesis: Partition key does not exist. partition_key_field=data[0].attributes._session_id error_type="parser_failed" stage="processing" internal_log_rate_limit=true
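For what it's worth, the nested path itself seems fine as far as VRL is concerned: a throwaway remap transform (the component name debug_session_id is just for illustration) can read and log the value without any error, which makes me suspect the problem is specific to how partition_key_field interprets the path rather than the data itself:

```toml
[transforms.debug_session_id]
type = "remap"
inputs = ["json_parser"]
source = '''
# VRL path syntax supports array indexing, so this logs the
# session ID successfully for the sample event shown above:
log(.data[0].attributes._session_id, level: "info")
'''
```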
Here’s a snippet of my current configuration (the Kafka sink is similar):
[sinks.kinesis_sink]
type = "aws_kinesis_streams"
inputs = ["json_parser"]
partition_key_field = "data[0].attributes._session_id"
compression = "none"
region = "%%AWS_REGION%%"
stream_name = "%%AWS_KINESIS_STREAM_NAME%%"
acknowledgements.enabled = true
request_retry_partial = true
request.retry_attempts = 4
request.retry_max_duration_secs = 3
[sinks.kinesis_sink.encoding]
codec = "json"
and:
[sources.nginx_http_post]
type = "http_server"
address = "0.0.0.0:8685"
strict_path = false
headers = [ "X_Uri", "X_User_Agent", "X_Forwarded_For", "X_Date", "X_Request_ID", "X_Method"]
query_parameters = [ "platform", "appId", "compression", "fakeIp", "upload_timestamp" ]
method = "POST"
encoding = "binary"
[transforms.json_parser]
inputs = ["nginx_http_post"]
type = "remap"
source = '''
.date = del(.X_Date)
.uri = del(.X_Uri)
.ua = del(.X_User_Agent)
if get_env_var!("DEV_MODE") == "Yes" && !is_null(.fakeIp) {
.ip = del(.fakeIp)
} else {
.ip = del(.X_Forwarded_For)
}
if !is_null(.upload_timestamp) {
.client_timestamp = del(.upload_timestamp)
}
.rid = del(.X_Request_ID)
.method = del(.X_Method)
.data = del(.message)
.ingest_time = to_unix_timestamp(now()) * 1000
.server_ingest_time = to_unix_timestamp(now()) * 1000
'''
Is there a way to configure Vector sinks to properly parse nested fields for the Kinesis Stream partition key?
P.S. No, I cannot add a transform before the sink to move the session ID to the top level of the JSON, as this would affect my data schema, which is an unacceptable modification.
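For completeness, the workaround I'm ruling out would look roughly like this (transform name promote_session_id is hypothetical); it works, but it adds a top-level field that downstream consumers of the stream would then see, which breaks my schema contract:

```toml
[transforms.promote_session_id]
type = "remap"
inputs = ["json_parser"]
source = '''
# Copy the nested value to the top level so the sink can use a
# simple path -- unacceptable here, since it changes the event schema:
._session_id = .data[0].attributes._session_id
'''

[sinks.kinesis_sink]
type = "aws_kinesis_streams"
inputs = ["promote_session_id"]
partition_key_field = "_session_id"
```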
Any insights or suggestions would be greatly appreciated. Thank you!