Here is the issue. I need to load data from a remote database, with a bit of an awkward query:
query = f"""
(SELECT id, key, MAX(time) AS created_at
FROM remote.table
WHERE remote_id = {id}
AND id IN ({contact_ids_str})
GROUP BY id, key) as subquery
"""
The contact_ids_str is a rather lengthy string of 50,000 ids, so the total statement is probably around 8 MB in size. This seems to cause an issue in the JDBC driver.
Next up I read the data using the Spark JDBC driver:
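For context, contact_ids_str is just the ids joined into one comma-separated string, roughly like this (the contact_ids list here is only a small illustration, in reality it holds ~50,000 ids):

contact_ids = [1001, 1002, 1003]  # illustration; the real list has ~50,000 ids
contact_ids_str = ", ".join(str(c) for c in contact_ids)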
df = spark.read.format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", query) \
    .option("user", jdbc_properties["user"]) \
    .option("password", jdbc_properties["password"]) \
    .option("driver", jdbc_properties["driver"]) \
    .load()
print(f"Query {df.count()} rows returned.")
df.write.csv('/home/slake/checkpoints/temp/', header=True, mode="append")
After this I delete the dataframe, and the files are written to disk. This works, but only for 30-40 queries; after that the Spark driver runs out of memory. The dataframe itself is not the memory problem, though. It seems to be a memory issue in the JDBC driver itself, because when I first create a view (using psycopg2) on the Postgres database with the original huge query, and then have Spark load from that view, the memory issue disappears.
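For reference, the psycopg2 workaround looks roughly like this (the connection parameters and the view name are placeholders, not my real values):

import psycopg2

# Create a view for the huge statement once, server-side (sketch).
conn = psycopg2.connect(host="...", dbname="...", user="...", password="...")
with conn, conn.cursor() as cur:
    cur.execute(f"""
        CREATE OR REPLACE VIEW tmp_contact_export AS
        SELECT id, key, MAX(time) AS created_at
        FROM remote.table
        WHERE remote_id = {id}
          AND id IN ({contact_ids_str})
        GROUP BY id, key
    """)
conn.close()

# Spark then only sees the short view name instead of the 8 MB statement.
df = spark.read.format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "tmp_contact_export") \
    .option("user", jdbc_properties["user"]) \
    .option("password", jdbc_properties["password"]) \
    .option("driver", jdbc_properties["driver"]) \
    .load()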
So the question is: can I solve this without the psycopg2 workaround, for example by forcing a JDBC memory cleanup or something like it?
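By "cleanup" I mean something along these lines between queries; the unpersist/clearCache/gc calls are just examples of what I could call, and I don't know whether any of them touch whatever the JDBC side keeps around:

import gc

df.unpersist()              # drop any cached blocks for this dataframe
del df                      # drop the Python reference (this I already do)
spark.catalog.clearCache()  # clear all cached tables/dataframes in the session
gc.collect()                # only collects Python-side objects, not the JVM heap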