I'm working on a backend solution made up of an API, a worker that holds the MassTransit and RabbitMQ configuration, and a RabbitMQ instance. All of these services run under docker-compose. In production, a load balancer spins up 'n' instances of my worker.
I have a process with several steps, and I'm applying retry logic to each step. Here is my saga configuration:
public RequestWorkflowSaga()
{
    InstanceState(x => x.CurrentState);

    // Define the events that the saga will listen to.
    Event(() => RequestCreatedEvent, e => e.CorrelateById(m => m.Message.RequestGuid));
    Event(() => AuditRequestCreatedEvent, e => e.CorrelateById(m => m.Message.RequestGuid));
    Event(() => RequestSentToProvenirEvent, e => e.CorrelateById(m => m.Message.RequestGuid));
    Event(() => RequestProcessedEvent, e => e.CorrelateById(m => m.Message.RequestGuid));
    Event(() => ResultSentEvent, e => e.CorrelateById(m => m.Message.RequestGuid));
    Event(() => RequestFinishedEvent, e => e.CorrelateById(m => m.Message.RequestGuid));

    Initially(
        When(RequestCreatedEvent)
            .Then(context =>
            {
                context.Saga.RequestId = context.Message.RequestId;
            })
            .TransitionTo(Auditing)
            .Publish(context => new AuditRequestCommand(context.Message.RequestId, context.Message.RequestGuid)));

    During(Auditing,
        When(AuditRequestCreatedEvent)
            .Then(context => context.Saga.RequestAudited = true)
            .TransitionTo(SendingWithProvenir)
            .Publish(context => new SendRequestCommand(context.Message.RequestId, context.Message.RequestGuid)));

    During(SendingWithProvenir,
        When(RequestSentToProvenirEvent)
            .Then(context =>
            {
                context.Saga.RequestSentToProvenir = true;
                context.Saga.RequestId = context.Message.RequestId;
            })
            .TransitionTo(Processing)
            .Publish(context => new ProcessRequestCommand(context.Message.RequestId, context.Message.RequestGuid)));

    During(Processing,
        When(RequestProcessedEvent)
            .Then(context => context.Saga.RequestProcessed = true)
            .TransitionTo(SendigResult)
            .Publish(context => new SendResultCommand(context.Message.RequestId, context.Message.RequestGuid)));

    During(SendigResult,
        When(ResultSentEvent)
            .Then(context => context.Saga.RequestSentToExternalService = true)
            .TransitionTo(Finished)
            .Publish(context => new FinishRequestCommand(context.Message.RequestId, context.Message.RequestGuid)));

    During(Finished,
        When(RequestFinishedEvent)
            .Then(context => context.Saga.RequestFinished = true)
            .Finalize());
}
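For completeness, the saga state class is essentially this (a simplified sketch: the property names are the ones used in the Then blocks above, and the RequestId type is only illustrative):

using System;
using MassTransit;

public class RequestWorkflowSagaData : SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; }
    public string CurrentState { get; set; }

    // Illustrative type; the exact declaration isn't important for the question.
    public int RequestId { get; set; }

    public bool RequestAudited { get; set; }
    public bool RequestSentToProvenir { get; set; }
    public bool RequestProcessed { get; set; }
    public bool RequestSentToExternalService { get; set; }
    public bool RequestFinished { get; set; }
}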
And here is my MassTransit configuration:
services.AddMassTransit(x =>
{
    x.SetKebabCaseEndpointNameFormatter();

    x.AddConsumers(ApplicationAssemblyReference.Assembly);
    x.AddConsumer<SendRequestToProvenirEventConsumer>(typeof(RetriesDefinition<SendRequestToProvenirEventConsumer>));
    x.AddConsumer<SendResultConsumer>(typeof(RetriesDefinition<SendResultConsumer>));

    x.AddSagaStateMachine<RequestWorkflowSaga, RequestWorkflowSagaData>()
        .InMemoryRepository();

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host(rabbitMqConnection, h => { h.ConfigureBatchPublish(b => b.Enabled = false); });
        cfg.UseInMemoryOutbox(context);
        cfg.ConfigureEndpoints(context);
    });
});
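The RetriesDefinition<TConsumer> referenced above is just a consumer definition that wires up message retry; roughly like this (the retry policy values shown here are illustrative):

using System;
using MassTransit;

public class RetriesDefinition<TConsumer> : ConsumerDefinition<TConsumer>
    where TConsumer : class, IConsumer
{
    protected override void ConfigureConsumer(
        IReceiveEndpointConfigurator endpointConfigurator,
        IConsumerConfigurator<TConsumer> consumerConfigurator)
    {
        // Retry each failing step a few times before the message is faulted.
        // (Illustrative policy; the real numbers don't matter for the question.)
        endpointConfigurator.UseMessageRetry(r => r.Interval(3, TimeSpan.FromSeconds(10)));
    }
}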
The problem I'm facing is that if I run 2 instances of my worker and publish around 100 messages, the workers go haywire: each one starts consuming events that belong to sagas started on the other instance, traceability is lost, and some messages are never fully processed.
I'm sure I'm missing some configuration, so I'd be grateful for any help.
What I want is for each worker instance to completely process each of the messages it picks up from the queue.