Dask: How to submit jobs to only two processes in a LocalCluster?

65 views Asked by At

I'm trying to run a dask.delayed function which is highly computationally- and memory-intense and therefore prone to OOM crashed and over-utilization errors if not scheduled properly.

I'm creating dask.delayed functions and gathering them non-asynchronously to avoid over-utilization:

import dask
import time
import copy
import numpy as np
import pandas as pd
from dask.distributed import Client, LocalCluster

@dask.delayed
def intense_func(volume, nrepeats):
    
    reference = copy.deepcopy(volume)
    
    start = time.time()
    
    for repeat in range(nrepeats):
        volume = np.fft.fftn(volume)
        volume = np.fft.ifftn(volume)
        
    end = time.time()
    
    same = np.allclose(reference, volume)
    
    return_dict = {'returns': same, 'time': end-start}
    
    return pd.DataFrame(return_dict, index=[0])

# 96-core machine
num_workers=2
threads_per_worker=48

cluster = LocalCluster(
    n_workers=num_workers,
    threads_per_worker=threads_per_worker,
    resources = {'foo':1},
    processes=False,
)

client = Client(cluster)

nx = ny = nz = 512
volume = np.random.uniform(low=0.,high=1.,size=(nx,ny,nz))

task_graph = []
ntransforms = 20
tasks = 20
for task in range(tasks):
    
    # generate dataset based on random parameters
    istart,iend = np.random.randint(0,nx//2), np.random.randint(nx//2,nx)
    jstart,jend = np.random.randint(0,ny//2), np.random.randint(ny//2,ny)
    kstart,kend = np.random.randint(0,nz//2), np.random.randint(nz//2,nz)
    
    subvolume = volume[istart:iend,jstart:jend,kstart:kend]
    
    task_graph.append(intense_func(subvolume, ntransforms))
    
df = dask.compute(*task_graph, resources={'foo':1}) # 'foo' doesn't seem right, but it doesn't schedule properly without it ¯\_(ツ)_/¯
df = pd.concat(df).reset_index(drop=True)

Currently, the program is successfully scheduling the functions to run synchronously (i.e. No worker is running more than one task at a time). However, these are being spawned on different processes, causing memory to accumulate and crash the machines with an OOM error. (Related note: I get errors like this: distributed.utils_perf - WARNING - full garbage collections took 17% CPU time recently (threshold: 10%) before failure).

So what I'm left with is a task graph that looks like this:

actual task stream

Which only solves the issue of over-utilization. When I actually want something like this:

desired task stream

Which would probably also solve the OOM errors. Is it possible to only allow Dask to spawn two workers, and only allow these two workers to do the work, without creating new processes beyond these two workers?

Thanks in advance :)

0

There are 0 answers