I have a function that I am applying to hundreds of chunks of my larger dataset. Each individual job takes only a minute or two and writes a file if it succeeds. This holds true for the first ~740 of my ~750 jobs, but the last several, especially the very last one, take increasingly long for no apparent reason (no errors or exceptions are raised). Originally my function simply returned a result, which was appended to a global list. I thought that might be the issue, so I changed the function to write a file instead, which I can read back later. However, that did not fix the lag. Next, I thought it might have to do with my reliance on imap_unordered within a context manager, which should take care of joining the spawned processes and closing the pool:
with get_context("spawn").Pool(processes=processes) as p:
    max_ = len(coverage_counting_job_params)
    print("max_ is {}".format(max_))
    with tqdm(total=max_) as pbar:
        for _ in p.imap_unordered(get_edit_info_for_barcode_in_contig_wrapper, coverage_counting_job_params):
            pbar.update()
            total_contigs += 1
            total_time = time.perf_counter() - start_time
            total_seconds_for_contig[total_contigs] = total_time
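For reference, the worker itself has roughly this shape (a stripped-down sketch; the parameter layout and the DataFrame construction are placeholders for the real per-chunk logic):

import polars as pl

def get_edit_info_for_barcode_in_contig_wrapper(params):
    # Stand-in for the real worker: process one (barcode, contig) chunk
    # and write the result to a .tsv, so success shows up as a file on
    # disk instead of an object accumulating in the parent process.
    barcode, contig, output_folder = params  # hypothetical parameter layout
    df = pl.DataFrame({"barcode": [barcode], "contig": [contig]})  # real computation elided
    df.write_csv("{}/{}_{}.tsv".format(output_folder, barcode, contig), separator="\t")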
After looking at some other posts here, I thought perhaps something was going wrong with the joining, so I switched to apply_async in order to manage it more manually:
def update(result):
    pbar.update()
    num_files_made = len(glob("{}/*.tsv".format(coverage_processing_folder)))
    #print("Made {}".format(num_files_made))
    if num_files_made == max_:
        print("All {} expected files are present!".format(max_))
        return

for i in range(pbar.total):
    pool.apply_async(
        get_edit_info_for_barcode_in_contig_wrapper,
        args=(coverage_counting_job_params[i],),
        callback=update)

# wait for completion of all tasks:
print("Closing pool...")
pool.close()
print("Joining pool...")
pool.join()
I am using spawn because I am using polars within my function, and it seems polars only works with the "spawn" start method.
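Concretely, that means building the pool from the spawn context, with the __main__ guard that spawn requires (minimal sketch; the worker submission is elided):

from multiprocessing import get_context

if __name__ == "__main__":
    # spawn launches each worker as a fresh interpreter rather than
    # forking the parent; the guard keeps workers from re-running the
    # pool setup when they re-import this module.
    ctx = get_context("spawn")
    with ctx.Pool(processes=30) as pool:
        ...  # submit jobs as above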
However, even with apply_async the same hang occurred.
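One thing I realize could mask a problem: apply_async swallows worker exceptions unless you keep the returned AsyncResult and call .get() on it, or pass an error_callback, so a worker that dies would look exactly like a hang. Here is a sketch of how I plan to instrument the next run to rule that out (the log_error helper and the 600-second timeout are my additions):

results = []

def log_error(exc):
    # error_callback fires when the worker raises; without it,
    # apply_async discards the exception silently.
    print("Worker failed: {!r}".format(exc))

for i in range(pbar.total):
    results.append(pool.apply_async(
        get_edit_info_for_barcode_in_contig_wrapper,
        args=(coverage_counting_job_params[i],),
        callback=update,
        error_callback=log_error))

pool.close()
for r in results:
    # .get() re-raises any worker exception in the parent, and the
    # timeout turns a silent straggler into a visible TimeoutError.
    r.get(timeout=600)
pool.join()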
Based on the per-job time, the whole multiprocessing run should take only about 50 minutes on 30 cores (~750 jobs × ~2 minutes ÷ 30 workers ≈ 50 minutes), but it is instead taking upwards of 2 hours because of this weird hanging. I am going crazy trying to figure out how to get around this issue, as the lag makes the software painfully slow for end users.
Any ideas what might be going on?
Update:
- When I kill the original job while it is hanging, I sometimes get warnings about leaked semaphores, such as "There appear to be 7 leaked semaphore objects to clean up at shutdown"