I have a PySpark DataFrame that currently contains one record:
+-----------------------------------------------------------------------+---------------------------------------------------------------------------------+
|source_file_path |destination_file_path |
+-----------------------------------------------------------------------+---------------------------------------------------------------------------------+
|Input-Data/EOBLetter/20240425/MONTHLY_EOB_69L7X7F53_20231227212740.json|Input-Data/EOBLetter/Processed/20240425/MONTHLY_EOB_69L7X7F53_20231227212740.json|
+-----------------------------------------------------------------------+---------------------------------------------------------------------------------+
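For reproducibility, a DataFrame of this shape can be built as below (a minimal sketch; in my actual job the paths come from an upstream step, not a hard-coded list):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# Two string columns: where the object is now, and where it should go.
s3_move_path_df = spark.createDataFrame(
    [("Input-Data/EOBLetter/20240425/MONTHLY_EOB_69L7X7F53_20231227212740.json",
      "Input-Data/EOBLetter/Processed/20240425/MONTHLY_EOB_69L7X7F53_20231227212740.json")],
    ["source_file_path", "destination_file_path"])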
I have the function below (defined inside a class) that is applied to each row of the above DataFrame using foreach():
import boto3

class LoadToS3:
    def __init__(self, bucket_name, output_dir):
        self.bucket_name = bucket_name
        self.output_dir = output_dir
        # self.s3 = boto3.resource('s3')

    @property
    def client(self):
        # The client is created per call so the instance stays picklable
        # when Spark ships it to the executors (boto3 clients can't be pickled).
        return boto3.client('s3')

    def move_files_in_s3(self, row):
        # Copy the object to its destination, then delete the original.
        copy_source = {
            'Bucket': self.bucket_name,
            'Key': row.source_file_path
        }
        self.client.copy(copy_source, self.bucket_name,
                         row.destination_file_path)
        self.client.delete_object(
            Bucket=self.bucket_name,
            Key=row.source_file_path
        )
I am calling the function via foreach():
load_to_s3 = LoadToS3(
    self.args['SOURCE_S3_BUCKET'], self.args['DESTINATION_S3_PATH'])
s3_move_path_df.foreach(load_to_s3.move_files_in_s3)
But I am getting the error below when the foreach() runs:
Following error occured: An error occurred while calling o384.isEmpty.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1374.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1374.0 (TID 5111) (100.127.190.158 executor 1): java.io.FileNotFoundException:
File not present on S3
It is possible the underlying files have been updated. You can explicitly invalidate
the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by
recreating the Dataset/DataFrame involved.
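My guess is that Spark lazily re-reads the source files after move_files_in_s3 has already deleted them (the isEmpty in the trace is triggered by a later action on the DataFrame). Would materializing the rows on the driver before touching S3 avoid this? A minimal sketch of what I mean (the collect() loop is my own idea, not code from the failing job, and it assumes the row count stays small enough for driver memory):

# Pull the rows to the driver first, then move the files sequentially,
# so no Spark task has to re-read S3 after the deletes.
for row in s3_move_path_df.collect():
    load_to_s3.move_files_in_s3(row)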
Any help would be appreciated!