I need to upload a large CSV file, about 3 GB (20 million records), but the problem is that the process has to fit in 200 MB of heap. I am already running with the VM options -Xmx200m and -XX:+UseG1GC. I can't attach a VisualVM screenshot, but the process stops almost immediately, at around record 160,000, when it hits the memory limit. I am also using only simple types.
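For reference, these are the exact flags passed to the JVM:

```
-Xmx200m -XX:+UseG1GC
```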
I tried uploading it in small batches and with streams. This is my code:
```java
@Slf4j
@Service
@RequiredArgsConstructor
public class CsvTransactionProcessingService {

    private final CsvFileReaderService csvFileReaderService;
    private final PersonValidationService personValidationService;
    private final PersonBatchService personBatchService;
    private final ProcessStatusService processStatusService;
    private final PersonMappingFacade mappingFacade;

    @Transactional
    public void processCsvFile(Path pathToFile, int processId) {
        int processedRows = 0;
        long startTime = System.nanoTime();
        try (Stream<PersonDto> personDtoStream = csvFileReaderService.readCsvFile(pathToFile)) {
            Iterator<PersonDto> iterator = personDtoStream.iterator();
            while (iterator.hasNext()) {
                List<Person> batch = new ArrayList<>();
                while (iterator.hasNext() && batch.size() < 100) {
                    PersonDto personDto = iterator.next();
                    try {
                        personValidationService.validatePersonDto(personDto);
                        Person person = mappingFacade.mapToPerson(personDto);
                        batch.add(person);
                    } catch (RuntimeException e) {
                        log.error("Error processing CSV data: " + e.getMessage(), e);
                        processStatusService.updateStatus(processId, ProcessStatusEnum.FAILED, processedRows, e.getMessage());
                        throw e;
                    }
                    processedRows++;
                }
                if (!batch.isEmpty()) {
                    personBatchService.saveBatch(batch);
                }
                if (processedRows % 100 == 0) {
                    processStatusService.updateProcessedRows(processId, processedRows);
                }
            }
            long endTime = System.nanoTime();
            long durationMillis = (endTime - startTime) / 1_000_000;
            log.info("Processed {} records in {} milliseconds", processedRows, durationMillis);
            log.info("Processing rate: {} records per second", (processedRows / (durationMillis / 1000.0)));
            processStatusService.updateStatus(processId, ProcessStatusEnum.COMPLETED, processedRows, null);
        } catch (IOException e) {
            log.error("IOException while processing CSV file", e);
            processStatusService.updateStatus(processId, ProcessStatusEnum.FAILED, processedRows, e.getMessage());
            throw new RuntimeException("Failed to process CSV file", e);
        } catch (Exception e) {
            log.error("Unexpected exception while processing CSV file", e);
            processStatusService.updateStatus(processId, ProcessStatusEnum.FAILED, processedRows, e.getMessage());
            throw e;
        }
    }
}
```
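For completeness, `PersonBatchService` is not shown above; here is a minimal sketch of what it is assumed to look like, with a standard Spring Data `JpaRepository` (the `PersonRepository` name and types are placeholders, not code from my project):

```java
@Service
@RequiredArgsConstructor
public class PersonBatchService {

    // Assumed: PersonRepository extends JpaRepository<Person, Long>
    private final PersonRepository personRepository;

    @Transactional
    public void saveBatch(List<Person> batch) {
        // saveAll leaves every saved entity managed by the persistence context
        // until the surrounding transaction ends
        personRepository.saveAll(batch);
    }
}
```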
```java
@Slf4j
@Service
@RequiredArgsConstructor
public class CsvFileReaderService {

    private final PersonParserRegistry parserRegistry;

    @Transactional
    public Stream<PersonDto> readCsvFile(Path pathToFile) throws IOException {
        BufferedReader bufferedReader = Files.newBufferedReader(pathToFile);
        return bufferedReader.lines().skip(1)
                .map(this::parseCsvToPersonDto)
                .filter(Objects::nonNull);
    }

    private PersonDto parseCsvToPersonDto(String line) {
        try {
            String[] personProperties = line.split(",");
            if (personProperties.length < 9) {
                log.warn("Invalid CSV data format: Expected at least 9 fields, but got {}", personProperties.length);
                return null;
            }
            String type = personProperties[0].trim();
            PersonParser parser = parserRegistry.findParser(type);
            if (parser == null) {
                log.warn("No parser found for type: {}", type);
                return null;
            }
            log.debug("Parsing line: {}", line);
            return parser.parse(personProperties);
        } catch (Exception e) {
            log.error("Error parsing CSV line: {} - {}", line, e.getMessage());
            return null;
        }
    }
}
```
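Side note on the reader: `BufferedReader.lines()` does not close the underlying reader when the returned stream is closed, so the try-with-resources in the caller closes only the stream, not the file handle. A minimal sketch of an equivalent reader using `Files.lines`, which does close the file together with the stream:

```java
// Equivalent to readCsvFile above, but Files.lines registers the underlying
// file to be closed when the returned stream is closed.
public Stream<PersonDto> readCsvFile(Path pathToFile) throws IOException {
    return Files.lines(pathToFile)
            .skip(1)
            .map(this::parseCsvToPersonDto)
            .filter(Objects::nonNull);
}
```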