I aim to acquire data in one process and analyze those data using two separate processes, which will run in parallel.
In the provided minimal example, the initial process generates (every 1 second) data consisting of three arrays: array1, array2, and array3. Subsequently, two additional processes analyze these arrays.
I am seeking confirmation that this approach is correct, particularly regarding the analysis of the data. Is it best to analyze the data at the line marked
# This is where I would do some processing on the data
?
from multiprocessing import shared_memory, Process, Lock, Value
import numpy as np
import time
# create a shared memory and write to it (producer)
def producer(n, shape1, shape2, shape3, lock, new_value_flag1, new_value_flag2, iteration,
             iterations=10, delay=1.0):
    """Repeatedly fill three float32 arrays in the shared-memory block with
    random integers and raise both consumers' new-value flags.

    Args:
        n: name of an existing SharedMemory block sized to hold all three arrays.
        shape1, shape2, shape3: shapes of the arrays packed back-to-back.
        lock: multiprocessing.Lock guarding the shared buffer.
        new_value_flag1, new_value_flag2: shared ints set to 1 when new data is ready.
        iteration: shared int recording which iteration produced the data.
        iterations: number of production cycles (default 10, as before).
        delay: seconds to sleep between cycles (default 1.0, as before).
    """
    # Attach once instead of re-attaching every iteration.
    existing_shm = shared_memory.SharedMemory(name=n)
    # Byte offsets as plain Python ints (np.prod returns a numpy scalar).
    nbytes1 = int(np.prod(shape1)) * 4
    nbytes2 = int(np.prod(shape2)) * 4
    # Use offset= instead of slicing .buf: every buf[a:b] slice is an extra
    # memoryview export of the mapping, which makes close() raise BufferError.
    np_array1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf, offset=0)
    np_array2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf, offset=nbytes1)
    np_array3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf, offset=nbytes1 + nbytes2)
    try:
        for i in range(iterations):
            with lock:
                np_array1[:] = np.random.randint(0, 1000, np_array1.shape)
                np_array2[:] = np.random.randint(0, 1000, np_array2.shape)
                np_array3[:] = np.random.randint(0, 1000, np_array3.shape)
                new_value_flag1.value = 1
                new_value_flag2.value = 1
                iteration.value = i
            time.sleep(delay)
    finally:
        # Drop the ndarray views before closing; close() raises BufferError
        # while they still export the shared buffer.
        del np_array1, np_array2, np_array3
        existing_shm.close()
# read from the shared memory using a different process (consumer 1)
def consumer1(n, shape1, shape2, shape3, lock, new_value_flag1, iteration, max_updates=None):
    """Poll new_value_flag1; when set, snapshot the three shared arrays under
    the lock and process the private copies outside the lock.

    Args:
        n: name of the existing SharedMemory block.
        shape1, shape2, shape3: shapes of the arrays packed back-to-back.
        lock: multiprocessing.Lock guarding the shared buffer.
        new_value_flag1: shared int; 1 means fresh data, reset to 0 after copying.
        iteration: shared int identifying the producer iteration.
        max_updates: stop after this many updates; None (default) runs forever,
            matching the original behavior.
    """
    # Attach once instead of re-attaching on every update.
    existing_shm = shared_memory.SharedMemory(name=n)
    nbytes1 = int(np.prod(shape1)) * 4
    nbytes2 = int(np.prod(shape2)) * 4
    # offset= avoids per-call .buf slices (each slice is a memoryview export
    # that would make close() raise BufferError).
    np_view1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf, offset=0)
    np_view2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf, offset=nbytes1)
    np_view3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf, offset=nbytes1 + nbytes2)
    processed = 0
    try:
        while max_updates is None or processed < max_updates:
            if new_value_flag1.value == 1:
                with lock:
                    print('Start consumer1', time.time())
                    # Snapshot under the lock so the producer cannot overwrite
                    # the buffer while we copy.
                    np_array1 = np_view1.copy()
                    np_array2 = np_view2.copy()
                    np_array3 = np_view3.copy()
                    print(f"consumer 1, Iteration {iteration.value}:")
                    new_value_flag1.value = 0
                    print('Stop consumer1', time.time())
                # This is where I would do some processing on the data
                # (on private copies, outside the lock).
                print(np_array1.mean())
                print(np_array2.mean())
                print(np_array3.mean())
                processed += 1
            time.sleep(0.01)
    finally:
        # Release views before close() to avoid BufferError.
        del np_view1, np_view2, np_view3
        existing_shm.close()
