I am trying to implement a Flink Kafka source in PyFlink. First, this is my deserialization class:

    from pyflink.common.serialization import DeserializationSchema
    from pyflink.common.typeinfo import Types, TypeInformation

    class CustomCsvDeserializationSchema(DeserializationSchema):
        def deserialize(self, value):
            fields = value.decode('utf-8').split(',')
            return fields

        def is_end_of_stream(self, next_element):
            return False

        def get_produced_type(self):
            type_info = Types.ROW([Types.SQL_DATE(), Types.FLOAT(), Types.STRING(),
                                   Types.STRING(), Types.STRING(), Types.STRING(),
                                   Types.STRING(), Types.STRING()])
            return TypeInformation.of(type_info)

And this is the code that builds the Kafka source:
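
    # Import path as in recent PyFlink releases; adjust for your version.
    from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer

    # Parentheses let the builder chain span several lines.
    source = (KafkaSource.builder()
              .set_bootstrap_servers(kafka_brokerlist)
              .set_topics(kafka_topic)
              .set_starting_offsets(KafkaOffsetsInitializer.earliest())
              .set_value_only_deserializer(CustomCsvDeserializationSchema())
              .build())
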
But I get the following error:

    py4j.protocol.Py4JJavaError: An error occurred while calling o10.fromSource.
    : java.lang.NullPointerException: Cannot invoke "org.apache.flink.api.common.serialization.DeserializationSchema.getProducedType()" because "this.deserializationSchema" is null
        at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper.getProducedType(KafkaValueOnlyDeserializationSchemaWrapper.java:56)
        at org.apache.flink.connector.kafka.source.KafkaSource.getProducedType(KafkaSource.java:229)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getTypeInfo(StreamExecutionEnvironment.java:2686)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:2056)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:568)
        at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
        at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
        at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
        at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.base/java.lang.Thread.run(Thread.java:833)

I think the Java-side deserialization schema is never created from my Python class, so the value passed to the Kafka source is null. How can I fix this error? Any help would be appreciated.
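
One workaround I am considering, as a rough sketch only: as far as I can tell, PyFlink's DeserializationSchema is just a wrapper around a Java schema object, so a pure-Python subclass leaves that object unset. The idea would be to read the value with the built-in SimpleStringSchema and move the CSV parsing into a map function. The function name parse_csv_line, the eight-field check, and the %Y-%m-%d date format are my own assumptions based on the ROW type above, not something the API requires.

```python
from datetime import date, datetime


def parse_csv_line(line: str) -> tuple:
    """Split one CSV line into (date, float, six strings).

    Assumed layout, mirroring the ROW type in get_produced_type():
    field 0 is a date, field 1 is a float, fields 2..7 are strings.
    """
    fields = line.split(',')
    if len(fields) != 8:
        raise ValueError(f"expected 8 fields, got {len(fields)}")
    # Assumption: dates arrive as YYYY-MM-DD; change the format if they do not.
    event_date = datetime.strptime(fields[0], '%Y-%m-%d').date()
    amount = float(fields[1])
    return (event_date, amount, *fields[2:])


# Assumed wiring (not executed here; needs PyFlink and the Kafka connector jar):
#   from pyflink.common.serialization import SimpleStringSchema
#   ... .set_value_only_deserializer(SimpleStringSchema()) ...
#   stream = env.from_source(source, watermark_strategy, "kafka source")
#   parsed = stream.map(parse_csv_line)
```

With this split, the Kafka source only hands over strings, and any parsing error shows up in the Python map step rather than inside the connector. A plain split(',') does not handle quoted commas, so the csv module may be a better fit for real data.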