Download and extract .tar files with multiprocessing


I have a file containing the URLs of a bunch of large .tar.bz2 files that I want to download and extract into a folder. I am trying to speed up the process with multithreading (for downloading) and multiprocessing (for extracting the files). The downloading works fine and fairly quickly, but the extraction never even begins. This is my code:

Edit: I have tried changing the number of URLs to download to 2 (by editing the line to urls = urls[:2]), and the extract() process does start. This might be because the virtual machine I'm running the script on has 2 vCPUs. Why would that matter? Can I only run as many concurrent.futures processes as there are available CPUs?


import concurrent.futures
import tarfile
import wget
import os

urls = []
filenames = []

with open('urls.txt', 'r') as f:
    for url in f.readlines():
        urls.append(url.rstrip())

''' Temporary '''
urls = urls[:3]
print(urls)
''' Endtemp '''

def get_file(url, process_executor):
    print(f'Downloading {url}')
    file = wget.download(url)
    print(f'Downloaded {file}')
    return process_executor.submit(extract, file)

def extract(file):
    print(f'Opening {file}')
    tar = tarfile.open(file)
    print(f'Extracting {file}')
    tar.extractall('./files')
    tar.close()
    print(f'Extracted {file}')
    os.remove(file)
    return 1

if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor() as pe, concurrent.futures.ThreadPoolExecutor() as te:
        future_url_request = [te.submit(get_file, url, pe) for url in urls]

        processes = []
        for future in concurrent.futures.as_completed(future_url_request):
            processes.append(future.result())
        
        extracted = []
        for future in concurrent.futures.as_completed(processes):
            extracted.append(future.result())
        
        if len(extracted) == len(urls):
            print('All urls have been downloaded and extracted successfully')
    

The program successfully downloads all the files, but it never even begins extracting them (i.e. it never prints 'Opening {file}').

Any suggestions as to what the issue might be?

Thank you.

1 Answer

Answered by Booboo

The problem can arise when a new process is forked while the main process has threads other than the main thread. In that case the forked process can hang waiting for a lock that will never be released. This problem does not seem to occur when using the multiprocessing package to create the pool.
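
As an aside (my illustration, not part of the original answer): in the concurrent.futures version, one way to avoid the fork-with-threads hang on platforms that fork by default is to ask ProcessPoolExecutor to use the "spawn" start method (available since Python 3.7), so worker processes start fresh instead of inheriting the main process's locked state:

import concurrent.futures
import multiprocessing

if __name__ == '__main__':
    # Workers are spawned (started fresh) rather than forked, so they do not
    # inherit locks that another thread of the main process may be holding.
    ctx = multiprocessing.get_context('spawn')
    with concurrent.futures.ProcessPoolExecutor(mp_context=ctx) as pe, \
         concurrent.futures.ThreadPoolExecutor() as te:
        ...  # submit download/extract tasks as in the question's code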

Unless the number of URLs being retrieved is more than a few hundred, or you are concerned about sending too many requests to the same domain in a short period of time, I would create a multithreading pool whose size is the number of URLs being retrieved instead of taking the default. In that case, each task submitted to the multithreading pool can submit its task to the multiprocessing pool using a method that blocks until the result is returned, because that blocking does not prevent other threads from submitting tasks to the multiprocessing pool.

On the other hand, if your multithreading pool size is less than the number of URLs, a blocking call to the multiprocessing pool would delay the multithreading task from finishing and freeing its thread to process another task waiting in the multithreading task queue. In short, when the multithreading pool size is as large as the number of URLs, no task ever sits in the thread pool's queue waiting for a current task to complete, so a blocking call cannot hurt and it simplifies the processing.

from multiprocessing.pool import Pool, ThreadPool
import tarfile
import wget
import os
from functools import partial

def get_file(url, process_executor):
    print(f'Downloading {url}')
    file = wget.download(url)
    print(f'Downloaded {file}')
    # If we have as many threads as URLs, then
    # there is no reason not to use a blocking call:
    return process_executor.apply(extract, args=(file,))

def extract(file):
    print(f'Opening {file}', flush=True)
    tar = tarfile.open(file)
    print(f'Extracting {file}', flush=True)
    tar.extractall('./files')
    tar.close()
    print(f'Extracted {file}', flush=True)
    os.remove(file)
    return 1

if __name__ == '__main__':
    urls = []
    
    with open('urls.txt', 'r') as f:
        for url in f.readlines():
            urls.append(url.rstrip())

    # probably should create as many threads as URLs:
    with Pool() as pe, ThreadPool(len(urls)) as te:
        extracted = te.map(partial(get_file, process_executor=pe), urls)        
        if len(extracted) == len(urls):
            print('All urls have been downloaded and extracted successfully')

If you are using fewer threads than the number of URLs, then get_file should not block on submitting a task to the multiprocessing pool; instead it should return the multiprocessing.pool.AsyncResult produced by apply_async:

...

def get_file(url, process_executor):
    ...
    # Non-blocking:
    return process_executor.apply_async(extract, args=(file,))

...

if __name__ == '__main__':
    urls = []

    with open('urls.txt', 'r') as f:
        for url in f.readlines():
            urls.append(url.rstrip())

    with Pool() as pe, ThreadPool() as te:
        async_results = te.map(partial(get_file, process_executor=pe), urls)
        extracted = [async_result.get() for async_result in async_results]
        if len(extracted) == len(urls):
            print('All urls have been downloaded and extracted successfully')

Notes

Downloading the files concurrently is expected to save time. What is not clear is whether, once the files have been downloaded, running tar in parallel will save time or actually hurt performance. Your disk has limited bandwidth, and if it is not a solid-state drive there will potentially be excess head movement (and thus performance degradation) from the parallel processing. Try running with different multiprocessing pool sizes, especially 1.
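
A rough timing harness along the following lines (my sketch; the archive names are placeholders, and extract_only is a non-deleting variant of extract so the same files can be re-extracted for each run) can be used to compare pool sizes:

import tarfile
import time
from multiprocessing.pool import Pool

def extract_only(file):
    # Same as extract() above, but leaves the archive in place so it can be
    # extracted again on the next timing run.
    with tarfile.open(file) as tar:
        tar.extractall('./files')

def time_extraction(files, pool_size):
    start = time.perf_counter()
    with Pool(pool_size) as pool:
        pool.map(extract_only, files)
    return time.perf_counter() - start

if __name__ == '__main__':
    files = ['a.tar.bz2', 'b.tar.bz2', 'c.tar.bz2']  # placeholder names
    for size in (1, 2, 4):
        print(f'pool size {size}: {time_extraction(files, size):.1f} s')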

Your use of as_completed is not necessary and is not emulated in either of the above code examples because:

  1. Tasks are submitted to the multiprocessing pool as soon as get_file has downloaded its file, so they are already being processed as the threaded tasks complete anyway.
  2. Once all the tasks have been submitted to the multiprocessing pool, which occurs only after all of the tasks submitted to the multithreading pool have completed, you just need to wait for all of the multiprocessing tasks to complete. The order in which you wait for these completions (submission order versus completion order) has no effect on how long it takes for all of them to occur. If, on the other hand, the multiprocessing tasks were returning results of interest (i.e. something other than 1) and you wanted to display those results in real time as they complete, that would be a different story (see the sketch after this list), but that is not what you are doing. As a bonus, extracted now holds its results in task-submission order at no extra cost, which could matter later if you decide to have extract return something of significance.
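
If you did want to act on each extraction result as soon as it is ready, one option (my sketch, not code from the answer) is to pass a callback to apply_async in the non-blocking version above; the callback is invoked in the main process, in completion order, as each multiprocessing task returns:

def report(result):
    # Runs in a result-handler thread of the main process as each task finishes.
    print(f'An extraction just completed with result {result}')

def get_file(url, process_executor):
    print(f'Downloading {url}')
    file = wget.download(url)
    print(f'Downloaded {file}')
    # Non-blocking submit; report() fires as soon as extract(file) returns:
    return process_executor.apply_async(extract, args=(file,), callback=report)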

If it is feasible to create a multithreading pool whose size equals the number of URLs (the first code example), that is preferable, and not just because the code is slightly simpler. What if a download fails? get_file will have nothing to submit to the multiprocessing pool, yet the second code version expects an AsyncResult instance representing a multiprocessing task to un-tar an archive. The first code version gives you greater control when handling error situations. On a download error, get_file could, for example, return 0 instead of the result from doing the un-tar. The main process can then inspect the individual return codes, or filter the return codes for 1 values to get a count of the number of successful completions. For example:
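
Here is a sketch of that error handling on top of the first (blocking) version; the try/except and the 0 return code are my additions, and the imports and extract() are unchanged from the first code example:

# Builds on the first code example above (same imports, same extract()).
def get_file(url, process_executor):
    try:
        print(f'Downloading {url}')
        file = wget.download(url)
    except Exception as e:
        print(f'Download of {url} failed: {e}')
        return 0  # nothing to submit to the multiprocessing pool
    print(f'Downloaded {file}')
    # Blocking call is fine because the thread pool is as large as len(urls):
    return process_executor.apply(extract, args=(file,))

if __name__ == '__main__':
    with open('urls.txt') as f:
        urls = [line.rstrip() for line in f]

    with Pool() as pe, ThreadPool(len(urls)) as te:
        return_codes = te.map(partial(get_file, process_executor=pe), urls)
        successes = sum(1 for rc in return_codes if rc == 1)
        print(f'{successes} of {len(urls)} urls downloaded and extracted successfully')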