How can I connect a Kafka consumer written in Python with Event Hub on an AKS pod? I’ve already tried using Workload Identity with a Service Connector (previously I’ve tried with a connection string without success), but I’m still unable to connect. I did make sure that the created identity has the necessary rights on Event Hub.
The consumer was tested locally and works fine, here is the code
from azure.identity import DefaultAzureCredential, WorkloadIdentityCredential, ManagedIdentityCredential
from azure.storage.blob import BlobServiceClient
from confluent_kafka import Consumer, KafkaException
import sys
import getopt
import json
import logging
from functools import partial
from pprint import pformat
import os
def stats_cb(stats_json_str):
stats_json = json.loads(stats_json_str)
print('nKAFKA Stats: {}n'.format(pformat(stats_json)))
def oauth_cb(cred, namespace_fqdn, config):
# confluent_kafka requires an oauth callback function to return (str, float) with the values of (<access token>, <expiration date in seconds from epoch>)
# cred: an Azure identity library credential object. Ex: an instance of DefaultAzureCredential, ManagedIdentityCredential, etc
# namespace_fqdn: the FQDN for the target Event Hubs namespace. Ex: 'mynamespace.servicebus.windows.net'
# config: confluent_kafka passes the value of 'sasl.oauthbearer.config' as the config param
access_token = cred.get_token('https://<eventhubs-namespace>.servicebus.windows.net/.default')
return access_token.token, access_token.expires_on
def print_usage_and_exit(program_name):
sys.stderr.write(
'Usage: %s [options..] <eventhubs-namespace> <group> <topic1> <topic2> ..n' % program_name)
options = '''
Options:
-T <intvl> Enable client statistics at specified interval (ms)
'''
sys.stderr.write(options)
sys.exit(1)
if __name__ == '__main__':
optlist, argv = getopt.getopt("<resource group", 'T:')
if len(argv) < 3:
print_usage_and_exit(sys.argv[0])
# Azure credential # See https://docs.microsoft.com/en-us/azure/developer/python/sdk/authentication-overview
cred = DefaultAzureCredential(managed_identity_client_id="<id>")
# Consumer configuration
# See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
conf = {
'bootstrap.servers': '<eventhubs-namespace>.servicebus.windows.net:9093',
'group.id': '$Default',
'session.timeout.ms': 6000,
'auto.offset.reset': 'earliest',
# Required OAuth2 configuration properties
'security.protocol': 'SASL_SSL',
'sasl.mechanism': 'OAUTHBEARER',
# the resulting oauth_cb must accept a single `config` parameter, so we use partial to bind the namespace/identity to our function
'oauth_cb': partial(oauth_cb, cred, '<eventhubs-namespace>.servicebus.windows.net:9093'),
}
#print(str(conf))
# Check to see if -T option exists
for opt in optlist:
if opt[0] != '-T':
continue
try:
intval = int(opt[1])
except ValueError:
sys.stderr.write("Invalid option value for -T: %sn" % opt[1])
sys.exit(1)
if intval <= 0:
sys.stderr.write("-T option value needs to be larger than zero: %sn" % opt[1])
sys.exit(1)
conf['stats_cb'] = stats_cb
conf['statistics.interval.ms'] = int(opt[1])
# Create logger for consumer (logs will be emitted when poll() is called)
logger = logging.getLogger('consumer')
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('%(asctime)-15s %(levelname)-8s %(message)s'))
logger.addHandler(handler)
# Create Consumer instance
# Hint: try debug='fetch' to generate some log messages
c = Consumer(conf, logger=logger)
#print(str(c.list_topics().topics))
def print_assignment(consumer, partitions):
print('Assignment:', partitions)
# Subscribe to topics
c.subscribe(["test"], on_assign=print_assignment)
# Read messages from Kafka, print to stdout
try:
while True:
msg = c.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
raise KafkaException(msg.error())
else:
# Proper message
sys.stderr.write('%% %s [%d] at offset %d with key %s:n' %
(msg.topic(), msg.partition(), msg.offset(),
str(msg.key())))
print(msg.value())
except KeyboardInterrupt:
sys.stderr.write('%% Aborted by usern')
finally:
# Close down consumer to commit final offsets.
c.close()
Here is the deployment on K8s
apiVersion: apps/v1
kind: Deployment
metadata:
name: consumer
namespace: default
labels:
app: consumer
azure.workload.identity/use: "true"
spec:
replicas: 1
selector:
matchLabels:
app: consumer
template:
metadata:
labels:
app: consumer
spec:
serviceAccountName: sc-account-<name>
containers:
- name: consumer
image: <image>
command: ["/venv/bin/python","/consumer.py"]
envFrom:
- secretRef:
name: sc-eventhub<secret>