def consumer2(n, shape1, shape2, shape3, lock, new_value_flag2, iteration, max_updates=None):
    """Poll new_value_flag2; when set, snapshot the three shared arrays under
    the lock and process the private copies outside the lock.

    Args:
        n: name of the existing SharedMemory block.
        shape1, shape2, shape3: shapes of the arrays packed back-to-back.
        lock: multiprocessing.Lock guarding the shared buffer.
        new_value_flag2: shared int; 1 means fresh data, reset to 0 after copying.
        iteration: shared int identifying the producer iteration.
        max_updates: stop after this many updates; None (default) runs forever,
            matching the original behavior.
    """
    # Attach once instead of re-attaching on every update.
    existing_shm = shared_memory.SharedMemory(name=n)
    nbytes1 = int(np.prod(shape1)) * 4
    nbytes2 = int(np.prod(shape2)) * 4
    # offset= avoids per-call .buf slices (each slice is a memoryview export
    # that would make close() raise BufferError).
    np_view1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf, offset=0)
    np_view2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf, offset=nbytes1)
    np_view3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf, offset=nbytes1 + nbytes2)
    processed = 0
    try:
        while max_updates is None or processed < max_updates:
            if new_value_flag2.value == 1:
                with lock:
                    print('Start consumer2', time.time())
                    # Snapshot under the lock so the producer cannot overwrite
                    # the buffer while we copy.
                    np_array1 = np_view1.copy()
                    np_array2 = np_view2.copy()
                    np_array3 = np_view3.copy()
                    print(f"consumer 2, Iteration {iteration.value}:")
                    new_value_flag2.value = 0
                    print('Stop consumer2', time.time())
                # This is where I would do some processing on the data
                # (on private copies, outside the lock).
                print(np_array1.mean())
                print(np_array2.mean())
                print(np_array3.mean())
                processed += 1
            time.sleep(0.01)
    finally:
        # Release views before close() to avoid BufferError.
        del np_view1, np_view2, np_view3
        existing_shm.close()
if __name__ == '__main__':
    # assume we have 3 arrays of different sizes (float32)
    shape1 = (2000, 20000)
    shape2 = (2000, 30000)
    shape3 = (2000, 40000)
    # SharedMemory expects a plain int size; np.prod returns a numpy scalar,
    # so coerce explicitly.
    total_size = int(np.prod(shape1) + np.prod(shape2) + np.prod(shape3)) * 4
    shm = shared_memory.SharedMemory(create=True, size=total_size)
    lock = Lock()
    new_value_flag1 = Value('i', 0)
    new_value_flag2 = Value('i', 0)
    iteration = Value('i', 0)
    p1 = Process(target=producer, args=(shm.name, shape1, shape2, shape3, lock, new_value_flag1, new_value_flag2, iteration))
    p2 = Process(target=consumer1, args=(shm.name, shape1, shape2, shape3, lock, new_value_flag1, iteration))
    p3 = Process(target=consumer2, args=(shm.name, shape1, shape2, shape3, lock, new_value_flag2, iteration))
    try:
        p2.start()
        p3.start()
        time.sleep(2)  # delay to make sure the consumer processes are ready
        p1.start()
        p2.join()      # consumers loop forever; ctrl-c is the expected way to stop
        p3.join()
        p1.join()
    except KeyboardInterrupt:
        # Kill the children so the shared-memory block can be released below.
        for p in (p1, p2, p3):
            p.terminate()
    finally:
        # Release and remove the shared-memory block instead of leaking it.
        shm.close()
        shm.unlink()
