I am trying to set a generic GossiperMessageListener
on ConcurrentKafkaListenerContainerFactory
like below
<code>@Component
@Slf4j
public class GossiperMessageListener extends AbstractConsumerSeekAware implements
MessageListener<String, String> {
@Override
public void onPartitionsAssigned(
@NotNull Map<TopicPartition, Long> assignments, @NotNull ConsumerSeekCallback callback) {
val groupId = KafkaUtils.getConsumerGroupId();
log.info("XXXXX groupId: {} assigned partitions: {}", groupId, assignments);
}
@Override
public void onMessage(ConsumerRecord<String, String> data) {
log.info("Received message with Key: {}", data.key());
}
}
</code>
<code>@Component
@Slf4j
public class GossiperMessageListener extends AbstractConsumerSeekAware implements
MessageListener<String, String> {
@Override
public void onPartitionsAssigned(
@NotNull Map<TopicPartition, Long> assignments, @NotNull ConsumerSeekCallback callback) {
val groupId = KafkaUtils.getConsumerGroupId();
log.info("XXXXX groupId: {} assigned partitions: {}", groupId, assignments);
}
@Override
public void onMessage(ConsumerRecord<String, String> data) {
log.info("Received message with Key: {}", data.key());
}
}
</code>
@Component
@Slf4j
public class GossiperMessageListener extends AbstractConsumerSeekAware implements
MessageListener<String, String> {
@Override
public void onPartitionsAssigned(
@NotNull Map<TopicPartition, Long> assignments, @NotNull ConsumerSeekCallback callback) {
val groupId = KafkaUtils.getConsumerGroupId();
log.info("XXXXX groupId: {} assigned partitions: {}", groupId, assignments);
}
@Override
public void onMessage(ConsumerRecord<String, String> data) {
log.info("Received message with Key: {}", data.key());
}
}
<code>@Configuration
@Import(GossiperMessageListener.class)
public GossiperConsumerConfig() {
@Bean
public AbstractKafkaListenerContainerFactory<
ConcurrentMessageListenerContainer<String, String>, String, String>
atLeastOnceKafkaListenerContainerFactory(
final Map<String, Object> consumerProps,
final GossiperMessageListener messageListener) {
val factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
factory.getContainerProperties().setMessageListener(messageListener);
factory.getContainerProperties().setAckMode(AckMode.BATCH);
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<String, String>(consumerProps);
return factory;
}
}
</code>
<code>@Configuration
@Import(GossiperMessageListener.class)
public GossiperConsumerConfig() {
@Bean
public AbstractKafkaListenerContainerFactory<
ConcurrentMessageListenerContainer<String, String>, String, String>
atLeastOnceKafkaListenerContainerFactory(
final Map<String, Object> consumerProps,
final GossiperMessageListener messageListener) {
val factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
factory.getContainerProperties().setMessageListener(messageListener);
factory.getContainerProperties().setAckMode(AckMode.BATCH);
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<String, String>(consumerProps);
return factory;
}
}
</code>
@Configuration
@Import(GossiperMessageListener.class)
public GossiperConsumerConfig() {
@Bean
public AbstractKafkaListenerContainerFactory<
ConcurrentMessageListenerContainer<String, String>, String, String>
atLeastOnceKafkaListenerContainerFactory(
final Map<String, Object> consumerProps,
final GossiperMessageListener messageListener) {
val factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
factory.getContainerProperties().setMessageListener(messageListener);
factory.getContainerProperties().setAckMode(AckMode.BATCH);
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<String, String>(consumerProps);
return factory;
}
}
I can see that partitions are getting assigned and I can receive messages from Kafka but my custom GossiperMessageListener
is not getting invoked. Can someone let me know what is going wrong
<code>2024-06-17 12:38:11,545 [aws.us-west-2.0.isos.notification.payloads.consumer-5-C-1] o.s.k.l.KafkaMessageListenerContainer aws.us-west-2.0.isos.notification.payloads.consumer: partitions assigned: [aws.us-west-2.0.isos.notification.payloads-5]
...
</code>
<code>2024-06-17 12:38:11,545 [aws.us-west-2.0.isos.notification.payloads.consumer-5-C-1] o.s.k.l.KafkaMessageListenerContainer aws.us-west-2.0.isos.notification.payloads.consumer: partitions assigned: [aws.us-west-2.0.isos.notification.payloads-5]
...
</code>
2024-06-17 12:38:11,545 [aws.us-west-2.0.isos.notification.payloads.consumer-5-C-1] o.s.k.l.KafkaMessageListenerContainer aws.us-west-2.0.isos.notification.payloads.consumer: partitions assigned: [aws.us-west-2.0.isos.notification.payloads-5]
...
- spring-kafka – 3.0.12
- kafka-clients – 3.5.1