I would like to implement a handler function that receives messages and shares them with the main thread for further processing. The latency between when the handler receives a message and when the main thread picks it up is quite large. I have tried collections.deque and manager.Queue as the buffer, but they perform about the same. The implementation looks like this:
import collections
import pickle
import time

def handler(msg):
    global msgBuff
    dict_msg = pickle.loads(msg.data)
    timer = time.time()
    print("handler latency", timer - dict_msg['timestamp'])
    msgBuff.append((dict_msg, time.time()))

if __name__ == "__main__":
    # manager = multiprocessing.Manager()
    # msgBuff = manager.Queue()
    msgBuff = collections.deque()
    msg_receiver(handler)  # a multiprocessing function to receive messages
    while True:
        if len(msgBuff) == 0:
            time.sleep(0.001)
            continue
        dict_msg, rec_time = msgBuff.popleft()
        print("main latency", time.time() - dict_msg['timestamp'], time.time() - rec_time)
And here is the output:
handler latency 0.008012533187866211
main latency 0.022023439407348633 0.014010906219482422
handler latency 0.007892608642578125
main latency 0.023406028747558594 0.015513420104980469
handler latency 0.007999897003173828
main latency 0.02299976348876953 0.013998270034790039
handler latency 0.006999969482421875
main latency 0.02259969711303711 0.015599727630615234
handler latency 0.009000301361083984
Any suggestions for improvement?
You showed us that the elapsed time at certain stages
of processing is roughly 7, 14, or 21 msec.
It appears that serializing / deserializing your data costs about 7 msec,
and you do that more than once.
Shrinking the size of msg.data
would allow for faster {ser,de}serialization.
Consider passing around a data filename rather than the raw data, as in the sketch below.
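A minimal sketch of that idea, assuming the sender can be changed to write each payload to a file and publish only its path (the contents of msg.data here are an assumption, and msgBuff is the deque from the question):

import pickle
import time

def handler(msg):
    # msg.data now carries only a short path string (assumed),
    # so the handler does no heavy deserialization at all.
    path = msg.data.decode()
    msgBuff.append((path, time.time()))

# In the main loop, deserialize exactly once, at the consumer:
path, rec_time = msgBuff.popleft()
with open(path, "rb") as f:
    dict_msg = pickle.load(f)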
pickle can sometimes pull in more dependencies than you were expecting.
Consider using an alternate format, perhaps JSON or CSV, that lets
you conveniently examine the data to verify it’s the amount you expect.
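For instance, a sketch with a made-up payload, swapping pickle for JSON so the bytes on the wire are human-readable and easy to size-check:

import json
import time

dict_msg = {"timestamp": time.time(), "values": [1, 2, 3]}  # example payload

wire = json.dumps(dict_msg).encode()      # serialize to readable bytes
print("wire size:", len(wire), "bytes")   # verify it's the amount you expect

round_trip = json.loads(wire)             # deserialize on the receiving end
assert round_trip["values"] == dict_msg["values"]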
Serializing and deserializing costs time.
Consider using a binary format like pyarrow.
It’s even possible to accomplish a zero-copy handoff
to the next stage in the pipeline.
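A minimal sketch of the Arrow route (the payload shape is made up); the IPC reader constructs the received batch as views over the input buffer rather than copying it:

import pyarrow as pa

# Sender: encode a record batch with Arrow's IPC stream format.
batch = pa.RecordBatch.from_pydict({"timestamp": [1.0, 2.0],
                                    "value": [10, 20]})
sink = pa.BufferOutputStream()
with pa.ipc.new_stream(sink, batch.schema) as writer:
    writer.write_batch(batch)
buf = sink.getvalue()  # pa.Buffer holding the encoded bytes

# Receiver: the batch's columns are zero-copy views into buf.
received = pa.ipc.open_stream(buf).read_next_batch()
print(received.to_pydict())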