I'm noticing that when I submit jobs and run compute() on my dask distributed cluster, tasks are still being run by multiple workers, despite my attempts to restrict the work to specific nodes (using workers='...', allow_other_workers=False).

For example, let's read in a CSV and then try to sum the contents of a column:

import dask.dataframe as dd
from dask.distributed import Client

client = Client('<schedulerip:port>')  # connect to the scheduler started below

dfut1 = client.submit(dd.read_csv,
                      'https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2009-01.csv',
                      workers='w1', allow_other_workers=False)
df1 = client.gather(dfut1)
df1.Passenger_Count.sum().compute(workers='w1', allow_other_workers=None)

When running the final line (...sum...compute), the "Status" tab on the Dask Dashboard makes it clear that the computation is being done by both workers on the cluster, not just w1 as directed. (This is confirmed by client.has_what(), which shows tasks spread across the cluster.)
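For reference, that placement check is just the following (a minimal sketch reusing the client from above; the actual keys and addresses will of course differ):

print(client.has_what())  # maps each worker address to the task keys it holds
print(client.who_has())   # the inverse view: task key -> worker addresses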

I also tried allow_other_workers=False and simply leaving the parameter out, but every combination leads to work being done across the cluster. I also tried replacing worker names with ip:port and using lists instead of strings, without luck. Is there a way to truly force machine/node/worker affinity for data and the tasks that operate on it?

As background,

I started a dask scheduler on my 1st machine:

dask-scheduler

I started a dask worker on a 2nd machine:

dask-worker <schedulerip:port> --name w1

I started another dask worker on a 3rd machine:

dask-worker <schedulerip:port> --name w2

This is using dask==1.2.2 and distributed==1.28.0 on Python 3.6.

1 Answer

MRocklin (Best Solution)

It looks like you are calling a dask dataframe function within a submit call. This is odd. You're submitting Dask code to run on a particular worker. That Dask code then calls back to the cluster to run things. There is no reason to call submit on a dask function.

See http://docs.dask.org/en/latest/delayed-best-practices.html#don-t-call-dask-delayed-on-other-dask-collections
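Instead, build the dataframe directly on the client and attach the worker restriction to the final compute call, so it applies to the whole task graph rather than to a single submitted function. A minimal sketch of that pattern (assuming the two-worker cluster described above; the scheduler address is a placeholder and w1 is the name given via dask-worker --name w1):

import dask.dataframe as dd
from dask.distributed import Client

client = Client('<schedulerip:port>')

# dd.read_csv is lazy: this builds a task graph on the client;
# no data is read yet
df = dd.read_csv('https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2009-01.csv')

# Hand the whole graph to the scheduler with the restriction attached,
# so every task in the graph is pinned to w1
future = client.compute(df.Passenger_Count.sum(),
                        workers=['w1'],
                        allow_other_workers=False)
result = future.result()

This way the restriction travels with every task in the graph, instead of applying only to the one submit call that happened to build the graph.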