Description:
Using the dbt functionality that allows one to create a Python model, I created a model that reads from a BigQuery table, performs some calculations, and writes the result back to BigQuery.
It uses Dataproc (serverless submission mode) to submit the model as a PySpark job.
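For context, the model follows the standard dbt Python model structure, roughly like this (the upstream model name is a placeholder):

    def model(dbt, session):
        # Submit via serverless Dataproc; with materialized="table" this works fine
        dbt.config(
            materialized="table",
            submission_method="serverless",
        )

        # "upstream_model" stands in for the actual source model
        df = dbt.ref("upstream_model")

        # ... calculations on df ...
        return df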
Issue:
When the model runs with the table materialization, everything works as intended. However, when using the incremental materialization and the dbt.this property to access the location of the current model, the code breaks.
Here’s the faulty code:
    # Process new rows only
    if dbt.is_incremental:
        # only new rows compared to max in current table
        max_from_this = f"select max(created_at) from {dbt.this}"
        df = df.filter(df.created_at >= session.sql(max_from_this).collect()[0][0])
Here’s the error output:
df = df.filter(df.created_at >= session.sql(max_from_this).collect()[0][0])
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 1034, in sql
File "/usr/lib/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 196, in deco
pyspark.sql.utils.AnalysisException: spark_catalog requires a single-part namespace, but got [x, y]
References:
- Python models in dbt
I’ve tried a different approach: instead of querying the table with session.sql, I first retrieved the table using session.table and then performed a simple computation on the returned DataFrame, but the error was still present.
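For reference, that second attempt looked roughly like this (reconstructed; the exact aggregation I used may have differed):

    from pyspark.sql import functions as F

    if dbt.is_incremental:
        # dbt.this renders as a multi-part BigQuery identifier (project.dataset.table)
        existing = session.table(str(dbt.this))
        max_created_at = existing.agg(F.max("created_at")).collect()[0][0]
        df = df.filter(df.created_at >= max_created_at)

This raised the same AnalysisException as the session.sql version.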