Here is my Python code where I’m trying to produce a message with my custom setup, but I’m encountering the mentioned error. Could anyone please help me with creating the correct set_key_serialization_schema?
import json
import os
from pyflink.common import Types, WatermarkStrategy, TypeInformation
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import DeliveryGuarantee
from pyflink.datastream.connectors.kafka import KafkaSource,
KafkaOffsetsInitializer, KafkaSink, KafkaRecordSerializationSchema, KafkaRecordSerializationSchemaBuilder
from pyflink.common.serialization import SimpleStringSchema, DeserializationSchema,SerializationSchema
from pyflink.datastream.formats.json import JsonRowDeserializationSchema, JsonRowSerializationSchema
class CustomKeySerializationSchema(SerializationSchema):
def serialize(self, element):
data = json.loads(element)
order_id = data.get('orderid', '123')
return order_id
if __name__ == '__main__':
print("Setting up Streaming enviornment.")
env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars(r"file:///C:projectsFlink-POCDEMOpathtoyourscriptsflink-sql-connector-kafka-3.1.0-1.18.jar")
print("Setting parallelism to 1.")
env.set_parallelism(1)
type_info = Types.ROW_NAMED(
["orderid"],
[Types.INT()]
)
deserialization_schema = JsonRowSerializationSchema.Builder()
.with_type_info(type_info)
.build()
user=os.environ['user']
passwd=os.environ['password']
print("Extracing data from kafka")
source = KafkaSource.builder()
.set_bootstrap_servers(os.environ['bootstrap_servers'])
.set_property("security.protocol", "SASL_SSL")
.set_property("sasl.mechanism", "PLAIN")
.set_property("sasl.jaas.config",
f"org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username={user} password={passwd};")
.set_topics("Flink_test")
.set_group_id("my-group")
.set_starting_offsets(KafkaOffsetsInitializer.earliest())
.set_value_only_deserializer(SimpleStringSchema())
.build()
stream = env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source")
print("Storing data in data stream")
print("Sending Data to kafka")
sink = KafkaSink.builder()
.set_bootstrap_servers("pkc-921jm.us-east-2.aws.confluent.cloud:9092")
.set_property("security.protocol", "SASL_SSL")
.set_property("sasl.mechanism", "PLAIN")
.set_property("sasl.jaas.config",
f"org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username={user} password={passwd};")
.set_record_serializer(
KafkaRecordSerializationSchema.builder()
.set_topic("dvaghela")
.set_key_serialization_schema(CustomKeySerializationSchema())
.set_value_serialization_schema(SimpleStringSchema())
.build()
).set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE).build()
print("Transofrming the data and make sink to destination kafka topic")
stream.sink_to(sink)
print("Executing the job........")
env.execute()
Error
Traceback (most recent call last):
File "C:projectsFlink-POCDEMOkafka-key.py", line 96, in <module>
.set_key_serialization_schema(CustomKeySerializationSchema())
File "C:UsersdeepvPycharmProjectspythonProject.venvlibsite-packagespyflinkdatastreamconnectorskafka.py", line 1136, in set_key_serialization_schema
self._j_builder.setKeySerializationSchema(key_serialization_schema._j_serialization_schema)
File "C:UsersdeepvPycharmProjectspythonProject.venvlibsite-packagespy4jjava_gateway.py", line 1322, in __call__
return_value = get_return_value(
File "C:UsersdeepvPycharmProjectspythonProject.venvlibsite-packagespyflinkutilexceptions.py", line 146, in deco
return f(*a, **kw)
File "C:UsersdeepvPycharmProjectspythonProject.venvlibsite-packagespy4jprotocol.py", line 326, in get_return_value
raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o52.setKeySerializationSchema.
: java.lang.NullPointerException
at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:59)
at org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchemaBuilder.setKeySerializationSchema(KafkaRecordSerializationSchemaBuilder.java:140)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:834)
enter image description here
enter image description here
key and value are the same, but i want to assing my custom key from the data
New contributor
Deep Vaghela is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
Check out our Code of Conduct.