EDIT Following the first comment, I have new code:
from multiprocessing import shared_memory, Process, Value
import numpy as np
import time
# create a shared memory and write to it (producer)
def producer(n, shape1, shape2, shape3, new_value_flag1, new_value_flag2, iteration,
             iterations=100, delay=0.5):
    """Overwrite the three shared arrays with random data whenever both
    consumers have lowered their flags, then raise both flags.

    Args:
        n: name of an existing SharedMemory block sized for all three arrays.
        shape1, shape2, shape3: shapes of the arrays packed back-to-back.
        new_value_flag1, new_value_flag2: shared ints; 1 = fresh data pending.
        iteration: shared int recording which iteration produced the data.
        iterations: number of cycles (default 100, as before).
        delay: seconds between cycles (default 0.5, as before).
    """
    existing_shm = shared_memory.SharedMemory(name=n)
    # Byte offsets as plain Python ints (np.prod returns a numpy scalar).
    nbytes1 = int(np.prod(shape1)) * 4
    nbytes2 = int(np.prod(shape2)) * 4
    # offset= instead of slicing .buf: each buf[a:b] slice is a memoryview
    # export of the mapping and would make close() below raise BufferError.
    np_array1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf, offset=0)
    np_array2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf, offset=nbytes1)
    np_array3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf, offset=nbytes1 + nbytes2)
    try:
        for i in range(iterations):
            # Only write when both consumers are done with the previous data;
            # otherwise this cycle's data is dropped (original design).
            if new_value_flag1.value == 0 and new_value_flag2.value == 0:
                start_time = time.time()
                np_array1[:] = np.random.randint(0, 255, np_array1.shape)
                np_array2[:] = np.random.randint(0, 255, np_array2.shape)
                np_array3[:] = np.random.randint(0, 255, np_array3.shape)
                new_value_flag1.value = 1
                new_value_flag2.value = 1
                iteration.value = i
                print('producer', i, time.time()-start_time)
            time.sleep(delay)
    finally:
        # Release the views first; close() raises BufferError while they
        # still export the shared buffer.
        del np_array1, np_array2, np_array3
        existing_shm.close()
# read from the shared memory using a different process (consumer 1)
def consumer1(n, shape1, shape2, shape3, new_value_flag1, iteration):
    """Poll new_value_flag1 and, when raised, print the iteration number and
    the means of the three shared arrays, then lower the flag. Runs forever."""
    shm = shared_memory.SharedMemory(name=n)
    # Byte lengths of the first two arrays, used to locate each array's slice.
    len1 = np.prod(shape1) * 4
    len2 = np.prod(shape2) * 4
    arr1 = np.ndarray(shape1, dtype=np.float32, buffer=shm.buf[:len1])
    arr2 = np.ndarray(shape2, dtype=np.float32, buffer=shm.buf[len1:len1 + len2])
    arr3 = np.ndarray(shape3, dtype=np.float32, buffer=shm.buf[len1 + len2:])
    while True:
        if new_value_flag1.value == 1:
            # This is where I would do some processing on the data
            print('consumer1', iteration.value, arr1.mean(), arr2.mean(), arr3.mean(), time.time())
            new_value_flag1.value = 0
        time.sleep(0.01)
    shm.close()  # unreachable: the loop above never exits
