I'm trying to print Avro data from Kafka to the console using Spark Structured Streaming (readStream/writeStream), and I get this exception:
Caused by: org.apache.spark.SparkException: Malformed records are detected in record parsing. Current parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'.
How can I fix it? My config:
public static void main(String[] args) throws StreamingQueryException, IOException, TimeoutException, RestClientException {
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", AvroDeserializer.class);
kafkaParams.put("value.deserializer", AvroDeserializer.class);
kafkaParams.put("group.id", "sparkGroupId");
kafkaParams.put("auto.offset.reset", "earliest");
kafkaParams.put("enable.auto.commit", false);
SparkSession spark = SparkSession.builder().appName("Hello Spark").master("local").getOrCreate();
System.out.println("Hello, Spark v." + spark.version());
spark.sparkContext().setLogLevel("ERROR");
// Subscribe to 1 topic
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "j_client")
.option("startingOffsets", "earliest")
.load();
String schemaRegistryAddr = "http://localhost:8081";
// Create a REST client for the schema registry
var restService = new RestService(schemaRegistryAddr);
// Retrieve the latest value schema registered for the topic
var valueRestResponseSchema = restService.getLatestVersion("j_client" + "-value");
var jsonSchema = valueRestResponseSchema.getSchema();
var jClientDF = df.select(
col("key").cast("string"), // cast to string from binary value
from_avro(col("value"), jsonSchema).as("j_client"), // convert from avro value
col("topic"),
col("offset"),
col("timestamp"),
col("timestampType"))
;
jClientDF.printSchema();
// Stream data to console for testing
jClientDF
.writeStream()
.format("console")
.start()
.awaitTermination()
;
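One possible cause, assuming the records were produced with Confluent's KafkaAvroSerializer (which seems likely given the schema registry): each message value then starts with a 5-byte header (a magic byte 0 followed by a 4-byte big-endian schema ID) before the actual Avro payload, and Spark's from_avro expects plain Avro bytes. The sketch below only illustrates that layout in plain Java; the class and method names are hypothetical and not part of Spark or Confluent.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper illustrating the Confluent wire format:
// [magic byte 0][4-byte big-endian schema ID][Avro binary payload]
public class ConfluentWireFormat {
    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_LENGTH = 5;

    // Reads the schema ID stored in bytes 1..4 of the message.
    public static int schemaId(byte[] message) {
        checkHeader(message);
        return ByteBuffer.wrap(message, 1, 4).getInt(); // ByteBuffer is big-endian by default
    }

    // Returns the message without the 5-byte header, i.e. the plain Avro bytes.
    public static byte[] avroPayload(byte[] message) {
        checkHeader(message);
        return Arrays.copyOfRange(message, HEADER_LENGTH, message.length);
    }

    private static void checkHeader(byte[] message) {
        if (message == null || message.length < HEADER_LENGTH || message[0] != MAGIC_BYTE) {
            throw new IllegalArgumentException("Not a Confluent-framed Avro message");
        }
    }

    public static void main(String[] args) {
        // Made-up sample: header with schema ID 42, then two payload bytes.
        byte[] message = {0, 0, 0, 0, 42, 2, 10};
        System.out.println(schemaId(message));
        System.out.println(Arrays.toString(avroPayload(message)));
    }
}
```

If that assumption holds for this topic, the same stripping could probably be done in the query itself before decoding, for example with from_avro(expr("substring(value, 6, length(value) - 5)"), jsonSchema) instead of from_avro(col("value"), jsonSchema). I have not verified this against the setup above.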