In order to parallelize the processing of some data, I want to create two different workers, each of which uses a different data source. Therefore I start each of them separately, in its own pool with a single worker process (see code).

I want each worker to get its input data from its own queue. The very first run works fine, but if I add more jobs for a worker, the worker does not process them. You can also see the queue getting longer.

What am I doing wrong here? The desired output is a print of each job that I put in the queue.
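A minimal sketch of the intended setup, assuming it is acceptable to replace "multiprocessing.Pool(1, worker, ...)" with one plain `multiprocessing.Process` per data source. Each worker reads from its own queue and stops when it receives a `None` sentinel (an assumed convention, not part of the original code), and the main process waits for both workers with `join()` instead of exiting while jobs are still queued. The names `worker` and `run_dedicated_workers` and the extra `results` queue are hypothetical, added only so the outcome can be checked. Results are drained before `join()` because the multiprocessing documentation warns that a process which has put items on a queue may not terminate until those items are consumed.

```python
import multiprocessing


def worker(name, q, results):
    """Consume items from q until the None sentinel arrives (assumed convention)."""
    while True:
        item = q.get()          # blocks until an item is available
        if item is None:        # sentinel: no more work for this worker
            break
        print(f"I am {name}: processing {item!r}")
        results.put((name, item))


def run_dedicated_workers(n_jobs=3):
    """Hypothetical helper: one process per queue, feed both, then wait."""
    queues = [multiprocessing.Queue(), multiprocessing.Queue()]
    results = multiprocessing.Queue()
    procs = [
        multiprocessing.Process(target=worker, args=(f"Worker {i + 1}", q, results))
        for i, q in enumerate(queues)
    ]
    for p in procs:
        p.start()
    for job in range(n_jobs):
        for i, q in enumerate(queues):
            q.put(f"hello {i} (job {job})")
    for q in queues:
        q.put(None)             # one sentinel per worker, queued after its jobs
    # Drain results before join(): a process that has put items on a queue
    # may not exit until those items have been consumed.
    collected = [results.get() for _ in range(n_jobs * len(queues))]
    for p in procs:
        p.join()                # both workers have received their sentinel
    return collected


if __name__ == "__main__":
    out = run_dedicated_workers()
    print(len(out), "items processed")
```

The `if __name__ == "__main__":` guard matters on Windows, where child processes are spawned and re-import the main module.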

The code below, written for Python 3.7, should reproduce the error.

Bonus question: if I remove the "time.sleep(0.1)" in the main section and run the script in PowerShell, the script finishes before printing the result of the second worker. How can I use ".join()" here to wait for the processes to finish?
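If the single-worker pools are kept, one pattern that lets ".join()" work is to submit each job as a separate task with `apply_async` instead of running an endless loop in the pool initializer, then call `close()` followed by `join()`. With the worker loop running as the initializer, `join()` would block indefinitely, because that loop never returns and the pool process never exits. This is a minimal sketch under that assumption; `process_item` and `run_with_pool_join` are hypothetical names, and the 0.1 s sleep stands in for the long calculation.

```python
import multiprocessing
import time


def process_item(worker_name, item):
    """Handle one job; the sleep simulates the long calculation."""
    time.sleep(0.1)
    return f"{worker_name} processed {item}"


def run_with_pool_join(n_jobs=3):
    """Hypothetical helper: one single-process pool per data source."""
    pools = [multiprocessing.Pool(1), multiprocessing.Pool(1)]
    async_results = []
    for job in range(n_jobs):
        for i, pool in enumerate(pools):
            async_results.append(
                pool.apply_async(process_item, (f"Worker {i + 1}", f"hello {i} (job {job})"))
            )
    for pool in pools:
        pool.close()   # no more tasks will be submitted to this pool
        pool.join()    # block until every submitted task has finished
    # All tasks are done, so get() returns immediately, in submission order
    return [r.get() for r in async_results]


if __name__ == "__main__":
    for line in run_with_pool_join():
        print(line)
```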

import multiprocessing
import time


# Creates workers
def worker1(q):
    while True:
        item = q.get(True)
        print('I am Worker 1: I started working, len of the queue is : ' + str(q.qsize()))
        print(item)

        # Simulating a long calculation
        time.sleep(1)


def worker2(q):
    while True:
        item = q.get(True)
        print('I am Worker 2: I started working, len of the queue is :  ' + str(q.qsize()))
        print(item)

        # Simulating a long calculation
        time.sleep(1)


if __name__ == '__main__':

    # Configs
    n_workers = 2

    # Define Queues
    q1 = multiprocessing.Manager().Queue()
    q2 = multiprocessing.Manager().Queue()

    the_queue = [q1, q2]
    the_pool = []
    workers = [worker1, worker2]

    # Creating one worker for each Queue
    for i in range(n_workers):
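        # The worker function is passed as the pool's initializer, so it runs
        # as an endless loop inside the pool's single process
        the_pool.append(multiprocessing.Pool(1, workers[i], (the_queue[i],)))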

    # First Run
    for i in range(n_workers):
        the_queue[i].put("hello " + str(i))
        print('I just put in a value, len of the queue is :  ' + str(the_queue[i].qsize()))

    # Give some time before adding new tasks
    time.sleep(0.1)

    # Next Runs
    for ii in range(10):
        for i in range(n_workers):
            the_queue[i].put("hello " + str(i))
            print('I just put in a value, len of the queue is :   ' + str(the_queue[i].qsize()))
