Multiprocessing SharedMemory not found in second multiprocessing.Pool


In my code, I generate data in a multiprocessing Pool and then process it in another Pool. Until now, my implementation has been saving the data to disk and loading it back afterwards. Now I want to keep the data in memory, so this is the approach I took: I use multiprocessing's SharedMemory to pass the BytesIO object I work on back and forth (this way, multiprocessing doesn't have to pickle the data).

from io import BytesIO
from multiprocessing import Pool
from multiprocessing.shared_memory import SharedMemory


def generate_data(_dummy_arg) -> str:
    data = BytesIO(bytearray(1024 * 1024 * 38))

    # prepare SharedMemory to send the bytes back to the main process
    buf = data.getbuffer()
    shared_memory = SharedMemory(create=True, size=buf.nbytes)
    shared_memory.buf[:] = buf
    shared_memory.close()
    return shared_memory.name


def process_data(data_name: str) -> str:
    # recover the data from its name, this is where the error happens
    data = SharedMemory(data_name)
    # FileNotFoundError: [Errno 2] No such file or directory: '/psm_607cb218'
    return "some_result"


datas: list[SharedMemory] = []
with Pool(5) as p:
    for data_name in p.map(generate_data, range(5)):
        # recover SharedMemory from the name
        data = SharedMemory(data_name)
        datas.append(data)

# some code

with Pool(5) as p:
    for returned in p.map(process_data, (data.name for data in datas)):
        ...

But this implementation raises a FileNotFoundError: [Errno 2] No such file or directory: '/psm_201b67a1' in the second Pool. It's as if the SharedMemory object disappeared somehow. Any ideas?

Edit: It works if I reduce the size of the data to, for example, 1024 * 1024 * 10 (10MB).


There are 2 answers

Answer by Dani

This is likely due to the fact that the SharedMemory instance is getting unlinked when its close() method is called. As far as I know, this is a bug that still hasn't been fixed. As a workaround, try calling the close() method only after processing the data:

from multiprocessing.shared_memory import SharedMemory


def process_data(data_name: str) -> str:
    data = SharedMemory(data_name)
    # Process the data here...
    # After processing, close the shared memory
    data.close()
    return "some_result"
Answer by SIGHUP

Once the shared memory segment has been created, it will be available until it is unlinked.

Closing the shared memory does not unlink it. It is best practice to close the segment when you're done with it. To that end, it's easiest to use a simple context manager to ensure that this happens automatically.

In this example, the use of the multiprocessing Pool is irrelevant, but it is there to align with what's being done in the OP.

from multiprocessing import Pool
from multiprocessing.shared_memory import SharedMemory
from io import BytesIO

BUFSIZ = 30 * 1024 * 1024
SHMNAME = "CtrlZ"


class MySharedMemory(SharedMemory):
    # SharedMemory is not itself a context manager, so add
    # __enter__/__exit__ that close (but do not unlink) the segment
    def __enter__(self):
        return self

    def __exit__(self, *_):
        self.close()


def loadshm():
    data = BytesIO(bytearray(BUFSIZ))
    with MySharedMemory(SHMNAME, True, BUFSIZ) as shm:
        shm.buf[:BUFSIZ] = data.getbuffer()


def unloadshm():
    with MySharedMemory(SHMNAME) as shm:
        print(len(bytes(shm.buf)) == BUFSIZ)


def main():
    try:
        with Pool() as pool:
            pool.apply(loadshm)
            pool.apply(unloadshm)
    finally:
        # unlink exactly once, after every user has closed its handle
        MySharedMemory(SHMNAME).unlink()


if __name__ == "__main__":
    main()

Output:

True
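
To map this back onto the two-pool flow in the question, one approach consistent with the close-early/unlink-late rule above is to keep a single Pool open for both the generate and process phases, close each handle where it is used, and unlink exactly once in the main process at the end. The following is only a sketch under those assumptions (small 1 MiB segments, hypothetical generate/process helpers standing in for the OP's functions), not a guaranteed fix for the original error:

from multiprocessing import Pool
from multiprocessing.shared_memory import SharedMemory

SIZE = 1024 * 1024  # 1 MiB per segment, kept small for the sketch


def generate(i: int) -> str:
    # create a segment, fill it, and close only this process's
    # handle; the segment itself lives on until it is unlinked
    shm = SharedMemory(create=True, size=SIZE)
    shm.buf[:SIZE] = bytes([i]) * SIZE
    name = shm.name
    shm.close()
    return name


def process(name: str) -> bool:
    # attach by name, read, and close the local handle
    shm = SharedMemory(name)
    ok = shm.buf[0] == shm.buf[-1]
    shm.close()
    return ok


def main():
    names = []
    try:
        # one Pool for both phases, mirroring the answer above
        with Pool(5) as pool:
            names = pool.map(generate, range(5))
            print(pool.map(process, names))
    finally:
        # unlink exactly once, in the owning (main) process
        for name in names:
            SharedMemory(name).unlink()


if __name__ == "__main__":
    main()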