I'm working on a project where I have to import a large CSV file (around 4 GB, roughly 40 million records of person data) which is then processed. At the start my code handles around 5,000 records/s, but over time the import slows down: past roughly 300K records it drops to 20-30 records/s and eventually stalls.
The heap memory stays around 150-180 MB at all times, and keeping it under 200 MB is a requirement.
What could be done here to optimise memory usage, keep the import from clogging up over time, and make it run more effectively?
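For reference, the 150-180 MB figure is the used heap I observe while the import runs; roughly the equivalent of the snippet below (in practice I watch it in a profiler, this is just to show what I'm measuring):
// Illustrative only: how I read the used-heap number quoted above (in MB).
Runtime rt = Runtime.getRuntime();
long usedMb = (rt.totalMemory() - rt.freeMemory()) / (1024 * 1024);
log.info("Used heap: {} MB", usedMb);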
@Component
@Slf4j
public class FileImporter {
private final JdbcTemplate jdbcTemplate;
public FileImporter(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
public void insert(FileInformation fileInformation) throws DataAccessException {
String sql = "INSERT INTO file_import (file_path, last_processed_row, status, created_at) VALUES (?, ?, ?, ?)";
try {
jdbcTemplate.update(sql, fileInformation.getFilePath(), fileInformation.getLastProcessedRow(),
fileInformation.getStatus().toString(), fileInformation.getCreatedAt());
} catch (DataAccessException e) {
log.error("Error occurred while inserting file import data: {}", e.getMessage());
throw e;
}
}
public void update(FileInformation fileInformation) throws DataAccessException {
String sql = """
UPDATE file_import
SET
file_path = ?,
last_processed_row = ?,
status = ?,
finished_at = ?,
started_at = ?
WHERE id = ?
""";
try {
jdbcTemplate.update(sql, fileInformation.getFilePath(), fileInformation.getLastProcessedRow(),
fileInformation.getStatus().toString(), fileInformation.getFinishedAt(), fileInformation.getStartedAt(), fileInformation.getId());
} catch (DataAccessException e) {
log.error("Error occurred while updating file import data: {}", e.getMessage());
throw e;
}
}
}
@RestController
@RequiredArgsConstructor
@RequestMapping("/api/file-imports")
@Slf4j
public class FileController {
private final FileService fileService;
@GetMapping("/{id}")
public ResponseEntity<FileImportStatusResponse> getFileImportStatus(@PathVariable Long id) {
return ResponseEntity.ok(fileService.getFileImportStatus(id));
}
@PostMapping
@PreAuthorize("hasAnyRole('ADMIN', 'IMPORTER')")
public ResponseEntity<FileUploadResponse> uploadFile(@RequestParam("file") MultipartFile file) throws IOException {
FileUploadResponse uploadResponse = fileService.uploadFile(file.getInputStream(), file.getOriginalFilename(), file.getSize());
return ResponseEntity.status(HttpStatus.ACCEPTED).body(uploadResponse);
}
}
@Service
@RequiredArgsConstructor
@Slf4j
public class FileService {
private final FileImportRepository fileImportRepository;
private final FileProcessor fileProcessor;
private final FileStorage fileStorage;
private final FileImporter fileImporter;
public FileUploadResponse uploadFile(InputStream inputStream, String originalFilename, long byteSize) {
if (byteSize <= 0) {
return new FileUploadResponse("Error occurred when uploading a file", originalFilename);
}
try {
String uniqueFilename = fileStorage.save(inputStream, originalFilename, byteSize);
FileInformation fileInformation = FileInformation.builder()
.filePath(uniqueFilename)
.lastProcessedRow(0L)
.status(FileStatus.PENDING)
.createdAt(LocalDateTime.now())
.build();
fileImporter.insert(fileInformation);
FileUploadResponse response = new FileUploadResponse("File uploaded successfully. File name: " + uniqueFilename, uniqueFilename);
return response;
} catch (IOException e) {
log.error("Failed to upload the file", e);
return new FileUploadResponse("Failed to upload the file.", originalFilename);
}
}
@Async
public Optional<Long> findFileToProcess() {
return fileImportRepository.findFirstByStatusOrderByCreatedAtAsc();
}
public void processFile(Long fileImportId) {
FileInformation fileInformation = fileImportRepository.findById(fileImportId)
.orElseThrow(() -> new ResourceNotFoundException("Import file with id: " + fileImportId + " hasn't been found"));
try {
if (fileInformation.getStartedAt() == null) {
fileInformation.setStartedAt(LocalDateTime.now());
}
var processing = true;
while (processing) {
Long batchStart = fileInformation.getLastProcessedRow();
FileProcessor.Result batchResult = fileProcessor.processFile(fileInformation, batchStart, 10);
fileInformation.setLastProcessedRow(batchResult.lastProcessedRow());
fileInformation.setStatus(FileStatus.IN_PROGRESS);
processing = !batchResult.isFinished();
fileImporter.update(fileInformation);
}
fileInformation.setFinishedAt(LocalDateTime.now());
fileInformation.setStatus(FileStatus.SUCCESS);
} catch (Exception e) {
log.error("Error when processing file {}", fileImportId, e);
fileInformation.setFinishedAt(LocalDateTime.now());
fileInformation.setStatus(FileStatus.FAILED);
}
fileImporter.update(fileInformation);
}
public FileImportStatusResponse getFileImportStatus(Long id) {
Optional<FileInformation> fileImportOptional = fileImportRepository.findById(id);
return fileImportOptional.map(this::buildStatusResponse)
.orElseThrow(() -> new ResourceNotFoundException("File import status not found!"));
}
private FileImportStatusResponse buildStatusResponse(FileInformation fileInformation) {
return new FileImportStatusResponse(
fileInformation.getStatus(),
fileInformation.getCreatedAt(),
fileInformation.getStartedAt(),
fileInformation.getLastProcessedRow());
}
}
@Service
@RequiredArgsConstructor
@Slf4j
public class FileProcessor {
public record Result(long lastProcessedRow, boolean isFinished) {
}
private final PersonRepository personRepository;
private final PersonValidator personValidator;
private final Map<String, PersonCreationStrategy> creationStrategyMap;
private final ComposedCsvFileRowToCreateCommandStrategy csvFileRowToCreateCommandStrategy;
private final FileStorage fileStorage;
@Transactional
public Result processFile(FileInformation fileInformation, long batchStart, long batchSize) throws IOException {
AtomicInteger processedLines = new AtomicInteger();
try (BufferedReader reader = fileStorage.load(fileInformation.getFilePath())) {
var lines = reader.lines();
Stream<String> batchLines = lines.skip(1).skip(batchStart).limit(batchSize);
List<Person> entities = batchLines.map(line -> {
Person person = processFileLine(line);
personValidator.validate(person);
processedLines.getAndIncrement();
return person;
}).collect(Collectors.toList());
personValidator.validatePersonsForBatchSave(entities);
personRepository.saveAllAndFlush(entities);
}
boolean isFinished = processedLines.get() < batchSize;
return new Result(batchStart + processedLines.get(), isFinished);
}
private Person processFileLine(String line) {
String[] data = line.split(",");
String type = data[0];
String key = type.toLowerCase() + "CreationStrategy";
PersonCreationStrategy strategy = creationStrategyMap.get(key);
Person person;
if (strategy != null) {
person = createAndAddToDatabase(strategy, data);
} else {
throw new ResourceNotFoundException("Unknown type: " + type);
}
return person;
}
private Person createAndAddToDatabase(PersonCreationStrategy strategy, String[] data) {
CreatePersonCommand command = mapDataToCommand(data);
return strategy.create(command);
}
private CreatePersonCommand mapDataToCommand(String[] data) {
return csvFileRowToCreateCommandStrategy.toCommand(data);
}
}
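For context, processing is kicked off by a scheduled poller along these lines (a simplified sketch; the real class isn't included above and the names here are illustrative):
import lombok.RequiredArgsConstructor;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
@RequiredArgsConstructor
public class FileImportScheduler { // illustrative name, not the exact class I use
private final FileService fileService;
// Polls for the oldest pending import and runs it; fixedDelay is a placeholder value.
@Scheduled(fixedDelay = 5000)
public void pollForPendingImport() {
fileService.findFileToProcess().ifPresent(fileService::processFile);
}
}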
I have also attempted to trigger the garbage collector on demand; with that the heap usage is noticeably lower, but the import still slows down after some time.
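By "on demand" I mean roughly the following (simplified; the exact call site in my batch loop differs and batchCount is illustrative):
// Every so many batches, suggest a GC run. System.gc() is only a hint to the JVM,
// but with it the heap stays lower - the slowdown remains regardless.
private void maybeRunGc(long batchCount) {
if (batchCount % 1_000 == 0) {
System.gc();
}
}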