How to run multiple inferences in parallel on CPU?

I've got some models implemented in PyTorch whose performance is evaluated on a custom platform (a wrapper around PyTorch that keeps the overall interface).

This is really slow, however: testing 10k CIFAR10 images takes almost 30 minutes on a single CPU. My cloud farm has no GPUs available, but it is highly CPU-oriented with plenty of memory. I'm therefore thinking about spawning multiple threads/processes to parallelize these inference tests.

I know this is not trivial in Python due to the GIL and PyTorch's resource model; from some research I found torch.multiprocessing.Pool.

Is that the best way? How could I deploy, say, N inference tasks on N CPUs, and then collect the results into an array? I also wonder whether some torch.device info must be handled explicitly or whether that is done automatically.

Something like:

results = []
for task in inference_tasks:
    p = spawn(process)              # spawn a worker for this task (pseudocode)
    accuracy = inference(model, p)  # run inference in that worker
    ...
    # collect results
    results.append(accuracy)

Edit: the inference predictions are all independent of each other. The DataLoader could be copied and fed to each process to run inference, and the results then collected.
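For instance, something along these lines is what I imagine (a rough sketch only; it assumes model and test_set already exist, that the dataset yields tensors, and that both pickle cleanly, and evaluate_chunk is a hypothetical helper):

import torch
import torch.multiprocessing as mp
from torch.utils.data import DataLoader, Subset

def evaluate_chunk(args):
    # hypothetical worker: score the model on one slice of the test set
    model, dataset, indices = args
    torch.set_num_threads(1)  # one core per worker, avoid oversubscription
    loader = DataLoader(Subset(dataset, indices), batch_size=256)
    correct = 0
    with torch.no_grad():
        for x, y in loader:
            correct += (model(x).argmax(dim=1) == y).sum().item()
    return correct

if __name__ == '__main__':
    model.eval()
    model.share_memory()  # share the weights instead of copying them per worker
    n_workers = mp.cpu_count()
    indices = list(range(len(test_set)))
    chunks = [indices[i::n_workers] for i in range(n_workers)]
    with mp.Pool(n_workers) as pool:
        counts = pool.map(evaluate_chunk, [(model, test_set, c) for c in chunks])
    print(sum(counts) / len(test_set))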

1 Answer

Dunes:

Parallelism is pretty simple in Python. The trickiness comes in deciding how to divide up your job, and in the fact that sharing memory/state between processes is hard and/or time-consuming. Your ideal parallelisable function takes few/small inputs and returns few/small outputs. sum(range(N, M)) is pretty ideal: it takes two integers as input and returns one integer. Example:

import os
from concurrent.futures import ProcessPoolExecutor, as_completed

# job to parallelise: computation of sum(range(N, M))
N = 0
M = 1_000_000_000
range_ = range(N, M)

n_workers = os.cpu_count()

with ProcessPoolExecutor(max_workers=n_workers) as pool:
    # compute batch size, rounding up so no elements are left over
    chunk_size, remainder = divmod(len(range_), n_workers)
    if remainder:
        chunk_size += 1

    # split the job into roughly equal-size chunks; slicing a range
    # returns another range, so each chunk is cheap to send to a worker
    futures = []
    for i in range(n_workers):
        fut = pool.submit(sum, range_[i*chunk_size:(i+1)*chunk_size])
        futures.append(fut)

    # process results as and when they become ready
    total = 0
    for future in as_completed(futures):
        total += future.result()

print(f'{total=}')

Setting max_workers to os.cpu_count() is not strictly needed, as this is the default behaviour of ProcessPoolExecutor.
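Applying the same pattern to your inference job might look roughly like the following. This is a sketch, not a drop-in solution: model and test_set stand in for your own objects, and the assumptions are that they pickle cleanly and that the dataset yields tensors. Each worker is limited to one thread with torch.set_num_threads so the pool does not oversubscribe the machine, and each returns only a pair of small integers, which is what makes the job parallelise well:

import os
import torch
from concurrent.futures import ProcessPoolExecutor, as_completed
from torch.utils.data import DataLoader, Subset

def evaluate_chunk(model, dataset, indices):
    # placeholder worker: large inputs, but only two small integers out
    torch.set_num_threads(1)  # leave the parallelism to the process pool
    loader = DataLoader(Subset(dataset, indices), batch_size=256)
    correct = 0
    with torch.no_grad():
        for x, y in loader:
            correct += (model(x).argmax(dim=1) == y).sum().item()
    return correct, len(indices)

if __name__ == '__main__':
    model.eval()
    n_workers = os.cpu_count()
    all_indices = list(range(len(test_set)))
    chunks = [all_indices[i::n_workers] for i in range(n_workers)]

    with ProcessPoolExecutor(max_workers=n_workers) as pool:
        futures = [pool.submit(evaluate_chunk, model, test_set, chunk)
                   for chunk in chunks]
        correct = total = 0
        for future in as_completed(futures):
            c, t = future.result()
            correct += c
            total += t

    print(f'accuracy = {correct / total:.4f}')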