I’m working on a Structured Streaming application in Apache Spark that reads data from a Kafka topic, applies some transformations, and writes the output as JSON files via the file sink. However, when I try to start the streaming query, I get an error.
Here’s the relevant code snippet:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("File Streaming Demo") \
        .master("local[3]") \
        .config("spark.streaming.stopGracefullyOnShutdown", "true") \
        .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1") \
        .getOrCreate()

    # Schema of the JSON payload in the Kafka message value
    schema = StructType([
        StructField("taxiId", StringType()),
        StructField("location", StructType([
            StructField("lat", StringType()),
            StructField("lng", StringType()),
        ])),
    ])

    kafka_df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "map_log_topic") \
        .option("startingOffsets", "earliest") \
        .load()

    # kafka_df.printSchema()

    # Parse the JSON value and flatten the nested location struct
    value_df = kafka_df.select(from_json(col("value").cast("string"), schema).alias("value"))
    explode_df = value_df.selectExpr("value.taxiId", "value.location.lat", "value.location.lng")

    invoice_writer_query = explode_df.writeStream \
        .format("json") \
        .queryName("Map logs writer") \
        .outputMode("append") \
        .option("path", "output") \
        .option("checkpointLocation", "chk-point-dir") \
        .trigger(processingTime="1 minute") \
        .start()

    invoice_writer_query.awaitTermination()
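For context, the messages on the topic match the schema above and look roughly like this (the values here are made up for illustration):

{"taxiId": "taxi-042", "location": {"lat": "12.9716", "lng": "77.5946"}}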
When I execute this code, I’m getting the following error:
py4j.protocol.Py4JJavaError: An error occurred while calling o57.start.
: ExitCodeException exitCode=-1073741515:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:1007)
at org.apache.hadoop.util.Shell.run(Shell.java:900)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1212)
at org.apache.hadoop.util.Shell.execCommand(Shell.java:1306)
at org.apache.hadoop.util.Shell.execCommand(Shell.java:1288)
at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:978)
at org.apache.hadoop.fs.RawLocalFileSystem.mkOneDirWithMode(RawLocalFileSystem.java:660)
at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:700)
at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:677)
at org.apache.hadoop.fs.FileSystem.primitiveMkdir(FileSystem.java:1356)
at org.apache.hadoop.fs.DelegateToFileSystem.mkdir(DelegateToFileSystem.java:185)
at org.apache.hadoop.fs.FilterFs.mkdir(FilterFs.java:219)
at org.apache.hadoop.fs.FileContext$4.next(FileContext.java:809)
at org.apache.hadoop.fs.FileContext$4.next(FileContext.java:805)
at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
at org.apache.hadoop.fs.FileContext.mkdir(FileContext.java:812)
at org.apache.spark.sql.execution.streaming.AbstractFileContextBasedCheckpointFileManager.mkdirs(CheckpointFileManager.scala:319)
at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.<init>(HDFSMetadataLog.scala:67)
at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.<init>(CompactibleFileStreamLog.scala:48)
at org.apache.spark.sql.execution.streaming.FileStreamSinkLog.<init>(FileStreamSinkLog.scala:91)
at org.apache.spark.sql.execution.streaming.FileStreamSink.<init>(FileStreamSink.scala:139)
at org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:322)
at org.apache.spark.sql.streaming.DataStreamWriter.createV1Sink(DataStreamWriter.scala:442)
at org.apache.spark.sql.streaming.DataStreamWriter.startInternal(DataStreamWriter.scala:407)
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:251)
at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
at java.base/java.lang.reflect.Method.invoke(Method.java:580)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:1570)
From the stack trace, the failure happens while Spark is creating the checkpoint directory (FileContext.mkdir → RawLocalFileSystem.setPermission → Shell.runCommand), so it looks like a local filesystem problem rather than an issue with the query itself. I’ve also tried changing the output mode to “complete”, but that just produces a different error:
pyspark.errors.exceptions.captured.AnalysisException: Data source json does not support Complete output mode.
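Since the trace points at checkpoint-directory creation rather than anything Kafka-specific, I suspect even a minimal file-sink query would fail the same way. Here is an untested sketch of such a repro (the built-in rate source just generates dummy rows; rate-output and rate-chk are placeholder paths):

# Untested sketch: replace Kafka with the built-in rate source,
# keeping the same JSON file sink plus a checkpoint directory.
rate_df = spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", 1) \
    .load()

rate_query = rate_df.writeStream \
    .format("json") \
    .outputMode("append") \
    .option("path", "rate-output") \
    .option("checkpointLocation", "rate-chk") \
    .start()

rate_query.awaitTermination()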
I’m running this locally on Windows with Apache Spark 3.5.1 (built for Scala 2.12), and I’ve configured the matching spark-sql-kafka package (org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1), as shown in the session config above.
Can someone please help me understand what’s causing this issue and how I can resolve it? Any guidance or suggestions would be greatly appreciated.