Using RabbitMQ 3.12, Python 3.8 and aio_pika 9.4.0, I have an architecture of 9 different microservices that process text. Each runs in its own Docker container. Text is input at the main entrypoint of the API, sent to the services, and every service sends its response to a sink node that writes everything to a MongoDB.
I need the result of one service (A) to flow to 2 other services (B and C), and then the sink. This is the case in two places.
In one of those two places, the first treatment (in A) is rather slow (up to 1ms) and produces larger messages than the other services (though still under 1MB). With very few inputs (up to 100), things go through smoothly. However, once I send more than 1000 inputs into the system, service A consumes and treats the messages correctly (the result of the treatment is there), but they never get published to the exchange. They stay unacknowledged and are consumed over and over again.
In the other place, where all the services are fast, the publishing happens without problem.
Why, with the same code, does everything flow in one case, while in the other messages get unacked and are never published? It could be an issue of the next services (B and C) never acking, but the few messages that do make it to the next stage all get treated, acked and sent to the sink properly. If I only send a single message every ten seconds, no issue arises either.
What can I do to get these treated texts published to the next services?
The following class handles the consuming and publishing. It is copied into every Docker container so that I do not have to rewrite the same aio_pika code in every service.
import logging
from aio_pika import connect_robust, ExchangeType, Message, DeliveryMode
import json
from dotenv import load_dotenv
import os
import asyncio
load_dotenv()
user = os.environ["RABBITMQ_USER"]
pwd = os.environ["RABBITMQ_PWD"]
class AsyncPikaClient:
    def __init__(self, on_message_callback, i_consume_from, i_publish_to=None, max_concurrent_tasks=5):
        if on_message_callback:
            self.on_message = on_message_callback
        else:
            self.on_message = lambda x: x  # identity function
        self.i_consume_from = i_consume_from
        self.i_publish_to = i_publish_to
        self.semaphore = asyncio.Semaphore(max_concurrent_tasks)
        self.publish_queue = []
        self.consume_queue = None
    async def do_initialization(self):
        self.connection = await connect_robust(f"amqp://{user}:{pwd}@rabbitmq:5672/")
        self.channel = await self.connection.channel()
        self.exchange = await self.channel.declare_exchange("direct", ExchangeType.DIRECT)
        if self.i_consume_from:
            print("Initializing consume queue:", self.i_consume_from)
            self.consume_queue = await self.channel.declare_queue(self.i_consume_from, durable=True)
            await self.consume_queue.bind(self.exchange, routing_key=self.i_consume_from)
        if self.i_publish_to:
            for name in self.i_publish_to:
                self.publish_queue.append(await self.channel.declare_queue(name, durable=True))
                await self.publish_queue[-1].bind(self.exchange, routing_key=name)
        print("AIO pika connection initialized, queues are:", self.i_consume_from, self.i_publish_to)
    async def consume(self):
        await self.consume_queue.consume(self.process_incoming_message, no_ack=False)
    async def process_incoming_message(self, message):
        print(f"Got message on queue {self.i_consume_from}")
        body = message.body
        if body:
            try:
                mess_body = json.loads(body)
                try:
                    data = await self.on_message(mess_body)
                except Exception:
                    print("Bug in treating the text!")
                print(f"Now the treatment is done in {self.i_consume_from}")
                await self.publish_all(data)
                print("Published!")
                await message.ack()
                print("Acked")
            except json.decoder.JSONDecodeError:
                print("Error: message has to be a JSON doc")
                await message.nack()
            except Exception as e:
                print(f"Unexpected error: {e}")
                await message.nack()
        else:
            print("Error: there is no body in the message")
            await message.nack()
    async def publish_all(self, message_body):
        if isinstance(message_body, list):
            publish_tasks = []
            for item in message_body:
                publish_tasks.extend([self.publish(item, queue)
                                      for queue in self.i_publish_to])
        else:
            publish_tasks = [
                self.publish(message_body, queue)
                for queue in self.i_publish_to
            ]
        await asyncio.gather(*publish_tasks, return_exceptions=True)
    async def publish(self, message_body: dict, routing_key: str):
        """Method to publish message to RabbitMQ"""
        async with self.semaphore:
            message = Message(
                json.dumps(message_body).encode("utf-8"),
                delivery_mode=DeliveryMode.PERSISTENT,
                app_id=self.i_consume_from,
            )
            print(len(json.dumps(message_body).encode("utf-8")))
            try:
                print(len(message))
            except Exception:
                print("")
            print(f"Message ready, trying to publish to queue {routing_key}")  # everything prints to here
            await self.exchange.publish(message, routing_key=routing_key)  # this does not happen
            print("Published to exchange using routing key:", routing_key)  # this never gets printed
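One thing I am not sure is related to the hang, but that I noticed while re-reading `process_incoming_message`: the inner bare `except` swallows callback errors without ever binding `data`, so the later `publish_all(data)` raises `UnboundLocalError`, which the outer handler turns into a nack and a redelivery. A minimal stand-alone reproduction of that pattern (the division by zero is just a stand-in for a failing callback):

```python
import asyncio

async def handler():
    # Same shape as process_incoming_message: the inner except swallows
    # the error, data is never bound, and the later use of data blows up.
    try:
        data = 1 / 0  # stand-in for: data = await self.on_message(mess_body)
    except Exception:
        print("Bug in treating the text!")
    return data  # UnboundLocalError when the callback failed

try:
    asyncio.run(handler())
except NameError as e:  # UnboundLocalError is a subclass of NameError
    print("outer handler would see:", e)
```

So any exception in the callback silently becomes a redelivered message rather than a visible error.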
And here is the code in any given service:
import asyncio
from typing import Dict

from src.client_pika import AsyncPikaClient

async def counts_on_udpipe_annotations(received: Dict) -> None:
    # some treatment
    return result

if __name__ == "__main__":
    pika_client = AsyncPikaClient(
        on_message_callback=counts_on_udpipe_annotations,
        i_consume_from="queueA",
        i_publish_to=["queueB", "queueC"],
    )
    loop = asyncio.get_event_loop()
    loop.run_until_complete(pika_client.do_initialization())
    loop.create_task(pika_client.consume())
    loop.run_forever()
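In case the older `get_event_loop`/`run_forever` pattern matters: on Python 3.8 the same startup can also be written with `asyncio.run`. A sketch with a stub in place of `AsyncPikaClient` so it runs anywhere (in the real service the last line of `main` would be something like `await asyncio.Event().wait()` to keep consuming forever):

```python
import asyncio

class StubClient:
    """Placeholder for AsyncPikaClient so this sketch runs without RabbitMQ."""
    async def do_initialization(self):
        print("initialized")

    async def consume(self):
        print("consuming")

async def main():
    client = StubClient()
    await client.do_initialization()
    await client.consume()
    # Real service: await asyncio.Event().wait() to keep the process alive.

asyncio.run(main())
```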
I added the semaphore to check whether the rate of incoming messages was the issue. It has done nothing.
Acking at reception, before doing the treatment, results in a good 60% of the messages never being published.
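One more detail I noticed while debugging: `publish_all` calls `asyncio.gather(..., return_exceptions=True)`, so even if `publish` raised, the exception would come back as a return value that I currently throw away. A stripped-down stand-in (`fake_publish` replaces the real publish and fails for one queue on purpose) shows the pattern:

```python
import asyncio

async def fake_publish(item, queue):
    # Stand-in for AsyncPikaClient.publish: fails for one queue on purpose.
    if queue == "queueC":
        raise RuntimeError(f"publish to {queue} failed")
    return f"{item} -> {queue}"

async def publish_all(body, queues):
    # Same pattern as in the class: exceptions are returned, not raised.
    return await asyncio.gather(
        *(fake_publish(body, q) for q in queues),
        return_exceptions=True,
    )

results = asyncio.run(publish_all("text", ["queueB", "queueC"]))
print(results)  # the RuntimeError sits silently in the list
```

So if publishing to B or C ever failed, nothing in my current code would report it.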