I am working on a data transformation pipeline using AWS Glue with Apache Spark. I need to create a JSON output that contains an array of objects where all fields are included, even if they contain null values.
Currently, some fields with null values are omitted from the resulting JSON, but I need them to be explicitly represented in the output, like this:
[
{"identifier":"1015604061WS","values":{"erp_name":[{"locale":"de_DE","scope":null,"data":"Digitales Badethermometer Ente"}],"inside_length":null,"inside_width":null,"inside_height":null,"diagonal":null,"gross_weigth":[{"locale":null,"scope":null,"data":{"amount":0.084,"unit":"KILOGRAM"}}],"nett_weigth":[{"locale":null,"scope":null,"data":{"amount":0.08,"unit":"KILOGRAM"}}],"ean13":[{"locale":null,"scope":null,"data":"4250226038424"}],"uvp":null,"internal_note":[{"locale":"de_DE","scope":null,"data":"Das digitale Bad— und Raumthermometer misst nicht nur die Temperaturen, sondern sorgt auch für Spiel und Spaß während des Badens. Durch die praktische Mehrfach—Funktionalität kann die Temperatur nicht nur im Wasser, sondern auch in der Luft einfach gemessen werden und wird digital, gut lesbar, angezeigt. So ist das Ablesen deutlich besser als bei der Skala herkömmlicher Thermometer. Dank des integrierten roten LED—Lichts wird direkt signalisiert, wenn die Wassertemperatur für Ihr Kind zu hoch ist. Die ideale Badetemperatur liegt bei 37 Grad. 
Zudem kann das Thermometer einfach mit Hilfe des Saugnapfs flexibel in der Badewanne, der Erwachsenenwanne oder auch an Bad—Fliesen angebracht werden, sodass es nicht stört und man von überall die Anzeige bequem ablesen kann.nHinweis: Das LED—Signale leuchtet rot, sobald die Wassertemperatur zu hoch für Ihr Kind ist und über 37 Grad steigt."}],"expiry_date":[{"locale":null,"scope":null,"data":""}],"volume":[{"locale":null,"scope":null,"data":{"amount":0.001,"unit":"CUBIC_METER"}}],"country_iso_code":[{"locale":null,"scope":null,"data":"CN"}],"material_type":[{"locale":null,"scope":null,"data":"HAWA"}],"length":[{"locale":null,"scope":null,"data":{"amount":0.14,"unit":"METER"}}],"width":[{"locale":null,"scope":null,"data":{"amount":0.15,"unit":"METER"}}],"height":[{"locale":null,"scope":null,"data":{"amount":0.069,"unit":"METER"}}],"packaging_quantity":[{"locale":null,"scope":null,"data":1}],"pruftext":[{"locale":"de_DE","scope":null,"data":"— Niedliche Badeenten— Misst Raum— und Wassertemperaturn— Rote LED—Kontrollleuchte signalisiert, wenn die Wassertemperatur über 37 Grad steigt"}],"ean128":[{"locale":"de_DE","scope":null,"data":"4250404716977"}],"length_vpe":[{"locale":null,"scope":null,"data":{"amount":0.14,"unit":"METER"}}],"width_vpe":[{"locale":null,"scope":null,"data":{"amount":0.15,"unit":"METER"}}],"heigth_vpe":[{"locale":null,"scope":null,"data":{"amount":0.069,"unit":"METER"}}],"gross_weigth_vpe":[{"locale":null,"scope":null,"data":{"amount":0.084,"unit":"KILOGRAM"}}],"nett_weigth_vpe":[{"locale":null,"scope":null,"data":{"amount":0.08,"unit":"KILOGRAM"}}],"lhm_menge":[{"locale":null,"scope":null,"data":"648"}],"rotho_colors":[{"locale":null,"scope":null,"data":["04061"]}]}},
{"identifier":"1019502238WS","values":{"erp_name":[{"locale":"de_DE","scope":null,"data":"Funktionskordel 12.5 m MADEI"}],"inside_length":null,"inside_width":null,"inside_height":null,"diagonal":null,"gross_weigth":[{"locale":null,"scope":null,"data":{"amount":0.055,"unit":"KILOGRAM"}}],"nett_weigth":[{"locale":null,"scope":null,"data":{"amount":0.054,"unit":"KILOGRAM"}}],"ean13":[{"locale":null,"scope":null,"data":"7610859190093"}],"uvp":null,"internal_note":[{"locale":"de_DE","scope":null,"data":""}],"expiry_date":[{"locale":null,"scope":null,"data":""}],"volume":null,"country_iso_code":[{"locale":null,"scope":null,"data":"CN"}],"material_type":[{"locale":null,"scope":null,"data":"FERT"}],"length":[{"locale":null,"scope":null,"data":{"amount":0.2,"unit":"METER"}}],"width":[{"locale":null,"scope":null,"data":{"amount":0.12,"unit":"METER"}}],"height":[{"locale":null,"scope":null,"data":{"amount":0.019,"unit":"METER"}}],"packaging_quantity":[{"locale":null,"scope":null,"data":1}],"pruftext":[{"locale":"de_DE","scope":null,"data":""}],"ean128":[{"locale":"de_DE","scope":null,"data":"7612583321026"}],"length_vpe":[{"locale":null,"scope":null,"data":{"amount":0.2,"unit":"METER"}}],"width_vpe":[{"locale":null,"scope":null,"data":{"amount":0.12,"unit":"METER"}}],"heigth_vpe":[{"locale":null,"scope":null,"data":{"amount":0.076,"unit":"METER"}}],"gross_weigth_vpe":[{"locale":null,"scope":null,"data":{"amount":0.22,"unit":"KILOGRAM"}}],"nett_weigth_vpe":[{"locale":null,"scope":null,"data":{"amount":0.054,"unit":"KILOGRAM"}}],"lhm_menge":[{"locale":null,"scope":null,"data":"1760"}],"rotho_colors":[{"locale":null,"scope":null,"data":["02238"]}]}}
]
Code example:
# Work with the Spark session owned by the Glue context so the whole job
# shares a single session.
spark = glueContext.spark_session

# Session-wide default for Spark's JSON generator: write null fields as
# explicit `"field": null` entries instead of leaving them out.
spark.conf.set("spark.sql.jsonGenerator.ignoreNullFields", "false")

job = Job(glueContext)
def map_unit(unit_column):
    """Translate a source unit code into the unit name used in the output.

    Args:
        unit_column: Name of the DataFrame column that holds the unit code.

    Returns:
        A Column expression that maps 'M', 'KG', 'M3' and 'MM' to 'METER',
        'KILOGRAM', 'CUBIC_METER' and 'MILLIMETER'. Any other value
        (including null) is passed through unchanged.
    """
    unit = col(unit_column)
    # The surrounding parentheses keep the chained calls in one expression.
    return (
        when(unit == 'M', 'METER')
        .when(unit == 'KG', 'KILOGRAM')
        .when(unit == 'M3', 'CUBIC_METER')
        .when(unit == 'MM', 'MILLIMETER')
        .otherwise(unit)
    )
def create_structure(column_name, unit_column):
    """Build the measurement attribute for an amount/unit column pair.

    Args:
        column_name: Name of the column holding the numeric amount.
        unit_column: Name of the column holding the unit code.

    Returns:
        A Column that is a one-element array of
        ``{locale: null, scope: null, data: {amount, unit}}`` when the amount
        is non-zero and the unit is neither null nor empty, and null
        otherwise. The amount is cast to float and the unit is translated
        with ``map_unit``.
    """
    amount = col(column_name)
    unit = col(unit_column)
    has_measurement = (amount != 0) & unit.isNotNull() & (unit != "")

    measurement = array(struct(
        lit(None).alias("locale"),
        lit(None).alias("scope"),
        struct(
            amount.cast("float").alias("amount"),
            map_unit(unit_column).alias("unit"),
        ).alias("data"),
    ))

    # A null amount makes the condition null rather than true, so such rows
    # also fall through to the explicit null branch.
    return when(has_measurement, measurement).otherwise(None)
def create_currency_structure(column_name, currency_column):
    """Build the price attribute for an amount/currency column pair.

    Args:
        column_name: Name of the column holding the numeric amount.
        currency_column: Name of the column holding the currency code.

    Returns:
        A Column that is a one-element array of
        ``{locale: null, scope: null, data: [{amount, currency}]}`` when the
        amount is non-zero and the currency is neither null nor empty, and
        null otherwise. The amount is cast to float.
    """
    amount = col(column_name)
    currency = col(currency_column)
    has_price = (amount != 0) & currency.isNotNull() & (currency != "")

    # Unlike the measurement structure, `data` is itself an array holding a
    # single {amount, currency} entry.
    price = array(struct(
        lit(None).alias("locale"),
        lit(None).alias("scope"),
        array(struct(
            amount.cast("float").alias("amount"),
            currency.alias("currency"),
        )).alias("data"),
    ))

    # A null amount makes the condition null rather than true, so such rows
    # also fall through to the explicit null branch.
    return when(has_price, price).otherwise(None)
# Glue Data Catalog coordinates of the source table.
database_name = "project_example"
table_name = "input"

# Read the catalog table as a DynamicFrame, then convert it to a Spark
# DataFrame for the column transformations that follow.
catalog_frame = glueContext.create_dynamic_frame.from_catalog(
    database=database_name,
    table_name=table_name,
    transformation_ctx="catalog_node",
)
df = catalog_frame.toDF()

# Keep only rows whose `material` is set and expose that column as
# `identifier`.
has_material = col('material').isNotNull()
df = df.where(has_material).withColumnRenamed('material', 'identifier')
def _wrap_attribute(data, locale=None):
    """Wrap a value column in a one-element ``[{locale, scope, data}]`` array.

    Args:
        data: Column expression to store under ``data``.
        locale: Literal locale string, or None for a null locale.

    Returns:
        A Column holding the one-element array; ``scope`` is always null.
    """
    return array(struct(
        lit(locale).alias("locale"),
        lit(None).alias("scope"),
        data.alias("data"),
    ))


def _strip_leading_quote(column_name):
    """Return the column as a string with one leading apostrophe removed."""
    return regexp_replace(col(column_name), "^'", "").cast('string')


# The locale of `erp_name` is the constant "de_DE"; the former comparison of
# that literal against "" could never be true, so the literal is used directly.
df = df.withColumn('erp_name', _wrap_attribute(col('materialkurztext'), locale="de_DE"))
df = df.withColumn('ean13', _wrap_attribute(_strip_leading_quote('ean13')))
# Backticks are required for column names that contain special characters.
df = df.withColumn('internal_note', _wrap_attribute(col('`int.Vermerk`'), locale="de_DE"))
df = df.withColumn('expiry_date', _wrap_attribute(col('auslaufdatum').cast("string")))
df = df.withColumn('country_iso_code', _wrap_attribute(col('ursprungsland')))
df = df.withColumn('material_type', _wrap_attribute(col('materialart')))
df = df.withColumn('packaging_quantity', _wrap_attribute(col('verpackungsmenge')))
df = df.withColumn('pruftext', _wrap_attribute(col('prüftext'), locale="de_DE"))
df = df.withColumn('ean128', _wrap_attribute(_strip_leading_quote('ean128'), locale="de_DE"))
df = df.withColumn('lhm_menge', _wrap_attribute(_strip_leading_quote('`pal.-Mng.`')))
# The colour code is emitted as a one-element list, with the leading
# apostrophe removed.
df = df.withColumn('rotho_colors', _wrap_attribute(array(_strip_leading_quote('Farbcode'))))
# Each chain is wrapped in parentheses so the `.withColumn` calls that start a
# new line remain part of the same expression.

# Inner dimensions share one unit column.
df = (
    df.withColumn('inside_length', create_structure('Innenlänge', 'Innenmaßeinheit'))
    .withColumn('inside_width', create_structure('Innenbreite', 'Innenmaßeinheit'))
    .withColumn('inside_height', create_structure('Innenhöhe', 'Innenmaßeinheit'))
    .withColumn('diagonal', create_structure('Diagonale', 'Innenmaßeinheit'))
)

# Weights of the `st` column group.
df = (
    df.withColumn('gross_weigth', create_structure('bruttogewicht st', 'gewichtseinheit st'))
    .withColumn('nett_weigth', create_structure('nettogewicht st', 'gewichtseinheit st'))
)

# Price with its currency column.
df = df.withColumn('uvp', create_currency_structure('unverbindliche preisempfehlung', 'uvp w'))

# Volume and outer dimensions of the `st` column group.
df = df.withColumn('volume', create_structure('volumen st', 'volumeneinheit st'))
df = (
    df.withColumn('length', create_structure('länge st', 'abmessungseinheit st'))
    .withColumn('width', create_structure('breite st', 'abmessungseinheit st'))
    .withColumn('height', create_structure('höhe st', 'abmessungseinheit st'))
)

# Dimensions and weights of the `vpe` column group.
df = (
    df.withColumn('length_vpe', create_structure('länge vpe', 'abmessungseinheit vpe'))
    .withColumn('width_vpe', create_structure('breite vpe', 'abmessungseinheit vpe'))
    .withColumn('heigth_vpe', create_structure('höhe vpe', 'abmessungseinheit vpe'))
)
df = (
    df.withColumn('gross_weigth_vpe', create_structure('bruttogewicht vpe', 'gewichtseinheit vpe'))
    .withColumn('nett_weigth_vpe', create_structure('vpe nettogewicht', 'gewichtseinheit vpe'))
)
# Fields of the `values` object, in output order. Every field is listed
# unconditionally: a null column becomes a null struct field, and the JSON
# writer decides whether null fields are emitted.
# `lhm_menge` is included here; it was previously commented out even though
# the column is computed and the expected output lists it.
value_fields = [
    'erp_name',
    'inside_length',
    'inside_width',
    'inside_height',
    'diagonal',
    'gross_weigth',
    'nett_weigth',
    'ean13',
    'uvp',
    'internal_note',
    'expiry_date',
    'volume',
    'country_iso_code',
    'material_type',
    'length',
    'width',
    'height',
    'packaging_quantity',
    'pruftext',
    'ean128',
    'length_vpe',
    'width_vpe',
    'heigth_vpe',
    'gross_weigth_vpe',
    'nett_weigth_vpe',
    'lhm_menge',
    'rotho_colors',
]

# `when(c.isNotNull(), c)` without an `otherwise` yields null exactly when `c`
# is null, so it is the same as `c`; the columns are referenced directly.
# The explicit alias pins each struct field name to the listed spelling.
df = df.withColumn(
    "values",
    struct(*[col(name).alias(name) for name in value_fields]),
)
# Destination prefix for the exported JSON files.
output_path = "s3://project_example/export"

# Only the identifier and the assembled `values` object are exported.
df_transformed = df.select('identifier', 'values')

# Replace any previous export and ask the JSON writer to keep null fields.
(
    df_transformed.write
    .mode('overwrite')
    .option('ignoreNullFields', False)
    .json(output_path)
)

job.commit()
What is the best approach to ensure that all fields, including those with null values, are preserved in the JSON output?
I am using the following versions:
AWS Glue version 4.0
Python 3
Any guidance or sample code would be greatly appreciated.