I’m working on a PyFlink application that consumes events from a Kafka topic (wikipedia-events) and separates these events based on whether they were made by a bot or a human. The goal is to filter the events into two separate Kafka topics (bot-edits and human-edits). However, when I try to run my PyFlink pipeline, I encounter an error related to the Kafka sink’s serialization. Specifically, I get the following error message:
Traceback (most recent call last):
File "/home/jason/docker-tests/flink/human_bot_flink.py", line 6, in <module>
from pyflink.datastream import KafkaSerializationSchema
ImportError: cannot import name 'KafkaSerializationSchema' from 'pyflink.datastream' (/home/jason/anaconda3/envs/flink-env/lib/python3.11/site-packages/pyflink/datastream/__init__.py)
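Since that import fails, I'd also like to see which serialization-related names the installed Kafka connector module (pyflink.datastream.connectors.kafka, the same module my source/sink imports come from) actually exports. A quick introspection snippet I can run, in case it helps diagnose this:

# List the serialization-related names exported by the installed
# PyFlink Kafka connector module.
import pyflink.datastream.connectors.kafka as kafka_connectors

print([name for name in dir(kafka_connectors) if "Serialization" in name])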
Code:
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaSink
import json
from pyflink.table import TableEnvironment, EnvironmentSettings


def parse_event(json_string):
    """Parse Kafka JSON string into a Python dictionary."""
    try:
        return json.loads(json_string)
    except json.JSONDecodeError:
        return None  # Return None or handle error as appropriate


if __name__ == "__main__":
    # Initialize the Flink environment
    env = StreamExecutionEnvironment.get_execution_environment()

    env_settings = EnvironmentSettings.new_instance() \
        .in_streaming_mode() \
        .build()
    table_env = TableEnvironment.create(env_settings)
    table_env.get_config().get_configuration().set_string(
        "pipeline.jars",
        "file:///home/jason/docker-tests/flink/flink-sql-connector-kafka-3.2.0-1.18.jar"
    )

    # Kafka source configuration (input topic)
    kafka_source = KafkaSource.builder() \
        .set_bootstrap_servers("localhost:39092") \
        .set_topics("wikipedia-events") \
        .set_group_id("flink-consumer-group") \
        .set_value_only_deserializer(SimpleStringSchema()) \
        .build()

    # Kafka sink configurations (output topics)
    bot_sink = KafkaSink.builder() \
        .set_bootstrap_servers("localhost:39092") \
        .set_record_serializer(SimpleStringSchema()) \
        .set_topics("bot-edits") \
        .build()

    human_sink = KafkaSink.builder() \
        .set_bootstrap_servers("localhost:39092") \
        .set_record_serializer(SimpleStringSchema()) \
        .set_topics("human-edits") \
        .build()

    # Create a stream from the Kafka source
    data_stream = env.from_source(
        kafka_source, watermark_strategy=None, source_name="KafkaSource")

    # Parse the incoming JSON events
    parsed_stream = data_stream.map(parse_event)

    # Filter out any None values from malformed events
    parsed_stream = parsed_stream.filter(lambda event: event is not None)

    # Filter events into bot and human streams
    bot_edits = parsed_stream.filter(
        lambda event: event.get('user_type') == 'bot')
    human_edits = parsed_stream.filter(
        lambda event: event.get('user_type') == 'human')

    # Write filtered streams to separate Kafka topics
    bot_edits.sink_to(bot_sink)
    human_edits.sink_to(human_sink)

    # Execute the Flink pipeline
    env.execute("Separate Bot and Human Edits")
What I’ve Tried:
I’ve checked the PyFlink documentation and tried to use SimpleStringSchema as the serializer for the Kafka sink, but I still encounter issues.
I’ve tried different configurations for the Kafka sink, but the issue persists.
Questions:
How can I properly configure the Kafka sink in PyFlink to serialize the events without encountering the ImportError?
Is there another approach for serializing events when using KafkaSink in PyFlink?
Can you provide a working example of how to properly serialize and write data to Kafka in PyFlink?
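For what it's worth, my current untested guess, based on the KafkaRecordSerializationSchema builder I can see in pyflink.datastream.connectors.kafka, is the sketch below: the topic would move off the KafkaSink builder and into the record serializer, and the parsed dicts would need to be turned back into JSON strings before sink_to(). Is this the right direction?

# Untested sketch: wrap SimpleStringSchema in a KafkaRecordSerializationSchema,
# which also carries the target topic, instead of passing SimpleStringSchema
# straight to set_record_serializer().
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream.connectors.kafka import (
    KafkaSink,
    KafkaRecordSerializationSchema,
)

bot_sink = KafkaSink.builder() \
    .set_bootstrap_servers("localhost:39092") \
    .set_record_serializer(
        KafkaRecordSerializationSchema.builder()
        .set_topic("bot-edits")  # topic set here rather than on KafkaSink?
        .set_value_serialization_schema(SimpleStringSchema())
        .build()
    ) \
    .build()

# The stream feeding sink_to() would then have to contain strings, e.g.:
# bot_edits.map(json.dumps, output_type=Types.STRING()).sink_to(bot_sink)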