How do I free memory from Process in multiprocessing.queue?


I have a program that is trying to predict email conversion for every email I send in a week (so, usually 7 sends). The output is 7 different files with the prediction scores for each customer. Running these serially can take close to 8 hours, so I have tried to parallelize them with multiprocessing. This speeds things up very well, but I've noticed that after a process finishes it seems to hold onto its memory, until there is none left and one of the processes gets killed by the system without completing its task.

I've based the following code off of the 'manual pool' example in this answer, as I need to limit the number of processes that start at once due to memory constraints. What I would like is that as one process finishes, it releases its memory to the system, freeing up space for the next worker.

Below is the code that handles concurrency:

def work_controller(in_queue, out_list):
    while True:
        key = in_queue.get()  # blocks until an item is available
        print key

        if key is None:  # sentinel value: stop this worker
            return

        work_loop(key)
        out_list.append(key)

if __name__ == '__main__':

    num_workers = 4
    manager = Manager()
    results = manager.list()
    work = manager.Queue(num_workers)
    processes = []

    for i in xrange(num_workers):
        p = Process(target=work_controller, args=(work,results))
        processes.append(p)
        p.start()

    iters = itertools.chain([key for key in training_dict.keys()])
    for item in iters:
        work.put(item)

    for p in processes:
        print "Joining Worker"
        p.join()

Here is the actual work code, if that is of any help:

def work_loop(key):
    with open('email_training_dict.pkl','rb') as f:
        training_dict = pickle.load(f)
    df_test = pd.DataFrame.from_csv(test_file)
    outdict = {}
    target = 'is_convert'

    df_train = train_dataframe(key)
    features = data_cleanse(df_train,df_test)

    # MAIN PREDICTION
    print 'Start time: {}'.format(datetime.datetime.now()) + '\n'

    # train/test by mailer
    X_train = df_train[features]
    X_test = df_test[features]
    y_train = df_train[target]

    # run model fit
    clf = imbalance.ImbalanceClassifier()

    clf = clf.fit(X_train, y_train)
    y_hat = clf.predict(X_test)

    outdict[key] = clf.y_vote
    print outdict[key]
    print 'Time Complete: {}'.format(datetime.datetime.now()) + '\n'
    with open(output_file,'wb') as f:
        pickle.dump(outdict,f)

There is 1 answer

Answered by steve

I'm assuming that, like the example you linked, you are using Queue.Queue() as your queue object (manager.Queue() hands out a proxy to one). This is a blocking queue, which means a call to queue.get() will either return an element or block until it can return one. A worker that finds the queue empty therefore waits forever and never exits. Try changing your work_controller function to the following:

def work_controller(in_queue, out_list):
    while True:
        try:
            key = in_queue.get(False)  # pass False so the call does not block
        except Queue.Empty:
            return  # the queue is empty, so this worker is done
        print key

        work_loop(key)
        out_list.append(key)
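
To make the difference between the blocking and non-blocking calls concrete, here is a small sketch. It assumes Python 3, where the module is named queue rather than Queue, and uses a plain in-process queue with a made-up key ('monday') purely for illustration.

```python
import queue  # this module is named Queue in Python 2

q = queue.Queue()
q.put('monday')  # illustrative key, not taken from the question's data

first = q.get(False)  # an item is available, so it is returned immediately

try:
    q.get(False)  # the queue is now empty: raises instead of waiting
    was_empty = False
except queue.Empty:
    was_empty = True

try:
    q.get(timeout=0.1)  # waits up to 0.1 seconds, then raises queue.Empty
    timed_out = False
except queue.Empty:
    timed_out = True

print(first, was_empty, timed_out)
```

A plain q.get() with no arguments would have hung on the second call, which is what an idle worker is doing in the original code.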

While the above solves the blocking issue, it gives rise to another. When the worker processes start, there are no items in in_queue yet, so they will end immediately.

To solve this, I suggest you add a flag to indicate whether it is okay to terminate.

import itertools
import time
import Queue  # Python 2 module; provides the Queue.Empty exception
from multiprocessing import Event, Manager, Process

def work_controller(in_queue, out_list, ok_to_end):
    while True:
        finished = ok_to_end.is_set()  # read the flag before polling the queue
        try:
            key = in_queue.get(False)  # pass False so the call does not block
        except Queue.Empty:
            if finished:  # consult the flag before ending
                return
            time.sleep(0.1)  # queue is empty but more work may still arrive
            continue
        print key

        work_loop(key)
        out_list.append(key)

if __name__ == '__main__':

    num_workers = 4
    manager = Manager()
    results = manager.list()
    work = manager.Queue(num_workers)
    processes = []

    # A plain global is copied into each child process, so a change made by
    # the parent is never seen there. An Event is shared between processes.
    ok_to_end = Event()  # termination flag
    for i in xrange(num_workers):
        p = Process(target=work_controller, args=(work, results, ok_to_end))
        processes.append(p)
        p.start()

    iters = itertools.chain([key for key in training_dict.keys()])
    for item in iters:
        work.put(item)

    ok_to_end.set()  # termination flag is set after the queue is filled

    for p in processes:
        print "Joining Worker"
        p.join()
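
Since the work_controller in the question already checks for None, another option is to keep the blocking get() and put one None sentinel on the queue per worker after the real work has been queued. Once work_controller returns, the worker process exits and the operating system reclaims its memory. The following is only a minimal sketch of that idea, assuming Python 3: work_loop here is a hypothetical stand-in for the real model-fitting function, and run_all is a helper name invented for this example.

```python
from multiprocessing import Manager, Process


def work_loop(key):
    # Hypothetical stand-in for the real training and prediction work.
    return key * 2


def work_controller(in_queue, out_list):
    while True:
        key = in_queue.get()  # blocks until an item or a sentinel arrives
        if key is None:  # sentinel: no more work for this worker
            return
        out_list.append(work_loop(key))


def run_all(keys, num_workers=4):
    manager = Manager()
    results = manager.list()
    work = manager.Queue(num_workers)
    processes = [Process(target=work_controller, args=(work, results))
                 for _ in range(num_workers)]
    for p in processes:
        p.start()
    for key in keys:
        work.put(key)
    for _ in processes:
        work.put(None)  # one sentinel per worker so every worker stops
    for p in processes:
        p.join()
    return sorted(results), [p.exitcode for p in processes]


if __name__ == '__main__':
    print(run_all(range(1, 8)))
```

Because each worker consumes exactly one sentinel before returning, no polling or shared flag is needed, and join() returns as soon as the last key has been processed.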