I am reading close to 1 million rows stored in S3 as parquet files into dataframe (900mb size data in bucket). Filtering the dataframe based on values and then later converting to pandas dataframe . There are 2 UDFs involved (classify and transformDate). I am getting the error eof while running this code snippet . What is wrong with this code? Is it some spark setting which I am missing or is it the improper use of UDF ? Code snippet below
#Import headers skipped for simplicity
findspark.init()
def classify(value):
if float(value) < 0.0 or float(value) >= 10.0:
return -1
return int(float(value) * 2) + 1
def transformDate(dateStr):
date_format = '%d-%b-%Y:%H:%M:%S %Z'
datetime_obj = datetime.datetime.strptime("{} {}".format(dateStr, 'UTC'), date_format)
return datetime_obj
def read_from_s3():
conf = SparkConf()
conf.set('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.3.4')
conf.set('fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')
conf.set('spark.hadoop.fs.s3a.connection.maximum', 100)
conf.set('fs.s3a.threads.max', 50)
conf.set('spark.default.parallelism', 2048)
conf.set('spark.sql.shuffle.partitions',4096)
conf.set('spark.sql.execution.arrow.pyspark.enabled', 'true')
conf.set('spark.sql.streaming.schemaInference','true')
conf.set('spark.rpc.message.maxSize', '1024')
conf.set('spark.executor.memory', '32g')
conf.set('spark.shuffle.file.buffer', '64k')
conf.set('spark.eventLog.buffer.kb', '200k')
conf.set('spark.executor.cores', '8')
conf.set('spark.cores.max', '8')
conf.set('spark.driver.memory', '32g')
conf.set('spark.driver.maxResultSize', '21G')
conf.set('spark.worker.cleanup.enabled', True)
conf.set('spark.executor.heartbeatInterval', '43200s')
conf.set('spark.network.timeout', '3000000s')
conf.set('spark.hadoop.fs.s3a.access.key', '<<>>')
conf.set('spark.hadoop.fs.s3a.secret.key', '<<>>')
spark = SparkSession.builder.config(conf=conf).getOrCreate()
spark.sparkContext.setLogLevel('DEBUG')
classify_udf = udf(lambda x:classify(x),IntegerType())
transform_date_udf = udf(lambda x:transformDate(x),TimestampType())
paths = 's3a://<<mybucket>>/'
df = spark.read.parquet(paths,
inferSchema=True)
df = df.withColumn('colA_encoded',classify_udf('colA'))
df = df.withColumn('date_transformed',transform_date_udf('train_date'))
return df
def do_filter(df: DataFrame):
df = df.fillna({'colA':'0'})
df= df.withColumn('colA_float',df['colA'].cast("float").alias('colA_float'))
df2 = (df.filter(col('colA_float') > 3.0).select('colZ').distinct())
df=df.join(df2, 'colZ')
return df
def do_proper_group(df: DataFrame):
window = Window.partitionBy('colZ').orderBy('colA')
df =df.select('*', dense_rank().over(window).alias('myIdx'))
return df
def plot_data_frame(df: DataFrame =None) :
fig, ax = plt.subplots()
print(df.count())
pandas_df = df.toPandas()
print(pandas_df.columns)
pandas_df.set_index('myIdx', inplace=True)
pandas_df.groupby('colZ')['colA_float'].plot(legend=True,x='myIdx', xlabel="My Group Number", ylabel="Value")
plt.show()
if __name__ == '__main__':
start = time.time()
print(start)
plot_data_frame(do_proper_group(do_filter(read_from_s3())))
end = time.time()
print(end - start)
Error log below
24/07/24 06:47:31 DEBUG PythonUDFRunner: Exception thrown after task interruption
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "D:Downloadsspark-3.5.1-bin-hadoop3spark-3.5.1-bin-hadoop3pythonlibpyspark.zippysparkworker.py", line 1225, in main
File "D:Downloadsspark-3.5.1-bin-hadoop3spark-3.5.1-bin-hadoop3pythonlibpyspark.zippysparkserializers.py", line 596, in read_int
raise EOFError
EOFError
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:94)
at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:75)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(generated.java:28)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:179)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
at org.apache.spark.scheduler.Task.run(Task.scala:141)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
24/07/24 06:47:31 DEBUG Invoker: Starting: read
24/07/24 06:47:31 DEBUG PythonUDFRunner: Exception thrown after task interruption
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "D:Downloadsspark-3.5.1-bin-hadoop3spark-3.5.1-bin-hadoop3pythonlibpyspark.zippysparkworker.py", line 1225, in main
File "D:Downloadsspark-3.5.1-bin-hadoop3spark-3.5.1-bin-hadoop3pythonlibpyspark.zippysparkserializers.py", line 596, in read_int
raise EOFError
EOFError
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:94)
at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:75)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(generated.java:28)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:179)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
at org.apache.spark.scheduler.Task.run(Task.scala:141)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
24/07/24 06:47:31 DEBUG Invoker: Starting: lazySeek
Where is this EOF error coming from ? Total of 760 files in S3 bucket with size totalling 900M. All parquet files.
Not sure if the problem is centered around spark configuration or the way in which I filter the dataframe and join it. Any help appreciated.