I'm trying to use consume/publish filters for logging, and everything was going smoothly until I came across a microservice that uses batch consumers: a NullReferenceException is thrown as soon as a batch consumer is registered.
Here’s how our MassTransit is configured:
services.AddMassTransit(config =>
{
    config.AddConsumers(typeof(MyNonBatchConsumer).Assembly);
    config.AddBus(provider => Bus.Factory.CreateUsingAmazonSqs(amazonConfig =>
    {
        //omitted for brevity
        DependencyInjectionFilterExtensions.UsePublishFilter(amazonConfig, typeof(LoggingPublishFilter<>), provider);
        DependencyInjectionFilterExtensions.UseConsumeFilter(amazonConfig, typeof(LoggingConsumeFilter<>), provider);
        amazonConfig.ReceiveEndpoint($"{typeof(Startup).Namespace?.Replace(".", "_")}_{environment.EnvironmentName}", receiveEndpointConfig =>
        {
            //tried registering the filter here as well, didn't work
            receiveEndpointConfig.ConfigureConsumer<MyNonBatchConsumer>(provider);
            //Batch consumer
            receiveEndpointConfig.ConfigureConsumer<MyBatchConsumer>(provider);
        });
    }));
});
services.AddScoped(typeof(LoggingPublishFilter<>));
services.AddScoped(typeof(LoggingConsumeFilter<>));
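For context, the filters are ordinary open-generic MassTransit filters. Below is a minimal sketch of the consume side, not our exact code. It assumes MassTransit v7, where IFilter, IPipe and ProbeContext come from the GreenPipes namespace, and Microsoft.Extensions.Logging for the logger. The log messages and the probe scope name are purely illustrative.

```csharp
using System.Threading.Tasks;
using GreenPipes;                     // IFilter, IPipe, ProbeContext (assumes MassTransit v6/v7)
using MassTransit;
using Microsoft.Extensions.Logging;

// Open-generic filter: the container resolves one closed type per message type T.
public class LoggingConsumeFilter<T> : IFilter<ConsumeContext<T>>
    where T : class
{
    private readonly ILogger<LoggingConsumeFilter<T>> _logger;

    public LoggingConsumeFilter(ILogger<LoggingConsumeFilter<T>> logger)
    {
        _logger = logger;
    }

    public async Task Send(ConsumeContext<T> context, IPipe<ConsumeContext<T>> next)
    {
        // Illustrative log lines; the real filter may log different details.
        _logger.LogInformation("Consuming {MessageType} ({MessageId})", typeof(T).Name, context.MessageId);

        // Hand the message on to the rest of the pipeline (and eventually the consumer).
        await next.Send(context);

        _logger.LogInformation("Consumed {MessageType} ({MessageId})", typeof(T).Name, context.MessageId);
    }

    public void Probe(ProbeContext context)
    {
        // Scope name is arbitrary; it only shows up in bus diagnostics.
        context.CreateFilterScope("loggingConsume");
    }
}
```

The publish filter would have the same shape, with PublishContext<T> in place of ConsumeContext<T>.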
This is what our batch handler looks like:
public class MyBatchConsumerHandler : IIntegrationEventBatchHandler<MyBatchConsumer>
{
    private readonly IMediator _mediator;

    public MyBatchConsumerHandler(IMediator mediator)
    {
        _mediator = mediator;
    }

    public async Task Consume(ConsumeContext<Batch<MyBatchConsumer>> context)
    {
        //removed for brevity
    }
}
The exception seems to be thrown from the ConsumerMessageConfigured<TConsumer, TMessage>(IConsumerMessageConfigurator<TConsumer, TMessage> configurator) method of the ScopedConsumerConsumePipeSpecificationObserver class, because configurator is null for batch handlers.
For the record, the consume filter and the batch consumer each work fine on their own; I just can’t get them to work together.