I am probably missing a fundamental design concept about saga states, MassTransit's sagas, or distributed systems in general, so apologies if this question is basic.
For instance, I have a simple state machine with three steps, using MongoDB for persistence.
Program
using MassTransit;
using Mb.Pipelines;
using Mb.Pipelines.Saga;
using MongoDB.Driver;
// Host bootstrap: API explorer and Swagger generation for the test endpoint.
var builder = WebApplication.CreateBuilder(args);
var services = builder.Services;
services.AddEndpointsApiExplorer();
services.AddSwaggerGen();

// The "MongoDb" connection string is used both for the application's own
// IMongoClient and for the MassTransit saga repository configured below.
var connectionString = builder.Configuration.GetConnectionString("MongoDb");
services.AddSingleton<IMongoClient>(_ => new MongoClient(connectionString));
services.AddSingleton<IMongoDatabase>(sp =>
{
    var mongoClient = sp.GetRequiredService<IMongoClient>();
    return mongoClient.GetDatabase("Test");
});

services.AddMassTransit(registration =>
{
    registration.SetKebabCaseEndpointNameFormatter();

    // Saga state machine whose instances are stored through the MongoDB repository.
    var sagaRegistration = registration.AddSagaStateMachine<MySaga, MyState>();
    sagaRegistration.MongoDbRepository(mongo =>
    {
        mongo.Connection = connectionString;
    });

    // One consumer (with its definition) per saga step.
    registration.AddConsumer<FirstStepConsumer, FirstStepConsumer.Definition>();
    registration.AddConsumer<SecondStepConsumer, SecondStepConsumer.Definition>();
    registration.AddConsumer<ThirdStepConsumer, ThirdStepConsumer.Definition>();

    // In-memory bus; endpoints are configured from the registrations above.
    registration.UsingInMemory((context, inMemoryBus) => inMemoryBus.ConfigureEndpoints(context));
});

var app = builder.Build();

// Configure the HTTP request pipeline: Swagger is only exposed in Development.
if (app.Environment.IsDevelopment())
{
    app.UseSwagger();
    app.UseSwaggerUI();
}

app.UseHttpsRedirection();
// Test-only endpoint: publishes StartFirstStep with a fixed Id, so every call
// uses the same Id value (the saga events are correlated by message Id).
app.MapPost("/test-saga", async (IBus bus) =>
{
    await bus.Publish(new StartFirstStep { Id = Guid.Parse("8C117470-1F40-4711-990C-395612745F2B") });

    // Return the result to the framework. Previously the IResult was created
    // as a bare statement and discarded, so the handler never returned it.
    return Results.Ok();
})
.WithOpenApi();

app.Run();
/// <summary>
/// State machine over <see cref="MyState"/> with three sequential steps.
/// Each "step completed" event is correlated by the message Id, marks the
/// matching flag on the saga instance, and (for the first two steps) publishes
/// the command that starts the next step before transitioning.
/// </summary>
public class MySaga : MassTransitStateMachine<MyState>
{
    public State FirstStep { get; private set; }
    public State SecondStep { get; private set; }
    public State ThirdStep { get; private set; }

    public Event<FirstStepCompleted> FirstStepCompleted { get; set; }
    public Event<SecondStepCompleted> SecondStepCompleted { get; set; }
    public Event<ThirdStepCompleted> ThirdStepCompleted { get; set; }

    /// <summary>
    /// Declares the instance-state property, the event correlations and the
    /// transitions of the state machine.
    /// </summary>
    /// <param name="logger">Logger used to trace each handled step.</param>
    public MySaga(ILogger<MySaga> logger)
    {
        // The current state is stored in MyState.CurrentState.
        InstanceState(instance => instance.CurrentState);

        // All three events locate their saga instance by the message Id.
        Event(() => FirstStepCompleted, e => e.CorrelateById(ctx => ctx.Message.Id));
        Event(() => SecondStepCompleted, e => e.CorrelateById(ctx => ctx.Message.Id));
        Event(() => ThirdStepCompleted, e => e.CorrelateById(ctx => ctx.Message.Id));

        // Initial -> SecondStep: record step 1 and request step 2.
        var onFirstStepCompleted = When(FirstStepCompleted)
            .Then(ctx =>
            {
                logger.LogInformation("Step1 called");
                ctx.Saga.FirstStepCompleted = true;
            })
            .Publish(ctx => new StartSecondStep { Id = ctx.Message.Id })
            .TransitionTo(SecondStep);
        Initially(onFirstStepCompleted);

        // SecondStep -> ThirdStep: record step 2 and request step 3.
        var onSecondStepCompleted = When(SecondStepCompleted)
            .Then(ctx =>
            {
                logger.LogInformation("Step2 called");
                ctx.Saga.SecondStepCompleted = true;
            })
            .Publish(ctx => new StartThirdStep { Id = ctx.Message.Id })
            .TransitionTo(ThirdStep);
        During(SecondStep, onSecondStepCompleted);

        // ThirdStep -> Final: record step 3 and finalize the saga.
        var onThirdStepCompleted = When(ThirdStepCompleted)
            .Then(ctx =>
            {
                logger.LogInformation("Step3 called");
                ctx.Saga.ThirdStepCompleted = true;
            })
            .Finalize();
        During(ThirdStep, onThirdStepCompleted);
    }
}
FirstStepConsumer
/// <summary>
/// Handles <see cref="StartFirstStep"/>: logs the consumption and publishes a
/// <see cref="FirstStepCompleted"/> event carrying the same Id.
/// </summary>
public class FirstStepConsumer : IConsumer<StartFirstStep>
{
    private readonly ILogger<FirstStepConsumer> _log;

