I’ve been tasked with loading data from three postgreSQL tables, combining the data in databricks, and then writing the data to Azure blob storage as .csv files. I’m able to load the data into databricks directly from the PostgreSQL database. First, I load data from the first database and combine it accordingly on the load:
# Connection properties
jdbc_url = "jdbc:postgresql://server1.postgres.database.azure.com:port_num/database1"
connection_properties = {
"user": "username1@server1",
"password": "password1",
"driver": "org.postgresql.Driver",
"sslmode": "sslmodeoption"
}
#Schema / Table details
schema_name = "schemaname1"
table1_name = '"table1"'
table2_name = '"table2"'
# Construct table paths
table1_path = f"{schema_name}.{table1_name}"
table2_path = f"{schema_name}.{table2_name}"
# SQL query with join operations to join data on load
query = f"(SELECT t1.*, t2."TotalValue", t2."TotalTime" FROM {table1_path} AS t1 INNER JOIN {table2_path} AS t2 ON t1."PKey" = t2."Id") AS joined_data"
# Read data from the specified tables and create spark dataframe
df1 = spark.read.jdbc(url=jdbc_url, table=query, properties=connection_properties)
This successfully creates my first dataframe called df1. I then load my data from a second database and create a second dataframe called df2:
# Connection properties
jdbc_url = "jdbc:postgresql://server1.postgres.database.azure.com:port_num/database2"
connection_properties = {
"user": "username2@server1",
"password": "password2",
"driver": "org.postgresql.Driver",
"sslmode": "sslmodeoption"
}
# Schema / table properties
schema_name = "schemaname1"
table_name = '"table3"'
# Construct table paths
table_path = f"{schema_name}.{table_name}"
df2 = spark.read.jdbc(url=jdbc_url, table=table_path, properties=connection_properties)
When is now use the display(df1) or display(df2) I am able to do so. I now combine the two dataframes further using SQL as follows (after creating tempviews of each dataframe):
final_df = spark.sql("""SELECT d1.*,d2.name FROM df1 as d1 INNER JOIN df2 as d2 ON df1.a_ID = df2.ID""")
But if I do display(final_df)
I run into problems as it times out. Also when I do the following to write to a staging area it times out:
filePath = "/mnt/area1/staging"
final_df.write.format("parquet").mode("overwrite").save(filePath)
I have tried the following method to partiion the data due to its size (the main table had 42gb of data):
# filepath
file_path = "/mnt/area1/Staging"
# Define the columns to partition by
partition_columns = ["name", "id2"]
# Write the DataFrame to Azure Blob Storage partitioned by the specified columns
final_df.write
.format("parquet")
.mode("overwrite")
.partitionBy(*partition_columns)
.save(file_path)
But when i do this it appears to run for a while but spits out this error:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 19.0 failed 4 times, most recent failure: Lost task 0.3 in stage 19.0 (TID 36) (10.139.64.11 executor 4): ExecutorLostFailure (executor 4 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 125857 ms
I’ve tried increasing the compute resources but this still occurrs. I know it is becuase I’m working with a large dataset but not sure how to go abaout making things more efficient.