Parallel jobs in for loop slowing down with each iteration


Context: I'm trying to simulate the long-term repair of damaged bridges by checking each bridge's state (damaged or undamaged) at a series of timesteps over a given time horizon. To do so, I've written a Python function called repair_over_horizon that takes (among other things) a list of damaged bridges and outputs a list of lists, each of which contains the bridges still damaged at a particular timestep. For example, if the input list were:

[100, 423, 667, 904, 221, 56, 495, 70, 109, 65]

and I used repair_over_horizon with 3 timesteps, I would expect the output to be something like:

[[100, 423, 667, 904, 221, 56, 495, 70, 109, 65], [100, 423, 904, 56, 70, 109, 65], [423, 904]].
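To make the intended behavior concrete, here is a minimal pure-Python mock (the function name, repair probability, and seed are made up for illustration -- this is not the real implementation):

```python
import random

def mock_repair_over_horizon(damaged, n_steps, p_repair=0.3, seed=0):
    # Toy stand-in for repair_over_horizon: at each timestep, each
    # still-damaged bridge is repaired with probability p_repair, and the
    # function returns one list of still-damaged bridges per timestep.
    rng = random.Random(seed)
    history = [list(damaged)]
    for _ in range(n_steps - 1):
        survivors = [b for b in history[-1] if rng.random() >= p_repair]
        history.append(survivors)
    return history

history = mock_repair_over_horizon(
    [100, 423, 667, 904, 221, 56, 495, 70, 109, 65], 3)
```

Each successive list is a subset of the one before it, matching the example above.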

The function repair_over_horizon actually takes multiple input lists, or "maps", at a time and computes their repairs in parallel. The parallelization happens within each timestep, as shown in the code below. I've also included the functions called within repair_over_horizon -- namely, compute_repairs_at_timestep and repair -- for reference.

Problem: While the function produces the expected output, each iteration (i.e. timestep) takes significantly longer than the one before. The increase becomes more pronounced as the number of maps per timestep grows -- for example, with 50 maps per timestep and 10 timesteps, the first iteration takes ~6s and the last takes ~21s; with 500 maps per timestep and 5 timesteps, the first iteration takes ~36s and the fifth takes ~110s.

Troubleshooting: I've profiled the code (see results for 500 maps per timestep and 5 timesteps below) and checked memory usage, but it's not apparent to me from the results where I'm going wrong in the implementation.

Question: What could be causing this massive slowdown between iterations and as the number of maps increases? Any ideas on how to fix it would be much appreciated.

UPDATE: To test whether there was a memory issue, I inserted gc.collect() at various points toward the ends of functions. However, I still found that the repair takes longer with each iteration, as stated in the Problem section above.
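Another check along these lines is to measure how large the pickled arguments to submit actually are at each timestep. A sketch (using pickle in place of Python 2's cPickle; the helper name is hypothetical):

```python
import pickle  # stand-in for cPickle on Python 2

def payload_size(*args):
    # Approximate number of bytes pp has to serialize when these
    # arguments are passed to job_server.submit
    return len(pickle.dumps(args, protocol=2))

# Toy nested lists standing in for the assembled bridge arrays
small = [list(range(10)) for _ in range(4)]
large = [list(range(10)) for _ in range(400)]
size_small = payload_size(small)
size_large = payload_size(large)
```

If the measured size grows from one timestep to the next, the growing cPickle.dumps time in the profile below would follow directly.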

Partial profiling results:

Ordered by: cumulative time
ncalls  tottime  percall  cumtime  percall filename:lineno(function)
 2500    1.102    0.000  289.855    0.116 /Library/Python/2.7/site-packages/pp.py:380(submit)
 2505  285.901    0.114  285.901    0.114 {cPickle.dumps}
 2500    0.020    0.000  127.742    0.051 /Library/Python/2.7/site-packages/pp.py:96(__call__)
 2500    0.040    0.000  127.719    0.051 /Library/Python/2.7/site-packages/pp.py:116(__unpickle)
 2500  127.675    0.051  127.675    0.051 {cPickle.loads}
    5    0.001    0.000    2.282    0.456 /Library/Python/2.7/site-packages/pp.py:280(__init__)
    5    0.001    0.000    2.279    0.456 /Library/Python/2.7/site-packages/pp.py:491(set_ncpus)
   20    0.001    0.000    2.278    0.114 /Library/Python/2.7/site-packages/pp.py:134(__init__)
   20    0.006    0.000    2.276    0.114 /Library/Python/2.7/site-packages/pp.py:140(start)
   20    0.001    0.000    1.938    0.097 /Library/Python/2.7/site-packages/pptransport.py:133(receive)
   40    1.935    0.048    1.935    0.048 {method 'read' of 'file' objects}
 2500    0.174    0.000    1.666    0.001 /Library/Python/2.7/site-packages/pp.py:656(__scheduler)
16084    1.364    0.000    1.364    0.000 {method 'acquire' of 'thread.lock' objects}
 1261    0.034    0.000    0.910    0.001 /Library/Python/2.7/site-packages/ppcommon.py:38(start_thread)
 1261    0.063    0.000    0.580    0.000 /System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py:726(start)
 1261    0.057    0.000    0.370    0.000 /System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py:603(wait)
   20    0.007    0.000    0.330    0.017 /System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/subprocess.py:650(__init__)
   20    0.014    0.001    0.316    0.016 /System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/subprocess.py:1195(_execute_child)
 1261    0.058    0.000    0.288    0.000 /System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py:656(__init__)
 2500    0.107    0.000    0.244    0.000 /Library/Python/2.7/site-packages/pp.py:73(__init__)
60344    0.232    0.000    0.232    0.000 {isinstance}
  889    0.045    0.000    0.224    0.000 /System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py:309(wait)
   20    0.201    0.010    0.201    0.010 {posix.fork}
 2522    0.032    0.000    0.151    0.000 /System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py:242(Condition)
 1261    0.145    0.000    0.145    0.000 {thread.start_new_thread}
 1261    0.011    0.000    0.142    0.000 /System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py:542(Event)
 1261    0.025    0.000    0.131    0.000 /System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py:561(__init__)
 2522    0.113    0.000    0.119    0.000 /System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py:260(__init__)

repair_over_horizon

import time  # module-level imports implied by the code below
import pp

def repair_over_horizon(n_gm_maps, n_damage_maps, bridge_array_internal_assembled,
                        bridge_array_new_assembled, horizon=365, n_steps=10,
                        trajectories=10, plot=False, quiet=False):

    from itertools import product

    t_per_timestep = []

    sim_times = get_times(horizon, n_steps)

    thresholds = []
    for i in range(0, n_steps):
        thresholds.append(get_repair_threshold(sim_times[i]))

    ppservers = ()  # starting parallelization

    for k in range(0, n_steps):
        start = time.time()
        jobs = []
        job_server = pp.Server(ppservers=ppservers)
        print "Starting pp for recovery modelling at k =", k, "with", job_server.get_ncpus(), "workers"
        for m, n, l in product(xrange(n_gm_maps), xrange(n_damage_maps), xrange(trajectories)):
            jobs.append(job_server.submit(compute_repairs_at_timestep,
                                          (m, n, k, l, bridge_array_internal_assembled,
                                           bridge_array_new_assembled, thresholds[k], n_steps),
                                          depfuncs=(repair,),
                                          modules=('numpy', 'random', 'copy',),
                                          group=k))
        # collect the results of timestep k, since the next timestep depends on them
        for job in jobs:
            temp_dict = job()
            if temp_dict is not None and k < n_steps - 1:
                mstar, nstar, kstar, lstar = temp_dict['indices']
                kstar += 1
                bridge_array_internal_assembled[mstar][nstar][kstar][lstar][:] = \
                    temp_dict['internal'][mstar][nstar][kstar][lstar]
                bridge_array_new_assembled[mstar][nstar][kstar][lstar][:] = \
                    temp_dict['new'][mstar][nstar][kstar][lstar]

        t_per_timestep.append(time.time() - start)
        print "Done with timestep k =", k, "in", t_per_timestep[-1], "seconds"

    # close the server
    job_server.destroy()

    if not quiet:
        print 'Great. You have modeled stochastic recovery of bridges and created time-dependent damage maps.'
    if plot:
        pass

    # return the two lists of lists as a single dictionary
    return {'internal': bridge_array_internal_assembled,
            'new': bridge_array_new_assembled,
            'timesteps': t_per_timestep}
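For context, the assembled bridge arrays are nested four levels deep and indexed as [m][n][k][l] (ground motion map, damage map, timestep, trajectory). A small sketch of that structure, with made-up sizes:

```python
# Hypothetical sizes, for illustration only
n_gm_maps, n_damage_maps, n_steps, trajectories = 2, 2, 3, 2
damaged = [100, 423, 667]

# bridge_array[m][n][k][l] -> list of damaged bridge IDs for ground-motion
# map m, damage map n, timestep k, and recovery trajectory l
bridge_array = [[[[list(damaged) for _ in range(trajectories)]
                  for _ in range(n_steps)]
                 for _ in range(n_damage_maps)]
                for _ in range(n_gm_maps)]
```

Note that the whole four-level structure is passed to every job.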

compute_repairs_at_timestep

def compute_repairs_at_timestep(m, n, k, l, bridge_array_internal_assembled,
                                bridge_array_new_assembled, threshold,
                                n_steps, limit=None):
    # keep track of ground motion map, associated damage map, timestep,
    # and recovery trajectory
    inds = [m, n, k, l]
    # repair bridges stochastically at a single timestep
    temp_dict = repair(bridge_array_internal_assembled[m][n][k][l],
                       bridge_array_new_assembled[m][n][k][l], threshold, limit)
    # propagate the repair to the next timestep, if there is one
    if k < n_steps - 1:
        bridge_array_internal_assembled[m][n][k + 1][l][:] = temp_dict['internal']
        bridge_array_new_assembled[m][n][k + 1][l][:] = temp_dict['new']
    return {'indices': inds,
            'internal': bridge_array_internal_assembled,
            'new': bridge_array_new_assembled}
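The [:] slice assignment above is deliberate: it mutates the existing inner list in place rather than rebinding the name, so every other reference to that list sees the update. A minimal illustration:

```python
row = [[100, 423, 667]]
alias = row[0]       # second reference to the same inner list
row[0][:] = [423]    # slice assignment mutates the list in place
# alias now also reads [423], and still points at the same object
```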

repair

def repair(bridge_array_internal, bridge_array_new, ext_threshold, limit=None):

    import numpy as np
    import copy

    d = len(bridge_array_internal)
    if d == 0:
        # all bridges have already been repaired
        return {'internal': [], 'new': []}

    temp_internal = copy.deepcopy(bridge_array_internal)
    temp_new = copy.deepcopy(bridge_array_new)
    # repair bridges stochastically per the Shinozuka et al. (2003) model
    temp = np.ndarray.tolist(np.random.rand(d))

    rep_inds_internal = []  # internal ID numbers of bridges that are repaired
    rep_inds_new = []  # new ID numbers of bridges that are repaired
    if limit is None:
        for i in range(0, d):
            if temp[i] < ext_threshold:
                rep_inds_internal.append(temp_internal[i])
                rep_inds_new.append(temp_new[i])
    else:  # limit is a number: stop once that many bridges have been repaired
        rep_count = 0
        for i in range(0, d):
            if rep_count == limit:
                break
            if temp[i] < ext_threshold:
                rep_inds_internal.append(temp_internal[i])
                rep_inds_new.append(temp_new[i])
                rep_count += 1

    # remove repaired bridges from the list of damaged bridges
    for i in range(0, len(rep_inds_internal)):
        try:
            temp_internal.remove(rep_inds_internal[i])
            temp_new.remove(rep_inds_new[i])
        except ValueError:
            pass

    # these arrays have been updated to exclude repaired bridges
    return {'internal': temp_internal, 'new': temp_new}
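One simplification I've considered for the removal step: since the two lists are parallel (same length, same ordering), the repeated list.remove() calls -- each O(d) -- can be replaced by a single index-based filter. A sketch with a hypothetical helper (this assumes the parallel-list invariant holds, which the code relies on anyway):

```python
def filter_repaired(internal, new, rand_vals, threshold):
    # Keep only the bridges whose random draw did NOT fall below the
    # repair threshold, i.e. the bridges that remain damaged
    keep = [i for i, r in enumerate(rand_vals) if r >= threshold]
    return [internal[i] for i in keep], [new[i] for i in keep]

internal, new = filter_repaired([100, 423, 667], [1, 2, 3], [0.1, 0.9, 0.4], 0.5)
# internal -> [423], new -> [2]
```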
