Python 3.8 - concurrent.futures.ProcessPoolExecutor performance going down in time

I'm trying to parallelize one of my matching functions, and it works at the beginning. It is great to see my 72-core EC2 instance fully loaded, but after about a minute it drops back to a single core and the iterations per second start falling.

import concurrent.futures as cf

import pandas as pd
from tqdm import tqdm

results = pd.DataFrame()

with cf.ProcessPoolExecutor() as executor:
    for res in tqdm(executor.map(matcher_helper, list(range(len(df))))):
        results = pd.concat([results, res], axis=0)

At the very beginning I see this:

[Screenshot: multicore processing]

Then it goes to this:

[Screenshot: singlecore dropping]

For about a minute processing runs nicely on all cores, then it drops to a single core. While running on multiple cores it iterates at about 250 per second; afterwards it falls to 35 per second.

Any guidance is much appreciated.

EDIT - Additional Information - My original function:

def matcher(data,
            data_radial_matrice,
            data_indice,
            comparison_data,
            comparison_radial_matrice,
            distance_threshold=.1):
    

    import pandas as pd
    from sklearn.metrics.pairwise import haversine_distances
    from fuzzywuzzy import fuzz
    import numpy as np

    lvl1 = haversine_distances(data_radial_matrice[data_indice].reshape(-1, 2),
                               comparison_radial_matrice) * 3959
    lvl2 = pd.Series(lvl1[lvl1 < distance_threshold])

    lvl1 = pd.DataFrame(np.argwhere(lvl1 < distance_threshold)).iloc[:, 1]

    lvl3 = pd.concat((lvl1, lvl2), axis=1)
    lvl3.columns = ['neigh_index', 'distance']
    lvl3.set_index('neigh_index', inplace=True)
    lvl3 = lvl3.merge(comparison_data,
                      left_index=True,
                      right_index=True,
                      how='inner')

    lvl4 = lvl3.loc[:, 'match_text'].apply(
        lambda x: fuzz.token_set_ratio(data.loc[data_indice, 'match_text'], x))
    lvl5 = np.where(lvl4 == np.max(lvl4))
    interim_result = lvl3.iloc[lvl5]
    interim_result['match_score'] = np.max(lvl4)
    interim_result['adp_indice'] = data_indice

    return interim_result
There is 1 answer

Answer by Tolga

The main performance bottleneck was the pandas.concat call used to collect the results. When I changed the result-collection step to np.concatenate, the problem went away. It appears that in the pandas backend, past a certain IO threshold, the concat slows down the whole process and kills multicore processing.
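One way to see why the collection step, rather than the workers, can end up dominating: a concatenation builds a new object holding everything accumulated so far, so growing a result one piece at a time copies a roughly quadratic number of rows. The sketch below is only a back-of-the-envelope cost model, not a measurement of pandas internals. The function names are made up for illustration, and it assumes every call copies the full accumulated result.

```python
def rows_copied_incremental(chunk_sizes):
    """Rows written if the result is re-concatenated after every chunk.

    Assumption: each concat allocates a fresh result and copies all rows
    accumulated so far plus the new chunk.
    """
    copied = 0
    accumulated = 0
    for size in chunk_sizes:
        accumulated += size
        copied += accumulated
    return copied


def rows_copied_once(chunk_sizes):
    """Rows written if chunks are kept in a list and concatenated one time."""
    return sum(chunk_sizes)


if __name__ == "__main__":
    chunks = [1] * 1000  # hypothetical: 1000 results of one row each
    print(rows_copied_incremental(chunks))  # 500500
    print(rows_copied_once(chunks))         # 1000
```

Under this model the main process does more copying per result the longer the run goes on, which would be consistent with throughput falling over time while the workers wait to hand back their results.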

I made slight changes to my code so that the function returns a NumPy array at the end.

def matcher2(data,
             data_radial_matrice,
             data_indice,
             comparison_data,
             comparison_radial_matrice,
             distance_threshold=.1):
    '''Haversine distances between the selected data point and the comparison
    data points are calculated in miles. By default candidates are limited to
    a .1 mile distance; matching is done among these filtered results and the
    max-score records are returned.
    '''

    import pandas as pd
    from sklearn.metrics.pairwise import haversine_distances
    from fuzzywuzzy import fuzz
    import numpy as np

    # haversine_distances works on the unit sphere; * 3959 converts to miles
    lvl1 = haversine_distances(data_radial_matrice[data_indice].reshape(-1, 2),
                               comparison_radial_matrice) * 3959
    # distances that fall within the threshold
    lvl2 = pd.Series(lvl1[lvl1 < distance_threshold])

    # column positions (neighbour indices) of those distances
    lvl1 = pd.DataFrame(np.argwhere(lvl1 < distance_threshold)).iloc[:, 1]

    lvl3 = pd.concat((lvl1, lvl2), axis=1)
    lvl3.columns = ['neigh_index', 'distance']
    lvl3.set_index('neigh_index', inplace=True)
    lvl3 = lvl3.merge(comparison_data,
                      left_index=True,
                      right_index=True,
                      how='inner')

    # fuzzy text score of every neighbour against the selected record
    lvl4 = lvl3.loc[:, 'match_text'].apply(
        lambda x: fuzz.token_set_ratio(data.loc[data_indice, 'match_text'], x))
    # keep only the rows with the highest score
    lvl5 = np.where(lvl4 == np.max(lvl4))
    interim_result = lvl3.iloc[lvl5]
    interim_result['match_score'] = np.max(lvl4)
    interim_result['adp_indice'] = data_indice

    return np.array(interim_result)
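For reference, the `* 3959` factor converts the unit-sphere result of `haversine_distances` into miles (3959 is approximately the Earth's radius in miles), and that function expects its inputs as [latitude, longitude] pairs in radians, which is presumably what the "radial" matrices hold. A minimal pure-Python sketch of the same formula for a single pair of points follows; the function name and the coordinates are made up for illustration.

```python
import math

EARTH_RADIUS_MILES = 3959  # same constant as in matcher2


def haversine_miles(lat1_deg, lon1_deg, lat2_deg, lon2_deg):
    """Great-circle distance in miles between two points given in degrees."""
    lat1, lon1, lat2, lon2 = map(math.radians,
                                 (lat1_deg, lon1_deg, lat2_deg, lon2_deg))
    a = (math.sin((lat2 - lat1) / 2) ** 2
         + math.cos(lat1) * math.cos(lat2) * math.sin((lon2 - lon1) / 2) ** 2)
    return 2 * math.asin(math.sqrt(a)) * EARTH_RADIUS_MILES


if __name__ == "__main__":
    # Made-up points one degree of latitude apart: roughly 69 miles.
    print(round(haversine_miles(40.0, -74.0, 41.0, -74.0), 1))
```

With the default `distance_threshold=.1`, only comparison points within about a tenth of a mile of the selected point go on to the text-matching step.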

Finally, this is how I collect the results:

import concurrent.futures as cf

import numpy as np
from tqdm import tqdm


def dnb_matcher_helper(indice):
    return matcher2(adp, adp_rad, indice, dnb, dnb_rad)

# Start from zero rows so that no uninitialized row ends up in the results
# (np.empty(shape=(1, 35)) would leave one garbage row at the top).
dnb_results = np.empty(shape=(0, 35))

with cf.ProcessPoolExecutor() as executor:
    for res in tqdm(executor.map(dnb_matcher_helper,
                                 list(range(len(adp))))):
        if len(res) == 0:
            continue
        else:
            for line in res:
                line = line.reshape((1, 35))
                dnb_results = np.concatenate((dnb_results, line), axis=0)
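A variation that may be worth trying, sketched under assumptions rather than taken from the code above: calling np.concatenate inside the loop still copies the whole accumulated array on every call, so appending each non-empty result to a list and concatenating once at the end avoids that repeated copy. In the sketch, `fake_matcher`, `collect`, the 3-column width, and the `chunksize` value are hypothetical stand-ins; in the real code the worker would be `dnb_matcher_helper` and the width 35. `chunksize` is an existing parameter of `Executor.map` that sends tasks to worker processes in batches.

```python
import concurrent.futures as cf

import numpy as np


def fake_matcher(indice):
    """Hypothetical stand-in for dnb_matcher_helper.

    Returns between 0 and 2 rows of width 3, filled with the index value.
    """
    n_rows = indice % 3
    return np.full((n_rows, 3), float(indice))


def collect(n_items, executor_cls=cf.ProcessPoolExecutor, chunksize=16):
    """Run fake_matcher over range(n_items) and stack all rows in one step."""
    chunks = []
    with executor_cls() as executor:
        # map() yields results in input order; chunksize batches the tasks
        for res in executor.map(fake_matcher, range(n_items),
                                chunksize=chunksize):
            if len(res):
                chunks.append(res)  # cheap: no array copy here
    if not chunks:
        return np.empty((0, 3))
    return np.concatenate(chunks, axis=0)  # single copy at the end
```

When using the default `ProcessPoolExecutor`, the call to `collect(...)` should sit under an `if __name__ == "__main__":` guard on platforms that start worker processes by spawning. Whether a larger `chunksize` helps depends on how long each task takes, so it is best treated as a value to experiment with.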