How to connect my pod on AKS to Azure Event Hub with its kafka interface?

How can I connect a Kafka consumer written in Python with Event Hub on an AKS pod? I’ve already tried using Workload Identity with a Service Connector (previously I’ve tried with a connection string without success), but I’m still unable to connect. I did make sure that the created identity has the necessary rights on Event Hub.

The consumer was tested locally and works fine, here is the code

from azure.identity import DefaultAzureCredential, WorkloadIdentityCredential, ManagedIdentityCredential
from azure.storage.blob import BlobServiceClient
from confluent_kafka import Consumer, KafkaException
import sys
import getopt
import json
import logging
from functools import partial
from pprint import pformat
import os


def stats_cb(stats_json_str):
    stats_json = json.loads(stats_json_str)
    print('nKAFKA Stats: {}n'.format(pformat(stats_json)))


def oauth_cb(cred, namespace_fqdn, config):
    # confluent_kafka requires an oauth callback function to return (str, float) with the values of (<access token>, <expiration date in seconds from epoch>)

    # cred: an Azure identity library credential object. Ex: an instance of DefaultAzureCredential, ManagedIdentityCredential, etc
    # namespace_fqdn: the FQDN for the target Event Hubs namespace. Ex: 'mynamespace.servicebus.windows.net'
    # config: confluent_kafka passes the value of 'sasl.oauthbearer.config' as the config param
    access_token = cred.get_token('https://<eventhubs-namespace>.servicebus.windows.net/.default')
    return access_token.token, access_token.expires_on


def print_usage_and_exit(program_name):
    sys.stderr.write(
        'Usage: %s [options..] <eventhubs-namespace> <group> <topic1> <topic2> ..n' % program_name)
    options = '''
 Options:
  -T <intvl>   Enable client statistics at specified interval (ms)
'''
    sys.stderr.write(options)
    sys.exit(1)


if __name__ == '__main__':
    optlist, argv = getopt.getopt("<resource group", 'T:')
    if len(argv) < 3:
        print_usage_and_exit(sys.argv[0]) 

      # Azure credential     # See https://docs.microsoft.com/en-us/azure/developer/python/sdk/authentication-overview

    cred = DefaultAzureCredential(managed_identity_client_id="<id>")


    # Consumer configuration
    # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
    conf = {
        'bootstrap.servers': '<eventhubs-namespace>.servicebus.windows.net:9093',
        'group.id': '$Default',
        'session.timeout.ms': 6000,
        'auto.offset.reset': 'earliest',

        # Required OAuth2 configuration properties
        'security.protocol': 'SASL_SSL',
        'sasl.mechanism': 'OAUTHBEARER',
        # the resulting oauth_cb must accept a single `config` parameter, so we use partial to bind the namespace/identity to our function
        'oauth_cb': partial(oauth_cb, cred, '<eventhubs-namespace>.servicebus.windows.net:9093'),
    }
    #print(str(conf))
    # Check to see if -T option exists
    for opt in optlist:
        if opt[0] != '-T':
            continue
        try:
            intval = int(opt[1])
        except ValueError:
            sys.stderr.write("Invalid option value for -T: %sn" % opt[1])
            sys.exit(1)

        if intval <= 0:
            sys.stderr.write("-T option value needs to be larger than zero: %sn" % opt[1])
            sys.exit(1)

        conf['stats_cb'] = stats_cb
        conf['statistics.interval.ms'] = int(opt[1])

    # Create logger for consumer (logs will be emitted when poll() is called)
    logger = logging.getLogger('consumer')
    logger.setLevel(logging.DEBUG)
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter('%(asctime)-15s %(levelname)-8s %(message)s'))
    logger.addHandler(handler)

    # Create Consumer instance
    # Hint: try debug='fetch' to generate some log messages
    c = Consumer(conf, logger=logger)
    #print(str(c.list_topics().topics))
    def print_assignment(consumer, partitions):
        print('Assignment:', partitions)

    # Subscribe to topics
    c.subscribe(["test"], on_assign=print_assignment)

    # Read messages from Kafka, print to stdout
    try:
        while True:
            msg = c.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                raise KafkaException(msg.error())
            else:
                # Proper message
                sys.stderr.write('%% %s [%d] at offset %d with key %s:n' %
                                 (msg.topic(), msg.partition(), msg.offset(),
                                  str(msg.key())))
                print(msg.value())

    except KeyboardInterrupt:
        sys.stderr.write('%% Aborted by usern')

    finally:
        # Close down consumer to commit final offsets.
        c.close()

