This is my code: a class used to fetch roughly 70,000 rows from a MySQL server.
class Loader:
"""Load a table with multithreading"""
def __init__(self, table: str, connector: Connector, fields: str=None, chunk_size: int=1000, query: str=None):
# Config the table information
self.fields = ['all' if fields is None else fields] # Query all the fields if user do NOT indicate.
self.table = table
self.conn = connector.conn
self.cursor = connector.cursor
self.cursor.execute(f"""SELECT COUNT(*) FROM {table}""")
self.table_size = self.cursor.fetchone()[0]
self.chunk_size = chunk_size
self.num_chunks = self.table_size // self.chunk_size + (1 if self.table_size % self.chunk_size > 0 else 0)
self.query = query # user's query
def load_large_data(self):
all_data = pd.DataFrame() # Store all the data from SQL
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
futures = [] # tasks queue
# Commit multithreading tasks
for i in range(self.num_chunks):
offset = i * self.chunk_size
futures.append(executor.submit(self.read_chunked_data, offset))
# Collect the result after all the tasks are completed.
for future in concurrent.futures.as_completed(futures):
data_chunk = future.result()
all_data = pd.concat([all_data, data_chunk], ignore_index=True) # Merge tasks' data.
return all_data
def read_chunked_data(self, offset):
# Execute embedded query either user's modified query.
if self.query is None:
if self.fields == 'all':
query = f"""SELECT * FROM {self.table} LIMIT {self.chunk_size} OFFSET {offset}"""
else:
fields_string = ','.join(self.fields)
query = f"""SELECT {fields_string} FROM {self.table} LIMIT {self.chunk_size} OFFSET {offset}"""
else: # In case of 'WHERE' etc. is added by user.
query = f"""{self.query} LIMIT {self.chunk_size} OFFSET {offset}"""
return pd.read_sql(query, self.conn)
And the ERROR is:
Traceback (most recent call last):
File "D:ApplicationsAnacondaLibsite-packagespandasiosql.py", line 2674, in execute
cur.execute(sql, *args)
File "D:ApplicationsAnacondaLibsite-packagespymysqlcursors.py", line 153, in execute
result = self._query(query)
^^^^^^^^^^^^^^^^^^
File "D:ApplicationsAnacondaLibsite-packagespymysqlcursors.py", line 322, in _query
conn.query(q)
File "D:ApplicationsAnacondaLibsite-packagespymysqlconnections.py", line 563, in query
self._affected_rows = self._read_query_result(unbuffered=unbuffered)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "D:ApplicationsAnacondaLibsite-packagespymysqlconnections.py", line 825, in _read_query_result
result.read()
File "D:ApplicationsAnacondaLibsite-packagespymysqlconnections.py", line 1199, in read
first_packet = self.connection._read_packet()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "D:ApplicationsAnacondaLibsite-packagespymysqlconnections.py", line 753, in _read_packet
raise err.OperationalError(
pymysql.err.OperationalError: (2013, 'Lost connection to MySQL server during query')
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "D:ApplicationsAnacondaLibsite-packagespandasiosql.py", line 2678, in execute
self.con.rollback()
File "D:ApplicationsAnacondaLibsite-packagespymysqlconnections.py", line 492, in rollback
self._execute_command(COMMAND.COM_QUERY, "ROLLBACK")
File "D:ApplicationsAnacondaLibsite-packagespymysqlconnections.py", line 843, in _execute_command
raise err.InterfaceError(0, "")
pymysql.err.InterfaceError: (0, '')
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "C:UserswhalePycharmProjectsRisk Controlriskcontroltest.py", line 33, in <module>
result = loader.load_large_data()
^^^^^^^^^^^^^^^^^^^^^^^^
File "C:UserswhalePycharmProjectsRisk Controlriskcontroldatabase.py", line 96, in load_large_data
data_chunk = future.result()
^^^^^^^^^^^^^^^
File "D:ApplicationsAnacondaLibconcurrentfutures_base.py", line 449, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "D:ApplicationsAnacondaLibconcurrentfutures_base.py", line 401, in __get_result
raise self._exception
File "D:ApplicationsAnacondaLibconcurrentfuturesthread.py", line 58, in run
result = self.fn(*self.args, **self.kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:UserswhalePycharmProjectsRisk Controlriskcontroldatabase.py", line 112, in read_chunked_data
return pd.read_sql(query, self.conn)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "D:ApplicationsAnacondaLibsite-packagespandasiosql.py", line 706, in read_sql
return pandas_sql.read_query(
^^^^^^^^^^^^^^^^^^^^^^
File "D:ApplicationsAnacondaLibsite-packagespandasiosql.py", line 2738, in read_query
cursor = self.execute(sql, params)
^^^^^^^^^^^^^^^^^^^^^^^^^
File "D:ApplicationsAnacondaLibsite-packagespandasiosql.py", line 2683, in execute
raise ex from inner_exc
pandas.errors.DatabaseError: Execution failed on sql: SELECT CONTENT FROM tb_customer_info LIMIT 1000 OFFSET 2000
(2013, 'Lost connection to MySQL server during query')
unable to rollback
I suspected that the MySQL server couldn't handle the query over such a large dataset. But when I changed the 'LIMIT' to '10,000', my code and the MySQL server were able to fetch all of the rows in a single query.
This confuses me — does anybody know how to fix it? I couldn't find any relevant information on Google or from ChatGPT.
9