KeyError when worker processes in a multiprocessing Pool modify a nested dict stored in a Manager dict

from multiprocessing import Pool, Manager

def task(args):
    k, v, sharedDict, lock = args
    with lock:
        if k not in sharedDict:
            sharedDict[k] = {}
            sharedDict[k]['current'] = v
            print(f"sharedDict[k]['current'] = {sharedDict[k]['current']}")

def main():
    manager = Manager()
    lock = manager.Lock()
    dic = manager.dict()
    pool = Pool(processes=2)
    tasks = [('a', {'A': 1}, dic, lock), ('b', {'B': 2}, dic, lock), ('c', {'C': 3}, dic, lock), ('d', {'D': 4}, dic, lock)]
    pool.map(task, tasks)
    pool.close()
    pool.join()

if __name__ == '__main__':
    main()

When I run the code above, the line print(f"sharedDict[k]['current'] = {sharedDict[k]['current']}") raises KeyError: 'current', even though I have just added that key to the dictionary. I hope someone can help me.

There are 2 answers

Yoshikage Kira (BEST ANSWER)

From the Python docs:

If standard (non-proxy) list or dict objects are contained in a referent, modifications to those mutable values will not be propagated through the manager because the proxy has no way of knowing when the values contained within are modified. However, storing a value in a container proxy (which triggers a __setitem__ on the proxy object) does propagate through the manager and so to effectively modify such an item, one could re-assign the modified value to the container proxy

sharedDict[k] = {}
sharedDict[k]['current'] = v
print(f"sharedDict[k]['current'] = {sharedDict[k]['current']}")

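sharedDict[k] is a plain (non-proxy) dict nested inside sharedDict. Every time you index sharedDict[k], the proxy hands back a fresh copy, so sharedDict[k]['current'] = v only modifies that temporary copy. The change is never propagated because the proxy has no way of knowing when the values contained within are modified.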
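To see this in isolation, here is a minimal single-process sketch (the function name lost_update_demo is made up for illustration). It performs the same write as the question, then repeats it using the re-assignment described in the docs quote.

```python
from multiprocessing import Manager


def lost_update_demo():
    """Hypothetical helper: return the inner dict before and after re-assignment."""
    with Manager() as manager:
        shared = manager.dict()
        shared["a"] = {}             # a plain dict is sent to the manager by value
        shared["a"]["current"] = 1   # mutates a temporary local copy only
        before = shared["a"]         # the manager still holds an empty dict

        inner = shared["a"]          # fetch a copy
        inner["current"] = 1         # modify the copy
        shared["a"] = inner          # __setitem__ on the proxy sends it back
        after = shared["a"]
        return before, after


if __name__ == "__main__":
    print(lost_update_demo())  # ({}, {'current': 1})
```

The first write disappears, which is exactly why the print in the question raises KeyError.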

The workaround is to fetch the inner dict into a temporary local variable, modify it, and assign it back.

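def task(args):
    k, v, shared_dict, lock = args
    with lock:
        if k not in shared_dict:
            shared_dict[k] = {}
            # Indexing the proxy returns a local copy of the inner dict.
            inner_dict = shared_dict[k]
            inner_dict['current'] = v
            # Re-assigning pushes the modified copy back to the manager.
            shared_dict[k] = inner_dict
            # The "=" specifier in f-strings needs Python 3.8 or newer.
            print(f"{shared_dict[k]['current'] = }")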

The line shared_dict[k] = inner_dict calls __setitem__ on the shared_dict proxy, which sends the updated inner dict to the manager.
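If the inner dict is only ever created with its final contents, the same idea can be written with a single assignment. This is a sketch rather than a drop-in replacement: store_once is a hypothetical helper name, and it assumes nothing else needs to update the inner dict later.

```python
from multiprocessing import Manager


def store_once(shared_dict, lock, k, v):
    """Hypothetical helper: add k with a fully built inner dict.

    Returns True if the key was added, False if it already existed.
    """
    with lock:
        if k in shared_dict:
            return False
        # Build the inner dict locally, then store it with one __setitem__
        # call on the proxy so the manager receives the final value.
        shared_dict[k] = {"current": v}
        return True


if __name__ == "__main__":
    with Manager() as manager:
        shared = manager.dict()
        lock = manager.Lock()
        print(store_once(shared, lock, "a", {"A": 1}))  # True
        print(shared["a"]["current"])                   # {'A': 1}
```

This saves one round trip to the manager per key compared with storing an empty dict first.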

Credit to @ken

tdelaney

As mentioned, manager.dict() is a proxy object, but the {} assigned in sharedDict[k] = {} is not. It's a regular Python dictionary, so assignments made to the inner dict are not proxied. Each later lookup of sharedDict[k] returns another copy of the original dictionary still held by the manager, which has never seen the local modifications.
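The difference between the two kinds of value can be inspected directly. The following is a rough sketch (inspect_lookups is a made-up name) that relies on nested proxies, which the multiprocessing docs describe as supported since Python 3.6.

```python
from multiprocessing import Manager


def inspect_lookups():
    """Hypothetical helper: compare lookups of a plain value and a nested proxy."""
    with Manager() as manager:
        shared = manager.dict()
        shared["plain"] = {}                # regular dict, stored by value
        shared["proxied"] = manager.dict()  # nested proxy object

        plain_type = type(shared["plain"]).__name__
        proxied_type = type(shared["proxied"]).__name__
        # Two lookups of the plain value produce two separate copies.
        distinct_copies = shared["plain"] is not shared["plain"]

        # A write through the nested proxy is forwarded to the manager.
        shared["proxied"]["current"] = 1
        persisted = shared["proxied"].copy()
        return plain_type, proxied_type, distinct_copies, persisted


if __name__ == "__main__":
    print(inspect_lookups())
```

The plain entry comes back as an ordinary dict and as a new object on every lookup, while the nested proxy keeps the value written through it.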

Yoshikage Kira shows a great solution - update and reassign the local copy. But you could also create proxy dicts in the worker process. Managers create a process that works as a common server for proxied objects. Your pool workers could connect to the same server and create their own managed dictionaries.

from multiprocessing import Pool, Manager, managers

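def task_init(address):
    # Runs once in each pool worker: connect to the manager server
    # that the parent process already started.
    global task_manager
    task_manager = managers.SyncManager(address=address)
    task_manager.connect()
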
def task(args):
    k, v, sharedDict, lock = args
    with lock:
        if k not in sharedDict:
            sharedDict[k] = task_manager.dict()
            sharedDict[k]['current'] = v
            print(f"sharedDict[k]['current'] = {sharedDict[k]['current']}")

def main():
    manager = Manager()
    lock = manager.Lock()
    dic = manager.dict()
    pool = Pool(processes=2, initializer=task_init, initargs=[manager.address])
    tasks = [('a', {'A': 1}, dic, lock), ('b', {'B': 2}, dic, lock), ('c', {'C': 3}, dic, lock), ('d', {'D': 4}, dic, lock)]
    pool.map(task, tasks)
    pool.close()
    pool.join()

if __name__ == '__main__':
    main()

Either way, if that seems like an awful lot of interprocess proxy calls and copying of data, you are right! Managers can really suck resources. Wait, did I just imply managers suck? Um... If you can solve your problem without them, you may find you have a faster program.
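As one possible manager-free shape for this particular example, the workers could return their results and let the parent build the dictionary. This is only a sketch under the assumption that workers do not need to see each other's entries while running; run is a hypothetical helper name.

```python
from multiprocessing import Pool


def task(item):
    """Worker: compute a result and return it instead of sharing state."""
    k, v = item
    return k, {"current": v}


def run(items, processes=2):
    """Hypothetical helper: collect worker results in the parent process."""
    result = {}
    with Pool(processes=processes) as pool:
        # pool.map returns results in input order, so "first value wins"
        # is deterministic, mirroring the `if k not in` check.
        for k, inner in pool.map(task, items):
            result.setdefault(k, inner)
    return result


if __name__ == "__main__":
    items = [("a", {"A": 1}), ("b", {"B": 2}), ("c", {"C": 3}), ("d", {"D": 4})]
    print(run(items))
```

Here the only interprocess traffic is the task arguments and return values, and no lock is needed because only the parent touches the result dict.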