I am trying to deserialize messages written in the Confluent Avro format.
I know we can use ABRiS or strip the magic bytes, but is there a way to do it without either of these two methods?
Below is what I am trying, but with no luck:
```scala
import java.nio.file.{Files, Paths}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.functions._
import org.apache.spark.sql.functions.expr
import org.apache.spark.sql.streaming.Trigger
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.avro.Schema
import org.apache.avro.Schema.Parser
import org.apache.http.client.methods.HttpGet
import org.apache.http.impl.client.HttpClients
import org.apache.http.util.EntityUtils

object ReadFromKafka {
  System.setProperty("hadoop.home.dir", "C:\\hadoop")

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
    sparkConf.setAppName("ConfluentConsumer").setMaster("local[*]")

    val spark = SparkSession.builder()
      .appName("AvroKafkaStructuredStreaming")
      .config(sparkConf)
      .getOrCreate()
    import spark.implicits._

    val kafkaBootstrapServers = "bootstrapserver"
    val kafkaTopic = "topic"
    val schemaRegistryUrl = "schemaregistryurl:8081"

    // Fetch the Avro schema for the topic's value subject from Schema Registry
    def fetchAvroSchema(topic: String, schemaRegistryUrl: String, version: String): Schema = {
      val client = HttpClients.createDefault()
      val url = s"$schemaRegistryUrl/subjects/$topic-value/versions/$version/schema"
      println("url--" + url)
      val request = new HttpGet(url)
      val response = client.execute(request)
      val jsonSchema = EntityUtils.toString(response.getEntity)
      client.close()
      new Parser().parse(jsonSchema)
    }

    val df = spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaBootstrapServers)
      .option("subscribe", kafkaTopic)
      .option("startingOffsets", "earliest")
      .option("failOnDataLoss", "false")
      .option("maxOffsetsPerTrigger", 10)
      .option("value.deserializer", classOf[KafkaAvroDeserializer].getName)
      .option("schema.registry.url", schemaRegistryUrl)
      .option("mode", "PERMISSIVE")
      .load()

    // Fetch the Avro schema dynamically
    val avroSchema = fetchAvroSchema(kafkaTopic, schemaRegistryUrl, "latest").toString()
    println("avroSchema " + avroSchema)

    val deserializedDF = df.select(from_avro($"value", avroSchema).as("data"))
    // val deserializedDF = df.select(from_avro(expr("substring(value, 6)"), avroSchema)) // this works (sketch below)
    deserializedDF.show(false)

    /* val query = deserializedDF.writeStream
      .outputMode("append")
      .format("console")
      // .trigger(Trigger.ProcessingTime("10 seconds"))
      .start()
    query.awaitTermination() */
  }
}
```
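For context on the commented-out line that does work: Confluent-serialized records prefix the Avro payload with a 5-byte header (a 0x00 magic byte plus a 4-byte schema ID), so `from_avro` only succeeds once those bytes are skipped. A minimal sketch of that workaround, using the same `df` and `avroSchema` as above (this is the approach I would like to avoid):

```scala
import org.apache.spark.sql.avro.functions.from_avro
import org.apache.spark.sql.functions.expr

// Confluent wire format: [0x00 magic byte][4-byte schema ID][Avro payload].
// Dropping the first 5 bytes leaves plain Avro that from_avro can parse;
// SQL substring() is 1-indexed, hence position 6.
val strippedDF = df.select(
  from_avro(expr("substring(value, 6)"), avroSchema).as("data")
)
```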
Below is the error that I am getting:

```
Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -5057
    at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:308)
```
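From what I can tell, `from_avro` starts decoding at byte 0 of the value, so the header bytes get interpreted as Avro data: the first varint it reads as a string length zigzag-decodes to a negative number, which matches the error above. A small standalone sketch of that failure mode (the byte value here is made up, not taken from my topic):

```scala
import org.apache.avro.io.DecoderFactory

// Hypothetical single byte 0x27: as an Avro varint it zigzag-decodes to -20,
// so readString() fails with "Malformed data. Length is negative: -20",
// the same failure shape as the stack trace above.
val bogus = Array[Byte](0x27)
val decoder = DecoderFactory.get().binaryDecoder(bogus, null)
decoder.readString() // throws org.apache.avro.AvroRuntimeException
```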
Spark version used: 3.1.2
Scala version: 2.12.14

Below is my build.sbt file:
```scala
scalaVersion := "2.12.14"

val sparkVersion = "3.1.2"

resolvers += "confluent" at "https://packages.confluent.io/maven/"

libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion
libraryDependencies += "io.confluent" % "kafka-avro-serializer" % "7.5.1"
libraryDependencies += "org.apache.spark" %% "spark-avro" % sparkVersion

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion % "compile",
  "org.apache.spark" %% "spark-sql" % sparkVersion % "compile"
)

dependencyOverrides ++= Seq(
  "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.12.3",
  "com.fasterxml.jackson.core" % "jackson-annotations" % "2.12.3",
  "com.fasterxml.jackson.core" % "jackson-core" % "2.12.3",
  "com.fasterxml.jackson.core" % "jackson-databind" % "2.12.3"
)
```