Similar questions have been asked before, but my setup is slightly different from the other questions. In the code below, memory consumption keeps rising while I monitor it. For small files that is not a problem, but I am processing a file of several gigabytes, so eventually I end up with a MemoryError.

The idea is that a separate reader process reads batches of lines into a queue. A number of workers consume this queue and put their results in another queue, which in turn is consumed by a writer process that writes these results to an output file. The workers also return values when they have finished, which are then used by the main process. This is mainly diagnostic information (e.g. the number of lines processed), but it is important that I get this information back.
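
For reference, here is a minimal sketch of that reader → workers → writer protocol. It uses threads and `queue.Queue` instead of a process pool so it runs standalone, `text.upper()` is a stand-in for the spaCy step, and `run_pipeline` is a hypothetical helper. The ordering is the part that carries over: one `'done'` per worker on the work queue, and the writer's `'done'` only after every worker has returned its count.

```python
import queue
from concurrent.futures import ThreadPoolExecutor

DONE = "done"  # end-of-stream marker, as in the question


def reader(lines, work_q, n_workers, batch_size):
    """Put (batch, batch_idx) tuples on work_q, then one DONE per worker."""
    batch, batch_idx = [], 0
    for line in lines:
        batch.append(line.strip())
        if len(batch) == batch_size:
            batch_idx += 1
            work_q.put((batch, batch_idx))  # blocks while work_q is full
            batch = []
    if batch:  # last, possibly shorter batch
        batch_idx += 1
        work_q.put((batch, batch_idx))
    for _ in range(n_workers):
        work_q.put(DONE)


def worker(work_q, results_q):
    """Process batches until DONE, then return how many items were handled."""
    n_processed = 0
    while True:
        m = work_q.get()
        if m == DONE:
            return n_processed
        batch, _batch_idx = m
        result = [text.upper() for text in batch]  # stand-in for the spaCy step
        results_q.put(result)
        n_processed += len(result)


def writer(results_q, out):
    """Drain results_q into `out` until DONE arrives."""
    while True:
        m = results_q.get()
        if m == DONE:
            return
        out.extend(m)


def run_pipeline(lines, n_workers=3, batch_size=4):
    work_q = queue.Queue(maxsize=n_workers * 10)
    results_q = queue.Queue(maxsize=n_workers * 10)
    out = []
    # n_workers + 2 threads, so reader, writer and all workers run concurrently
    with ThreadPoolExecutor(max_workers=n_workers + 2) as pool:
        writer_job = pool.submit(writer, results_q, out)
        reader_job = pool.submit(reader, lines, work_q, n_workers, batch_size)
        worker_jobs = [pool.submit(worker, work_q, results_q)
                       for _ in range(n_workers)]
        total = sum(job.result() for job in worker_jobs)  # like AsyncResult.get()
        results_q.put(DONE)   # safe only once every worker has returned
        reader_job.result()   # re-raises any exception raised in the reader
        writer_job.result()   # wait until the writer has drained results_q
    return out, total
```

One difference from `multiprocessing.Pool` worth keeping in mind: leaving a `ThreadPoolExecutor` block waits for pending work, whereas leaving a `Pool` `with` block calls `terminate()`, so with a pool the writer's result has to be fetched before the block ends.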

The work done here is running the text through spaCy's pipeline, which should not have any memory leaks.

The only thing I can think of is that each subprocess uses its own spaCy instance, and as such the string memory/vocabulary is specific to each process. That would mean that the whole vocabulary is basically duplicated across all subprocesses. Is that true? If so, is there a way to use only one 'lookup table'/Vocab instance across multiple spaCy instances? If this is not the issue, do you have any other idea what may be wrong?
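
The "own instance per process" part can be checked in isolation. A minimal sketch, assuming a POSIX system where the `fork` start method is available; `STRINGS`, `process_batch` and `run_demo` are hypothetical names, and a plain `set` stands in for a per-process lookup table such as spaCy's `nlp.vocab.strings`:

```python
import multiprocessing as mp
import os

# Hypothetical stand-in for a per-process string table (e.g. nlp.vocab.strings);
# no spaCy is involved here.
STRINGS = set()


def process_batch(batch):
    """Record every string in *this* process's own copy of STRINGS."""
    STRINGS.update(batch)
    return os.getpid(), len(STRINGS)


def run_demo(batches, n_procs=2):
    # "fork" is POSIX-only; each child starts with a copy of the parent's STRINGS
    ctx = mp.get_context("fork")
    with ctx.Pool(n_procs) as pool:
        return pool.map(process_batch, batches, chunksize=1)
```

The parent's `STRINGS` stays empty while each worker's copy grows with every batch it handles. Under the `spawn` start method (the default on Windows and macOS) the module is re-imported in each child instead, so a module-level `spacy.load(...)` runs once per process; either way, every process ends up with its own model and vocabulary.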

```python
""" Process a large input file. A separate reader process puts batches of lines in a queue,
    picked up by workers who in turn process these lines. They put the results in a new queue and return some
    diagnostic information after finishing. The results are read by a separate writer process that writes the
    results to a new file.
"""
import logging
import multiprocessing as mp
from os import cpu_count

from spacy.util import minibatch
import spacy

import psutil

logging.basicConfig(datefmt='%d-%b %H:%M:%S',
                    format='%(asctime)s - [%(levelname)s]: %(message)s',
                    level=logging.INFO,
                    handlers=[
                        logging.FileHandler('progress.log'),
                        logging.StreamHandler()
                    ])

NLP = spacy.load('en_core_web_sm', disable=['ner', 'textcat'])
# cpu_count() can return None; always keep at least one worker
N_WORKERS = max((cpu_count() or 2) - 1, 1)


def reader(src, work_q, batch_size=1000):
    with open(src, encoding='utf-8') as fhin:
        lines = (line.strip() for line in fhin)
        # minibatch is a generator, and as such this approach
        # should be memory-lenient
        for batch_idx, batch in enumerate(minibatch(lines, batch_size), 1):
            work_q.put((batch, batch_idx))

    # Notify all workers that work is done
    for _ in range(N_WORKERS):
        work_q.put('done')

    logging.info('Done reading!')


def writer(results_q):
    with open('out.txt', 'w', encoding='utf-8') as fhout:
        while True:
            # Get values from results queue; write those to a file
            m = results_q.get()
            if m == 'done':
                logging.info('Done writing everything to file!')
                break

            fhout.write('\n'.join(m) + '\n')
            fhout.flush()

    logging.info('Done writing!')


def spacy_process(texts):
    # Split every text into sentences; the docs are consumed lazily
    sents = [sent.text for doc in NLP.pipe(texts) for sent in doc.sents]

    return sents, len(sents)


def _start_worker(work_q, results_q):
    # Keep track of some values, e.g. sentences processed
    sents_processed = 0
    while True:
        m = work_q.get()
        if m == 'done':
            logging.info('No more batches, stopping!')
            break

        batch, batch_idx = m
        result, n_sents = spacy_process(batch)
        results_q.put(result)
        sents_processed += n_sents

        if batch_idx == 1 or batch_idx % 25 == 0:
            # Note: this is system-wide memory usage, not this process's
            logging.info(f"Memory usage (batch #{batch_idx:,}):"
                         f" {psutil.virtual_memory().percent}%")

    logging.info('Worker is done working!')

    return sents_processed


if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument('fin', help='input file.')
    args = parser.parse_args()

    with mp.Manager() as manager, mp.Pool(N_WORKERS+2) as pool:
        logging.info(f"Started a pool with {N_WORKERS} workers")
        results_queue = manager.Queue(maxsize=N_WORKERS*10)
        work_queue = manager.Queue(maxsize=N_WORKERS*10)

        writer_job = pool.apply_async(writer, (results_queue, ))
        reader_job = pool.apply_async(reader, (args.fin, work_queue))

        worker_jobs = []
        for _ in range(N_WORKERS):
            job = pool.apply_async(_start_worker, (work_queue, results_queue))
            worker_jobs.append(job)

        # When a worker has finished its job, get its information back
        total_n_sentences = 0
        for job in worker_jobs:
            n_sentences = job.get()
            total_n_sentences += n_sentences

        # Notify the writer that we're done
        results_queue.put('done')

        # Surface any exception from the reader and wait for the writer:
        # leaving the `with` block calls pool.terminate(), which would
        # otherwise kill the writer before it has drained the results queue
        reader_job.get()
        writer_job.get()

    logging.info(f"Finished processing {args.fin}. Processed {total_n_sentences:,} sentences.")
```
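
`psutil.virtual_memory().percent` reports memory for the whole machine, so it cannot show *which* of the pool's processes is growing (psutil's per-process equivalent is `psutil.Process().memory_info().rss`). A stdlib-only sketch of a per-process report, assuming a Unix system; `peak_rss_mib` and `memory_report` are hypothetical helpers, and `ru_maxrss` is the *peak* resident size, which is enough to spot steady growth:

```python
import os
import resource
import sys


def peak_rss_mib():
    """Peak resident set size of the calling process, in MiB (Unix only)."""
    peak = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    # ru_maxrss is reported in bytes on macOS and in kilobytes on Linux
    divisor = 1024 * 1024 if sys.platform == "darwin" else 1024
    return peak / divisor


def memory_report(batch_idx):
    """Log line tagged with the PID so growth can be attributed to one process."""
    return (f"pid {os.getpid()} (batch #{batch_idx:,}): "
            f"peak RSS {peak_rss_mib():.1f} MiB")
```

Logging `memory_report(batch_idx)` from the workers, and something similar from the reader, the writer and the manager, would show whether one process keeps growing or all of them do.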

I am not sure why memory usage keeps rising. I have limited the size of both queues, so they should not grow beyond the set limit. I flush the writer after every batch, so its buffer doesn't grow large. So what am I missing?
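
The queue bound itself does hold: `manager.Queue(maxsize=...)` is a proxy to an ordinary `queue.Queue` living in the manager process, and once it holds `maxsize` items, further `put` calls block (or raise `queue.Full` when non-blocking). A small sketch of that behaviour with a hypothetical `fill_until_full` helper:

```python
import queue


def fill_until_full(maxsize):
    """Put items without blocking until the bounded queue refuses one."""
    if maxsize <= 0:
        # maxsize <= 0 means "unbounded", so this loop would never stop
        raise ValueError("maxsize must be positive")
    q = queue.Queue(maxsize=maxsize)
    n_put = 0
    while True:
        try:
            q.put_nowait(("batch", n_put))  # raises queue.Full instead of blocking
        except queue.Full:
            return n_put, q.qsize()
        n_put += 1
```

So at most `maxsize` batches sit in each queue at any time. Similarly, a buffered file object only holds up to its buffer size (`io.DEFAULT_BUFFER_SIZE` by default) before writing through, so frequent `flush()` calls mostly affect when data reaches the OS rather than how much memory the writer uses.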
