Unable to change data in nested objects using multiprocessing

87 views Asked by At

I am writing a big program as a personal project, and in it I have nested objects. For example, World contains multiple environments, with each environment containing multiple people. What I am trying to do is return the traits of those people from the People class to the Environment class, then to the World class, and then to the main program to display the results. I am using multiprocessing to run each People object, whose traits actively change.

The code attached is a smaller version of the problem. Type1 creates a Type2; Type2 changes its values, and Type1's values are then updated based on the changes in Type2. However, the Type1 instance doesn't detect the changes in Type2. I tried using locks, managers, values, queues and SyncManagers (all from multiprocessing), and none of them seemed to work. Is there something obvious I am missing, or is there a different, preferred program structure to achieve what I want to do?

My expected output was for Type1 to change its data to match Type2's values, according to the change() methods in both Type1 and Type2.

import multiprocessing
import time


class Type1:
    """Parent object that launches ``Type2.change`` in child processes.

    NOTE: ``change`` reads from the ``Type2`` instance held in *this*
    process. The child started by ``start`` runs ``Type2.change`` on its
    own copy of that instance, so assignments made in the child are not
    visible here; ``change`` therefore sees the values the parent's
    ``Type2`` object currently holds (its constructor values).
    """

    def __init__(self) -> None:
        self.array = []
        self.dict = {}
        self.text = ""
        self.number = 0
        # Maps "type2<N>" -> the multiprocessing.Process started for it.
        self.process_dict = {}
        # Maps "type2<N>" -> the Type2 object that process was started from.
        self.type2_dict = {}
        # Counter used to build the next "type2<N>" key.
        self.num = 0

    def start(self):
        """Create a Type2, run its ``change`` in a new process, record both."""
        key = f"type2{self.num}"
        worker = Type2()
        proc = multiprocessing.Process(target=worker.change)
        self.process_dict[key] = proc
        self.type2_dict[key] = worker
        self.num += 1
        proc.start()

    def stop(self):
        """Terminate and join every recorded process, emptying ``process_dict``."""
        while self.process_dict:
            proc = self.process_dict.popitem()[1]
            proc.terminate()
            proc.join()

    def change(self):
        """Copy the data of the most recently started Type2 (this process's copy).

        Raises KeyError if ``start`` has never been called.
        """
        latest = self.type2_dict[f"type2{self.num - 1}"]
        self.array, self.dict, self.text, self.number = latest.get_data()

    def print(self):
        """Print array, dict, text, number and the process map, one per line."""
        for item in (self.array, self.dict, self.text, self.number, self.process_dict):
            print(item)
        
        
        
        
class Type2:
    """Object whose ``change`` loop repeatedly rewrites its own fields.

    When ``change`` runs as a ``multiprocessing.Process`` target, it
    mutates the child process's copy of this object only.
    """

    def __init__(self) -> None:
        self.array, self.dict, self.text, self.number = [], {}, "Hello", 0

    def change(self):
        """Loop forever: reset array/dict/text, bump ``number``, log a line.

        There is no sleep and no exit condition, so this spins until the
        process running it is terminated.
        """
        while True:
            self.array = [6, 7, 8, 9, 10]
            self.dict = {"d": 4, "e": 5, "f": 6}
            self.text = "Goodbye"
            self.number = self.number + 1
            print("Type2 changed")

    def get_data(self):
        """Return ``(array, dict, text, number)`` as a tuple."""
        return (self.array, self.dict, self.text, self.number)
        
        
if __name__ == "__main__":
    t = Type1()
    # Launch one child process running Type2.change (an endless loop).
    t.start()
    # Give the child time to run; its assignments happen in its own copy
    # of the Type2 object, not in the one t.type2_dict holds.
    time.sleep(2)
    print("\n\nType1 After Start:")
    # Reads the parent's Type2 object, so this shows its unchanged values.
    t.change()
    t.print()
    t.stop()
    print("\n\nType1 After Finish:")
    t.print()
    print("Type1 stopped")

I tried using a manager, as suggested in the answer below, but changing dict results in a NoneType, which suggests that shared_var is not being shared properly across processes for different objects.

import multiprocessing
import time


