I am new to RabbitMQ and MassTransit. I have a RabbitMQ producer that successfully publishes messages to a queue named bot_notification_queue. However, when I start my consumer using MassTransit, the existing queue gets renamed to bot_notification_queue_skipped, and a new bot_notification_queue is created.
Producer Code:
public class BasicProducer : IDisposable
{
private readonly ConnectionFactory _factory;
private readonly IConnection _connection;
private readonly IModel _channel;
private readonly ILogger _logger;
private bool _disposed = false;
public BasicProducer(ILogger logger, string hostName, string username, string password)
{
try
{
_logger = logger;
_factory = new ConnectionFactory()
{
HostName = hostName,
UserName = username,
Password = password
};
_connection = _factory.CreateConnection();
_channel = _connection.CreateModel();
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to create RabbitMQ connection and channel.");
throw;
}
}
public void PublishMessage<T>(T message, QueueOptions queueOptions)
{
try
{
DeclareQueue(queueOptions);
var jsonString = JsonConvert.SerializeObject(message);
var body = Encoding.UTF8.GetBytes(jsonString);
var properties = _channel.CreateBasicProperties();
properties.Persistent = true;
properties.ContentType = "application/json";
_channel.BasicPublish(exchange: "", routingKey: queueOptions.Name, basicProperties: properties, body: body);
}
catch (Exception ex)
{
_logger.LogError(ex, $"Failed to publish message to queue {queueOptions.Name}");
throw;
}
}
private void DeclareQueue(QueueOptions options)
{
_channel.QueueDeclare(queue: options.Name,
durable: options.Durable,
exclusive: options.Exclusive,
autoDelete: options.AutoDelete,
arguments: options.Arguments);
}
public void Dispose()
{
try
{
if (!_disposed)
{
_channel?.Close();
_connection?.Close();
_channel?.Dispose();
_connection?.Dispose();
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to dispose RabbitMQ connection and channel.");
throw;
}
_disposed = true;
}
}
How the message produced:
static void Main(string[] args)
{
var producer = new BasicProducer(_logger, "localhost", "username", "passoword");
var options = new QueueOptions("bot_notification_queue")
{
Durable = true,
Arguments = null,
Exclusive = false,
AutoDelete = false,
};
var view = new PostNotificationViewModel
{
Message = "hello world",
PhoneNumber = "test number",
};
producer.PublishMessage(view, options);
Console.ReadLine();
}
Consumer Code:
builder.Services.AddMassTransit(busConfigurator =>
{
busConfigurator.AddConsumer<BotNotificationMassConsumer>();
busConfigurator.SetKebabCaseEndpointNameFormatter();
busConfigurator.UsingRabbitMq((context, busFactoryConfigurator) =>
{
busFactoryConfigurator.Host("localhost", h =>
{
h.Username("username");
h.Password("password");
});
busFactoryConfigurator.ReceiveEndpoint("bot_notification_queue", e =>
{
e.ConfigureConsumer<BotNotificationMassConsumer>(context);
});
});
});
I am using .net 8, RabbitMQ.Client version is 6.8.1,
MassTransit version is 8.2.3
MassTransit.RabbitMQ version is 8.2.3