How to wait for subscription forwarding on publisher side when connecting to XPUB-XSUB proxy?

57 views Asked by At

I'm trying to create a message bus with ZeroMQ using an XPUB-XSUB proxy.

The number of subscribers and publishers will vary as they come and go. The goal is that any number of daemons can communicate through this message bus.

The problem is that when there is already a subscriber connected to the XPUB side of the proxy and a new publisher connects and immediately starts to send messages, the subscriber won't get the first messages.

I assume the problem is that when a publisher connects, the information about subscribers doesn't arrive immediately, and the first messages get discarded on the socket.

An easy but unreliable solution is to add a small sleep on the publisher side between connecting and sending messages.

Is there a good way to wait for subscription forwarding? Or should I use some other type of sockets?

I have the following example:

import threading
import time

from zmq import Context, Socket, proxy
from zmq.constants import PUB, SUB, XPUB, XPUB_VERBOSE, XSUB


def message_bus():
    """Forward traffic between an XSUB input and an XPUB output.

    Binds an XSUB socket on ``ipc:///tmp/in_socket.ipc`` and an XPUB socket
    on ``ipc:///tmp/out_socket.ipc``, then hands both to ``proxy``, which
    blocks while it relays messages between them.
    """
    ctx = Context.instance()

    # Input side of the bus.
    frontend: Socket = ctx.socket(XSUB)
    frontend.bind("ipc:///tmp/in_socket.ipc")

    # Output side of the bus.
    backend: Socket = ctx.socket(XPUB)
    backend.bind("ipc:///tmp/out_socket.ipc")
    # Verbose mode is enabled after binding, exactly as in the original order.
    backend.setsockopt(XPUB_VERBOSE, True)

    proxy(frontend, backend)


def publisher():
    """Send a numbered text message to the bus input every half second.

    Connects a PUB socket to ``ipc:///tmp/in_socket.ipc`` and loops forever,
    sending ``"message number N"`` with N starting at 1.

    NOTE: sending starts immediately after ``connect``; a PUB socket drops
    messages for which it has not yet seen a matching subscription, so the
    earliest message(s) may never reach a subscriber.
    """
    ctx = Context.instance()

    outbound: Socket = ctx.socket(PUB)
    outbound.connect("ipc:///tmp/in_socket.ipc")

    sequence = 0
    while True:
        sequence += 1
        outbound.send_string(f"message number {sequence}")
        time.sleep(0.5)


def subscriber():
    """Print every message received from the bus output.

    Connects a SUB socket to ``ipc:///tmp/out_socket.ipc``, subscribes with
    an empty prefix and prints each received multipart message forever.
    """
    ctx = Context.instance()

    inbound: Socket = ctx.socket(SUB)
    inbound.connect("ipc:///tmp/out_socket.ipc")
    # An empty prefix matches every message.
    inbound.subscribe("")

    while True:
        frames = inbound.recv_multipart()
        print(f"subscriber {frames}")


if __name__ == "__main__":
    # One daemon thread per role, created in the order: bus, subscriber, publisher.
    bus_thread, sub_thread, pub_thread = (
        threading.Thread(target=role, daemon=True)
        for role in (message_bus, subscriber, publisher)
    )

    bus_thread.start()
    sub_thread.start()
    # Presumably gives the subscriber time to connect before the publisher starts.
    time.sleep(1)
    pub_thread.start()

    # Wait on the threads in the same order they were created.
    for worker in (bus_thread, sub_thread, pub_thread):
        worker.join()

Output:

subscriber [b'message number 2']
subscriber [b'message number 3']
subscriber [b'message number 4']
subscriber [b'message number 5']
subscriber [b'message number 6']
subscriber [b'message number 7']
subscriber ...

As you can see, the first message [b'message number 1'] is never delivered to the subscriber.

2

There are 2 answers

0
Afkaaja On

One solution is to use PUSH-PULL sockets between the publishers and the proxy instead of PUB-XSUB.

import threading
import time

from zmq import Context, Socket, proxy
from zmq.constants import PUB, PULL, PUSH, SUB


def message_bus():
    """Forward traffic from a PULL input to a PUB output.

    Binds a PULL socket on ``ipc:///tmp/in_socket.ipc`` and a PUB socket on
    ``ipc:///tmp/out_socket.ipc``, then hands both to ``proxy``, which blocks
    while it relays messages between them.
    """
    ctx = Context.instance()

    # Input side of the bus.
    collector: Socket = ctx.socket(PULL)
    collector.bind("ipc:///tmp/in_socket.ipc")

    # Output side of the bus.
    broadcaster: Socket = ctx.socket(PUB)
    broadcaster.bind("ipc:///tmp/out_socket.ipc")

    proxy(collector, broadcaster)


def publisher():
    """Push a numbered text message to the bus input once per second.

    Connects a PUSH socket to ``ipc:///tmp/in_socket.ipc`` and loops forever,
    sending ``"message number N"`` with N starting at 1.
    """
    ctx = Context.instance()

    outbound: Socket = ctx.socket(PUSH)
    outbound.connect("ipc:///tmp/in_socket.ipc")

    sequence = 0
    while True:
        sequence += 1
        outbound.send_string(f"message number {sequence}")
        time.sleep(1)


def subscriber():
    """Print every message received from the bus output.

    Connects a SUB socket to ``ipc:///tmp/out_socket.ipc``, subscribes with
    an empty prefix and prints each received multipart message forever.
    """
    ctx = Context.instance()

    inbound: Socket = ctx.socket(SUB)
    inbound.connect("ipc:///tmp/out_socket.ipc")
    # An empty prefix matches every message.
    inbound.subscribe("")

    while True:
        frames = inbound.recv_multipart()
        print(f"subscriber {frames}")


if __name__ == "__main__":
    # One daemon thread per role, created in the order: bus, subscriber, publisher.
    bus_thread, sub_thread, pub_thread = (
        threading.Thread(target=role, daemon=True)
        for role in (message_bus, subscriber, publisher)
    )

    bus_thread.start()
    sub_thread.start()
    # Presumably gives the subscriber time to connect before the publisher starts.
    time.sleep(1)
    pub_thread.start()

    # Wait on the threads in the same order they were created.
    for worker in (bus_thread, sub_thread, pub_thread):
        worker.join()
0
jamesdillonharvey On

To answer the original question based on PUB/SUB

  • Change the publisher socket from PUB to XPUB
  • Set the XPUB_VERBOSE option
  • Connect to the proxy
  • Poll the publisher socket for an incoming subscription message
    • Send messages once the subscription has taken place.