How to assign values to an array from inside the worker_function of multiprocessing.Pool.map?

1.6k views Asked by At

Basically what I want is to insert those 2's into ar, so that ar gets changed outside the worker_function.

import numpy as np
import multiprocessing as mp
from functools import partial


def worker_function(i=None, ar=None):
    """Store the value 2 at position ``i`` of ``ar`` and print ``ar``.

    The array is modified in place; nothing is returned.
    """
    ar[i] = 2
    print(ar)


def main():
    """Run ``worker_function`` for indices 0 and 1 in a one-process pool.

    ``ar`` is bound to the worker via ``partial``. The array printed at
    the end is the parent's own ``ar``; the writes made inside the worker
    land on the worker's copy, so the parent's array stays all zeros.
    """
    ar = np.zeros(5)
    func_part = partial(worker_function, ar=ar)
    # Use the pool as a context manager so its worker process is shut down
    # once map() has returned instead of being left running.
    with mp.Pool(1) as pool:
        pool.map(func_part, range(2))
    print(ar)


if __name__ == '__main__':
    main()

The only thing I can achieve so far is changing the copy of ar inside worker_function but not outside the function:

[2. 0. 0. 0. 0.]
[0. 2. 0. 0. 0.]
[0. 0. 0. 0. 0.]
2

There are 2 answers

3
Darkonaut On BEST ANSWER

For performance you should use a shared-memory multiprocessing.Array here to avoid reconstructing and sending arrays across different processes again and again. The array will be the same in all processes, which isn't the case in your example where you send copies around. That's also the reason you don't see the changes made in the parent.

import multiprocessing as mp
import numpy as np


def worker_function(i):
    """Write 2 into slot ``i`` of the module-level ``arr`` and report it.

    Prints the current process name followed by a snapshot of ``arr``.
    """
    # Item assignment mutates the existing object; the name ``arr`` is never
    # rebound here, so it resolves to the module global without a
    # ``global`` declaration.
    arr[i] = 2
    process_name = mp.current_process().name
    print(process_name, arr[:])


def init_arr(arr):
    """Pool initializer: publish ``arr`` as the module-level name ``arr``."""
    # A ``global arr`` statement is not possible here because the parameter
    # carries the same name, so the module namespace is written directly.
    module_namespace = globals()
    module_namespace['arr'] = arr


def main():
    """Fill a shared integer array from two pool workers and print it.

    The ``multiprocessing.Array`` is handed to every worker through the
    pool initializer ``init_arr``; ``worker_function`` then writes into
    it. Finally the parent prints its process name and the array contents.
    """
    # as long as we don't conditionally modify the same indices
    # from multiple workers, we don't need the lock ...
    arr = mp.Array('i', np.zeros(5, dtype=int), lock=False)
    # Use the pool as a context manager so the worker processes are shut
    # down once map() has returned instead of being left running.
    with mp.Pool(2, initializer=init_arr, initargs=(arr,)) as pool:
        pool.map(worker_function, range(5))
    print(mp.current_process().name, arr[:])


if __name__ == '__main__':
    main()

Output:

ForkPoolWorker-1 [2, 0, 0, 0, 0]
ForkPoolWorker-2 [2, 2, 0, 0, 0]
ForkPoolWorker-1 [2, 2, 2, 0, 0]
ForkPoolWorker-2 [2, 2, 2, 2, 0]
ForkPoolWorker-1 [2, 2, 2, 2, 2]
MainProcess [2, 2, 2, 2, 2]

Process finished with exit code 0
0
Booboo On

First, your arguments to worker_function are defined in the wrong order.

As you have observed, each process gets a copy of the array. The best you can do is to return the modified array:

import numpy as np
import multiprocessing as mp
from functools import partial


def worker_function(ar, i):  # array first, so partial() can bind it positionally
    """Set ``ar[i]`` to 2 and return the same (now modified) ``ar``."""
    ar[i] = 2
    return ar


