I am working on a PySpark Structured Streaming job that reads data from a Kafka topic and processes it in real-time. I want to implement a graceful shutdown using signal handling, but I’m encountering an error when I try to stop the streaming query.
code:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, length
import signal
import sys
checkpoint_dir = "/tmp/checkpoints"
kafka_bootstrap_servers = "localhost:9092"
spark = SparkSession.builder
.appName("KafkaConsumer1")
.getOrCreate()
spark.conf.set("spark.sql.streaming.stateStore.stateSchemaCheck", "true")
spark.sparkContext.setLogLevel("WARN")
consumer_group_id = "consumer-group-1"
shutdown_requested = False
def shutdown_handler(signum, frame):
global shutdown_requested
print("Graceful shutdown initiated...")
shutdown_requested = True
query.stop()
signal.signal(signal.SIGINT, shutdown_handler)
signal.signal(signal.SIGTERM, shutdown_handler)
df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", kafka_bootstrap_servers)
.option("assign", '{"sample-topic":[0]}')
.option("startingOffsets", "latest")
.option("kafka.group.id", consumer_group_id)
.load()
df = df.selectExpr("CAST(value AS STRING) as message")
df = df.withColumn("char_count", length(col("message")))
query = df.writeStream
.outputMode("append")
.format("console")
.option("checkpointLocation", f"{checkpoint_dir}/c1_consumer")
.start()
try:
query.awaitTermination()
except Exception as e:
print(f"Exception encountered: {e}")
`
When I press Ctrl + C to initiate a graceful shutdown, I get the following error:
ERROR:
root: Exception while sending command.
Traceback (most recent call last):
File "/home/spark_job/Documents/spark-3.5.1-bin-hadoop3/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
answer = smart_decode(self.stream.readline()[:-1])
RuntimeError: reentrant call inside <_io.BufferedReader name=3>
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/spark_job/Documents/spark-3.5.1-bin-hadoop3/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
response = connection.send_command(command)
File "/home/spark_job/Documents/spark-3.5.1-bin-hadoop3/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 539, in send_command
raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
ERROR:root:Exception while sending command.
Traceback (most recent call last):
File "/home/spark_job/Documents/spark-3.5.1-bin-hadoop3/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
answer = smart_decode(self.stream.readline()[:-1])
File "/usr/lib/python3.8/socket.py", line 669, in readinto
return self._sock.recv_into(b)
File "/home/spark_job/Documents/spark-3.5.1-bin-hadoop3/python/lib/pyspark.zip/pyspark/context.py", line 381, in signal_handler
self.cancelAllJobs()
File "/home/spark_job/Documents/spark-3.5.1-bin-hadoop3/python/lib/pyspark.zip/pyspark/context.py", line 2446, in cancelAllJobs
self._jsc.sc().cancelAllJobs()
File "/home/spark_job/Documents/spark-3.5.1-bin-hadoop3/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
return_value = get_return_value(
File "/home/spark_job/Documents/spark-3.5.1-bin-hadoop3/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py", line 179, in deco
return f(*a, **kw)
File "/home/spark_job/Documents/spark-3.5.1-bin-hadoop3/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 334, in get_return_value
raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o27.sc
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/spark_job/Documents/spark-3.5.1-bin-hadoop3/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
response = connection.send_command(command)
File "/home/spark_job/Documents/spark-3.5.1-bin-hadoop3/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 539, in send_command
raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
Can someone help me understand why this error occurs and how to fix it?