I want to develop a batch process in .NET Core to synchronize a CSV file containing 3 million lines. The process reads data from the file, updates existing items in the database, and creates new items if they do not exist. However, this operation currently takes a significant amount of time; for example, processing 100,000 lines takes 30 minutes. I have used parallel tasks to improve performance, but I am still facing latency issues. Is there any solution to reduce this processing time?
Below is my code:
/// <summary>
/// Reads the invoices from the CSV blob, then creates or updates the matching
/// database rows and finally publishes the processed invoices.
/// </summary>
/// <remarks>
/// Invoice ids that occur more than once in the file are logged as a warning and
/// are neither persisted nor published. The remaining invoices are persisted in
/// batches: each batch gets its own DI scope and a single
/// <c>SaveChangesAsync</c> call instead of one scope and one save per invoice.
/// </remarks>
/// <returns><c>true</c> when the import and the publication completed.</returns>
/// <exception cref="Exception">
/// Any failure is logged and rethrown with its original type and stack trace.
/// </exception>
public async Task<bool> ImportDataFromCsvFile()
{
    // Number of invoices handled per DI scope / SaveChangesAsync call.
    // NOTE(review): tune against the real database; 1000 is a starting point.
    const int BatchSize = 1000;

    try
    {
        // Read CSV file asynchronously
        List<CsvInvoice> data = await _azureBlobService.ReadCsvFileAsync();

        // A plain projection is enough here: the mapping is trivial, so
        // Parallel.ForEach + lock only added contention and scrambled the order.
        List<InvoiceDb> invoices = data
            .Select(row => new InvoiceDb
            {
                Id_Invoice = row.IdInvoice,
                DateCheck = row.DateCheck
            })
            .ToList();

        // Group once and derive both the unique invoices and the duplicated ids
        // from the same grouping.
        var invoicesById = invoices
            .GroupBy(invoice => invoice.Id_Invoice)
            .ToList();

        var duplicatedIds = invoicesById
            .Where(group => group.Count() > 1)
            .Select(group => group.Key)
            .ToList();

        if (duplicatedIds.Count > 0)
        {
            Log.Warning("Invoice Dupliqué(s) {duplicata}", duplicatedIds);
        }

        List<InvoiceDb> distinctInvoice = invoicesById
            .Where(group => group.Count() == 1)
            .Select(group => group.First())
            .ToList();

        // Batches run in parallel; the invoices inside one batch run sequentially
        // because they share a single scoped service instance.
        // NOTE(review): assumes ImportNewInvoiceAsync / UpdateImportedInvoiceAsync
        // only stage changes that SaveChangesAsync later flushes, and that the
        // scoped service is not safe for concurrent use - confirm against
        // IInvoiceService. There is still one lookup query per invoice; removing
        // it needs a bulk lookup/upsert API that is not visible from this method.
        await Parallel.ForEachAsync(distinctInvoice.Chunk(BatchSize), async (batch, _) =>
        {
            using var scope = _serviceProvider.CreateScope();
            var service = scope.ServiceProvider.GetRequiredService<IInvoiceService>();

            foreach (var invoice in batch)
            {
                var existingInvoice = await service.GetByNumeroInvoiceAsync(invoice.Id_Invoice);
                if (existingInvoice == null)
                {
                    await service.ImportNewInvoiceAsync(invoice);
                }
                else
                {
                    existingInvoice.DateCheck = invoice.DateCheck;
                    await service.UpdateImportedInvoiceAsync(existingInvoice);
                }
            }

            // One save per batch instead of one per invoice.
            await service.SaveChangesAsync();
        });

        Log.Information("{result} line imported successfully", distinctInvoice.Count);

        // Publish to Kafka
        await PublishInvoice(distinctInvoice);
        return true;
    }
    catch (Exception ex)
    {
        // Pass the exception itself so the log keeps the stack trace, and rethrow
        // with `throw;` so callers see the original exception type and trace.
        Log.Error(ex, "Import Data failed {errorMessage}", ex.Message);
        throw;
    }
}
`