I'm noticing that when I submit jobs and run compute(), the tasks are still spread across multiple workers on my Dask distributed cluster, despite my attempts to restrict the work to specific nodes (using workers='...', allow_other_workers=False).
For example, let's read in a CSV and then try to sum the contents of a column:
import dask.dataframe as dd

dfut1 = client.submit(dd.read_csv,
                      'https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2009-01.csv',
                      workers='w1', allow_other_workers=False)
df1 = client.gather(dfut1)
df1.Passenger_Count.sum().compute(workers='w1', allow_other_workers=None)
When running the final line (...sum()...compute(...)), the "Status" tab on the Dask Dashboard makes it clear that the computation is being done by both workers on the cluster, not just w1 as directed. (This is confirmed by client.has_what(), which shows the resulting keys spread across the cluster.)
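The same kind of inspection can be reproduced in-process. The sketch below stands in two local threads for the two remote workers (assumes the `distributed` package is installed; everything else uses only the public Client API):

```python
# A minimal local sketch of pinning a task and inspecting placement
# (assumes `distributed` is installed; two in-process workers stand in
# for w1 and w2).
from dask.distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=2, threads_per_worker=1, processes=False)
client = Client(cluster)

# Pin one simple task to a single worker, then ask the scheduler where
# the resulting key actually lives.
target = list(client.scheduler_info()["workers"])[0]
fut = client.submit(sum, [1, 2, 3], workers=[target], allow_other_workers=False)
total = fut.result()
placement = client.has_what()        # {worker_address: keys held there}
pinned = fut.key in placement[target]
print(total, pinned)

client.close()
cluster.close()
```

For a single future pinned this way, has_what() reports the key only on the targeted worker, which is the behavior I expected from the dataframe example above as well.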
I also tried allow_other_workers=False on the compute() call, and simply leaving the parameter out, but every combination leads to work being spread across the cluster. I also tried replacing worker names with ip:port addresses, and using lists instead of bare strings, without luck. Is there a way to truly force machine/node/worker affinity, both for data and for tasks executed by other tasks?
I started a dask scheduler on my 1st machine:

dask-scheduler

I started a dask worker on a 2nd machine:

dask-worker <schedulerip:port> --name w1

I started another dask worker on a 3rd machine:

dask-worker <schedulerip:port> --name w2
This is using dask==1.2.2 and distributed==1.28.0 on Python 3.6.