# read from the shared memory using a different process (consumer 2)
def consumer2(n, shape1, shape2, shape3, new_value_flag2, iteration):
    """Poll new_value_flag2 and, when raised, print the iteration number and
    the means of the three shared arrays, then lower the flag. Runs forever."""
    shm = shared_memory.SharedMemory(name=n)
    # Byte lengths of the first two arrays, used to locate each array's slice.
    len1 = np.prod(shape1) * 4
    len2 = np.prod(shape2) * 4
    arr1 = np.ndarray(shape1, dtype=np.float32, buffer=shm.buf[:len1])
    arr2 = np.ndarray(shape2, dtype=np.float32, buffer=shm.buf[len1:len1 + len2])
    arr3 = np.ndarray(shape3, dtype=np.float32, buffer=shm.buf[len1 + len2:])
    while True:
        if new_value_flag2.value == 1:
            # This is where I would do some processing on the data
            print('consumer2', iteration.value, arr1.mean(), arr2.mean(), arr3.mean(), time.time())
            new_value_flag2.value = 0
        time.sleep(0.01)
    shm.close()  # unreachable: the loop above never exits
if __name__ == '__main__':
    # assume we have 3 arrays of different sizes
    shape1 = (200, 200)
    shape2 = (200, 300)
    shape3 = (200, 400)
    # SharedMemory expects a plain int size; np.prod returns a numpy scalar,
    # so coerce explicitly.
    total_size = int(np.prod(shape1) + np.prod(shape2) + np.prod(shape3)) * 4
    shm = shared_memory.SharedMemory(create=True, size=total_size)
    new_value_flag1 = Value('i', 0)
    new_value_flag2 = Value('i', 0)
    iteration = Value('i', 0)
    p1 = Process(target=producer, args=(shm.name, shape1, shape2, shape3, new_value_flag1, new_value_flag2, iteration))
    p2 = Process(target=consumer1, args=(shm.name, shape1, shape2, shape3, new_value_flag1, iteration))
    p3 = Process(target=consumer2, args=(shm.name, shape1, shape2, shape3, new_value_flag2, iteration))
    try:
        p2.start()
        p3.start()
        time.sleep(2)  # delay to make sure the consumer processes are ready
        p1.start()
        p2.join()      # consumers loop forever; ctrl-c is the expected way to stop
        p3.join()
        p1.join()
    except KeyboardInterrupt:
        # Kill the children so the shared-memory block can be released below.
        for p in (p1, p2, p3):
            p.terminate()
    finally:
        # Release and remove the shared-memory block instead of leaking it.
        shm.close()
        shm.unlink()
EDIT 2
Without unnecessary mapping and unmapping + using Event
from multiprocessing import shared_memory, Process, Event, Value
import numpy as np
import time
# create a shared memory and write to it (producer)
def producer(n, shape1, shape2, shape3, new_values_event_1, new_values_event_2, iteration):
    """Write random data into the three shared arrays whenever both consumer
    events are clear, then set both events to signal fresh data.

    Args:
        n: name of the existing SharedMemory block.
        shape1, shape2, shape3: shapes of the float32 arrays packed back-to-back.
        new_values_event_1, new_values_event_2: per-consumer Events; set here,
            cleared by the corresponding consumer after processing.
        iteration: shared int recording which iteration produced the data.
    """
    existing_shm = shared_memory.SharedMemory(name=n)
    # Map the three arrays onto consecutive byte ranges of the shared buffer.
    # NOTE(review): each .buf[a:b] slice is an extra memoryview export, so the
    # existing_shm.close() at the end will raise BufferError while these
    # ndarrays are still alive — consider np.ndarray(..., offset=...) and
    # deleting the arrays before close(). Confirm against the docs.
    np_array1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf[:np.prod(shape1)*4])
    np_array2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf[4*np.prod(shape1):4*(np.prod(shape1)+np.prod(shape2))])
    np_array3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf[4*(np.prod(shape1)+np.prod(shape2)):])
    for i in range(100):
        # Only overwrite when both consumers have cleared their events, i.e.
        # finished with the previous values; otherwise this cycle is skipped
        # and iteration i produces no data.
        if not new_values_event_1.is_set() and not new_values_event_2.is_set():
            start_time = time.time()
            np_array1[:] = np.random.randint(0, 255, np_array1.shape)
            np_array2[:] = np.random.randint(0, 255, np_array2.shape)
            np_array3[:] = np.random.randint(0, 255, np_array3.shape)
            # NOTE(review): the events are set before iteration.value is
            # updated, so a fast consumer can report the previous iteration
            # number — set iteration.value first.
            new_values_event_1.set()
            new_values_event_2.set()
            iteration.value = i
            print('producer', i, time.time()-start_time)
        time.sleep(0.5)
    existing_shm.close()
