I am trying to perform in-memory updates on a very large PySpark DataFrame instead of making disk-based updates in a PostgreSQL database. I chose PySpark for its speed and scalability over direct database updates.
Here’s what I’m doing:
- I have a .csv file named source_table_details that I read and convert into a PySpark DataFrame.
- I perform some data manipulations like:
from pyspark.sql.functions import concat

df = df.withColumn('column1', df['column2'] * 2)
df = df.withColumn('column2', concat(df['column1'], df['column2']))
- The final step would be to load this DataFrame into a database table.
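For that write-back I expect to use Spark's JDBC writer; a rough sketch, where the URL, credentials, and write mode are placeholders:

# Placeholder connection details; 'overwrite' is only illustrative.
df.write.jdbc(
    url='jdbc:postgresql://host:5432/mydb',
    table='source_table_details',
    mode='overwrite',
    properties={'user': 'spark_user', 'password': '***', 'driver': 'org.postgresql.Driver'},
)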
However, I want to perform complex update operations similar to what I would do in SQL. For example, I have a SQL query that updates a table based on joins and conditions involving other database tables:
UPDATE source_table_details
SET remark1 = 'AUTO REVERSAL'
FROM source_lrsapirefund_tmp_details
WHERE source_lrscbs_tmp_details.creditdebit = 'CREDIT'
AND source_lrsapirefund_tmp_details.status NOT IN ('1','2')
AND source_lrscbs_tmp_details.utrrrnmerge = source_lrsapirefund_tmp_details.refutrno
AND source_lrsapirefund_tmp_details.refundtype IN ('1','2')
AND source_lrscbs_tmp_details.remark1 IS NULL;
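I understand the DataFrame API could express this as a join plus a conditional column rewrite. A minimal sketch of what I mean, assuming df holds the table being updated (with the creditdebit, utrrrnmerge, and remark1 columns referenced as source_lrscbs_tmp_details in my query) and refund_df is a hypothetical DataFrame already loaded from source_lrsapirefund_tmp_details:

from pyspark.sql import functions as F

# Keep only the refund rows that can trigger the update (column names from my query).
refund_keys = (
    refund_df
    .filter((~F.col('status').isin('1', '2')) & F.col('refundtype').isin('1', '2'))
    .select('refutrno')
    .dropDuplicates(['refutrno'])
)

# A left join flags the matching rows; when/otherwise rewrites remark1 only for them.
df_updated = (
    df.join(refund_keys, df['utrrrnmerge'] == refund_keys['refutrno'], 'left')
      .withColumn(
          'remark1',
          F.when(
              (F.col('creditdebit') == 'CREDIT')
              & F.col('remark1').isNull()
              & F.col('refutrno').isNotNull(),
              F.lit('AUTO REVERSAL'),
          ).otherwise(F.col('remark1')),
      )
      .drop('refutrno')
)

Rewriting every UPDATE like this is what I am hoping to avoid, so I tried running the SQL itself instead.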
To execute this query in PySpark, I created a temporary view:
df.createOrReplaceTempView("source_table_details")
I tried to use spark.sql() with this query:
query = """
UPDATE source_table_details
SET remark1 = 'AUTO REVERSAL'
FROM source_lrsapirefund_tmp_details
WHERE source_lrscbs_tmp_details.creditdebit = 'CREDIT'
AND source_lrsapirefund_tmp_details.status NOT IN ('1','2')
AND source_lrscbs_tmp_details.utrrrnmerge = source_lrsapirefund_tmp_details.refutrno
AND source_lrsapirefund_tmp_details.refundtype IN ('1','2')
AND source_lrscbs_tmp_details.remark1 IS NULL;
"""
df_ops.df = spark.sql(query)
But I encountered the error: UPDATE TABLE is not supported temporarily.
Question:
Is there an optimized way to perform SQL-like UPDATE queries directly on a PySpark DataFrame? Given that the other tables (source_lrsapirefund_tmp_details and source_lrscbs_tmp_details) are in the database, is it possible to execute these queries with minimal modification?
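For context, I can already pull those tables into Spark over JDBC and register them as temp views so the table names in my SQL resolve; a sketch with placeholder connection details:

# Placeholder URL/credentials; the table names are the ones from my query.
jdbc_url = 'jdbc:postgresql://host:5432/mydb'
props = {'user': 'spark_user', 'password': '***', 'driver': 'org.postgresql.Driver'}

for table in ('source_lrsapirefund_tmp_details', 'source_lrscbs_tmp_details'):
    spark.read.jdbc(url=jdbc_url, table=table, properties=props) \
         .createOrReplaceTempView(table)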
Any help or guidance would be appreciated.