I need to convert the following JSON into a Parquet file. Converting this kind of JSON with PySpark is usually easy, but the complication here is that the records have nested child elements, so I have to do more than one explode, and it is at this level that I'm having trouble. The source JSON looks like this:
{"responseStatus":"SUCCESS","responseDetails":{"pagesize":1000,"pageoffset":0,"size":111,"total":111},"data":[{"id":1384323,"name":"email","owner":1019693,"Components":[{"ReportFieldName":"Session I","ControlType":"Clusters","Data":[{"Cluster":"C3242"},{"Cluster":"C4321"}]}]},{"id":654343,"name":"call","owner":453233,"Components":[{"ReportFieldName":"Session VX","ControlType":"Phone","Data":"This is a personal SMS"}]},{"id":1383423,"name":"sms","owner":1014393,"Components":[{"ReportFieldName":"Session II","ControlType":"Machines","Data":[{"Machine":"PC3889","Comments":"Tesxt 1"},{"Machine":"PC4321","Comments":null},{"Machine":"PC6554","Comments":"Text 2"}]}]},{"id":6584323,"name":"email","owner":null,"Components":[{"ReportFieldName":"Session X","ControlType":"Clusters","Data":[{"Cluster":"C3889"},{"Cluster":"C4321"},{"Cluster":null}]}]},{"id":9837362,"name":"call","owner":232333,"Components":[{"ReportFieldName":"Session XX","ControlType":"Phone","Data":"This is a test"}]}]}
I started with the following code, but I can't get it to work.
from functools import reduce
#from pyspark.sql.functions import col, concat_ws
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DataType, LongType
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import functions as F
import json
# Load the multi-line JSON document and flatten it down to one row per
# entry of each record's "Components" array.
# NOTE(review): `spark` is not created anywhere in the visible script; it is
# presumably supplied by the runtime (notebook / Glue / pyspark shell) -- confirm.
# Tip: call df.printSchema() or df.show(5, truncate=False) after any step
# below to inspect the intermediate result.
df = (
    spark.read
    .option("multiline", "true")
    .option("encoding", "UTF-8")
    .json('s3://df-julio-poc/source/json/my_test_sample.json')
    # One row per element of the top-level "data" array ...
    .select(F.explode("data").alias("data"))
    # ... then promote the fields of that struct to top-level columns.
    .select("data.*")
    # One row per component; the column keeps its name but now holds a
    # single struct instead of an array.
    .withColumn("Components", F.explode("Components"))
)
# Schemas handed to from_json() to parse "Components.Data".
# Each one describes an array of structs whose fields are nullable strings.

# Elements look like {"Machine": ..., "Comments": ...}.
machines_item_schema = ArrayType(
    StructType()
    .add("Machine", StringType(), True)
    .add("Comments", StringType(), True)
)

# Elements look like {"Cluster": ...}.
cluster_schema = ArrayType(
    StructType()
    .add("Cluster", StringType(), True)
)

# Not referenced by the statements visible in this script.
phone_schema = ArrayType(
    StructType()
    .add("Data", StringType(), True)
)
# Rows whose component is a "Machines" control: parse the Data payload with
# machines_item_schema and emit one output row per parsed element.
# NOTE(review): from_json() needs a string input, so this assumes Spark
# inferred "Components.Data" as a string column -- confirm with printSchema().
df_machines_items = (
    df
    .filter(F.col("Components.ControlType") == "Machines")
    .withColumn(
        "ActionItem",
        F.explode(F.from_json(F.col("Components.Data"), machines_item_schema)),
    )
    # Keep every existing column, lift the two component attributes and the
    # fields of the parsed element to the top level ...
    .select(
        "*",
        "Components.ControlType",
        "Components.ReportFieldName",
        F.col("ActionItem.*"),
    )
    # ... then discard the now-redundant nested columns.
    .drop("components", "actionitem")
)
df_machines_items.show()
# Rows whose component is a "Clusters" control: parse the Data payload with
# cluster_schema and emit one output row per parsed element.
# The chain is wrapped in parentheses so it can span several lines; a line
# that starts with "." outside brackets is a SyntaxError in Python.
# NOTE(review): from_json() needs a string input, so this assumes Spark
# inferred "Components.Data" as a string column -- confirm with printSchema().
df_cluster_items = (
    df
    .filter(F.col("Components.ControlType") == "Clusters")
    .withColumn(
        "Clusters",
        F.explode(F.from_json(F.col("Components.Data"), cluster_schema)),
    )
    # Keep every existing column, lift the two component attributes and the
    # fields of the parsed element to the top level ...
    .select(
        "*",
        "Components.ControlType",
        "Components.ReportFieldName",
        F.col("Clusters.*"),
    )
    # ... then discard the now-redundant nested columns.
    .drop("components", "clusters")
)
df_cluster_items.show()
# Rows whose component is a "Phone" control: the Data payload is copied
# as-is into a "datavalue" column; no JSON parsing or explode is applied.
# The chain is wrapped in parentheses so it can span several lines; a line
# that starts with "." outside brackets is a SyntaxError in Python.
df_phone_items = (
    df
    .filter(F.col("Components.ControlType") == "Phone")
    .withColumn("datavalue", F.col("Components.Data"))
    # Keep every existing column and lift the two component attributes to
    # the top level, then discard the nested struct they came from.
    .select("*", "Components.ControlType", "Components.ReportFieldName")
    .drop("components")
)
df_phone_items.show()
# Stack the three per-control-type frames into one result. Columns are
# matched by name; allowMissingColumns=True lets the frames differ in their
# column sets (a column absent from one frame is filled with nulls).
df = (
    df_machines_items
    .unionByName(df_cluster_items, allowMissingColumns=True)
    .unionByName(df_phone_items, allowMissingColumns=True)
)
df.show()
The actual result is this:
But the result must be something like this:
Can somebody help me with this?
Regards
1