I have a KafkaListener:
@KafkaListener(
        idIsGroup = false,
        id = "#{T(java.util.UUID).randomUUID().toString()}",
        topics = "${kafka.topic}",
        groupId = "OpenDialGroup",
        containerFactory = "kafkaListenerContainerFactory",
        concurrency = "2")
public void employeeListener(Message message, @Header(value = "RqUID", required = false) String rquid) {
    MDC.put("rquid", "[" + Strings.nullToEmpty(rquid) + "]");
    try {
        log.debug("Received new message {}", message);
        //...
        log.debug("Message processed");
    } catch (Exception e) {
        log.error("Error processing user", e);
    } finally {
        MDC.clear();
    }
}
I also have this config:
@Bean
public ConsumerFactory<String, Message> consumerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapAddress());
    configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    // Enable SSL only when a trust store is configured
    if (kafkaProperties.getTrustStoreLocation() != null && !kafkaProperties.getTrustStoreLocation().isEmpty()) {
        configProps.put("security.protocol", "SSL");
        configProps.put("ssl.truststore.location", kafkaProperties.getTrustStoreLocation());
        configProps.put("ssl.truststore.password", kafkaProperties.getTrustStorePassword());
        configProps.put("ssl.keystore.location", kafkaProperties.getKeyStoreLocation());
        configProps.put("ssl.key.password", kafkaProperties.getKeyStorePassword());
        configProps.put("ssl.keystore.password", kafkaProperties.getKeyStorePassword());
        // An empty value disables broker hostname verification
        configProps.put("ssl.endpoint.identification.algorithm", "");
    }
    JsonDeserializer<Message> deserializer = new JsonDeserializer<>(Message.class, false);
    deserializer.addTrustedPackages("*");
    return new DefaultKafkaConsumerFactory<>(configProps, new StringDeserializer(), deserializer);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Message> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Message> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}
My topic has 3 partitions.
I have two instances of my Spring application running on WildFly servers, both listening to the same topic in the same consumer group.
When I send 300,000 messages to the topic, I receive only about 115,000.
I don't understand where the messages are being lost.
If I stop one instance of my app, I receive all the messages. What is wrong?
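To make the setup concrete: two instances with concurrency = "2" give four consumers in the group, while the topic has only 3 partitions. The following is a minimal sketch in plain Java of a simplified range-style split of partitions across group members. It is only a model, not Kafka's actual assignor code; the member names are hypothetical, and the real result depends on the configured partition.assignment.strategy.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PartitionAssignmentSketch {

    // Simplified range-style split of one topic's partitions across the
    // members of a consumer group (a model only, not Kafka's implementation).
    static Map<String, List<Integer>> assign(int partitionCount, List<String> members) {
        List<String> sorted = new ArrayList<>(members);
        Collections.sort(sorted);

        int base = partitionCount / sorted.size();   // partitions every member gets
        int extra = partitionCount % sorted.size();  // first 'extra' members get one more

        Map<String, List<Integer>> result = new LinkedHashMap<>();
        int next = 0;
        for (int i = 0; i < sorted.size(); i++) {
            int count = base + (i < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int j = 0; j < count; j++) {
                owned.add(next++);
            }
            result.put(sorted.get(i), owned);
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical member names: 2 instances x concurrency 2 = 4 consumers
        List<String> members = List.of(
                "instance-1/thread-0", "instance-1/thread-1",
                "instance-2/thread-0", "instance-2/thread-1");

        assign(3, members).forEach((member, partitions) ->
                System.out.println(member + " -> " + partitions));
    }
}
```

Under this model, three of the four consumers own one partition each and the fourth stays idle. Every partition still has exactly one owner inside the group, so the extra consumer by itself should not cause message loss.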