how can I Utilize multiple consumer thread from one consumer application, do I just use threads
or there is a better way
this is my code
this is my main function i just call 3 threads and give the the function ConsumeMessages
but it end up using just one thread
Order is Being Processed, Thread Id: 5
Order is Being Processed, Thread Id: 5
Order is Being Processed, Thread Id: 5
Order is Being Processed, Thread Id: 5
Order is Being Processed, Thread Id: 5
Order is Being Processed, Thread Id: 5
Order is Being Processed, Thread Id: 5
Order is Being Processed, Thread Id: 5
Order is Being Processed, Thread Id: 5
Order is Being Processed, Thread Id: 5
Order is Being Processed, Thread Id: 5
Order is Being Processed, Thread Id: 5
Order is Being Processed, Thread Id: 5
Order is Being Processed, Thread Id: 5
<pre>
static void Main(string[] args)
{
Console.WriteLine("Starting");
// Define the number of consumers (threads) you want
int numberOfConsumers = 3;
for (int i = 0; i < numberOfConsumers; i++)
{
// Start a new thread for each consumer
var thread = new Thread(new ThreadStart(ConsumeMessages));
thread.Start();
}
var waitHandle = new ManualResetEvent(false);
waitHandle.WaitOne();
}
</pre>
This is my function, ConsumeMessages. I literally do everything in it. I create a new connection and channel for each thread. Initially, I wanted to have the same connection but have a channel for each thread, but this also doesn’t work, as the work is done by just one thread. I also set the prefetch to 1 so that each thread would consume one message at a time
<pre>
private static void ConsumeMessages()
{
var factory = new ConnectionFactory { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
channel.BasicQos(0, 1, false); // Set the prefetch count to 1 for each consumer
Console.WriteLine($"Create channel, Thread Id: {Thread.CurrentThread.ManagedThreadId}");
channel.QueueDeclare(queue: "MS",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
var consumer = new EventingBasicConsumer(channel);
Console.WriteLine($"Create consumer, Thread Id: {Thread.CurrentThread.ManagedThreadId}");
consumer.Received += async (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
var orderInfo = JsonSerializer.Deserialize<OrderDto>(message);
Console.WriteLine($"Order is Being Processed, Thread Id: {Thread.CurrentThread.ManagedThreadId}");
await OrderCheckOut(orderInfo!);
Console.WriteLine($"Order Processed");
};
channel.BasicConsume(queue: "MS", autoAck: true, consumer: consumer);
}
</pre>
The above function simply calls another function to update the order. Additionally, I have a problem here where I need to create a new instance of the DbContext because it gets disposed each time.”
<pre>
{
private static async Task OrderCheckOut(OrderDto dto)
{
var builder = new DbContextOptionsBuilder<MSContext>();
builder.UseSqlServer("data source=.;integrated security=SSPI;initial catalog=MSDatabase;trustservercertificate=True");
//for docker
// builder.UseSqlServer("Server=sqlserverhost;Database=msDatabase;User=sa;Password=Admin@123;TrustServerCertificate=true");
using var _context = new MSContext(builder.Options);
var orderRepository = new OrderRepository(_context);
var order = await orderRepository.GetOrderByIdAsync(dto.Id);
if (order is null)
return;
order.Status = OrderStatus.Processing;
order.UpdatedAt = DateTime.Now;
order.Address = order.Address;
order.TotalAmount = order.OrderProducts.Sum(x => x.Quantity * x.UnitPrice);
order = await orderRepository.UpdateAsync(order);
}
</pre>
Mohamed Warda is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
Check out our Code of Conduct.