Is there a way to ensure a good connection between API and Kafka producer?

I am finding trouble at making my project work well. It is a stream processing pipeline (with Kafka and Flink) that captures data from an API and records it on a Postgres database table. The problem is that the Docker logs point out too many reconnections and there is no data on the endpoint table.

There are topics already created for the Kafka broker, but it seems nothing goes past there.

And this is the producer.py:

import websocket
import json
import time
import schedule
from kafka import KafkaProducer, KafkaAdminClient
from kafka.admin import NewTopic
from datetime import datetime
import random
import config
import logging
import threading

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Load configuration
c = config.load_config()

kafka_bootstrap_server = c.KAFKA_BOOTSTRAP_SERVER
kafka_topic_data = c.KAFKA_TOPIC_DATA
kafka_topic_predictions = c.KAFKA_TOPIC_PREDICTIONS
symbols = c.crypto_SYMBOLS
interval = c.PRODUCER_INTERVAL_SECONDS
finnhub_api_key = c.FINNHUB_API_KEY

# Initialize latest_trade_data
latest_trade_data = {}

# Function to create Kafka topics if they don't exist
def create_kafka_topics(): 
    try: 
        client = KafkaAdminClient(bootstrap_servers=kafka_bootstrap_server) 
        topics = [kafka_topic_data, kafka_topic_predictions] 
        new_topics = [NewTopic(name=topic, num_partitions=1, replication_factor=1) for topic in topics] 
        existing_topics = client.list_topics() 
        existing_topic_names = set(existing_topics)  # Convert to set for faster lookup 
        topics_to_create = [topic for topic in new_topics if topic.name not in existing_topic_names] 

        if topics_to_create: 
            client.create_topics(new_topics=topics_to_create) 
            for topic in topics_to_create: 
                logger.info(f"Topic {topic.name} created") 
        else:
            logger.info("No new topics to create. All topics already exist.") 
    except Exception as e: 
        logger.error(f"An error occurred while creating Kafka topics: {e}") 
    finally: 
        client.close() 
        logger.info("Kafka client closed.")

# Kafka Producer initialization with retry logic
def create_producer(bootstrap_servers):
    attempts = 0
    while attempts < 5:
        try:
            producer = KafkaProducer(bootstrap_servers=bootstrap_servers,
                                     value_serializer=lambda v: json.dumps(v).encode('utf-8'),
                                     key_serializer=lambda k: k.encode('utf-8'))
            return producer
        except Exception as e:
            logger.error(f"Failed to create Kafka producer: {e}")
            attempts += 1
            time.sleep(2 ** attempts)  # Exponential backoff
    raise Exception("Failed to create Kafka producer after several attempts")

producer = create_producer(kafka_bootstrap_server)

def delivery_report(err, record_metadata):
    """ Called once for each message produced to indicate delivery result. """
    if err is not None:
        logger.error(f"Message delivery failed: {err}")
    else:
        logger.info(f"Message delivered to {record_metadata.topic} [{record_metadata.partition}]")

def on_message(ws, message):
    """Callback function to handle incoming WebSocket messages."""
    data = json.loads(message)
    if data.get('type') == 'trade':
        for trade in data['data']:
            symbol = trade['s']
            record = {
                'symbol': symbol,
                'timestamp': datetime.fromtimestamp(trade['t'] / 1000.0).strftime('%Y-%m-%d %H:%M:%S'),
                'price': trade['p'],
                'volume': trade['v']
            }
            latest_trade_data[symbol] = record  # Update latest trade data
            try:
                future = producer.send(kafka_topic_data, key=symbol, value=record)
                future.add_callback(delivery_report)
                future.add_errback(lambda exc: logger.error(f"Failed to send record to Kafka: {exc}"))
            except Exception as e:
                logger.error(f"Failed to send record to Kafka: {e}")

def on_error(ws, error):
    """Callback function to handle WebSocket errors."""
    logger.error(f"WebSocket error: {error}")

def on_close(ws, close_status_code, close_msg):
    """Callback function to handle WebSocket closure."""
    logger.info(f"### closed ### status: {close_status_code}, message: {close_msg}")
    reconnect_websocket()

def reconnect_websocket():
    """Recreate and start the WebSocket connection."""
    global ws_thread, ws
    logger.info("Reconnecting to WebSocket...")
    ws.close()
    ws_thread.join()  # Ensure previous thread is finished
    ws = websocket.WebSocketApp(
        f"wss://ws.finnhub.io?token={finnhub_api_key}",
        on_message=on_message,
        on_error=on_error,
        on_close=on_close
    )
    ws.on_open = on_open

    ws_thread = threading.Thread(target=ws.run_forever)
    ws_thread.start()

