Hi, I am trying to write streams of data from Kafka to Cassandra on Docker containers, automated with an Airflow DAG. Here are the things that are working as expected:
- The DAG sends data and I can see the output in Control Center.
- spark-submit --master spark://localhost:7077 spark_stream.py works as expected.
- I can see the keyspace and the table created in cqlsh.
The error:
For some reason, Spark fails to write the data to Cassandra and throws this error:
24/08/06 18:21:40 ERROR WriteToDataSourceV2Exec: Data source write support MicroBatchWrite[epoch: 0, writer: CassandraBulkWrite(org.apache.spark.sql.SparkSession@75f79aff,com.datastax.spark.connector.cql.CassandraConnector@3698a9ca,TableDef(spark_streams,created_users,ArrayBuffer(ColumnDef(id,PartitionKeyColumn,UUIDType)),ArrayBuffer(),Stream(ColumnDef(cleaned_tweet,RegularColumn,VarCharType), ColumnDef(date,RegularColumn,VarCharType), ColumnDef(lat,RegularColumn,VarCharType), ColumnDef(lng,RegularColumn,VarCharType), ColumnDef(original_tweet,RegularColumn,VarCharType), ColumnDef(place,RegularColumn,VarCharType), ColumnDef(sentiment,RegularColumn,VarCharType), ColumnDef(tweet,RegularColumn,VarCharType)),Stream(),false,false,Map()),WriteConf(BytesInBatch(1024),1000,Partition,LOCAL_QUORUM,false,false,5,None,TTLOption(DefaultValue),TimestampOption(DefaultValue),true,None),StructType(StructField(id,StringType,true),StructField(original_tweet,StringType,true),StructField(tweet,StringType,true),StructField(lat,StringType,true),StructField(lng,StringType,true),StructField(place,StringType,true),StructField(date,StringType,true),StructField(sentiment,StringType,true),StructField(cleaned_tweet,StringType,true)),org.apache.spark.SparkConf@618b8d53)] is aborting.
24/08/06 18:21:40 ERROR WriteToDataSourceV2Exec: Data source write support MicroBatchWrite[epoch: 0, writer: CassandraBulkWrite(org.apache.spark.sql.SparkSession@75f79aff,com.datastax.spark.connector.cql.CassandraConnector@3698a9ca,TableDef(spark_streams,created_users,ArrayBuffer(ColumnDef(id,PartitionKeyColumn,UUIDType)),ArrayBuffer(),Stream(ColumnDef(cleaned_tweet,RegularColumn,VarCharType), ColumnDef(date,RegularColumn,VarCharType), ColumnDef(lat,RegularColumn,VarCharType), ColumnDef(lng,RegularColumn,VarCharType), ColumnDef(original_tweet,RegularColumn,VarCharType), ColumnDef(place,RegularColumn,VarCharType), ColumnDef(sentiment,RegularColumn,VarCharType), ColumnDef(tweet,RegularColumn,VarCharType)),Stream(),false,false,Map()),WriteConf(BytesInBatch(1024),1000,Partition,LOCAL_QUORUM,false,false,5,None,TTLOption(DefaultValue),TimestampOption(DefaultValue),true,None),StructType(StructField(id,StringType,true),StructField(original_tweet,StringType,true),StructField(tweet,StringType,true),StructField(lat,StringType,true),StructField(lng,StringType,true),StructField(place,StringType,true),StructField(date,StringType,true),StructField(sentiment,StringType,true),StructField(cleaned_tweet,StringType,true)),org.apache.spark.SparkConf@618b8d53)] aborted.
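For context, the streaming write that produces this error is set up roughly like this in spark_stream.py (a minimal sketch; selection_df is the output of create_selection_df_from_kafka shown further down, and the checkpoint path is just a placeholder):

# Sketch of the Cassandra sink; keyspace/table match the CQL below.
streaming_query = (selection_df.writeStream
                   .format("org.apache.spark.sql.cassandra")
                   .option("checkpointLocation", "/tmp/checkpoint")
                   .option("keyspace", "spark_streams")
                   .option("table", "created_users")
                   .start())

streaming_query.awaitTermination()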
This is how I create the table:
def create_table(session):
    session.execute("""
        CREATE TABLE IF NOT EXISTS spark_streams.created_users (
            original_tweet TEXT,
            tweet TEXT,
            lat TEXT,
            lng TEXT,
            place TEXT,
            date TEXT,
            sentiment TEXT,
            cleaned_tweet TEXT,
            id UUID PRIMARY KEY
        );
    """)
    logging.info("Table created successfully!")
This is the schema:
def create_selection_df_from_kafka(spark_df):
    schema = StructType([
        StructField("id", StringType(), False),
        StructField("original_tweet", StringType(), False),
        StructField("tweet", StringType(), False),
        StructField("lat", StringType(), False),
        StructField("lng", StringType(), False),
        StructField("place", StringType(), False),
        StructField("date", StringType(), False),
        StructField("sentiment", StringType(), False),
        StructField("cleaned_tweet", StringType(), False),
    ])

    sel = spark_df.selectExpr("CAST(value AS STRING)") \
        .select(from_json(col('value'), schema).alias('data')) \
        .select("data.*")
    print(sel)
    return sel
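The spark_df passed into this function is the raw Kafka source DataFrame, created roughly like this (a sketch; the function name and broker address are how I believe I set it up in spark_stream.py, which runs outside the Docker network):

def connect_to_kafka(spark_conn):
    # Subscribe to the topic the DAG produces to and read from the earliest offset.
    spark_df = (spark_conn.readStream
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "users_created")
                .option("startingOffsets", "earliest")
                .load())
    return spark_df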
And this is my Kafka_stream.py:
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
import uuid

default_args = {
    'owner': 'airscholar',
    'start_date': datetime(2024, 8, 1, 10, 00)
}

def get_data():
    import pandas as pd
    from json import loads, dumps
    import os, logging

    df = pd.read_csv('./dataframe.csv')
    logging.info(f"DataFrame shape: {df.shape}")
    df.drop(['Unnamed: 0', 'id', 'mentions'], axis=1, inplace=True)
    return df

def format_data(data):
    import os, logging

    result = data.to_json(orient='records')
    logging.info(f"Formatted data to JSON: {result[:100]}...")
    return result

def stream_data():
    import json
    from json import loads, dumps
    from kafka import KafkaProducer
    import time
    import logging
    import traceback

    producer = KafkaProducer(bootstrap_servers=['broker:29092'], max_block_ms=5000)
    curr_time = time.time()

    while True:
        if time.time() > curr_time + 60:  # 1 minute
            break
        try:
            df = get_data()
            data_json = format_data(df)
            data_list = json.loads(data_json)
            for record in data_list[0:2]:
                record['id'] = str(uuid.uuid4())
                logging.info(f"Sending record: {record}")
                producer.send('users_created', json.dumps(record).encode('utf-8'))
            logging.info(f"Sent {len(data_list)} records")
            producer.flush()
        except Exception as e:
            logging.error(f'An error occurred: {e}')
            logging.error(traceback.format_exc())
            continue

# stream_data()

with DAG('user_automation', default_args=default_args, schedule='@daily', catchup=False) as dag:
    streaming_task = PythonOperator(
        task_id='stream_twitter_data_from_api',
        python_callable=stream_data
    )
I also tried writing the DataFrame to a CSV file and to the console instead of streaming it to Cassandra, and that throws the exact same error.
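The console attempt looks roughly like this (a sketch, using the same selection_df as above):

# Console sink used for debugging; it aborts with the same error as the Cassandra sink.
selection_df.writeStream \
    .format("console") \
    .outputMode("append") \
    .start() \
    .awaitTermination()

Since even the console sink aborts, the problem seems to be upstream of the Cassandra write itself.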