I'm using Spark on EMR (emr-6.13.0, Spark 3.4.1).
I'm trying to run a simple Spark Structured Streaming job that reads from Kafka and writes to a memory table using foreachBatch,
and it fails with “Error while obtaining a new communication channel”.
The detailed error:
Fail to execute line 54: query.awaitTermination()
<some more text>
File "/usr/lib/spark/python/pyspark/errors/exceptions/captured.py", line 175, in deco
raise converted from None
pyspark.errors.exceptions.captured.StreamingQueryException: [STREAM_FAILED] Query [id = 0f1c0ee6-5d67-4e10-a239-515c60777efc, runId = 3c4b5b12-0df9-4d45-81e3-b017f71a787b] terminated with exception: Error while obtaining a new communication channel
It's important to note that when I don't use foreachBatch,
I can read data from Kafka and write to a memory table without any problem; the error only starts when I use foreachBatch.
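For reference, the working variant without foreachBatch looks roughly like this (a sketch, since the working code isn't in this post; the query name raw_kafka_data is just an illustrative choice, and kafkaDataFrame is the same stream defined in the script below):

# Sketch of the working (no-foreachBatch) version; query name is illustrative
query = kafkaDataFrame.writeStream \
    .format("memory") \
    .queryName("raw_kafka_data") \
    .outputMode("append") \
    .start()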
The failing PySpark script:
%pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StringType, MapType, BooleanType, StructField
KAFKA_TOPIC = "data.eventflow.enriched-events"
KAFKA_BROKERS = "myKafkaBroker"
# Create a Spark Session
spark = SparkSession.builder \
    .appName("BasicKafkaRead") \
    .config("spark.sql.session.timeZone", "UTC") \
    .getOrCreate()
# Define the schema for the JSON data from Kafka
schema = StructType([
    StructField("event_id", StringType()),
    StructField("event_name", StringType()),
    StructField("created_at", StringType()),
    StructField("user_id", StringType()),
    StructField("game_id", StringType()),
    StructField("event_payload", MapType(StringType(), StringType())),
    StructField("user_payload", MapType(StringType(), StringType())),
    StructField("game_payload", MapType(StringType(), StringType())),
    StructField("is_known_bot", BooleanType())
])
# Read from Kafka
kafkaDataFrame = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_BROKERS) \
    .option("subscribe", KAFKA_TOPIC) \
    .option("startingOffsets", "earliest") \
    .option("failOnDataLoss", "false") \
    .load()
def process_batch(batch_df, batch_id):
    # Parse the JSON data
    parsed_df = batch_df.selectExpr("CAST(value AS STRING) as jsonValue") \
        .select(from_json(col("jsonValue"), schema).alias("data"))
    # Write the batch to a memory table
    parsed_df.write.format("memory").queryName("parsed_kafka_data").mode("append").save()
# Apply foreachBatch to process each micro-batch
query = kafkaDataFrame.writeStream \
    .foreachBatch(process_batch) \
    .outputMode("append") \
    .trigger(processingTime="10 seconds") \
    .start()
query.awaitTermination()
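Once the query is running, the intent is to read the table back from another notebook paragraph, roughly like this (illustrative only, assuming the table registers successfully):

%pyspark
# Illustrative only: read back the in-memory table written by process_batch
spark.sql("SELECT * FROM parsed_kafka_data").show()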