I have Parquet data coming from SQL Server with this schema:
root
|-- user_uid: string (nullable = true)
|-- user_email: string (nullable = true)
|-- ud_id: integer (nullable = true)
|-- ud_standard_workflow_id: integer (nullable = true)
|-- ud_is_preview: boolean (nullable = true)
|-- ud_is_completed: boolean (nullable = true)
|-- ud_language: string (nullable = true)
|-- ud_created_date: timestamp (nullable = true)
|-- ud_modified_date: string (nullable = true)
|-- ud_created_by_id: string (nullable = true)
|-- dsud_id: integer (nullable = true)
|-- dsud_user_data_id: integer (nullable = true)
|-- dsud_dynamic_step_id: integer (nullable = true)
|-- dsud_is_completed: boolean (nullable = true)
|-- dsud_answers: string (nullable = true)
The last column, dsud_answers, is a string, but it contains JSON data in the form of a list:
[{"QuestionId":6406,"QuestionTitle":"Residency","Value":"1975"},{"QuestionId":6407,"QuestionTitle":"Citizentship","Value":"66664"}]
How can I transform this column into a proper JSON datatype?
I'm still getting this error: data type mismatch: Input schema "STRING" must be a struct, an array or a map.
My desired result is to have the column dsud_answers as a JSON datatype so I can flatten its content. In this case there should be 2 records, because the JSON contains 2 QuestionIds.
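From what I have read, the pure PySpark route should be from_json with an explicit ArrayType schema followed by explode, so this is my rough sketch of that approach (the column list in the final select is just illustrative):

from pyspark.sql.functions import col, explode, from_json
from pyspark.sql.types import ArrayType, IntegerType, StringType, StructField, StructType

# Explicit schema for the JSON array held in the string column
answers_schema = ArrayType(StructType([
    StructField("QuestionId", IntegerType()),
    StructField("QuestionTitle", StringType()),
    StructField("Value", StringType()),
]))

df_parsed = (
    df.withColumn("dsud_answers", from_json(col("dsud_answers"), answers_schema))
      .withColumn("answer", explode(col("dsud_answers")))  # one row per list element
      .select("dsud_id", "dsud_user_data_id",
              col("answer.QuestionId"),
              col("answer.QuestionTitle"),
              col("answer.Value"))
)

With the sample row above I would expect this to produce two rows, one per QuestionId, but I am not sure whether the way I build the schema is what causes the error.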
I managed to transform it in Pandas but cannot figure out the PySpark way to do it.
I converted the PySpark DataFrame to a Pandas DataFrame and then looped through it row by row:
import json

import pandas as pd
from pyspark.sql.functions import to_timestamp

def batch_function(df_answers, batch_id):
    # Drop rows with empty answer lists, fix the timestamp column, dedupe, then move to Pandas
    df = (df_answers.filter(df_answers.dsud_answers != '[]')
          .withColumn("ud_modified_date", to_timestamp(df_answers.ud_modified_date))
          .drop_duplicates()
          .toPandas())
    df_final = pd.DataFrame()
    # Loop through the rows: flatten each JSON answer list, then re-attach that row's metadata
    for index in df.index:
        row_json = json.loads(df.dsud_answers[index])
        df_attributes = pd.json_normalize(row_json)
        df_attributes['dsud_id'] = df.dsud_id[index]
        df_attributes['dsud_user_data_id'] = df.dsud_user_data_id[index]
        df_attributes['dsud_dynamic_step_id'] = df.dsud_dynamic_step_id[index]
        df_attributes['ud_language'] = df.ud_language[index]
        df_attributes['ud_created_date'] = pd.to_datetime(df.ud_created_date[index])
        df_attributes['ud_created_by_id'] = df.ud_created_by_id[index]
        df_attributes['ud_modified_date'] = df.ud_modified_date[index]
        df_attributes['user_email'] = df.user_email[index]
        df_attributes['ud_standard_workflow_id'] = df.ud_standard_workflow_id[index]
        df_attributes['user_uid'] = df.user_uid[index]
        df_attributes['wrn_is_completed'] = df.wrn_is_completed[index]
        df_attributes['wrn_is_agreed'] = df.wrn_is_agreed[index]
        df_attributes['wf_name'] = df.wf_name[index]
        df_final = pd.concat([df_final, df_attributes], ignore_index=True)
    df_answers = spark.createDataFrame(df_final)
    df_answers.write.mode("append").format("delta").saveAsTable("final_table")
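The batch_id parameter is there because I call batch_function from a Structured Streaming foreachBatch; the wiring looks roughly like this (the source table name and checkpoint path are placeholders):

(spark.readStream
      .table("raw_answers")  # placeholder source table
      .writeStream
      .foreachBatch(batch_function)
      .option("checkpointLocation", "/tmp/checkpoints/answers")  # placeholder path
      .start())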