I am new to Python and PySpark, so I need your help understanding and resolving this problem. I have some Parquet log files with the following schema:
|-- metadata
...
|-- request
...
|-- response: struct (nullable = true)
| |-- response_timestamp: string (nullable = true)
| |-- response_status_code: string (nullable = true)
| |-- results: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- id: string (nullable = true)
| | | |-- type: string (nullable = true)
| | | |-- filters: struct (nullable = true)
| | | | |-- price: struct (nullable = true)
| | | | | |-- min: integer (nullable = true)
| | | | | |-- max: integer (nullable = true)
| | | | |-- keywords: array (nullable = true)
| | | | | |-- element: string (containsNull = true)
| | | | |-- sort_by: struct (nullable = true)
| | | | | |-- search_sort_type: string (nullable = true)
| | | | | |-- sort_ascending: boolean (nullable = true)
| | | | |-- test_filters: struct (nullable = true)
| | | | | |-- integer1: struct (nullable = true)
| | | | | | |-- min: integer (nullable = true)
| | | | | | |-- max: integer (nullable = true)
| | | | | |-- double1: struct (nullable = true)
| | | | | | |-- double1: double (nullable = true)
| | | | | |-- double2: struct (nullable = true)
| | | | | | |-- double2: double (nullable = true)
| | | | | |-- integer2: struct (nullable = true)
| | | | | | |-- integer2: integer (nullable = true)
I defined the test_filters object incorrectly in the schema, so some of the Parquet files got corrupted and querying them through Hive broke.
I have since corrected the schema, so subsequent logs are written correctly, but now I am trying to clean up the corrupted files by deleting the test_filters object.
How can I remove the test_filters object from the nested results array and update the Parquet schema? Please advise.
I tried loading the Parquet files with PySpark and keeping only the rows where none of the results have a non-null test_filters:
filtered_result = df.filter(~expr("exists(response.results, x -> x.filters.test_filters is not null)"))
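For context, this is roughly how I load the files and apply that filter (the input path below is just a placeholder):

from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

spark = SparkSession.builder.appName("cleanup_test_filters").getOrCreate()

# Read the corrupted log files (placeholder path)
df = spark.read.parquet("s3://my-bucket/logs/corrupted/")

# Keep only the rows where no element of response.results has a
# non-null filters.test_filters struct
filtered_result = df.filter(
    ~expr("exists(response.results, x -> x.filters.test_filters is not null)")
)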
Then I create another DataFrame with the updated schema (without test_filters):
df_with_schema = spark.createDataFrame(filtered_result.rdd, schema)
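Here schema is the corrected schema; I derive it roughly like this, assuming one of the newer, fixed log files can serve as the reference (again a placeholder path):

# Assumption: the corrected schema (without test_filters) can be read off a
# log file that was written after the schema fix (placeholder path)
schema = spark.read.parquet("s3://my-bucket/logs/fixed/").schema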
But when I try to write df_with_schema back to Parquet files, I get this error:
field filters in element in array field results in field response: Length of object (4) does not match with length of fields (3)
I also tried writing filtered_result directly to Parquet files, without updating the schema or creating a new DataFrame, but then the Hive queries still fail due to a schema mismatch.
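That direct write is essentially just this (the output path is a placeholder):

# Write the filtered rows back out with their original (still incorrect) schema
filtered_result.write.mode("overwrite").parquet("s3://my-bucket/logs/cleaned/")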