How to use multiprocessing in python without duplicating large read-only dictionary


I have a look-up table (LUT), which is a very large dictionary (24 GB), and I have millions of inputs to query against it.

I want to split the millions of inputs across 32 jobs and run them in parallel. Due to the space constraint, I cannot launch multiple independent Python scripts, because each one would load its own copy of the LUT and overload memory.

I want to use the multiprocessing module to load the LUT only once, and then have the different processes look it up as a shared global variable, without duplicating it.

However, when I look at htop, it seems each subprocess is re-creating the LUT: the VIRT, RES, and SHR columns are all very high. But at the same time I don't see the additional memory reflected in the Mem row; it increased from 11 GB to 12.3 GB and just hovers there.

So I'm confused: is it, or is it not, re-creating the LUT within each subprocess? How should I proceed so that the work runs in parallel without duplicating the LUT in each subprocess? Code is shown below the picture.

[htop screenshot] (In this experiment I'm only using a 1 GB LUT, so don't worry about it not being 24 GB.)

import os, sys, time, pprint, pdb, datetime
import threading, multiprocessing
import random

## Print the process/thread details
def getDetails(idx):
    pid = os.getpid()
    threadName = threading.current_thread().name
    processName = multiprocessing.current_process().name
    print(f"{idx})\tpid={pid}\tprocessName={processName}\tthreadName={threadName} ")
    return pid, threadName, processName

def ComplexAlgorithm(value):
    # In reality this is some complex search algorithm;
    # a simple membership test stands in for it here
    return value in LUT

## Querying the 24Gb LUT from my millions of lines of input
def PerformMatching(idx, NumberOfLines):
    pid, threadName, processName = getDetails(idx)
    NumberMatches = 0
    for _ in range(NumberOfLines):
        # In the real code I read the values from my input file;
        # here I just generate random numbers
        value = random.randint(-100, 100)
        if ComplexAlgorithm(value): NumberMatches += 1
    print(f"\t{idx}) | LUT={len(LUT)} | NumberMatches={NumberMatches} | done")

if __name__ == "__main__":

    ## Init
    num_processes = 9
    # this is just a small stand-in to show the structure of my LUT; the real one is far larger
    LUT = {i: set([i]) for i in range(1000)}

    ## Store the workload (number of lines) for each process
    ListOfLists = []
    for idx in range(num_processes):
        NumberOfLines = 10000
        ListOfLists.append( NumberOfLines )

    ## Init the processes
    ProcessList = []
    for processIndex in range(num_processes):
        ProcessList.append( 
            multiprocessing.Process(
                target=PerformMatching, 
                args=(processIndex, ListOfLists[processIndex])
            )
        )
        ProcessList[processIndex].start()

    ## Wait until the process terminates.
    for processIndex in range(num_processes):
        ProcessList[processIndex].join()

    ## Done
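(As a side note, here is a small diagnostic sketch I'm considering, assuming psutil is installed; the reportMemory helper is hypothetical and not part of my real code. USS counts only the memory unique to a process, so pages inherited copy-on-write from the parent should not show up there, which would tell me whether the LUT is really being duplicated.)

import os
import psutil  # hypothetical dependency: pip install psutil

def reportMemory(tag):
    # memory_full_info() includes USS (unique set size): memory private to this
    # process, excluding pages shared copy-on-write with the parent after fork
    info = psutil.Process(os.getpid()).memory_full_info()
    print(f"{tag}: rss={info.rss / 2**20:.1f} MiB | uss={info.uss / 2**20:.1f} MiB")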

1 Answer

Answer from Booboo:

If you want to go the route of using a multiprocessing.Manager, this is how you could do it. The trade-off is that the dictionary is represented by a reference to a proxy for the actual dictionary that exists in a different address space and consequently every dictionary reference results in the equivalent of a remote procedure call. In other words, access is much slower compared with a "regular" dictionary.

In the demo program below, I have only defined a couple of methods for my managed dictionary, but you can define whatever you need. I have also used a multiprocessing pool instead of explicitly starting individual processes; you might consider doing likewise.

from multiprocessing.managers import BaseManager, BaseProxy
from multiprocessing import Pool
from functools import partial

def worker(LUT, key):
    return LUT[key]


class MyDict:
    def __init__(self):
        """ initialize the dictionary """
        # the very large dictionary reduced for demo purposes:
        self._dict = {i: i for i in range(100)}

    def get(self, obj, default=None):
        """ delegates to underlying dict """
        return self._dict.get(obj, default)

    def __getitem__(self, obj):
        """ delegates to underlying dict """
        return self._dict[obj]

class MyDictManager(BaseManager):
    pass

class MyDictProxy(BaseProxy):
    _exposed_ = ('get', '__getitem__')

    def get(self, *args, **kwargs):
        return self._callmethod('get', args, kwargs)

    def __getitem__(self, *args, **kwargs):
        return self._callmethod('__getitem__', args, kwargs)


def main():
    MyDictManager.register('MyDict', MyDict, MyDictProxy)
    with MyDictManager() as manager:
        my_dict = manager.MyDict()
        with Pool() as pool:
            # pass the proxy instead of the actual LUT:
            results = pool.map(partial(worker, my_dict), range(100))
        print(sum(results))

if __name__ == '__main__':
    main()

Prints:

4950

Discussion

Python comes with a built-in managed dict class, obtainable with multiprocessing.Manager().dict(). But initializing such a large number of entries through that dictionary would be very inefficient, since, as noted above, each access is relatively expensive. It seemed less expensive to create our own managed class with an underlying "regular" dictionary that is initialized directly when the managed class is constructed, rather than entry by entry through the proxy reference. It is true that the managed dict that comes with Python can be instantiated from an already-built dictionary, which avoids that inefficiency, but my concern is that memory efficiency would then suffer, because you would have two instances of the dictionary at once: the "regular" one and the "managed" one.
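
For reference, here is a minimal sketch of that built-in alternative, i.e. initializing multiprocessing.Manager().dict() from an already-built dictionary (the names big and worker are just for illustration). Note that the original dictionary and the managed copy both exist while the manager is alive, which is the memory concern described above:

from multiprocessing import Manager, Pool
from functools import partial

def worker(shared, key):
    # every access here is a remote call into the manager's process
    return shared.get(key, 0)

if __name__ == '__main__':
    big = {i: i for i in range(100)}   # stand-in for the very large LUT
    with Manager() as manager:
        shared = manager.dict(big)     # a second copy now lives in the manager process
        with Pool() as pool:
            print(sum(pool.map(partial(worker, shared), range(100))))  # prints 4950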