class Type1:
    """Parent object that shares manager proxies with a Type2 child process.

    The constructor arguments are expected to be objects created with
    ``multiprocessing.Manager()`` (``manager.list``, ``manager.dict`` and
    two ``manager.Value`` proxies), but ``print`` also accepts plain values.
    """

    def __init__(self, array, dict, text, number) -> None:
        # ``dict`` shadows the builtin inside this method only; the name is
        # kept because it is part of the constructor's keyword interface.
        self.array = array
        self.dict = dict
        self.text = text
        self.number = number
        # Maps "type2<N>" -> the multiprocessing.Process started for it.
        self.process_dict = {}
        # Maps "type2<N>" -> the Type2 object that process was started from.
        self.type2_dict = {}
        # Counter used to build the next "type2<N>" key.
        self.num = 0

    def start(self):
        """Start ``Type2.change`` in a new process, passing the shared dict."""
        key = f"type2{self.num}"
        new_type = Type2(self.array, self.dict, self.text, self.number)
        p = multiprocessing.Process(target=new_type.change, args=(self.dict,))
        self.process_dict[key] = p
        self.type2_dict[key] = new_type
        self.num += 1
        p.start()

    def stop(self):
        """Terminate and join every started process, emptying ``process_dict``."""
        while self.process_dict:
            _, process = self.process_dict.popitem()
            process.terminate()
            process.join()

    def change(self):
        """Re-read the four values from the most recently created Type2.

        Raises KeyError if ``start`` has never been called.
        """
        latest = self.type2_dict[f"type2{self.num - 1}"]
        self.array, self.dict, self.text, self.number = latest.get_data()

    def print(self):
        """Print the current array, dict, text and number, then the process map.

        Only ``manager.Value`` proxies expose ``.value``; ``manager.list`` and
        ``manager.dict`` proxies (and plain objects) do not, so those are
        printed directly. ``str()`` of a proxy is its referent's repr.
        """
        for shared in (self.array, self.dict, self.text, self.number):
            print(getattr(shared, "value", shared))
        print(self.process_dict)
        
        
        
        
class Type2:
    """Child-side worker that updates the shared objects it was given.

    ``array`` must support slice assignment (``manager.list`` or a list),
    ``dict`` item access, and ``text``/``number`` a writable ``.value``
    attribute (``manager.Value``). ``change`` mutates these objects in place
    instead of rebinding the attributes. Rebinding inside a child process
    only changes the child's copy of this object, whereas writes through a
    manager proxy reach the shared referent and are visible to the parent.
    """

    def __init__(self, array, dict, text, number) -> None:
        self.array = array
        self.dict = dict
        self.text = text
        self.number = number

    def change(self, shared_var, max_iterations=None):
        """Repeatedly write new values into the shared objects.

        Args:
            shared_var: mapping whose integer values are each incremented by
                one per iteration; it also becomes ``self.dict``.
            max_iterations: stop after this many iterations. ``None`` (the
                default) loops until the process is terminated.
        """
        self.dict = shared_var
        done = 0
        while max_iterations is None or done < max_iterations:
            self.array[:] = [6, 7, 8, 9, 10]
            self.text.value = "Goodbye"
            self.number.value = 1
            # ``+=`` on a proxy item is a separate read and write, so it is
            # not atomic if several processes update the same key.
            for key in self.dict.keys():
                self.dict[key] += 1
            time.sleep(0.01)
            done += 1

    def get_data(self):
        """Return ``(array, dict, text, number)``."""
        return self.array, self.dict, self.text, self.number
        
        
if __name__ == "__main__":
    # Use the manager as a context manager so its server process is shut
    # down at the end; every proxy below is only used inside this block.
    with multiprocessing.Manager() as manager:
        array = manager.list([1, 2, 3, 4, 5])
        # Named ``shared_dict`` rather than ``dict`` so the builtin ``dict``
        # is not shadowed at module level.
        shared_dict = manager.dict({"a": 1, "b": 2, "c": 3})
        text = manager.Value("s", "Hello")
        number = manager.Value("i", 0)
        t = Type1(array, shared_dict, text, number)
        t.start()
        print("\n\nType1 After Start:")
        t.change()
        t.print()
        time.sleep(2)
        t.stop()
        print("\n\nType1 After Finish:")
        t.print()
        print("Type1 stopped")
2

There are 2 answers

3
Caio Silva On

Processes created with multiprocessing do not share memory, so they cannot communicate directly. With multiprocessing.Manager(), however, you can create managed variables in the main process and access them from child processes as long as the main process keeps running. That is why I call process2.join(): it keeps the main process from exiting.

import multiprocessing
import time


def worker(shared_var, update_var):
    """Loop forever, either incrementing or reporting a shared value.

    Args:
        shared_var: object with a readable/writable ``value`` attribute,
            e.g. a ``manager.Value`` proxy shared between processes.
        update_var: if true, add 1 to ``shared_var.value`` and sleep 0.01 s
            per iteration; otherwise print the current value and sleep 1 s.
    """
    while True:
        snapshot = shared_var.value
        if not update_var:
            # Reporter role: only reads the shared value.
            print("Main Process:", snapshot)
            time.sleep(1)
            continue
        # Updater role: read-modify-write through the proxy. These are two
        # separate operations; this is fine with a single writer, but
        # concurrent writers would need a lock.
        shared_var.value = snapshot + 1
        time.sleep(0.01)
                        
        

