I’m using PySpark to read Parquet files from an HDFS location partitioned by DATE_KEY. The following code always reads the files from the MAX(DATE_KEY) partition and converts them to a Polars dataframe:
import polars as pl
import pyarrow as pa
from pyspark.sql import SparkSession

import config


def hdfs_fetch_latest_parquet_file(parquet_file_name):
    sc = (
        SparkSession.builder.appName("hdfs-spark")
        .master("local[*]")
        .config("spark.sql.execution.arrow.pyspark.enabled", "true")
        .config("spark.executor.memory", "70g")
        .config("spark.driver.memory", "50g")
        .config("spark.memory.offHeap.enabled", "true")
        .config("spark.memory.offHeap.size", "16g")
        .config("spark.driver.maxResultSize", "4g")
        .config("spark.sql.parquet.int96RebaseModeInRead", "CORRECTED")
        .config("spark.sql.parquet.int96RebaseModeInWrite", "CORRECTED")
        .config("spark.sql.parquet.datetimeRebaseModeInRead", "CORRECTED")
        .config("spark.sql.parquet.datetimeRebaseModeInWrite", "CORRECTED")
        .getOrCreate()
    )
    # Read the whole partitioned dataset and register it as a temp view.
    spark_df = sc.read.parquet(
        config.HDFS_DEV_URL + config.PARQUET_FILE_REFINED_PATH + parquet_file_name + "/"
    )
    spark_df.createOrReplaceTempView("allDateKeysParquet")
    # Keep only the rows belonging to the latest DATE_KEY partition.
    spark_df_latest_date_key = sc.sql(
        "select * from allDateKeysParquet "
        "where DATE_KEY = (select MAX(DATE_KEY) from allDateKeysParquet)"
    )
    # Collect as Arrow batches (_collect_as_arrow is a private PySpark API)
    # and convert to a Polars dataframe, dropping the partition column.
    df = pl.from_arrow(
        pa.Table.from_batches(spark_df_latest_date_key._collect_as_arrow())
    ).drop("DATE_KEY")
    return df
This method works well for finding the max DATE_KEY as long as that partition has no sub-partitions inside it. The challenge is that if a DATE_KEY is further sub-partitioned by BASE_FEED, the code fails.
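For illustration, the layout in question looks roughly like this (partition values are placeholders):

.../PARQUET_FILE_REFINED_PATH/parquet_file_name/
    DATE_KEY=20230101/
        part-00000.parquet            # flat layout: the code above works
    DATE_KEY=20230102/
        BASE_FEED=feed_a/             # sub-partitioned: the code fails
            part-00000.parquet
        BASE_FEED=feed_b/
            part-00000.parquet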
My objective is to read the parquet files within MAX(DATE_KEY), and if that partition contains sub-folders, to read everything inside them too.
I tried the code below, but it fails with an exception:
spark_df = sc.read.parquet(
    config.HDFS_DEV_URL + config.PARQUET_FILE_REFINED_PATH + parquet_file_name + "/*"
)
spark_df.createOrReplaceTempView("allDateKeysParquet")
Conflicting directory structures detected. Suspicious paths
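From what I can tell, Spark raises this when a single read mixes paths from different partition depths. One idea I had is Spark's basePath data source option, along the lines of the sketch below (untested; I am not sure the glob copes with DATE_KEY directories that have no BASE_FEED level underneath):

root = config.HDFS_DEV_URL + config.PARQUET_FILE_REFINED_PATH + parquet_file_name

# basePath tells Spark where the table root is, so that DATE_KEY (and
# BASE_FEED, where present) are still treated as partition columns even
# when the paths passed in sit below the root.
spark_df = (
    sc.read
    .option("basePath", root)
    .parquet(root + "/*/*")  # glob down to the BASE_FEED level
)
spark_df.createOrReplaceTempView("allDateKeysParquet")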
Is there any other way to solve this, so that the code always finds the MAX(DATE_KEY) partition and reads all the parquet files inside it, irrespective of whether that DATE_KEY contains further partitions (BASE_FEED in this case)?
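The only other idea I have is to find the latest DATE_KEY myself by listing the partition directories through the Hadoop FileSystem API (via PySpark's private _jvm/_jsc handles, which I would rather avoid) and then reading just that prefix. This is a sketch, assuming DATE_KEY values sort lexicographically (e.g., yyyymmdd):

jvm = sc.sparkContext._jvm
jsc = sc.sparkContext._jsc
Path = jvm.org.apache.hadoop.fs.Path

root = config.HDFS_DEV_URL + config.PARQUET_FILE_REFINED_PATH + parquet_file_name
fs = Path(root).getFileSystem(jsc.hadoopConfiguration())

# List the DATE_KEY=... child directories and pick the maximum; this
# relies on DATE_KEY values being zero-padded so string order is date order.
date_key_dirs = [
    s.getPath().getName()
    for s in fs.listStatus(Path(root))
    if s.isDirectory() and s.getPath().getName().startswith("DATE_KEY=")
]
latest = max(date_key_dirs)

# Read only the latest partition; basePath keeps DATE_KEY as a column,
# and any sub-partitions such as BASE_FEED should be discovered below it.
spark_df = sc.read.option("basePath", root).parquet(root + "/" + latest)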
Can someone please help me with this?