# read from the shared memory using a different process (consumer 1)
def consumer1(n, shape1, shape2, shape3, new_values_event_1, iteration):
    """Poll new_values_event_1 and print the means of the three shared arrays
    each time the producer signals fresh data. Runs forever; the trailing
    close() is unreachable.
    """
    existing_shm = shared_memory.SharedMemory(name=n)
    # Views over the three arrays packed back-to-back in the shared block.
    np_array1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf[:np.prod(shape1)*4])
    np_array2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf[4*np.prod(shape1):4*(np.prod(shape1)+np.prod(shape2))])
    np_array3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf[4*(np.prod(shape1)+np.prod(shape2)):])
    while True:
        if new_values_event_1.is_set():
            # This is where I would do some processing on the data
            # (reads the shared arrays in place; the producer does not
            # overwrite them until this event is cleared below).
            print('consumer1', iteration.value, np_array1.mean(), np_array2.mean(), np_array3.mean(), time.time())
            new_values_event_1.clear()
        time.sleep(0.01)
    existing_shm.close()
# read from the shared memory using a different process (consumer 2)
def consumer2(n, shape1, shape2, shape3, new_values_event_2, iteration):
    """Poll new_values_event_2 and print the means of the three shared arrays
    each time the producer signals fresh data. Runs forever; the trailing
    close() is unreachable.
    """
    existing_shm = shared_memory.SharedMemory(name=n)
    # Views over the three arrays packed back-to-back in the shared block.
    np_array1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf[:np.prod(shape1)*4])
    np_array2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf[4*np.prod(shape1):4*(np.prod(shape1)+np.prod(shape2))])
    np_array3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf[4*(np.prod(shape1)+np.prod(shape2)):])
    while True:
        if new_values_event_2.is_set():
            # This is where I would do some processing on the data
            # (reads the shared arrays in place; the producer does not
            # overwrite them until this event is cleared below).
            print('consumer2', iteration.value, np_array1.mean(), np_array2.mean(), np_array3.mean(), time.time())
            new_values_event_2.clear()
        time.sleep(0.01)
    existing_shm.close()
if __name__ == '__main__':
    # assume we have 3 arrays of different sizes
    shape1 = (50, 50)
    shape2 = (50, 50)
    shape3 = (50, 50)
    # NOTE(review): np.prod returns a numpy scalar, so total_size is not a
    # plain Python int; SharedMemory's size argument expects an int — wrap the
    # expression in int(...) to be safe.
    total_size = np.prod(shape1)*4 + np.prod(shape2)*4 + np.prod(shape3)*4
    shm = shared_memory.SharedMemory(create=True, size= total_size)
    # One event per consumer; set by the producer, cleared by the consumer.
    new_values_event_1 = Event()
    new_values_event_2 = Event()
    iteration = Value('i', 0)
    p1 = Process(target=producer, args=(shm.name, shape1, shape2, shape3, new_values_event_1, new_values_event_2, iteration))
    p2 = Process(target=consumer1, args=(shm.name, shape1, shape2, shape3, new_values_event_1, iteration))
    p3 = Process(target=consumer2, args=(shm.name, shape1, shape2, shape3, new_values_event_2, iteration))
    p2.start()
    p3.start()
    time.sleep(2) # delay to make sure the consumer processes are ready
    p1.start()
    p2.join()
    p3.join()
    p1.join()
    # I know I have to ctrl-c to stop the program: the consumers never return,
    # so the cleanup below is never reached as written.
    #shm.close()
    #shm.unlink()
