I am trying to set up Kafka in KRaft mode to use SASL. Here is my server.properties file:
############################# Server Basics #############################
process.roles=broker,controller
node.id={{ node_id }}
controller.quorum.voters={{ controller_quorum_voters }}
############################# Socket Server Settings #############################
listeners=BROKER://{{ node_internal_ip }}:9092,CONTROLLER://{{ node_internal_ip }}:9093,CLIENT://{{ node_client_ip }}:9094
inter.broker.listener.name=BROKER
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:SASL_PLAINTEXT,BROKER:SASL_PLAINTEXT,CLIENT:SASL_PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# List of enabled mechanisms, can be more than one
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
# Credentials
listener.name.BROKER.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret"
user_alice="alice-secret";
listener.name.CONTROLLER.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret"
user_alice="alice-secret";
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
############################# Log Basics #############################
log.dirs={{ kafka_logs_dir }}
num.partitions=1
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Flush Policy #############################
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
############################# Topics Properties #############################
delete.topic.enable=true
auto.create.topics.enable=false
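One thing I also looked at is how `java.util.Properties` joins multi-line values, since my JAAS configs span several lines. Here is my simplified sketch of the continuation rule (a toy parser for illustration only, not the real JVM one — it only handles trailing-backslash continuations and `=` separators):

```python
def parse_properties(text: str) -> dict:
    """Toy reader for Java .properties files (illustration only).

    Mirrors one rule of java.util.Properties: a line ending in a
    backslash continues onto the next line, whose leading whitespace
    is skipped. Everything else (escapes, ':' separators, unicode)
    is ignored for simplicity.
    """
    props = {}
    logical = ""
    for raw in text.splitlines():
        line = raw.lstrip()
        if not logical and (not line or line[0] in "#!"):
            continue  # skip blank lines and comments
        if line.endswith("\\"):
            logical += line[:-1]  # drop the backslash, keep reading
            continue
        logical += line
        key, _, value = logical.partition("=")
        props[key.strip()] = value.strip()
        logical = ""
    return props


# Without trailing backslashes, the value ends at the first newline and
# the following lines are parsed as separate (bogus) keys:
broken = parse_properties(
    'jaas.config=SomeLoginModule required\nusername="admin"'
)
# With trailing backslashes, everything stays one logical line:
joined = parse_properties(
    'jaas.config=SomeLoginModule required \\\n    username="admin";'
)
```

If my understanding is right, every line of a multi-line value in server.properties needs a trailing backslash, otherwise only the first physical line ends up in the property. I still need to double-check whether my Ansible template preserves those backslashes.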
However, I am getting this error:
[2024-06-08 18:49:01,662] ERROR Encountered fatal fault: caught exception (org.apache.kafka.server.fault.ProcessTerminatingFaultHandler)
java.lang.IllegalArgumentException: Could not find a 'KafkaServer' or 'controller.KafkaServer' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set
at org.apache.kafka.common.security.JaasContext.defaultContext(JaasContext.java:150)
at org.apache.kafka.common.security.JaasContext.load(JaasContext.java:103)
at org.apache.kafka.common.security.JaasContext.loadServerContext(JaasContext.java:74)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:143)
at org.apache.kafka.common.network.ChannelBuilders.serverChannelBuilder(ChannelBuilders.java:107)
at kafka.network.Processor.<init>(SocketServer.scala:973)
at kafka.network.Acceptor.newProcessor(SocketServer.scala:879)
at kafka.network.Acceptor.$anonfun$addProcessors$1(SocketServer.scala:849)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190)
at kafka.network.Acceptor.addProcessors(SocketServer.scala:848)
at kafka.network.DataPlaneAcceptor.configure(SocketServer.scala:523)
at kafka.network.SocketServer.createDataPlaneAcceptorAndProcessors(SocketServer.scala:251)
at kafka.network.SocketServer.$anonfun$new$29(SocketServer.scala:172)
at kafka.network.SocketServer.$anonfun$new$29$adapted(SocketServer.scala:172)
at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:576)
at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:574)
at scala.collection.AbstractIterable.foreach(Iterable.scala:933)
at kafka.network.SocketServer.<init>(SocketServer.scala:172)
at kafka.server.ControllerServer.startup(ControllerServer.scala:188)
at kafka.server.KafkaRaftServer.$anonfun$startup$1(KafkaRaftServer.scala:98)
at kafka.server.KafkaRaftServer.$anonfun$startup$1$adapted(KafkaRaftServer.scala:98)
at scala.Option.foreach(Option.scala:437)
at kafka.server.KafkaRaftServer.startup(KafkaRaftServer.scala:98)
at kafka.Kafka$.main(Kafka.scala:112)
at kafka.Kafka.main(Kafka.scala)
It is quite strange, as the official documentation explicitly says that the per-listener configuration 'listener.name.{listenerName}.{saslMechanism}.sasl.jaas.config' should take priority over the static JAAS configuration (the 'java.security.auth.login.config' system property).
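For comparison, the example in the docs (there it is for a listener named SASL_SSL) writes the whole thing as a single logical line, with the listener name lowercased in the property key and trailing backslashes as line continuations:

```properties
listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="admin" \
    password="admin-secret" \
    user_admin="admin-secret" \
    user_alice="alice-secret";
```

I am not sure whether the uppercase listener name in my keys (listener.name.BROKER..., listener.name.CONTROLLER...) matters, or whether the docs just happen to use lowercase.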
I am sure I am passing the configuration file, because here is my systemd service:
[Unit]
Description=Apache Kafka
Documentation=http://kafka.apache.org/documentation.html
After=network.target
[Service]
Type=simple
User=kafka
Group=kafka
Environment="KAFKA_HOME={{ kafka_install_dir }}"
Environment="KAFKA_HEAP_OPTS=-Xmx{{ ansible_memtotal_mb // 2 }}M -Xms{{ ansible_memtotal_mb // 2 }}M"
ExecStart=/bin/sh -c '{{ kafka_install_dir }}/bin/kafka-server-start.sh {{ kafka_install_dir }}/config/kraft/server.properties > {{ kafka_install_dir }}/kafka.log 2>&1'
ExecStop={{ kafka_install_dir }}/bin/kafka-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
Plus, I get the same error if I run the broker manually:
bin/kafka-server-start.sh config/kraft/server.properties
I am actually quite puzzled :/ Am I maybe missing some storage formatting that I need to do before enabling SASL for brokers? That doesn't seem to be documented anywhere, though.
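For context, by storage formatting I mean the standard step from the KRaft quickstart, i.e. something like:

```shell
# Standard KRaft storage formatting from the quickstart;
# run once per node before the first start.
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
bin/kafka-storage.sh format -t "$KAFKA_CLUSTER_ID" -c config/kraft/server.properties
```

I did run that before the first start; I just don't know whether enabling SASL requires anything extra on top of it.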
If you have any clue about what might be wrong here, please share.
Thank you in advance!