I am trying to produce Avro-encoded messages to Kafka using PySpark Structured Streaming, with the schema registered in the Confluent Schema Registry. However, I get an InvalidRecordException when schema validation is enabled on the topic.
Code:
from pyspark.sql import SparkSession
from pyspark.sql.functions import struct, col
from pyspark.sql.avro.functions import to_avro
from pyspark.sql.types import StructType, StructField, StringType
# Create SparkSession
spark = SparkSession.builder \
    .appName("KafkaAvroSchemaProducer") \
    .getOrCreate()
# Define the schema for the DataFrame
schema = StructType([
    StructField("name", StringType(), True)
])
# Define sample message
sample_message = {"name": "John"}
# Create DataFrame with the sample message
sample_df = spark.createDataFrame([sample_message], schema=schema)
# Convert DataFrame to Avro format
avro_df = sample_df.select(
    to_avro(struct("*")).alias("value")  # Convert the row struct to Avro binary
).selectExpr("CAST(NULL AS STRING) AS key", "value")
# Kafka producer configuration
producer_config = {
    'kafka.bootstrap.servers': 'your_kafka_broker:9092',
    'topic': 'sample_topic',
    'schema.registry.url': 'http://your_schema_registry:8081'
}
# Write DataFrame to Kafka in Avro format
avro_df.write \
    .format("kafka") \
    .option("kafka.bootstrap.servers", producer_config['kafka.bootstrap.servers']) \
    .option("topic", producer_config['topic']) \
    .option("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") \
    .option("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer") \
    .option("schema.registry.url", producer_config['schema.registry.url']) \
    .save()
# Stop the Spark session
spark.stop()
Schema from Schema Registry:
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"}
  ]
}
Exception:
InvalidRecordException: The record is rejected by the record interceptor
Is there something wrong with how I’m serializing the data to Avro format using to_avro? Is there any additional configuration needed for schema validation to pass?
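For what it's worth, I understand that to_avro can also take the Avro schema as a JSON string (the jsonFormatSchema argument), so a variant like the sketch below is something I could try; the user_schema string is just the registry schema from above inlined for illustration. I'm also unsure whether the raw Avro bytes that to_avro produces are acceptable to the broker-side validator at all, since they don't carry the Confluent wire-format header (magic byte plus 4-byte schema ID) that the Confluent serializers prepend.

# Sketch of the variant I'm considering: pass the registry schema to to_avro explicitly.
# user_schema is the schema JSON shown above, inlined here only for illustration.
user_schema = """
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"}
  ]
}
"""

avro_df = sample_df.select(
    to_avro(struct("*"), user_schema).alias("value")
).selectExpr("CAST(NULL AS STRING) AS key", "value")

Would the to_avro output need that 5-byte Confluent header prepended before the topic's schema-validation interceptor accepts the record, or is there a different configuration I'm missing?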