I am new to PySpark and am running into an issue while consuming data from Azure Event Hubs: I am unable to deserialize the consumed messages. Every field comes back null after I deserialize the data with the schema below. Please find the schema, a sample Event Hub message, and the code I am using to consume the data, and let me know how I can resolve this issue. Thanks in advance.
# Sample Event Hub message:
EventHubOverrideMessage(gtin13=00******0010, sourceId=0******5, lastUpdateTimestamp=2024-07-09T12:45:00.009805, lastUpdatedUser=null, inStore=ModalityOverride(reason=Other, override=true, startDate=2023-10-23T00:00, endDate=null), pickup=ModalityOverride(reason=Other, override=true, startDate=2023-10-23T00:00, endDate=null), delivery=ModalityOverride(reason=Other, override=true, startDate=2023-10-23T00:00, endDate=null), ship=null)
# Deserialization schema (I also tried replacing TimestampType with StringType):
from pyspark.sql.types import (
    BooleanType, StringType, StructField, StructType, TimestampType
)

# All four modality fields (inStore, pickup, delivery, ship) share the
# same struct layout.
modality_schema = StructType([
    StructField("reason", StringType(), True),
    StructField("override", BooleanType(), True),
    StructField("startDate", TimestampType(), True),
    StructField("endDate", TimestampType(), True),
])

json_schema = StructType([
    StructField("gtin13", StringType(), True),
    StructField("sourceId", StringType(), True),
    StructField("lastUpdateTimestamp", TimestampType(), True),
    StructField("lastUpdatedUser", StringType(), True),
    StructField("inStore", modality_schema, True),
    StructField("pickup", modality_schema, True),
    StructField("delivery", modality_schema, True),
    StructField("ship", modality_schema, True),
])
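To check the Spark schema in isolation, here is a minimal sketch that applies it to a hand-written JSON string. The payload below is hypothetical, shaped after the sample message, and uses millisecond-precision timestamps that the default from_json parser accepts:

import pyspark.sql.functions as F

# Hypothetical, hand-written payload matching the schema above.
sample = (
    '{"gtin13": "0000000000010", "sourceId": "00000005", '
    '"lastUpdateTimestamp": "2024-07-09T12:45:00.009", "lastUpdatedUser": null, '
    '"inStore": {"reason": "Other", "override": true, '
    '"startDate": "2023-10-23T00:00:00", "endDate": null}, '
    '"pickup": null, "delivery": null, "ship": null}'
)

test_df = spark.createDataFrame([(sample,)], ["value"])
test_df.select(F.from_json("value", json_schema).alias("data")) \
    .select("data.*") \
    .show(truncate=False)

If this test parses correctly, the schema itself is fine and the problem lies in the payload coming off the topic.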
# Actual schema of the message (as JSON Schema):
{
  "title": "schema",
  "type": "object",
  "properties": {
    "gtin13": {
      "type": "string",
      "pattern": "^[0-9]{13}$"
    },
    "sourceId": {
      "type": "string",
      "pattern": "^[0-9]{8}$"
    },
    "lastUpdateTimestamp": {
      "type": "string",
      "format": "date-time"
    },
    "inStore": {
      "type": ["object", "null"],
      "properties": {
        "reason": { "type": ["string", "null"] },
        "override": { "type": ["boolean", "null"] },
        "startDate": { "type": ["string", "null"], "format": "date-time" },
        "endDate": { "type": ["string", "null"], "format": "date-time" }
      },
      "required": ["reason", "override", "startDate", "endDate"]
    },
    "pickup": {
      "type": ["object", "null"],
      "properties": {
        "reason": { "type": "string" },
        "override": { "type": ["boolean", "null"] },
        "startDate": { "type": ["string", "null"], "format": "date-time" },
        "endDate": { "type": ["string", "null"], "format": "date-time" }
      },
      "required": ["reason", "override", "startDate", "endDate"]
    },
    "delivery": {
      "type": ["object", "null"],
      "properties": {
        "reason": { "type": ["string", "null"] },
        "override": { "type": ["boolean", "null"] },
        "startDate": { "type": ["string", "null"], "format": "date-time" },
        "endDate": { "type": ["string", "null"], "format": "date-time" }
      },
      "required": ["reason", "override", "startDate", "endDate"]
    },
    "ship": {
      "type": ["object", "null"],
      "properties": {
        "reason": { "type": ["string", "null"] },
        "override": { "type": ["boolean", "null"] },
        "startDate": { "type": ["string", "null"], "format": "date-time" },
        "endDate": { "type": ["string", "null"], "format": "date-time" }
      },
      "required": ["reason", "override", "startDate", "endDate"]
    }
  },
  "required": ["gtin13", "sourceId", "lastUpdateTimestamp"]
}
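One aside on the timestamps: the JSON Schema declares them as date-time strings, while the Spark schema uses TimestampType, and the sample's lastUpdateTimestamp carries six fractional digits. If valid JSON arrives but the timestamp fields still come back null, passing an explicit timestampFormat to from_json is one option; the pattern below is an assumption matched to the sample, not something confirmed by the producer:

import pyspark.sql.functions as F

# Assumed pattern: six-digit fractional seconds as in the sample message.
# Fields in a different format (e.g. startDate without seconds) would still
# parse as null, so reading them as StringType and converting afterwards
# with to_timestamp() is a more forgiving alternative.
with_ts_format = kafkaDF.withColumn(
    "data",
    F.from_json(
        F.col("value"),
        json_schema,
        {"timestampFormat": "yyyy-MM-dd'T'HH:mm:ss.SSSSSS"},
    ),
)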
# Code:
import pyspark.sql.functions as F
from pyspark.sql.functions import col
from pyspark.sql.types import StringType

kafkaDF = (
    spark.readStream.format("kafka")
    .option("subscribe", topic)
    .option("kafka.bootstrap.servers", bootstrap_server)
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", EVENTHUB_CONNECTION_STRING)
    .option("kafka.request.timeout.ms", "60000")
    .option("kafka.session.timeout.ms", "30000")
    .option("startingOffsets", "earliest")
    .load()
    # Kafka delivers the payload as binary; cast it to a UTF-8 string.
    .withColumn("value", col("value").cast(StringType()))
)

print("Schema of Kafka DataFrame:")
kafkaDF.printSchema()

# Deserialize the JSON payload. "mode" is a JSON-parsing option, not a
# Kafka source option, so it belongs in from_json rather than on the reader.
deserialized_stream = kafkaDF.withColumn(
    "data", F.from_json(col("value"), json_schema, {"mode": "PERMISSIVE"})
)
parsed_df = deserialized_stream.select("data.*")

# Display the deserialized stream (Databricks display()).
display(deserialized_stream)
# printSchema output:
Schema of Kafka DataFrame:
root
 |-- key: binary (nullable = true)
 |-- value: string (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)
# Deserialized Output:
{"gtin13":null,"sourceId":null,"lastUpdateTimestamp":null,"inStore":null,"pickup":null,"delivery":null,"ship":null}