In my Flink job, I would like to read Avro records from a file source without supplying a schema.
This comment at the top of AvroInputFormat, in the package org.apache.flink.formats.avro, is exactly what I need:
/**
* Provides a {@link FileInputFormat} for Avro records.
*
* @param <E> the type of the result Avro record. If you specify {@link GenericRecord} then the
* result will be returned as a {@link GenericRecord}, so you do not have to know the schema
* ahead of time.
*/
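For reference outside Flink: an Avro object container file stores its writer schema in the file header, which is presumably what lets the Javadoc promise GenericRecord output without knowing the schema in advance. The sketch shows plain Avro doing this with a GenericDatumReader constructed without any schema. It assumes the org.apache.avro:avro library is on the classpath. The User schema and the temporary file are hypothetical and exist only so the example runs on its own.

```scala
import java.io.File

import org.apache.avro.Schema
import org.apache.avro.file.{DataFileReader, DataFileWriter}
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}

object ReadWithoutSchema {
  // Hypothetical schema, used only to produce a sample file for this sketch.
  private val writerSchemaJson =
    """{"type":"record","name":"User","fields":[{"name":"name","type":"string"}]}"""

  // Writes a small Avro container file; the writer schema is embedded in its header.
  def writeSample(file: File): Unit = {
    val schema = new Schema.Parser().parse(writerSchemaJson)
    val writer = new DataFileWriter[GenericRecord](new GenericDatumWriter[GenericRecord](schema))
    try {
      writer.create(schema, file)
      for (n <- Seq("alice", "bob")) {
        val rec = new GenericData.Record(schema)
        rec.put("name", n)
        writer.append(rec)
      }
    } finally writer.close()
  }

  // Reads the file back without passing a schema: the reader takes it from the header.
  def readNames(file: File): (String, Seq[String]) = {
    val reader = new DataFileReader[GenericRecord](file, new GenericDatumReader[GenericRecord]())
    try {
      val names = scala.collection.mutable.ArrayBuffer.empty[String]
      while (reader.hasNext) names += reader.next().get("name").toString
      (reader.getSchema.getName, names.toList)
    } finally reader.close()
  }

  def main(args: Array[String]): Unit = {
    val file = File.createTempFile("users", ".avro")
    file.deleteOnExit()
    writeSample(file)
    val (schemaName, names) = readNames(file)
    println(s"$schemaName: $names") // User: List(alice, bob)
  }
}
```

This only covers reading. It says nothing about how Flink then serializes those records between operators, which is where the trace points.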
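This is how I invoke AvroInputFormat (I tried both Scala and Java):
import org.apache.avro.generic.GenericRecord
import org.apache.flink.core.fs.Path
import org.apache.flink.formats.avro.AvroInputFormat
val avroInputFormat = new AvroInputFormat(new Path("s3://.../records.avro"), classOf[GenericRecord])
env.createInput(avroInputFormat)
...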
Unfortunately, all of my attempts to read Avro records with this FileInputFormat failed with an UnsupportedOperationException. Searching various sites online led me to conclude that some member of GenericRecord isn’t serializable.
Has anyone succeeded in retrieving GenericRecords from an Avro file without having access to a schema? If it’s relevant, I am not reading from a Kafka topic but from an S3 bucket (so it’s a file source).
Here is part of the exception trace:
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
reserved (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
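One plausible reading of this trace is an assumption and has not been checked against a specific Avro or Kryo version. The innermost failing field is reserved on org.apache.avro.Schema$Field, which in recent Avro versions appears to hold an unmodifiable java.util.Set. Kryo's collection serializers rebuild a collection by creating an instance of the recorded class and calling add() for each element, and an unmodifiable set rejects add() with exactly this exception. The JDK-only sketch reproduces just that last step. It involves neither Avro nor Kryo, and the reserved value in it is a hypothetical stand-in.

```scala
import java.util.{Arrays, Collections, HashSet, Set => JSet}

object UnmodifiableSetDemo {
  // Hypothetical stand-in for a field like Schema$Field.reserved,
  // assumed here to be an unmodifiable set.
  val reserved: JSet[String] =
    Collections.unmodifiableSet(new HashSet[String](Arrays.asList("default", "doc", "name")))

  // Mimics the last step of a collection deserializer: add elements one by one.
  // Returns "ok" on success, or the exception class name if add() is rejected.
  def tryRebuild(target: JSet[String], elements: Seq[String]): String =
    try {
      elements.foreach(x => target.add(x))
      "ok"
    } catch {
      case e: UnsupportedOperationException => e.getClass.getName
    }

  def main(args: Array[String]): Unit = {
    println(tryRebuild(new HashSet[String](), Seq("type"))) // ok
    println(tryRebuild(reserved, Seq("type")))              // java.lang.UnsupportedOperationException
  }
}
```

If this reading is right, the failure is less about java.io.Serializable and more about Kryo, which Flink falls back to for types it has no dedicated serializer for, being unable to reconstruct Avro's Schema object carried inside each GenericData$Record.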
Thank you for any help you can offer in guiding me to a resolution.
If my interpretation of the comment is incorrect (i.e., there isn’t a way to do what I’m trying to accomplish), please do let me know.