Multiprocessing in Python where only one process may use one of a set of values at a time


I am attempting to automate some slow processes by parallelizing them with Python's multiprocessing.

However, I have a problem. Each running process must take a piece of data as an argument, and each piece of data may be in use by only one process at any one time.

To explain this more clearly:

  • Each process connects to an API and requires the use of an API key
  • I have a fixed number of API keys
  • Two processes must not use the same API key simultaneously

I am stuck and can't figure out a way around this problem (other than the "dumb" solution, which I will explain later).

The issue with multiprocessing is that one defines a fixed number of workers which execute as part of a pool. Each worker runs a function which expects to receive some arguments. The arguments are supplied as a list, and each worker is dispatched with one entry from the list.

Imagine a list like this:

[a, b, c, d, e, f, g, h, i, ...]

and a pool of 3 workers.

When the pool first launches, the values a, b, c will be passed to three processes. There is no guarantee about how long each one will take.

It is therefore possible that process 1 finishes, and consumes data d. It is possible this process finishes again before either process 2 or 3 has finished processing data b or c.

If that happens, process 1 will consume data e.

It should now be obvious why putting the API key data into the same list as the rest of the data will not work.

In the above example, processes 1 and 2 would be processing data e and b respectively. If the API keys had been part of the list feeding the processes with data, then elements b and e would (presumably) contain the same API key.
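
To make that concrete, a naive pairing of work items with keys might look like the sketch below (the do_work function and the key names are hypothetical placeholders); nothing here prevents two items that share a key, such as b and e, from being processed at the same time:

from itertools import cycle

api_keys = ["key1", "key2", "key3"]
all_data = list("abcdefghi")

# Pair every work item with a key by cycling through the keys:
# items 'b' and 'e' both end up with "key2".
naive_args = list(zip(all_data, cycle(api_keys)))

def do_work(arg):
    data, api_key = arg
    ...  # call the API with api_key

# with multiprocessing.Pool(3) as pool:
#     pool.map(do_work, naive_args)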

Is there a way to explicitly "pin" some data (like an API key) to each process spawned by pool.map() and thus solve this problem?


There are 4 answers

Dillon Davis

When creating your multiprocessing.Pool, pass it an initializer function and initargs that include a process-safe Queue, from which each worker pulls a single API key.

Something like this:

from multiprocessing import Pool, Queue

API_KEY = None  # placeholder; set separately in each worker by the initializer

def set_api_key(queue):
    # Pool initializer: runs once in every worker process
    global API_KEY
    API_KEY = queue.get()

# Pre-load the queue with the available keys (api_keys is your list of keys)
api_key_queue = Queue()
for api_key in api_keys:
    api_key_queue.put(api_key)

# The pool size should not exceed the number of keys, otherwise the extra
# workers will block in set_api_key waiting for a key that never arrives
with Pool(4, set_api_key, (api_key_queue,)) as pool:
    results = pool.map(...)
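
The function passed to pool.map can then read the per-worker API_KEY global. A minimal sketch, where fetch_result and all_data are hypothetical stand-ins for the real API call and work items:

def fetch_result(item):
    # Runs in a worker process; API_KEY was assigned once by set_api_key()
    return f"{API_KEY}-{item}"  # stand-in for the real API request

# results = pool.map(fetch_result, all_data)
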
FreelanceConsultant

The Dumb Solution

The dumb solution is to divide the list of work into roughly equal-length lists, the number of such lists being equal to the number of API keys.

In this particular case I am limited by the number of API keys, although typically one would be limited by something else, such as compute or the number of CPU threads.

The specific code I have written will not be of much use to anyone else, but the general principle is shown in the code below:

import multiprocessing
import numpy

# Number of simultaneous workers is limited by the number of API keys
num_workers = len(api_keys)

# Split the work into `num_workers` roughly equal chunks
data_chunks = numpy.array_split(all_data, num_workers)

# Create a list of tuples containing data chunk and API key pairs
worker_args = [(data, api_key) for data, api_key in zip(data_chunks, api_keys)]

# Dispatch workers; each call receives one (data_chunk, api_key) tuple
# (use pool.starmap instead if the function takes two separate arguments)
with multiprocessing.Pool(num_workers) as pool:
    pool.map(multiprocess_function, worker_args)
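
A hypothetical multiprocess_function matching that pool.map call would unpack the tuple itself, for example:

def multiprocess_function(args):
    data_chunk, api_key = args  # one chunk of work, one dedicated API key
    for item in data_chunk:
        ...  # call the API with api_key for each item
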
Mark Setchell

Here's a simple, but effective way of allowing any worker process to:

  • acquire one of your two API keys
  • do some work
  • return the key for other processes to use

It is implemented simply with a queue, into which you place the two API keys at the start:

#!/usr/bin/env python3

import os
from time import sleep
from multiprocessing import Manager
from multiprocessing.pool import Pool
from functools import partial

nJobs = 10  # matches the sample runs below (jobs 0-9)
 
# Worker process
def worker(q, jobNum):
    PID = os.getpid()
    # Acquire a token
    token = q.get()
    print(f'DEBUG: pid: {PID}, got token: {token}, working on job: {jobNum}')
    # Do some work
    sleep(2)
    print(f'DEBUG: pid: {PID}, releasing token: {token}, job: {jobNum} complete')
    q.put(token)
 
################################################################################
# MAIN
################################################################################
if __name__ == '__main__':

    # Start a multiprocessing manager
    with Manager() as manager:

        # Create a queue to hold available tokens
        q = manager.Queue()
        q.put("TokenA")
        q.put("TokenB")
    
        # Create process pool of 10 processes
        with Pool(10) as pool:
            func = partial(worker, q)
            pool.map(func, list(range(nJobs)))
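
Note that the queue comes from a Manager rather than being a multiprocessing.Queue created directly. That is presumably because the queue is handed to the workers through the pool.map arguments (via functools.partial), and a plain multiprocessing.Queue cannot be pickled that way, whereas the Manager's queue proxy can.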

Each job takes 2s. So with 2 tokens available, the 10 jobs take 10s and the output looks like this:

DEBUG: pid: 28826, got token: TokenA, working on job: 0
DEBUG: pid: 28825, got token: TokenB, working on job: 1
DEBUG: pid: 28826, releasing token: TokenA, job: 0 complete
DEBUG: pid: 28827, got token: TokenA, working on job: 2
DEBUG: pid: 28825, releasing token: TokenB, job: 1 complete
DEBUG: pid: 28829, got token: TokenB, working on job: 3
DEBUG: pid: 28827, releasing token: TokenA, job: 2 complete
DEBUG: pid: 28830, got token: TokenA, working on job: 4
DEBUG: pid: 28829, releasing token: TokenB, job: 3 complete
DEBUG: pid: 28831, got token: TokenB, working on job: 5
DEBUG: pid: 28831, releasing token: TokenB, job: 5 complete
DEBUG: pid: 28830, releasing token: TokenA, job: 4 complete
DEBUG: pid: 28828, got token: TokenB, working on job: 6
DEBUG: pid: 28833, got token: TokenA, working on job: 7
DEBUG: pid: 28833, releasing token: TokenA, job: 7 complete
DEBUG: pid: 28828, releasing token: TokenB, job: 6 complete
DEBUG: pid: 28832, got token: TokenB, working on job: 8
DEBUG: pid: 28834, got token: TokenA, working on job: 9
DEBUG: pid: 28834, releasing token: TokenA, job: 9 complete
DEBUG: pid: 28832, releasing token: TokenB, job: 8 complete

If you put 5 tokens in the queue at the start, i.e. TokenA..TokenE, that allows 5 jobs to run at once and it takes 4s. The output looks like this:

DEBUG: pid: 28866, got token: TokenA, working on job: 1
DEBUG: pid: 28865, got token: TokenB, working on job: 0
DEBUG: pid: 28868, got token: TokenC, working on job: 2
DEBUG: pid: 28870, got token: TokenD, working on job: 3
DEBUG: pid: 28869, got token: TokenE, working on job: 4
DEBUG: pid: 28865, releasing token: TokenB, job: 0 complete
DEBUG: pid: 28871, got token: TokenB, working on job: 5
DEBUG: pid: 28866, releasing token: TokenA, job: 1 complete
DEBUG: pid: 28873, got token: TokenA, working on job: 7
DEBUG: pid: 28868, releasing token: TokenC, job: 2 complete
DEBUG: pid: 28870, releasing token: TokenD, job: 3 complete
DEBUG: pid: 28869, releasing token: TokenE, job: 4 complete
DEBUG: pid: 28867, got token: TokenC, working on job: 6
DEBUG: pid: 28864, got token: TokenD, working on job: 8
DEBUG: pid: 28872, got token: TokenE, working on job: 9
DEBUG: pid: 28871, releasing token: TokenB, job: 5 complete
DEBUG: pid: 28873, releasing token: TokenA, job: 7 complete
DEBUG: pid: 28867, releasing token: TokenC, job: 6 complete
DEBUG: pid: 28864, releasing token: TokenD, job: 8 complete
DEBUG: pid: 28872, releasing token: TokenE, job: 9 complete
Hai Vu

My approach is to store the API keys in an API key queue (of type multiprocessing.Queue). For each process, the first step is to call Queue.get() to acquire the next available API key. This call blocks until an API key is available.

Next, the process performs the API call and obtains the result, which it puts into a separate output queue. Finally, before the process terminates, it returns the API key to the API key queue.

Code:

import logging
import multiprocessing
import random
import time

logging.basicConfig(
    level=logging.DEBUG,
    format="%(levelname)-8s | %(processName)-10s |  %(message)s",
)


def worker(api_key, data):
    """
    Make API call then return the result.
    """
    # Simulate work, which requires some time to complete
    duration = random.randint(1, 5)
    time.sleep(duration)

    # Simulate the result
    result = f"{api_key}-{data}"
    return result


def worker_shell(api_keys, data, output):
    """
    A shell on top of a worker. It manages getting the API key,
    calling the worker, and storing the result.
    """
    logging.debug("Waiting for next available API key")
    api_key = api_keys.get()
    logging.debug("Got API key %r", api_key)

    result = worker(api_key, data)
    logging.debug("Done, result is %r", result)
    output.put(result)

    logging.debug("Return API key %r to queue", api_key)
    api_keys.put(api_key)


def main():
    """Entry"""
    multiprocessing.current_process().name = "main"
    logging.info("Start")

    # Queue of API keys
    api_queue = multiprocessing.Queue()
    for i in range(3):
        api_queue.put(f"key{i}")

    # Input/output
    input_list = list("abcde")
    output_queue = multiprocessing.Queue()

    # Process all data
    processes = []
    for data in input_list:
        process = multiprocessing.Process(
            target=worker_shell,
            args=(api_queue, data, output_queue),
            name=f"worker-{data}",  # Give the process a name
        )
        process.start()
        processes.append(process)

    # Wait for all to finish
    for process in processes:
        process.join()

    logging.info("Results:")
    while not output_queue.empty():
        result = output_queue.get()
        logging.info("- %r", result)

    logging.info("End")


if __name__ == "__main__":
    main()

Sample output:

INFO     | main       |  Start
DEBUG    | worker-a   |  Waiting for next available API key
DEBUG    | worker-a   |  Got API key 'key0'
DEBUG    | worker-b   |  Waiting for next available API key
DEBUG    | worker-b   |  Got API key 'key1'
DEBUG    | worker-e   |  Waiting for next available API key
DEBUG    | worker-d   |  Waiting for next available API key
DEBUG    | worker-d   |  Got API key 'key2'
DEBUG    | worker-c   |  Waiting for next available API key
DEBUG    | worker-a   |  Done, result is 'key0-a'
DEBUG    | worker-a   |  Return API key 'key0' to queue
DEBUG    | worker-e   |  Got API key 'key0'
DEBUG    | worker-b   |  Done, result is 'key1-b'
DEBUG    | worker-b   |  Return API key 'key1' to queue
DEBUG    | worker-c   |  Got API key 'key1'
DEBUG    | worker-d   |  Done, result is 'key2-d'
DEBUG    | worker-d   |  Return API key 'key2' to queue
DEBUG    | worker-e   |  Done, result is 'key0-e'
DEBUG    | worker-e   |  Return API key 'key0' to queue
DEBUG    | worker-c   |  Done, result is 'key1-c'
DEBUG    | worker-c   |  Return API key 'key1' to queue
INFO     | main       |  Results:
INFO     | main       |  - 'key0-a'
INFO     | main       |  - 'key1-b'
INFO     | main       |  - 'key2-d'
INFO     | main       |  - 'key0-e'
INFO     | main       |  - 'key1-c'
INFO     | main       |  End

Notes

  • The worker function does the actual work of calling the API using the API key and the input data. In this case, my worker function is just a simulation.
  • The worker_shell function manages obtaining and returning the API key, in addition to calling worker to get the job done.
  • In the main function, I created api_queue to store the API keys.
  • After that, the code is straightforward: process all the input and leave the management of resources to worker_shell and the actual work to worker.
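  • Note that although main starts one process per input item (five here), at most three API calls run concurrently, because only three keys circulate in api_queue; the remaining processes block in api_keys.get() until a key is returned.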