I am facing an issue while fetching 2 million records from a Kafka topic through a consumer.
My requirement is to fetch the records, store them in a Map, and return that Map to other methods so they can process the data and write the final output to a file.
Adding the run method here –
```java
public Map<String, SaleRecord> run() throws NullPointerException {
    kafkaConfigurations();
    Map<String, SaleRecord> recordMap = new HashMap<>();
    LOG.info("Started consuming kafka messages....");
    mapper.enable(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT);
    int i = 0;
    // int recordLimit = 1000; // Set the limit to the desired number of records
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
        LOG.debug("{} ^^^^^^^ size of records", records.count());
        /* if (records.isEmpty()) {
            LOG.info("No more records to consume. Exiting...");
            break;
        } */
        for (ConsumerRecord<String, String> record : records) {
            JsonNode jsonData = null;
            try {
                jsonData = mapper.readTree(record.value());
            } catch (JsonProcessingException e) {
                throw new RuntimeException(e);
            }
            // Validate JSON data
            if (!validateJsonData(jsonData)) {
                continue; // Skip invalid records
            }
            SaleRecord priceRecords = mapper.convertValue(jsonData, SaleRecord.class);
            if (priceRecords != null && isValidSaleRecord(priceRecords)) {
                recordMap.put(priceRecords.getInvoiceId(), priceRecords);
                i++;
            }
        }
        LOG.info("Done consuming kafka messages after {} iterations", i);
        return recordMap;
    }
}
```
So what can the terminal condition of the while loop be, if I have to process 2 million records in as little time as possible?
I have tried processing 100,000 records, and it works with the terminal condition set to 100000,
but I have to process 2 million (20 lakh) per day, so it takes a long time and goes into an infinite loop.
I don't have much expertise in Kafka. If anyone knows about a KafkaListener property or any easier way to do this, please help.
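One common way to terminate such a loop without a hard-coded record count is an idle timeout: keep polling, and stop only after the broker returns N consecutive empty batches. The sketch below is illustrative, not the asker's actual code: the poll is abstracted as a `Supplier` (with a real consumer it would be `() -> consumer.poll(Duration.ofSeconds(5))`), and the class and method names (`IdleTimeoutConsumer`, `consumeUntilIdle`) are made up for this example so it can run without a broker.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

public class IdleTimeoutConsumer {

    /**
     * Drains batches from `poll` until it comes back empty
     * `maxEmptyPolls` times in a row. One stray empty poll mid-stream
     * does not stop consumption, because the counter resets on data.
     */
    public static List<String> consumeUntilIdle(Supplier<List<String>> poll,
                                                int maxEmptyPolls) {
        List<String> collected = new ArrayList<>();
        int emptyPolls = 0;
        while (emptyPolls < maxEmptyPolls) {
            List<String> batch = poll.get();
            if (batch.isEmpty()) {
                emptyPolls++;      // topic looks drained; count idle polls
            } else {
                emptyPolls = 0;    // got data again, reset the idle counter
                collected.addAll(batch);
            }
        }
        return collected;
    }

    public static void main(String[] args) {
        // Simulate two non-empty batches separated by one empty poll,
        // followed by endless empty polls.
        Iterator<List<String>> batches = List.of(
                List.of("a", "b"), List.<String>of(), List.of("c")).iterator();
        Supplier<List<String>> fakePoll =
                () -> batches.hasNext() ? batches.next() : List.<String>of();

        List<String> out = consumeUntilIdle(fakePoll, 3);
        System.out.println(out); // prints [a, b, c]
    }
}
```

An alternative terminal condition, if the consumer should stop exactly at the end of the topic as it was when the job started, is to capture `consumer.endOffsets(...)` for the assigned partitions up front and break once `consumer.position(...)` reaches those offsets for every partition; both methods exist on `KafkaConsumer`, though wiring that up is more involved than the idle-timeout counter above.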