I’m working on a Spark Structured Streaming application where I’m reading data from a Kafka topic, performing some transformations, and then writing the results back to another Kafka topic. However, I’m encountering an error when trying to start the streaming query. The error message I’m receiving is:
[STREAM_FAILED] Query terminated with exception: 'scala.collection.Seq org.apache.spark.sql.types.StructType.toAttributes()'
Here’s an overview of my setup:
- The Spark master, the worker, and the Jupyter/PySpark environment each run in separate Docker containers.
- Spark version: 3.4.0
- Scala version: 2.12
- Kafka: Confluent Platform 7.6.1 (confluentinc/cp-kafka:latest), which ships Apache Kafka 3.6.x
- Schema definition:
schema = StructType([StructField("user", StringType()), StructField("int_value", IntegerType())])
- Sample data from the Kafka topic:
{ "user": "user2", "int_value": 78 }
I’ve verified that the schema matches the structure of the JSON data in the Kafka topic. However, I’m still encountering this error.
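For reference, I checked this with a one-off batch read of the same topic, roughly like the sketch below (it reuses the session, constants, and schema from the script further down; a mismatch would show up as null fields after parsing):

batch_df = (
    spark_session.read
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
    .option("subscribe", KAFKA_INPUT_TOPIC)
    .option("startingOffsets", "earliest")
    .load()
)
parsed = batch_df.select(from_json(col("value").cast("string"), schema).alias("info"))
parsed.select("info.*").show(truncate=False)  # nulls here would indicate a schema mismatch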
Here is my script (run from a Jupyter notebook). It reads from one Kafka topic and streams the data to another topic; no processing is implemented yet. I also haven't been able to write the data to a JSON file on disk:
import logging
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, FloatType, IntegerType, StringType
from pyspark.sql.functions import from_json, to_json, struct, col
import uuid
import os
import time
SPARK_VERSION = '3.4.0'
SCALA_VERSION = '2.12'
SPARK_MASTER = "spark://spark-master:7077"
KAFKA_BOOTSTRAP_SERVERS = "kafka:9092"
KAFKA_INPUT_TOPIC = "event-window"
KAFKA_OUTPUT_TOPIC = "event-window-output"
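# Use a fresh checkpoint directory per run so each notebook execution starts
# from a clean slate instead of resuming old offsets and state.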
CHECKPOINT_LOCATION = f"./tmp/event-window-output/{uuid.uuid4()}"
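# The Kafka connector must match the cluster's Spark and Scala builds exactly;
# kafka-clients 3.3.2 is (I believe) the version Spark 3.4.0 itself pulls in.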
packages = [
    f'org.apache.spark:spark-sql-kafka-0-10_{SCALA_VERSION}:{SPARK_VERSION}',
    'org.apache.kafka:kafka-clients:3.3.2',
]
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s:%(funcName)s:%(levelname)s:%(message)s',
)
logger = logging.getLogger("spark_structured_streaming")
try:
    spark_session = (
        SparkSession.builder
        .master(SPARK_MASTER)
        .appName("SparkStructuredStreaming")
        .config("spark.jars.packages", ",".join(packages))
        .getOrCreate()
    )
    spark_session.sparkContext.setLogLevel("DEBUG")
    logger.info("Spark session created successfully")
except Exception as e:
    logger.error(f"Couldn't create the spark session: {e}")
try:
    df = (
        spark_session.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
        .option("subscribe", KAFKA_INPUT_TOPIC)
        .option("startingOffsets", "latest")
        .load()
    )
    logger.info("Initial dataframe created successfully")
except Exception as e:
    logger.warning(f"Initial dataframe couldn't be created due to exception: {e}")
df = df.selectExpr("CAST(value as STRING)", "timestamp")
df.printSchema()
schema = StructType([
    StructField("user", StringType()),
    StructField("int_value", IntegerType()),
])
df = df.select(
    from_json(col("value"), schema).alias("info"), "timestamp"
)
df = df.select("info.*", "timestamp")
df.printSchema()
# The Kafka sink requires a string or binary 'value' column, so serialize the
# parsed fields back to a JSON string before writing.
kafka_out = df.select(to_json(struct("user", "int_value", "timestamp")).alias("value"))

result = (
    kafka_out.writeStream
    .trigger(processingTime="10 seconds")
    .outputMode("append")
    .format("kafka")
    .option("topic", KAFKA_OUTPUT_TOPIC)
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
    .option("checkpointLocation", CHECKPOINT_LOCATION)
    .start()
    .awaitTermination()
)
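For completeness, the JSON file sink variant I mentioned above looks roughly like this (paths are illustrative); it never produces any output files for me either:

file_query = (
    df.writeStream
    .outputMode("append")
    .format("json")
    .option("path", "./tmp/event-window-json")
    .option("checkpointLocation", f"./tmp/event-window-json-checkpoints/{uuid.uuid4()}")
    .start()
)
file_query.awaitTermination()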
Full error trace:
---------------------------------------------------------------------------
StreamingQueryException Traceback (most recent call last)
Cell In[1], line 72
61 df = df.select("info.*", "timestamp")
62 df.printSchema()
64 result = (
65 df.writeStream.trigger(processingTime="10 seconds")
66 .outputMode("append")
67 .format("kafka")
68 .option("topic", KAFKA_OUTPUT_TOPIC)
69 .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
70 .option("checkpointLocation", CHECKPOINT_LOCATION)
71 .start()
---> 72 .awaitTermination()
73 )
File /usr/local/spark/python/pyspark/sql/streaming/query.py:221, in StreamingQuery.awaitTermination(self, timeout)
219 return self._jsq.awaitTermination(int(timeout * 1000))
220 else:
--> 221 return self._jsq.awaitTermination()
File /usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
1316 command = proto.CALL_COMMAND_NAME +\
1317     self.command_header +\
1318     args_command +\
1319     proto.END_COMMAND_PART
1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
1323 answer, self.gateway_client, self.target_id, self.name)
1325 for temp_arg in temp_args:
1326 if hasattr(temp_arg, "_detach"):
File /usr/local/spark/python/pyspark/errors/exceptions/captured.py:185, in capture_sql_exception.<locals>.deco(*a, **kw)
181 converted = convert_exception(e.java_exception)
182 if not isinstance(converted, UnknownException):
183 # Hide where the exception came from that shows a non-Pythonic
184 # JVM exception message.
--> 185 raise converted from None
186 else:
187 raise
StreamingQueryException: [STREAM_FAILED] Query [id = 848c3d42-7794-4836-9818-cb6cdc3c361c, runId = 6934b4a7-874c-4deb-a93d-2f26273d5e93] terminated with exception: 'scala.collection.Seq org.apache.spark.sql.types.StructType.toAttributes()'
I have tried to ensure that the output topic exists before publishing, but the results are still the same.
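For reference, I pre-create the topic roughly like this (a sketch using kafka-python; the admin-client choice is illustrative, and any tool that creates the topic should behave the same):

from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError

admin = KafkaAdminClient(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS)
try:
    admin.create_topics([NewTopic(KAFKA_OUTPUT_TOPIC, num_partitions=1, replication_factor=1)])
except TopicAlreadyExistsError:
    pass  # topic already exists, which is fine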
Could anyone provide insights into what might be causing this issue? Any suggestions for troubleshooting or potential solutions would be greatly appreciated. Thank you!