Submit code for execution to all processes in a concurrent.futures.ProcessPoolExecutor


Context:

  • A Python application server that uses a concurrent.futures.ProcessPoolExecutor to execute code
  • We sometimes want to hot reload imported code without restarting the entire server process

(Yes, I know importlib.reload has caveats.)

To get this to work, I imagine I would have to call importlib.reload in every worker process managed by the process pool.

Is there a way to submit something to all processes in a process pool?

Answer by Darkonaut:

I don't know how this will play out with the hot reloading attempt you mentioned, but the general question you really asked is answerable.

"Is there a way to submit something to all processes in a process pool?"

The challenge is to ensure that every process receives the task exactly once, and that no process moves on to other work until all of them have received it.

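You can get this kind of synchronization with a multiprocessing.Barrier(parties[, action[, timeout]]). The barrier blocks each party that calls barrier.wait() until all parties have done so, then releases them all at once. Because a blocked worker cannot pick up a second task, submitting as many tasks as there are workers means each worker runs exactly one of them.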

import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor


def foo(x):
    for _ in range(int(42e4)):
        pass
    return x


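def reload(something):
    # Placeholder for the per-worker action (e.g. an importlib.reload call).
    print(f"{mp.current_process().name} --- reloading {something} and waiting.")
    # Block until all workers have arrived, so no worker can take two reload tasks.
    barrier.wait()
    print(f"{mp.current_process().name} --- released.")


def init_barrier(barrier):
    # Runs once in every worker at startup: expose the shared barrier as a global.
    globals()['barrier'] = barrier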


if __name__ == '__main__':

    MAX_WORKERS = 4
    barrier = mp.Barrier(MAX_WORKERS)

    with ProcessPoolExecutor(
            MAX_WORKERS, initializer=init_barrier, initargs=(barrier,)
    ) as executor:
        print(list(executor.map(foo, range(10))))
        # submit one task per worker; the barrier holds each worker until all have one
        futures = [executor.submit(reload, "something") for _ in range(MAX_WORKERS)]
        for f in futures:
            f.result()

        print(list(executor.map(foo, range(10))))

Example Output:

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
ForkProcess-3 --- reloading something and waiting.
ForkProcess-2 --- reloading something and waiting.
ForkProcess-1 --- reloading something and waiting.
ForkProcess-4 --- reloading something and waiting.
ForkProcess-1 --- released.
ForkProcess-4 --- released.
ForkProcess-3 --- released.
ForkProcess-2 --- released.
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Process finished with exit code 0
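
The example above calls barrier.wait() without a timeout, so if one worker never arrives (for instance because it died), the others would block indefinitely. Below is a minimal sketch of the timeout behaviour. It uses threading.Barrier as a stand-in, on the assumption that this is acceptable for illustration because multiprocessing.Barrier is documented as a clone of it with the same interface. The helper names wait_at_barrier and run_parties are made up for this sketch:

```python
import threading


def wait_at_barrier(barrier, results, index, timeout):
    # Record whether this party was released normally or the barrier broke.
    try:
        barrier.wait(timeout=timeout)
        results[index] = "released"
    except threading.BrokenBarrierError:
        results[index] = "broken"


def run_parties(parties, arrivals, timeout):
    # Hypothetical helper: start `arrivals` threads against a barrier
    # that expects `parties` callers, then collect what each one saw.
    barrier = threading.Barrier(parties)
    results = [None] * arrivals
    threads = [
        threading.Thread(target=wait_at_barrier, args=(barrier, results, i, timeout))
        for i in range(arrivals)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results


if __name__ == "__main__":
    # All four parties arrive: everyone is released together.
    print(run_parties(4, 4, timeout=5))
    # Only three of four arrive: the first timeout breaks the barrier for all.
    print(run_parties(4, 3, timeout=0.2))
```

In the pool example, passing a timeout to barrier.wait() inside reload would turn a hang into a BrokenBarrierError that surfaces through f.result().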

If you are okay with keeping barrier a global and multiprocessing.get_start_method() returns "fork", you don't need the initializer, because globals that exist before the workers are created are inherited by the forked processes.
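
A hedged sketch of that fork-only variant follows, assuming a POSIX platform where the "fork" start method is available. The names run_demo and MAX_WORKERS and the 30-second timeout are illustrative choices, not part of the original answer, and the fork context is requested explicitly instead of relying on the platform default:

```python
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor

MAX_WORKERS = 4  # illustrative pool size
barrier = None   # set in the parent before any worker is forked


def reload(something):
    # `barrier` is the module-level global each worker inherited through fork.
    # The timeout is an assumption added here to avoid waiting forever.
    barrier.wait(timeout=30)
    return mp.current_process().pid


def run_demo():
    global barrier
    # Raises ValueError on platforms without the "fork" start method.
    ctx = mp.get_context("fork")
    # Create the barrier before the executor starts its workers,
    # so every forked child sees it as a global.
    barrier = ctx.Barrier(MAX_WORKERS)
    with ProcessPoolExecutor(MAX_WORKERS, mp_context=ctx) as executor:
        futures = [executor.submit(reload, "something") for _ in range(MAX_WORKERS)]
        return sorted(f.result() for f in futures)


if __name__ == "__main__":
    pids = run_demo()
    # One distinct worker PID per submitted task.
    print(len(set(pids)) == MAX_WORKERS)
```

Under the "spawn" or "forkserver" start methods the workers re-import the module and would see barrier as None, so the initializer approach is still needed there.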