def on_open(ws):
    """Callback function to handle WebSocket connection opening."""
    for symbol in symbols:
        ws.send(json.dumps({'type': 'subscribe', 'symbol': symbol}))
    logger.info("Subscribed to symbols: %s", symbols)

def generate_prediction():
    """Generates crypto prediction data based on the latest trade data."""
    for symbol, data in latest_trade_data.items():
        last_price = data['price']
        prediction = last_price * (1 + random.uniform(-0.02, 0.02))  # Simple prediction logic
        record = {
            'symbol': symbol,
            'time': data['timestamp'],
            'close': last_price,
            'prediction': prediction
        }
        try:
            future = producer.send(kafka_topic_predictions, key=symbol, value=record)
            future.add_callback(delivery_report)
            future.add_errback(lambda exc: logger.error(f"Failed to send prediction to Kafka: {exc}"))
        except Exception as e:
            logger.error(f"Failed to send prediction to Kafka: {e}")

def job():
    """Job to generate predictions."""
    generate_prediction()

if __name__ == "__main__":
    # Ensure Kafka topics are created
    create_kafka_topics()

    # Initialize the WebSocket connection
    ws = websocket.WebSocketApp(
        f"wss://ws.finnhub.io?token={finnhub_api_key}",
        on_message=on_message,
        on_error=on_error,
        on_close=on_close
    )
    ws.on_open = on_open

    # Schedule the prediction job
    schedule.every(interval).seconds.do(job)

    ws_thread = threading.Thread(target=ws.run_forever)
    ws_thread.start()

    # Run the scheduled jobs in the main thread
    try:
        while True:
            schedule.run_pending()
            time.sleep(0.5)
    except KeyboardInterrupt:
        logger.info("Shutting down gracefully...")
    finally:
        producer.flush()
        ws.close()
        ws_thread.join()
        logger.info("Producer and WebSocket connection closed.")

This is the error log from my producer.py:

2024-08-03 17:27:11 INFO:kafka.conn:Broker version identified as 2.5.0
2024-08-03 17:27:11 INFO:kafka.conn:Set configuration api_version=(2, 5, 0) to skip auto check_version requests on startup
2024-08-03 17:27:11 INFO:kafka.conn:<BrokerConnection node_id=1 host=kafka:9092 <connecting> [IPv4 ('172.19.0.7', 9092)]>: connecting to kafka:9092 [('172.19.0.7', 9092) IPv4]
2024-08-03 17:27:11 INFO:kafka.conn:Probing node 1 broker version
2024-08-03 17:27:11 INFO:kafka.conn:<BrokerConnection node_id=1 host=kafka:9092 <connecting> [IPv4 ('172.19.0.7', 9092)]>: Connection complete.
2024-08-03 17:27:11 INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=kafka:9092 <connected> [IPv4 ('172.19.0.7', 9092)]>: Closing connection. 
2024-08-03 17:27:11 INFO:kafka.conn:Broker version identified as 2.5.0
2024-08-03 17:27:11 INFO:kafka.conn:Set configuration api_version=(2, 5, 0) to skip auto check_version requests on startup
2024-08-03 17:27:11 ERROR:__main__:An error occurred while creating Kafka topics: [Error 36] TopicAlreadyExistsError: Request 'CreateTopicsRequest_v3(create_topic_requests=[(topic='CryptoData', num_partitions=1, replication_factor=1, replica_assignment=[], configs=[]), (topic='CryptoPrediction', num_partitions=1, replication_factor=1, replica_assignment=[], configs=[])], timeout=30000, validate_only=False)' failed with response 'CreateTopicsResponse_v3(throttle_time_ms=0, topic_errors=[(topic='CryptoData', error_code=36, error_message="Topic 'CryptoData' already exists."), (topic='CryptoPrediction', error_code=36, error_message="Topic 'CryptoPrediction' already exists.")])'.
2024-08-03 17:27:11 INFO:kafka.conn:<BrokerConnection node_id=1 host=kafka:9092 <connected> [IPv4 ('172.19.0.7', 9092)]>: Closing connection. 
2024-08-03 17:27:11 INFO:__main__:Kafka client closed.
2024-08-03 17:27:12 INFO:websocket:Websocket connected
2024-08-03 17:27:12 INFO:__main__:Subscribed to symbols: ['BTCUSDT']
2024-08-03 17:32:11 INFO:kafka.conn:<BrokerConnection node_id=1 host=kafka:9092 <connecting> [IPv4 ('172.19.0.7', 9092)]>: connecting to kafka:9092 [('172.19.0.7', 9092) IPv4]
2024-08-03 17:32:11 INFO:kafka.conn:<BrokerConnection node_id=1 host=kafka:9092 <connecting> [IPv4 ('172.19.0.7', 9092)]>: Connection complete.
2024-08-03 17:32:11 INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=kafka:9092 <connected> [IPv4 ('172.19.0.7', 9092)]>: Closing connection. 
``

