I am running Spark jobs on Azure Synapse Analytics. The notebook reads from and writes to an Azure Data Lake Storage Gen2 account (the same storage account, but reads and writes use different paths). It processes a large CSV dataset along with small reference data (Parquet/CSV) and writes the final output in Parquet format. The large CSV dataset is stored as 200 partition files.
The notebook job uses 980 vCores of the Spark compute cluster (each node has 4 vCores and 28 GB of memory). The overall code logic is:
spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
spark.conf.set("spark.sql.shuffle.partitions",200)
// Read the data from ADLS Gen2
// Do computations
// Write output
df = (outputDf
.repartition(980)
.write
.mode("overwrite")
.parquet(outputPath))
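For completeness, the elided read step looks roughly like the sketch below; the path variables and CSV options are placeholders, not the exact values used in the notebook:

# Sketch of the read step; inputPath / refPath and the options are placeholders.
largeDf = (spark.read
    .option("header", "true")
    .csv(inputPath))              # large dataset, stored as 200 CSV partition files

refDf = spark.read.parquet(refPath)   # small reference data

# "Do computations": join/aggregate largeDf with refDf to produce outputDf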
The executor error logs show the following error:
2024-12-13 04:29:22,453 WARN TaskSetManager [task-result-getter-3]: Lost task 998.0 in stage 13.0 (TID 5210) (vm-d8e97779 executor 118): org.apache.spark.SparkException: Encountered error while reading file abfss://<endpointWithPath>?version=1733996160491?flength=8008249846. Details:
at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotReadFilesError(QueryExecutionErrors.scala:877)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:353)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:158)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithoutKey_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:764)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
at org.apache.spark.scheduler.Task.run(Task.scala:139)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:574)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:577)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Suppressed: org.apache.spark.util.TaskCompletionListenerException: null
Previous exception in task: Encountered error while reading file abfss://<endpointWithPath>?version=1733996160491?flength=8008249846. Details:
org.apache.spark.sql.errors.QueryExecutionErrors$.cannotReadFilesError(QueryExecutionErrors.scala:877)
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:353)
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:158)
scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithoutKey_0$(Unknown Source)
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:764)
scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
org.apache.spark.scheduler.Task.run(Task.scala:139)
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:574)
org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:577)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
java.base/java.lang.Thread.run(Thread.java:829)
at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:254)
at org.apache.spark.TaskContextImpl.invokeTaskCompletionListeners(TaskContextImpl.scala:144)
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:137)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:172)
... 7 more
Suppressed: java.lang.NullPointerException
at org.apache.hadoop.shaded.com.microsoft.vegas.common.VegasPipeInputStream.close(VegasPipeInputStream.java:806)
at java.base/java.io.FilterInputStream.close(FilterInputStream.java:180)
at org.apache.hadoop.util.LineReader.close(LineReader.java:152)
at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.close(LineRecordReader.java:281)
at org.apache.spark.sql.execution.datasources.RecordReaderIterator.close(RecordReaderIterator.scala:69)
at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.close(HadoopFileLinesReader.scala:71)
at org.apache.spark.sql.execution.datasources.csv.TextInputCSVDataSource$.$anonfun$readFile$2(CSVDataSource.scala:97)
at org.apache.spark.sql.execution.datasources.csv.TextInputCSVDataSource$.$anonfun$readFile$2$adapted(CSVDataSource.scala:97)
at org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:132)
at org.apache.spark.TaskContextImpl.$anonfun$invokeTaskCompletionListeners$1(TaskContextImpl.scala:144)
at org.apache.spark.TaskContextImpl.$anonfun$invokeTaskCompletionListeners$1$adapted(TaskContextImpl.scala:144)
at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:199)
... 10 more
Caused by: java.io.IOException: Could not open pipe [/mnt/vegas/pipes/2575cfd8-c0e5-43d6-b86d-21f829a05087.pipe, pos=0,blocksRead=0; bytesRead=0; availInPipe=0]
Vegas Service: Context=c18dceff-0a3e-4be0-9dfa-81200dbe71ce, Temporarily Unsupported Scenario: Line Separator not in initial block of partition
at org.apache.hadoop.shaded.com.microsoft.vegas.common.VegasPipeInputStream.pipelineThread(VegasPipeInputStream.java:536)
... 3 more
I would like to understand the root cause of this issue. Retrying or switching to a different Spark pool often helps, and the business logic itself works fine. However, these errors keep recurring intermittently, and in those cases the notebook does not finish successfully.
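By "retries" I mean re-running the write action, which re-executes the whole read/compute lineage. A minimal sketch of such a retry wrapper (the helper name, attempt count, and delay are purely illustrative; this only masks the failure rather than fixing it) would be:

import time

def write_with_retry(df, path, attempts=3, delay_s=60):
    # Naive retry around the write; re-running re-triggers the CSV scan as well.
    for attempt in range(1, attempts + 1):
        try:
            df.repartition(980).write.mode("overwrite").parquet(path)
            return
        except Exception:
            if attempt == attempts:
                raise
            time.sleep(delay_s)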