How to guarantee the value in multiple processes is increased or decreased correctly in python?

41 views Asked by At

My needs

At present, there are several groups of data that need to perform computation-type tasks, and every time a group of data is executed, java will be notified and the generated result file will be passed to java, and I will tell Java when all tasks are executed (that is, when the last group of data is executed) that all the data in this round is executed. The java side is going to do the whole database entry operation.

My assumption

Maintain an integer value between multiple processes, increment by 1 when executing a set of data, and compare this integer value with the total number of tasks in the notification java method, equal means all finished. (This does not take into account the failure of the calculation task.)

The situation faced

I declare an integer variable through the Manager in the multiprocessing module, and then passed it to each process. When executing self-increment, multiple processes read the same value (see the output below for details, or perform the following demo by themselves). Unable to meet the value of the read and write atoms, I tried to lock, but it did not work.

This is my small demo

from concurrent.futures import ProcessPoolExecutor
import ctypes
from multiprocessing import Manager, Lock
from multiprocessing.managers import ValueProxy
import os


m = Manager().Value(ctypes.c_int, 0)


def calc_number(x: int, y: int, _m: "ValueProxy", total_tasks: int):
    """simulate the computation-type tasks"""

    # simulate the calculation
    res = x**y

    # Lock does not work
    with Lock():
        # self-increment
        _m.value += 1

    # compare this integer value with the total number of tasks, equal means all finished.
    if _m.value == total_tasks:
        print(True)

    print(f"m_value: {_m.value}, p_id: {os.getpid()}, res: {res}")


def main():
    # there are 8 groups of tasks
    t1 = (100, 200, 300, 400, 500, 600, 700, 800)
    t2 = (80, 70, 60, 50, 40, 30, 20, 10)

    len_t = len(t1)

    # tasks are executed by multiple processes
    with ProcessPoolExecutor(max_workers=len_t) as executor:
        {executor.submit(calc_number, x, y, m, len_t) for x, y in zip(t1, t2)}


if __name__ == "__main__":
    main()

Then the output:

m_value: 2, p_id: 14873, res: 118059162071741130342400000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000
m_value: 2, p_id: 14877, res: 12676506002282294014967032053760000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000
m_value: 3, p_id: 14875, res: 42391158275216203514294433201000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000
m_value: 3, p_id: 14872, res: 10000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000
m_value: 4, p_id: 14883, res: 797922662976120010000000000000000000000000000000000000000
m_value: 5, p_id: 14879, res: 909494701772928237915039062500000000000000000000000000000000000000000000000000000000000000000000000000000000
m_value: 5, p_id: 14881, res: 221073919720733357899776000000000000000000000000000000000000000000000000000000000000
m_value: 6, p_id: 14885, res: 107374182400000000000000000000

Note that the correct output should be m_value printed as 12345678 but...

So what did I do wrong, hoping to get help here. THX.

1

There are 1 answers

1
furas On BEST ANSWER

I see two problems. Or maybe three problems

  1. You have to create Lock() only once and sent it to processes - this way all processes will use the same lock.
  2. You have to use m = Manager() and later l = m.Lock() to create this lock.
  3. you have to print in lock because if you print outside then you may get value already changed by other process.
from concurrent.futures import ProcessPoolExecutor
import ctypes
from multiprocessing import Manager
from multiprocessing.managers import ValueProxy
import os


def calc_number(x: int, y: int, _m: "ValueProxy", total_tasks: int, l): # get lock
    """simulate the computation-type tasks"""

    # simulate the calculation
    res = x**y

    with l:
        # self-increment
        _m.value += 1

        
        # compare this integer value with the total number of tasks, equal means all finished.
        if _m.value == total_tasks:
            print(True)

        # print in lock
        print(f"m_value: {_m.value}, p_id: {os.getpid()}, res: {res}")


def main():

    m = Manager()                 # first create manager
    v = m.Value(ctypes.c_int, 0)  # next use it to create value
    l = m.Lock()                  # and use the same manager to create lock

    # there are 8 groups of tasks
    t1 = (100, 200, 300, 400, 500, 600, 700, 800)
    t2 = (80, 70, 60, 50, 40, 30, 20, 10)

    len_t = len(t1)

    # tasks are executed by multiple processes
    with ProcessPoolExecutor(max_workers=len_t) as executor:
        {executor.submit(calc_number, x, y, v, len_t, l) for x, y in zip(t1, t2)}  # send lock to processes


if __name__ == "__main__":
    main()

Frankly, I found information in another question on Stackoveflow.
Simply I searched multiprocessing python lock ProcessPoolExecutor in Google.

concurrency - ProcessPoolExecutor and Lock in Python - Stack Overflow