Unable to use dynamic classes with concurrent.futures.ProcessPoolExecutor


In the code below, I am dynamically creating an instance of the class defined inside the _py attribute by using the generate_object method.

The code works perfectly if I do not use a concurrent approach. However, if I use concurrency from concurrent.futures, I do not get the desired result because of an error saying (among other things):

_pickle.PicklingError: Can't pickle <class '__main__.Script_0_1'>: attribute lookup Script_0_1 on __main__ failed

After googling this error, I understood that only picklable objects can be passed as arguments to ProcessPoolExecutor.map(), so I decided to see how I could make my dynamic class picklable.
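To see why the lookup fails: pickle serializes a class instance by recording the class's module and qualified name, then re-imports the class on the receiving side. A class that exists only in the locals() of _string_to_object cannot be found that way. The sketch below (using a hypothetical Dynamic class and mirroring the locals() trick from _string_to_object; the behavior matches the Python 3.8 used here) reproduces the failure in isolation:

import pickle

def make_dynamic_object():
    exec("class Dynamic:\n\tpass")
    # the class exists only in this function's locals()
    return locals()["Dynamic"]()

obj = make_dynamic_object()
try:
    pickle.dumps(obj)
except pickle.PicklingError as e:
    # Can't pickle <class '__main__.Dynamic'>: attribute lookup Dynamic on __main__ failed
    print(e)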

The problem is that all the other solutions to this problem create the dynamic object in a different manner (different from what I'm doing in _string_to_object()). Examples: 1 and 2

I would very much like to keep the dynamic object creation the way it is right now, because a lot of my real code is based on it, so I am looking for a concurrent solution that works with the toy code below.

Code

import codecs
import re
from concurrent.futures import ProcessPoolExecutor

class A:
    def __init__(self):
        self._py = r'''
class Script_{0}_{1}:
\tdef print_numbers(self):
\t\tprint('Numbers = ', {0}, 'and', {1})
'''
    
    def generate_text(self, name_1, name_2):
        py = self._py.format(name_1, name_2)
        py = codecs.decode(py, 'unicode_escape')
        return py

    def generate_object(self, number_1, number_2):
        """ Generate an object of the class inside the string self._py """

        return self._string_to_object(self.generate_text(number_1, number_2))

    def _string_to_object(self, str_class, *args, **kwargs):
        """ Transform a program written inside str_class to an object. """

        exec(str_class)
        class_name = re.search("class (.*):", str_class).group(1).partition("(")[0]
        return locals()[class_name](*args, **kwargs)


print('Single usage')
a = A()
script = a.generate_object(1, 2)
script.print_numbers()

print('Multiprocessing usage')
n_cores = 3
n_calls = 3

def concurrent_function(args):
    first_A = args[0]
    second_A = args[1]
    first_A.print_numbers()
    second_A.print_numbers()

with ProcessPoolExecutor(max_workers=n_cores) as executor:
    args = ( (A().generate_object(i, i+1), A().generate_object(i+1, i+2)) for i in range(n_calls))
    results = executor.map(concurrent_function, args)

There are 2 answers

Accepted answer by Booboo

I couldn't come up with a way of getting the Script classes created in the global namespace while strictly adhering to your current scheme. However:

Since each invocation of the generate_object method creates a new class in a local namespace and instantiates an object of that class, why not postpone that work so it is done in the process pool? This also has the added advantage of performing the class creation in parallel, and no pickling is required. We now pass to concurrent_function just the two integer arguments number_1 and number_2:

import codecs
import re
from concurrent.futures import ProcessPoolExecutor


class A:
    def __init__(self):
        self._py = r'''
class Script_{0}_{1}:
\tdef print_numbers(self):
\t\tprint('Numbers = ', {0}, 'and', {1})
'''

    def generate_text(self, name_1, name_2):
        py = self._py.format(name_1, name_2)
        py = codecs.decode(py, 'unicode_escape')
        return py

    def generate_object(self, number_1, number_2):
        """ Generate an object of the class inside the string self._py """

        return self._string_to_object(self.generate_text(number_1, number_2))

    def _string_to_object(self, str_class, *args, **kwargs):
        """ Transform a program written inside str_class to an object. """

        exec(str_class)
        class_name = re.search("class (.*):", str_class).group(1).partition("(")[0]
        return locals()[class_name](*args, **kwargs)

"""
from functools import partial

print('Single usage')
a = A()
script = a.generate_object(1, 2)
script.print_numbers()
"""


def concurrent_function(args):
    for arg in args:
        # create the class and its instance here, inside the worker process,
        # so nothing unpicklable ever has to cross a process boundary
        obj = A().generate_object(arg[0], arg[1])
        obj.print_numbers()

def main():
    print('Multiprocessing usage')
    n_cores = 3
    n_calls = 3

    with ProcessPoolExecutor(max_workers=n_cores) as executor:
        args = ( ((i, i+1), (i+1, i+2)) for i in range(n_calls))
        # wait for completion of all tasks:
        results = list(executor.map(concurrent_function, args))

if __name__ == '__main__':
    main()

Prints:

Multiprocessing usage
Numbers =  0 and 1
Numbers =  1 and 2
Numbers =  1 and 2
Numbers =  2 and 3
Numbers =  2 and 3
Numbers =  3 and 4

A More Efficient Way

There is no need to use exec. Instead use closures:

from concurrent.futures import ProcessPoolExecutor

def make_print_function(number_1, number_2):
    def print_numbers():
        print(f'Numbers = {number_1} and {number_2}')

    return print_numbers



def concurrent_function(args):
    for arg in args:
        fn = make_print_function(arg[0], arg[1])
        fn()


def main():
    print('Multiprocessing usage')
    n_cores = 3
    n_calls = 3

    with ProcessPoolExecutor(max_workers=n_cores) as executor:
        args = ( ((i, i+1), (i+1, i+2)) for i in range(n_calls))
        # wait for completion of all tasks:
        results = list(executor.map(concurrent_function, args))

if __name__ == '__main__':
    main()

Prints:

Multiprocessing usage
Numbers = 0 and 1
Numbers = 1 and 2
Numbers = 1 and 2
Numbers = 2 and 3
Numbers = 2 and 3
Numbers = 3 and 4
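Note that the closure returned by make_print_function is itself not picklable either; this works because the closure is created inside the worker process, so only tuples of integers ever cross the process boundary.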

Using an Object Cache to Avoid Creating New Objects Unnecessarily
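Since obj_cache is a module-level dictionary, each worker process gets its own independent copy when the module is imported. Within a worker, a repeated (number_1, number_2) pair then reuses the previously built object instead of running the class creation again; note that a cache hit can only occur when the same worker happens to receive a repeated argument pair.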

obj_cache = {} # each process will have its own

def concurrent_function(args):
    for arg in args:
        # was an object created with this set of arguments: (arg[0], arg[1])?
        obj = obj_cache.get(arg)
        if obj is None: # must create new object
            obj = A().generate_object(arg[0], arg[1])
            obj_cache[arg] = obj # save object for possible future use
        obj.print_numbers()
Answer by ihavenoidea

I may have found a way to do this without needing the exec() function. The implementation (with comments) is below.

import codecs
from concurrent.futures import ProcessPoolExecutor

class A:
    def __init__(self):
        self.py = r'''
class Script_{0}_{1}:
\tdef print_numbers(self):
\t\tprint('Numbers = ', {0}, 'and', {1})
'''
    def generate_text(self, number_1, number_2):
        py = self.py.format(number_1, number_2)
        py = codecs.decode(py, 'unicode_escape')
        return py

    def generate_object(self, number_1, number_2):
        class_code = self.generate_text(number_1, number_2)
        module_name = "Script_" + str(number_1) + "_" + str(number_2)
        class_name = module_name  # the module and the class share a name
        # Write the generated class to a file on disk
        with open(module_name + ".py", "w") as file:
            file.write(class_code)
        # Now import it; pickle can locate the class through its module,
        # so instances of it can be pickled correctly.
        # This is the programmatic version of
        # from <module_name> import <class_name>
        klass = getattr(__import__(module_name, fromlist=[class_name]), class_name)
        return klass()

def concurrent_function(args):
    first_A = args[0]
    second_A = args[1]
    first_A.print_numbers()
    second_A.print_numbers()

def main():
    print('Multiprocessing usage')
    n_cores = 3
    n_calls = 2
    
    with ProcessPoolExecutor(max_workers=n_cores) as executor:
        args = ( (A().generate_object(i, i+1), A().generate_object(i+2, i+3)) for i in range(n_calls))
        results = executor.map(concurrent_function, args)

if __name__ == '__main__':
    main()

Basically, instead of dynamically creating the class in a local namespace, I am writing it to a file. I'm doing this because the source of my problem was that pickle was not able to locate the dynamically created class when looking it up in the global scope. Now I am programmatically importing the class (after saving it to a file).

Of course, this solution also has the overhead of dealing with files, which is costly. I did not measure whether dealing with files or using exec is faster, but in my real-world case I need only one object of the synthesized class (not one per parallel call as in the toy code provided), so the file option is best suited for me.

There is still one problem: after using n_calls = 15 (for example) and executing many times, sometimes the module (the file just created) cannot be imported. I tried putting a sleep() before programmatically importing it, but it didn't help. This problem does not seem to happen with a small number of calls, and it appears to occur randomly. Part of the error stack is shown below:

Traceback (most recent call last):
  File "main.py", line 45, in <module>
    main()
  File "main.py", line 42, in main
    results = executor.map(concurrent_function, args)
  File "/usr/lib/python3.8/concurrent/futures/process.py", line 674, in map
    results = super().map(partial(_process_chunk, fn),
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 600, in map
    fs = [self.submit(fn, *args) for args in zip(*iterables)]
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 600, in <listcomp>
    fs = [self.submit(fn, *args) for args in zip(*iterables)]
  File "/usr/lib/python3.8/concurrent/futures/process.py", line 184, in _get_chunks
    chunk = tuple(itertools.islice(it, chunksize))
  File "main.py", line 41, in <genexpr>
    args = ( (A().generate_object(i, i+1), A().generate_object(i+2, i+3)) for i in range(n_calls))
  File "main.py", line 26, in generate_object
    class_name = getattr(__import__(package, fromlist=[class_name]), class_name)
ModuleNotFoundError: No module named 'Script_13_14'
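
A likely cause, though I have not verified it against this exact code: Python's import system caches directory listings, so a module file created after the interpreter has already scanned its directory can be missed by a later import. The importlib documentation says importlib.invalidate_caches() should be called when modules are created while the program is running. A minimal sketch of the change, as generate_object would look inside class A (with importlib imported at the top of the file):

    def generate_object(self, number_1, number_2):
        class_code = self.generate_text(number_1, number_2)
        module_name = "Script_" + str(number_1) + "_" + str(number_2)
        with open(module_name + ".py", "w") as file:
            file.write(class_code)
        # The import system caches directory listings; a module file created
        # at runtime may be missed by the finders until the caches are cleared.
        importlib.invalidate_caches()
        module = importlib.import_module(module_name)
        return getattr(module, module_name)()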