Our current script was generated with Visual ETL in AWS Glue. It works, but incremental loading (with job bookmarks) does not: every run uploads all of the data to S3 again. What could the possible issues be here?
Things I already did:

- enabled job bookmarks on the job
- added the job bookmark keys in different places
- used multiple columns as bookmark keys to ensure the uniqueness of each row
- used the Visual ETL editor and then added some lines manually
- both job.commit() and job.init() are in the code:

```python
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsgluedq.transforms import EvaluateDataQuality
from awsglue import DynamicFrame
import gs_now

def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFrame:
    for alias, frame in mapping.items():
        frame.toDF().createOrReplaceTempView(alias)
    result = spark.sql(query)
    return DynamicFrame.fromDF(result, glueContext, transformation_ctx)

args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Default ruleset used by all target nodes with data quality enabled
DEFAULT_DATA_QUALITY_RULESET = """
    Rules = [
        ColumnCount >= max(last(1)),
        RowCount > avg(last(10))*0.6
    ]
"""

t_df = glueContext.create_dynamic_frame.from_options(
    connection_type = "oracle",
    connection_options = {
        "useConnectionProperties": "true",
        "dbtable": "mywarehouse_t",
        "connectionName": "warehouse",
        "hashfield": "id"
    },
    transformation_ctx = "t_node",
    additional_options = {
        "JobBookmarkKeys": ["id", "key2"],
        "jobBookmarkKeysSortOrder": "asc"
    }
)

m_df = glueContext.create_dynamic_frame.from_options(
    connection_type = "oracle",
    connection_options = {
        "useConnectionProperties": "true",
        "dbtable": "mywarehouse_m",
        "connectionName": "base2",
        "hashfield": "key2"
    },
    transformation_ctx = "m_node"
)

# Script generated for node SQL Query
sqlQuery = '''
select t.*, m.tag
from mywarehouse_t t
inner join mywarehouse_m m on t.key2 = m.key2
'''
sql_node = sparkSqlQuery(
    glueContext,
    query = sqlQuery,
    mapping = {"mywarehouse_m": m_df, "mywarehouse_t": t_df},
    transformation_ctx = "sql_ctx"
)

s3_node = glueContext.write_dynamic_frame.from_options(
    frame=sql_node,
    connection_type="s3",
    format="glueparquet",
    connection_options={"path": "s3://my_s3", "partitionKeys": []},
    format_options={"compression": "snappy"},
    transformation_ctx="s3_node"
)

job.commit()
```
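For reference, this is how I understood the bookmark keys are supposed to be attached to a JDBC read done with `from_options`: as `jobBookmarkKeys` / `jobBookmarkKeysSortOrder` (lowercase "j") inside `connection_options`, rather than in a separate `additional_options` argument. This is only my assumption of the intended configuration, not something I have confirmed; it reuses the `glueContext` from the script above.

```python
# Sketch only -- my assumption of where the bookmark keys belong for a
# JDBC source read with from_options; not a confirmed fix.
t_df = glueContext.create_dynamic_frame.from_options(
    connection_type = "oracle",
    connection_options = {
        "useConnectionProperties": "true",
        "dbtable": "mywarehouse_t",
        "connectionName": "warehouse",
        "hashfield": "id",                    # as far as I know this only affects read parallelism, not bookmarks
        "jobBookmarkKeys": ["id", "key2"],    # lowercase "j", placed inside connection_options
        "jobBookmarkKeysSortOrder": "asc"
    },
    transformation_ctx = "t_node"
)
```

Is this the right place for the keys, or should the `additional_options` form generated by Visual ETL also work?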