Here is the deployment on K8s

apiVersion: apps/v1
kind: Deployment
metadata:
  name: consumer
  namespace: default
  labels:
    app: consumer
    azure.workload.identity/use: "true"
spec:
  replicas: 1
  selector:
    matchLabels:
      app: consumer
  template:
    metadata:
      labels:
        app: consumer
    spec:
      serviceAccountName: sc-account-<name>
      containers:       
      - name: consumer         
        image: <image>
        command: ["/venv/bin/python","/consumer.py"]
        envFrom:
          - secretRef:
              name: sc-eventhub<secret>

Trang chủ Giới thiệu Sinh nhật bé trai Sinh nhật bé gái Tổ chức sự kiện Biểu diễn giải trí Dịch vụ khác Trang trí tiệc cưới Tổ chức khai trương Tư vấn dịch vụ Thư viện ảnh Tin tức - sự kiện Liên hệ Chú hề sinh nhật Trang trí YEAR END PARTY công ty Trang trí tất niên cuối năm Trang trí tất niên xu hướng mới nhất Trang trí sinh nhật bé trai Hải Đăng Trang trí sinh nhật bé Khánh Vân Trang trí sinh nhật Bích Ngân Trang trí sinh nhật bé Thanh Trang Thuê ông già Noel phát quà Biểu diễn xiếc khỉ Xiếc quay đĩa Dịch vụ tổ chức sự kiện 5 sao Thông tin về chúng tôi Dịch vụ sinh nhật bé trai Dịch vụ sinh nhật bé gái Sự kiện trọn gói Các tiết mục giải trí Dịch vụ bổ trợ Tiệc cưới sang trọng Dịch vụ khai trương Tư vấn tổ chức sự kiện Hình ảnh sự kiện Cập nhật tin tức Liên hệ ngay Thuê chú hề chuyên nghiệp Tiệc tất niên cho công ty Trang trí tiệc cuối năm Tiệc tất niên độc đáo Sinh nhật bé Hải Đăng Sinh nhật đáng yêu bé Khánh Vân Sinh nhật sang trọng Bích Ngân Tiệc sinh nhật bé Thanh Trang Dịch vụ ông già Noel Xiếc thú vui nhộn Biểu diễn xiếc quay đĩa Dịch vụ tổ chức tiệc uy tín Khám phá dịch vụ của chúng tôi Tiệc sinh nhật cho bé trai Trang trí tiệc cho bé gái Gói sự kiện chuyên nghiệp Chương trình giải trí hấp dẫn Dịch vụ hỗ trợ sự kiện Trang trí tiệc cưới đẹp Khởi đầu thành công với khai trương Chuyên gia tư vấn sự kiện Xem ảnh các sự kiện đẹp Tin mới về sự kiện Kết nối với đội ngũ chuyên gia Chú hề vui nhộn cho tiệc sinh nhật Ý tưởng tiệc cuối năm Tất niên độc đáo Trang trí tiệc hiện đại Tổ chức sinh nhật cho Hải Đăng Sinh nhật độc quyền Khánh Vân Phong cách tiệc Bích Ngân Trang trí tiệc bé Thanh Trang Thuê dịch vụ ông già Noel chuyên nghiệp Xem xiếc khỉ đặc sắc Xiếc quay đĩa thú vị
Trang chủ Giới thiệu Sinh nhật bé trai Sinh nhật bé gái Tổ chức sự kiện Biểu diễn giải trí Dịch vụ khác Trang trí tiệc cưới Tổ chức khai trương Tư vấn dịch vụ Thư viện ảnh Tin tức - sự kiện Liên hệ Chú hề sinh nhật Trang trí YEAR END PARTY công ty Trang trí tất niên cuối năm Trang trí tất niên xu hướng mới nhất Trang trí sinh nhật bé trai Hải Đăng Trang trí sinh nhật bé Khánh Vân Trang trí sinh nhật Bích Ngân Trang trí sinh nhật bé Thanh Trang Thuê ông già Noel phát quà Biểu diễn xiếc khỉ Xiếc quay đĩa
Thiết kế website Thiết kế website Thiết kế website Cách kháng tài khoản quảng cáo Mua bán Fanpage Facebook Dịch vụ SEO Tổ chức sinh nhật