I am relatively new to Python and this is the usecase i am trying to solve as part of the learning.
There will be multiple threads calling a callback function. If the event is not set, the callbacks just get queued. And once the event is set, we must first call all the pending callbacks that got queued. If the event is set, there is no need to queue the callback but just execute the func directly.
This is more like a multiple producer single consumer problem. I tried solving it with a single lock and multiple locks but that did not work as we need a shared pipeline. So, I am trying to use queues. Also, since this is FIFO queue, the callbacks would get executed in the order it was inserted.
This is what I came up with:
import queue
import threading
import concurrent.futures
import time
pipeline = queue.Queue()
class EventWrapper():
def __init__(self, ev):
self._ev = ev
def set(self):
self._ev.set()
consumer()
def __getattr__(self, name):
# return default for other methods.
return getattr(self._ev, name)
def reg_cb():
print("Hey, I am a cb")
def producer(myEvent, i):
if not myEvent.is_set():
print("Thread %d registered cb" %i)
pipeline.put(reg_cb)
else:
print("Event is already set, so I am directly calling cb")
reg_cb()
def consumer():
print("Event is now set")
while not pipeline.empty():
func = pipeline.get()
func()
def main():
myEvent = EventWrapper(threading.Event())
threads = []
for i in range(15):
t = threading.Thread(target=producer, args=(myEvent, i,))
t.start()
time.sleep(1)
threads.append(t)
if i == 5:
# call's EventWrapper's set() method.
myEvent.set()
for t in threads:
t.join()
if __name__ == "__main__":
main()
It solves the basic case.
-
But i know, i am missing a lot of cases. what all am i missing (considering deadlock) ?
-
How to pass args to reg_cb method, something like pipeline.put(reg_cb, args) and let reg_cb do some stuff?
-
How can i improve the design ?
-
Let’s say – a lot of callbacks from multiple threads come, we checked if not myEvent.is_set() in producer but event got set exactly after the check, this behavior is not describable.
-
Would there be any possibility of a thread registering callback twice ?
-
In case if i want to ensure the callbacks don’t just wait excessively wait in the queue, i could have a queue size limit. Would that just be enough ?
Sorry, if the questions or something is not clear, will keep updating the question based on the responses, thank you.