I have a windowed time series, and I want to regress one value for each window. Since the windows are processed independently, I considered this highly parallelizable, so I use the following code:
```python
import numpy as np
from concurrent.futures import ThreadPoolExecutor


def global_analysis(self, y0: np.ndarray, y1: np.ndarray,
                    z: np.ndarray, f: np.ndarray,
                    local_function,
                    delta=3.914e-3, window=39.14e-3):
    """Compute an analysis between two signals in sliding windows along a length.

    :param y0 (np.ndarray): first signal [P,S] (used as reference)
    :param y1 (np.ndarray): second signal [P,S]
    :param z (np.ndarray): length array [m]
    :param f (np.ndarray): frequency array [GHz]
    :param local_function (callable): function that computes one value per window
    :optional delta (float): distance between sensor centres [m]
    :optional window (float): length of the sensors [m]
    :returns output (np.ndarray): one computed value per window, in order
    """
    # Convert delta and window from metres to numbers of points
    dz = z[1] - z[0]  # spatial increment
    delta = round(delta / dz)
    window = round(window / dz)
    # From here on, `window` is the half-width of each window (simplifies slicing)
    window = round(window / 2)
    # Total number of points
    n_points = y0.shape[1]
    # Window centres, spaced `delta` points apart
    steps = range(window, n_points - window + 1, delta)
    ##### Compute analysis #####
    # Leaving the `with` block shuts the executor down; no explicit shutdown() needed
    with ThreadPoolExecutor() as executor:
        output = list(executor.map(
            lambda i: process_window(i, y0, y1, z, window, f, local_function),
            steps))
    return np.array(output)
```
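The index arithmetic in global_analysis can be checked on its own. The sketch below reproduces it in a hypothetical helper, window_centres, using len(z) in place of y0.shape[1] (assuming both have the same length) and a made-up grid of 1000 points spaced 1 mm apart:

```python
import numpy as np


def window_centres(z, delta=3.914e-3, window=39.14e-3):
    """Hypothetical helper replicating the index arithmetic of global_analysis.

    Returns (half_window, step, centres); window k covers the points
    centres[k] - half_window : centres[k] + half_window.
    """
    dz = z[1] - z[0]                              # spatial increment [m]
    step = round(delta / dz)                      # distance between centres [points]
    half_window = round(round(window / dz) / 2)   # half-width [points]
    centres = range(half_window, len(z) - half_window + 1, step)
    return half_window, step, list(centres)


# Made-up grid: 1000 points, 1 mm apart
z = np.arange(1000) * 1e-3
half, step, centres = window_centres(z)
print(half, step, len(centres), centres[0], centres[-1])
# 20 4 241 20 980
```

With these numbers each window spans 2 × 20 = 40 points rather than the 39 that window/dz rounds to, because the half-width is rounded a second time; the last window ends exactly at the end of the array.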
And the function that processes each window is:
```python
def process_window(i, y0, y1, z, window, f, local_function):
    # Extract rows 0 and 1 of each signal inside the window centred on point i
    yy0 = np.array([y0[0, i - window:i + window], y0[1, i - window:i + window]])
    yy1 = np.array([y1[0, i - window:i + window], y1[1, i - window:i + window]])
    zz = np.array(z[i - window:i + window])
    return float(local_function(yy0, yy1, zz, f))
```
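For reference, the two `np.array([...])` constructions in process_window build a copy of the first two rows inside the window, while a single 2-D slice selects the same data as a view. The helper names below and the random test signal are hypothetical; a view is only safe if local_function does not modify its inputs in place, and whether the copy matters for speed depends on what local_function does.

```python
import numpy as np


def window_rows_copy(y, i, window):
    # As in process_window: rows 0 and 1 copied into a new (2, 2*window) array
    return np.array([y[0, i - window:i + window], y[1, i - window:i + window]])


def window_rows_view(y, i, window):
    # Same data as a 2-D slice of the first two rows: a view, no copy
    return y[:2, i - window:i + window]


rng = np.random.default_rng(0)
y = rng.standard_normal((2, 100))   # hypothetical [P,S] signal, 100 points
a = window_rows_copy(y, 50, 20)
b = window_rows_view(y, 50, 20)
print(a.shape, np.array_equal(a, b), np.shares_memory(b, y))
# (2, 40) True True
```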
I use different local_functions to compute different values for the windowed time series. Usually I use function "A", which consists of a series of NumPy operations, and the results are good: execution is about 10x faster. However, when I use function "P", which consists of a series of NumPy operations PLUS a multitaper cross-spectral density calculation from nitime (another library), I cannot speed up the execution; in fact, it runs about 50% slower. Why? What are the requirements for my local_function to be parallelized properly?
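One rough way to test whether a local_function is limited by CPython's global interpreter lock (GIL) is to time it serially and in a thread pool. On standard CPython only one thread runs Python bytecode at a time, while NumPy releases the GIL inside many of its C loops, so threads tend to help when most of the time is spent in large array operations and not when it is spent in Python-level code. The functions python_heavy and numpy_heavy below are hypothetical stand-ins; whether "P" behaves like the first one is an assumption to verify with a profiler (the pure-Python tridisolve fallback described in the last paragraph would be one candidate).

```python
import time
from concurrent.futures import ThreadPoolExecutor

import numpy as np


def python_heavy(n):
    """Pure-Python loop: holds the GIL for the whole call."""
    total = 0
    for k in range(n):
        total += k * k
    return total


def numpy_heavy(n):
    """One large vectorised call: NumPy releases the GIL inside its C loop."""
    x = np.arange(n, dtype=np.float64)
    return float(np.sqrt(x).sum())


def compare_serial_vs_threads(func, args, workers):
    """Return (same_results, serial_seconds, threaded_seconds)."""
    t0 = time.perf_counter()
    serial = [func(a) for a in args]
    t1 = time.perf_counter()
    with ThreadPoolExecutor(max_workers=workers) as executor:
        threaded = list(executor.map(func, args))
    t2 = time.perf_counter()
    return serial == threaded, t1 - t0, t2 - t1


for func, n in [(python_heavy, 200_000), (numpy_heavy, 2_000_000)]:
    same, t_serial, t_threads = compare_serial_vs_threads(func, [n] * 8, workers=4)
    print(f"{func.__name__}: serial {t_serial:.3f}s, "
          f"threads {t_threads:.3f}s, same={same}")
```

Timings vary by machine; the point is the ratio between the two columns for each function, not the absolute numbers.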
I also tried other executors, such as ProcessPoolExecutor.
I also dug into the nitime library and found that the function "tridisolve" from its utils module has to be redefined in pure Python, because the Cython version is not importable.
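To confirm which version is in use, one can check whether a compiled extension module is found at all. The module name "nitime._utils" below is an assumption about where nitime keeps its Cython code; the actual name should be checked against the files in the installed package directory. If only the pure-Python fallback is available, that part of "P" would run as Python bytecode and hold the GIL.

```python
import importlib.machinery
import importlib.util


def is_compiled_extension(name):
    """True if `name` resolves to a compiled extension module (.so/.pyd)."""
    try:
        spec = importlib.util.find_spec(name)
    except (ImportError, ValueError):
        # Parent package missing or failing to import, or an invalid name
        return False
    if spec is None or spec.origin is None:
        return False
    return spec.origin.endswith(tuple(importlib.machinery.EXTENSION_SUFFIXES))


# "nitime._utils" is an assumed module name (see the note above)
print(is_compiled_extension("nitime._utils"))
```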