I've been trying to parallelize hyperparameter tuning for my Prophet model over roughly 100 combinations of hyperparameters, saved in the dataframe `params_df`. To parallelize the tuning I've done the following:
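For context, `params_df` is built along these lines (the specific Prophet parameters shown, `changepoint_prior_scale`, `seasonality_prior_scale`, etc., are illustrative placeholders, not necessarily the ones I tune); each row stores one combination as the `str()` of a dict so the UDF can recover it with `ast.literal_eval`:

```python
import ast
import itertools
import pandas as pd

# Hypothetical grid; swap in whichever Prophet parameters you actually tune
grid = {
    'changepoint_prior_scale': [0.001, 0.01, 0.05, 0.1, 0.5],
    'seasonality_prior_scale': [0.01, 0.1, 1.0, 5.0, 10.0],
    'seasonality_mode': ['additive', 'multiplicative'],
    'changepoint_range': [0.8, 0.9],
}
# Cartesian product of all parameter values -> one dict per combination
combos = [dict(zip(grid, values)) for values in itertools.product(*grid.values())]
params_df = pd.DataFrame({'hyperparameters': [str(c) for c in combos]})

print(len(params_df))  # 5 * 5 * 2 * 2 = 100 combinations
# Round trip: the string form parses back to the original dict
assert ast.literal_eval(params_df['hyperparameters'][0]) == combos[0]
```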
```python
import ast
import pandas as pd
from prophet import Prophet  # fbprophet in older versions
from prophet.diagnostics import cross_validation
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.sql.functions import pandas_udf, PandasUDFType

schema = StructType([StructField('hyperparameters', StringType(), True),
                     StructField('mape', DoubleType(), True)])

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def grid_search(hyperparameters):
    # valid_train_df and indep (the list of regressor columns) are
    # captured from the driver scope
    train = valid_train_df.drop(columns='Series').copy()
    cutoffs = [train.loc[len(train) - 5, 'ds'], train.loc[len(train) - 3, 'ds']]
    hyperparameters = hyperparameters.reset_index(drop=True)
    # Each group holds a single stringified parameter dict; parse it back
    params = ast.literal_eval(hyperparameters['hyperparameters'][0])
    prophet_test = Prophet(**params)
    for regressor in indep:
        prophet_test.add_regressor(regressor)
    prophet_test.fit(train[indep + ['ds', 'y']])
    df_cv = cross_validation(model=prophet_test, horizon='62 days', cutoffs=cutoffs)
    df_cv['monthly_mape'] = abs(df_cv['y'] - df_cv['yhat']) / df_cv['y']
    mape = df_cv['monthly_mape'].mean()
    tuning_results = pd.DataFrame({'hyperparameters': str(params), 'mape': mape}, index=[0])
    return tuning_results
```
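The MAPE computed inside the UDF is just the mean of |y - yhat| / y over the cross-validation rows; a standalone check with made-up numbers:

```python
import pandas as pd

# Toy cross-validation output (made-up values, for illustration only)
df_cv = pd.DataFrame({'y': [100.0, 200.0, 400.0],
                      'yhat': [110.0, 180.0, 400.0]})
df_cv['monthly_mape'] = abs(df_cv['y'] - df_cv['yhat']) / df_cv['y']
mape = df_cv['monthly_mape'].mean()
print(mape)  # (0.1 + 0.1 + 0.0) / 3 = 0.0666...
```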
This part runs the pandas UDF once per set of hyperparameters; `spark_parallel_df` is the Spark DataFrame built from `params_df`, containing the `hyperparameters` column:

```python
run_udf_results = spark_parallel_df.groupby('hyperparameters').apply(grid_search)
hyperparameter_results = run_udf_results.toPandas()
```
But this is not giving me the speed-up I expected: the tuning still takes around 40 minutes to complete. (`valid_train_df` is the input training dataframe used inside the UDF.) Am I missing something here?