When I try to run my implementation, I get the following error:
11:57:50.305 [broker-0-lifecycle-manager-event-handler] ERROR kafka.server.BrokerLifecycleManager - [BrokerLifecycleManager id=0] Shutting down because we were unable to register with the controller quorum.
11:57:50.314 [Test worker] ERROR kafka.server.BrokerServer - [BrokerServer id=0] Received a fatal error while waiting for the controller to acknowledge that we are caught up
java.util.concurrent.CancellationException: null
    at java.util.concurrent.CompletableFuture.cancel(CompletableFuture.java:2276)
    at kafka.server.BrokerLifecycleManager$ShutdownEvent.run(BrokerLifecycleManager.scala:586)
    at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:186)
    at java.lang.Thread.run(Thread.java:750)
11:57:50.319 [Test worker] ERROR kafka.server.BrokerServer - [BrokerServer id=0] Fatal error during broker startup. Prepare to shutdown
java.lang.RuntimeException: Received a fatal error while waiting for the controller to acknowledge that we are caught up
    at org.apache.kafka.server.util.FutureUtils.waitWithLogging(FutureUtils.java:68)
    at kafka.server.BrokerServer.startup(BrokerServer.scala:474)
    at kafka.server.KafkaRaftServer.$anonfun$startup$2(KafkaRaftServer.scala:99)
    at kafka.server.KafkaRaftServer.$anonfun$startup$2$adapted(KafkaRaftServer.scala:99)
    at scala.Option.foreach(Option.scala:407)
    at kafka.server.KafkaRaftServer.startup(KafkaRaftServer.scala:99)
I am trying to deploy and start three KRaft nodes sequentially, each acting as both broker and controller (process.roles=broker,controller).
Configuration for each KRaft node:
Map<String, String> config = new HashMap<>();
config.put("node.id", String.valueOf(nodeId));
config.put("num.partitions", String.valueOf(numPartitions));
config.put("log.dirs", logsDir);
config.put("default.replication.factor", String.valueOf(replicationFactor));
config.put("offsets.topic.replication.factor", String.valueOf(replicationFactor));
config.put("log.segment.bytes", String.valueOf(logSegmentBytes));
config.put("log.cleaner.dedupe.buffer.size", String.valueOf(CLEANER_BUFFER_SIZE));
config.put("listeners", "PLAINTEXT://" + hostName + ":" + kafkaBrokerPort + ",CONTROLLER://0.0.0.0:" + kafkaControllerPorts.get(nodeId));
config.put("process.roles", "broker,controller");
config.put("inter.broker.listener.name", "PLAINTEXT");
config.put("advertised.listeners", "PLAINTEXT://" + hostName + ":" + kafkaBrokerPort);
config.put("controller.listener.names", "CONTROLLER");
config.put("listener.security.protocol.map", "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT");
config.put(PROP_KAFKA_INTER_BROKER_PROTOCOL_VERSION, interBrokerProtocolVersion);
String quorumVoters = kafkaNodeIds.get(0) + "@localhost:" + kafkaControllerPorts.get(0);
for (int i = 1; i < kafkaControllerPorts.size(); i++) {
    quorumVoters = String.join(",", quorumVoters, kafkaNodeIds.get(i) + "@localhost:" + kafkaControllerPorts.get(i));
}
config.put("controller.quorum.voters", quorumVoters);
Here, kafkaControllerPorts is an ArrayList of unique available ports on the system, and kafkaNodeIds is a list of unique node IDs assigned to each node.
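For completeness, this is roughly how I reserve the unique free ports up front (a sketch of the approach; the FreePorts helper name is illustrative, not from the code above). Binding to port 0 lets the OS pick an ephemeral port; keeping all sockets open until the end guarantees the ports are distinct, though a released port can in principle be re-taken by another process before Kafka binds it.

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.List;

public class FreePorts {
    // Reserves n distinct free ports by binding to port 0 (OS-assigned),
    // holding all sockets open so no port is handed out twice, then releasing them.
    static List<Integer> reserve(int n) throws IOException {
        List<ServerSocket> sockets = new ArrayList<>();
        List<Integer> ports = new ArrayList<>();
        try {
            for (int i = 0; i < n; i++) {
                ServerSocket socket = new ServerSocket(0);
                sockets.add(socket);
                ports.add(socket.getLocalPort());
            }
        } finally {
            for (ServerSocket socket : sockets) {
                socket.close();
            }
        }
        return ports;
    }
}
```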
Here is sample output from the logs:
KafkaBrokerPort: 50268
Node Id: 0
Kafka Controller Ports: [50265, 50266, 50267]
kafka node Ids: [0, 1, 2]
controller.quorum.voters: 0@localhost:50265,1@localhost:50266,2@localhost:50267
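As an aside, the voter string shown above can also be built in a single pass with a stream join instead of the seed-plus-loop approach; this is plain JDK and produces the same output from the same two lists:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class QuorumVoters {
    // Builds "0@localhost:50265,1@localhost:50266,..." from parallel id/port lists.
    static String build(List<Integer> nodeIds, List<Integer> controllerPorts) {
        return IntStream.range(0, controllerPorts.size())
                .mapToObj(i -> nodeIds.get(i) + "@localhost:" + controllerPorts.get(i))
                .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        System.out.println(build(List.of(0, 1, 2), List.of(50265, 50266, 50267)));
        // prints 0@localhost:50265,1@localhost:50266,2@localhost:50267
    }
}
```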
And here is the driver code:
String logDirs = props.getProperty("log.dirs");
File logDir = new File(logDirs);
if (logDir.exists() && logDir.isDirectory()) {
    File metaPropertiesFile = new File(logDir, "meta.properties");
    if (!metaPropertiesFile.exists()) {
        formatStorage(props);
    }
}
System.out.println(clusterId);
KafkaConfig kafkaConfig = new KafkaConfig(props);
KafkaRaftServer kafkaServer = new KafkaRaftServer(kafkaConfig, new SystemTime());
log.info("Starting");
kafkaServer.startup();
log.info("Started");
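One thing I am double-checking in case it is relevant to the registration failure: all three nodes must be formatted with the same cluster.id, since a broker will not join a quorum whose cluster ID differs from the one in its own meta.properties. If formatStorage generates a fresh UUID per node, that could produce exactly this kind of startup failure. Below is a small JDK-only sketch I use to verify that the nodes' meta.properties files agree (the cluster.id key name matches what the KRaft storage formatter writes, but the helper itself is mine):

```java
import java.io.IOException;
import java.io.Reader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Objects;
import java.util.Properties;

public class ClusterIdCheck {
    // Reads cluster.id from each node's meta.properties and confirms they all match.
    // Assumes each entry in logDirs points at an already-formatted KRaft log directory.
    static boolean sameClusterId(List<Path> logDirs) throws IOException {
        String first = null;
        for (Path dir : logDirs) {
            Properties meta = new Properties();
            try (Reader reader = Files.newBufferedReader(dir.resolve("meta.properties"))) {
                meta.load(reader);
            }
            String clusterId = meta.getProperty("cluster.id");
            if (first == null) {
                first = clusterId;
            } else if (!Objects.equals(first, clusterId)) {
                return false;  // nodes were formatted with different cluster IDs
            }
        }
        return true;
    }
}
```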