I’m using NATS JetStream to publish and consume messages in a .NET 8 application using NATS.Net. When I publish 10 messages, my consumer processes each message twice, despite calling msg.AckAsync(). Interestingly, changing the MaxDeliver setting causes the messages to be processed correspondingly more times. If I don’t set MaxDeliver, each message gets processed only once. Below is my code for both publishing and consuming messages:
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;
using NATS.Client.Serializers.Json;
using System.Buffers;
using System.Text;
using System.Text.Json;
// Demo script: creates a JetStream stream, publishes ten SampleEvent messages,
// then consumes them with a durable consumer, acknowledging each message once.
try
{
    int count = 0;
    NatsAuthOpts authOpts = new NatsAuthOpts();
    var natsOpts = NatsOpts.Default with
    {
        //SerializerRegistry = new MyProtoBufSerializerRegistry(),
        SerializerRegistry = NatsJsonSerializerRegistry.Default,
        Url = "nats://localhost:53810",
        AuthOpts = authOpts
    };

    string subject = "test.new.*";
    string streamName = "sample-new";

    await using var nats = new NatsConnection(natsOpts);
    var js = new NatsJSContext(nats);
    await js.CreateStreamAsync(new StreamConfig(name: streamName, subjects: new[] { subject }));

    // Publish new order messages
    for (var i = 0; i < 10; i++)
    {
        // The constructor parameter is named utcTime, so pass UTC in round-trip ("O") format
        // rather than culture-dependent local time.
        var message = new SampleEvent("John.Doe", "Sample Message", DateTime.UtcNow.ToString("O"), i.ToString());
        var ack = await js.PublishAsync("test.new.first", message);
        ack.EnsureSuccess();
    }

    // Backoff entries expressed as long are NANOSECONDS in the JetStream consumer config,
    // and a configured backoff overrides AckWait. The previous value of 10000 therefore meant
    // a 10 microsecond ack deadline: the server redelivered every message before AckAsync()
    // could reach it, so each message was processed MaxDeliver times. Use a real 10 s delay.
    const long nanosecondsPerSecond = 1_000_000_000L;
    List<long> backOff = new List<long> { 10 * nanosecondsPerSecond };

    ConsumerConfig consumerConfig = new ConsumerConfig(name: "first-new-consumer");
    consumerConfig.DeliverPolicy = ConsumerConfigDeliverPolicy.All;
    consumerConfig.MaxDeliver = 2;
    consumerConfig.Backoff = backOff;

    var consumer = await js.CreateOrUpdateConsumerAsync(stream: streamName, consumerConfig);

    await foreach (var msg in consumer.ConsumeAsync<SampleEvent>())
    {
        var order = msg.Data;
        Console.WriteLine($"Processing {msg.Data} {order.ToString()}...");

        // Acknowledge within the backoff/AckWait window so the server does not redeliver.
        await msg.AckAsync();
        Console.WriteLine(++count);
    }
}
catch (Exception ex)
{
    Console.WriteLine(ex.Message);
}
/// <summary>
/// Message payload exchanged over the stream; every field is carried as a string.
/// </summary>
public class SampleEvent
{
    /// <summary>Creates an event with all properties left unset.</summary>
    public SampleEvent()
    {
    }

    /// <summary>Creates an event with every property populated from the given values.</summary>
    /// <param name="userName">Value stored in <see cref="UserName"/>.</param>
    /// <param name="message">Value stored in <see cref="Message"/>.</param>
    /// <param name="utcTime">Value stored in <see cref="UtcTime"/>.</param>
    /// <param name="count">Value stored in <see cref="Count"/>.</param>
    public SampleEvent(string userName, string message, string utcTime, string count)
    {
        (UserName, Message, UtcTime, Count) = (userName, message, utcTime, count);
    }

    public string UserName { get; set; }

    public string Message { get; set; }

    public string UtcTime { get; set; }

    public string Count { get; set; }

    /// <summary>Returns this instance serialized as JSON.</summary>
    public override string ToString() => JsonSerializer.Serialize(this);
}
I expect each message to be processed only once by the consumer, regardless of the MaxDeliver setting. A message should be processed again only if it was not acknowledged.