I am exploring using Apache Hudi HoodieStreamer to ingest protobuf messages from Kafka into Hudi.
Despite many attempts, I have hit a roadblock.
I get an exception when HoodieStreamer tries to use the schema from my locally hosted Confluent Schema Registry.
I start HoodieStreamer as follows:
spark-submit \
  --packages org.apache.hudi:hudi-utilities-bundle_2.12:0.15.0,org.apache.hudi:hudi-spark3.4-bundle_2.12:0.15.0 \
  --jars /home/gaurav/ws/learn/hoodie-delta-streamer/kafka-protobuf-provider-7.6.1.jar \
  --driver-memory 8g --executor-memory 8g \
  --class org.apache.hudi.utilities.streamer.HoodieStreamer /home/gaurav/ws/learn/hoodie-delta-streamer/hudi-utilities-bundle_2.12-0.15.0.jar \
  --props /home/gaurav/ws/learn/hoodie-delta-streamer/kafka/try1/props/kafka-source.properties \
  --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
  --source-class org.apache.hudi.utilities.sources.ProtoKafkaSource \
  --table-type COPY_ON_WRITE \
  --target-base-path file:///home/gaurav/ws/learn/hoodie-delta-streamer/db/try1 \
  --target-table vols1 \
  --op UPSERT \
  --continuous \
  --source-limit 4000000 \
  --min-sync-interval-seconds 60
The kafka-source.properties passed above is as follows:
hoodie.upsert.shuffle.parallelism=2
hoodie.insert.shuffle.parallelism=2
hoodie.delete.shuffle.parallelism=2
hoodie.bulkinsert.shuffle.parallelism=2
hoodie.datasource.write.recordkey.field=name
hoodie.datasource.write.partitionpath.field=name
hoodie.streamer.schemaprovider.registry.url=http://localhost:8081/subjects/vols1-value/versions/latest
hoodie.streamer.schemaprovider.registry.schemaconverter=org.apache.hudi.utilities.schema.converter.ProtoSchemaToAvroSchemaConverter
hoodie.streamer.source.kafka.proto.value.deserializer.class=io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer
hoodie.streamer.source.kafka.topic=vols1
bootstrap.servers=localhost:9092
auto.offset.reset=earliest
schema.registry.url=http://localhost:8081
The exception I get is:
Caused by: org.apache.hudi.utilities.exception.HoodieSchemaFetchException: Error reading source schema from registry. Please check hoodie.streamer.schemaprovider.registry.url is configured correctly. Truncated URL: http://loc...ons/latest
Caused by: org.apache.hudi.internal.schema.HoodieSchemaException: Failed to parse schema from registry: syntax = "proto3";
package com.gaurav.data.vol;
Caused by: org.apache.hudi.internal.schema.HoodieSchemaException: Failed to parse schema from registry: syntax = "proto3";
package com.gaurav.data.vol;
import "google/protobuf/timestamp.proto";
message VolSurface {
optional string name = 1;
repeated .google.protobuf.Timestamp expiry = 2;
repeated double atmVol = 3;
repeated double skew = 4;
}
Caused by: org.apache.hudi.exception.HoodieException: Could not load class org.apache.hudi.utilities.schema.converter.ProtoSchemaToAvroSchemaConverter
at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:64)
at org.apache.hudi.utilities.schema.SchemaRegistryProvider.parseSchemaFromRegistry(SchemaRegistryProvider.java:107)
... 11 more
Caused by: java.lang.InstantiationException: org.apache.hudi.utilities.schema.converter.ProtoSchemaToAvroSchemaConverter
at java.base/java.lang.Class.newInstance(Class.java:571)
at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:62)
... 12 more
Caused by: java.lang.NoSuchMethodException: org.apache.hudi.utilities.schema.converter.ProtoSchemaToAvroSchemaConverter.<init>()
at java.base/java.lang.Class.getConstructor0(Class.java:3349)
at java.base/java.lang.Class.newInstance(Class.java:556)
... 13 more
I hope my use of the line below in the properties file is correct. My understanding is that the converter is needed because Hudi needs to convert protobuf messages into Avro:
hoodie.streamer.schemaprovider.registry.schemaconverter=org.apache.hudi.utilities.schema.converter.ProtoSchemaToAvroSchemaConverter
The NoSuchMethodException on the <init>() method (as opposed to a NoClassDefFoundError) seems to indicate that the class was found but that Hudi failed to find a no-arg constructor on it. Looking at the code downloaded from GitHub, that is true: there isn't any no-arg constructor for the class ProtoSchemaToAvroSchemaConverter. I think that there is something basic I am missing, and I would be really grateful if you could point me in the right direction.
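To check my reading of the trace, here is a minimal sketch in plain Java (no Hudi involved) of what I believe happens when a class is instantiated reflectively through a no-arg constructor it does not have. The class names NoArgConstructorDemo, ConfiguredConverter and PlainConverter are hypothetical stand-ins that I made up for illustration; they are not Hudi classes. The sketch also uses getConstructor().newInstance() rather than the Class.newInstance() call seen in the trace, so the NoSuchMethodException surfaces directly instead of being wrapped in an InstantiationException.

```java
import java.util.Properties;

public class NoArgConstructorDemo {

    // Hypothetical stand-in for a converter whose only constructor takes an argument.
    public static class ConfiguredConverter {
        private final Properties config;

        public ConfiguredConverter(Properties config) {
            this.config = config;
        }
    }

    // Hypothetical stand-in for a converter that does have a public no-arg constructor.
    public static class PlainConverter {
        public PlainConverter() {
        }
    }

    // Tries to instantiate the class through its public no-arg constructor and
    // returns "ok", or the simple name of the reflective exception that was thrown.
    public static String instantiate(Class<?> clazz) {
        try {
            clazz.getConstructor().newInstance();
            return "ok";
        } catch (ReflectiveOperationException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        // prints "PlainConverter: ok"
        System.out.println("PlainConverter: " + instantiate(PlainConverter.class));
        // prints "ConfiguredConverter: NoSuchMethodException"
        System.out.println("ConfiguredConverter: " + instantiate(ConfiguredConverter.class));
    }
}
```

If this sketch reflects what ReflectionUtils.loadClass does with the converter class name, then the class is being found on the classpath and only the constructor lookup fails, which is why I suspect a configuration or version mismatch rather than a missing jar.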