aio_pika and rabbitMQ : message get treated but never gets published

Using rabbitMQ 3.12, python 3.8 and aio_pika 9.4.0, I have an architecture consisting of 9 different microservices that treat text. Each is in its own Docker container. Text is input in the main entrypoint of the API, sent to services, and all services send their response to a sink node that writes all in a MongoDB.

I need the result of one service (A) to flow to 2 other services (B and C), and then the sink. This is the case in two places.

In one of those instances, the first treatment (in A) is rather slow (up to 1ms) and creates larger messages than other services (but still under 1MB). With very few inputs (up to 100), things go though smoothly. However, when I start sending more than 1000 inputs to the system, in service A messages are consumed, and treated correctly (the result of the treatment is there), but do not get published to the exchange. They stay unacknowledged, and consumed over and over again.

In the other place, where all the services are fast, the publishing happens without problem.

Why, with the same code, in one case everything flow, and in the other, messages get unacked and never published ? It could be an issue of the next services (B and C)never ack’ing, but for the few messages that do make it to the next stage, they all get treated, acked and sent to the sink properly. If I only send a single message every ten seconds, no issues arise either

What can I do to get these treated texts published to the next services ?

The following class is what handles the consuming and publishing. It is copied into every Docker container so I do not have to re-write the same aio_pika code in every service.

import logging
from aio_pika import connect_robust, ExchangeType, Message, DeliveryMode
import json
from dotenv import load_dotenv
import os
import asyncio

load_dotenv()

user = os.environ["RABBITMQ_USER"]
pwd = os.environ["RABBITMQ_PWD"]

class AsyncPikaClient:

    def __init__(self, on_message_callback, i_consume_from, i_publish_to=None, max_concurrent_tasks=5):
        if on_message_callback :
            self.on_message = on_message_callback
        else :
            self.on_message = lambda x : x #identity function
        self.i_consume_from = i_consume_from
        self.i_publish_to = i_publish_to
        self.semaphore = asyncio.Semaphore(max_concurrent_tasks)
        self.publish_queue = []
        self.consume_queue = None

    async def do_initialization(self):#, on_message_callback, i_consume_from, i_publish_to):
        self.connection = await connect_robust(f"amqp://{user}:{pwd}@rabbitmq:5672/")
        self.channel = await self.connection.channel()
        self.exchange = await self.channel.declare_exchange("direct", ExchangeType.DIRECT,)

        if self.i_consume_from:
            print("Initializing consume queue : ", self.i_consume_from)
            self.consume_queue = await self.channel.declare_queue(self.i_consume_from, durable=True)
            await self.consume_queue.bind(self.exchange, routing_key=self.i_consume_from)

        if self.i_publish_to:
            for name in self.i_publish_to :
                self.publish_queue.append(await self.channel.declare_queue(name, durable=True))
                await self.publish_queue[-1].bind(self.exchange, routing_key=name)
        print('AIO pika connection initialized, queues are :', self.i_consume_from, self.i_publish_to)


    async def consume(self):
        await self.consume_queue.consume(self.process_incoming_message, no_ack=False)   

    async def process_incoming_message(self, message):
        print(f"Got message on queue {self.i_consume_from}")
        body = message.body
        if body:
            try :
                mess_body = json.loads(body)
                try :
                    data = await self.on_message(mess_body)
                except :
                    print("Bug in treating the text !")
                print(f"Now the treatment is done in {self.i_consume_from}")
                await self.publish_all(data)
                print("Published !")
                await message.ack()
                print("Acked")
            except json.decoder.JSONDecodeError:
                print("Error : message has to be a JSON doc")
                await message.nack()
            except Exception as e:
                print(f"Unexpected error: {e}")
                await message.nack()
        else :
            print("Error : there is no body in the message")
            await message.nack()

    async def publish_all(self, message_body):
        if isinstance(message_body, list):
            publish_tasks = []
            for item in message_body :
                publish_tasks.extend([ self.publish(item, queue)
                    for queue in self.i_publish_to ])
        else :
            publish_tasks = [
                    self.publish(message_body, queue)
                    for queue in self.i_publish_to
                ]
        await asyncio.gather(*publish_tasks, return_exceptions=True)

    async def publish(self, message_body: dict, routing_key:str):
        """Method to publish message to RabbitMQ"""
        async with self.semaphore:
            message = Message(
                json.dumps(message_body).encode('utf-8'),
                delivery_mode=DeliveryMode.PERSISTENT,
                app_id=self.i_consume_from
            )
            print(len(json.dumps(message_body).encode('utf-8')))
            try :
                print(len(message))
            except :
                print("")
            print(f"Message ready, trying to publish to queue {routing_key}") #everything prints to here
            await self.exchange.publish(message, routing_key=routing_key) #this does not happen
        print(f"Published to exchange using routing key : ", routing_key) #this never gets printed

