I know this may be a trivial question, but I’m not sure what to do. I am using Azure Service Bus together with MassTransit.
I have two services. Service1 is a WebApi application, and Service2 is a couple of Azure Functions units that respond to service bus topics.
In Service1 I have a consumer for the TestMessage topic. If I notify the topic, it passes to the consumer fine. But when an exception occurs inside the consumer it is immediately notified by the Fault consumer.
Config:
services.AddMassTransit(o =>
{
o.AddConsumers(Assembly.GetExecutingAssembly());
o.UsingAzureServiceBus((context, cfg) =>
{
var connectionStrings = context.GetRequiredService<IOptions<ConnectionStringOption>>();
cfg.Host(new HostSettings
{
ServiceUri = new Uri("sb://" + connectionStrings.Value.Messaging),
TokenCredential = new DefaultAzureCredential()
});
cfg.SubscriptionEndpoint<TestMessage>(formatter.Consumer<TestMessageConsumer>(), e =>
{
e.ConfigureConsumer<TestMessageConsumer>(context);
});
cfg.SubscriptionEndpoint<Fault<TestMessage>>(formatter.Consumer<TestMessageFaultConsumer>(), e =>
{
e.ConfigureConsumer<TestMessageFaultConsumer>(context);
});
});
});
Consumer:
public class TestMessageConsumer : IConsumer<TestMessage>
{
public Task Consume(ConsumeContext<TestMessage> context)
{
throw new Exception();
return Task.CompletedTask;
}
}
public class TestMessageFaultConsumer : IConsumer<Fault<TestMessage>>
{
public Task Consume(ConsumeContext<Fault<TestMessage>> context)
{
return Task.CompletedTask;
}
}
However, if I send a message to the Azure Function in Service2 and an exception occurs, the call is repeated until the max delivery count (10) is reached. The Fault consumer in Service1 is then called that many times.
Config in service1:
if (!await adminClient.TopicExistsAsync(formatter.MessageName<TestMessage>()))
await adminClient.CreateTopicAsync(new CreateTopicOptions(formatter.MessageName<TestMessage>()));
if (!await adminClient.SubscriptionExistsAsync(formatter.MessageName<TestMessage>(), formatterCheckFunc.ConsumerFromMessageName<TestMessageConsumer>()))
await adminClient.CreateSubscriptionAsync(new CreateSubscriptionOptions(formatter.MessageName<TestMessage>(), formatterCheckFunc.ConsumerFromMessageName<TestMessageConsumer>()));
Config in serivce2:
services.AddMassTransitForAzureFunctions(cfg =>
{
cfg.AddConsumer<TestMessageConsumer>();
}, "ServiceBusConnection", (context, configuration) =>
{
var connectionStrings = context.GetRequiredService<IOptions<ConnectionStringOption>>();
var settings = new HostSettings
{
ServiceUri = new Uri("sb://" + connectionStrings.Value.Messaging),
TokenCredential = new DefaultAzureCredential(),
};
configuration.Host(settings);
});
Function:
public Function(IMessageReceiver receiver)
{
_receiver = receiver;
}
[Microsoft.Azure.Functions.Worker.Function(nameof(Function))]
public async Task Run([Microsoft.Azure.Functions.Worker.ServiceBusTrigger(TopicName, SubscriptionName, Connection = Connection)] ServiceBusReceivedMessage message,
ServiceBusMessageActions messageActions, CancellationToken cancellationToken)
{
await _receiver.HandleConsumer<TestMessageConsumer>(TopicName, SubscriptionName, message, cancellationToken);
}
I guess it’s not entirely clear to me the reason why it’s being called like this multiple times and I don’t really know what to do about it. I would expect that if I get a message and it falls into exception, the call is no longer made, because if it runs 10 times, I don’t know if I can duplicate the data in the db etc. It makes sense to me that it will run 10 times if the Azure function is unavailable, the message gets lost over the network, etc.
Thanks a lot