I have some code (this is not the full file):
chunk_list = []

def makeFakeTransactions(store_num, num_transactions):
    global chunk_list
    startTime = datetime.now()
    data_load_datetime = startTime.isoformat()
    data_load_name = "Faked Data v2.2"
    data_load_path = "data was faked"
    index_list = []
    number_of_stores = store_num + 10
    number_of_terminals = 13
    for month in range(1, 13):
        number_of_days = 30
        extra_day_months = [1, 3, 5, 7, 8, 10, 12]
        if month == 2:
            number_of_days = 28
        elif month in extra_day_months:
            number_of_days = 31
        for day in range(1, number_of_days + 1):
            for store in range(store_num, number_of_stores):
                operator_id = "0001"
                operator_counter = 1
                if store < 11:
                    store_number = "0000" + str(store)
                else:
                    store_number = "000" + str(store)
                for terminal in range(1, number_of_terminals + 1):
                    if terminal < 10:
                        terminal_id = str(terminal) + "000"
                    else:
                        terminal_id = str(terminal) + "00"
                    transaction_type = "RetailTransaction"
                    transaction_type_code = "Transaction"
                    transaction_date = date(2015, month, day)
                    transaction_date_str = transaction_date.isoformat()
                    transaction_time = time(random.randint(0, 23), random.randint(0, 59))
                    transaction_datetime = datetime.combine(transaction_date, transaction_time)
                    transaction_datetime_str = transaction_datetime.isoformat()
                    max_transactions = num_transactions
                    for transaction_number in range(0, max_transactions):
                        inactive_time = random.randint(80, 200)
                        item_count = random.randint(1, 15)
                        sequence_number = terminal_id + str(transaction_number)
                        transaction_datetime = transaction_datetime + timedelta(0, ring_time + special_time + inactive_time)
                        transaction_summary = {}
                        transaction_summary["transaction_type"] = transaction_type
                        transaction_summary["transaction_type_code"] = transaction_type_code
                        transaction_summary["store_number"] = store_number
                        transaction_summary["sequence_number"] = sequence_number
                        transaction_summary["data_load_path"] = data_load_path
                        index_list.append(transaction_summary.copy())
                    operator_counter += 10
                    operator_id = '{0:04d}'.format(operator_counter)
    chunk_list.append(index_list)

if __name__ == '__main__':
    store_num = 1
    process_number = 6
    num_transactions = 10
    p = multiprocessing.Pool(process_number)
    results = [p.apply(makeFakeTransactions, args = (store_num, num_transactions,)) for store_num in xrange(1, 30, 10)]
    results = [p.apply(elasticIndexing, args = (index_list,)) for index_list in chunk_list]
I have a global variable chunk_list that gets appended to at the end of my makeFakeTransactions function; basically it's a list of lists. However, when I do a test print of chunk_list after the 3 processes for makeFakeTransactions, chunk_list shows up empty, even though it should've been appended to 3 times. Am I doing something wrong regarding global list variables in multiprocessing? Is there a better way to do this?
Edit: makeFakeTransactions appends a dictionary copy to index_list and, once all the dictionaries are appended to index_list, it appends index_list to the global variable chunk_list.
First, your code isn't actually running in parallel. According to the docs, p.apply will block until complete, so you are running your tasks sequentially on the process pool. You need to use p.apply_async to kick off a task and not wait for it to complete.
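To make the difference concrete, here is a minimal sketch of a blocking versus a non-blocking submission. The square worker and the two helper functions are made up for illustration (they stand in for a slow function like makeFakeTransactions), and range is used instead of xrange so it also runs on Python 3:

```python
import multiprocessing


def square(n):
    # Hypothetical stand-in for a slow worker such as makeFakeTransactions.
    return n * n


def run_sequentially(pool, values):
    # pool.apply blocks until each call has finished, so the next task is
    # only submitted after the previous one is done.
    return [pool.apply(square, args=(v,)) for v in values]


def run_in_parallel(pool, values):
    # pool.apply_async returns an AsyncResult right away, so all tasks are
    # submitted before any result is waited for.
    pending = [pool.apply_async(square, args=(v,)) for v in values]
    # get() blocks until that particular task is done and returns its value.
    return [r.get() for r in pending]


if __name__ == '__main__':
    pool = multiprocessing.Pool(3)
    try:
        print(run_sequentially(pool, [1, 11, 21]))
        print(run_in_parallel(pool, [1, 11, 21]))
    finally:
        pool.close()
        pool.join()
```

Both helpers give the same values in the same order; the only difference is whether the pool gets to work on several tasks at once.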
Second, as was said in a comment, global state isn't shared between processes. You can use shared memory, but in this case it is much simpler to just transfer the result back from the worker process. Since you don't use chunk_list for anything other than collecting the result, you can just send the result back after computation and collect it in the calling process. This is easy with multiprocessing.Pool: you just return the result from your worker function, i.e. end makeFakeTransactions with return index_list instead of appending to chunk_list.
This will make p.apply() return index_list. p.apply_async() will return an AsyncResult that will return index_list with AsyncResult.get(). Since you're already using list comprehensions, the modifications are small: submit every task with p.apply_async first, then call get() on each result to build chunk_list.
You can simplify it down to one step by using p.map, which effectively does what those two steps do. Note that p.map blocks until all results are available.
Since p.map expects a single-argument function, you need to bind num_transactions first, for example with functools.partial. A lambda won't work here, because the pool has to pickle the function to send it to the workers and lambdas can't be pickled.
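Putting the pieces together, a rough sketch of the return-based approach could look like this. make_fake_transactions is a heavily simplified, hypothetical stand-in for the real makeFakeTransactions (it only fills two fields), and the two collect_* helpers are names invented for this example. It binds the second argument with functools.partial rather than a lambda, because the pool has to pickle the callable and lambdas can't be pickled:

```python
import multiprocessing
from functools import partial


def make_fake_transactions(store_num, num_transactions):
    # Simplified, hypothetical stand-in for makeFakeTransactions: it builds
    # the list locally and returns it instead of appending to a global.
    index_list = []
    for store in range(store_num, store_num + 10):
        for transaction_number in range(num_transactions):
            index_list.append({
                "store_number": "{0:05d}".format(store),
                "sequence_number": str(transaction_number),
            })
    return index_list


def collect_with_apply_async(pool, store_nums, num_transactions):
    # Submit every task first, then collect; each get() returns one index_list.
    pending = [pool.apply_async(make_fake_transactions, args=(s, num_transactions))
               for s in store_nums]
    return [r.get() for r in pending]


def collect_with_map(pool, store_nums, num_transactions):
    # partial fixes num_transactions so that map sees a one-argument function.
    worker = partial(make_fake_transactions, num_transactions=num_transactions)
    return pool.map(worker, store_nums)


if __name__ == '__main__':
    pool = multiprocessing.Pool(3)
    try:
        # chunk_list is built in the parent from the returned values.
        chunk_list = collect_with_map(pool, range(1, 30, 10), 10)
    finally:
        pool.close()
        pool.join()
    print([len(index_list) for index_list in chunk_list])
```

With this, chunk_list is an ordinary local list in the calling process, so it can be passed straight on to the indexing step without any global state.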