I have a Power User Compute cluster with a single driver node, and I'm trying to parallelize forecasting across multiple series by aggregating the data, grouping it with a groupBy, and then calling apply on the grouped data. The forecasting function is passed to apply as a pandas UDF.
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *

result_schema = StructType([
    StructField('ts', DateType()),
    StructField('env', StringType()),
    StructField('source', StringType()),
    StructField('granularity', StringType()),
    StructField('exog', StringType()),
    StructField('target', FloatType()),
    StructField('forecast', FloatType())
])

@pandas_udf(result_schema, PandasUDFType.GROUPED_MAP)
def greykite_forecast(df):
    # df -> pandas DataFrame holding one group (one env/source/granularity/exog combination)
    # Forecasting happens here
    return forecast  # pandas DataFrame following result_schema

agg_df = spark.read.csv('agg_data.csv', header=True, inferSchema=True)

env_source_history = (
    agg_df
    .repartition(sc.defaultParallelism, ['env', 'source', 'granularity', 'exog'])
).cache()

results = (
    env_source_history
    .groupBy('env', 'source', 'granularity', 'exog')
    .apply(greykite_forecast)
)

display(results)
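For reference, on Spark 3.x the same grouped apply can also be written with applyInPandas instead of the deprecated GROUPED_MAP pandas UDF. A minimal sketch, assuming greykite_forecast is left as a plain (undecorated) Python function:

# Sketch: equivalent grouped apply with the newer API.
# applyInPandas takes the output schema explicitly instead of reading it
# from a pandas_udf wrapper.
results = (
    env_source_history
    .groupBy('env', 'source', 'granularity', 'exog')
    .applyInPandas(greykite_forecast, schema=result_schema)
)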
The driver node has 16 cores, so I expected Spark to distribute the work across all of them in parallel, but I soon realized the job was executing serially. Looking around, I found there was only a single executor. I tried to change that by adding spark.executor.cores 1 to the Spark config, but that didn't help.
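In case it helps diagnose this, here is a quick sanity check of what the session actually reports (a sketch reusing sc and env_source_history from the snippet above):

# Sanity check (sketch): reported parallelism and actual partition count.
print(sc.defaultParallelism)                      # cores Spark thinks it can use
print(env_source_history.rdd.getNumPartitions())  # partitions feeding the grouped apply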