How to terminate workers started by the dask multiprocessing scheduler?

After using the dask multiprocessing scheduler for a long period of time, I noticed that the Python processes started by the multiprocessing scheduler take up a lot of memory. How can I restart the worker pool?
Update: You can do this to kill the workers started by the multiprocessing scheduler:
from dask.context import _globals

pool = _globals.pop('pool')  # remove the pool from dask's globals so a new one is created
pool.close()
pool.terminate()
pool.join()
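For context, here is a minimal sketch of that pattern end to end, assuming an older dask release (one that still exposes dask.context._globals and dask.set_options; newer versions replaced these with dask.config and scheduler='processes'). The pool size of 4 and the bag sizes are just illustrative choices:

import multiprocessing

import dask
import dask.bag
import dask.multiprocessing
from dask.context import _globals

# give the multiprocessing scheduler a persistent pool of worker processes;
# dask keeps it in _globals['pool'] and reuses it across computations
dask.set_options(pool=multiprocessing.Pool(4))

data = dask.bag.range(10000, npartitions=8)
print(data.sum().compute(get=dask.multiprocessing.get))  # 49995000

# tear the pool down so the worker processes (and the memory they hold) go away
pool = _globals.pop('pool', None)
if pool is not None:
    pool.close()
    pool.terminate()
    pool.join()

# set a fresh pool before the next computation
dask.set_options(pool=multiprocessing.Pool(4))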
First answer:

For tasks that consume a lot of memory, I prefer to use the distributed scheduler, even on localhost. It's very straightforward: once a scheduler and workers are running, use the distributed.Client class to submit your jobs.

In [1]: from distributed import Client

In [2]: client = Client('1.2.3.4:8786')

In [3]: client
<Client: scheduler="127.0.0.1:61829" processes=8 cores=8>

In [4]: from distributed.diagnostics import progress

In [5]: import dask.bag

In [6]: data = dask.bag.range(10000, 8)

In [7]: data
dask.bag

In [8]: future = client.compute(data.sum())

In [9]: progress(future)
[########################################] | 100% Completed | 0.0s

In [10]: future.result()
49995000

I found this way more reliable than the default scheduler. I prefer to submit the task explicitly and handle the future so that I can use the progress widget, which is really nice in a notebook. You can also still do other things while waiting for the results.
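If you only need this on a single machine, here is a minimal local sketch (assuming a reasonably recent distributed, where Client() with no address starts a local scheduler plus worker processes for you):

from distributed import Client
import dask.bag

# Client() with no address spins up a local cluster (scheduler + worker processes)
client = Client()

data = dask.bag.range(10000, npartitions=8)
future = client.compute(data.sum())
print(future.result())  # 49995000

# shut the local workers and the scheduler down when you are done
client.close()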
If you run into memory errors, you can restart the workers or the scheduler (starting over from scratch), use smaller chunks of data, and try again.
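For the "restart the workers" part, a sketch using Client.restart(), which kills all worker processes, clears their memory, and brings them back up (the smaller-chunk retry with npartitions=64 is just an illustrative choice):

from distributed import Client
import dask.bag

client = Client('1.2.3.4:8786')  # same scheduler address as above

# restart every worker: their processes are killed, their memory is cleared,
# and they reconnect to the scheduler
client.restart()

# retry with more (and therefore smaller) partitions
data = dask.bag.range(10000, npartitions=64)
future = client.compute(data.sum())
print(future.result())  # 49995000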