I have a program that tries to predict email conversion for every email I send in a week (so, usually 7 sends). The output is 7 different files with the prediction scores for each customer. Running these serially can take close to 8 hours, so I have tried to parallelize them with multiprocessing. This speeds things up very well, but I've noticed that after a process finishes it seems to hold onto its memory, until there is none left and one of the processes gets killed by the system without completing its task.
I've based the following code on the 'manual pool' example in this answer, as I need to limit the number of processes that start at once due to memory constraints. What I would like is for each process to release its memory back to the system when it finishes, freeing up space for the next worker.
Below is the code that handles concurrency:
import itertools
from multiprocessing import Manager, Process

def work_controller(in_queue, out_list):
    while True:
        key = in_queue.get()  # blocks until an item is available
        print key
        if key is None:  # sentinel value signalling no more work
            return
        work_loop(key)
        out_list.append(key)
if __name__ == '__main__':
    num_workers = 4
    manager = Manager()
    results = manager.list()
    work = manager.Queue(num_workers)

    processes = []
    for i in xrange(num_workers):
        p = Process(target=work_controller, args=(work, results))
        processes.append(p)
        p.start()

    iters = itertools.chain([key for key in training_dict.keys()])
    for item in iters:
        work.put(item)

    for p in processes:
        print "Joining Worker"
        p.join()
Here is the actual work code, if that is of any help:
import datetime
import pickle

import pandas as pd

import imbalance  # project module providing ImbalanceClassifier

def work_loop(key):
    # load the training dictionary and the test data for this send
    with open('email_training_dict.pkl', 'rb') as f:
        training_dict = pickle.load(f)
    df_test = pd.DataFrame.from_csv(test_file)
    outdict = {}
    target = 'is_convert'

    df_train = train_dataframe(key)
    features = data_cleanse(df_train, df_test)

    # MAIN PREDICTION
    print 'Start time: {}'.format(datetime.datetime.now()) + '\n'

    # train/test by mailer
    X_train = df_train[features]
    X_test = df_test[features]
    y_train = df_train[target]

    # run model fit
    clf = imbalance.ImbalanceClassifier()
    clf = clf.fit(X_train, y_train)
    y_hat = clf.predict(X_test)

    outdict[key] = clf.y_vote
    print outdict[key]
    print 'Time Complete: {}'.format(datetime.datetime.now()) + '\n'

    with open(output_file, 'wb') as f:
        pickle.dump(outdict, f)
I'm assuming that, like the example you linked to, you are using Queue.Queue() as your queue object. This is a blocking queue, which means a call to queue.get() will either return an element or wait/block until it can return one. Try changing your work_controller function to the below:
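A minimal sketch of that change, assuming the fix is simply to return as soon as the queue reports empty, instead of blocking on get():

def work_controller(in_queue, out_list):
    while True:
        if in_queue.empty():  # nothing queued right now, so exit instead of blocking
            return
        key = in_queue.get()
        print key
        work_loop(key)
        out_list.append(key)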
While the above solves the blocking issue, it gives rise to another: at the start of the workers' lives there are no items in in_queue, so the workers will immediately end.
To solve this, I suggest you add a flag that indicates whether it is okay to terminate.
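For example, here is a rough sketch using a shared multiprocessing.Event as that flag (the stop_flag name and the 0.1-second timeout are just for illustration; work_loop and training_dict are as in your code). The main process sets the flag only after all of the work has been queued:

from Queue import Empty
from multiprocessing import Event, Manager, Process

def work_controller(in_queue, out_list, stop_flag):
    while True:
        try:
            key = in_queue.get(timeout=0.1)  # never block indefinitely
        except Empty:
            if stop_flag.is_set():
                return  # all work has been queued and the queue is drained
            continue  # queue is empty, but more work may still arrive
        work_loop(key)
        out_list.append(key)

if __name__ == '__main__':
    num_workers = 4
    manager = Manager()
    results = manager.list()
    work = manager.Queue(num_workers)
    stop_flag = Event()

    processes = []
    for i in xrange(num_workers):
        p = Process(target=work_controller, args=(work, results, stop_flag))
        processes.append(p)
        p.start()

    for key in training_dict.keys():
        work.put(key)
    stop_flag.set()  # from here on, an empty queue means the workers may exit

    for p in processes:
        p.join()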