I'm trying to parallelize one of my matching functions, and it works at the beginning. It is great to see my 72-core EC2 instance killing it, but after about a minute it goes back to a single core and the iterations per second start dropping.
import concurrent.futures as cf

import pandas as pd
from tqdm import tqdm

results = pd.DataFrame()
with cf.ProcessPoolExecutor() as executor:
    for res in tqdm(executor.map(matcher_helper, list(range(len(df))))):
        results = pd.concat([results, res], axis=0)
At the very beginning, all cores are busy. After about a minute, only a single core is still working. While multiprocessing is active it iterates at about 250 per second; after that it goes down to about 35 per second.
Any guidance is much appreciated.
EDIT - Additional Information - My original function:
def matcher(data,
            data_radial_matrice,
            data_indice,
            comparison_data,
            comparison_radial_matrice,
            distance_threshold=.1):
    import pandas as pd
    from sklearn.metrics.pairwise import haversine_distances
    from fuzzywuzzy import fuzz
    import numpy as np

    # Distances in miles from the query point to every comparison point
    # (haversine_distances works on radians; 3959 is Earth's radius in miles).
    lvl1 = haversine_distances(data_radial_matrice[data_indice].reshape(-1, 2),
                               comparison_radial_matrice) * 3959
    # Keep only the neighbours within the distance threshold.
    lvl2 = pd.Series(lvl1[lvl1 < distance_threshold])
    lvl1 = pd.DataFrame(np.argwhere(lvl1 < distance_threshold)).iloc[:, 1]
    lvl3 = pd.concat((lvl1, lvl2), axis=1)
    lvl3.columns = ['neigh_index', 'distance']
    lvl3.set_index('neigh_index', inplace=True)
    lvl3 = lvl3.merge(comparison_data,
                      left_index=True,
                      right_index=True,
                      how='inner')
    # Fuzzy-match the text of each neighbour against the query row.
    lvl4 = lvl3.loc[:, 'match_text'].apply(
        lambda x: fuzz.token_set_ratio(data.loc[data_indice, 'match_text'], x))
    # Keep the neighbour(s) with the best score.
    lvl5 = np.where(lvl4 == np.max(lvl4))
    # .copy() avoids pandas' SettingWithCopyWarning on the assignments below.
    interim_result = lvl3.iloc[lvl5].copy()
    interim_result['match_score'] = np.max(lvl4)
    interim_result['adp_indice'] = data_indice
    return interim_result
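For reference, here is a minimal NumPy-only sketch of what the distance step in the function computes. It assumes the inputs are (n, 2) arrays of [latitude, longitude] in radians, which is what sklearn's haversine_distances expects. The helper name haversine_miles and the sample coordinates are made up for illustration and are not part of the original code.

```python
import numpy as np

EARTH_RADIUS_MILES = 3959  # same constant the matcher multiplies by


def haversine_miles(X, Y):
    """Pairwise great-circle distances in miles.

    X and Y are (n, 2) arrays of [lat, lon] in radians; the result has
    shape (len(X), len(Y)). Hypothetical helper for illustration only.
    """
    lat1, lon1 = X[:, 0][:, None], X[:, 1][:, None]  # column vectors
    lat2, lon2 = Y[:, 0][None, :], Y[:, 1][None, :]  # row vectors
    a = (np.sin((lat2 - lat1) / 2) ** 2
         + np.cos(lat1) * np.cos(lat2) * np.sin((lon2 - lon1) / 2) ** 2)
    return 2 * np.arcsin(np.sqrt(a)) * EARTH_RADIUS_MILES


# Made-up sample points: one query row and three comparison rows.
query = np.radians([[40.0, -75.0]])
candidates = np.radians([[40.0, -75.0], [40.001, -75.0], [41.0, -75.0]])

dist = haversine_miles(query, candidates)   # shape (1, 3), in miles
within = np.argwhere(dist < 0.1)[:, 1]      # candidate indices within 0.1 miles
```

With distance_threshold=.1 this means only comparison rows within roughly a tenth of a mile go on to the fuzzy text matching.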
The main performance bottleneck was the pandas.concat call. When I changed the result-collection part to np.concatenate, that solved the problem. With pandas, once the accumulated result grows past a certain threshold, the concatenation slows down the whole process and kills the multicore processing. I made slight changes to my code so that the function returns a NumPy array, and I parse the results only at the end.
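A minimal sketch of this kind of change, not the exact code I ended up with: the workers return NumPy arrays, the parent process appends them to a list, and np.concatenate is called once at the end. The matcher_helper below is a hypothetical stand-in that returns a tiny array, and the collect helper and the column layout are assumptions made for illustration.

```python
import concurrent.futures as cf

import numpy as np


def matcher_helper(i):
    # Hypothetical stand-in for the real matcher_helper: returns a small
    # 2-D array with one row per match, columns [adp_indice, match_score].
    return np.array([[i, 100.0 - i]])


def collect(result_iter):
    # Gather every worker result in a list (appending is cheap), then
    # concatenate once instead of growing a DataFrame inside the loop.
    chunks = list(result_iter)
    return np.concatenate(chunks, axis=0)


if __name__ == "__main__":
    with cf.ProcessPoolExecutor() as executor:
        results = collect(executor.map(matcher_helper, range(1000)))
    print(results.shape)
```

The combined array can be turned into a DataFrame once, after the pool has finished, so the expensive conversion happens a single time rather than on every iteration.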