def main():
    """Map ``worker_function`` over indices 0 and 1 and print each result.

    ``ar`` is bound as the first positional argument via ``partial``;
    every task returns its own modified array, and each returned array is
    printed on its own line.
    """
    ar = np.zeros(5)
    func_part = partial(worker_function, ar)
    # pool size of 2, otherwise what is the point?
    # The context manager shuts the workers down once map() has returned.
    with mp.Pool(2) as pool:
        arrays = pool.map(func_part, range(2))
    for array in arrays:
        print(array)


if __name__ == '__main__':
    main()

Prints:

[2. 0. 0. 0. 0.]
[0. 2. 0. 0. 0.]

But now you are dealing with two separately modified arrays. You would have to add additional logic to merge the results of these two arrays into one:

import numpy as np
import multiprocessing as mp
from functools import partial


def worker_function(ar, i):  # array first, so partial() can bind it positionally
    """Set ``ar[i]`` to 2 and return the same (now modified) ``ar``."""
    ar[i] = 2
    return ar


def main():
    """Run two tasks, then merge their returned arrays back into ``ar``.

    Task ``i`` returns an array whose element ``i`` was set by
    ``worker_function``; that single element is copied into the parent's
    ``ar`` before ``ar`` is printed.
    """
    ar = np.zeros(5)
    func_part = partial(worker_function, ar)
    # pool size of 2, otherwise what is the point?
    # The context manager shuts the workers down once map() has returned.
    with mp.Pool(2) as pool:
        arrays = pool.map(func_part, range(2))
    # map() preserves input order, so the i-th result belongs to index i;
    # enumerate avoids repeating the task count by hand.
    for i, modified in enumerate(arrays):
        ar[i] = modified[i]
    print(ar)


if __name__ == '__main__':
    main()

Prints:

[2. 2. 0. 0. 0.]

But what would make more sense would be for the worker_function to just return a tuple giving the index of the element being modified and the new value:

import numpy as np
import multiprocessing as mp
from functools import partial


def worker_function(ar, i):  # array first, so partial() can bind it positionally
    """Return ``(i, i + 3)``: the index to update and its new value.

    ``ar`` is accepted but not read or modified.
    """
    new_value = i + 3
    return i, new_value


def main():
    """Collect ``(index, value)`` pairs from the pool and apply them to ``ar``.

    The workers only report what should change; the parent performs the
    actual writes on its own array and then prints it.
    """
    ar = np.zeros(5)
    func_part = partial(worker_function, ar)
    # The context manager shuts the workers down once map() has returned.
    with mp.Pool(2) as pool:
        results = pool.map(func_part, range(2))
    for index, value in results:
        ar[index] = value
    print(ar)


if __name__ == '__main__':
    main()

Prints:

[3. 4. 0. 0. 0.]

Of course, if the worker_function modified multiple values, it would return a tuple of tuples.

And finally, if you do need to pass in an object to the sub-processes, there is another way using a pool initializer:

import numpy as np
import multiprocessing as mp


def pool_initializer(ar):
    """Pool initializer: store ``ar`` in the module-level name ``the_array``."""
    globals()['the_array'] = ar


def worker_function(i):
    """Return ``(i, the_array[i] ** 2)`` using the module-level ``the_array``."""
    squared = the_array[i] ** 2
    return i, squared  # index, value


def main():
    """Square every element of a five-element array using a pool of five.

    The array is handed to the workers through ``pool_initializer``; each
    task returns an ``(index, value)`` pair which the parent writes back
    into ``ar`` before printing it.
    """
    ar = np.array([1, 2, 3, 4, 5])
    with mp.Pool(processes=5, initializer=pool_initializer, initargs=(ar,)) as workers:
        index_value_pairs = workers.map(worker_function, range(5))
    for position, squared in index_value_pairs:
        ar[position] = squared
    print(ar)


if __name__ == "__main__":
    main()

Prints:

[ 1  4  9 16 25]