One of my tasks today is to read a simple JSON file, convert it into a DataFrame, loop over that DataFrame, and run some validations, etc.
This is part of my code:
bucket_name = 'julio-s3'
json_source = 'source'
file_2 = "tmp.json"
json_s3_path = f"s3://{bucket_name}/{json_source}/{file_2}"
print(json_s3_path)
df = spark.read.json(json_s3_path)
df.printSchema()
df.show()
And here is the first error:
AnalysisException: Since Spark 2.3, the queries from raw JSON/CSV
files are disallowed when the referenced columns only include the
internal corrupt record column (named _corrupt_record by default). For
example:
spark.read.schema(schema).csv(file).filter($"_corrupt_record".isNotNull).count()
and
spark.read.schema(schema).csv(file).select("_corrupt_record").show().
Instead, you can cache or save the parsed results and then send the
same query. For example, val df =
spark.read.schema(schema).csv(file).cache() and then
df.filter($"_corrupt_record".isNotNull).count().
So I tested with the following:
multiline_df = spark.read.option("multiline","true").json(json_s3_path)
multiline_df.show(truncate=False)
print(type(multiline_df))
and this is the result:
+----------------------------------------------------------------------------------------------------------------------------------------------------+
|mySchema |
+----------------------------------------------------------------------------------------------------------------------------------------------------+
|{{{NVARCHAR2, NUMBER, NVARCHAR2, NVARCHAR2}, Delta}, {{NVARCHAR2, NVARCHAR2, NVARCHAR2}, Delta}, {{NVARCHAR2, NVARCHAR2, NUMBER, NVARCHAR2}, Delta}}|
+----------------------------------------------------------------------------------------------------------------------------------------------------+
<class 'pyspark.sql.dataframe.DataFrame'>
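Printing the schema makes the nesting clearer; based on the file below, it should look roughly like this (Spark infers every leaf value as a string, and the exact casing of the top-level key depends on the file):

multiline_df.printSchema()
# roughly:
# root
#  |-- myschema: struct
#  |    |-- accounts: struct
#  |    |    |-- fields: struct (id, isdeleted, master, name -- all string)
#  |    |    |-- load_type: string
#  |    |-- customer: struct (same shape)
#  |    |-- resources: struct (same shape)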
And my JSON file is something like this:
{
  "myschema": {
    "accounts": {
      "load_type": "daily",
      "fields": {
        "id": "nvarchar2",
        "isdeleted": "number",
        "master": "nvarchar2",
        "name": "nvarchar2"
      }
    },
    "customer": {
      "load_type": "daily",
      "fields": {
        "id": "nvarchar2",
        "accountid": "nvarchar2",
        "usergroupid": "nvarchar2"
      }
    },
    "resources": {
      "load_type": "daily",
      "fields": {
        "id": "nvarchar2",
        "isdeleted": "number",
        "name": "nvarchar2",
        "currency": "nvarchar2"
      }
    }
  }
}
I need to loop over the fields objects, find which of them are "NVARCHAR2", and print the key and the value, so that I end up with something like this:
+-----------+-----------------+--------------+
|TABLE      |COLUMN           |COLUMN_TYPE   |
+-----------+-----------------+--------------+
|accounts   |id               |NVARCHAR2     |
|accounts   |master           |NVARCHAR2     |
|accounts   |name             |NVARCHAR2     |
|customer   |id               |NVARCHAR2     |
|customer   |accountid        |NVARCHAR2     |
|customer   |usergroupid      |NVARCHAR2     |
|resources  |id               |NVARCHAR2     |
|resources  |name             |NVARCHAR2     |
|resources  |currency         |NVARCHAR2     |
+-----------+-----------------+--------------+
Can somebody help me read the JSON into a correct structure so I can do this?
Regards
You can get the desired output with this code:
from pyspark.sql.functions import col

final = []
table_level_df = multiline_df.select("myschema.*")  # one column per table
for i in table_level_df.columns:
    # one column per field of the current table
    column_level_df = table_level_df.select(f"{i}.fields.*")
    rows = [r.asDict() for r in column_level_df.collect()]
    for key in rows[0]:
        final.append(
            {
                "TABLE": i,
                "COLUMN": key,
                "COLUMN_TYPE": rows[0][key],
            }
        )
final_df = spark.createDataFrame(final)
# the comparison is case-sensitive, so match the casing in your file
final_df.where(col("COLUMN_TYPE") == "NVARCHAR2").show()
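If you would rather not run one collect() per table, a variation that collects the single nested row once and walks it on the driver should give the same result (a sketch, assuming the column names used above):

from pyspark.sql.functions import col

# collect the single nested row once and convert nested Rows to plain dicts
nested = multiline_df.select("myschema.*").collect()[0].asDict(recursive=True)

rows = []
for table, meta in nested.items():
    for column, column_type in meta["fields"].items():
        rows.append({"TABLE": table, "COLUMN": column, "COLUMN_TYPE": column_type})

result = spark.createDataFrame(rows).where(col("COLUMN_TYPE") == "NVARCHAR2")
result.show()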