EDIT 3
Without unnecessary mapping and unmapping + using Event + removing unnecessary polling
from multiprocessing import shared_memory, Process, Event, Value
import numpy as np
import time
# create a shared memory and write to it (producer)
def producer(n, shape1, shape2, shape3, event_consumer1, event_consumer2, event_producer, iteration):
    """Hand-shaken producer: wait for both consumers to be ready, write new
    data into the shared arrays, then signal event_producer.

    NOTE(review): a single event_producer is shared by both consumers and each
    consumer clears it right after waking; if one consumer is not yet blocked
    in wait() when the other clears the event, it can miss the signal and this
    producer then waits on its ready-event forever (deadlock). Use one
    producer-side Event per consumer — confirm.
    """
    existing_shm = shared_memory.SharedMemory(name=n)
    # Views over the three arrays packed back-to-back in the shared block.
    # NOTE(review): the .buf[a:b] slices keep the mapping exported, so the
    # close() at the end raises BufferError while these ndarrays are alive.
    np_array1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf[:np.prod(shape1)*4])
    np_array2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf[4*np.prod(shape1):4*(np.prod(shape1)+np.prod(shape2))])
    np_array3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf[4*(np.prod(shape1)+np.prod(shape2)):])
    for i in range(100):
        # NOTE(review): waiting and clearing the two ready-events one after
        # the other like this is fragile ordering — consumer 1's event is
        # cleared before we know consumer 2 is ready.
        event_consumer1.wait() # Wait until consumer 1 is ready
        event_consumer1.clear() # Reset the event
        event_consumer2.wait() # Wait until consumer 2 is ready
        event_consumer2.clear() # Reset the event
        start_time = time.time()
        np_array1[:] = np.random.randint(0, 255, np_array1.shape)
        np_array2[:] = np.random.randint(0, 255, np_array2.shape)
        np_array3[:] = np.random.randint(0, 255, np_array3.shape)
        iteration.value = i
        print('producer', i, time.time()-start_time)
        event_producer.set() # Signal the consumers that new data is available
        time.sleep(2) # delay to simulate the time at which the data is produced
    existing_shm.close()
# read from the shared memory using a different process (consumer 1)
def consumer1(n, shape1, shape2, shape3, event_consumer1, event_producer, iteration):
    """Wait for the producer's data-ready signal, process the shared arrays,
    then signal readiness for the next batch. Runs forever; the trailing
    close() is unreachable.

    NOTE(review): event_producer is shared with consumer2 and cleared here
    immediately after wait() returns; if consumer2 is not already blocked in
    wait() it can miss the set() and block forever — confirm.
    """
    existing_shm = shared_memory.SharedMemory(name=n)
    # Views over the three arrays packed back-to-back in the shared block.
    np_array1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf[:np.prod(shape1)*4])
    np_array2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf[4*np.prod(shape1):4*(np.prod(shape1)+np.prod(shape2))])
    np_array3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf[4*(np.prod(shape1)+np.prod(shape2)):])
    while True:
        event_producer.wait() # Wait until the producer has produced new data
        event_producer.clear() # Reset the event
        # This is where I would do some processing on the data
        print('consumer1', iteration.value, np_array1.mean(), np_array2.mean(), np_array3.mean(), time.time())
        time.sleep(0.1) # delay to simulate the time at which the data is processed
        event_consumer1.set() # Signal the producer that the data has been processed
    existing_shm.close()
