How to terminate workers started by dask multiprocessing scheduler?


After using the dask multiprocessing scheduler for a long period of time, I noticed that the Python processes started by the multiprocessing scheduler consume a lot of memory. How can I restart the worker pool?


1 Answer

Answered by R. Max (accepted answer):

Update: You can do this to kill the workers started by the multiprocessing scheduler:

from dask.context import _globals
pool = _globals.pop('pool')  # remove the pool from globals so dask creates a new one
pool.close()      # stop accepting new tasks
pool.terminate()  # kill the worker processes
pool.join()       # wait for them to exit
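
If you want to hand dask a replacement pool yourself instead of letting it create a new default one lazily, something like the following should work (a minimal sketch, assuming the same old dask API era as _globals; newer versions configure this through dask.config.set instead):

import multiprocessing
import dask

# Assumption: dask.set_options accepts a `pool` option in this dask version.
# Register a fresh pool explicitly so the next computation on the
# multiprocessing scheduler uses it instead of creating its own.
dask.set_options(pool=multiprocessing.Pool(4))  # 4 new worker processes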

Original answer:

For tasks that consume a lot of memory, I prefer to use the distributed scheduler, even on localhost.

It's very straightforward:

  1. Start the scheduler in one shell:
$ dask-scheduler
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO -   Scheduler at:       1.2.3.4:8786
distributed.scheduler - INFO -        http at:       1.2.3.4:9786
distributed.bokeh.application - INFO - Web UI: http://1.2.3.4:8787/status/
distributed.scheduler - INFO - -----------------------------------------------
distributed.core - INFO - Connection from 1.2.3.4:56240 to Scheduler
distributed.core - INFO - Connection from 1.2.3.4:56241 to Scheduler
distributed.core - INFO - Connection from 1.2.3.4:56242 to Scheduler
  2. Start the workers in another shell; you can adjust the parameters as needed:
$ dask-worker  --nprocs 8 --nthreads 1 --memory-limit .8 1.2.3.4:8786
distributed.nanny - INFO -         Start Nanny at:            127.0.0.1:61760
distributed.nanny - INFO -         Start Nanny at:            127.0.0.1:61761
distributed.nanny - INFO -         Start Nanny at:            127.0.0.1:61762
distributed.nanny - INFO -         Start Nanny at:            127.0.0.1:61763
distributed.worker - INFO -       Start worker at:            127.0.0.1:61765
distributed.worker - INFO -              nanny at:            127.0.0.1:61760
distributed.worker - INFO -               http at:            127.0.0.1:61764
distributed.worker - INFO - Waiting to connect to:            127.0.0.1:8786
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          1
distributed.nanny - INFO -         Start Nanny at:            127.0.0.1:61767
distributed.worker - INFO -                Memory:                    1.72 GB
distributed.worker - INFO -       Local Directory: /var/folders/55/nbg15c6j4k3cg06tjfhqypd40000gn/T/nanny-11ygswb9
...
  3. Finally, use the distributed.Client class to submit your jobs.
In [1]: from distributed import Client

In [2]: client = Client('1.2.3.4:8786')

In [3]: client
<Client: scheduler="127.0.0.1:61829" processes=8 cores=8>

In [4]: from distributed.diagnostics import progress

In [5]: import dask.bag

In [6]: data = dask.bag.range(10000, 8)

In [7]: data
dask.bag

In [8]: future = client.compute(data.sum())

In [9]: progress(future)
[########################################] | 100% Completed |  0.0s
In [10]: future.result()
49995000

I found this approach more reliable than the default scheduler. I prefer to submit the task explicitly and handle the future so I can use the progress widget, which is really nice in a notebook. You can also keep doing other work while waiting for the results.
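
If you only want local worker processes and don't need separate shells, distributed can also start everything from Python with LocalCluster; a sketch (the worker counts here are just an example, not taken from the answer above):

from distributed import Client, LocalCluster

# Start a local scheduler plus 8 single-threaded worker processes,
# then connect a client to that cluster.
cluster = LocalCluster(n_workers=8, threads_per_worker=1)
client = Client(cluster)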

If you get errors due to memory issues, you can restart the workers or the scheduler (i.e. start all over again), use smaller chunks of data, and try again.
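
To clear worker memory without tearing the whole deployment down by hand, you can also ask the client to bounce the workers, assuming your version of distributed provides Client.restart (a sketch reusing the client from step 3):

# Restart all worker processes; any in-memory results are lost,
# so resubmit your computations afterwards.
client.restart()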