Short question:
I’m publishing a message to a dead letter queue with specific headers in x-dead
. When the message is consumed by the dead letter consumers, the headers are gone. Why is this?
Long question:
I have an exchange with a dead letter queue:
def create_dlx(channel):
channel.exchange_declare(exchange=f"{EXCHANGE_NAME}.dlx", exchange_type='fanout')
channel.queue_declare(DLX_QUEUE)
channel.queue_bind(exchange=f"{EXCHANGE_NAME}.dlx", queue=DLX_QUEUE)
def create_exchange(channel):
args = {
"x-dead-letter-exchange": f"{EXCHANGE_NAME}.dlx"
}
channel.exchange_declare(exchange=EXCHANGE_NAME, exchange_type='topic', arguments=args)
# some queues with specific routing keys are declared here
This is the dlx consumer:
def get_dlx_channel():
print(f"Consumer: Starting connection to DLX {EXCHANGE_NAME}.dlx on queue {DLX_QUEUE}")
connection = pika.BlockingConnection(rabbitmq_params)
channel = connection.channel()
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue=DLX_QUEUE, on_message_callback=dlx_callback)
return channel, connection
def dlx_callback(ch, method, properties, body):
original_routing_key = properties.headers['x-death'][0]['routing-keys'][0]
request_key = original_routing_key
failure_reason = 'Dead Letter'
print(f'{str(datetime.now())} - {failure_reason} {request_key}')
ch.basic_ack(delivery_tag=method.delivery_tag)
Currently the setup works. If a consumer rejects or nacks a message, it is correctly routed to the dlx queue.
Now I’m trying to make the rejections more informative. A lot of my consumers have the logic:
try:
do_thing()
ch.basic_ack(delivery_tag=method.delivery_tag)
send_response()
except:
ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
I want to update this so the rejected message is sent to the dlx queue with an error reason. In the except block, I do this:
def get_reject_properties(method, reason):
properties = pika.BasicProperties(
headers={
'x-death': [{
'reason': 'rejected',
'queue': method.routing_key,
'time': int(time.time()),
'exchange': method.exchange,
'routing-keys': [method.routing_key]
}],
'x-rejection-reason': reason
}
)
return properties
reject_properties = get_reject_properties(method, reason)
ch.basic_publish(
exchange=f"{EXCHANGE_NAME}.dlx",
routing_key=method.routing_key,
body=body,
properties=reject_properties
)
This creates an error on the consumer side in the line properties.headers['x-death'][0]['routing-keys'][0]
. When the message is received, properties.headers['x-death']
is an empty list even though the x-death
headers are explicitly specified in the message properties when publishing.
Why is this? How can I send a message to the dlx queue with the original message header in x-death
and an x-rejection-reason
header?