In my code, I generate data in a multiprocessing Pool and then process it in another Pool. Until now, my implementation saved the data to disk and loaded it back afterwards. Now I want to keep the data in memory, so this is the approach I took. I use multiprocessing.shared_memory.SharedMemory to pass the BytesIO contents back and forth (this way, multiprocessing doesn't have to pickle the data).
from io import BytesIO
from multiprocessing import Pool
from multiprocessing.shared_memory import SharedMemory
def generate_data(_dummy_arg) -> str:
    data = BytesIO(bytearray(1024 * 1024 * 38))
    # prepare SharedMemory to send the bytes back to the main process
    buf = data.getbuffer()
    shared_memory = SharedMemory(create=True, size=buf.nbytes)
    shared_memory.buf[:] = buf
    shared_memory.close()
    return shared_memory.name
def process_data(data_name: str) -> str:
    # recover the data from its name; this is where the error happens
    data = SharedMemory(data_name)
    # FileNotFoundError: [Errno 2] No such file or directory: '/psm_607cb218'
    return "some_result"
datas: list[SharedMemory] = []

with Pool(5) as p:
    for data_name in p.map(generate_data, range(5)):
        # recover SharedMemory from the name
        data = SharedMemory(data_name)
        datas.append(data)

# some code

with Pool(5) as p:
    for returned in p.map(process_data, (data.name for data in datas)):
        ...
But this implementation raises a FileNotFoundError: [Errno 2] No such file or directory: '/psm_201b67a1' in the second Pool. It's as if the SharedMemory object disappeared somehow. Any ideas?
Edit:
It works if I reduce the size of the data to, for example, 1024 * 1024 * 10 (10MB).
This is likely due to the SharedMemory instance getting unlinked when its close() method is called. As far as I know, this is a bug that still hasn't been fixed. As a workaround, try calling close() only after the data has been processed.
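
A minimal sketch of that workaround, assuming the same generate_data/process_data structure from the question: the producer worker returns the segment name without closing it, each consumer closes only its own handle, and the main process closes and unlinks every segment once the second Pool has finished. The bytes(shm.buf) line stands in for whatever processing you actually do.

    from io import BytesIO
    from multiprocessing import Pool
    from multiprocessing.shared_memory import SharedMemory

    def generate_data(_dummy_arg) -> str:
        data = BytesIO(bytearray(1024 * 1024 * 38))
        buf = data.getbuffer()
        shared_memory = SharedMemory(create=True, size=buf.nbytes)
        shared_memory.buf[:] = buf
        # keep the segment open here; don't call close() before the data is consumed
        return shared_memory.name

    def process_data(data_name: str) -> str:
        # attach to the existing segment by name and read from it
        shm = SharedMemory(data_name)
        payload = bytes(shm.buf)  # placeholder for the real processing
        shm.close()               # close this process's handle only
        return "some_result"

    if __name__ == "__main__":
        with Pool(5) as p:
            names = p.map(generate_data, range(5))

        with Pool(5) as p:
            results = p.map(process_data, names)

        # release the segments from the main process once processing is done
        for name in names:
            shm = SharedMemory(name)
            shm.close()
            shm.unlink()

Note that unlink() should be called exactly once per segment, by the last owner, otherwise the memory is never returned to the OS.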