I want to consume all the kafka messages from 0 offset in startup of my spring boot application whenever my service restart. Below is my kafka configuration but it is not working.
@Bean
public ConsumerFactory<String,String> consumerFactory1(){
Map<String,Object> config = new HashMap<>();
InetAddress ip = null;
try {
ip = getIPAddress();
} catch (Exception e) {
log.error("error on getting local host", e.getMessage());
}
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaserver);
config.put(ConsumerConfig.GROUP_ID_CONFIG,
(ip != null? ip.getHostName() : "rasbrandmapper-" + UUID.randomUUID()));
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
return new DefaultKafkaConsumerFactory<>(config);
}
private InetAddress getIPAddress() throws UnknownHostException {
return InetAddress.getLocalHost();
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> configKafkaListenerContainerFactory1() {
ConcurrentKafkaListenerContainerFactory<String, String> factory
= new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory1());
factory.setConcurrency(1);
return factory;
}
@KafkaListener(id = "config_id", topicPartitions = @TopicPartition(topic = "#{@kafkaConstants.getConfig()}",
partitionOffsets = {@PartitionOffset(partition = "0-11", initialOffset = "0")}),
containerFactory = "configKafkaListenerContainerFactory1")`your text`
New contributor
Madhu Korada is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
Check out our Code of Conduct.