I am trying to update a shared dictionary synchronously using the mpire package in Python on a multi-core machine (i.e., parallel processing to update a dict). The environment I am using is a Linux machine with 8 vCPUs and 16 GB of memory in Amazon SageMaker. Below is a sample/dummy code snippet I am using for this, but I am unable to make it work. I know I could probably use the Process or map methods from the multiprocessing package to accomplish this task; I am just checking whether there is any way to do it using the mpire package. Any help would be greatly appreciated. Thanks much!
def myFunc(shared_objects, id_val):
    indata, output = shared_objects
    # Temporary store for model output for an input ID
    temp: Dict[str, int] = dict()
    # Filter data for input ID and store output in temp variable
    indata2 = indata.loc[indata['ID']==id_val]
    temp = indata2.groupby(['M_CODE'])['VALUE'].sum().to_dict()
    # store the result .. I want this to happen synchronously
    output[id_val] = temp

#*******************************************************************
if __name__ == '__main__':
    import pandas as pd
    from typing import Dict
    from datetime import datetime
    from mpire import WorkerPool
    from multiprocessing import Manager

    # This is just a sample data
    inputData = pd.DataFrame(dict({'ID':['A', 'B', 'A', 'C', 'A'],
                                   'M_CODE':['AKQ1', 'ALM3', 'BLC4', 'ALM4', 'BLC4'],
                                   'VALUE':[0.75, 1, 1.75, 0.67, 3], }))

    start_time = datetime.now()
    print(start_time, '>> Process started.')

    # Use a shared dict to store results from various workers
    manager = Manager()
    # dict on Manager has no lock at all!
    # https://stackoverflow.com/questions/2936626/how-to-share-a-dictionary-between-multiple-processes-in-python-without-locking
    output: Dict[str, Dict[str, int]] = manager.dict()
    shared_objects = inputData, output

    with WorkerPool(n_jobs=7, shared_objects=shared_objects) as pool:
        results = pool.map_unordered(myFunc, inputData['ID'].unique(), progress_bar=True)

    print(datetime.now(), '>> Process completed -> total time taken:', datetime.now()-start_time)
Below is the error I'm stuck with:
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
<ipython-input-10-df7d847398a1> in <module>
37
38 with WorkerPool(n_jobs=7, shared_objects=shared_objects) as pool:
---> 39 results = pool.map_unordered(myFunc, inputData['ID'].unique(), progress_bar=True)
40
41 print(datetime.now(), '>> Process completed -> total time taken:', datetime.now()-start_time)
/opt/conda/lib/python3.7/site-packages/mpire/pool.py in map_unordered(self, func, iterable_of_args, iterable_len, max_tasks_active, chunk_size, n_splits, worker_lifespan, progress_bar, progress_bar_position, enable_insights, worker_init, worker_exit, task_timeout, worker_init_timeout, worker_exit_timeout)
418 n_splits, worker_lifespan, progress_bar, progress_bar_position,
419 enable_insights, worker_init, worker_exit, task_timeout, worker_init_timeout,
--> 420 worker_exit_timeout))
421
422 def imap(self, func: Callable, iterable_of_args: Union[Sized, Iterable], iterable_len: Optional[int] = None,
/opt/conda/lib/python3.7/site-packages/mpire/pool.py in imap_unordered(self, func, iterable_of_args, iterable_len, max_tasks_active, chunk_size, n_splits, worker_lifespan, progress_bar, progress_bar_position, enable_insights, worker_init, worker_exit, task_timeout, worker_init_timeout, worker_exit_timeout)
664 # Terminate if exception has been thrown at this point
665 if self._worker_comms.exception_thrown():
--> 666 self._handle_exception(progress_bar_handler)
667
668 # All results are in: it's clean up time
/opt/conda/lib/python3.7/site-packages/mpire/pool.py in _handle_exception(self, progress_bar_handler)
729 # Raise
730 logger.debug("Re-raising obtained exception")
--> 731 raise err(traceback_str)
732
733 def stop_and_join(self, progress_bar_handler: Optional[ProgressBarHandler] = None,
ValueError:
Exception occurred in Worker-0 with the following arguments:
Arg 0: 'A'
Traceback (most recent call last):
File "/opt/conda/lib/python3.7/site-packages/mpire/worker.py", line 352, in _run_safely
results = func()
File "/opt/conda/lib/python3.7/site-packages/mpire/worker.py", line 288, in _func
_results = func(args)
File "/opt/conda/lib/python3.7/site-packages/mpire/worker.py", line 455, in _helper_func
return self._call_func(func, args)
File "/opt/conda/lib/python3.7/site-packages/mpire/worker.py", line 472, in _call_func
return func(args)
File "<ipython-input-10-df7d847398a1>", line 9, in myFunc
indata2 = indata.loc[indata['ID']==id_val]
File "/opt/conda/lib/python3.7/site-packages/pandas/core/ops/common.py", line 69, in new_method
return method(self, other)
File "/opt/conda/lib/python3.7/site-packages/pandas/core/arraylike.py", line 32, in __eq__
return self._cmp_method(other, operator.eq)
File "/opt/conda/lib/python3.7/site-packages/pandas/core/series.py", line 5502, in _cmp_method
res_values = ops.comparison_op(lvalues, rvalues, op)
File "/opt/conda/lib/python3.7/site-packages/pandas/core/ops/array_ops.py", line 262, in comparison_op
"Lengths must match to compare", lvalues.shape, rvalues.shape
ValueError: ('Lengths must match to compare', (5,), (1,))
[Update]: Here is the code I found to work fine using only the multiprocessing package.
def myFunc(id_val, output, indata):
    # Temporary store for model output for an input ID
    temp: Dict[str, int] = dict()
    # Filter data for input ID and store output in temp variable
    indata2 = indata.loc[indata['ID']==id_val]
    temp = indata2.groupby(['M_CODE'])['VALUE'].sum().to_dict()
    # store the result .. I want this to happen synchronously
    output[id_val] = temp

#*******************************************************************
if __name__ == '__main__':
    import pandas as pd
    from typing import Dict
    from itertools import repeat
    from multiprocessing import Manager
    from datetime import datetime

    # This is just a sample data
    inputData = pd.DataFrame(dict({'ID':['A', 'B', 'A', 'C', 'A'],
                                   'M_CODE':['AKQ1', 'ALM3', 'BLC4', 'ALM4', 'BLC4'],
                                   'VALUE':[0.75, 1, 1.75, 0.67, 3], }))

    start_time = datetime.now()
    print(start_time, '>> Process started.')

    # Use a shared dict to store results from various workers
    with Manager() as manager:
        # dict on Manager has no lock at all!
        # https://stackoverflow.com/questions/2936626/how-to-share-a-dictionary-between-multiple-processes-in-python-without-locking
        output: Dict[str, Dict[str, int]] = manager.dict()
        # Start processes involving n workers
        # Set chunksize to efficiently spread the tasks across workers so none remains idle as much as possible
        with manager.Pool(processes=7) as pool:
            pool.starmap(myFunc,
                         zip(inputData['ID'].unique(), repeat(output), repeat(inputData)),
                         chunksize=max(inputData['ID'].nunique() // (7*4), 1))
        # Copy the proxy into a plain dict while the manager is still alive
        output = dict(output)

    print(datetime.now(), '>> Process completed -> total time taken:', datetime.now()-start_time)
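As a note on the chunksize heuristic above: the intent is roughly four chunks per worker, floored at 1; with this tiny sample it works out to 1 (the numbers below just restate that arithmetic):

n_unique = 3                                  # inputData['ID'].nunique() for the sample data
n_workers = 7
chunksize = max(n_unique // (n_workers * 4), 1)
print(chunksize)                              # 3 // 28 == 0, so the max() floor gives 1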
UPDATE:
Now that I better understand the specific issue, I can say it lies in the relationship between the mpire.WorkerPool.map_unordered chunking procedure and the inputs expected by the pandas .loc call. Specifically, myFunc gets id_val as a singular NumPy array, such as array(['A'], dtype=object), as detailed in the chunking explanation and the source code. On the other side, indata['ID'] inside the .loc call is a pandas Series. One of these has to be changed for the comparison to work, but based on what your code is trying to do, id_val can be changed to give just its item via id_val.item(). Making the new myFunc (which on my machine gets your script to run):
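def myFunc(shared_objects, id_val):
    indata, output = shared_objects
    # id_val arrives as a length-1 NumPy array chunk; unwrap it to the scalar ID
    id_val = id_val.item()
    # Temporary store for model output for an input ID
    temp: Dict[str, int] = dict()
    # Filter data for input ID and store output in temp variable
    indata2 = indata.loc[indata['ID']==id_val]
    temp = indata2.groupby(['M_CODE'])['VALUE'].sum().to_dict()
    # store the result .. I want this to happen synchronously
    output[id_val] = temp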
The reason why this isn't an issue in your multiprocessing-only solution is that zip is chunking inputData['ID'].unique() the way you expect: it gives only the value, not the value wrapped in an array object. Nice job finding an alternative solution, though!
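A minimal sketch of the difference (the slice below only mimics what an mpire worker receives; it is not mpire code):

import numpy as np
from itertools import repeat

ids = np.array(['A', 'B', 'C'], dtype=object)   # what inputData['ID'].unique() returns

# starmap + zip: each task receives the plain scalar value
for id_val, _extra in zip(ids, repeat('other-arg')):
    print(id_val)                               # 'A', then 'B', then 'C'

# mpire chunks a NumPy array into sub-array slices, so a worker sees
# a length-1 slice instead of the scalar element
chunk = ids[0:1]
print(chunk)                                    # array(['A'], dtype=object)
print(chunk.item())                             # 'A' -- the unwrap used in the fixed myFunc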
The error is occurring in this line of the function:

indata2 = indata.loc[indata['ID']==id_val]

Per the main error:

File "/opt/conda/lib/python3.7/site-packages/pandas/core/ops/array_ops.py", line 262, in comparison_op
    "Lengths must match to compare", lvalues.shape, rvalues.shape
ValueError: ('Lengths must match to compare', (5,), (1,))

This is an element-wise equality comparison between the length-5 Series indata['ID'] (i.e., Series(['A', 'B', 'A', 'C', 'A'])) and the length-1 array that arrives as id_val. pandas only permits such a comparison when both sides have the same length or the right-hand side is a scalar, hence the (5,) versus (1,) shapes in the error. I'm not sure what you are trying to do exactly with this statement, but that is certainly the cause of your error.
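For reference, the same length check can be reproduced in isolation (a minimal sketch mirroring the shapes in the traceback above):

import numpy as np
import pandas as pd

s = pd.Series(['A', 'B', 'A', 'C', 'A'])        # stands in for indata['ID']

print(s == 'A')                                 # fine: a scalar comparison broadcasts

try:
    s == np.array(['A'], dtype=object)          # stands in for the chunked id_val
except ValueError as err:
    print(err)                                  # ('Lengths must match to compare', (5,), (1,))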