I have created a Delta table in a Synapse notebook with the following schema:
CREATE TABLE IF NOT EXISTS silver.tensile
(
id STRING,
foil STRING,
timestamp TIMESTAMP,
tickness_um DOUBLE,
run_name STRING,
qc INTEGER,
stack_no INTEGER,
sheet INTEGER,
modulus_mpa DOUBLE,
force_yield_n DOUBLE,
stress_yield_mpa DOUBLE,
tensile_strain_yield DOUBLE,
force_tensile_strength_n DOUBLE,
stress_tensile_strength_mpa DOUBLE,
displacement_tensile_strength_mm DOUBLE,
tensile_displacement_break_mm DOUBLE
)
USING DELTA
LOCATION 'abfss://[email protected]/tensile'
I want to merge some PySpark dataframes with the same schema into the table with the following code:
if not tout.isDeltaTable:
    print(f"Delta table not found: {targetTable}")
if tout.toDF().schema == df.schema:
    tout.alias("out").merge(
        df.alias("updates"), "out.id == updates.id"
    ).whenNotMatchedInsertAll().execute()
else:
    raise ValueError("Source schema does not match the target schema")
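To be clear about what I expect the merge to do: with only a `whenNotMatchedInsertAll()` clause, source rows whose `id` already exists in the target should be left untouched, and only rows with new ids should be inserted. Conceptually (a plain-Python sketch of those semantics, not actual PySpark; the rows are made up for illustration):

```python
# Plain-Python sketch of the merge semantics used above:
# whenNotMatchedInsertAll() inserts source rows whose id has no match
# in the target and leaves every already-matched target row untouched.

target = {"a": {"id": "a", "foil": "Cu"}}
updates = [{"id": "a", "foil": "Al"}, {"id": "b", "foil": "Ni"}]

for row in updates:
    if row["id"] not in target:   # "out.id == updates.id" found no match
        target[row["id"]] = row   # insert-all: copy the whole source row

print(sorted(target))  # → ['a', 'b']; row "a" keeps foil "Cu"
```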
Strangely enough, the merge works for most of the new files; however, it sometimes messes up the schema. To be more specific, although the dataframe that is ready to merge has the correct schema, after merging the table contains incorrect Decimal data in some STRING-type fields (foil, thickness_mm, run_name) and leaves the rest as NULL. I don't have such numbers in the raw data files!
The merged data should look like this:

[screenshot: correct merged data]

But sometimes it merges data incorrectly, and here is the outcome:

[screenshot: incorrect merged data]
I don’t know what I’m missing in my code! Can somebody help me please?
I checked the source and target schema to be the same.
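In case it helps, this is conceptually what that schema check covers: comparing two Spark schemas with `==` requires the column names, data types, and column order to all line up. A plain-Python sketch of such a field-by-field comparison (not actual PySpark; the field lists below are illustrative and truncated, and `schema_diff` is my own helper, not a Spark API):

```python
# Plain-Python sketch of a field-by-field schema comparison.
# The tuple lists stand in for tout.toDF().schema and df.schema:
# name, type, and ORDER all have to match for equality to hold.

target_fields = [
    ("id", "string"), ("foil", "string"), ("timestamp", "timestamp"),
    ("tickness_um", "double"), ("run_name", "string"),
]
source_fields = [
    ("id", "string"), ("foil", "string"), ("timestamp", "timestamp"),
    ("tickness_um", "double"), ("run_name", "string"),
]

def schema_diff(target, source):
    """Return a list of human-readable mismatches between two schemas."""
    issues = []
    if [n for n, _ in target] != [n for n, _ in source]:
        issues.append("column names or order differ")
    for (tn, tt), (sn, st) in zip(target, source):
        if tn == sn and tt != st:
            issues.append(f"{tn}: target {tt} vs source {st}")
    return issues

print(schema_diff(target_fields, source_fields))  # → [] when schemas match
```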