Objective:
To set up a streaming job on Amazon EMR Serverless to process weather data from Amazon MSK (Managed Streaming for Apache Kafka) and write the word count results to an S3 bucket.
Steps Taken:
Script Preparation:
Created a Spark Structured Streaming application script (weather_data_streaming.py) that reads data from Kafka, processes it, and writes the results to S3.
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, col, from_json, concat
from pyspark.sql.types import StructType, StructField, StringType, FloatType, IntegerType, LongType, MapType
# Initialize Spark session
spark = SparkSession.builder.appName("KafkaWordCount").getOrCreate()
# Kafka parameters
kafka_bootstrap_servers = "<kafka-bootstrap-server-url>"  # placeholder for the MSK broker endpoints
kafka_topic = "weather-data-cities"
output_path = "s3://spark-streaming1/output/wordcount/"
# Define the schema for the JSON data
schema = StructType([
StructField("base", StringType(), True),
StructField("clouds", MapType(StringType(), IntegerType()), True),
StructField("cod", IntegerType(), True),
StructField("coord", MapType(StringType(), FloatType()), True),
StructField("dt", LongType(), True),
StructField("id", LongType(), True),
StructField("main", MapType(StringType(), FloatType()), True),
StructField("name", StringType(), True),
StructField("sys", MapType(StringType(), StringType()), True),
StructField("timezone", IntegerType(), True),
StructField("visibility", IntegerType(), True),
StructField("weather", MapType(StringType(), StringType()), True),
StructField("wind", MapType(StringType(), FloatType()), True)
])
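# For reference, the schema above expects records shaped roughly like this
# (a hypothetical example, not an actual message from the topic):
# {"base": "stations", "clouds": {"all": 40}, "cod": 200,
#  "coord": {"lat": 51.51, "lon": -0.13}, "dt": 1718000000, "id": 2643743,
#  "main": {"temp": 289.5, "feels_like": 288.9}, "name": "London",
#  "sys": {"country": "GB"}, "timezone": 3600, "visibility": 10000,
#  "weather": {"description": "scattered clouds"}, "wind": {"speed": 4.1}}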
# Read streaming data from Kafka
kafka_df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
    .option("subscribe", kafka_topic)
    .load()
)
kafka_df = kafka_df.selectExpr("CAST(value AS STRING)")
json_df = kafka_df.select(from_json(col("value"), schema).alias("data"))
# Extract the necessary fields for word count
fields_df = json_df.select(
col("data.base").alias("base"),
col("data.clouds.all").alias("clouds_all"),
col("data.coord.lat").alias("lat"),
col("data.coord.lon").alias("lon"),
col("data.main.feels_like").alias("feels_like"),
col("data.main.temp").alias("temp"),
col("data.weather.description").alias("weather_description")
)
# Combine the extracted fields into a single column for word count
# Combine the extracted fields into a single array column for word count.
# Note: array columns must be joined with concat(); `+` is arithmetic
# addition and fails on arrays.
combined_df = fields_df.withColumn("combined",
    concat(
        split(col("base"), r"\s+"),
        split(col("clouds_all").cast(StringType()), r"\s+"),
        split(col("lat").cast(StringType()), r"\s+"),
        split(col("lon").cast(StringType()), r"\s+"),
        split(col("feels_like").cast(StringType()), r"\s+"),
        split(col("temp").cast(StringType()), r"\s+"),
        split(col("weather_description"), r"\s+")
    )
)
# Explode the combined column into individual words
words_df = combined_df.select(explode(col("combined")).alias("word"))
# Perform word count
word_counts = words_df.groupBy("word").count().orderBy(col("count").desc())
# Write the word counts to S3. The CSV file sink only supports append mode,
# while an aggregation with orderBy requires complete mode, so writing the
# result table directly with format("csv") would fail once the job starts.
# One way around this is foreachBatch, which writes each micro-batch result.
def write_batch(batch_df, batch_id):
    batch_df.write.mode("overwrite").csv(output_path)

query = (
    word_counts.writeStream
    .outputMode("complete")
    .foreachBatch(write_batch)
    .option("checkpointLocation", "s3://spark-streaming1/checkpoints/wordcount")
    .start()
)
query.awaitTermination()
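Before involving Kafka at all, the parsing logic can be sanity-checked against a static DataFrame in batch mode. A minimal sketch (the sample payload is made up to match the schema above):
# Batch sanity check: parse one hand-written record with the same schema.
sample = ('{"base": "stations", "clouds": {"all": 40}, "coord": {"lat": 51.5, "lon": -0.1}, '
          '"main": {"temp": 289.5, "feels_like": 288.9}, "name": "London", '
          '"weather": {"description": "scattered clouds"}}')
test_df = spark.createDataFrame([(sample,)], ["value"])
test_df.select(from_json(col("value"), schema).alias("data")).select("data.*").show(truncate=False)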
Configuration:
Submitted the job through the EMR Serverless console with the necessary Spark properties, including specifying the Kafka package.
--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1 --conf spark.executor.memory=2g --conf spark.executor.cores=2 --conf spark.driver.memory=2g --conf spark.driver.cores=1 --conf spark.executor.instances=4 --conf spark.sql.streaming.checkpointLocation=s3://spark-streaming1/checkpoints/wordcount --conf spark.kafka.bootstrap.servers=b-1.datapipeline.ibvgjc.c14.kafka.us-east-1.amazonaws.com:9092,b-2.datapipeline.ibvgjc.c14.kafka.us-east-1.amazonaws.com:9092 --conf spark.dynamicAllocation.enabled=true --conf spark.dynamicAllocation.minExecutors=1 --conf spark.dynamicAllocation.maxExecutors=10 --conf spark.sql.shuffle.partitions=200 --conf spark.streaming.backpressure.enabled=true --repositories https://repo1.maven.org/maven2/
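For completeness, the same submission can also be expressed programmatically; a minimal sketch using boto3 (the application ID, role ARN, and script path are placeholders, not values from this setup):
import boto3

# Hypothetical IDs and paths; substitute your own application ID,
# job execution role, and script location.
emr = boto3.client("emr-serverless", region_name="us-east-1")
response = emr.start_job_run(
    applicationId="<application-id>",
    executionRoleArn="arn:aws:iam::<account-id>:role/<job-execution-role>",
    jobDriver={
        "sparkSubmit": {
            "entryPoint": "s3://spark-streaming1/scripts/weather_data_streaming.py",
            "sparkSubmitParameters": (
                "--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1 "
                "--conf spark.executor.memory=2g --conf spark.executor.cores=2"
            ),
        }
    },
)
print(response["jobRunId"])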
Errors Encountered:
- Failed to find Kafka source:
pyspark.errors.exceptions.captured.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".
- Dependency resolution issues: despite specifying the --packages parameter, the dependencies could not be resolved:
org.apache.spark#spark-sql-kafka-0-10_2.12;3.3.0: not found
- Timeout and unresolved dependencies: network issues led to timeout errors while trying to fetch dependencies:
Server access error at url https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.3.0/spark-sql-kafka-0-10_2.12-3.3.0.pom (java.net.ConnectException: Connection timed out)
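For context, one workaround I am considering (my assumption from the EMR Serverless networking docs: an application attached to a VPC, as required to reach MSK, has no outbound internet access unless its subnets route through a NAT gateway, which would explain the Maven timeouts) is to stage the Kafka connector and its transitive jars in S3 and pass them with --jars instead of --packages. A sketch with hypothetical S3 paths and assumed dependency versions:
--jars s3://spark-streaming1/jars/spark-sql-kafka-0-10_2.12-3.3.0.jar,s3://spark-streaming1/jars/kafka-clients-2.8.1.jar,s3://spark-streaming1/jars/spark-token-provider-kafka-0-10_2.12-3.3.0.jar,s3://spark-streaming1/jars/commons-pool2-2.11.1.jar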
I am seeking guidance on resolving these dependency issues and successfully running the Spark streaming job with Kafka on Amazon EMR Serverless. Any insights or recommendations for ensuring proper dependency resolution and network configuration would be greatly appreciated.