if __name__ == "__main__":
    # Start a manager server process that hosts the shared objects
    manager = multiprocessing.Manager()
    
    # Integer shared through the manager; both workers get a proxy to it.
    shared_var = manager.Value('i', 0)

    # Updater process: increments shared_var.value every 0.01 s
    process = multiprocessing.Process(target=worker, args=(shared_var, True, ))
    process.start()
    
    # Reporter process: prints shared_var.value once per second
    process2 = multiprocessing.Process(target=worker, args=(shared_var, False, ))
    process2.start()
    # Both workers loop forever, so this join blocks indefinitely and keeps
    # the main process (and with it the manager) alive.
    process2.join()

In the example, the first process (process) adds 1 to the managed variable every 0.01 seconds, while process2 only prints the current value to the console once per second. The output is:

Main Process: 2
Main Process: 65
Main Process: 129
Main Process: 192
Main Process: 254
Main Process: 317
Main Process: 381
...

The processes do not interrupt each other; communication goes through the manager's server process, which acts as a bridge between them.

0
Booboo On

First, a general comment: in your second coding attempt you have:

dict = manager.dict({"a":1,"b":2,"c":3})

But dict is a built-in class whose name you have just inadvertently shadowed, so any code such as d = dict(x=1, y=2) will no longer work. Do not redefine built-in functions, classes, etc.

Let's start with your second coding example. You seem to know about multiprocessing.Manager, which returns a SyncManager (a subclass of multiprocessing.managers.BaseManager) with a predefined set of managed types that you are using, but to no effect. The problem, as mentioned, is that your Type2 instance executes in a child process with a copy of the Type2 instance referenced by your Type1 instance t. Therefore, when the Type2.change method is invoked, it updates the child process's copy of the Type2 instance, and the Type2 instance referenced by t remains unchanged.

Your Type2 class is rather simple, and Python lets us create a new managed class based on it quite easily. The following code registers a new managed type, Type2, with Type2Manager, a subclass of multiprocessing.managers.BaseManager; I have simplified your example to demonstrate this. See the multiprocessing documentation (scroll down and especially read the section Customized managers):

import multiprocessing
from multiprocessing.managers import BaseManager
import time


class Type2Manager(BaseManager):
    """Custom manager class for hosting ``Type2`` objects.

    Types are added to it with the inherited ``register`` classmethod (see
    "Customized managers" in the multiprocessing documentation).
    """

class Type1:
    """Parent object that fetches its data from a manager-hosted Type2.

    Args:
        manager: a started ``Type2Manager``; ``change`` uses it to create
            the shared ``Type2`` instance.
    """

    def __init__(self, manager: Type2Manager) -> None:
        self.manager = manager
        self.array, self.dict, self.text, self.number = [], {}, "", 0

    def change(self):
        """Run ``Type2.change`` in a child process, then copy its data here.

        ``self.manager.Type2()`` returns a proxy, so the child's call to
        ``change`` updates the single Type2 held by the manager, and the
        later ``get_data`` call sees those updates.
        """
        proxy = self.manager.Type2()
        child = multiprocessing.Process(target=proxy.change)
        child.start()
        child.join()  # wait until the child has finished calling change()
        fields = proxy.get_data()
        self.array, self.dict, self.text, self.number = fields

    def print(self):
        """Print each field on its own line, prefixed with a label."""
        labelled = (
            ("array:", self.array),
            ("dict:", self.dict),
            ("text:", self.text),
            ("number:", self.number),
        )
        for label, value in labelled:
            print(label, value)

class Type2:
    """Simple data holder meant to be hosted by ``Type2Manager``.

    Callers normally reach it through a manager proxy, so ``change`` and
    ``get_data`` execute on the instance held by the manager.
    """

    def __init__(self) -> None:
        self.array, self.dict, self.text, self.number = [], {}, "Hello", 0

    def change(self):
        """Overwrite all four fields with fixed new values."""
        self.number = 1
        self.text = "Goodbye"
        self.dict = {"d": 4, "e": 5, "f": 6}
        self.array = [6, 7, 8, 9, 10]

    def get_data(self):
        """Return the fields as ``(array, dict, text, number)``."""
        return (self.array, self.dict, self.text, self.number)

# Make ``Type2Manager().Type2()`` create a Type2 inside the manager and
# return a proxy to it.
Type2Manager.register(typeid="Type2", callable=Type2)

if __name__ == "__main__":
    # The with-statement starts the manager's server process and calls
    # shutdown() on exit; the Type2 proxy created in t.change() is only
    # used inside this block.
    with Type2Manager() as manager:
        t = Type1(manager)
        # Before calling t.change():
        print("Type1 Before Change:")
        t.print()

        # Runs Type2.change in a child process, then pulls the results.
        t.change()
        print("\n\nType1 After Change:")
        t.print()

Prints:

Type1 Before Change:
array: []
dict: {}
text:
number: 0


Type1 After Change:
array: [6, 7, 8, 9, 10]
dict: {'d': 4, 'e': 5, 'f': 6}
text: Goodbye
number: 1