This one has stumped me for a few days… I have a Postgres server with a table to which a deployed Debezium instance is connected, recording CDC. Debezium serialises using Avro and Confluent Schema Registry.

I have successfully built a Delta Live Table pipeline in Databricks that deserialises the Avro messages using Schema Registry. The issue is that the Postgres table has TIMESTAMPTZ columns, so Debezium serialises the data in these columns as ISO-8601 strings (e.g. 2024-09-16T09:17:35.621000Z) in order to preserve the time zone information. Below is an example field from the Avro schema:
{
    "name": "created",
    "type": {
        "type": "string",
        "connect.version": 1,
        "connect.name": "io.debezium.time.ZonedTimestamp"
    }
}
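(That snippet can be pulled straight from the registry with the confluent_kafka client I already import in the pipeline code further down; SCHEMA_REGISTRY_URL and topic_name are the same configuration values used there.)

from confluent_kafka.schema_registry import SchemaRegistryClient

# Fetch the latest registered value schema for the topic and print it,
# to confirm which fields carry the ZonedTimestamp annotation.
sr_client = SchemaRegistryClient({"url": SCHEMA_REGISTRY_URL})
registered = sr_client.get_latest_version(f"{topic_name}-value")
print(registered.schema.schema_str)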
The resultant Delta Live Table has the created column as type StringType, not TimestampType. I do not care about time zone information; I only care about UTC timestamps. Now, according to the Spark docs, Avro fields are automatically interpreted as TimestampType when "type": "long" and "logicalType": "timestamp-micros". However, as mentioned, because the Postgres column is of type TIMESTAMPTZ, the connector serialises the data as strings, not as longs. My goal is for the resultant DLT to recognise ISO-8601 columns as being of type TimestampType.
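In other words, a field shaped like the following would already come through as TimestampType; mine do not, because of the string encoding:

{
    "name": "created",
    "type": {
        "type": "long",
        "logicalType": "timestamp-micros"
    }
}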
Basically, I want Databricks to recognise that fields with "connect.name": "io.debezium.time.ZonedTimestamp" should be treated as TimestampType. The DLT pipeline code I am using is below:
import dlt

from pyspark.sql.avro.functions import from_avro
from pyspark.sql.functions import col, explode, flatten, split, lit
from confluent_kafka.schema_registry import SchemaRegistryClient

# KAFKA_BOOTSTRAP_SERVERS, SCHEMA_REGISTRY_URL and CDC_TOPIC_NAMES are defined
# elsewhere in the pipeline configuration.


def create_dlt_name(topic_name: str) -> str:
    # Debezium topic names look like "<prefix>.<database>.<schema>.<table>".
    prefix, database, schema, table = topic_name.split('.')
    return f"{database}__{schema}__{table}"


def create_dlt_for_topic(topic_name):
    dlt_name = create_dlt_name(topic_name)
    schema_registry_subject = f"{topic_name}-value"

    @dlt.table(name=dlt_name)
    def t():
        # Raw CDC stream from Kafka.
        cdc_df = (
            spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
            .option("subscribe", topic_name)
            .option("startingOffsets", "latest")
            .option("kafka.security.protocol", "SSL")
            .option("kafka.ssl.endpoint.identification.algorithm", "")
            .load()
        )

        # Deserialise the Avro payload using the schema registered for this subject.
        cdc_deserialised_df = cdc_df.select(
            from_avro(
                data=col("value"),
                subject=schema_registry_subject,
                schemaRegistryAddress=SCHEMA_REGISTRY_URL
            ).alias("deserialised_value")
        )

        # Flatten the Debezium envelope: the row fields plus the CDC metadata I keep.
        transposed_df = cdc_deserialised_df.select(
            col("deserialised_value.after.*"),
            col("deserialised_value.op").alias("cdc_operation"),
            col("deserialised_value.source.lsn").alias("cdc_log_sequence_number")
        )

        return transposed_df


for topic_name in CDC_TOPIC_NAMES:
    create_dlt_for_topic(topic_name)
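For completeness, the closest I have come to a generic workaround is an (as yet untested) sketch like the one below, which reads the registered Avro schema, finds every field inside the Debezium "after" record annotated with io.debezium.time.ZonedTimestamp, and casts those columns with to_timestamp after the from_avro step. The helper name and the union handling are my own guesses at the envelope shape:

import json

from pyspark.sql.functions import to_timestamp
from confluent_kafka.schema_registry import SchemaRegistryClient

ZONED_TIMESTAMP = "io.debezium.time.ZonedTimestamp"


def zoned_timestamp_columns(schema_registry_subject):
    """Names of 'after' fields annotated as Debezium ZonedTimestamp."""
    sr_client = SchemaRegistryClient({"url": SCHEMA_REGISTRY_URL})
    envelope = json.loads(
        sr_client.get_latest_version(schema_registry_subject).schema.schema_str
    )

    # The row schema sits inside the "after" field of the envelope,
    # which may be wrapped in a ["null", {...}] union.
    after_type = next(f["type"] for f in envelope["fields"] if f["name"] == "after")
    if isinstance(after_type, list):
        after_type = next(t for t in after_type if isinstance(t, dict))

    columns = []
    for field in after_type["fields"]:
        field_type = field["type"]
        if isinstance(field_type, list):  # unwrap nullable column unions
            field_type = next((t for t in field_type if isinstance(t, dict)), None)
        if isinstance(field_type, dict) and field_type.get("connect.name") == ZONED_TIMESTAMP:
            columns.append(field["name"])
    return columns


# ... then inside t(), after building transposed_df:
#     for ts_col in zoned_timestamp_columns(schema_registry_subject):
#         transposed_df = transposed_df.withColumn(ts_col, to_timestamp(ts_col))

This keeps things dynamic, but it feels like I am re-implementing something the Avro deserialiser should be able to do for me.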
I have tried using Kafka Connect SMTs to convert timestamps, but as per this question, you have to explicitly declare each field that you want to convert, which I want to avoid. If I were happy to do this, I'd be better off just calling to_timestamp on the known columns to create the final DLT.
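(By "calling to_timestamp on the known columns" I mean hard-coding the conversion per table, roughly like this, with created being the only example column from the schema above:)

from pyspark.sql.functions import to_timestamp

# ... inside t(), instead of returning transposed_df directly:
#     return transposed_df.withColumn("created", to_timestamp("created"))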