Is there a way to set column names and datatypes more dynamically, so that if I want to reuse the code with a different table I don't have to change the whole code?
I already tried to set a placeholder for the columns, but it ran into several errors.
I have also defined in the code that it checks the metadata first and converts my HANA datatypes into Spark datatypes.
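
For context, this is roughly what I mean by mapping the HANA metadata to Spark types and deriving the column list from it. It is only a sketch; the HANA_TO_SPARK mapping, build_schema and the metadata shape are placeholders/assumptions, not my actual code:

# Sketch only: derive column names and Spark types from source metadata instead of hardcoding them.
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType, DoubleType,
    DecimalType, DateType, TimestampType
)

# Assumed mapping from HANA/BW datatypes to Spark types (adjust to the real metadata values)
HANA_TO_SPARK = {
    "NVARCHAR": StringType(),
    "VARCHAR": StringType(),
    "INTEGER": IntegerType(),
    "DOUBLE": DoubleType(),
    "DECIMAL": DecimalType(38, 7),
    "DATE": DateType(),
    "TIMESTAMP": TimestampType(),
}

def build_schema(metadata):
    """metadata: list of (column_name, hana_type) tuples read from the source system."""
    return StructType([
        StructField(name.lower(), HANA_TO_SPARK.get(hana_type.upper(), StringType()), True)
        for name, hana_type in metadata
    ])

# The columns list and the Spark schema would then be derived instead of hardcoded:
# schema = build_schema(metadata)
# columns = [field.name for field in schema.fields]
# sdf = spark.createDataFrame(BWDataInit, schema=schema)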
# Update Lakehouse Table
import pandas as pd
from pyspark.sql import functions as F  # needed for the KPI update further down

if initialLoad:
    df = pd.DataFrame(BWDataInit, columns=columns)
    print(f"DataFrame columns: {df.columns}")  # Debug output
    if 'kennzahl' in df.columns:
        df['kennzahl'] = pd.to_numeric(df['kennzahl'], errors='coerce')  # Convert 'kennzahl' to a numeric (float) dtype
    else:
        print("Error: 'kennzahl' column not found in the data.")
    if 'recordmode' in df.columns:
        df = df.drop(['recordmode'], axis=1)
    # Save initial load to SQL table
    append_records(df, DBWRITESCHEMA, DBWRITETABLE)
    # Get requests from BWData
    dfRequests = pd.DataFrame(BWDataRequest, columns=['REQTSN'])
    # Update last request table
    update_last_request_table(dfRequests, DBWRITESCHEMA, DBWRITETABLE)
else:
    df = pd.DataFrame(BWDataChange, columns=columns)
    print(f"DataFrame columns: {df.columns}")  # Debug output
    if 'kennzahl' in df.columns:
        df['kennzahl'] = pd.to_numeric(df['kennzahl'], errors='coerce')  # Convert 'kennzahl' to a numeric (float) dtype
    else:
        print("Error: 'kennzahl' column not found in the data.")
    # Get list of all new requests in the change log
    dfNewReq = df['reqtsn'].unique()
    dfNewReq = pd.DataFrame(dfNewReq, columns=["REQTSN"])
    # Set key columns for the join condition
    colsKey = ['name']
    # Set KPI columns for the join
    colsKPI = ['kennzahl']
    if not dfNewReq.empty:
        # Read content of SQL table
        sdfTable = spark.read.table(f"{DBWRITESCHEMA}.{DBWRITETABLE}")
        # Cast 'kennzahl' to double so it matches the pandas float64 column
        sdfTable = sdfTable.withColumn('kennzahl', sdfTable['kennzahl'].cast('double'))
        # Update SQL table for each request and record mode
        for request in dfNewReq['REQTSN']:
            # Select only new records (recordmode = "N")
            dfNew = df[(df['reqtsn'] == request) & (df['recordmode'] == "N")]
            if not dfNew.empty:
                # Drop support columns
                dfNew = dfNew.drop(['reqtsn', 'datapakid', 'record', 'recordmode'], axis=1)
                # Convert pandas df to Spark df
                sdfNew = spark.createDataFrame(dfNew)
                # Append new data to old data (matched by column name, not position)
                sdfTable = sdfTable.unionByName(sdfNew)
            # Update changed records (recordmode = "")
            dfUpdate = df[(df['reqtsn'] == request) & (df['recordmode'] == "")]
            if not dfUpdate.empty:
                # Drop unnecessary data left over from changelog structure
                dfUpdate = dfUpdate.drop(['reqtsn', 'datapakid', 'record', 'recordmode'], axis=1)
                # Convert pandas to Spark df
                sdfUpdate = spark.createDataFrame(dfUpdate)
                # Left join the new KPI values onto the existing rows and overwrite them
                # where a key matches; rows without an update keep their old values
                # (an inner join would drop every row that is not part of this request)
                sdfNewVals = sdfUpdate.select(*colsKey, *[F.col(c).alias(f"{c}_new") for c in colsKPI])
                sdfTable = sdfTable.join(sdfNewVals, colsKey, how='left')
                for kpi in colsKPI:
                    sdfTable = sdfTable.withColumn(kpi, F.coalesce(F.col(f"{kpi}_new"), F.col(kpi))).drop(f"{kpi}_new")
            # Delete old records (recordmode = "R")
            dfDelete = df[(df['reqtsn'] == request) & (df['recordmode'] == "R")]
            if not dfDelete.empty:
                # Drop unnecessary data left over from changelog structure
                dfDelete = dfDelete.drop(['reqtsn', 'datapakid', 'record', 'recordmode'], axis=1)
                # Convert pandas to Spark df
                sdfDelete = spark.createDataFrame(dfDelete)
                # Anti join sdfTable with sdfDelete to remove the matching records
                sdfTable = sdfTable.join(sdfDelete, colsKey, how='left_anti')
        # Update the last request table and overwrite the old content of the SQL table
        # with the updated Spark df
        update_last_request_table(dfNewReq, DBWRITESCHEMA, DBWRITETABLE)
        sdfTable.write.mode("overwrite").format("delta").saveAsTable(f"{DBWRITESCHEMA}.{DBWRITETABLE}")

# Print result table
print("Result Table:")
spark.read.table("LHS_SAP.tbl_rbr_01").sort("Name").show()