Overview
I need to update a database I own with data fetched from a Snowflake server database. I fetch the data for a particular date from Snowflake and insert it into the local database for that date. This operation currently takes 273 s and I need to optimize it as much as possible. All of this is done in Python, using the `pyodbc` library for insertion into the local database and `sqlalchemy` to fetch the data from Snowflake.
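For context, the two connections are set up roughly like this (the driver string, account, and credential values below are placeholders, not my real configuration):

```python
import pyodbc
from sqlalchemy import create_engine

# Snowflake engine via snowflake-sqlalchemy
# (user/account/database/warehouse names are placeholders)
snowflake_engine = create_engine(
    "snowflake://<user>:<password>@<account>/<database>/<schema>?warehouse=<warehouse>"
)

# Local SQL Server connection via pyodbc
# (driver and server names are placeholders)
local_conn = pyodbc.connect(
    "DRIVER={ODBC Driver 17 for SQL Server};"
    "SERVER=<server>;DATABASE=<database>;Trusted_Connection=yes;"
)
```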
Detailed Process
I am using `cProfile` in combination with Snakeviz to profile and visualize the performance of my operation, `update_snf`, which runs for a particular date. The whole process consists of the following steps:
- Creating a connection and running an SQL query to fetch data from the Snowflake server. Call this operation `get_data_from_snf`. It takes 111 s, with the majority (108 s) spent in the `pandas.read_sql(query, connection)` call (defined in `pandas/io/sql.py`), which executes the query and returns the result in a dataframe `df`. A simplified sketch of this step is shown after this list.
- Formatting `df`, which takes relatively little time (17.8 s).
- Inserting the formatted dataframe `df` into my local database (call this function `insert_data_into_db`). This operation takes the majority of the time: 143 s, of which 93.1 s is spent in the `executemany` method of the `pyodbc.Cursor` object and 43.3 s in the `execute` method. Let us call the table where this insertion happens `table`. `table` is fairly large, with approximately 150 million rows and 130 columns.
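A simplified sketch of `get_data_from_snf` (the table name and query here are illustrative; the real query selects the ~130 columns for the given date):

```python
import pandas as pd

def get_data_from_snf(engine, date):
    # Illustrative query: the real one selects ~130 columns for the given date
    query = f"SELECT * FROM my_snowflake_table WHERE Date = '{date}'"
    # pandas.read_sql executes the query and loads the full result into memory;
    # this call accounts for ~108 s of the 111 s spent in this step
    df = pd.read_sql(query, engine)
    return df
```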
I am trying to reduce the time for this entire operation in any way possible, by optimizing any or all of the steps. `df` is expected to contain around 200,000 rows for each date.
Current Attempt
Fairly basic Python APIs are being used, and no multiprocessing or multithreading is used at the moment. Inserting data directly into the very large `table` was taking an unacceptable amount of time, so the current alternative implements `insert_data_into_db` as follows:
- A staging table called `table_temp` is used (this table already exists in the database; it is not created afresh every time the function is run). `cursor.execute` is used to run the following SQL statements:

  ```sql
  TRUNCATE TABLE table_temp;
  DELETE FROM table WHERE Date = {date};
  ```

  The delete is necessary to ensure that only the updated entries from Snowflake are present in the local server and there is no duplication: it removes the outdated data for that date before the fresh data is inserted.
- `df` is split into chunks of 18,000 rows and `cursor.fast_executemany` is set to `True`. We then use `cursor.executemany(insert_query, df_chunk)` to insert each chunk (a list of tuples) one by one into `table_temp`.
. -
We now insert the entire
table_temp
(which contains all the entries for the particular date from Snowflake) intotable
using:
INSERT INTO table
SELECT * FROM table_temp
So instead of inserting the data directly into `table`, we first insert it in chunks into a temporary staging table and then insert the entire staging table into `table`.
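Putting the steps above together, `insert_data_into_db` currently looks roughly like this (the names `table` and `table_temp` are the placeholders used above, and the real insert statement lists all ~130 columns explicitly):

```python
def insert_data_into_db(conn, df, date):
    cursor = conn.cursor()

    # Step 1: clear the staging table and remove the outdated rows for this date
    cursor.execute("TRUNCATE TABLE table_temp")
    cursor.execute("DELETE FROM table WHERE Date = ?", date)

    # Step 2: insert df into table_temp in chunks of 18,000 rows with fast_executemany
    cursor.fast_executemany = True
    placeholders = ", ".join("?" * len(df.columns))
    insert_query = f"INSERT INTO table_temp VALUES ({placeholders})"
    rows = list(df.itertuples(index=False, name=None))
    chunk_size = 18000
    for start in range(0, len(rows), chunk_size):
        cursor.executemany(insert_query, rows[start:start + chunk_size])

    # Step 3: move everything for this date from the staging table into the main table
    cursor.execute("INSERT INTO table SELECT * FROM table_temp")
    conn.commit()
```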
The questions about this current approach are:

- Is there any way to avoid this chunking approach and insert directly into `table` without it taking too long? The time for a direct insert seems to grow as `table` becomes larger, and at the current size of 150 million rows it takes too much time.
- Can we use parallelism, in the form of multiprocessing or multithreading, anywhere in this approach?
- Can we use streaming somewhere here? If yes, how?
- Can we speed up the fetching of data from Snowflake into a dataframe in memory before inserting the dataframe into the database? Is there any way to avoid this in-memory step altogether?
tl;dr
I want to reduce the time it takes to fetch data from a DB on Snowflake and insert it into a local DB.
I am using Python 3.11 on a Windows 8 server, with SQL Server as the local database. Any optimizations are welcome! Thanks a lot!