Why doesn't a prefect task fail, if a contained dask.distributed task fails?

81 views Asked by At

I'm running a workflow using Prefect using a DaskTaskRunner, which creates and holds a dask.distibuted.LocalCluster instance. Inside a prefect task I use a dask_ml.RandomSearchCV and fit it, which by my understanding should happen using the LocalCluster.

When the fitting fails for any reason (i.e. the dask-task submitted to the LocalCluster by the RandomizedSearchCV fails), the error is printed but it is not propagated to the outer prefect task. The fitting runs (and fails) endlessly.

A minimal example

from distributed import Client
from prefect import task, flow
from prefect_dask import DaskTaskRunner
from dask_ml.model_selection import RandomizedSearchCV
from sklearn.ensemble import RandomForestRegressor
import pandas as pd

@task
def sample_task():
    X = pd.DataFrame({"A": [1, 2, 3], "B": ["str", 1, 2], "C": [0, 0, 0]})
    y = X["C"]

    estimator = RandomizedSearchCV(
        estimator=RandomForestRegressor(),
        param_distributions={'max_depth': [3, 5, 7]},
    )

    estimator.fit(X, y)


@flow
def sample_flow():
    sample_task.submit()

    print("This should not be printed")


sample_flow.with_options(
    task_runner=DaskTaskRunner(
        cluster_class="dask.distributed.LocalCluster"
    )
)()

As expected, the fitting fails because of the string inside X. However I would expect that the prefect task sample_task() then also fails, but it doesn't.

Probably a retry limit for the dask task would also do it, but those are submitted by RandomizedSearchCV. Is there any way I can propagate that error, so that my prefect task also fails?

1

There are 1 answers

0
jeffhale On BEST ANSWER

dask_ml.model_selection.RandomizedSearchCV raises a warning and not an error, so Prefect doesn't stop dask-ml from retrying.

You can see the dask-ml code here