How to pass 2d array as multiprocessing.Array to multiprocessing.Pool?

3.2k views Asked by At

My aim is to pass a parent array to mp.Pool and fill it with 2s while distributing it to different processes. This works for arrays of 1 dimension:

import numpy as np
import multiprocessing as mp
import itertools


def worker_function(i=None):
    global arr
    val = 2
    arr[i] = val
    print(arr[:])


def init_arr(arr=None):
    globals()['arr'] = arr

def main():
    arr = mp.Array('i', np.zeros(5, dtype=int), lock=False)
    mp.Pool(1, initializer=init_arr, initargs=(arr,)).starmap(worker_function, zip(range(5)))
    print(arr[:])


if __name__ == '__main__':
    main()

Output:

[2, 0, 0, 0, 0]
[2, 2, 0, 0, 0]
[2, 2, 2, 0, 0]
[2, 2, 2, 2, 0]
[2, 2, 2, 2, 2]
[2, 2, 2, 2, 2]

But how can I do the same for x-dimensional arrays? Adding a dimension to arr:

arr = mp.Array('i', np.zeros((5, 5), dtype=int), lock=False)

produces an error:

Traceback (most recent call last):
  File "C:/Users/Artur/Desktop/RL_framework/test2.py", line 23, in <module>
    main()
  File "C:/Users/Artur/Desktop/RL_framework/test2.py", line 17, in main
    arr = mp.Array('i', np.zeros((5, 5), dtype=int), lock=False)
  File "C:\Users\Artur\anaconda3\envs\RL_framework\lib\multiprocessing\context.py", line 141, in Array
    ctx=self.get_context())
  File "C:\Users\Artur\anaconda3\envs\RL_framework\lib\multiprocessing\sharedctypes.py", line 88, in Array
    obj = RawArray(typecode_or_type, size_or_initializer)
  File "C:\Users\Artur\anaconda3\envs\RL_framework\lib\multiprocessing\sharedctypes.py", line 67, in RawArray
    result.__init__(*size_or_initializer)
TypeError: only size-1 arrays can be converted to Python scalars

Changing the dtype of arr does not help either.

1

There are 1 answers

0
Darkonaut On BEST ANSWER

You can't directly use multiprocessing.Array as a 2-d array, but in one-dimensional memory, the second dimension is just an illusion anyway :).

Luckily numpy allows reading an array from buffer and reshaping it without the need to copy it. In the demo below I just use a separate lock so we can observe the changes made step by step, there's currently no race condition for what it's doing.

import multiprocessing as mp
import numpy as np    

def worker_function(i):
    global arr, arr_lock
    val = 2
    with arr_lock:
        arr[i, :i+1] = val
        print(f"{mp.current_process().name}\n{arr[:]}")


def init_arr(arr, arr_lock=None):
    globals()['arr'] = np.frombuffer(arr, dtype='int32').reshape(5, 5)
    globals()['arr_lock'] = arr_lock


def main():
    arr = mp.Array('i', np.zeros(5 * 5, dtype='int32'), lock=False)
    arr_lock = mp.Lock()

    mp.Pool(2, initializer=init_arr, initargs=(arr, arr_lock)).map(
        worker_function, range(5)
    )

    arr = np.frombuffer(arr, dtype='int32').reshape(5, 5)
    print(f"{mp.current_process().name}\n{arr}")


if __name__ == '__main__':
    main()

Output:

ForkPoolWorker-1
[[2 0 0 0 0]
 [0 0 0 0 0]
 [0 0 0 0 0]
 [0 0 0 0 0]
 [0 0 0 0 0]]
ForkPoolWorker-2
[[2 0 0 0 0]
 [2 2 0 0 0]
 [0 0 0 0 0]
 [0 0 0 0 0]
 [0 0 0 0 0]]
ForkPoolWorker-1
[[2 0 0 0 0]
 [2 2 0 0 0]
 [2 2 2 0 0]
 [0 0 0 0 0]
 [0 0 0 0 0]]
ForkPoolWorker-2
[[2 0 0 0 0]
 [2 2 0 0 0]
 [2 2 2 0 0]
 [2 2 2 2 0]
 [0 0 0 0 0]]
ForkPoolWorker-1
[[2 0 0 0 0]
 [2 2 0 0 0]
 [2 2 2 0 0]
 [2 2 2 2 0]
 [2 2 2 2 2]]
MainProcess
[[2 0 0 0 0]
 [2 2 0 0 0]
 [2 2 2 0 0]
 [2 2 2 2 0]
 [2 2 2 2 2]]

Process finished with exit code 0