I have a file containing the URLs of a bunch of large .tar.bz2 files that I want to download and extract into a folder. I am trying to speed up the process with multithreading (for downloading) and multiprocessing (for extracting the files). The downloading works fine and is fairly quick, but the extraction never even begins.

Edit: I have tried reducing the number of URLs to download to 2 (by changing the line to urls = urls[:2]), and the extract() process does start. This might be because the virtual machine I'm running the script on has 2 vCPUs. What does this imply? Can I only run as many concurrent.futures processes as there are available CPUs?

This is my code:
    import concurrent.futures
    import tarfile
    import wget
    import os

    urls = []
    filenames = []
    with open('urls.txt', 'r') as f:
        for url in f.readlines():
            urls.append(url.rstrip())

    ''' Temporary '''
    urls = urls[:3]
    print(urls)
    ''' Endtemp '''

    def get_file(url, process_executor):
        print(f'Downloading {url}')
        file = wget.download(url)
        print(f'Downloaded {file}')
        return process_executor.submit(extract, file)

    def extract(file):
        print(f'Opening {file}')
        tar = tarfile.open(file)
        print(f'Extracting {file}')
        tar.extractall('./files')
        tar.close()
        print(f'Extracted {file}')
        os.remove(file)
        return 1

    if __name__ == '__main__':
        with concurrent.futures.ProcessPoolExecutor() as pe, concurrent.futures.ThreadPoolExecutor() as te:
            future_url_request = [te.submit(get_file, url, pe) for url in urls]
            processes = []
            for future in concurrent.futures.as_completed(future_url_request):
                processes.append(future.result())
            extracted = []
            for future in concurrent.futures.as_completed(processes):
                extracted.append(future.result())
            if len(extracted) == len(urls):
                print('All urls have been downloaded and extracted successfully')
The program successfully downloads all files, but it never even begins extracting them (i.e. it never prints 'Opening {file}').
Any suggestions as to what the issue might be?
Thank you.
The problem can arise when a new process is forked while the main process has threads other than the main thread. In that case the forked process can hang waiting for a lock to be released. This problem does not seem to occur when using the multiprocessing package for creating a pool.

Unless the number of URLs being retrieved is more than a few hundred, or you are concerned about sending too many requests to the same domain in a short period of time, I would create a multithreading pool whose size is the number of URLs being retrieved instead of taking the default. In that case, each task submitted to the multithreading pool can submit a task to the multiprocessing queue using a method that blocks until the result is returned, because this blocking does not prevent other threads from submitting tasks to the multiprocessing queue. On the other hand, if your multithreading pool size is less than the number of URLs, a blocking call to the multiprocessing pool would delay the multithreading task from finishing and being able to process another task from the multithreading task queue. That is, when the multithreading pool size is as large as the number of URLs, there will not be any tasks in the task queue waiting for a current task to complete before they can be processed, so using a blocking call cannot hurt, and it simplifies processing.
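A minimal sketch of that first approach might look like the following, assuming multiprocessing.pool.ThreadPool and multiprocessing.Pool with default sizes other than the thread pool, and reusing the wget/tarfile calls from your code for illustration. Because pool.apply blocks only the calling thread, each download thread simply waits for its own un-tar to finish while the other downloads proceed:

    import os
    import tarfile
    from multiprocessing import Pool
    from multiprocessing.pool import ThreadPool

    import wget

    def get_file(url, process_pool):
        print(f'Downloading {url}')
        file = wget.download(url)
        print(f'Downloaded {file}')
        # Blocks this thread until the un-tar finishes; the other download
        # threads are unaffected and keep submitting their own work.
        return process_pool.apply(extract, args=(file,))

    def extract(file):
        print(f'Opening {file}')
        with tarfile.open(file) as tar:
            print(f'Extracting {file}')
            tar.extractall('./files')
        print(f'Extracted {file}')
        os.remove(file)
        return 1

    if __name__ == '__main__':
        with open('urls.txt') as f:
            urls = [url.rstrip() for url in f]

        # One thread per URL, so a blocking call into the process pool never
        # keeps another download waiting in the thread pool's task queue.
        with Pool() as process_pool, ThreadPool(len(urls)) as thread_pool:
            extracted = thread_pool.starmap(get_file, [(url, process_pool) for url in urls])

        if len(extracted) == len(urls):
            print('All urls have been downloaded and extracted successfully')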
If you are using fewer threads than the number of URLs, then get_file should not block on submitting a task to the multiprocessing queue and should return a multiprocessing.AsyncResult instead:
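Here is a sketch of that second version. The cap of 16 threads is an arbitrary assumption to illustrate a thread pool smaller than the number of URLs; the main code then waits on each returned AsyncResult:

    import os
    import tarfile
    from multiprocessing import Pool
    from multiprocessing.pool import ThreadPool

    import wget

    def get_file(url, process_pool):
        print(f'Downloading {url}')
        file = wget.download(url)
        print(f'Downloaded {file}')
        # Do not block: hand back the AsyncResult so this thread can move on
        # to the next URL waiting in the thread pool's task queue.
        return process_pool.apply_async(extract, args=(file,))

    def extract(file):
        print(f'Opening {file}')
        with tarfile.open(file) as tar:
            print(f'Extracting {file}')
            tar.extractall('./files')
        print(f'Extracted {file}')
        os.remove(file)
        return 1

    if __name__ == '__main__':
        with open('urls.txt') as f:
            urls = [url.rstrip() for url in f]

        # An arbitrary cap of 16 threads, i.e. possibly fewer threads than URLs.
        with Pool() as process_pool, ThreadPool(min(16, len(urls))) as thread_pool:
            async_results = thread_pool.starmap(get_file, [(url, process_pool) for url in urls])
            # Each element is an AsyncResult; wait for the un-tar results here.
            extracted = [async_result.get() for async_result in async_results]

        if len(extracted) == len(urls):
            print('All urls have been downloaded and extracted successfully')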
Notes

1. Downloading the files concurrently is expected to save time. What is not clear is whether, once the files have been downloaded, running tar in parallel will save time or actually hurt performance. Your disk has limited bandwidth, and if it is not a solid-state drive, there will potentially be excess head movement (and thus performance degradation) from the parallel processing. Try running with different multiprocessing pool sizes, especially 1.
2. Your use of as_completed is not necessary and is not emulated in either of the above code examples, because the multiprocessing tasks are submitted as soon as get_file has downloaded its file and are thus being processed as the threaded tasks complete anyway. If the results of the un-tar operations were of some significance (at the moment extract just returns 1) and you wanted to display those results in real time as they complete, that would be a different story, but that is not what you are doing. Now extracted will have results in task-submission order at no extra cost. This could matter later if you decide to have extract return something of significance.

3. If it is feasible to create a multithreading pool whose size equals the number of URLs (the first code example), that would be preferable, and not just because the code is slightly simpler. What if a download fails? get_file would then have nothing to submit to the multiprocessing queue, yet the second code version expects an AsyncResult instance representing a multiprocessing task to un-tar an archive. The first code version gives you greater control in handling error situations. On a download error, get_file could, for example, return 0 instead of the result of the un-tar. The main process can then inspect the individual return codes, or filter the return codes for 1 values and get a count of the number of successful completions.
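As a rough sketch of that error handling in the first version (the try/except around wget.download is an assumption about how a failed download would surface):

    def get_file(url, process_pool):
        print(f'Downloading {url}')
        try:
            file = wget.download(url)
        except Exception as e:
            print(f'Download of {url} failed: {e}')
            return 0  # signal failure instead of submitting an un-tar task
        print(f'Downloaded {file}')
        # extract() still returns 1 on success
        return process_pool.apply(extract, args=(file,))

    # ... and in the main section, once the thread pool has produced `extracted`:
    successes = sum(1 for return_code in extracted if return_code == 1)
    print(f'{successes} of {len(urls)} urls were downloaded and extracted successfully')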