I have a single Celery task that uses SQLAlchemy: it takes an input table (in one database) and an output table (in another database) and transfers the data from one to the other.
I'd like to parallelize this across Celery workers so that each worker handles roughly rows / number-of-workers of the total.
I know I could probably refactor this to split up the start/end rows that each worker deals with (I've sketched that refactor after the code below), but is there a way to have the Celery workers share the proxy.fetchmany() calls until the result set is exhausted?
Here is some example code of the current single-worker version:
logging.info("Connecting to Database")
connection_url = URL.create("mssql+pyodbc", query={"odbc_connect": self.SOURCE_CONNECT_STRING})
source_eng = sqlalchemy.create_engine(connection_url)
source_connection = source_eng.connect()
# Select all rows
query = f"SELECT * FROM {sourceTable} WITH (NOLOCK)"
stmt = sqlalchemy.text(query)
proxy = source_connection.execution_options(stream_results=True).execute(stmt)
column_names = list(proxy.keys())
logging.info(column_names)
insert_stmt = f"""INSERT INTO {destTable} ({','.join(column_names)}) VALUES ({','.join(['?']*len(column_names))});"""
if ( hasIdentity ):
insert_stmt = f"""
SET IDENTITY_INSERT {destTable} ON;
SET NOCOUNT ON;
{insert_stmt}
SET NOCOUNT OFF;
SET IDENTITY_INSERT {destTable} OFF"""
completed = 0
while 'batch not empty': # equivalent of 'while True'
batch = proxy.fetchmany(self.INITIAL_BATCH_SIZE)
if not batch:
break
# Strip whitespace from batch
batch = [self._strip_tuple(i) for i in batch]
logging.info(f"Inserting {str(len(batch))} rows.")
with pyodbc.connect(self.DEST_CONNECT_STRING) as conn:
with conn.cursor() as cursor:
cursor.fast_executemany = True
# Send pyodbc call
logging.info(insert_stmt)
cursor.executemany(insert_stmt, list(batch))
conn.commit()
completed += len(batch)
logging.info(f"Completed inserting {completed} rows into {destTable}.")
proxy.close()
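
For reference, the start/end-row refactor I have in mind would look roughly like this (untested sketch: the Celery app/broker, the id ORDER BY column, the default chunk size, and the transfer_chunk / dispatch_transfer names are all placeholders I've made up, and the whitespace stripping and IDENTITY_INSERT handling are left out):

from celery import Celery, group

import pyodbc
import sqlalchemy
from sqlalchemy.engine import URL

# Placeholders -- in the real code these live on the task class / config.
SOURCE_CONNECT_STRING = "<source odbc connect string>"
DEST_CONNECT_STRING = "<dest odbc connect string>"

app = Celery("transfer", broker="redis://localhost:6379/0")  # placeholder broker


@app.task
def transfer_chunk(source_table, dest_table, offset, limit):
    """Copy one slice of the source table, selected with OFFSET/FETCH."""
    connection_url = URL.create("mssql+pyodbc", query={"odbc_connect": SOURCE_CONNECT_STRING})
    engine = sqlalchemy.create_engine(connection_url)
    with engine.connect() as source_connection:
        # 'id' is a stand-in for whatever stable key column the table has.
        stmt = sqlalchemy.text(
            f"SELECT * FROM {source_table} WITH (NOLOCK) "
            "ORDER BY id OFFSET :offset ROWS FETCH NEXT :limit ROWS ONLY"
        )
        result = source_connection.execute(stmt, {"offset": offset, "limit": limit})
        column_names = list(result.keys())
        rows = result.fetchall()
    if not rows:
        return 0
    insert_stmt = (
        f"INSERT INTO {dest_table} ({','.join(column_names)}) "
        f"VALUES ({','.join(['?'] * len(column_names))});"
    )
    with pyodbc.connect(DEST_CONNECT_STRING) as conn:
        with conn.cursor() as cursor:
            cursor.fast_executemany = True
            cursor.executemany(insert_stmt, [tuple(r) for r in rows])
        conn.commit()
    return len(rows)


def dispatch_transfer(source_table, dest_table, chunk_size=50_000):
    """Count the rows, then fan out one transfer_chunk task per OFFSET range."""
    connection_url = URL.create("mssql+pyodbc", query={"odbc_connect": SOURCE_CONNECT_STRING})
    engine = sqlalchemy.create_engine(connection_url)
    with engine.connect() as conn:
        total = conn.execute(sqlalchemy.text(f"SELECT COUNT(*) FROM {source_table}")).scalar()
    job = group(
        transfer_chunk.s(source_table, dest_table, offset, chunk_size)
        for offset in range(0, total, chunk_size)
    )
    return job.apply_async()

The downside I can see is that every chunk re-runs its own ORDER BY/OFFSET scan against the source table, which is why I'd rather keep a single streamed result set and have the workers pull fetchmany() batches from it, if Celery can do that.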