I am finding trouble at making my project work well. It is a stream processing pipeline (with Kafka and Flink) that captures data from an API and records it on a Postgres database table. The problem is that the Docker logs point out too many reconnections and there is no data on the endpoint table.
There are topics already created for the Kafka broker, but it seems nothing goes past there.
And this is the producer.py:
import websocket
import json
import time
import schedule
from kafka import KafkaProducer, KafkaAdminClient
from kafka.admin import NewTopic
from datetime import datetime
import random
import config
import logging
import threading
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Load configuration
c = config.load_config()
kafka_bootstrap_server = c.KAFKA_BOOTSTRAP_SERVER
kafka_topic_data = c.KAFKA_TOPIC_DATA
kafka_topic_predictions = c.KAFKA_TOPIC_PREDICTIONS
symbols = c.crypto_SYMBOLS
interval = c.PRODUCER_INTERVAL_SECONDS
finnhub_api_key = c.FINNHUB_API_KEY
# Initialize latest_trade_data
latest_trade_data = {}
# Function to create Kafka topics if they don't exist
def create_kafka_topics():
try:
client = KafkaAdminClient(bootstrap_servers=kafka_bootstrap_server)
topics = [kafka_topic_data, kafka_topic_predictions]
new_topics = [NewTopic(name=topic, num_partitions=1, replication_factor=1) for topic in topics]
existing_topics = client.list_topics()
existing_topic_names = set(existing_topics) # Convert to set for faster lookup
topics_to_create = [topic for topic in new_topics if topic.name not in existing_topic_names]
if topics_to_create:
client.create_topics(new_topics=topics_to_create)
for topic in topics_to_create:
logger.info(f"Topic {topic.name} created")
else:
logger.info("No new topics to create. All topics already exist.")
except Exception as e:
logger.error(f"An error occurred while creating Kafka topics: {e}")
finally:
client.close()
logger.info("Kafka client closed.")
# Kafka Producer initialization with retry logic
def create_producer(bootstrap_servers):
attempts = 0
while attempts < 5:
try:
producer = KafkaProducer(bootstrap_servers=bootstrap_servers,
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
key_serializer=lambda k: k.encode('utf-8'))
return producer
except Exception as e:
logger.error(f"Failed to create Kafka producer: {e}")
attempts += 1
time.sleep(2 ** attempts) # Exponential backoff
raise Exception("Failed to create Kafka producer after several attempts")
producer = create_producer(kafka_bootstrap_server)
def delivery_report(err, record_metadata):
""" Called once for each message produced to indicate delivery result. """
if err is not None:
logger.error(f"Message delivery failed: {err}")
else:
logger.info(f"Message delivered to {record_metadata.topic} [{record_metadata.partition}]")
def on_message(ws, message):
"""Callback function to handle incoming WebSocket messages."""
data = json.loads(message)
if data.get('type') == 'trade':
for trade in data['data']:
symbol = trade['s']
record = {
'symbol': symbol,
'timestamp': datetime.fromtimestamp(trade['t'] / 1000.0).strftime('%Y-%m-%d %H:%M:%S'),
'price': trade['p'],
'volume': trade['v']
}
latest_trade_data[symbol] = record # Update latest trade data
try:
future = producer.send(kafka_topic_data, key=symbol, value=record)
future.add_callback(delivery_report)
future.add_errback(lambda exc: logger.error(f"Failed to send record to Kafka: {exc}"))
except Exception as e:
logger.error(f"Failed to send record to Kafka: {e}")
def on_error(ws, error):
"""Callback function to handle WebSocket errors."""
logger.error(f"WebSocket error: {error}")
def on_close(ws, close_status_code, close_msg):
"""Callback function to handle WebSocket closure."""
logger.info(f"### closed ### status: {close_status_code}, message: {close_msg}")
reconnect_websocket()
def reconnect_websocket():
"""Recreate and start the WebSocket connection."""
global ws_thread, ws
logger.info("Reconnecting to WebSocket...")
ws.close()
ws_thread.join() # Ensure previous thread is finished
ws = websocket.WebSocketApp(
f"wss://ws.finnhub.io?token={finnhub_api_key}",
on_message=on_message,
on_error=on_error,
on_close=on_close
)
ws.on_open = on_open
ws_thread = threading.Thread(target=ws.run_forever)
ws_thread.start()
def on_open(ws):
"""Callback function to handle WebSocket connection opening."""
for symbol in symbols:
ws.send(json.dumps({'type': 'subscribe', 'symbol': symbol}))
logger.info("Subscribed to symbols: %s", symbols)
def generate_prediction():
"""Generates crypto prediction data based on the latest trade data."""
for symbol, data in latest_trade_data.items():
last_price = data['price']
prediction = last_price * (1 + random.uniform(-0.02, 0.02)) # Simple prediction logic
record = {
'symbol': symbol,
'time': data['timestamp'],
'close': last_price,
'prediction': prediction
}
try:
future = producer.send(kafka_topic_predictions, key=symbol, value=record)
future.add_callback(delivery_report)
future.add_errback(lambda exc: logger.error(f"Failed to send prediction to Kafka: {exc}"))
except Exception as e:
logger.error(f"Failed to send prediction to Kafka: {e}")
def job():
"""Job to generate predictions."""
generate_prediction()
if __name__ == "__main__":
# Ensure Kafka topics are created
create_kafka_topics()
# Initialize the WebSocket connection
ws = websocket.WebSocketApp(
f"wss://ws.finnhub.io?token={finnhub_api_key}",
on_message=on_message,
on_error=on_error,
on_close=on_close
)
ws.on_open = on_open
# Schedule the prediction job
schedule.every(interval).seconds.do(job)
ws_thread = threading.Thread(target=ws.run_forever)
ws_thread.start()
# Run the scheduled jobs in the main thread
try:
while True:
schedule.run_pending()
time.sleep(0.5)
except KeyboardInterrupt:
logger.info("Shutting down gracefully...")
finally:
producer.flush()
ws.close()
ws_thread.join()
logger.info("Producer and WebSocket connection closed.")
This is the error log from my producer.py:
2024-08-03 17:27:11 INFO:kafka.conn:Broker version identified as 2.5.0
2024-08-03 17:27:11 INFO:kafka.conn:Set configuration api_version=(2, 5, 0) to skip auto check_version requests on startup
2024-08-03 17:27:11 INFO:kafka.conn:<BrokerConnection node_id=1 host=kafka:9092 <connecting> [IPv4 ('172.19.0.7', 9092)]>: connecting to kafka:9092 [('172.19.0.7', 9092) IPv4]
2024-08-03 17:27:11 INFO:kafka.conn:Probing node 1 broker version
2024-08-03 17:27:11 INFO:kafka.conn:<BrokerConnection node_id=1 host=kafka:9092 <connecting> [IPv4 ('172.19.0.7', 9092)]>: Connection complete.
2024-08-03 17:27:11 INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=kafka:9092 <connected> [IPv4 ('172.19.0.7', 9092)]>: Closing connection.
2024-08-03 17:27:11 INFO:kafka.conn:Broker version identified as 2.5.0
2024-08-03 17:27:11 INFO:kafka.conn:Set configuration api_version=(2, 5, 0) to skip auto check_version requests on startup
2024-08-03 17:27:11 ERROR:__main__:An error occurred while creating Kafka topics: [Error 36] TopicAlreadyExistsError: Request 'CreateTopicsRequest_v3(create_topic_requests=[(topic='CryptoData', num_partitions=1, replication_factor=1, replica_assignment=[], configs=[]), (topic='CryptoPrediction', num_partitions=1, replication_factor=1, replica_assignment=[], configs=[])], timeout=30000, validate_only=False)' failed with response 'CreateTopicsResponse_v3(throttle_time_ms=0, topic_errors=[(topic='CryptoData', error_code=36, error_message="Topic 'CryptoData' already exists."), (topic='CryptoPrediction', error_code=36, error_message="Topic 'CryptoPrediction' already exists.")])'.
2024-08-03 17:27:11 INFO:kafka.conn:<BrokerConnection node_id=1 host=kafka:9092 <connected> [IPv4 ('172.19.0.7', 9092)]>: Closing connection.
2024-08-03 17:27:11 INFO:__main__:Kafka client closed.
2024-08-03 17:27:12 INFO:websocket:Websocket connected
2024-08-03 17:27:12 INFO:__main__:Subscribed to symbols: ['BTCUSDT']
2024-08-03 17:32:11 INFO:kafka.conn:<BrokerConnection node_id=1 host=kafka:9092 <connecting> [IPv4 ('172.19.0.7', 9092)]>: connecting to kafka:9092 [('172.19.0.7', 9092) IPv4]
2024-08-03 17:32:11 INFO:kafka.conn:<BrokerConnection node_id=1 host=kafka:9092 <connecting> [IPv4 ('172.19.0.7', 9092)]>: Connection complete.
2024-08-03 17:32:11 INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=kafka:9092 <connected> [IPv4 ('172.19.0.7', 9092)]>: Closing connection.
``