I am using a Spark cluster made up of EC2 machines, and with PySpark I want to transfer data from a source S3 bucket to a destination bucket in Parquet format. The two buckets have different IAM roles and bucket policies. I am setting the AWS access key and secret key at the Hadoop level using this function:

def set_s3_credentials(access_key, secret_key):
    hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
    hadoop_conf.set("fs.s3a.access.key", access_key)
    hadoop_conf.set("fs.s3a.secret.key", secret_key)

but I am getting this error:

Error occurred: An error occurred while calling o56.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 8 times, most recent failure: Lost task 0.7 in stage 1.0 (TID 8) (172.12.2.153 executor 0): java.nio.file.AccessDeniedException: s3a://wayfinder-doceree-s3-customer-data/export/mx_submits_2024_06/part-00000-tid-1074794359726202293-9f504b1d-da56-4031-963b-be9f22348eb4-141340-1.c000.snappy.parquet: getFileStatus on s3a://wayfinder-doceree-s3-customer-data/export/mx_submits_2024_06/part-00000-tid-1074794359726202293-9f504b1d-da56-4031-963b-be9f22348eb4-141340-1.c000.snappy.parquet: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: 6C07ME6ZAV2B4XSA; S3 Extended Request ID: Dx8EtSGjnYMl0Ld6kwSs9L9CMk0sdrDkzzdCSsXaG2KXk1uhC6iAIkly0mBCmB6rehqSuat0RlR0WHjPQlFkQQ==; Proxy: null), S3 Extended Request ID: Dx8EtSGjnYMl0Ld6kwSs9L9CMk0sdrDkzzdCSsXaG2KXk1uhC6iAIkly0mBCmB6rehqSuat0RlR0WHjPQlFkQQ==:403 Forbidden
at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:255)

Sometimes the error references o56.parquet instead of o56.showString. In my code I first set the credentials for the source bucket using the function above, then read the data from the source bucket with spark.read.parquet(). That part works: the data loads into a DataFrame, aggregate functions like df.count() succeed, and df.show() displays it. But then I switch to the destination bucket's credentials using the same function, and after the switch, any df.show() or write raises the exception above.
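I suspect this is down to Spark's lazy evaluation: the executors only go back and read the source Parquet files when df.show() or the write actually runs, and by that point the Hadoop configuration holds the destination credentials, which have no read access to the source bucket. Here is a minimal sketch of the workaround I am considering, forcing the read to happen before the credentials are swapped (assuming the 100-row sample fits in executor memory; it reuses the variables from my full code below):

# Read and materialize while the source credentials are still active
set_s3_credentials(source_aws_access_key_id, source_aws_secret_access_key)
df = spark.read.parquet(f"s3a://{source_bucket_name}/{source_key}").limit(100)
df = df.cache()
df.count()  # triggers the actual S3 read and populates the cache

# Now swap credentials; later actions read from the cache, not from S3
set_s3_credentials(dest_aws_access_key_id, dest_aws_secret_access_key)
df.write.parquet(f"s3a://{destination_bucket_name}/{destination_key}", mode="overwrite")

Would that be a correct way to handle it, or is there a cleaner approach?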
Here is my full function:

def copy_parquet_file():
    try:
        # Set and log source AWS credentials
        set_s3_credentials(source_aws_access_key_id, source_aws_secret_access_key)
        logging.info(f"Set source AWS credentials for bucket: '{source_bucket_name}'")
        # update_spark_conf(source_s3_conf)
        logging.info(f"Starting to copy file from bucket '{source_bucket_name}' key '{source_key}' to bucket '{destination_bucket_name}' key '{destination_key}'")
        hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
        logging.info(f'Source bucket access_key: {hadoop_conf.get("fs.s3a.access.key")}')
        logging.info(f'Source bucket secret_key: {hadoop_conf.get("fs.s3a.secret.key")}')

        # Read parquet file from source bucket using PySpark
        # (header/multiLine/quote/escape are CSV options; the Parquet reader ignores them)
        source_parquet_path = f"s3a://{source_bucket_name}/{source_key}"
        df = spark.read.parquet(source_parquet_path, header=True, multiLine=True, quote='"', escape='"')
        df = df.limit(100)
        # df.printSchema()
        logging.info("Read source parquet file successfully.")

        # Set and log destination AWS credentials
        set_s3_credentials(dest_aws_access_key_id, dest_aws_secret_access_key)
        # update_spark_conf(destination_s3_conf)
        logging.info(f"Set destination AWS credentials for bucket: '{destination_bucket_name}'")
        logging.info(f'Destination bucket access_key: {hadoop_conf.get("fs.s3a.access.key")}')
        logging.info(f'Destination bucket secret_key: {hadoop_conf.get("fs.s3a.secret.key")}')

        # Write dataframe to destination bucket using PySpark
        destination_parquet_path = f"s3a://{destination_bucket_name}/{destination_key}"
        time.sleep(10)
        df.show()
        df.write.parquet(destination_parquet_path, mode='overwrite')
        logging.info(f'Copied {source_key} to {destination_key} successfully.')
    except Exception as e:
        logging.error(f"Error occurred: {e}")

Please help me resolve this issue; I want to transfer the data successfully, without these exceptions.
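While reading around, I also came across S3A per-bucket configuration (the fs.s3a.bucket.<bucket>.* options), which as I understand it lets each bucket resolve its own credentials so nothing has to be swapped mid-job at all. This is just a sketch of what I think that would look like with my variables, assuming the cluster runs Hadoop 2.8 or later, where per-bucket options are supported:

# Each bucket gets its own keys; the read and the write can then run in one job
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.set(f"fs.s3a.bucket.{source_bucket_name}.access.key", source_aws_access_key_id)
hadoop_conf.set(f"fs.s3a.bucket.{source_bucket_name}.secret.key", source_aws_secret_access_key)
hadoop_conf.set(f"fs.s3a.bucket.{destination_bucket_name}.access.key", dest_aws_access_key_id)
hadoop_conf.set(f"fs.s3a.bucket.{destination_bucket_name}.secret.key", dest_aws_secret_access_key)

df = spark.read.parquet(f"s3a://{source_bucket_name}/{source_key}").limit(100)
df.write.parquet(f"s3a://{destination_bucket_name}/{destination_key}", mode="overwrite")

Is this the recommended way to handle two sets of credentials in one Spark job?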