I’m trying to set up SASL/PLAIN authentication in Kafka. When I try to consume or produce a message, the Kafka server logs the following error:
INFO [SocketServer listenerType=ZK_BROKER, nodeId=0] Failed authentication with /127.0.0.1 (channelId=127.0.0.1:9092-127.0.0.1:49254-2) (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)
Meanwhile, the consumer/producer terminal logs the following warning:
WARN [Consumer clientId=console-consumer, groupId=console-consumer-40647] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
My .conf, .properties, and .bat files (I’m using Windows) are given below.
kafka_server_jaas.conf, which is used for the Kafka server:
KafkaServer {org.apache.kafka.common.security.plain.PlainLoginModule required
username=kafkauser
password=Kafka123
user_kafkauser=Kafka123;
};
Client {org.apache.zookeeper.server.auth.DigestLoginModule required
username=kafkauser
password=Kafka123
servicename=zookeeper;
};
zookeeper_jaas.conf, which is used for ZooKeeper:
Server {org.apache.zookeeper.server.auth.DigestLoginModule required
username=kafkauser
password=Kafka123
user_kafkauser=Kafka123;
};
kafka_client_jaas.conf, which is used for the consumer and producer:
KafkaClient {org.apache.zookeeper.server.auth.DigestLoginModule required
username=kafkauser
password=Kafka123;
};
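For comparison, the SASL/PLAIN client example in the Kafka documentation uses PlainLoginModule in the KafkaClient section, not the ZooKeeper DigestLoginModule, and it quotes the option values. A minimal sketch of that form, reusing the credentials above. It is an assumption that this is the intended client login module, and it is not a confirmed fix for the error:

```
// Sketch only: KafkaClient section in the form shown in the Kafka SASL/PLAIN docs.
// Assumption: the console clients should log in as the same kafkauser defined on the broker.
KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="kafkauser"
  password="Kafka123";
};
```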
server.properties:
listeners=SASL_PLAINTEXT://localhost:9092
advertised.listeners=SASL_PLAINTEXT://localhost:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
advertised.host.name=localhost
super.users=User:kafkauser
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
zookeeper.properties:
dataDir=/tmp/zookeeper
clientPort=2181
maxClientCnxns=0
admin.enableServer=false
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenewal=3600000
jaasEnabled=true
consumer.properties:
bootstrap.servers=localhost:9092
security.protocol=SASL_PLAINTEXT
SASL.mechanism=PLAIN
group.id=test-consumer-group
producer.properties:
bootstrap.servers=localhost:9092
security.protocol=SASL_PLAINTEXT
SASL.mechanism=PLAIN
compression.type=none
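One detail worth checking in the two client files: Kafka configuration keys are case-sensitive, and the documented key is the lowercase sasl.mechanism, whose default is GSSAPI. A key spelled SASL.mechanism would therefore likely not be picked up as the mechanism setting. A sketch of the SASL-related lines with the documented spelling, assuming the same values are wanted for both the consumer and the producer:

```properties
# Sketch only: SASL-related client settings using the documented lowercase key.
# Assumption: these lines apply to both consumer.properties and producer.properties.
bootstrap.servers=localhost:9092
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
```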
kafka-server-start.bat:
@echo off
IF [%1] EQU [] (
echo USAGE: %0 server.properties
EXIT /B 1
)
SetLocal
set KAFKA_OPTS=-Djava.security.auth.login.config=/kafka/config/kafka_server_jaas.conf
IF ["%KAFKA_LOG4J_OPTS%"] EQU [""] (
set KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%~dp0../../config/log4j.properties
)
IF ["%KAFKA_HEAP_OPTS%"] EQU [""] (
wmic os get osarchitecture | find /i "32-bit" >nul 2>&1
IF NOT ERRORLEVEL 1 (
set KAFKA_HEAP_OPTS=-Xmx512M -Xms512M
) ELSE (
set KAFKA_HEAP_OPTS=-Xmx1G -Xms1G
)
)
"%~dp0kafka-run-class.bat" kafka.Kafka %*
EndLocal
zookeeper-server-start.bat:
@echo off
IF [%1] EQU [] (
echo USAGE: %0 zookeeper.properties
EXIT /B 1
)
SetLocal
set KAFKA_OPTS=-Djava.security.auth.login.config=/kafka/config/zookeeper_jaas.conf
IF ["%KAFKA_LOG4J_OPTS%"] EQU [""] (
set KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%~dp0../../config/log4j.properties
)
IF ["%KAFKA_HEAP_OPTS%"] EQU [""] (
set KAFKA_HEAP_OPTS=-Xmx512M -Xms512M
)
"%~dp0kafka-run-class.bat" org.apache.zookeeper.server.quorum.QuorumPeerMain %*
EndLocal
kafka-console-consumer.bat:
@echo off
SetLocal
set KAFKA_OPTS=-Djava.security.auth.login.config=/kafka/config/kafka_client_jaas.conf
set KAFKA_HEAP_OPTS=-Xmx512M
"%~dp0kafka-run-class.bat" kafka.tools.ConsoleConsumer %*
EndLocal
kafka-console-producer.bat:
@echo off
SetLocal
set KAFKA_OPTS=-Djava.security.auth.login.config=/kafka/config/kafka_client_jaas.conf
set KAFKA_HEAP_OPTS=-Xmx512M
"%~dp0kafka-run-class.bat" kafka.tools.ConsoleProducer %*
EndLocal
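A broker typically reports a METADATA request arriving during the SASL handshake when a client connects to a SASL listener without performing SASL itself. The console tools only read consumer.properties and producer.properties when those files are passed on the command line, so it may be worth confirming how they are started. A sketch of such an invocation follows; the topic name test-topic and the relative paths are assumptions, not taken from the setup above:

```bat
REM Sketch only. Assumptions: run from the Kafka installation directory,
REM and "test-topic" is a hypothetical topic name.
bin\windows\kafka-console-producer.bat --bootstrap-server localhost:9092 --topic test-topic --producer.config config\producer.properties
bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test-topic --consumer.config config\consumer.properties
```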