And the code in any given service :

import asyncio
from src.client_pika import AsyncPikaClient

async def counts_on_udpipe_annotations(received: Dict) -> None:
    #some treatment
    return result

    
if __name__ == "__main__":
    pika_client = AsyncPikaClient(on_message_callback=counts_on_udpipe_annotations, i_consume_from="queueA", i_publish_to=["queueB", "queueC"])

    loop = asyncio.get_event_loop()

    loop.run_until_complete(pika_client.do_initialization())

    loop.create_task(pika_client.consume())
    loop.run_forever()

I’ve added the semaphore to see if the rate of incoming messages was the issue. It has done nothing.
Ack’ing at reception, before doing the treatment, results in a good 60% of messages never being published.

Trang chủ Giới thiệu Sinh nhật bé trai Sinh nhật bé gái Tổ chức sự kiện Biểu diễn giải trí Dịch vụ khác Trang trí tiệc cưới Tổ chức khai trương Tư vấn dịch vụ Thư viện ảnh Tin tức - sự kiện Liên hệ Chú hề sinh nhật Trang trí YEAR END PARTY công ty Trang trí tất niên cuối năm Trang trí tất niên xu hướng mới nhất Trang trí sinh nhật bé trai Hải Đăng Trang trí sinh nhật bé Khánh Vân Trang trí sinh nhật Bích Ngân Trang trí sinh nhật bé Thanh Trang Thuê ông già Noel phát quà Biểu diễn xiếc khỉ Xiếc quay đĩa Dịch vụ tổ chức sự kiện 5 sao Thông tin về chúng tôi Dịch vụ sinh nhật bé trai Dịch vụ sinh nhật bé gái Sự kiện trọn gói Các tiết mục giải trí Dịch vụ bổ trợ Tiệc cưới sang trọng Dịch vụ khai trương Tư vấn tổ chức sự kiện Hình ảnh sự kiện Cập nhật tin tức Liên hệ ngay Thuê chú hề chuyên nghiệp Tiệc tất niên cho công ty Trang trí tiệc cuối năm Tiệc tất niên độc đáo Sinh nhật bé Hải Đăng Sinh nhật đáng yêu bé Khánh Vân Sinh nhật sang trọng Bích Ngân Tiệc sinh nhật bé Thanh Trang Dịch vụ ông già Noel Xiếc thú vui nhộn Biểu diễn xiếc quay đĩa Dịch vụ tổ chức tiệc uy tín Khám phá dịch vụ của chúng tôi Tiệc sinh nhật cho bé trai Trang trí tiệc cho bé gái Gói sự kiện chuyên nghiệp Chương trình giải trí hấp dẫn Dịch vụ hỗ trợ sự kiện Trang trí tiệc cưới đẹp Khởi đầu thành công với khai trương Chuyên gia tư vấn sự kiện Xem ảnh các sự kiện đẹp Tin mới về sự kiện Kết nối với đội ngũ chuyên gia Chú hề vui nhộn cho tiệc sinh nhật Ý tưởng tiệc cuối năm Tất niên độc đáo Trang trí tiệc hiện đại Tổ chức sinh nhật cho Hải Đăng Sinh nhật độc quyền Khánh Vân Phong cách tiệc Bích Ngân Trang trí tiệc bé Thanh Trang Thuê dịch vụ ông già Noel chuyên nghiệp Xem xiếc khỉ đặc sắc Xiếc quay đĩa thú vị
Trang chủ Giới thiệu Sinh nhật bé trai Sinh nhật bé gái Tổ chức sự kiện Biểu diễn giải trí Dịch vụ khác Trang trí tiệc cưới Tổ chức khai trương Tư vấn dịch vụ Thư viện ảnh Tin tức - sự kiện Liên hệ Chú hề sinh nhật Trang trí YEAR END PARTY công ty Trang trí tất niên cuối năm Trang trí tất niên xu hướng mới nhất Trang trí sinh nhật bé trai Hải Đăng Trang trí sinh nhật bé Khánh Vân Trang trí sinh nhật Bích Ngân Trang trí sinh nhật bé Thanh Trang Thuê ông già Noel phát quà Biểu diễn xiếc khỉ Xiếc quay đĩa
Thiết kế website Thiết kế website Thiết kế website Cách kháng tài khoản quảng cáo Mua bán Fanpage Facebook Dịch vụ SEO Tổ chức sinh nhật