I am new to Kafka and am trying to send data from an Airflow task to a Kafka broker. However, the producer reports a DNS lookup failure for the broker address `broker:29092`.
The `broker` service in docker-compose.yml:
```yaml
broker:
  image: confluentinc/cp-server:7.4.0
  hostname: broker
  container_name: broker
  depends_on:
    zookeeper:
      condition: service_healthy
  ports:
    - "9092:9092"
    - "9101:9101"
    - "29092:29092"
  environment:
    KAFKA_BROKER_ID: 1
    KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
    KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
    KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
    KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
    KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
    KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
    KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
    KAFKA_JMX_PORT: 9101
    KAFKA_JMX_HOSTNAME: localhost
    KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
    CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
    CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
    CONFLUENT_METRICS_ENABLE: 'false'
    CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
  networks:
    - confluent
  healthcheck:
    test: [ "CMD", "bash", "-c", 'nc -z localhost 9092' ]
    interval: 10s
    timeout: 5s
    retries: 5
```
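In this config, `KAFKA_ADVERTISED_LISTENERS` advertises two addresses: `broker:29092` on the `PLAINTEXT` listener and `localhost:9092` on `PLAINTEXT_HOST`. The bootstrap address is only used for the first connection; after that, the broker hands the client the advertised address of the listener it connected through, and the client must be able to resolve that host. The sketch below just splits the setting to make the mapping explicit. The helper name `parse_advertised_listeners` is hypothetical, and the "in-network vs host" reading in the comments is the usual convention for this kind of compose setup, not something Kafka enforces.

```python
from typing import Dict, Tuple


def parse_advertised_listeners(value: str) -> Dict[str, Tuple[str, int]]:
    """Split 'NAME://host:port,...' into {NAME: (host, port)}."""
    listeners = {}
    for entry in value.split(","):
        # 'PLAINTEXT://broker:29092' -> 'PLAINTEXT', 'broker:29092'
        name, _, address = entry.strip().partition("://")
        # rpartition so only the last ':' separates the port
        host, _, port = address.rpartition(":")
        listeners[name] = (host, int(port))
    return listeners


advertised = parse_advertised_listeners(
    "PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092"
)
# Assumed convention: containers on the 'confluent' Docker network use
# broker:29092; processes on the host machine use the published localhost:9092.
print(advertised["PLAINTEXT"])
print(advertised["PLAINTEXT_HOST"])
```

So a producer running inside Docker only works with `broker:29092` if its container can resolve the name `broker`.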
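The DAG in kafka_stream.py:

```python
import json

from airflow import DAG
from airflow.operators.python import PythonOperator

# get_data(), format_data() and default_args are defined elsewhere in
# kafka_stream.py (not shown here).

def stream_data():
    from kafka import KafkaProducer
    res = get_data()
    res = format_data(res)
    print("Function Started")
    print(res)
    try:
        # 'broker' is only resolvable from containers on the same Docker
        # network as the broker service ('confluent' in docker-compose.yml)
        producer = KafkaProducer(
            bootstrap_servers=['broker:29092'],
            api_version=(2, 5, 0),
            max_block_ms=500000,
            # send() needs bytes, so serialize the formatted dict as JSON
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
        )
        producer.send('user_created', res)
        producer.flush()  # send() is asynchronous; wait for buffered records
    except Exception as e:
        print("Failed to connect to Kafka broker:", e)

with DAG('user_automation',
         default_args=default_args,
         schedule_interval='@daily',
         ) as dag:
    streaming_task = PythonOperator(
        task_id='stream_data_from_api',
        python_callable=stream_data,
    )
```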
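In kafka-python, `KafkaProducer.send()` returns a future. Some failures, such as the metadata wait bounded by `max_block_ms`, are raised by `send()` itself, but errors on the produce request are only reported through that future, so the `try/except` above can miss them. A minimal sketch, assuming kafka-python's `FutureRecordMetadata.get(timeout=...)`; the helper name `send_and_confirm` and the 10-second default are illustrative choices. It takes the producer as a parameter so it can be exercised without a running broker.

```python
def send_and_confirm(producer, topic: str, value, timeout: float = 10.0):
    """Send one record and block until the broker acknowledges it.

    `producer` is assumed to be a kafka.KafkaProducer: send() returns a
    future whose get() returns RecordMetadata on success and raises the
    delivery error (for example a KafkaTimeoutError) on failure.
    """
    future = producer.send(topic, value)
    metadata = future.get(timeout=timeout)  # raises instead of failing silently
    return metadata.topic, metadata.partition, metadata.offset
```

Inside `stream_data`, calling `send_and_confirm(producer, 'user_created', res)` in place of the bare `send()` would make a delivery failure show up in the task's exception handler.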
Error from the Airflow task log:

```
[2024-07-28, 16:04:19 UTC] {conn.py:1276} WARNING - DNS lookup failed for broker:29092, exception was [Errno -3] Temporary failure in name resolution. Is your advertised.listeners (called advertised.host.name before Kafka 9) correct and resolvable?
[2024-07-28, 16:04:19 UTC] {conn.py:297} ERROR - DNS lookup failed for broker:29092 (0)
```
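The warning says the Airflow worker's resolver could not turn the hostname `broker` into an IP address (`[Errno -3]` is `EAI_AGAIN` on Linux), so no Kafka connection was attempted at all. One way to confirm this independently of Kafka is a plain resolution check run from inside the Airflow container. The helper `can_resolve` below is a hypothetical diagnostic built on the standard library's `socket.getaddrinfo`, which is the same lookup kafka-python performs before connecting.

```python
import socket


def can_resolve(host: str, port: int) -> bool:
    """Return True if host:port resolves to at least one TCP address."""
    try:
        addresses = socket.getaddrinfo(host, port, type=socket.SOCK_STREAM)
        return len(addresses) > 0
    except socket.gaierror as exc:
        # The Airflow log's "[Errno -3] Temporary failure in name resolution"
        # is a gaierror of this kind
        print(f"DNS lookup failed for {host}:{port}: {exc}")
        return False
```

Running `can_resolve("broker", 29092)` in a Python shell inside the Airflow worker container (for example via `docker exec -it <airflow-container> python`) would be expected to return `True` only if that container shares a Docker network with the `broker` service.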
Here is my complete code: https://github.com/dhainiksuthar/airflow-kafka