I need to pass a message from one microservice and receive it in another via RabbitMQ. Acknowledgement is not required, but would be nice to have. I use the package @golevelup/nestjs-rabbitmq.
Here are the module settings:
@Module({
  imports: [
    ConfigModule.forRoot({ isGlobal: true }),
    // ...
    RabbitMQModule.forRootAsync(RabbitMQModule, {
      imports: [ ConfigModule ],
      useFactory: (configService: ConfigService) => ({
        exchanges: [
          {
            name: configService.get('RABBITMQ_EXCHANGE'),
            type: 'topic',
            createExchangeIfNotExists: true,
          },
        ],
        uri: `amqp://${configService.get<string>('RABBITMQ_DEFAULT_USER')}:${configService.get<string>('RABBITMQ_DEFAULT_PASS')}@${configService.get<string>('RABBITMQ_URI')}`,
        connectionInitOptions: { wait: true },
        channels: {
          'channel-1': {
            prefetchCount: 1,
            default: true,
          },
        },
        queues: [
          {
            name: configService.get('RABBITMQ_QUEUE'),
            createQueueIfNotExists: true,
            exchange: configService.get('RABBITMQ_EXCHANGE'),
            routingKey: configService.get('RMQ_ROUTING_KEY_REDDIT_ANALYZE'),
            channel: 'channel-1',
          },
        ],
        enableControllerDiscovery: true,
      }),
      inject: [ ConfigService ],
    }),
  ],
  controllers: [ AppController, QueueController ],
  providers: [
    //...
  ],
})
export class AppModule {}
Here is where I receive and process messages:
import { Controller } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { AmqpConnection, RabbitSubscribe } from '@golevelup/nestjs-rabbitmq';
import { config } from 'dotenv';

config();

@Controller()
export class QueueController {
  constructor(
    private readonly configService: ConfigService,
    private readonly amqpConnection: AmqpConnection,
  ) {}

  @RabbitSubscribe({
    exchange: process.env.RABBITMQ_EXCHANGE,
    routingKey: process.env.RMQ_ROUTING_KEY_REDDIT_ANALYZE,
    queue: process.env.RABBITMQ_QUEUE,
  })
  public async handleMessage(message: {}, amqpMsg: any) {
    console.log(`Received message: ${JSON.stringify(message)}`);
    console.log(JSON.stringify(amqpMsg, null, 2));
  }
}
There is one message in the queue, and it is delivered to the handler over and over again. I probably need to acknowledge the message.
How can I do this?
P.S. ChatGPT advises doing the following (neither works):
- Call amqpMsg.ack().
- Call this.amqpConnection.channel.ack(amqpMsg).
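
To make the symptom concrete, here is a toy in-memory model of the delivery semantics that seem to be at play: a message that is negatively acknowledged with requeue goes straight back to the queue and is delivered again, while an acknowledged message is removed. Everything in it (ToyQueue, Outcome, consume, the maxDeliveries cap) is hypothetical and made up for illustration; it is not the RabbitMQ, amqplib, or @golevelup/nestjs-rabbitmq API.

```typescript
// Toy model only: none of these names exist in RabbitMQ, amqplib, or
// @golevelup/nestjs-rabbitmq. They are assumptions made for illustration.
type Outcome = 'ack' | 'nack-requeue' | 'nack-drop';

interface Delivery {
  body: string;
  redelivered: boolean;
}

class ToyQueue {
  private messages: Delivery[] = [];

  publish(body: string): void {
    this.messages.push({ body, redelivered: false });
  }

  // Number of messages currently waiting in the queue.
  get depth(): number {
    return this.messages.length;
  }

  // Delivers one message at a time (similar in spirit to prefetchCount: 1)
  // until the queue is empty or maxDeliveries is reached.
  // Returns how many times the handler was invoked.
  consume(handler: (d: Delivery) => Outcome, maxDeliveries: number): number {
    let deliveries = 0;
    while (this.messages.length > 0 && deliveries < maxDeliveries) {
      const delivery = this.messages.shift()!;
      deliveries++;
      const outcome = handler(delivery);
      if (outcome === 'nack-requeue') {
        // A requeued message returns to the queue flagged as redelivered.
        this.messages.unshift({ ...delivery, redelivered: true });
      }
      // 'ack' and 'nack-drop' both leave the message removed from the queue.
    }
    return deliveries;
  }
}

// Case 1: the handler's outcome always requeues, so the message never leaves.
const looping = new ToyQueue();
looping.publish('analyze');
const loopingDeliveries = looping.consume(() => 'nack-requeue', 5);

// Case 2: the handler's outcome is an ack, so the message is consumed once.
const settled = new ToyQueue();
settled.publish('analyze');
const settledDeliveries = settled.consume(() => 'ack', 5);

console.log({ loopingDeliveries, loopingDepth: looping.depth });
console.log({ settledDeliveries, settledDepth: settled.depth });
```

With the cap set to 5, the requeueing handler is invoked 5 times and the message is still in the queue, whereas the acknowledging handler is invoked once and the queue ends up empty. If the real setup behaves like the first case, something is presumably requeueing the message rather than acknowledging it, but that is an assumption and not something confirmed here.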