I'm running a Prefect workflow with a DaskTaskRunner, which creates and holds a dask.distributed.LocalCluster instance.
Inside a Prefect task I create a dask_ml.model_selection.RandomizedSearchCV and fit it, which, as I understand it, should run on the LocalCluster.
When the fitting fails for any reason (i.e. a dask task submitted to the LocalCluster by the RandomizedSearchCV fails), the error is printed but not propagated to the outer Prefect task. The fitting runs (and fails) endlessly.
A minimal example:
from distributed import Client
from prefect import task, flow
from prefect_dask import DaskTaskRunner
from dask_ml.model_selection import RandomizedSearchCV
from sklearn.ensemble import RandomForestRegressor
import pandas as pd


@task
def sample_task():
    X = pd.DataFrame({"A": [1, 2, 3], "B": ["str", 1, 2], "C": [0, 0, 0]})
    y = X["C"]
    estimator = RandomizedSearchCV(
        estimator=RandomForestRegressor(),
        param_distributions={"max_depth": [3, 5, 7]},
    )
    estimator.fit(X, y)


@flow
def sample_flow():
    sample_task.submit()
    print("This should not be printed")


sample_flow.with_options(
    task_runner=DaskTaskRunner(
        cluster_class="dask.distributed.LocalCluster"
    )
)()
As expected, the fitting fails because of the string inside X. However, I would expect the Prefect task sample_task() to fail as well, but it doesn't.
A retry limit on the dask tasks would probably also help, but those are submitted by RandomizedSearchCV, not by me.
Is there any way I can propagate that error, so that my prefect task also fails?
dask_ml.model_selection.RandomizedSearchCV raises a warning rather than an error when a fit fails, so Prefect never sees a failure and nothing stops dask-ml from retrying. You can see the relevant dask-ml code here.
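One possible workaround, given that the failure surfaces only as a Python warning: escalate warnings to errors inside the task using the standard warnings module, so the warning becomes an exception and the Prefect task fails. This is a minimal sketch of the mechanism only; fit_with_strict_warnings and flaky_fit are hypothetical stand-ins (flaky_fit plays the role of estimator.fit(X, y)), and you may want to filter on the specific warning category dask-ml emits rather than escalating everything.

```python
import warnings


def fit_with_strict_warnings(fit):
    # Escalate any warning raised during fitting into an exception,
    # so the surrounding (Prefect) task fails instead of looping silently.
    with warnings.catch_warnings():
        warnings.simplefilter("error")  # turn all warnings into errors
        fit()


def flaky_fit():
    # Stand-in for estimator.fit(X, y): the failure is signalled
    # with a warning rather than an exception.
    warnings.warn("fit failed", UserWarning)


try:
    fit_with_strict_warnings(flaky_fit)
    failed = False
except UserWarning:
    failed = True

print(failed)  # the warning now surfaces as an exception
```

Inside your real task you would wrap the estimator.fit(X, y) call the same way; once the warning is raised as an exception, Prefect marks the task as failed like any other error.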