I want to build a Dask Delayed flow that includes both CPU and GPU tasks. The GPU tasks can only run on GPU workers, and each GPU worker has a single GPU and can handle only one GPU task at a time.
Unfortunately, I see no way to specify worker resources in the Delayed API.
Here is the common setup code:
from time import sleep
from dask import delayed
from dask.distributed import Client

client = Client(resources={'GPU': 1})

@delayed
def fcpu(x, y):
    sleep(1)
    return x + y

@delayed
def fgpu(x, y):
    sleep(1)
    return x + y
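(As far as I understand, the resources kwarg here is simply forwarded to the workers of the implicit LocalCluster, so the explicit equivalent would be something like this sketch; n_workers=1 is an arbitrary choice.)
# Assumed-equivalent explicit setup: each local worker advertises one 'GPU' resource
from dask.distributed import Client, LocalCluster
cluster = LocalCluster(n_workers=1, resources={'GPU': 1})
client = Client(cluster)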
Here is the flow written in pure Delayed. This code will not behave properly because it doesn't know about the GPU resource.
# STEP ONE: two parallel CPU tasks
a = fcpu(1, 1)
b = fcpu(10, 10)
# STEP TWO: two GPU tasks
c = fgpu(a, b) # Requires 1 GPU
d = fgpu(a, b) # Requires 1 GPU
# STEP THREE: final CPU task
e = fcpu(c, d)
%time e.compute() # 3 seconds
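For reference, newer Dask versions (2020.12+) provide a dask.annotate context manager that can attach resource requirements to the tasks created inside it. I have not verified how well Delayed preserves these annotations; graph optimization can drop them, so passing optimize_graph=False may be needed. A sketch:
import dask

a = fcpu(1, 1)
b = fcpu(10, 10)
with dask.annotate(resources={'GPU': 1}):
    c = fgpu(a, b)   # annotated: each of these tasks requires one GPU
    d = fgpu(a, b)
e = fcpu(c, d)
res = e.compute(optimize_graph=False)  # avoid optimizing the annotations away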
This is the best solution I could come up with. It combines Delayed syntax with Client.compute() futures. It seems to behave correctly, but it is very ugly.
# STEP ONE: two parallel CPU tasks
a = fcpu(1, 1)
b = fcpu(10, 10)
a_future, b_future = client.compute([a, b]) # We DON'T want a resource limit here
# STEP TWO: two GPU tasks - only enough resources to run one at a time
c = fgpu(a_future, b_future)
d = fgpu(a_future, b_future)
c_future, d_future = client.compute([c, d], resources={'GPU': 1})
# STEP THREE: final CPU task
e = fcpu(c_future, d_future)
res = e.compute()
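(The reason this behaves correctly, as far as I can tell, is that each client.compute() call submits its part of the graph immediately and returns Futures, and the resources argument only constrains the tasks submitted in that particular call; the CPU steps therefore stay unconstrained while the two GPU tasks are serialized on the single GPU worker.)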
Is there a better way to do this?
Maybe an approach similar to the one described at https://jobqueue.dask.org/en/latest/examples.html would work? That page covers the case of processing on a machine with one GPU, or on a machine with an SSD.
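As far as I can tell, those examples boil down to two pieces: workers that advertise the resource when they start, and submissions that request it. A rough sketch of that pattern with plain futures (the scheduler address is hypothetical, and cpu_task/gpu_task stand in for undecorated versions of fcpu/fgpu):
# Workers would be started advertising the resource, e.g.
#   dask-worker scheduler-address:8786 --resources "GPU=1"
from time import sleep
from dask.distributed import Client

def cpu_task(x, y):
    sleep(1)
    return x + y

def gpu_task(x, y):
    sleep(1)
    return x + y

client = Client('scheduler-address:8786')   # hypothetical address

a = client.submit(cpu_task, 1, 1)           # unconstrained CPU tasks
b = client.submit(cpu_task, 10, 10)
# pure=False keeps the two identical GPU calls as separate tasks
c = client.submit(gpu_task, a, b, resources={'GPU': 1}, pure=False)
d = client.submit(gpu_task, a, b, resources={'GPU': 1}, pure=False)
e = client.submit(cpu_task, c, d)
print(e.result())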