I am developing a program that inserts tick-level data for 300+ stocks into MySQL. The ticks arrive as a stream of dictionaries from a broker's WebSocket API.
I already had this program running successfully without multiprocessing via the `concurrent.futures` library.
The big issue is that many ticks never get captured in my MySQL DB because they arrive extremely fast. I am now trying to solve this with multiprocessing (to which I am a newbie).
The program below does insert ticks, but the queue fills up quickly and ticks are consequently dropped.
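For reference, each tick is a dictionary shaped roughly like this (the values are made up; the keys are the ones my code reads):

```python
import datetime

# Illustrative shape of one tick from the websocket stream
sample_tick = {
    'instrument_token': 408065,
    'exchange_timestamp': datetime.datetime(2024, 5, 6, 9, 15, 1),
    'last_price': 1530.55,
    'last_traded_quantity': 25,
    'total_buy_quantity': 84210,
    'total_sell_quantity': 91375,
    'volume_traded': 1204563,
}
```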
```python
<…OTHER IMPORTS…>
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
import queue
import threading

<…OTHER CODES…>

def insert_ticks(tick):
    # print(f"insert_ticks function called with tick: {len(tick)}")  # Debug statement
    db = mysql.connector.connect(**db_config)
    c = db.cursor()
    if 'exchange_timestamp' not in tick:
        print(f"Skipping tick with missing 'exchange_timestamp': {tick['instrument_token']}")
        return
    tok = "TOKEN" + str(tick['instrument_token'])
    symbol = symbolLookup(instrument_df, [tick['instrument_token']])[0]
    vals = [
        tick['exchange_timestamp'],
        tick['last_price'],
        tick['last_traded_quantity'],
        tick.get('total_buy_quantity', 0),
        tick.get('total_sell_quantity', 0),
        tick.get('volume_traded', 0),
        symbol
    ]
    token_query = f"""
        INSERT INTO {tok} (ts, price, volume, total_buy_quantity, total_sell_quantity, volume_consecutive)
        VALUES (%s, %s, %s, %s, %s, %s)
        ON DUPLICATE KEY UPDATE
            price = VALUES(price),
            volume = VALUES(volume),
            total_buy_quantity = VALUES(total_buy_quantity),
            total_sell_quantity = VALUES(total_sell_quantity),
            volume_consecutive = VALUES(volume_consecutive)
    """
    try:
        c.execute(token_query, vals[:-1])  # symbol (last element) is only used for logging
        print(f'Tick inserted into - {symbol}')
    except Exception as e:
        print(f'Tick data error for - {tok}: {e}')
    db.commit()
    db.close()

def worker(data_queue, stop_event):
    # Drain the queue until a stop is signalled and the queue is empty
    while not stop_event.is_set() or not data_queue.empty():
        try:
            tick = data_queue.get(timeout=0.1)
            insert_ticks(tick)
        except queue.Empty:
            continue

if __name__ == '__main__':
    tokens = tokenLookup(instrument_df, tickers)
    create_tables(tokens)
    print('Tables created and new columns added')
    data_queue = queue.Queue(maxsize=90000)  # Adjusted queue size
    stop_event = threading.Event()
    with ProcessPoolExecutor(max_workers=cpu_count()) as executor:
        futures = [executor.submit(worker, data_queue, stop_event) for _ in range(cpu_count())]

        def on_ticks(ws, ticks):
            # print(f"Ticks received: {len(ticks)}")
            for tick in ticks:
                try:
                    data_queue.put(tick, timeout=0.1)
                except queue.Full:
                    print(f"Queue is full. Skipping tick: {tick['instrument_token']}")
            print(f"Ticks added to queue, current queue size: {data_queue.qsize()}")

        def on_connect(ws, response):
            ws.subscribe(tokens)
            ws.set_mode(ws.MODE_FULL, tokens)

        def on_error(ws, code, reason):
            print(f"WebSocket error: {code} - {reason}")

        kws = KiteTicker(key_secret[0], kite.access_token)
        kws.on_ticks = on_ticks
        kws.on_connect = on_connect
        kws.on_error = on_error

        is_connected = False
        try:
            while True:
                now = datetime.datetime.now()
                if is_time_between(8, 59, 23, 30):
                    if not is_connected:
                        print('Ticks started now...at {}'.format(now.strftime("%Y-%m-%d %H:%M:%S")))
                        kws.connect()
                        is_connected = True
                else:
                    if is_connected:
                        kws.unsubscribe()
                        print('Exiting script...')
                        stop_event.set()
                        break
                tm.sleep(1)
        except KeyboardInterrupt:
            stop_event.set()
            print('Script interrupted and stopped.')

        for future in as_completed(futures):
            future.result()
        print("All tasks completed.")
        sys.exit()
```
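One thing I am unsure about: as far as I understand, a plain `queue.Queue` and `threading.Event` are thread-safe but not process-safe, so `ProcessPoolExecutor` workers may not be able to share them with the main process at all. Below is a minimal, self-contained sketch of the same producer/consumer pattern using `multiprocessing.Manager` proxies instead (`handle_tick` here is a hypothetical stand-in for my `insert_ticks`):

```python
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Manager, cpu_count
import queue

def handle_tick(tick):
    # Hypothetical stand-in for insert_ticks(): just report what would be written
    print(f"would insert tick for token {tick['instrument_token']}")

def worker(data_queue, stop_event):
    # Same drain loop as in the main program, but on Manager-backed proxies
    while not stop_event.is_set() or not data_queue.empty():
        try:
            handle_tick(data_queue.get(timeout=0.1))
        except queue.Empty:
            continue

if __name__ == '__main__':
    with Manager() as manager:
        data_queue = manager.Queue(maxsize=90000)  # proxy queue, picklable across processes
        stop_event = manager.Event()               # proxy event, visible to all workers
        with ProcessPoolExecutor(max_workers=cpu_count()) as executor:
            futures = [executor.submit(worker, data_queue, stop_event)
                       for _ in range(cpu_count())]
            for i in range(10):                    # stand-in for the websocket feed
                data_queue.put({'instrument_token': i})
            stop_event.set()                       # let workers drain and exit
```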
My existing tick-insertion program works, but it misses at least 5-10 ticks per stock every 10 seconds. This is undesirable, as I cannot build a decent analytical chart from such incomplete data.
To overcome this, I am trying to apply multiprocessing concepts via the `concurrent.futures` library.
My expectation is to capture (in the MySQL tables) at least 1 tick per second per stock for 500+ stocks.
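For scale: 500+ stocks at 1 tick per second means 500+ single-row INSERTs per second, and my current `insert_ticks()` pays for a fresh connection plus a commit on every tick. A rough sketch of a batched alternative I am considering, using `cursor.executemany()` to flush many ticks in one round trip (the single `ticks` table and the `BATCH_SIZE`/`FLUSH_TIMEOUT` values are illustrative simplifications of my per-token tables, not my actual schema):

```python
import queue
import mysql.connector

BATCH_SIZE = 200     # illustrative: flush after this many ticks...
FLUSH_TIMEOUT = 0.5  # ...or after this many idle seconds, whichever comes first

def batch_worker(data_queue, stop_event, db_config):
    # One connection per worker, reused for every batch instead of per tick
    db = mysql.connector.connect(**db_config)
    c = db.cursor()
    sql = ("INSERT INTO ticks (token, ts, price, volume) "
           "VALUES (%s, %s, %s, %s)")
    while not stop_event.is_set() or not data_queue.empty():
        batch = []
        try:
            while len(batch) < BATCH_SIZE:
                tick = data_queue.get(timeout=FLUSH_TIMEOUT)
                batch.append((tick['instrument_token'],
                              tick['exchange_timestamp'],
                              tick['last_price'],
                              tick.get('volume_traded', 0)))
        except queue.Empty:
            pass  # timeout hit: flush whatever has accumulated
        if batch:
            c.executemany(sql, batch)  # one round trip for the whole batch
            db.commit()
    db.close()
```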