I’m new to Flink and Kafka, and I’m trying to create a PyFlink Kafka sink that sends strings in JSON format to a Kafka topic. As far as I understand, this requires building a JsonRowSerializationSchema. Here’s what I’ve tried so far:
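```python
# Module paths as of PyFlink 1.16+
from pyflink.common import Types
from pyflink.datastream.connectors.kafka import KafkaRecordSerializationSchema, KafkaSink
from pyflink.datastream.formats.json import JsonRowSerializationSchema

# Value schema: JSON for rows with a single STRING field
serialization_schema = (JsonRowSerializationSchema.Builder()
                        .with_type_info(Types.ROW([Types.STRING()]))
                        .build())
# Kafka sink that writes each record's value to the "preprocessed_data" topic
sink = (KafkaSink.builder()
        .set_bootstrap_servers("localhost:9092")
        .set_record_serializer(
            KafkaRecordSerializationSchema.builder()
            .set_topic("preprocessed_data")
            .set_value_serialization_schema(serialization_schema)
            .build())
        .build())
```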
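For reference, the error below states that the JSON row schema is handed `Row` records, not plain strings. Conceptually, a JSON row serializer turns each field of a row into one key of a JSON object. The following is a pure-Python illustration only (not Flink's implementation); `row_to_json` is a hypothetical helper, and the default `f0`, `f1`, ... field names are assumed to match how Flink names the fields of an unnamed `Types.ROW(...)`.

```python
import json
from typing import Any, Optional, Sequence


def row_to_json(values: Sequence[Any], field_names: Optional[Sequence[str]] = None) -> bytes:
    """Hypothetical illustration of JSON row serialization (not Flink code)."""
    if field_names is None:
        # Assumed default naming for unnamed row fields: f0, f1, ...
        field_names = [f"f{i}" for i in range(len(values))]
    if len(field_names) != len(values):
        raise ValueError("field_names and values must have the same length")
    # Each positional field becomes one key/value pair of a JSON object
    obj = dict(zip(field_names, values))
    # Compact separators: no spaces after ',' or ':'
    return json.dumps(obj, separators=(",", ":")).encode("utf-8")


print(row_to_json(["hello"]))                      # b'{"f0":"hello"}'
print(row_to_json(["hello", 3], ["text", "count"]))  # b'{"text":"hello","count":3}'
```

Under that assumption, a single-field row whose value is already a JSON string would be nested as an escaped string under `f0`: `row_to_json(['{"a":1}'])` yields the JSON text `{"f0":"{\"a\":1}"}`.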
I know my data is a string, because my streaming pipeline looks like this:
```python
# ds, env and Preprocessing are defined elsewhere in the job
preprocessed_stream = ds.map(Preprocessing(), output_type=Types.STRING())
preprocessed_stream.sink_to(sink)
env.execute()
```
However, I keep getting this error:
```
Caused by: java.lang.ClassCastException: class java.lang.String cannot be cast to class org.apache.flink.types.Row (java.lang.String is in module java.base of loader 'bootstrap'; org.apache.flink.types.Row is in unnamed module of loader 'app')
```
Is there something I’m missing? Is my JsonRowSerializationSchema configured correctly?
When I pass SimpleStringSchema() to set_value_serialization_schema instead, the job works, but the output is not in the format I expect. Any guidance or tips on how to resolve this would be greatly appreciated!
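Why SimpleStringSchema() "works": by default it only encodes each string as UTF-8 and writes it unchanged, so whether Kafka receives JSON depends entirely on what the map step returns. The pure-Python sketch below illustrates that idea under stated assumptions: `preprocess` is a hypothetical stand-in for `Preprocessing()`, the `{"value": ...}` layout is made up, and `encode_like_simple_string_schema` only mimics SimpleStringSchema's default behaviour.

```python
import json


def preprocess(raw: str) -> str:
    # Hypothetical stand-in for Preprocessing(): returns a JSON *string*,
    # so the stream can keep output_type=Types.STRING().
    record = {"value": raw.strip()}  # field name "value" is made up
    return json.dumps(record, separators=(",", ":"))


def encode_like_simple_string_schema(s: str) -> bytes:
    # Mimics SimpleStringSchema's default: UTF-8 encode, no other change.
    return s.encode("utf-8")


payload = encode_like_simple_string_schema(preprocess("  hello  "))
print(payload)              # b'{"value":"hello"}'
print(json.loads(payload))  # {'value': 'hello'}
```

In the PyFlink job, this would correspond to returning such a string from the map function and passing `SimpleStringSchema()` (from `pyflink.common.serialization`) to `set_value_serialization_schema`.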