I have a use case where I have to read data from Kafka and write it to a sink. The data in Kafka is in Avro, and the fields are wrapped in an Avro map. The map will not always have the same keys; the keys vary with the type of data. I have to flatten this map, i.e. convert each key to a column whose value is the corresponding map value (a small example follows the schemas below). I have a standard schema to flatten to before writing to the sink, and I am writing the data in Delta / Parquet format. How can I approach this?
Avro schema for Kafka
{
  "type": "record",
  "name": "MapExample",
  "fields": [
    {
      "name": "mapOfStrings",
      "type": {
        "type": "map",
        "values": "string"
      }
    }
  ]
}
Schema for disk
StructType(List(
  StructField("field1", StringType, nullable = true),
  StructField("field2", IntegerType, nullable = true),
  StructField("field3", DoubleType, nullable = true),
  StructField("field4", LongType, nullable = true),
  StructField("field5", BooleanType, nullable = true)
))
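To illustrate what I mean by flattening (the values here are made up), a decoded record whose map is

  Map("field1" -> "abc", "field2" -> "42", "field4" -> "9999999999")

should end up on disk as a row like

  field1 = "abc", field2 = 42, field3 = null, field4 = 9999999999, field5 = null

with keys that are missing from the map becoming null columns.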
Has anyone used the cast method to convert a map column to a target schema? The code below is what I am attempting, but I'm not sure whether this is a good approach or whether there are other ways of doing what I'm trying to do. I'm looking for an optimized way to do this operation in Apache Spark.
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.avro.functions.from_avro
import org.apache.spark.sql.types._

def selectAvroMap(spark: SparkSession, sourceFormat: String, sourceOptions: Map[String, String], avroSchema: String, schema: StructType): DataFrame = {
  // Read the raw Kafka stream and decode the Avro payload in the value column
  val readDF = spark.readStream.format(sourceFormat).options(sourceOptions).load()
  val readDF1 = readDF.select(from_avro(col("value"), avroSchema).as("xyz"))
  val mapDF = readDF1.select("xyz.mapOfStrings.col1", "xyz.mapOfStrings")
  // Convert the map column to the provided schema using cast
  val flattenedDF = mapDF.select(col("col1"), col("mapOfStrings").cast(schema))
  flattenedDF
}
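For comparison, this is a rough sketch (not tested) of the other option I can think of: selecting each key of the map explicitly and casting it to the target type. flattenMapToSchema is just a name I made up; it assumes the map keys match the field names of the disk schema and that the string values are castable to the target types.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType

// Sketch only: flatten a map<string,string> column into the columns of targetSchema.
// Keys missing from the map come out as null after the cast.
def flattenMapToSchema(df: DataFrame, mapColumn: String, targetSchema: StructType): DataFrame = {
  val flattenedCols = targetSchema.fields.map { field =>
    col(mapColumn).getItem(field.name).cast(field.dataType).as(field.name)
  }
  df.select(flattenedCols: _*)
}

// Usage on the decoded stream (names as in the function above):
// val decoded = readDF.select(from_avro(col("value"), avroSchema).as("xyz")).select("xyz.*")
// val flattened = flattenMapToSchema(decoded, "mapOfStrings", diskSchema) // diskSchema = the StructType above

Would this per-key select be preferable to casting the whole map, or is there a better way?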