I’m developing a clickstream project that collects user events and stores them in HDFS.
The project architecture is shown in the diagram:
1. The Collector receives events via an HTTP API and publishes them to a Kafka topic.
2. The Streaming Ingest Job is a Spark streaming application that consumes events from the topic and writes them to HDFS (into the /raw directory).
3. The Batch Job consists of three tasks (also shown in the diagram):
3.a) prepare_raw_events: moves all .json files from the /raw directory to the /stage directory.
3.b) process_raw_events: a Spark application that reads the JSON files from /stage, processes them, and writes the processed data to the /final directory.
3.c) archive_raw_events: moves all .json files from the /stage directory to the /archive directory.
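For reference, the two move tasks (3.a and 3.c) boil down to "list every .json file in the source directory and move it to the target directory". Here is a minimal local-filesystem sketch of that logic (the real tasks operate on HDFS, e.g. via hdfs dfs -mv; the function name and local paths are just illustrative):

```python
from pathlib import Path

def move_json_files(src_dir: str, dst_dir: str) -> int:
    """Move every .json file from src_dir to dst_dir, mirroring
    prepare_raw_events (/raw -> /stage) and archive_raw_events
    (/stage -> /archive). Returns the number of files moved."""
    src, dst = Path(src_dir), Path(dst_dir)
    dst.mkdir(parents=True, exist_ok=True)
    moved = 0
    for f in src.glob("*.json"):
        # Note: this happily moves files that a writer has not yet
        # finished/committed -- there is no coordination with the writer.
        f.rename(dst / f.name)
        moved += 1
    return moved
```

The key point is the comment: the move step has no way to tell a finished file from one the streaming job is still working on.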
The problem: imagine the streaming job fetches a batch of events (when triggered) and starts writing them to HDFS as a JSON file. While the file is being written, the batch job kicks in and moves the “raw” data, and the streaming job then fails because it can’t find the target file. The error looks like this:
...
24/12/23 14:29:00 INFO KafkaConsumer: [Consumer clientId=consumer-spark-kafka-source-688bfeff-890b-4eec-ba8d-759efafe5e78--422712287-executor-1, groupId=spark-kafka-source-688bfeff-890b-4eec-ba8d-759efafe5e78--422712287-executor] Seeking to offset 2800818 for partition clickstream-dev-topic-0
24/12/23 14:29:00 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-688bfeff-890b-4eec-ba8d-759efafe5e78--422712287-executor-1, groupId=spark-kafka-source-688bfeff-890b-4eec-ba8d-759efafe5e78--422712287-executor] Seeking to earliest offset of partition clickstream-dev-topic-0
24/12/23 14:29:00 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-688bfeff-890b-4eec-ba8d-759efafe5e78--422712287-executor-1, groupId=spark-kafka-source-688bfeff-890b-4eec-ba8d-759efafe5e78--422712287-executor] Resetting offset for partition clickstream-dev-topic-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[kafka:9092 (id: 1 rack: null)], epoch=0}}.
24/12/23 14:29:00 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-688bfeff-890b-4eec-ba8d-759efafe5e78--422712287-executor-1, groupId=spark-kafka-source-688bfeff-890b-4eec-ba8d-759efafe5e78--422712287-executor] Seeking to latest offset of partition clickstream-dev-topic-0
24/12/23 14:29:00 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-688bfeff-890b-4eec-ba8d-759efafe5e78--422712287-executor-1, groupId=spark-kafka-source-688bfeff-890b-4eec-ba8d-759efafe5e78--422712287-executor] Resetting offset for partition clickstream-dev-topic-0 to position FetchPosition{offset=5009235, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[kafka:9092 (id: 1 rack: null)], epoch=0}}.
24/12/23 14:29:01 ERROR Utils: Aborting task
java.io.FileNotFoundException: File does not exist: hdfs://hadoop:9000/data/raw/part-00000-57d7fbce-6c1a-4e79-a143-87cba8045438-c000.json
at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1757)
at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1750)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1765)
at org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol.$anonfun$commitTask$1(ManifestFileCommitProtocol.scala:142)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol.commitTask(ManifestFileCommitProtocol.scala:142)
at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.$anonfun$commit$1(FileFormatDataWriter.scala:107)
at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:552)
at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:107)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:404)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1397)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:410)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeWrite$2(FileFormatWriter.scala:252)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
at org.apache.spark.scheduler.Task.run(Task.scala:141)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
24/12/23 14:29:01 ERROR FileFormatWriter: Job job_202412231428034161621243849097079_0025 aborted.
24/12/23 14:29:01 INFO KafkaDataConsumer: From Kafka topicPartition=clickstream-dev-topic-0 groupId=spark-kafka-source-688bfeff-890b-4eec-ba8d-759efafe5e78--422712287-executor read 1042339 records through 2289 polls (polled out 1042736 records), taking 7423334326 nanos, during time span of 57213734314 nanos.
24/12/23 14:29:01 ERROR Executor: Exception in task 0.0 in stage 25.0 (TID 25)
org.apache.spark.SparkException: [TASK_WRITE_FAILED] Task failed while writing rows to hdfs://hadoop:9000/data/raw.
...
Just wondering: wouldn’t Spark first write the records to a temporary file and only then move them to the final location? How can the batch job move an unfinished file?
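My current understanding (an assumption based on the stack trace, not something I’ve verified in the Spark docs): for the streaming file sink, ManifestFileCommitProtocol does not stage files in a temporary directory the way the batch FileOutputCommitter does. Each task writes its part-*.json file directly into the output directory and, at commitTask, stats every file it wrote in order to record it in the manifest. If the batch job moves the file between the write and the commit, that getFileStatus call fails exactly as in the log above. A tiny local simulation of that ordering:

```python
from pathlib import Path

def write_part_file(out_dir: Path, name: str, records: list) -> Path:
    # The streaming task writes directly into the final output directory
    # (no _temporary staging, per my reading of the stack trace).
    path = out_dir / name
    path.write_text("\n".join(records))
    return path

def commit_task(written: list) -> list:
    # At commit time the protocol stats each file it wrote, to record
    # (path, size) in the manifest. A missing file aborts the task,
    # analogous to the FileNotFoundException in the log.
    sizes = []
    for p in written:
        if not p.exists():
            raise FileNotFoundError(f"File does not exist: {p}")
        sizes.append(p.stat().st_size)
    return sizes
```

In this model, moving the part file between write_part_file and commit_task reproduces the failure, which would explain how the batch job manages to grab an "unfinished" (written but uncommitted) file.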
You can find the implementation of the project over here.