I’m building an ETL pipeline that fetches data from an API, transforms it, and loads it into MongoDB.
I use PySpark to distribute this process and I’m trying to use Airflow to automate it.
However, I’ve hit a roadblock. After setting up Spark, Airflow and everything else, my Spark job finally executes, but it fails with this error 🙂
[2024-09-22, 13:13:46 UTC] {spark_submit.py:488} INFO - Driver stacktrace:
[2024-09-22, 13:13:46 UTC] {spark_submit.py:488} INFO - 24/09/22 13:13:46 INFO TaskSetManager: Lost task 6.2 in stage 0.0 (TID 19) on 172.21.0.6, executor 0: java.io.InvalidClassException (org.apache.spark.scheduler.Task; local class incompatible: stream classdesc serialVersionUID = -6188971942555435033, local class serialVersionUID = 553100815431272095) [duplicate 19]
[2024-09-22, 13:13:46 UTC] {spark_submit.py:488} INFO - 24/09/22 13:13:46 INFO TaskSetManager: Lost task 5.3 in stage 0.0 (TID 20) on 172.21.0.9, executor 1: java.io.InvalidClassException (org.apache.spark.scheduler.Task; local class incompatible: stream classdesc serialVersionUID = -6188971942555435033, local class serialVersionUID = 553100815431272095) [duplicate 20]
[2024-09-22, 13:13:46 UTC] {spark_submit.py:488} INFO - 24/09/22 13:13:46 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
[2024-09-22, 13:13:47 UTC] {spark_submit.py:488} INFO - 24/09/22 13:13:46 INFO DAGScheduler: Job 0 failed: foreach at /opt/***/AstroMRS/movies.py:25, took 3.357094 s
[2024-09-22, 13:13:47 UTC] {spark_submit.py:488} INFO - Traceback (most recent call last):
[2024-09-22, 13:13:47 UTC] {spark_submit.py:488} INFO - File "/opt/***/AstroMRS/movies.py", line 36, in <module>
[2024-09-22, 13:13:47 UTC] {spark_submit.py:488} INFO - etl('/movie/popular', 500, 'movies_collection')
[2024-09-22, 13:13:47 UTC] {spark_submit.py:488} INFO - File "/opt/***/AstroMRS/movies.py", line 25, in etl
[2024-09-22, 13:13:47 UTC] {spark_submit.py:488} INFO - rdd.foreach(lambda page:
[2024-09-22, 13:13:47 UTC] {spark_submit.py:488} INFO - ^^^^^^^^^^^^^^^^^^^^^^^^
[2024-09-22, 13:13:47 UTC] {spark_submit.py:488} INFO - File "/home/***/.local/lib/python3.12/site-packages/pyspark/python/lib/pyspark.zip/pyspark/rdd.py", line 1766, in foreach
[2024-09-22, 13:13:47 UTC] {spark_submit.py:488} INFO - File "/home/***/.local/lib/python3.12/site-packages/pyspark/python/lib/pyspark.zip/pyspark/rdd.py", line 2316, in count
[2024-09-22, 13:13:47 UTC] {spark_submit.py:488} INFO - File "/home/***/.local/lib/python3.12/site-packages/pyspark/python/lib/pyspark.zip/pyspark/rdd.py", line 2291, in sum
[2024-09-22, 13:13:47 UTC] {spark_submit.py:488} INFO - File "/home/***/.local/lib/python3.12/site-packages/pyspark/python/lib/pyspark.zip/pyspark/rdd.py", line 2044, in fold
[2024-09-22, 13:13:47 UTC] {spark_submit.py:488} INFO - File "/home/***/.local/lib/python3.12/site-packages/pyspark/python/lib/pyspark.zip/pyspark/rdd.py", line 1833, in collect
[2024-09-22, 13:13:47 UTC] {spark_submit.py:488} INFO - File "/home/***/.local/lib/python3.12/site-packages/pyspark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
[2024-09-22, 13:13:47 UTC] {spark_submit.py:488} INFO - File "/home/***/.local/lib/python3.12/site-packages/pyspark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py", line 179, in deco
[2024-09-22, 13:13:47 UTC] {spark_submit.py:488} INFO - File "/home/***/.local/lib/python3.12/site-packages/pyspark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
[2024-09-22, 13:13:47 UTC] {spark_submit.py:488} INFO - py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
[2024-09-22, 13:13:47 UTC] {spark_submit.py:488} INFO - : org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage 0.0 (TID 16) (172.21.0.9 executor 1): java.io.InvalidClassException: org.apache.spark.scheduler.Task; local class incompatible: stream classdesc serialVersionUID = -6188971942555435033, local class serialVersionUID = 553100815431272095
This is my docker-compose.yml:
version: '3'

x-airflow-common:
  &airflow-common
  build:
    context: .
    dockerfile: Dockerfile
  env_file:
    - airflow.env
  volumes:
    - .:/opt/airflow/AstroMRS
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
  user: "${AIRFLOW_UID:-50000}:0"
  depends_on:
    - postgres
  networks:
    - movie-net

services:
  mongodb:
    image: mongo
    ports:
      - "27017:27017"
    networks:
      - movie-net

  spark-master:
    image: bitnami/spark:3.5.1
    command: bash -c "pip install pymongo requests && bin/spark-class org.apache.spark.deploy.master.Master"
    ports:
      - "9090:8080"
      - "7077:7077"
    environment:
      PYTHONPATH: '/opt/bitnami/spark'
    volumes:
      - ./src:/opt/bitnami/spark/src
    networks:
      - movie-net

  spark-worker-1:
    image: bitnami/spark:3.5.1
    command: bash -c "pip install pymongo requests && bin/spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077"
    environment:
      SPARK_MASTER_URL: spark://spark-master:7077
      SPARK_MODE: worker
      SPARK_WORKER_CORES: 2
      SPARK_WORKER_MEMORY: 2G
      PYTHONPATH: '/opt/bitnami/spark'
    volumes:
      - ./src:/opt/bitnami/spark/src
    depends_on:
      - spark-master
    networks:
      - movie-net

  spark-worker-2:
    image: bitnami/spark:3.5.1
    command: bash -c "pip install pymongo requests && bin/spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077"
    environment:
      SPARK_MASTER_URL: spark://spark-master:7077
      SPARK_MODE: worker
      SPARK_WORKER_CORES: 2
      SPARK_WORKER_MEMORY: 2G
      PYTHONPATH: '/opt/bitnami/spark'
    volumes:
      - ./src:/opt/bitnami/spark/src
    depends_on:
      - spark-master
    networks:
      - movie-net

  postgres:
    image: postgres:14.0
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    networks:
      - movie-net
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 10s
      retries: 5
      start_period: 5s
    restart: always

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - "8080:8080"
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      - airflow-init

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    depends_on:
      - airflow-init

  airflow-init:
    <<: *airflow-common
    command: bash -c "airflow db migrate && airflow users create
      --username $${AIRFLOW_ADMIN_USERNAME}
      --firstname $${AIRFLOW_ADMIN_FIRSTNAME}
      --lastname $${AIRFLOW_ADMIN_LASTNAME}
      --role $${AIRFLOW_ADMIN_ROLE}
      --email $${AIRFLOW_ADMIN_EMAIL}
      --password $${AIRFLOW_ADMIN_PASSWORD}"
    depends_on:
      - postgres

volumes:
  postgres-db-volume:

networks:
  movie-net:
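For completeness, the spark_submit.py lines in the log come from the Airflow Spark provider, i.e. the job is triggered with SparkSubmitOperator. Roughly, the DAG looks like this (the dag_id, schedule and connection id are illustrative, not my exact code; the application path matches the .:/opt/airflow/AstroMRS volume above):

# dags/movies_dag.py - minimal sketch of how the job is submitted
from datetime import datetime
from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

with DAG(
    dag_id="movies_etl",
    start_date=datetime(2024, 9, 1),
    schedule=None,
    catchup=False,
) as dag:
    submit_movies_job = SparkSubmitOperator(
        task_id="submit_movies_job",
        application="/opt/airflow/AstroMRS/movies.py",  # mounted from the project root
        conn_id="spark_default",                        # configured for spark://spark-master:7077
        verbose=True,
    )

The exact operator arguments shouldn't matter for the error above; I'm including this only to show how the job reaches the cluster.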
And here is my movies.py script:
from pyspark.sql import SparkSession
from dotenv import load_dotenv
import os
from src.fetch_movies import fetch_movies
from src.transform_movies import validation_aka_transformation
from src.store_movies import save_movies_mongo

load_dotenv()
api_url = os.getenv('API_URL')
authorization_key = os.getenv('AUTHORIZATION_KEY')

spark = SparkSession.builder.master("spark://spark-master:7077").appName("Movies Pipeline").getOrCreate()

def etl(endpoint, total_pages=None, filename=None):
    if endpoint == '/movie/latest':
        movies = fetch_movies(api_url, authorization_key, endpoint)
        transformed_movies = validation_aka_transformation(movies)
        save_movies_mongo(transformed_movies, 'movies_collection')
    else:
        pages = list(range(1, total_pages + 1))
        rdd = spark.sparkContext.parallelize(pages, len(pages))
        rdd.foreach(lambda page:
            save_movies_mongo(
                validation_aka_transformation(
                    fetch_movies(api_url, authorization_key, endpoint, page)
                ),
                filename
            )
        )
    print(f"ETL process completed for {endpoint}")

etl('/movie/popular', 500, 'movies_collection')
etl('/movie/top_rated', 500, 'movies_collection')
etl('/movie/latest')
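The three helpers imported from src/ are plain functions built on requests and pymongo (hence the pip install in the master and worker commands). Sketched to the shape they are called with above; the bodies below are illustrative, not my exact code:

# src/ helpers, simplified; only the signatures match the real ones
import requests
from pymongo import MongoClient

def fetch_movies(api_url, authorization_key, endpoint, page=None):
    # call the API, one page at a time for the paginated endpoints
    headers = {"Authorization": authorization_key}
    params = {"page": page} if page is not None else {}
    response = requests.get(f"{api_url}{endpoint}", headers=headers, params=params)
    response.raise_for_status()
    return response.json().get("results", [])

def validation_aka_transformation(movies):
    # drop malformed records and keep only the fields I store
    return [movie for movie in movies if movie]

def save_movies_mongo(movies, collection_name):
    # executors write straight to the mongodb service from docker-compose
    client = MongoClient("mongodb://mongodb:27017")  # the database name below is illustrative
    if movies:
        client["movies"][collection_name].insert_many(movies)
    client.close()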
The routes and everything else are correct; I just don’t know how to get past this obstacle.
I would really appreciate any help!
Thank you!
From what I understand, an InvalidClassException with mismatched serialVersionUIDs usually means two different Spark builds are talking to each other, so I checked the Spark version on the master and on every worker, and they all match.
I also ran the Spark job outside Airflow and it worked fine!
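For reference, all three Spark containers are pinned to bitnami/spark:3.5.1, while Airflow submits the job through a pip-installed PySpark (that’s the py4j-0.10.9.7 path in the traceback). A quick way to see what that submitting side actually ships (the filename is illustrative; inside the bitnami containers the equivalent is spark-submit --version):

# check_versions.py - run with the same interpreter Airflow uses for spark-submit
import os
import pyspark

print("pyspark version :", pyspark.__version__)           # should line up with the 3.5.1 images
print("pyspark location:", pyspark.__path__[0])           # where the client-side Spark jars come from
print("SPARK_HOME      :", os.environ.get("SPARK_HOME"))  # None unless set explicitly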