I have a Python script that predicts an outcome using logistic regression. The input file has only 40k rows with 10 columns, which is a relatively small dataset for the Spark engine. The script is part of a pipeline whose preceding steps run for 1-2 hours (in Python), with the end result being a logistic regression prediction. The Python prediction script runs in only 11 seconds, yet the Spark version of the same step runs for more than an hour. This is strange!
Below is the relevant portion of the script in Python and PySpark. The PySpark version runs for more than an hour, although it does display the expected results.
<code>
## Python script
import scorecardpy as scard
import statsmodels.api as stm

logistic_model_pickle = "read pickle file for model"
pickle_file = "read pickle file with the WoE bins"  # bins object passed to woebin_ply
data = scard.woebin_ply(input, pickle_file)  # apply WoE binning to the raw columns
column_woe = [x for x in data.columns if x.endswith('woe')]
data['pred_prob'] = logistic_model_pickle.predict(stm.add_constant(data[column_woe]))

## Corresponding PySpark script
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import (StructType, StructField, IntegerType,
                               DateType, DoubleType, LongType)

data = "same input file from the Python code, in parquet format"
group_column = ['ID', 'DATE', 'Segment']
original_columns = data.columns

schema = StructType([
    StructField('ID', IntegerType(), True),
    StructField('DATE', DateType(), True),
    StructField('Segment', IntegerType(), True),
    StructField('col1_woe', DoubleType(), True),
    StructField('Col2_woe', DoubleType(), True),
    StructField('Col3_woe', DoubleType(), True),
    StructField('Col4_woe', DoubleType(), True),
    StructField('Col5_woe', DoubleType(), True),
    StructField('Col6_woe', LongType(), True),
    StructField('Col7_woe', DoubleType(), True)])

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def func_woe(data_frame):
    model1 = "read a pickle file from blob storage"  # WoE bins, re-read on every call
    dataframe_woe = scard.woebin_ply(data_frame, model1)
    return dataframe_woe

data_woe = data.groupBy(group_column).apply(func_woe)
data = data.join(data_woe, on=group_column, how='left')
display(data)  ## This is where the code takes more than 1 hour, while the same step in Python runs in about a second.
</code>
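Since Spark evaluates lazily, display(data) is presumably just where the whole plan, including the pandas UDF, actually executes. One thing I suspect is the cardinality of the grouping key: groupBy(['ID', 'DATE', 'Segment']) may create close to one group per row, and every group invokes func_woe, which re-reads the pickle from blob storage. A quick check I intend to run (a hypothetical snippet against the data DataFrame above):

<code>
# Hypothetical diagnostic: count the groups the GROUPED_MAP UDF fans out to.
# Each group means one func_woe call and one pickle read from blob storage,
# so a group count near the 40k row count would explain the slowdown.
n_groups = data.select(group_column).distinct().count()
n_rows = data.count()
print(f"{n_groups} groups for {n_rows} rows")
</code>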
The woebin_ply function returns a dataframe with WoE values for all of the columns (Col1 through Col7), which is why I used a pandas_udf with PandasUDFType.GROUPED_MAP. Can anyone advise where things might be going wrong?
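For comparison, a variant I am considering but have not run yet: load the bins pickle once on the driver, broadcast it, and apply woebin_ply per partition with mapInPandas instead of per group. This sketch assumes Spark 3.x (for mapInPandas) and that the bins object is small enough to broadcast; bins, bins_bc, and woe_partition are placeholder names, not part of my current code.

<code>
# Untested sketch: apply woebin_ply once per batch of a partition instead of
# once per (ID, DATE, Segment) group, so the bins are not re-read per group.
bins = "read the bins pickle once on the driver"  # placeholder, as above
bins_bc = spark.sparkContext.broadcast(bins)      # ship the bins to executors once

def woe_partition(pdf_iter):
    # pdf_iter yields pandas DataFrames, one per batch within a partition
    for pdf in pdf_iter:
        yield scard.woebin_ply(pdf, bins_bc.value)[schema.fieldNames()]

data_woe = data.mapInPandas(woe_partition, schema=schema)
</code>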