Global list not updating when using multiprocessing in Python

I have some code (this is not the full file):

# imports used by this excerpt (they were omitted from the original snippet)
import multiprocessing
import random
from datetime import datetime, date, time, timedelta

chunk_list = []

def makeFakeTransactions(store_num, num_transactions):

    global chunk_list
    startTime = datetime.now()
    data_load_datetime = startTime.isoformat()
    data_load_name = "Faked Data v2.2"
    data_load_path = "data was faked"
    index_list = []
    number_of_stores = store_num + 10
    number_of_terminals = 13

    for month in range(1, 13):
        number_of_days = 30
        extra_day_months = [1, 3, 5, 7, 8, 10, 12]
        if month == 2:
            number_of_days = 28
        elif month in extra_day_months:
            number_of_days = 31
        for day in range(1, number_of_days + 1):
            for store in range(store_num, number_of_stores):
                operator_id = "0001"
                operator_counter = 1
                if store < 11:
                    store_number = "0000" + str(store)
                else:
                    store_number = "000" + str(store)

                for terminal in range(1, number_of_terminals + 1):
                    if terminal < 10:
                        terminal_id = str(terminal) + "000"
                    else:
                        terminal_id = str(terminal) + "00"
                    transaction_type = "RetailTransaction"
                    transaction_type_code = "Transaction"
                    transaction_date = date(2015, month, day)
                    transaction_date_str = transaction_date.isoformat()
                    transaction_time = time(random.randint(0, 23), random.randint(0, 59))
                    transaction_datetime = datetime.combine(transaction_date, transaction_time)
                    transaction_datetime_str = transaction_datetime.isoformat()
                    max_transactions = num_transactions

                    for transaction_number in range(0, max_transactions):
                        inactive_time = random.randint(80, 200)
                        item_count = random.randint(1, 15)
                        sequence_number = terminal_id + str(transaction_number)
                        # ring_time and special_time are computed in the omitted part of the file
                        transaction_datetime = transaction_datetime + timedelta(0, ring_time + special_time + inactive_time)

                        transaction_summary = {}
                        transaction_summary["transaction_type"] = transaction_type
                        transaction_summary["transaction_type_code"] = transaction_type_code
                        transaction_summary["store_number"] = store_number
                        transaction_summary["sequence_number"] = sequence_number
                        transaction_summary["data_load_path"] = data_load_path
                        index_list.append(transaction_summary.copy())

                operator_counter += 10 
                operator_id = '{0:04d}'.format(operator_counter)

    chunk_list.append(index_list)

if __name__ == '__main__':
    store_num = 1
    process_number = 6
    num_transactions = 10
    p = multiprocessing.Pool(process_number)
    results = [p.apply(makeFakeTransactions, args=(store_num, num_transactions)) for store_num in xrange(1, 30, 10)]
    # elasticIndexing is defined in the omitted part of the file
    results = [p.apply(elasticIndexing, args=(index_list,)) for index_list in chunk_list]

I have a global variable chunk_list that gets appended to at the end of my makeFakeTransactions function; it's basically a list of lists. However, when I do a test print of chunk_list after the three makeFakeTransactions processes finish, chunk_list shows up empty, even though it should have been appended to three times. Am I doing something wrong with global list variables in multiprocessing? Is there a better way to do this?

Edit: makeFakeTransactions appends a copy of each dictionary to index_list, and once all the dictionaries have been appended, it appends index_list to the global variable chunk_list.
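
For reference, a stripped-down sketch of the same pattern (illustrative names, not the real file) reproduces the behavior:

import multiprocessing

chunk_list = []

def worker(store_num):
    global chunk_list
    # this append happens in the child process's copy of chunk_list
    chunk_list.append([store_num])

if __name__ == '__main__':
    p = multiprocessing.Pool(3)
    for store_num in (1, 11, 21):
        p.apply(worker, args=(store_num,))
    print(chunk_list)  # prints [] -- the parent's list is untouched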

1 Answer

Answered by bj0:

First, your code isn't actually running in parallel. According to the docs, p.apply blocks until the call completes, so you are running your tasks sequentially on the process pool. You need to use p.apply_async to kick off a task without waiting for it to complete.
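
A quick sketch of the difference (slow is a placeholder function, not from the question):

import multiprocessing
import time

def slow(x):
    time.sleep(1)
    return x

if __name__ == '__main__':
    p = multiprocessing.Pool(3)
    # p.apply blocks per call: these three calls take about 3 seconds total
    sequential = [p.apply(slow, args=(i,)) for i in range(3)]
    # p.apply_async returns immediately: the three calls overlap, about 1 second total
    handles = [p.apply_async(slow, args=(i,)) for i in range(3)]
    concurrent = [h.get() for h in handles]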

Second, as was said in a comment, global state isn't shared between processes. You can use shared memory, but in this case it is much simpler to transfer the result back from the worker process. Since you don't use chunk_list for anything other than collecting the results, you can just send each result back after computation and collect them in the calling process. This is easy with multiprocessing.Pool: you just return the result from your worker function:

return index_list
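
Concretely, only the tail of the function changes; a sketch of just the relevant lines:

def makeFakeTransactions(store_num, num_transactions):
    index_list = []
    # ... build index_list exactly as before ...
    return index_list  # instead of chunk_list.append(index_list)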

This makes p.apply() return index_list directly, while p.apply_async() returns an AsyncResult that yields index_list when you call AsyncResult.get(). Since you're already using a list comprehension, the modifications are small:

p = multiprocessing.Pool(process_number)
async_results = [p.apply_async(makeFakeTransactions, args=(store_num, num_transactions)) for store_num in xrange(1, 30, 10)]
results = [ar.get() for ar in async_results]
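
With the results collected in the parent process, the second stage from the question can consume them directly; results now plays the role chunk_list was meant to play (elasticIndexing comes from the omitted part of the asker's file):

indexing_results = [p.apply(elasticIndexing, args=(index_list,)) for index_list in results]

The same apply_async pattern shown above would parallelize this stage as well.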

You can simplify this down to one step with p.map, which effectively combines the apply_async and get steps above. Note that p.map blocks until all results are available. One catch: the function passed to p.map must be picklable so it can be sent to the worker processes, and lambdas are not, so bind the extra argument with functools.partial instead:

from functools import partial

p = multiprocessing.Pool(process_number)
results = p.map(partial(makeFakeTransactions, num_transactions=num_transactions), xrange(1, 30, 10))

Since p.map expects a function of a single argument, partial fixes num_transactions so that only store_num varies across the calls.