My needs
At present I have several groups of data that need to run computation-type tasks. Each time a group of data finishes, Java is notified and the generated result file is passed to it. When all tasks are done (that is, when the last group of data has been processed), I need to tell Java that all the data in this round has been processed, so that the Java side can perform the database-entry operation for the whole batch.
My assumption
Maintain an integer value shared between the processes, increment it by 1 each time a group of data finishes, and in the Java-notification method compare it with the total number of tasks; if they are equal, everything has finished. (This does not account for calculation tasks that fail.)
The situation faced
I declared an integer variable through the Manager in the multiprocessing module and passed it to each process. When incrementing it, multiple processes read the same value (see the output below for details, or run the demo yourself), so the read-modify-write is not atomic. I tried adding a lock, but it did not help.
This is my small demo
from concurrent.futures import ProcessPoolExecutor
import ctypes
from multiprocessing import Manager, Lock
from multiprocessing.managers import ValueProxy
import os

m = Manager().Value(ctypes.c_int, 0)

def calc_number(x: int, y: int, _m: "ValueProxy", total_tasks: int):
    """simulate the computation-type tasks"""
    # simulate the calculation
    res = x**y
    # Lock does not work
    with Lock():
        # self-increment
        _m.value += 1
        # compare this integer value with the total number of tasks, equal means all finished.
        if _m.value == total_tasks:
            print(True)
        print(f"m_value: {_m.value}, p_id: {os.getpid()}, res: {res}")

def main():
    # there are 8 groups of tasks
    t1 = (100, 200, 300, 400, 500, 600, 700, 800)
    t2 = (80, 70, 60, 50, 40, 30, 20, 10)
    len_t = len(t1)
    # tasks are executed by multiple processes
    with ProcessPoolExecutor(max_workers=len_t) as executor:
        {executor.submit(calc_number, x, y, m, len_t) for x, y in zip(t1, t2)}

if __name__ == "__main__":
    main()
Then the output:
m_value: 2, p_id: 14873, res: 118059162071741130342400000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000
m_value: 2, p_id: 14877, res: 12676506002282294014967032053760000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000
m_value: 3, p_id: 14875, res: 42391158275216203514294433201000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000
m_value: 3, p_id: 14872, res: 10000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000
m_value: 4, p_id: 14883, res: 797922662976120010000000000000000000000000000000000000000
m_value: 5, p_id: 14879, res: 909494701772928237915039062500000000000000000000000000000000000000000000000000000000000000000000000000000000
m_value: 5, p_id: 14881, res: 221073919720733357899776000000000000000000000000000000000000000000000000000000000000
m_value: 6, p_id: 14885, res: 107374182400000000000000000000
Note that the correct output should print m_value as 1, 2, 3, 4, 5, 6, 7, 8, but...
So, what did I do wrong? Hoping to get help here. Thanks.
I see two problems, or maybe three:

- You have to create the Lock() only once and send it to the processes - this way all processes will use the same lock. Your code creates a new, unrelated Lock() inside every call to calc_number, so nothing is actually synchronized.
- You may need m = Manager() and later l = m.Lock() to create this lock, because a plain multiprocessing Lock() cannot be pickled and passed through a ProcessPoolExecutor.
- You have to keep the print inside the lock, because if you print outside it you may get a value already changed by another process.

Frankly, I found this information in another question on Stack Overflow. I simply searched for multiprocessing python lock ProcessPoolExecutor in Google:
concurrency - ProcessPoolExecutor and Lock in Python - Stack Overflow
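Putting those points together, here is a minimal corrected sketch of the demo. It is an illustration, not the only way to do it: the counter and lock names are mine, the lock is created once through the Manager and passed to every worker, and the futures are collected so the parent can wait for completion.

```python
from concurrent.futures import ProcessPoolExecutor
import ctypes
import os
from multiprocessing import Manager


def calc_number(x: int, y: int, counter, lock, total_tasks: int):
    """Simulate one computation-type task."""
    res = x ** y
    with lock:  # the same manager-created lock in every process
        counter.value += 1
        # keep the print inside the lock so the value cannot change
        # between the increment and the print
        print(f"m_value: {counter.value}, p_id: {os.getpid()}")
        if counter.value == total_tasks:
            print("all tasks finished")  # here you would notify Java
    return res


def main():
    t1 = (100, 200, 300, 400, 500, 600, 700, 800)
    t2 = (80, 70, 60, 50, 40, 30, 20, 10)
    len_t = len(t1)
    manager = Manager()
    counter = manager.Value(ctypes.c_int, 0)
    lock = manager.Lock()  # created once, shared by all workers
    with ProcessPoolExecutor(max_workers=len_t) as executor:
        futures = [
            executor.submit(calc_number, x, y, counter, lock, len_t)
            for x, y in zip(t1, t2)
        ]
        for f in futures:
            f.result()  # re-raise any exception from a worker
    return counter.value


if __name__ == "__main__":
    main()
```

With the manager-created lock passed in explicitly, each increment and its print happen atomically, so m_value comes out as 1 through 8 with no duplicates.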