Python, tracking using tqdm across parallel sub-tasks

355 views Asked by At

In order to control the code I am working on, I have tried to create a single tracking across many tasks that occur in different threads. I know at the beginning of the run the amount of tasks (and workers).

For demonstration (does not work, toy example):

from multiprocessing import Pool
from tqdm import tqdm

def work(i, t):
    for _ in range(10**6):
        t.update()
    return i

def wrapped_work(params):
    work(*params)

def main(n=1):
    # another loop:
    with Pool(processes=8) as p:
        with tqdm(total=n * 10**6) as t:
            return sum(p.map(work, ((i, t) for i in range(1, n+1))))

if __name__ == "__main__":
    main(5)

I tried to implies this topic with pool, but without success. I would greatly appreciate your help.

1

There are 1 answers

2
Itai Alon On BEST ANSWER

based on this post:

from multiprocessing import Pool, Process, Value
from ctypes import c_bool, c_long

from tqdm.auto import tqdm


class TqdmMultiprocessing:
    max_processes = 64

    def __init__(self, static_func, processes=64):
        self.counter = Value(c_long, lock=False)
        self.pool = Pool(
            processes=min(processes, self.max_processes),
            initializer=self.worker_init,
            initargs=(static_func, self.counter)
        )

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.pool.close()

    def tqdm(self, static_func, iterable, **kwargs):
        done_value = Value(c_bool)

        proc = Process(target=self.listener, args=(self.counter, done_value, kwargs,))
        proc.start()

        result = self.pool.map(static_func, iterable)

        done_value.value = True
        proc.join()
        self.counter.value = 0

        return result

    @staticmethod
    def listener(counter: Value, is_done: Value, kwargs):
        with tqdm(**kwargs) as tqdm_bar:
            old_counter = 0
            while not is_done.value:
                new_counter = counter.value
                tqdm_bar.update(new_counter - old_counter)
                old_counter = new_counter
            tqdm_bar.update(tqdm_bar.total - old_counter)

    @staticmethod
    def worker_init(static_func, counter: Value):
        static_func.counter = counter


def work(i):
    for _ in range(10**6):
        work.counter.value += 1
    return i


def main(n=1):
    with TqdmMultiprocessing(work, processes=3) as p:
        p.tqdm(work, range(n), total=n * 10 ** 6)
        p.tqdm(work, range(n), total=n * 10 ** 6)


if __name__ == "__main__":
    main(5)