Problem:
I’m encountering an error when trying to use windowing with Avro’s GenericRecord
in Apache Beam. The error is an AvroRuntimeException; the full stack trace is at the end of this question.
Code Snippet:
Here’s the code that produces the error:
PCollection<KV<KV<GenericRecord, GenericRecord>, Boolean>> windowedOutput = inputRecords
    .apply(Window.<KV<KV<GenericRecord, GenericRecord>, Boolean>>into(FixedWindows.of(Duration.standardMinutes(1))))
    .setCoder(KvCoder.of(
        KvCoder.of(
            AvroCoder.of(GenericRecord.class, keySchema),
            AvroCoder.of(GenericRecord.class, valueSchema)
        ),
        BooleanCoder.of()
    ));
This throws the AvroRuntimeException shown in the stack trace below when the transform is applied.
Working Example:
The following similar code (without windowing) works just fine:
PCollection<KV<GenericRecord, GenericRecord>> outputRecords = input
    .apply(ParDo.of(new ConvertToAvroFn(keySchema, valueSchema)))
    .setCoder(KvCoder.of(
        AvroCoder.of(GenericRecord.class, keySchema),
        AvroCoder.of(GenericRecord.class, valueSchema)
    ));
Stack Trace:
[ERROR] Encountered an exception, shutting down the process {}
org.apache.avro.AvroRuntimeException: Not a Specific class: interface org.apache.avro.generic.GenericRecord
    at org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:396) ~[avro-1.10.2.jar:1.10.2]
    at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:726) ~[avro-1.10.2.jar:1.10.2]
    at org.apache.avro.specific.SpecificData$3.computeValue(SpecificData.java:328) ~[avro-1.10.2.jar:1.10.2]
    at java.lang.ClassValue.getFromHashMap(ClassValue.java:227) ~[?:1.8.0_282]
    at java.lang.ClassValue.getFromBackup(ClassValue.java:209) ~[?:1.8.0_282]
    at java.lang.ClassValue.get(ClassValue.java:115) ~[?:1.8.0_282]
    at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:339) ~[avro-1.10.2.jar:1.10.2]
    at org.apache.beam.sdk.coders.AvroCoder.of(AvroCoder.java:163) ~[beam-sdks-java-core-2.45.22.jar:?]
    at com.example.pipeline.CustomSink.applyCustomCheck(CustomSink.java:247)
    at com.example.pipeline.CustomSink.expand(CustomSink.java:219)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:551)
Answer:
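The stack trace shows AvroCoder.of calling SpecificData.getSchema, which tries to derive a schema from the class it is given and fails because GenericRecord is an interface, not a generated (specific) class. That factory method expects to receive your data class rather than GenericRecord. In other words, if you can, write your pipeline fragment like so: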
PCollection<KV<KV<MyKeyClass, MyValueClass>, Boolean>> windowedOutput = inputRecords
    .apply(Window.<KV<KV<MyKeyClass, MyValueClass>, Boolean>>into(FixedWindows.of(Duration.standardMinutes(1))))
    .setCoder(KvCoder.of(
        KvCoder.of(
            AvroCoder.of(MyKeyClass.class, keySchema),
            AvroCoder.of(MyValueClass.class, valueSchema)
        ),
        BooleanCoder.of()
    ));
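If switching to generated classes is not an option, one possible alternative is to keep GenericRecord and build the coder from the schema alone. This is a minimal sketch, assuming the Beam version from the stack trace (where AvroCoder lives in org.apache.beam.sdk.coders) and its AvroCoder.of(Schema) overload. The class name GenericRecordCoderSketch, the helper buildCoder, and the two tiny schemas in main are hypothetical and only there to make the example complete:

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.BooleanCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.values.KV;

public class GenericRecordCoderSketch {

  // Hypothetical helper: builds the nested coder from the two schemas only,
  // so no Class argument is involved and no schema has to be derived from
  // the GenericRecord interface.
  static Coder<KV<KV<GenericRecord, GenericRecord>, Boolean>> buildCoder(
      Schema keySchema, Schema valueSchema) {
    return KvCoder.of(
        KvCoder.of(
            AvroCoder.of(keySchema),     // coder for GenericRecord keys
            AvroCoder.of(valueSchema)),  // coder for GenericRecord values
        BooleanCoder.of());
  }

  public static void main(String[] args) {
    // Assumed stand-in schemas; the real keySchema and valueSchema come from the pipeline.
    Schema keySchema =
        SchemaBuilder.record("Key").fields().requiredString("id").endRecord();
    Schema valueSchema =
        SchemaBuilder.record("Value").fields().requiredLong("count").endRecord();

    System.out.println(buildCoder(keySchema, valueSchema));
  }
}
```

The resulting coder could then be passed to setCoder on the windowed PCollection in place of the nested KvCoder from the question. Whether this removes the exception depends on where the schema-less AvroCoder.of call at CustomSink.java:247 comes from, which the question does not show.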