I am facing a write issue to an Iceberg table using an AWS Glue job. My use case is to read an Oracle table from on-prem and write it to an Iceberg table with AWS Glue. The read from the on-prem table works fine, but the write fails with the error below.
I have added the data lake formats job parameter for Iceberg. The target table already exists in the database, and the job has sufficient permissions on it.
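(For reference, the job parameter I mean is Key: --datalake-formats, Value: iceberg.)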
2024-06-27 15:59:56,539 ERROR [main] glue.ProcessLauncher (Logging.scala:logError(77)): Error from Python:Traceback (most recent call last):
File "/tmp/test_iceberg.py", line 66, in <module>
glueContext.write_data_frame.from_catalog(
File "/opt/amazon/lib/python3.10/site-packages/awsglue/dataframewriter.py", line 20, in from_catalog
return self._glue_context.write_data_frame_from_catalog(frame, db, table_name, redshift_tmp_dir,
File "/opt/amazon/lib/python3.10/site-packages/awsglue/context.py", line 413, in write_data_frame_from_catalog
return DataSink(j_sink, self).writeDataFrame(frame, self)
File "/opt/amazon/lib/python3.10/site-packages/awsglue/data_sink.py", line 35, in writeDataFrame
return DataFrame(self._jsink.pyWriteDataFrame(data_frame._jdf, glue_context._glue_scala_context, callsite(), info), self._sql_ctx)
File "/opt/amazon/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
return_value = get_return_value(
File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 190, in deco
return f(*a, **kw)
File "/opt/amazon/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 330, in get_return_value
raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o148.pyWriteDataFrame. Trace:
py4j.Py4JException: Method pyWriteDataFrame([class com.amazonaws.services.glue.DynamicFrame, class com.amazonaws.services.glue.GlueContext, class java.lang.String, class java.lang.String]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
at py4j.Gateway.invoke(Gateway.java:274)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.lang.Thread.run(Thread.java:750)
The code:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext, DynamicFrame
from awsglue.job import Job
import os
import boto3
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.types import *
from pyspark import SparkConf
## @params: [JOB_NAME]
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
aws_account_id = boto3.client('sts').get_caller_identity().get('Account')
catalog_name = "glue_catalog"
warehouse_path = f"s3://s3-bucket/d_extract/"
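# Spark configuration for the Iceberg catalog backed by the Glue Data Catalog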
conf = (
    SparkConf()
    .set("spark.sql.legacy.parquet.int96RebaseModeInRead", "LEGACY")  # test
    .set("spark.sql.legacy.parquet.int96RebaseModeInWrite", "LEGACY")  # test
    .set("spark.sql.legacy.parquet.datetimeRebaseModeInRead", "LEGACY")  # test
    .set("spark.executor.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true")
    .set("spark.driver.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true")
    .set("spark.hadoop.fs.s3a.server-side-encryption-algorithm", "AES256")
    .set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .set("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .set("spark.sql.catalog.glue_catalog.warehouse", warehouse_path)
    .set("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
    .set("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .set("spark.sql.catalog.glue_catalog.glue.lakeformation-enabled", "true")
    .set("spark.sql.catalog.glue_catalog.glue.id", aws_account_id)
)
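# create the Spark and Glue contexts and initialize the job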
sc = SparkContext.getOrCreate(conf=conf)
glueContext = GlueContext(sc)
#spark = glueContext.spark_session
job = Job(glueContext)
logger = glueContext.get_logger()
job.init(args['JOB_NAME'], args)
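# options for reading the on-prem Oracle table through the Glue connection, hash-partitioned on OPEN_DT_ID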
connection_options = {
    "useConnectionProperties": "true",
    "dbtable": "db.table_nm",
    "connectionName": "db_conn",
    "hashfield": "OPEN_DT_ID",
    "hashpartitions": 7,
}
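# read the Oracle source table as a DynamicFrame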
glue_df = glueContext.create_dynamic_frame.from_options(
    connection_type="oracle",
    connection_options=connection_options,
    transformation_ctx="glue_df",
)
glue_df.show(5)
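# write to the existing Iceberg table in the Glue catalog; this is the call at line 66 of /tmp/test_iceberg.py in the traceback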
glueContext.write_data_frame.from_catalog(
    frame=glue_df,
    database="d_extract",
    table_name="aws_table_name",
    transformation_ctx="glue_df",
)
job.commit()