I’m trying to create a process that listens to three COM ports and process the data in parallel. The problem is that I have a machine that sends an ID and proceeds to do a test in one of the COM ports, when this one is finished, it goes to another test and other id number enter to do the first test. How can I deal with this kind of situation?
Machine sends ID –
First test –
Machine send ID (second test) -| Machine sends different id (first test) –
Second test -| First test –
I’ve tried to use asyncio, aioserial for the com ports.
Here’s the code so far.
async def read_motor_id(self, motor_serial=1):
motor_id = None
while motor_id == None:
# id_data = await motor_serial.read_until_async(b'n')
# motor_id = id_data.decode().strip()
motor_id = input("Digite o ID do motor: ")
if motor_id:
print(f"Motor ID: {motor_id}")
await self.queue.put(motor_id)
# I had to put the class cause stack couldn't let me put at the top sorry
Class MotorTest()
def __init__(self):
self.tasks = []
self.queue = asyncio.Queue()
async def process_queue(self, oil_serial = 1, water_serial = 1):
while True:
motor_id = await self.queue.get()
await self.process_motor_id(motor_id, oil_serial, water_serial)
async def process_motor_id(self, motor_id, oil_serial = 1, water_serial = 1):
motor_type = motor_id[-2:] # OL ou AG
if motor_type == "OL":
self.tasks.append(motor_id[:-2])
await self.test_oil(motor_id, oil_serial)
elif motor_type == "AG":
if motor_id[:-2] in self.tasks:
await self.test_water(motor_id, water_serial)
self.tasks.remove(motor_id[:-2])
else:
print(f"Tarefa para motor {motor_id} não encontrada")
await self.read_motor_id()
async def test_oil(self, motor_id, oil_serial =1):
print(f"Iniciando teste de óleo para motor {motor_id}")
await asyncio.sleep(2) # Simulação do teste
print(f"Finalizando teste de óleo para motor {motor_id}")
# await oil_serial.read_async_until(aioserial.LF)
async def test_water(self, motor_id, water_serial =1):
print(f"Iniciando teste de água para motor {motor_id}")
await asyncio.sleep(2) # Simulação do teste
print(f"Finalizando teste de água para motor {motor_id}")
# await water_serial.read_async_until(aioserial.LF)
async def main()
# motor_serial = aioserial.AioSerial(port='COM1', baudrate=9600)
# oil_serial = aioserial.AioSerial(port='COM2', baudrate=9600)
# water_serial = aioserial.AioSerial(port='COM3', baudrate=9600)
motor_test = MotorTest()
async with asyncio.TaskGroup() as tg:
tg.create_task(motor_test.read_motor_id())
tg.create_task(motor_test.process_queue())
if __name__ == "__main__":
asyncio.run(main())
`
Reading the docs I think i got a provisory solution, here’s the code for it:
class MotorTest:
def __init__(self):
self.tasks = []
self.queue = asyncio.Queue()
async def read_motor_id(self, motor_serial=1):
motor_id = None
while motor_id == None:
# id_data = await motor_serial.read_until_async(b'n')
# motor_id = id_data.decode().strip()
motor_id = input("Digite o ID do motor: ")
if motor_id:
print(f"Motor ID: {motor_id}")
await self.queue.put(motor_id)
async def process_queue(self, oil_serial = 1, water_serial = 1):
while True:
motor_id = await self.queue.get()
asyncio.create_task(self.process_motor_id(motor_id, oil_serial, water_serial))
async def process_motor_id(self, motor_id, oil_serial = 1, water_serial = 1):
motor_type = motor_id[-2:] # OL ou AG
motor_clean = motor_id[:-2]
if motor_type == "OL":
if motor_clean not in self.tasks : self.tasks.append(motor_clean)
asyncio.create_task(self.test_oil(motor_id, oil_serial))
elif motor_type == "AG":
if motor_id[:-2] in self.tasks:
asyncio.create_task(self.test_water(motor_id, water_serial))
else:
print(f"Tarefa para motor {motor_id} não encontrada")
asyncio.run_coroutine_threadsafe(self.read_motor_id(), asyncio.get_event_loop())
async def test_oil(self, motor_id, oil_serial =1):
print(f"Iniciando teste de óleo para motor {motor_id}")
await asyncio.sleep(5) # Simulação do teste
print(f"Finalizando teste de óleo para motor {motor_id}")
# await oil_serial.read_async_until(aioserial.LF)
async def test_water(self, motor_id, water_serial =1):
print(f"Iniciando teste de água para motor {motor_id}")
await asyncio.sleep(5) # Simulação do teste
print(f"Finalizando teste de água para motor {motor_id}")
# await water_serial.read_async_until(aioserial.LF)
async def main():
# motor_serial = aioserial.AioSerial(port='COM1', baudrate=9600)
# oil_serial = aioserial.AioSerial(port='COM2', baudrate=9600)
# water_serial = aioserial.AioSerial(port='COM3', baudrate=9600)
motor_test = MotorTest()
async with asyncio.TaskGroup() as tg:
tg.create_task(motor_test.read_motor_id())
tg.create_task(motor_test.process_queue())
asyncio.run(main())
´
user26427062 is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
Check out our Code of Conduct.
4