I can’t get the ack=’client’ to behave as expected. I think that a message consumed by the code below should nack it and send it to the DLQ. But no! The message just disappears. I guess that it is considered processed.If, on the other hand, I just throw an error, it does end up in the DLQ.
I want to ack it if all is well, and nack it if it does not work as expected.
Producer.py
import stomp
# STOMP server configuration
server = 'localhost'
port = 61613
username = 'artemis'
password = 'artemis'
destination = 'your.queue.name'
message = 'hello :-)'
# Create a connection to the STOMP server
conn = stomp.Connection([(server, port)])
conn.set_listener('', stomp.PrintingListener())
# Connect to the server
conn.connect(username, password, wait=True)
# Send a durable message to the specified destination
conn.send(destination=destination, body=message, headers={'persistent': 'true'})
# Disconnect from the server
conn.disconnect()
Consumer.py
import stomp
import time
class MyListener(stomp.ConnectionListener):
def __init__(self, conn):
self.conn = conn
def on_error(self, frame):
print('Received an error:')
print(f'Headers: {frame.headers}')
print(f'Body: {frame.body}')
def on_message(self, frame):
print('Received a message:')
print(f'Headers: {frame.headers}')
print(f'Body: {frame.body}')
time.sleep(1)
#self.conn.ack(frame.headers['message-id'], frame.headers['subscription'])
# This line ha no effect 4 what I can tell.
self.conn.nack(frame.headers['message-id'], frame.headers['subscription'])
#error will send it DLQ!
#raise Exception("I will send this message to DLQ!")
print('Done!!!')
def main():
server = 'localhost'
port = 61613
username = 'artemis'
password = 'artemis'
destination = 'your.queue.name'
conn = stomp.Connection([(server, port)], heartbeats=(4000, 4000))
conn.set_listener('', MyListener(conn))
try:
conn.connect(username, password, wait=True)
except Exception as e:
print(f'Failed to connect to STOMP server: {e}')
return
try:
# Ack modes:
# stuff I tried:
# client-individual
# https://jasonrbriggs.github.io/stomp.py/api.html
# client
# https://activemq.apache.org/components/artemis/documentation/1.4.0/pre-acknowledge.html
# CLIENT_ACKNOWLEDGE
conn.subscribe(destination=destination, id='1', ack='client', headers={'subscription-type': 'ANYCAST'})
except Exception as e:
print(f'Failed to subscribe to queue: {e}')
conn.disconnect()
return
print('Waiting for messages...')
# Keep the main thread alive to receive messages
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print('Disconnecting...')
conn.disconnect()
except Exception as e:
print(f'An unexpected error occurred: {e}')
conn.disconnect()
if __name__ == '__main__':
main()
broker.xml
<addresses>
<address name="your.queue.name">
<anycast>
<queue name="your.queue.name" />
</anycast>
<dead-letter-address>DLQ.your.queue.name</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>5000</redelivery-delay> <!-- 5 seconds delay before redelivery -->
<max-delivery-attempts>1</max-delivery-attempts> <!-- Max attempts before moving to DLQ -->
<redelivery-delay-multiplier>2.0</redelivery-delay-multiplier> <!-- Exponential backoff -->
<max-redelivery-delay>60000</max-redelivery-delay> <!-- Max delay of 1 minute -->
</address>
<address name="DLQ.your.queue.name">
<anycast>
<queue name="DLQ.your.queue.name" />
</anycast>
</address>
</addresses>