Requirements:
We have a scenario where we need to load millions of records from Databricks into a SQL Server table that has no keys/indexes, and we are struggling to handle failures cleanly. Deleting partially loaded data is not an option because the table is extremely large and deletes take a very long time. Spark also appears to commit every few records even when AutoCommit is set to False, among other odd issues. In short, we want an all-or-nothing load.
Approach:
To resolve this, we are trying to create a temp table in SQL Server from Databricks and load the dataframe into it. We will then load from that temp table into the actual target table inside a transaction that commits if successful or rolls back if not. We are trying to use pyodbc for this.
Issue:
- We are unable to create the temp table using pyspark code.
- Within the same notebook, if we use pyodbc in cell 1 to create the temp table, will it be available for loading via the pyspark code in cell 2? If the cursor and connection are closed in cell 1, will the temp table still be available in cell 2?
2.1. Cell 1 – pyodbc code – only a small part; conn and cursor creation is assumed (a sketch of that setup follows the cell)
drop_create_temp_table_query = """
IF OBJECT_ID('tempdb..##TempTable') IS NOT NULL
    DROP TABLE ##TempTable;
SELECT <COL_LIST> INTO ##TempTable FROM <SQL SERVER TABLE>;
"""
cursor.execute(drop_create_temp_table_query)
# Commit the transaction
conn.commit()
# Close the connection
cursor.close()
conn.close()
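For reference, the connection setup assumed above might look like the following sketch (driver name, server, and credentials are placeholders; autocommit is left off so statements only become permanent on conn.commit()):

import pyodbc

# Placeholder connection string; adjust driver/server/credentials to your environment.
conn = pyodbc.connect(
    "DRIVER={ODBC Driver 17 for SQL Server};"
    "SERVER=<SQL SERVER HOST>;"
    "DATABASE=<DATABASE>;"
    "UID=<USER>;PWD=<PASSWORD>",
    autocommit=False,
)
cursor = conn.cursor()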
2.2. Cell 2 – Load from DF to Temp Table
df.write.format("jdbc").options(**jdbc_options).option("dbtable","##TempTable").save()
2.3. Cell 3 – pyodbc code
qry = """BEGIN TRANSACTION
Load from temp table to actual table
COMMIT TRANSACTION
CATCH
Rollback if there is an error.
"""
- Assuming this option works, how should we reference the table name in the Spark JDBC write? Just ##TempTable or tempdb..##TempTable?
If you use #TempTable (a local temp table), it will only be available in the session that created it. Once you close the connection (conn.close()), the table will no longer be available.
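A global ##TempTable behaves differently: it stays alive for as long as the session that created it remains open and is visible to other sessions, including the ones Spark's JDBC writer opens. So one way to keep your staging idea workable is to hold the pyodbc connection open across the cells and close it only after the final copy. A rough sketch, assuming placeholder table names, a connection_string, and jdbc_options pointing at the same server and database:

import pyodbc

conn = pyodbc.connect(connection_string, autocommit=False)  # keep open across cells
cursor = conn.cursor()

# Cell 1: create an empty global temp table with the target's structure.
cursor.execute("SELECT <COL_LIST> INTO ##TempTable FROM <SQL SERVER TABLE> WHERE 1 = 0;")
conn.commit()  # do NOT close conn here, or ##TempTable is dropped

# Cell 2: Spark appends into the existing global temp table.
df.write.format("jdbc").options(**jdbc_options).option("dbtable", "##TempTable").mode("append").save()

# Cell 3: all-or-nothing copy into the real table, then clean up.
try:
    cursor.execute("INSERT INTO <TARGET TABLE> SELECT * FROM ##TempTable;")
    conn.commit()
except Exception:
    conn.rollback()
    raise
finally:
    cursor.close()
    conn.close()  # ending the creating session also drops ##TempTable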
I have tried the below approach:
from pyspark.sql import SparkSession
import datetime
jdbcHostname = "slservdilip.database.windows.net"
jdbcPort = 1433
jdbcDatabase = "db02"
jdbcUrl = f"jdbc:sqlserver://{jdbcHostname}:{jdbcPort};database={jdbcDatabase}"
connectionProperties = {
"user": "admin02",
"password": "Welcome@1"
}
data1 = [(4, "Alice Smith", 35),
(5, "Michael Johnson", 45),
(6, "Emily Davis", 28)]
columns1 = ["id", "name", "age"]
df1 = spark.createDataFrame(data1, columns1)
data2 = [("Alice Smith", "[email protected]"),
("Michael Johnson", "[email protected]"),
("Emily Davis", "[email protected]")]
columns2 = ["name", "email"]
df2 = spark.createDataFrame(data2, columns2)
data3 = [("[email protected]", "San Francisco"),
("[email protected]", "Seattle"),
("[email protected]", "Boston")]
columns3 = ["email", "city"]
df3 = spark.createDataFrame(data3, columns3)
df1.createOrReplaceTempView("people")
df2.createOrReplaceTempView("contacts")
df3.createOrReplaceTempView("locations")
def exportToSql(spark, dbtable, sqltable):
    try:
        print(f"{dbtable}, start time: {datetime.datetime.now()}")
        df = spark.table(dbtable)
        # Chained writer calls need to be wrapped in parentheses (or use line continuations).
        (df.write
            .format("jdbc")
            .option("url", jdbcUrl)
            .option("dbtable", sqltable)
            .options(**connectionProperties)
            .mode("overwrite")
            .save())
        print(f"{dbtable}, export completed: {datetime.datetime.now()}")
    except Exception as ex:
        print(ex)
exportToSql(spark, 'people', 'dbtable1')
exportToSql(spark, 'contacts', 'dbtable2')
exportToSql(spark, 'locations', 'dbtable3')
In the above code I am creating temporary Spark SQL views for these DataFrames and defining a function (exportToSql) that writes each view to the corresponding SQL Server table using JDBC. Finally, the data from Spark is exported into three SQL Server tables (dbtable1, dbtable2, dbtable3).
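One thing to keep in mind: as far as I know, mode("overwrite") drops and recreates each target table on every run; adding .option("truncate", "true") keeps the existing table definition and only clears the rows before the load. A quick way to spot-check the writes is to read one of the tables back over the same JDBC settings (a sketch):

# Read a loaded table back to confirm the row count (same URL/credentials as the write).
check = (spark.read.format("jdbc")
         .option("url", jdbcUrl)
         .option("dbtable", "dbtable1")
         .options(**connectionProperties)
         .load())
print(check.count())  # expected: 3 rows for dbtable1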
Results: