I’m new to Kafka and Spark. I’m using Airflow to produce data to the “user_created” topic.
I have the following code to load data from the “user_created” topic into a Spark DataFrame, but I’m encountering an error.
spark_stream.py
def connect_to_kafka(spark_conn):
    """Subscribe to the 'user_created' Kafka topic as a streaming DataFrame.

    Args:
        spark_conn: an active SparkSession (as returned by
            create_spark_connection).

    Returns:
        The streaming DataFrame on success, or None if the Kafka reader
        could not be created.
    """
    print("Function Started")
    spark_df = None
    try:
        # The chained builder calls must sit inside one parenthesized
        # expression — without the enclosing parentheses (or trailing
        # backslashes) each leading '.option(...)' line is a SyntaxError.
        spark_df = (
            spark_conn.readStream
            .format('kafka')
            .option('kafka.bootstrap.servers', 'broker:29092')
            .option('subscribe', 'user_created')
            .load()
        )
        print(spark_df)
        print("Kafka dataframe created successfully")
    except Exception as e:
        print(f"Kafka dataframe could not be created due to {e}")
    return spark_df
def create_spark_connection():
    """Create (or reuse) a SparkSession configured for Kafka and Cassandra.

    Returns:
        The SparkSession on success, or None if session creation failed.
    """
    print("Connection Started")
    s_conn = None
    try:
        # FIX: the reported 'NoClassDefFoundError: scala/$less$colon$less'
        # is a Scala binary-version mismatch — PySpark 3.5.x is built
        # against Scala 2.12, but the '_2.13' connector artifacts were
        # requested. Request the matching '_2.12' builds instead.
        s_conn = (
            SparkSession.builder
            .appName('SparkDataStreaming')
            .config('spark.jars.packages',
                    "com.datastax.spark:spark-cassandra-connector_2.12:3.5.1,"
                    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1")
            .config('spark.cassandra.connection.host', 'localhost')
            .getOrCreate()
        )
        # Keep driver logs quiet except for real errors.
        s_conn.sparkContext.setLogLevel("ERROR")
        print("Spark connection created successfully!")
    except Exception as e:
        print(f"Couldn't create the spark session due to exception {e}")
    return s_conn
if __name__ == "__main__":
    # Script entry point: bring up the Spark session first, and only
    # attach the Kafka stream when the session actually came up.
    session = create_spark_connection()
    if session is not None:
        stream_frame = connect_to_kafka(session)
Error
Function Started
Kafka dataframe could not be created due to An error occurred while calling o35.load.
: java.lang.NoClassDefFoundError: scala/$less$colon$less
at org.apache.spark.sql.kafka010.KafkaSourceProvider.org$apache$spark$sql$kafka010$KafkaSourceProvider$$validateStreamOptions(KafkaSourceProvider.scala:338)
at org.apache.spark.sql.kafka010.KafkaSourceProvider.sourceSchema(KafkaSourceProvider.scala:71)
at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:233)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:118)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:118)
at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:36)
at org.apache.spark.sql.streaming.DataStreamReader.loadInternal(DataStreamReader.scala:169)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:145)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:75)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
at java.base/java.lang.reflect.Method.invoke(Method.java:580)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.lang.ClassNotFoundException: scala.$less$colon$less
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:593)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:526)
... 20 more
Here is my complete code: https://github.com/dhainiksuthar/airflow-kafka