Defining dask worker resources for a dataframe operation


I am applying multiple operations to a dask dataframe. Can I define distributed worker resource requirements for a particular operation?

e.g. I call something like:

df.fillna(value="").map_partitions(...).map(...)

I want to specify resource requirements for map_partitions() (potentially different from the ones for map()), but it seems the method does not accept a resources parameter.

P.S. Alternatively, I figured out that I can call client.persist() after map_partitions() and specify resources in that call, but that triggers the computation immediately.
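
(For illustration only, not part of the original question: a minimal sketch of that persist-based workaround. The scheduler address, file pattern, and my_partition_func are placeholders, and I am assuming the flat resources={'GPU': 1} form applies the constraint to the whole persisted graph.)

from dask.distributed import Client
import dask.dataframe as dd

def my_partition_func(pdf):
    return pdf  # stand-in for the real per-partition work

client = Client("scheduler-address:8786")   # hypothetical scheduler address

df = dd.read_csv("data-*.csv")              # placeholder file pattern
cleaned = df.fillna(value="").map_partitions(my_partition_func)

# client.persist() accepts a resources= mapping, but it submits the graph
# to the scheduler immediately instead of staying lazy
cleaned = client.persist(cleaned, resources={"GPU": 1})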

1 Answer

Answer by MRocklin (accepted):

You can specify resource constraints on particular parts of your computation when you call compute or persist by providing the intermediate collection.

import dask.dataframe as dd

x = dd.read_csv(...)          # original dataframe
y = x.map_partitions(func)    # the step that needs special resources
z = y.map(func2)

# restrict the tasks that produce y to workers offering a GPU resource
z.compute(resources={tuple(y._keys()): {'GPU': 1}})

Thank you for the question. I went to include a link to the documentation about this feature and found that it was undocumented; I'll fix that shortly.

It looks like there is currently a bug where the intermediate keys may be optimized away in some situations (though this is less likely for dataframe operations), so you may also want to pass the optimize_graph=False keyword.

z.compute(resources={tuple(y._keys()): {'GPU': 1}}, optimize_graph=False)

See https://github.com/dask/distributed/pull/1362
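
(A side note, not from the original answer: a resource name like 'GPU' is only meaningful if the workers were started advertising it. With the dask-worker CLI that is typically done with the --resources flag, for example:)

dask-worker scheduler-address:8786 --resources "GPU=1"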