I am trying to process data using PySpark, which is set up to run in an AWS ECS service. Below are the transformations I apply to the data before writing the DataFrame to Parquet.
NOTE: It only fails for big files.
from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import col

w = Window.partitionBy(*columns).orderBy(F.desc(column1))
df = input_data_frame.withColumn("rank", F.rank().over(w)).where(col("rank") == 1).drop("rank")
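For context, the intent of this window/rank step is to keep, within each group of `columns`, only the row(s) with the highest `column1` value. A minimal plain-Python sketch of the same selection (the records and field names here are made up for illustration):

```python
from itertools import groupby
from operator import itemgetter

# Toy rows; "key" plays the role of the partition columns and
# "val" the role of column1 used for descending ordering.
rows = [
    {"key": "a", "val": 3},
    {"key": "a", "val": 5},
    {"key": "b", "val": 1},
]

# Group by the partition key, then keep the rows tied for the maximum
# "val" in each group -- the same rows that F.rank() == 1 would keep
# (rank, unlike row_number, retains ties).
deduped = []
for key, group in groupby(sorted(rows, key=itemgetter("key")), key=itemgetter("key")):
    group = list(group)
    top = max(r["val"] for r in group)
    deduped.extend(r for r in group if r["val"] == top)

print(deduped)  # [{'key': 'a', 'val': 5}, {'key': 'b', 'val': 1}]
```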
After this, I try to write the data to a location:
data_frame.write.mode("overwrite").partitionBy("column2").save(file_location, format="parquet")
where I get this error:
ERROR:root:Exception while sending command.
Traceback (most recent call last):
File "/usr/local/lib/python3.10/site-packages/py4j/java_gateway.py", line 1207, in send_command
raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.10/site-packages/py4j/java_gateway.py", line 1033, in send_command
response = connection.send_command(command)
File "/usr/local/lib/python3.10/site-packages/py4j/java_gateway.py", line 1211, in send_command
raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while receiving
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:46777)
Traceback (most recent call last):
File "/app/kafka-connect-ingest/preprocessing/preprocessing_files_invoker.py", line 42, in main
executor.execute()
File "/app/kafka-connect-ingest/preprocessing/src/preprocessing/preprocessing_files.py", line 64, in execute
file_location = self.write_effective_deltas(effective_delta, tmp_dir)
File "/app/kafka-connect-ingest/preprocessing/src/preprocessing/preprocessing_files.py", line 195, in write_effective_deltas
data_frame.write.mode("overwrite").partitionBy("dl_delete_flag").save(file_location, format="parquet")
File "/usr/local/lib/python3.10/site-packages/pyspark/sql/readwriter.py", line 1109, in save
self._jwrite.save(path)
File "/usr/local/lib/python3.10/site-packages/py4j/java_gateway.py", line 1304, in __call__
return_value = get_return_value(
File "/usr/local/lib/python3.10/site-packages/pyspark/sql/utils.py", line 111, in deco
return f(*a, **kw)
File "/usr/local/lib/python3.10/site-packages/py4j/protocol.py", line 334, in get_return_value
raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o55.save
This is the spark config I set up:
spark = SparkSession.builder.appName("Data preprocessing").getOrCreate()
spark._jsc.hadoopConfiguration().set("fs.s3.canned.acl", "BucketOwnerFullControl")
spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MILLIS")
spark.conf.set("spark.sql.parquet.writeLegacyFormat", "True")
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "False")
spark.conf.set("spark.sql.shuffle.partitions", "1")
spark.conf.set("spark.sql.files.maxRecordsPerFile", "40000")
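For completeness, the same SQL settings can equivalently be supplied at session-creation time through the builder (a sketch assuming the same values as above; the Hadoop-level ACL setting still goes through the JVM Hadoop configuration):

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("Data preprocessing")
    .config("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MILLIS")
    .config("spark.sql.parquet.writeLegacyFormat", "true")
    .config("spark.sql.parquet.enableVectorizedReader", "false")
    .config("spark.sql.shuffle.partitions", "1")
    .config("spark.sql.files.maxRecordsPerFile", "40000")
    .getOrCreate()
)
spark._jsc.hadoopConfiguration().set("fs.s3.canned.acl", "BucketOwnerFullControl")
```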
I expect the file to be written to the desired location successfully. What could be causing this failure on large files?