I am creating a very basic consumer/producer setup for Kafka, running both ZooKeeper and Kafka via Docker Compose. Both containers seem to start and connect, but when I try to produce or consume, nothing happens. I can see the topic being created, and I noticed the following in the Kafka container log at startup; I'm not sure if it's related.
[2024-05-20 14:32:40,797] WARN [RequestSendThread controllerId=1] Controller 1's connection to broker kafka:9092 (id: 1 rack: null) was unsuccessful (kafka.controller.RequestSendThread)
java.io.IOException: Connection to kafka:9092 (id: 1 rack: null) failed.
	at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71)
	at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:298)
	at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:251)
	at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
I am almost certain I am missing something in the docker-compose file, but I cannot figure out what. I have tried several suggestions from around the internet, but none of them solved it.
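In case it helps with reproducing, the console tools that ship inside the Kafka container can be used to test produce/consume independently of my Java code (the script names below are from the bitnami image, so paths may differ for other images):

```shell
# list topics, to confirm the broker is reachable from inside the container
docker compose exec kafka kafka-topics.sh --bootstrap-server localhost:9092 --list

# produce a couple of test messages (type lines, then Ctrl+C)
docker compose exec kafka kafka-console-producer.sh --bootstrap-server localhost:9092 --topic consumer_test

# consume them back in a second terminal
docker compose exec kafka kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic consumer_test --from-beginning
```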
These are the files:
docker-compose.yml
version: '3'
services:
  zookeeper:
    image: 'bitnami/zookeeper:latest'
    hostname: zookeeper
    restart: unless-stopped
    ports:
      - '2181:2181'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: 'bitnami/kafka:latest'
    hostname: kafka
    restart: unless-stopped
    ports:
      - '9092:9092'
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
      - KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1
      - KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1
      - KAFKA_ADVERTISED_HOST_NAME=localhost
Consumer.java
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class Consumer {

    public static void main(String[] args) {
        String bootstrapServers = "localhost:9092";
        String groupId = "my-fourth-application";

        // create consumer configs
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
            consumer.subscribe(List.of("consumer_test"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
            }
        } catch (Exception exception) {
            System.out.println("Error: " + exception.getMessage());
        }
    }
}
Producer.java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class Producer {

    public static void main(String[] args) throws InterruptedException {
        // create producer configs
        Properties config = new Properties();
        config.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        config.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(config);

        // send a counter value every five seconds
        int i = 0;
        while (true) {
            final ProducerRecord<String, String> record = new ProducerRecord<>("consumer_test", "consumer_key", (i++) + "");
            producer.send(record);
            Thread.sleep(5000);
        }
    }
}