Python 3.4 multiprocessing Queue faster than Pipe, unexpected

I am writing an audio player that receives samples from a UDP socket, and everything was working fine. But when I implemented a loss concealment algorithm, the player could no longer keep producing silence at the expected rate (every 10 ms it sends a list whose payload is a multiple of 160 bytes).

When playing audio with pyaudio, using the blocking call write to play some samples, I noticed that it blocked, on average, for the duration of the samples. So I created a new dedicated process to play the samples.

The main process processes the output audio stream and sends the result to that process using a multiprocessing.Pipe. I decided to use multiprocessing.Pipe because it was supposed to be faster than the alternatives.

Unfortunately, when I ran the program on a virtual machine, the bitrate was half of what I was getting on my fast PC, which had no trouble meeting the target bitrate.

After some tests, I concluded that what was causing the delay was the Pipe's function send.
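A suspicion like this can be checked by timing the call in isolation. The helper below is only a minimal sketch: mean_call_seconds is a hypothetical name that does not appear in the original program, and it measures wall-clock time with time.perf_counter, so the result includes any time the call spends blocked.

```python
import time


def mean_call_seconds(fn, arg, repeats=1000):
    """Return the average wall-clock time of fn(arg) over `repeats` calls."""
    start = time.perf_counter()
    for _ in range(repeats):
        fn(arg)
    return (time.perf_counter() - start) / repeats
```

Passing a connection's send method or a queue's put method as fn (with a receiver draining the other end) gives a per-call cost that can be compared against the 10 ms budget.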

I wrote a simple benchmark script (see below) to compare the various methods of transmitting data to another process. The script keeps sending [b'\x00'*160] for 5 seconds and counts how many bytes of the bytes object were sent in total. I tested the following methods of sending: "not sending", multiprocessing.Pipe, multiprocessing.Queue, multiprocessing.Manager, multiprocessing.connection Listener/Client over a socket and, finally, Listener/Client over a named pipe:

Results for my "fast" PC running Windows 7 x64 (total bytes sent in 5 seconds):

test_empty     :     1516076640
test_pipe      :       58155840
test_queue     :      233946880
test_manager   :        2853440
test_socket    :       55696160
test_named_pipe:       58363040

Results for the VirtualBox's VM guest running Windows 7 x64, host running Windows 7 x64:

test_empty     :     1462706080
test_pipe      :       32444160
test_queue     :      204845600
test_manager   :         882560
test_socket    :       20549280
test_named_pipe:       35387840  

Script used:

from multiprocessing import Process, Pipe, Queue, Manager
from multiprocessing.connection import Client, Listener
import time

FS = "{:<15}:{:>15}"


def test_empty():
    s = time.time()
    sent = 0
    while True:
        data = b'\x00'*160
        lst = [data]

        sent += len(data)
        if time.time()-s >= 5:
            break
    print(FS.format("test_empty", sent))


def pipe_void(pipe_in):
    while True:
        msg = pipe_in.recv()
        if msg == []:
            break


def test_pipe():
    pipe_out, pipe_in = Pipe()
    p = Process(target=pipe_void, args=(pipe_in,))
    p.start()
    s = time.time()
    sent = 0
    while True:
        data = b'\x00'*160
        lst = [data]
        pipe_out.send(lst)
        sent += len(data)
        if time.time()-s >= 5:
            break
    pipe_out.send([])
    p.join()
    print(FS.format("test_pipe", sent))


def queue_void(q):
    while True:
        msg = q.get()
        if msg == []:
            break


def test_queue():
    q = Queue()
    p = Process(target=queue_void, args=(q,))
    p.start()
    s = time.time()
    sent = 0
    while True:
        data = b'\x00'*160
        lst = [data]
        q.put(lst)
        sent += len(data)
        if time.time()-s >= 5:
            break
    q.put([])
    p.join()

    print(FS.format("test_queue", sent))


def manager_void(l, lock):
    msg = None
    while True:
        with lock:
            if len(l) > 0:
                msg = l.pop(0)
        if msg == []:
            break


def test_manager():
    with Manager() as manager:
        l = manager.list()
        lock = manager.Lock()
        p = Process(target=manager_void, args=(l, lock))
        p.start()
        s = time.time()
        sent = 0
        while True:
            data = b'\x00'*160
            lst = [data]
            with lock:
                l.append(lst)
            sent += len(data)
            if time.time()-s >= 5:
                break
        with lock:
            l.append([])
        p.join()

        print(FS.format("test_manager", sent))


def socket_void():
    addr = ('127.0.0.1', 20000)
    conn = Client(addr)
    while True:
        msg = conn.recv()
        if msg == []:
            break


def test_socket():
    addr = ('127.0.0.1', 20000)
    listener = Listener(addr, "AF_INET")
    p = Process(target=socket_void)
    p.start()
    conn = listener.accept()
    s = time.time()
    sent = 0
    while True:
        data = b'\x00'*160
        lst = [data]
        conn.send(lst)
        sent += len(data)
        if time.time()-s >= 5:
            break
    conn.send([])
    p.join()

    print(FS.format("test_socket", sent))


def named_pipe_void():
    addr = '\\\\.\\pipe\\Test'
    conn = Client(addr)
    while True:
        msg = conn.recv()
        if msg == []:
            break


def test_named_pipe():
    addr = '\\\\.\\pipe\\Test'
    listener = Listener(addr, "AF_PIPE")
    p = Process(target=named_pipe_void)
    p.start()
    conn = listener.accept()
    s = time.time()
    sent = 0
    while True:
        data = b'\x00'*160
        lst = [data]
        conn.send(lst)
        sent += len(data)
        if time.time()-s >= 5:
            break
    conn.send([])
    p.join()

    print(FS.format("test_named_pipe", sent))


if __name__ == "__main__":
    test_empty()
    test_pipe()
    test_queue()
    test_manager()
    test_socket()
    test_named_pipe()

Question

  • If Queue uses Pipe, how is it faster than Pipe in this context? This contradicts the question Python multiprocessing - Pipe vs Queue
  • How can I guarantee a constant-bitrate stream from one process to another while keeping the send delay low?

Update 1

Inside my program, after switching from Pipes to Queues, I got an enormous boost.

On my computer, I got about 16000 B/s using Pipes and about 7.5 million B/s using Queues. On the virtual machine I went from about 13000 B/s to about 6.5 million B/s. That is roughly 500 times more bytes using Queue instead of Pipe.

Of course I won't be playing millions of bytes per second; I will only be playing at the normal rate for sound (in my case 16000 B/s, which coincidentally matches the value above).
But the point is that I can limit the rate to what I want while still having time to finish other computations (like receiving from sockets, applying sound algorithms, etc.).
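Limiting the rate in this way could look something like the sketch below. It is only an illustration under stated assumptions: paced_send is a hypothetical helper, not part of the original program, and put stands for whatever hands a chunk to the player process (for example a queue's put method). It schedules each chunk against an absolute deadline so that small sleep inaccuracies do not accumulate; 160 bytes every 10 ms works out to 160 / 0.010 = 16000 B/s.

```python
import time


def paced_send(put, chunks, period, clock=time.perf_counter, sleep=time.sleep):
    """Call put(chunk) once per `period` seconds and return the bytes sent.

    Deadlines are absolute (start + n * period), so a late wake-up on one
    chunk shortens the next wait instead of delaying the whole stream.
    """
    next_deadline = clock()
    sent = 0
    for chunk in chunks:
        delay = next_deadline - clock()
        if delay > 0:
            sleep(delay)
        put(chunk)
        sent += len(chunk)
        next_deadline += period
    return sent
```

For the stream described here, the call would be along the lines of paced_send(q.put, chunks, 0.010), where each chunk is 160 bytes. The time left over in each 10 ms slot is what remains available for the other work in the sending process.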

1 Answer

Mike Sandford (best answer)

I can't say for sure, but I think the issue you're dealing with is synchronous versus asynchronous I/O. My guess is that the Pipe is somehow ending up synchronous and the Queue is ending up asynchronous. Why each one defaults to a different behaviour might be better answered by this question and answer:

Synchronous/Asynchronous behaviour of python Pipes
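For what it's worth, the Python documentation for multiprocessing.Queue says that an object put on a queue is pickled and that a background thread later flushes it to an underlying pipe, so put() can return before anything has been written, whereas Connection.send() does the write itself. The sketch below imitates that arrangement on top of a plain Pipe. BufferedSender is a hypothetical class written for illustration; it is not how the standard library is actually structured, and its buffer is unbounded.

```python
import collections
import threading
from multiprocessing import Pipe


class BufferedSender:
    """Hypothetical helper: send() only appends to a deque, and a
    background thread performs the blocking pipe write."""

    _STOP = object()  # private sentinel telling the feeder thread to exit

    def __init__(self, conn):
        self._conn = conn
        self._buffer = collections.deque()
        self._not_empty = threading.Condition()
        self._thread = threading.Thread(target=self._feed, daemon=True)
        self._thread.start()

    def send(self, obj):
        # Cheap for the caller: no pickling and no pipe I/O happens here.
        with self._not_empty:
            self._buffer.append(obj)
            self._not_empty.notify()

    def close(self):
        # Queue the sentinel, then wait until everything before it is written.
        self.send(self._STOP)
        self._thread.join()

    def _feed(self):
        while True:
            with self._not_empty:
                while not self._buffer:
                    self._not_empty.wait()
                obj = self._buffer.popleft()
            if obj is self._STOP:
                return
            self._conn.send(obj)  # the blocking write, off the caller's thread
```

Note that the benchmark in the question counts bytes at the moment they are handed to put(), and the 5-second timer stops before the queue is drained, so the Queue figure may reflect how fast items can be enqueued rather than how fast they are delivered. Buffering of this kind hides the send latency from the caller; it does not remove it.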