<code>from pyspark.sql.functions import col, struct

new_DF = (
    old_DF
    .select(col("id"), col("COL1"), col("COL2"))
    .distinct()
)
new_JSON_DF = (
    new_DF
    .withColumn("PROP", struct(col("COL1"), col("COL2")))
    .drop("COL1", "COL2")
)
</code>
COL1 and COL2 can contain nulls. If the data looks like this:

id  COL1  COL2
1   null  def
2   abc   null
3   null  null

I want the output in new_JSON_DF to look like:
{
  "id": 1,
  "PROP": {
    "COL2": "def"
  }
},
{
  "id": 2,
  "PROP": {
    "COL1": "abc"
  }
},
{
  "id": 3,
  "PROP": {}
}
The above is a Spark DataFrame, and I tried this:
<code>from pyspark.sql.functions import map_filter

new_JSON_DF = new_JSON_DF.withColumn("PROP", map_filter("PROP", lambda k, v: v.isNotNull()))
</code>
This does not work, because map_filter expects a MapType column while struct() produces a StructType. I also tried pandas, and the following command works, but toPandas() collects the whole DataFrame into driver memory, so it throws a memory error:
<code>from pyspark.sql.types import StructType, StructField, StringType, MapType

pandas_df = new_JSON_DF.toPandas()
pandas_df['PROP'] = pandas_df['PROP'].apply(lambda x: {k: v for k, v in x.items() if v is not None})
schema = StructType([
    StructField("id", StringType(), True),
    StructField("PROP", MapType(StringType(), StringType()), True)
])
new_JSON_DF = spark.createDataFrame(pandas_df, schema=schema)
</code>