Trang chủ Giới thiệu Sinh nhật bé trai Sinh nhật bé gái Tổ chức sự kiện Biểu diễn giải trí Dịch vụ khác Trang trí tiệc cưới Tổ chức khai trương Tư vấn dịch vụ Thư viện ảnh Tin tức - sự kiện Liên hệ Chú hề sinh nhật Trang trí YEAR END PARTY công ty Trang trí tất niên cuối năm Trang trí tất niên xu hướng mới nhất Trang trí sinh nhật bé trai Hải Đăng Trang trí sinh nhật bé Khánh Vân Trang trí sinh nhật Bích Ngân Trang trí sinh nhật bé Thanh Trang Thuê ông già Noel phát quà Biểu diễn xiếc khỉ Xiếc quay đĩa Dịch vụ tổ chức sự kiện 5 sao Thông tin về chúng tôi Dịch vụ sinh nhật bé trai Dịch vụ sinh nhật bé gái Sự kiện trọn gói Các tiết mục giải trí Dịch vụ bổ trợ Tiệc cưới sang trọng Dịch vụ khai trương Tư vấn tổ chức sự kiện Hình ảnh sự kiện Cập nhật tin tức Liên hệ ngay Thuê chú hề chuyên nghiệp Tiệc tất niên cho công ty Trang trí tiệc cuối năm Tiệc tất niên độc đáo Sinh nhật bé Hải Đăng Sinh nhật đáng yêu bé Khánh Vân Sinh nhật sang trọng Bích Ngân Tiệc sinh nhật bé Thanh Trang Dịch vụ ông già Noel Xiếc thú vui nhộn Biểu diễn xiếc quay đĩa Dịch vụ tổ chức tiệc uy tín Khám phá dịch vụ của chúng tôi Tiệc sinh nhật cho bé trai Trang trí tiệc cho bé gái Gói sự kiện chuyên nghiệp Chương trình giải trí hấp dẫn Dịch vụ hỗ trợ sự kiện Trang trí tiệc cưới đẹp Khởi đầu thành công với khai trương Chuyên gia tư vấn sự kiện Xem ảnh các sự kiện đẹp Tin mới về sự kiện Kết nối với đội ngũ chuyên gia Chú hề vui nhộn cho tiệc sinh nhật Ý tưởng tiệc cuối năm Tất niên độc đáo Trang trí tiệc hiện đại Tổ chức sinh nhật cho Hải Đăng Sinh nhật độc quyền Khánh Vân Phong cách tiệc Bích Ngân Trang trí tiệc bé Thanh Trang Thuê dịch vụ ông già Noel chuyên nghiệp Xem xiếc khỉ đặc sắc Xiếc quay đĩa thú vị
Trang chủ Giới thiệu Sinh nhật bé trai Sinh nhật bé gái Tổ chức sự kiện Biểu diễn giải trí Dịch vụ khác Trang trí tiệc cưới Tổ chức khai trương Tư vấn dịch vụ Thư viện ảnh Tin tức - sự kiện Liên hệ Chú hề sinh nhật Trang trí YEAR END PARTY công ty Trang trí tất niên cuối năm Trang trí tất niên xu hướng mới nhất Trang trí sinh nhật bé trai Hải Đăng Trang trí sinh nhật bé Khánh Vân Trang trí sinh nhật Bích Ngân Trang trí sinh nhật bé Thanh Trang Thuê ông già Noel phát quà Biểu diễn xiếc khỉ Xiếc quay đĩa
Thiết kế website Thiết kế website Thiết kế website Cách kháng tài khoản quảng cáo Mua bán Fanpage Facebook Dịch vụ SEO Tổ chức sinh nhật