Parallel programming: Synchronizing processes

I have a program with many music decks (deck 1, deck 2, music_clip_deck, speackers_deck, ip_call_1, ip_call_2, ip_call_3). Each deck runs in a separate process. The chunk length I use to slice the mp3 files, retransmission streams, microphone voice, and voice from aiortc-pyav is 125 msec. After that I fill some queues (one per process) and send the final queue to the final thread, which does the last audio processing before the audio is played back and transmitted to clients.

How can I synchronize all the processes so that one iteration of each process's while loop takes exactly 125 msec?

Here is a figure to help:

[figure: image not included]

This approach may not help at all:

class Deck_1_Proc(Process):
...
...
...
    def run(self):
        while(True):
            t1 = time.time()
            ...
            ...
            ...
            t2 = time.time()
            if t2 - t1 < 0.125:
                time.sleep(0.125 - (t2 - t1))

Maybe a better approach would be to use something like JavaScript's setInterval with a time parameter of 125 msec:

from threading import Event, Thread

def call_repeatedly(interval, func, *args):
    stopped = Event()
    def loop():
        while not stopped.wait(interval): # the first call is in `interval` secs
            func(*args)
    Thread(target=loop).start()    
    return stopped.set

# start the repeated calls:
cancel_future_calls = call_repeatedly(0.125, run)
# stop them at app termination:
cancel_future_calls()
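
One caveat with this helper, sketched here under assumptions: `stopped.wait(interval)` starts counting only after `func` returns, so each period lasts `interval` plus the run time of `func`. The variant below waits until absolute deadlines measured with `time.monotonic()` instead. The name `call_repeatedly_fixed_rate` and the `daemon=True` flag are illustrative choices, not part of the snippet above.

```python
import time
from threading import Event, Thread

def call_repeatedly_fixed_rate(interval, func, *args):
    """Call func(*args) every `interval` seconds, measured against absolute deadlines."""
    stopped = Event()

    def loop():
        next_call = time.monotonic() + interval  # first call after one interval
        # Wait only for the time remaining until the deadline (0 if already late).
        while not stopped.wait(max(0.0, next_call - time.monotonic())):
            func(*args)
            next_call += interval  # advance the deadline, independent of func's run time

    # daemon=True (an assumption) lets the interpreter exit even if cancel is never called
    Thread(target=loop, daemon=True).start()
    return stopped.set
```

It is called and cancelled the same way as `call_repeatedly`. Individual ticks can still be a little late, but the lateness of one tick is not carried into the next.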

Answer by Ahmed AEK (marked as best answer)

The main issue is that most timers drift and sleep is not accurate (not even QTimer). A stable timer, in the sense that the 100th tick lands close to 12.5 seconds (100 × 0.125 s), has to schedule each tick against an absolute deadline rather than sleep for a fixed interval, something like this:

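import time
from multiprocessing import Condition

def infinite_heartbreat(cv: Condition):
    # monotonic clock: unaffected by system clock adjustments
    next_beat = time.monotonic()
    while True:
        next_beat += 0.125  # absolute deadline of the next beat
        time_to_sleep = next_beat - time.monotonic()
        if time_to_sleep > 0:  # already late: skip the sleep and catch up
            time.sleep(time_to_sleep)
        with cv:
            cv.notify_all()  # wake every process waiting on the condition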
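
To make the drift argument concrete, here is a small simulation that does no real sleeping. It assumes, purely for illustration, that every sleep returns 1 ms late (`OVERSHOOT` is an invented figure, not a measurement) and compares sleeping a fixed interval per tick with sleeping until an absolute deadline as in the heartbeat above.

```python
INTERVAL = 0.125   # tick length in seconds, as in the question
OVERSHOOT = 0.001  # assumed lateness of every sleep call (illustrative value)
TICKS = 100

def relative_sleep_clock():
    """Sleep a full interval on every tick: each overshoot is added to the total."""
    now = 0.0
    for _ in range(TICKS):
        now += INTERVAL + OVERSHOOT
    return now

def absolute_deadline_clock():
    """Sleep until next_beat: a late wake-up shortens the following sleep."""
    now = 0.0
    next_beat = 0.0
    for _ in range(TICKS):
        next_beat += INTERVAL
        time_to_sleep = next_beat - now
        if time_to_sleep > 0:
            now += time_to_sleep + OVERSHOOT
    return now

print(f"relative sleeps:    tick 100 at {relative_sleep_clock():.3f} s")     # 12.600
print(f"absolute deadlines: tick 100 at {absolute_deadline_clock():.3f} s")  # 12.501
```

Under this assumption the fixed-interval loop is 100 ms late by the 100th tick, while the deadline-based loop is late only by the overshoot of its last sleep.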

You can easily synchronize all processes to wake up at the same time using a condition variable. However, a process that lags behind by a few milliseconds can miss a notification, so you probably also need a multiprocessing.Value holding the current frame number, so that workers wait only when they are not lagging behind, as follows:

import threading
import time
from multiprocessing import Condition, Value, Process, Event

def infinite_heartbreat(cv: Condition, frame: Value, quit_event: Event):
    # monotonic clock: unaffected by system clock adjustments
    next_beat = time.monotonic()
    while True:
        next_beat += 0.125  # absolute deadline of the next beat
        time_to_sleep = next_beat - time.monotonic()
        if time_to_sleep > 0:
            time.sleep(time_to_sleep)
        with cv:
            frame.value += 1  # publish the new frame number while holding the lock
            cv.notify_all()
            if quit_event.is_set():
                return

def worker(cv, frame_number, worker_id, quit_event: Event):
    current_frame = 0
    while True:
        with cv:
            # Returns at once if this worker is behind; otherwise waits for the next beat.
            cv.wait_for(lambda: current_frame <= frame_number.value)
            if quit_event.is_set():
                return
        print(f"processed frame {current_frame} in worker {worker_id}")
        current_frame += 1

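if __name__ == "__main__":
    condition = Condition()
    # signed counter without its own lock; it is only touched while holding `condition`
    frame = Value('q', lock=False)
    quit_event = Event()
    processes = []
    for i in range(4):
        process = Process(target=worker, args=(condition, frame, i, quit_event))
        process.start()
        processes.append(process)
    tr = threading.Thread(target=infinite_heartbreat, args=(condition, frame, quit_event))
    tr.start()
    time.sleep(5)
    quit_event.set()
    # wait for the heartbeat thread and the workers to see the event and exit
    tr.join()
    for process in processes:
        process.join()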
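
The catch-up behaviour of the `wait_for` predicate can be checked in isolation. The sketch below is an illustration, not part of the program above: it uses `threading.Condition` in a single process so that it is deterministic, and the helper name `drain_available_frames` and its `timeout` parameter are hypothetical. It pretends the heartbeat has already published frame 3 while the worker is still at frame 0.

```python
import threading

def drain_available_frames(cv, latest_frame, current_frame=0, timeout=0.01):
    """Process every frame up to latest_frame without waiting for a new beat."""
    processed = []
    while True:
        with cv:
            # Same predicate as in worker(): true immediately while we are behind.
            ready = cv.wait_for(lambda: current_frame <= latest_frame, timeout=timeout)
        if not ready:
            # Caught up: a real worker would block here until the next beat.
            return processed
        processed.append(current_frame)
        current_frame += 1

print(drain_available_frames(threading.Condition(), latest_frame=3))  # [0, 1, 2, 3]
```

Frames 0 through 3 are handled back to back, and only the attempt at frame 4 actually waits. This is what lets a worker that fell behind recover instead of staying one beat late forever.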

Edit: added a quit_event in the form of a multiprocessing.Event, because it is atomic.

Edit2: changed the value to a signed 'q' (long long) with no lock, as per @Booboo's comment. This saves a file descriptor and allows negative frame numbers (as -1 has its uses).