# read from the shared memory using a different process (consumer 2)
def consumer2(n, shape1, shape2, shape3, event_consumer2, event_producer, iteration):
    """Wait for the producer's data-ready signal, process the shared arrays,
    then signal readiness for the next batch. Runs forever; the trailing
    close() is unreachable.

    NOTE(review): event_producer is shared with consumer1 and cleared here
    immediately after wait() returns; if consumer1 is not already blocked in
    wait() it can miss the set() and block forever — confirm.
    """
    existing_shm = shared_memory.SharedMemory(name=n)
    # Views over the three arrays packed back-to-back in the shared block.
    np_array1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf[:np.prod(shape1)*4])
    np_array2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf[4*np.prod(shape1):4*(np.prod(shape1)+np.prod(shape2))])
    np_array3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf[4*(np.prod(shape1)+np.prod(shape2)):])
    while True:
        event_producer.wait() # Wait until the producer has produced new data
        event_producer.clear() # Reset the event
        # This is where I would do some processing on the data
        print('consumer2', iteration.value, np_array1.mean(), np_array2.mean(), np_array3.mean(), time.time())
        time.sleep(0.1) # delay to simulate the time at which the data is processed
        event_consumer2.set() # Signal the producer that the data has been processed
    existing_shm.close()
if __name__ == '__main__':
    # assume we have 3 arrays of different sizes
    shape1 = (5000, 50)
    shape2 = (5000, 50)
    shape3 = (5000, 50)
    # NOTE(review): np.prod returns a numpy scalar, so total_size is not a
    # plain Python int; SharedMemory's size argument expects an int — wrap the
    # expression in int(...) to be safe.
    total_size = np.prod(shape1)*4 + np.prod(shape2)*4 + np.prod(shape3)*4
    shm = shared_memory.SharedMemory(create=True, size= total_size)
    # Ready-events start set so the producer's first wait() returns at once.
    event_consumer1 = Event()
    event_consumer2 = Event()
    event_consumer1.set() # Set the event to allow the producer to start
    event_consumer2.set() # Set the event to allow the producer to start
    # NOTE(review): a single event_producer is shared by both consumers and
    # cleared by whichever wakes first — each consumer should get its own
    # producer-side Event.
    event_producer = Event()
    iteration = Value('i', 0)
    p1 = Process(target=producer, args=(shm.name, shape1, shape2, shape3, event_consumer1, event_consumer2,event_producer , iteration))
    p2 = Process(target=consumer1, args=(shm.name, shape1, shape2, shape3, event_consumer1, event_producer, iteration))
    p3 = Process(target=consumer2, args=(shm.name, shape1, shape2, shape3, event_consumer2, event_producer, iteration))
    p2.start()
    p3.start()
    time.sleep(2) # delay to make sure the consumer processes are ready
    p1.start()
    p2.join()
    p3.join()
    p1.join()
    # I know I have to ctrl-c to stop the program: the consumers never return,
    # so the cleanup below is never reached as written.
    #shm.close()
    #shm.unlink()
Looking at your EDIT 3 code I see a few problems:
1. You compute `total_size = np.prod(shape1)*4 + np.prod(shape2)*4 + np.prod(shape3)*4` and then pass `total_size` as the *size* argument to your call to `shared_memory.SharedMemory`. But this call requires that argument to be an `int` rather than a `numpy.int32`. You should not be using `np.prod` to multiply together the elements of a tuple; instead use `operator.mul`, for example `operator.mul(*shape1)`.
2. There should be a shared flag, e.g. `running`, that the consumers check to see whether the producer is still producing data.
3. You have a single `event_producer` `Event` instance that is only used by one of your consumers; you should have one for each consumer.
4. Your `producer` function is setting and waiting on events in an illogical order.

I have added comments in the following code tagged with 'Booboo' so you can search for them. Also, I have the producer creating only 3 iterations so that it terminates more quickly:
Prints:
Can We Do Better?
The problem with this design is: what if you have a large number of consumers, for example 10? That's a lot of events that have to be created and set. Instead we will use two `multiprocessing.Condition` instances as follows:

Prints:
Update
I have modified the above code that uses conditions instead of events so that there is greater overlap in processing among the producers and consumers. Before, the producer started to produce the next iteration only after the consumers had all finished processing the current iteration. Now the producer can do all the work necessary to create the new array values for the next iteration in parallel with the consumers consuming the current values and waits for the consumers to finish their processing before updating the shared arrays with the new data. I have increased the consumption time to 1.5 seconds. In the previous version this would have added several seconds to the total running time to process 3 iterations.