Dask: How to use delayed functions with worker resources?


I want to make a Dask Delayed flow which includes CPU and GPU tasks. GPU tasks can only run on GPU workers, and a GPU worker only has one GPU and can only handle one GPU task at a time.

Unfortunately, I see no way to specify worker resources in the Delayed API.

Here is the common setup code:

from time import sleep

from dask import delayed
from distributed import Client

client = Client(resources={'GPU': 1})

@delayed
def fcpu(x, y):
    sleep(1)
    return x + y

@delayed
def fgpu(x, y):
    sleep(1)
    return x + y
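For a self-contained reproduction, the resource declaration can be made explicit on a `LocalCluster`. Note that resource names like `'GPU'` are arbitrary labels the scheduler counts against task requests, not real hardware detection (this setup is a sketch; the worker counts and `processes=False` are assumptions for a minimal local example):

```python
from distributed import Client, LocalCluster

# Explicit local cluster: the single in-process worker advertises one
# unit of an abstract 'GPU' resource, which the scheduler will count
# against the resource requests of tasks assigned to it.
cluster = LocalCluster(n_workers=1, processes=False, resources={"GPU": 1})
client = Client(cluster)
```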

Here is the flow written in pure Delayed. This code will not behave properly because the scheduler is never told which tasks need the GPU resource.

# STEP ONE: two parallel CPU tasks
a = fcpu(1, 1)
b = fcpu(10, 10)

# STEP TWO: two GPU tasks
c = fgpu(a, b)  # Requires 1 GPU
d = fgpu(a, b)  # Requires 1 GPU

# STEP THREE: final CPU task
e = fcpu(c, d)

%time e.compute()  # 3 seconds

This is the best solution I could come up with. It combines Delayed syntax with Client.compute() futures. It seems to behave correctly, but it is very ugly.

# STEP ONE: two parallel CPU tasks
a = fcpu(1, 1)
b = fcpu(10, 10)
a_future, b_future = client.compute([a, b])  # We DON'T want a resource limit here

# STEP TWO: two GPU tasks - only enough resources to run one at a time
c = fgpu(a_future, b_future)
d = fgpu(a_future, b_future)
c_future, d_future = client.compute([c, d], resources={'GPU': 1})

# STEP THREE: final CPU task
e = fcpu(c_future, d_future)
res = e.compute()

Is there a better way to do this?


There are 2 answers

s1mc0d3

Maybe an approach similar to what is described in https://jobqueue.dask.org/en/latest/examples.html would work. That example covers processing on a machine with a single GPU, or on a machine with a local SSD.

from dask import delayed

def step_1_w_single_GPU(data):
    return "Step 1 done for: %s" % data

def step_2_w_local_IO(data):
    return "Step 2 done for: %s" % data

stage_1 = [delayed(step_1_w_single_GPU)(i) for i in range(10)]
stage_2 = [delayed(step_2_w_local_IO)(s1) for s1 in stage_1]

# Map each group of task keys to the resources those tasks need:
# stage 1 runs on workers advertising a 'GPU' resource, stage 2 on
# workers advertising 100 GB of local SSD ('ssdGB').
result_stage_2 = client.compute(stage_2,
                                resources={tuple(stage_1): {'GPU': 1},
                                           tuple(stage_2): {'ssdGB': 100}})
SultanOrazbayev

This is possible with annotations; see the example in the docs:

import dask
import dask.dataframe as dd

x = dd.read_csv(...)
with dask.annotate(resources={'GPU': 1}):
    y = x.map_partitions(func1)
z = y.map_partitions(func2)

z.compute(optimize_graph=False)

As noted in the docs, such annotations can be lost during graph optimization, hence the kwarg optimize_graph=False.
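Applied to the delayed flow from the question, the annotation approach might look like the sketch below. It assumes a connected distributed client whose GPU workers declare resources={'GPU': 1}; on the plain local scheduler the annotations are silently ignored and the flow just computes without the resource constraint:

```python
from time import sleep

import dask
from dask import delayed

@delayed
def fcpu(x, y):
    sleep(1)  # stand-in for CPU work
    return x + y

@delayed
def fgpu(x, y):
    sleep(1)  # stand-in for GPU work
    return x + y

# STEP ONE: two parallel CPU tasks
a = fcpu(1, 1)
b = fcpu(10, 10)

# STEP TWO: annotate only the GPU tasks so each one requests one 'GPU' unit
with dask.annotate(resources={"GPU": 1}):
    c = fgpu(a, b)
    d = fgpu(a, b)

# STEP THREE: final CPU task
e = fcpu(c, d)

# optimize_graph=False so the annotations survive to the scheduler
result = e.compute(optimize_graph=False)
```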