I have an implementation of the inbox pattern in .NET 8 and EF Core. The Hangfire job below processes the inbox messages.
This job executes every 30 seconds and I want to process ~100 messages per minute.
I’m fetching 50 messages in every execution, and I add and update some entities while each message is being processed. So the DbContext
change tracker keeps growing after every processed message. Also, I’m worried that the execution time of the transaction will increase because of the serializable isolation level.
Would creating a DbContext and beginning a transaction for each message be more efficient than this code?
Or how can I solve the ChangeTracker issues? I lose the 50 fetched messages from the ChangeTracker if I clear it after the ProcessByTypeAsync method.
/// <summary>
/// Hangfire job that fetches a batch of pending inbox messages, processes each one,
/// and persists the resulting status changes inside a single serializable transaction.
/// </summary>
public class InboxJob
{
    /// <summary>Number of inbox messages fetched per job execution.</summary>
    private const int BatchSize = 50;

    /// <summary>
    /// Retry limit. Must stay in sync with the fetch query, which only returns
    /// messages with <c>RetryCount &lt; 3</c>.
    /// </summary>
    private const int MaxRetryCount = 3;

    private readonly InboxMessageRepository _inboxMessageRepository;
    //and other repositories...

    /// <summary>Creates the job with the repository used to read and update inbox messages.</summary>
    /// <param name="inboxMessageRepository">Repository for inbox messages.</param>
    public InboxJob(InboxMessageRepository inboxMessageRepository)
    {
        _inboxMessageRepository = inboxMessageRepository;
    }

    /// <summary>
    /// Processes one batch of inbox messages. The whole batch shares one transaction, which is
    /// committed only when the final range update reports at least one affected row.
    /// </summary>
    [DisableConcurrentExecution(timeoutInSeconds: 30)]
    [AutomaticRetry(Attempts = 0)]
    public async Task ExecuteJob() //every single execution is scoped
    {
        // NOTE(review): when CommitAsync is not reached the transaction is disposed uncommitted,
        // presumably rolling back every change made during the batch - confirm against the repository.
        using var transaction = await _inboxMessageRepository.BeginTransactionAsync(IsolationLevel.Serializable);

        //WHERE Status in ('Created','Processing') AND RetryCount < 3 ORDER BY Priorty, RetryCount, CreatedTime
        var inboxMessages = await _inboxMessageRepository.GetInboxMessagesAsync(takeCount: BatchSize);

        foreach (var item in inboxMessages)
        {
            await ProcessByTypeAsync(item);
        }

        var updateResult = await _inboxMessageRepository.UpdateRangeAsync(inboxMessages);
        if (updateResult > 0)
        {
            await transaction.CommitAsync();
        }
    }

    /// <summary>
    /// Processes a single inbox message and updates its status: "Successful" on success,
    /// otherwise the retry count is incremented and the message is marked "Failed" once
    /// the retry limit is reached.
    /// </summary>
    /// <param name="message">The inbox message to process; its status and retry count are mutated in place.</param>
    private async Task ProcessByTypeAsync(InboxMessage message)
    {
        //update inbox message status to 'Processing'
        message.Status = "Processing";

        //deserialize message.Payload
        //insert & update some different tables by InboxMessage.Type... (processing a inbox message...)
        if (result)
        {
            message.Status = "Successful";
            return;
        }

        message.RetryCount++;

        // The fetch query only returns messages with RetryCount < 3, so after the increment the
        // count can be at most 3. Comparing with ">" would never mark a message as failed and
        // would leave exhausted messages stuck in "Processing" without ever being fetched again.
        if (message.RetryCount >= MaxRetryCount)
        {
            message.Status = "Failed";
        }
    }
}