Similar questions have been asked before, but my setup differs from theirs. In the code below, memory consumption keeps rising while the script runs. For small files that is not a problem, but for the multi-gigabyte file I am processing I eventually end up with a MemoryError.
The idea is that a separate reader process reads batches of lines into a queue. A number of workers consume this queue and put their results into another queue, which is in turn consumed by a writer process that writes the results to an output file. The workers also return values when they finish, which are then used by the main process. This is mainly diagnostic information (e.g. the number of lines processed), but it is important that I get this information back.
The actual work consists of putting the text through spaCy's pipeline, which, as far as I know, should not leak memory.
The one thing I can think of is that each subprocess uses its own spaCy instance (is that true?), and as such the string store/vocabulary is specific to each process. That would mean the whole vocabulary is basically duplicated across all subprocesses. If that is the case, is there a way to share a single 'lookup table'/Vocab instance across multiple spaCy instances? If this is not the issue, do you have any other idea of what may be wrong?
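For reference, the kind of check I have in mind for that first question is something like the snippet below (only a sketch, not part of the script; log_vocab_size is a helper name I made up). I would call it from the worker loop to see whether each process's StringStore keeps growing on its own:

import logging
import os

import spacy

NLP = spacy.load('en_core_web_sm', disable=['ner', 'textcat'])

def log_vocab_size(batch_idx):
    # len() of the StringStore is the number of interned strings; if this
    # climbs independently in every worker, the vocab is per-process.
    logging.info(f"PID {os.getpid()}, batch #{batch_idx:,}:"
                 f" {len(NLP.vocab.strings):,} strings in NLP.vocab.strings")

That aside, here is the full script: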
import logging
import multiprocessing as mp
from os import cpu_count

import psutil
import spacy
from spacy.util import minibatch

logging.basicConfig(datefmt='%d-%b %H:%M:%S',
                    format='%(asctime)s - [%(levelname)s]: %(message)s',
                    level=logging.INFO,
                    handlers=[
                        logging.FileHandler('progress.log'),
                        logging.StreamHandler()
                    ])

NLP = spacy.load('en_core_web_sm', disable=['ner', 'textcat'])
N_WORKERS = cpu_count() - 1 or 1

""" Process a large input file. A separate reader process puts batches of lines in a queue,
picked up by workers who in turn process these lines. They put the results in a new queue and return some
diagnostics information after finishing. The results are read by a separate writer process that writes the
results to a new file.
"""


def reader(src, work_q, batch_size=1000):
    with open(src, encoding='utf-8') as fhin:
        lines = (line.strip() for line in fhin)
        # minibatch is a generator, and as such this approach
        # should be memory-lenient
        for batch_idx, batch in enumerate(minibatch(lines, batch_size), 1):
            work_q.put((batch, batch_idx))

    # Notify all workers that work is done
    for _ in range(N_WORKERS):
        work_q.put('done')

    logging.info('Done reading!')


def writer(results_q):
    with open('out.txt', 'w') as fhout:
        while True:
            # Get values from the results queue; write those to a file
            m = results_q.get()
            if m == 'done':
                logging.info('Done writing everything to file!')
                break

            fhout.write('\n'.join(m) + '\n')
            fhout.flush()

    logging.info('Done writing!')


def spacy_process(texts, results_q):
    docs = list(NLP.pipe(texts))
    sents = [sent.text for doc in docs for sent in doc.sents]

    return sents, len(sents)


def _start_worker(work_q, results_q):
    # Keep track of some values, e.g. lines processed
    lines_processed = 0
    while True:
        m = work_q.get()
        if m == 'done':
            logging.info('Done reading from file!')
            break

        batch, batch_idx = m
        result, n_lines = spacy_process(batch, results_q)
        results_q.put(result)
        lines_processed += n_lines

        if batch_idx == 1 or batch_idx % 25 == 0:
            logging.info(f"Memory usage (batch #{batch_idx:,}):"
                         f" {psutil.virtual_memory().percent}%")

    logging.info('Worker is done working!')

    return lines_processed


if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('fin', help='input file.')
    args = parser.parse_args()

    with mp.Manager() as manager, mp.Pool(N_WORKERS + 2) as pool:
        logging.info(f"Started a pool with {N_WORKERS} workers")
        results_queue = manager.Queue(maxsize=N_WORKERS * 10)
        work_queue = manager.Queue(maxsize=N_WORKERS * 10)

        _ = pool.apply_async(writer, (results_queue,))
        _ = pool.apply_async(reader, (args.fin, work_queue))

        worker_jobs = []
        for _ in range(N_WORKERS):
            job = pool.apply_async(_start_worker, (work_queue, results_queue))
            worker_jobs.append(job)

        # When a worker has finished its job, get its information back
        total_n_sentences = 0
        for job in worker_jobs:
            n_sentences = job.get()
            total_n_sentences += n_sentences

        # Notify the writer that we're done
        results_queue.put('done')

        logging.info(f"Finished processing {args.fin}. Processed {total_n_sentences} sentences.")
I am not sure why memory usage keeps rising. I have limited the size of the queues, so they should not grow beyond the set limit. I flush the writer frequently so its buffer does not grow large. So what am I missing?
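One more note on how I measure this: psutil.virtual_memory().percent is system-wide, so if it helps I could instead log each process's own resident set size (again just a sketch; log_process_memory is a made-up helper), which should show whether it is the workers, the reader, or the writer that grows:

import logging
import os

import psutil

def log_process_memory(batch_idx):
    # RSS of the current process only, in MiB, rather than the
    # system-wide percentage used in the script above.
    rss_mib = psutil.Process(os.getpid()).memory_info().rss / 1024 ** 2
    logging.info(f"PID {os.getpid()}, batch #{batch_idx:,}: RSS {rss_mib:,.1f} MiB")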