I have a Hive process that I’m migrating to PySpark.
I’m encountering a problem that I can’t seem to solve.
I have an INSERT from a staging table into a final table, where the partition value is derived from the file name.
The select statement looks something like this:
insert into table cdr_diameter_staging partition (data_date)
select
    ...
    CASE WHEN TRIM(LCASE(arp_3)) = '(null)' THEN NULL ELSE arp_3 END arp_3,
    srv_attr_id,
    CAST(split(reverse(split(reverse(INPUT__FILE__NAME), '/')[0]), '_')[1] / 1000000 AS INT) data_date
from cdr_diameter_staging
In Hive, executing the following:
hive -e "select CAST(split(reverse(split(reverse(INPUT__FILE__NAME),'/')[0]),'_')[1]/1000000 AS INT) data_date from sandbox_datascientist.am_cdr_diameter_staging limit 1;"
would yield:
+------------+
| data_date |
+------------+
| 20240905 |
+------------+
Because the file in the staging directory of that table is:
/apps/hive/warehouse/sandbox_datascientist.db/am_cdr_diameter_staging/CDRDiameter_20240905085230_000_00000000.csv
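As a sanity check, here's the same extraction in plain Python on that filename; dividing by 1000000 and casting to int just drops the HHMMSS suffix:

filename = "CDRDiameter_20240905085230_000_00000000.csv"
timestamp = filename.split("_")[1]          # "20240905085230"
data_date = int(int(timestamp) / 1000000)   # 20240905
print(data_date)                            # matches the Hive output above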
I tried the following in PySpark:
from pyspark.sql.functions import when, trim, lower, col, split, reverse

df_ori = spark.table("{}.{}".format("sandbox_datascientist", "am_cdr_diameter_staging"))
df_transformed = df_ori.select(
    when(trim(lower(col("arp_3"))) == "(null)", None).otherwise(col("arp_3")).alias("arp_3"),
    col("srv_attr_id"),
    (split(reverse(split(reverse(col("INPUT__FILE__NAME")), "/")[0]), "_")[1] / 1000000).cast("int").alias("data_date"),
)
but it obviously doesn't work: INPUT__FILE__NAME is a Hive virtual column, and Spark doesn't resolve it as a column of the DataFrame.
I wanted to use this code as an example, but it doesn't work either:

from pyspark.sql.functions import input_file_name

# folder of the staging table
path = "/apps/hive/warehouse/sandbox_datascientist.db/am_cdr_diameter_staging"
df = spark.read.csv(path)
df.select(input_file_name()).first()
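To be explicit about what I'm aiming for: I'd like to combine input_file_name() with the same split/reverse logic from the Hive query, something like the sketch below (df_guess is just a name I made up, and I'm not sure input_file_name() is even populated when the table is read through spark.table rather than spark.read.csv, so treat this as an untested guess):

from pyspark.sql.functions import input_file_name, split, reverse

df_ori = spark.table("sandbox_datascientist.am_cdr_diameter_staging")
# mirror the Hive expression, swapping INPUT__FILE__NAME for input_file_name()
df_guess = df_ori.withColumn(
    "data_date",
    (split(reverse(split(reverse(input_file_name()), "/")[0]), "_")[1] / 1000000).cast("int"),
)
df_guess.select("data_date").show(1)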
Honestly, I don't know how to replicate that column from the Hive process in PySpark.