I am trying to learn Confluent Kafka for my project. We have an API key and secret, but I am not able to use them when reading messages from a Kafka topic. I went through this link: https://docs.confluent.io/cloud/current/access-management/authenticate/api-keys/best-practices-api-keys.html, but it is not clear how to set the key and secret in the Java consumer code. I am also getting the error below:
Exception in thread "main" org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed
I used the code below:
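import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;

public class HelloConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // AppConfigs is a constants class from my own project.
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, AppConfigs.applicationID);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "XXXXXXXXXX.azure.confluent.cloud:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "someid");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        // The API key is the username and the API secret is the password.
        // The quotes inside the JAAS string must be escaped as \".
        props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + "apiKeyValue" + "\" password=\"" + "secreteValue" + "\";");
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
        kafkaConsumer.subscribe(Arrays.asList("tci.gab.locate.response"));
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Key: " + record.key() + ", Value: " + record.value());
                System.out.println("Partition: " + record.partition() + ", Offset:" + record.offset());
            }
        }
    }
}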
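
For reference, here is a minimal sketch of how I understand the SASL/PLAIN settings are supposed to be assembled from an API key and secret. The class name `ConfluentSaslSketch`, its two helper methods, and the environment variable names `CONFLUENT_API_KEY` and `CONFLUENT_API_SECRET` are my own placeholders, not anything from the Kafka or Confluent libraries. It also assumes the key and secret contain no double-quote characters. Only the three property names (`security.protocol`, `sasl.mechanism`, `sasl.jaas.config`) and the `PlainLoginModule` class name come from the Kafka client itself.

```java
import java.util.Properties;

// Hypothetical helper for illustration; not part of the Kafka client library.
class ConfluentSaslSketch {

    private static final String LOGIN_MODULE =
            "org.apache.kafka.common.security.plain.PlainLoginModule";

    // Builds the JAAS line: the API key goes in as username, the secret as password.
    // Assumes neither value contains a double quote.
    static String buildJaasConfig(String apiKey, String apiSecret) {
        return String.format("%s required username=\"%s\" password=\"%s\";",
                LOGIN_MODULE, apiKey, apiSecret);
    }

    // Collects the three SASL-related client settings under their plain string keys.
    static Properties saslProperties(String apiKey, String apiSecret) {
        Properties props = new Properties();
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config", buildJaasConfig(apiKey, apiSecret));
        return props;
    }

    public static void main(String[] args) {
        // Environment variable names are assumptions chosen for this sketch.
        String apiKey = System.getenv("CONFLUENT_API_KEY");
        String apiSecret = System.getenv("CONFLUENT_API_SECRET");
        if (apiKey == null || apiSecret == null) {
            System.err.println("Set CONFLUENT_API_KEY and CONFLUENT_API_SECRET first");
            return;
        }
        Properties props = saslProperties(apiKey, apiSecret);
        // Print only the non-secret settings.
        System.out.println("security.protocol = " + props.getProperty("security.protocol"));
        System.out.println("sasl.mechanism = " + props.getProperty("sasl.mechanism"));
    }
}
```

The resulting `Properties` could then be merged into the consumer configuration with `props.putAll(...)` before the `KafkaConsumer` is constructed. Reading the credentials from the environment instead of hard-coding them is a design choice of this sketch, not a requirement of the client.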