    public FirstStepConsumer(ILogger<FirstStepConsumer> logger) => _log = logger;

    /// <summary>Logs the message and publishes the completion event for step one.</summary>
    /// <param name="context">Consume context of the incoming <see cref="StartFirstStep"/>.</param>
    public async Task Consume(ConsumeContext<StartFirstStep> context)
    {
        _log.LogInformation("FirstStepConsumer consumed");

        var completed = new FirstStepCompleted { Id = context.Message.Id };
        await context.Publish(completed);
    }
}
SecondStepConsumer
/// <summary>
/// Handles <see cref="StartSecondStep"/>: logs the consumption and publishes a
/// <see cref="SecondStepCompleted"/> event carrying the same Id.
/// </summary>
public class SecondStepConsumer : IConsumer<StartSecondStep>
{
    private readonly ILogger<SecondStepConsumer> _log;

    public SecondStepConsumer(ILogger<SecondStepConsumer> logger) => _log = logger;

    /// <summary>Logs the message and publishes the completion event for step two.</summary>
    /// <param name="context">Consume context of the incoming <see cref="StartSecondStep"/>.</param>
    public async Task Consume(ConsumeContext<StartSecondStep> context)
    {
        _log.LogInformation("SecondStepConsumer consumed");

        // Scenario under discussion: the service was restarted while this message was being consumed.
        var completed = new SecondStepCompleted { Id = context.Message.Id };
        await context.Publish<SecondStepCompleted>(completed);
    }
}
ThirdStepConsumer
/// <summary>
/// Handles <see cref="StartThirdStep"/>: logs the consumption and publishes a
/// <see cref="ThirdStepCompleted"/> event carrying the same Id.
/// </summary>
public class ThirdStepConsumer : IConsumer<StartThirdStep>
{
    private readonly ILogger<ThirdStepConsumer> _log;

    public ThirdStepConsumer(ILogger<ThirdStepConsumer> logger) => _log = logger;

    /// <summary>Logs the message and publishes the completion event for step three.</summary>
    /// <param name="context">Consume context of the incoming <see cref="StartThirdStep"/>.</param>
    public async Task Consume(ConsumeContext<StartThirdStep> context)
    {
        _log.LogInformation("ThirdStepConsumer consumed");

        var completed = new ThirdStepCompleted { Id = context.Message.Id };
        await context.Publish(completed);
    }
}
For testing purposes, I launch the saga from an endpoint by publishing the `StartFirstStep` command. The bus is in-memory.
// Test-only endpoint: publishes StartFirstStep with a fixed Id on every call.
app.MapPost("/test-saga", async (IBus bus) =>
{
    await bus.Publish(new StartFirstStep { Id = Guid.Parse("8C117470-1F40-4711-990C-395612745F2B") });

    // Return the result to the framework. Previously the IResult was created
    // as a bare statement and discarded, so the handler never returned it.
    return Results.Ok();
})
.WithOpenApi();
What I am actually trying to understand is how to resume the saga's execution from the third step if the service crashes while that step is executing.
If I simply call the endpoint again with the same ID, I receive the error:
The FirstStepCompleted event is not handled during the ThirdStep state for the MySaga state machine
This is completely understandable, since I am sending the `StartFirstStep` command to launch the saga.
But how can I configure the saga so that it skips the first steps, since they have already completed successfully, and proceeds straight to the third step again?
As far as I understand, to continue from the required state I either have to send the corresponding `Start[Needed]Step` command to the bus, already knowing the current state of the saga, OR (which seems more attractive to me) configure this somehow in the saga itself, but I do not know how.
It looks like I could use an `.If()` statement to check the current state of the saga manually and then transition to the required state, but I am not sure whether this is what I need.
So, to put it simply: how do I restart a saga from its last unfinished state?