Can I implement a counter for multiprocessing using pool callback?


I googled a bit for how to correctly build a counter to keep track of the progress of work done. So far, all the answers I found involve the use of a lock and Value.

I am wondering if I can achieve it using a callback instead. It seems that the callback is executed in the main process, not in the child processes where the workers live. Can I further assume it is executed in the same thread, so there is no race condition at all?

import time
import multiprocessing
import os

Pool = multiprocessing.Pool

def sqr(a):
    time.sleep(0.5)
    print('local {}'.format(os.getpid()))  # runs in a worker process
    return a * a

pool = Pool(processes=4)


class Counter(object):
    def __init__(self):
        self.value = 0

    def incr(self, x):
        self.value += 1
        print('count {}'.format(self.value))
        print('callback {}'.format(os.getpid()))  # runs in the main process


counter = Counter()

r = [pool.apply_async(sqr, (x,), callback=counter.incr) for x in range(10)]
pool.close()
pool.join()

print('main {}'.format(os.getpid()))
print('main count {}'.format(counter.value))

local 27155
local 27154
local 27156
count 1
callback 27152
count 2
callback 27152
count 3
callback 27152
local 27153
count 4
callback 27152
local 27155
count 5
callback 27152
local 27156
local 27154
count 6
callback 27152
count 7
callback 27152
local 27153
count 8
callback 27152
local 27155
count 9
callback 27152
local 27156
count 10
callback 27152
main 27152
main count 10

Process finished with exit code 0

Update

OK, it seems this link explains a bit of the mechanism behind the callback.

So the callback actually runs on a different thread in the main process.

However, can I still implement the counter in the same way, given that only one thread modifies the counter?
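As a quick empirical check, here is a minimal sketch (the report helper and worker names are mine, not from the question) that prints which thread runs the callback. In current CPython this turns out to be a single internal result-handler thread, though that is an implementation detail:

import multiprocessing
import threading

def sqr(a):
    return a * a

def report(result):
    # Shows which thread in the main process executes the callback.
    print('callback on thread: {}'.format(threading.current_thread().name))

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=4)
    for x in range(4):
        pool.apply_async(sqr, (x,), callback=report)
    pool.close()
    pool.join()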


There are 2 answers

bj0 (Best Answer)

From the SO link in @ami-tavory's comment, it seems the callbacks might all be called on the same thread. Since this is not specified in the docs or API, though, I would not rely on it: it might change in the future or vary by implementation.

Python does not have an atomic increment (apart from an itertools trick that relies on the GIL), so to be sure you're thread-safe you must use a lock or some other form of synchronization. Why are you trying to avoid it? A Lock can be used as a context manager, which keeps the code very minimal:

from threading import Lock

class Counter(object):
    def __init__(self):
        self.value = 0
        self.lock = Lock()

    def incr(self, x):
        # The lock serializes increments, so concurrent callbacks cannot race.
        with self.lock:
            self.value += 1
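For completeness, here is a sketch of how this locked Counter plugs into the question's example; the sqr worker is restated so the snippet stands alone (assuming Python 3):

import multiprocessing
import time

def sqr(a):
    time.sleep(0.5)
    return a * a

if __name__ == '__main__':
    counter = Counter()
    pool = multiprocessing.Pool(processes=4)
    for x in range(10):
        pool.apply_async(sqr, (x,), callback=counter.incr)
    pool.close()
    pool.join()
    # All callbacks have completed once join() returns.
    print('final count: {}'.format(counter.value))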

An alternative would be to use imap_unordered, looping over the results as they become available (in the main thread) and updating your progress counter there.

monkut

Or you could count in the loop using imap_unordered, as bj0 mentioned:

results = []
for count, result in enumerate(pool.imap_unordered(sqr, range(10)), 1):
    results.append(result)
    print(count)

Personally, I find it more straightforward to deal with the raw results returned by imap_unordered() than with the AsyncResult objects returned by apply_async().
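For comparison, here is a sketch of the apply_async() path being contrasted above: each call hands back an AsyncResult whose value must be fetched separately with .get() (the sqr worker is restated so the snippet stands alone):

import multiprocessing

def sqr(a):
    return a * a

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=4)
    # Each call returns an AsyncResult; .get() blocks until that value is ready.
    async_results = [pool.apply_async(sqr, (x,)) for x in range(10)]
    pool.close()
    pool.join()
    values = [r.get() for r in async_results]
    print(values)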