I have a working dbt Python incremental model as follows:
import pyspark.sql.functions as F
from pyspark.sql import DataFrame
from pyspark.sql.window import Window


def calculate_final_customer_score(
    combined_df: DataFrame, individual_scores: list
) -> DataFrame:
    """
    Calculate the final customer score based on individual scores with equal weights.

    Args:
        combined_df (DataFrame): DataFrame containing the combined customer data.
        individual_scores (list): List of individual score columns.

    Returns:
        DataFrame: DataFrame with the final customer score column.
    """
    # Calculate the weighted sum of individual scores
    weighted_sum = sum(F.col(score) for score in individual_scores)
    # Calculate the final customer score (equal weights)
    final_customer_score = weighted_sum / len(individual_scores)
    combined_df = combined_df.withColumn("final_customer_score", final_customer_score)
    return combined_df


def model(dbt, session):
    dbt.config(materialized="incremental", partition_by=["execution_date"])

    # Reference the source table
    combined_df = dbt.ref("combined_proxy_2023_per_cust_v2")

    # Define individual score columns
    individual_scores = ["rfm_score", "growth_score_adjusted", "points_score_adjusted"]

    if dbt.is_incremental:
        # Get the max execution_date from the current table
        max_execution_date_query = f"SELECT MAX(execution_date) FROM {dbt.this}"
        max_execution_date = session.sql(max_execution_date_query).collect()[0][0]
        # Filter for new rows since the last execution
        combined_df = combined_df.filter(F.col("execution_date") > max_execution_date)

    # Calculate final customer score with equal weights
    combined_df = calculate_final_customer_score(combined_df, individual_scores)

    # Calculate deciles
    combined_df = combined_df.withColumn(
        "decile", F.ntile(10).over(Window.orderBy(F.col("final_customer_score")))
    )

    # Sort the table in descending order by decile
    combined_df = combined_df.orderBy(F.col("decile").desc())

    return combined_df
I am trying to specify the partition column but am unable to do so…
The model builds successfully, but when I check whether partition_by is working using
spark.sql("SHOW Partitions adi.inst__davi_dev.weighted_avg_target_per_cust").show()
I get the following error:
[DELTA_SHOW_PARTITION_IN_NON_PARTITIONED_TABLE] SHOW PARTITIONS is not allowed on a table that is not partitioned: `inst__davi_dev`.`weighted_avg_target_per_cust` SQLSTATE: 42809
This means the partitioning was not applied, but I am unable to understand what went wrong here. This is my first time working with partition_by in a dbt Python model.
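For reference, since the error code suggests a Delta table, I believe the partition columns can also be inspected with DESCRIBE DETAIL, which reports a partitionColumns field (a minimal check, assuming the same catalog/schema/table name and a Databricks/Delta environment):

# Inspect the Delta table metadata and show only the partition columns
spark.sql(
    "DESCRIBE DETAIL adi.inst__davi_dev.weighted_avg_target_per_cust"
).select("partitionColumns").show(truncate=False)

On a correctly partitioned table this should list ["execution_date"]; here it presumably returns an empty list, consistent with the SHOW PARTITIONS error above.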
Any help, suggestions, or guidance would be appreciated!