Enable Client to Talk to Server using the NetMQ "Realiable Pub/Sub" Pattern

1.7k views Asked by At

I have a v4.0.0.1 implementation of Somdron's "Reliable Pub-Sub" pattern for communication between two parts of a new application. This application will have a "Server" (the engine that does all the heavy calculations) and "Clients" that will send requests and get information on progress back from the server.

The problem I have with my current version of "Reliable Pub-Sub" is that I don't seem to have a proper way for sending requests to the sever from the client. Let me start by showing you the code:

SERVER:

using NetMQ;
using NetMQ.Sockets;
using System;
using System.Linq;

namespace Demo.Messaging.Server
{
    /// <summary>
    /// Publishing side of the "Reliable Pub-Sub" pattern. An XPUB socket is bound to
    /// <see cref="Address"/> and driven by a poller that runs inside a
    /// <see cref="NetMQActor"/>; callers hand messages to the actor through
    /// <see cref="PublishMessage"/>. A welcome message is configured on the socket and a
    /// heartbeat frame is published every <see cref="HEARTBEAT_INTERVAL"/>.
    /// </summary>
    public class ZeroMqMessageServer : IDisposable
    {
        private const string WELCOME_MESSAGE = "WM";
        private const string HEARTBEAT_MESSAGE = "HB";
        private const string PUBLISH_MESSAGE_TOPIC = "PUB";
        private static readonly TimeSpan HEARTBEAT_INTERVAL = TimeSpan.FromSeconds(2);

        private readonly NetMQActor actor;
        private NetMQTimer heartbeatTimer;
        private XPublisherSocket publisher;
        private NetMQPoller poller;
        private bool disposedValue = false;

        /// <summary>
        /// Creates the server and starts its actor, which binds the publisher to
        /// <paramref name="address"/>.
        /// </summary>
        /// <param name="address">Endpoint to bind to, e.g. "tcp://127.0.0.1:6669".</param>
        /// <exception cref="ArgumentException">
        /// <paramref name="address"/> is null, empty or whitespace.
        /// </exception>
        public ZeroMqMessageServer(string address)
        {
            // Fail on the caller's thread; otherwise the bad address only surfaces
            // inside Start(), which runs on the actor's thread.
            if (string.IsNullOrWhiteSpace(address))
            {
                throw new ArgumentException("An address to bind to is required.", nameof(address));
            }

            Address = address;
            actor = NetMQActor.Create(Start);
        }

        /// <summary>Endpoint the publisher socket is bound to.</summary>
        public string Address { get; private set; }

        /// <summary>
        /// Actor entry point: creates the publisher, heartbeat timer and poller, then
        /// blocks in the poller loop until the end-of-shim command stops it.
        /// </summary>
        /// <param name="shim">The actor's end of the pipe to the owning object.</param>
        private void Start(PairSocket shim)
        {
            using (publisher = new XPublisherSocket())
            {
                publisher.SetWelcomeMessage(WELCOME_MESSAGE);
                publisher.Bind(Address);
                publisher.ReceiveReady += DropPublisherSubscriptions;

                heartbeatTimer = new NetMQTimer(HEARTBEAT_INTERVAL);
                heartbeatTimer.Elapsed += OnHeartbeatTimeElapsed;

                shim.ReceiveReady += OnShimReceiveReady;
                shim.SignalOK(); // Let the actor know we are ready to work.

                // The poller is created, run and disposed here so that its whole
                // lifetime stays on the thread that uses it.
                using (poller = new NetMQPoller() { publisher, shim, heartbeatTimer })
                {
                    poller.Run();
                }
            }
        }

        /// <summary>Reads and discards whatever arrives on the publisher socket.</summary>
        private void DropPublisherSubscriptions(object sender, NetMQSocketEventArgs e)
        {
            publisher.SkipMultipartMessage();
        }

        /// <summary>Publishes a heartbeat frame each time the timer fires.</summary>
        private void OnHeartbeatTimeElapsed(object sender, NetMQTimerEventArgs e)
        {
            publisher.SendFrame(HEARTBEAT_MESSAGE);
        }

        /// <summary>
        /// Handles a command sent over the actor pipe: forwards "PUB" payloads to the
        /// publisher and stops the poller on the end-of-shim command.
        /// </summary>
        private void OnShimReceiveReady(object sender, NetMQSocketEventArgs e)
        {
            var socket = e.Socket;
            bool more;
            string command = socket.ReceiveFrameString(out more);

            if (command == PUBLISH_MESSAGE_TOPIC)
            {
                if (more)
                {
                    // Forward the remaining frames to the publisher.
                    NetMQMessage message = socket.ReceiveMultipartMessage();
                    publisher.SendMultipartMessage(message);
                }
            }
            else if (command == NetMQActor.EndShimMessage)
            {
                // Dispose command received, stop the poller.
                poller.Stop();
            }
            else if (more)
            {
                // Unknown command: consume its remaining frames so they are not
                // mistaken for commands on the next ReceiveReady.
                socket.SkipMultipartMessage();
            }
        }

        /// <summary>
        /// Hands <paramref name="message"/> to the actor, which publishes it.
        /// </summary>
        /// <param name="message">Message to publish; must contain at least one frame.</param>
        /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
        /// <exception cref="ArgumentException"><paramref name="message"/> has no frames.</exception>
        public void PublishMessage(NetMQMessage message)
        {
            // Validate before the "PUB" frame goes out; rejecting the message after
            // that frame was sent would leave an unfinished multipart message on the pipe.
            if (message == null)
            {
                throw new ArgumentNullException(nameof(message));
            }

            if (message.FrameCount == 0)
            {
                throw new ArgumentException("message is empty", nameof(message));
            }

            // We can use actor like NetMQSocket and publish messages.
            actor.SendMoreFrame(PUBLISH_MESSAGE_TOPIC)
                 .SendMultipartMessage(message);
        }

        /// <summary>
        /// Disposes the actor. The publisher and poller are released by
        /// <see cref="Start"/> on the actor's own thread once the poller stops.
        /// </summary>
        /// <param name="disposing">True when called from <see cref="Dispose()"/>.</param>
        protected virtual void Dispose(bool disposing)
        {
            if (disposedValue)
            {
                return;
            }

            if (disposing)
            {
                actor?.Dispose();
            }

            disposedValue = true;
        }

        /// <summary>Stops the actor and releases its resources.</summary>
        public void Dispose()
        {
            Dispose(true);
        }
    }
}

CLIENT:

using NetMQ;
using NetMQ.Sockets;
using System;
using System.Collections.Generic;
using System.Linq;
using Messaging.Helpers;

namespace Demo.Messaging.Client
{
    /// <summary>
    /// Subscribing side of the "Reliable Pub-Sub" pattern. A <see cref="NetMQActor"/>
    /// connects to the first of the supplied addresses that delivers a welcome message,
    /// watches heartbeats, reconnects when they stop, and forwards every other message
    /// to the owner, who reads it with <see cref="ReceiveMessage"/>.
    /// </summary>
    public class ZeroMqMessageClient : IDisposable
    {
        private const string SUBSCRIBE_COMMAND = "S";
        private const string WELCOME_MESSAGE = "WM";
        private const string HEARTBEAT_MESSAGE = "HB";
        private const string PUBLISH_MESSAGE_TOPIC = "PUB";

        private static readonly TimeSpan TIMEOUT = TimeSpan.FromSeconds(5);
        private static readonly TimeSpan RECONNECTION_PERIOD = TimeSpan.FromSeconds(5);

        private readonly string[] addressCollection;
        private readonly List<string> subscriptions = new List<string>();

        private NetMQTimer timeoutTimer;
        private NetMQTimer reconnectionTimer;

        private readonly NetMQActor actor;
        private SubscriberSocket subscriber;
        private PairSocket shim;
        private NetMQPoller poller;
        private bool disposedValue = false;

        /// <summary>Creates the client and starts its actor.</summary>
        /// <param name="addresses">Endpoints to try to connect to.</param>
        /// <exception cref="ArgumentNullException"><paramref name="addresses"/> is null.</exception>
        public ZeroMqMessageClient(params string[] addresses)
        {
            // Fail on the caller's thread rather than inside the actor.
            if (addresses == null)
            {
                throw new ArgumentNullException(nameof(addresses));
            }

            addressCollection = addresses;
            actor = NetMQActor.Create(Start);
        }

        /// <summary>
        /// Actor entry point: sets up the timers and poller, makes the first connection
        /// attempt and then blocks in the poller loop until it is stopped.
        /// </summary>
        /// <param name="shim">The actor's end of the pipe to the owning object.</param>
        private void Start(PairSocket shim)
        {
            this.shim = shim;
            shim.ReceiveReady += OnShimReceiveReady;

            timeoutTimer = new NetMQTimer(TIMEOUT);
            timeoutTimer.Elapsed += OnTimeoutTimerElapsed;

            reconnectionTimer = new NetMQTimer(RECONNECTION_PERIOD);
            reconnectionTimer.Elapsed += OnReconnectionTimerElapsed;

            try
            {
                // The poller is created, run and disposed on this thread only.
                using (poller = new NetMQPoller() { shim, timeoutTimer, reconnectionTimer })
                {
                    shim.SignalOK();
                    Connect();

                    poller.Run();
                }
            }
            finally
            {
                if (subscriber != null)
                {
                    subscriber.Dispose();
                    subscriber = null;
                }
            }
        }

        /// <summary>
        /// Opens one subscriber per address and waits up to <see cref="TIMEOUT"/> for a
        /// welcome message. The first socket to receive one becomes the active
        /// subscriber; all others are closed. When none answers, the reconnection
        /// timer is armed instead.
        /// </summary>
        private void Connect()
        {
            using (NetMQPoller tmpPoller = new NetMQPoller())
            {
                List<SubscriberSocket> socketCollection = new List<SubscriberSocket>();
                SubscriberSocket connectedSocket = null;

                EventHandler<NetMQSocketEventArgs> messageHandler = (s, e) =>
                {
                    // Keep the first socket that answers; a second socket becoming
                    // ready before the poller stops must not replace it.
                    if (connectedSocket != null)
                    {
                        return;
                    }

                    connectedSocket = (SubscriberSocket)e.Socket;
                    tmpPoller.Stop();
                };

                // On timeout we stop the poller without setting the connected socket.
                NetMQTimer tmpTimeoutTimer = new NetMQTimer(TIMEOUT);
                tmpTimeoutTimer.Elapsed += (s, e) => tmpPoller.Stop();
                tmpPoller.Add(tmpTimeoutTimer);

                // Attempt to subscribe to the supplied list of addresses.
                foreach (var address in addressCollection)
                {
                    SubscriberSocket socket = new SubscriberSocket();
                    socketCollection.Add(socket);

                    socket.ReceiveReady += messageHandler;
                    tmpPoller.Add(socket);

                    // Subscribe to welcome messages.
                    socket.Subscribe(WELCOME_MESSAGE);
                    socket.Connect(address);
                }
                tmpPoller.Run(); // Block and wait for connection.

                // We should have an active socket/connection.
                if (connectedSocket != null)
                {
                    // Remove the connected socket from the collection.
                    socketCollection.Remove(connectedSocket);
                    ZeroMqHelpers.CloseConnectionsImmediately(socketCollection);

                    // Set the active socket. The welcome message is left in the queue
                    // on purpose; OnSubscriberReceiveReady handles it.
                    subscriber = connectedSocket;

                    // Subscribe to heartbeats and to every topic requested so far.
                    subscriber.Subscribe(HEARTBEAT_MESSAGE);
                    foreach (var subscription in subscriptions)
                    {
                        subscriber.Subscribe(subscription);
                    }

                    // Remove start-up handler, now handle messages properly.
                    subscriber.ReceiveReady -= messageHandler;
                    subscriber.ReceiveReady += OnSubscriberReceiveReady;
                    poller.Add(subscriber);

                    // Reset timers.
                    timeoutTimer.Enable = true;
                    reconnectionTimer.Enable = false;
                }
                else // We need to attempt re-connection.
                {
                    // Close all existing connections.
                    ZeroMqHelpers.CloseConnectionsImmediately(socketCollection);
                    timeoutTimer.Enable = false;
                    reconnectionTimer.Enable = true;
                }
            }
        }

        /// <summary>
        /// Handles a command sent over the actor pipe: stops the poller on the
        /// end-of-shim command and records/applies subscriptions. Any other command is
        /// read to its end and discarded.
        /// </summary>
        private void OnShimReceiveReady(object sender, NetMQSocketEventArgs e)
        {
            bool more;
            string command = e.Socket.ReceiveFrameString(out more);

            if (command == NetMQActor.EndShimMessage)
            {
                poller.Stop();
            }
            else if (command == SUBSCRIBE_COMMAND)
            {
                string topic = e.Socket.ReceiveFrameString();
                subscriptions.Add(topic);
                if (subscriber != null)
                {
                    subscriber.Subscribe(topic);
                }
            }
            else if (more)
            {
                // Commands without a handler (this includes the "PUB" command sent by
                // PublishMessage) must still be consumed completely; otherwise their
                // payload frames would be read as commands on the next ReceiveReady.
                e.Socket.SkipMultipartMessage();
            }
        }

        /// <summary>
        /// No heartbeat arrived within <see cref="TIMEOUT"/>: drop the current
        /// subscriber and try to connect again.
        /// </summary>
        private void OnTimeoutTimerElapsed(object sender, NetMQTimerEventArgs e)
        {
            if (subscriber != null)
            {
                poller.Remove(subscriber);
                subscriber.Dispose();
                subscriber = null;
                Connect();
            }
        }

        /// <summary>Retries the connection after a failed attempt.</summary>
        private void OnReconnectionTimerElapsed(object sender, NetMQTimerEventArgs e)
        {
            // We re-attempt connection.
            Connect();
        }

        /// <summary>
        /// Reads one message from the active subscriber, logs its second frame when
        /// present, and dispatches on the first frame: welcome messages are ignored,
        /// heartbeats restart the timeout timer, everything else is forwarded to the
        /// owner through the shim.
        /// </summary>
        private void OnSubscriberReceiveReady(object sender, NetMQSocketEventArgs e)
        {
            var message = subscriber.ReceiveMultipartMessage();
            string topic = message[0].ConvertToString();

            // Let us see what is in the message.
            if (message.FrameCount > 1)
            {
                string content = message[1].ConvertToString();
                Console.WriteLine($"ZMQ_ALT - {topic}:: {content}");
            }

            if (topic == WELCOME_MESSAGE)
            {
                // Disconnection has occurred we might want to restore state from a snapshot.
            }
            else if (topic == HEARTBEAT_MESSAGE)
            {
                // We got a heartbeat, so restart the timeout timer.
                timeoutTimer.Enable = false;
                timeoutTimer.Enable = true;
            }
            else
            {
                // Forward the message on to the actor's owner.
                shim.SendMultipartMessage(message);
            }
        }

        /// <summary>Asks the actor to subscribe to <paramref name="topic"/>.</summary>
        /// <param name="topic">Topic prefix to subscribe to.</param>
        /// <exception cref="ArgumentNullException"><paramref name="topic"/> is null.</exception>
        public void Subscribe(string topic)
        {
            // Validate before the command frame goes out so a bad topic cannot leave
            // an unfinished multipart message on the pipe.
            if (topic == null)
            {
                throw new ArgumentNullException(nameof(topic));
            }

            actor.SendMoreFrame(SUBSCRIBE_COMMAND).SendFrame(topic);
        }

        /// <summary>Receives the next message forwarded by the actor.</summary>
        /// <returns>The forwarded multipart message.</returns>
        public NetMQMessage ReceiveMessage()
        {
            return actor.ReceiveMultipartMessage();
        }

        /// <summary>
        /// Sends a "PUB" command carrying <paramref name="message"/> to the actor.
        /// </summary>
        /// <remarks>
        /// NOTE(review): the actor has no handler that forwards this command to the
        /// server, so the payload is currently read and discarded. Presumably a
        /// SubscriberSocket cannot send; confirm against the NetMQ documentation
        /// before adding an upstream path.
        /// </remarks>
        /// <param name="message">Message to send; must contain at least one frame.</param>
        /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
        /// <exception cref="ArgumentException"><paramref name="message"/> has no frames.</exception>
        public void PublishMessage(NetMQMessage message)
        {
            if (message == null)
            {
                throw new ArgumentNullException(nameof(message));
            }

            if (message.FrameCount == 0)
            {
                throw new ArgumentException("message is empty", nameof(message));
            }

            actor.SendMoreFrame(PUBLISH_MESSAGE_TOPIC)
                 .SendMultipartMessage(message);
        }

        /// <summary>
        /// Disposes the actor. The subscriber and poller are released by
        /// <see cref="Start"/> on the actor's own thread once the poller stops, and
        /// the shim is not created by this class, so none of them is disposed here.
        /// </summary>
        /// <param name="disposing">True when called from <see cref="Dispose()"/>.</param>
        protected virtual void Dispose(bool disposing)
        {
            if (disposedValue)
            {
                return;
            }

            if (disposing)
            {
                actor?.Dispose();
            }

            disposedValue = true;
        }

        /// <summary>Stops the actor and releases its resources.</summary>
        public void Dispose()
        {
            Dispose(true);
        }
    }
}

Now, I can send messages from the server to the client which is awesome and the client using the following code from the main method in two separate console applications

Program.cs for SERVER:

/// <summary>
/// Demo host for the server: publishes a two-frame message (topic "A" plus a random
/// number) every 200 ms until the process is killed.
/// </summary>
class Program
{
    static void Main(string[] args)
    {
        // One generator for the whole run instead of a new instance per message.
        Random random = new Random();

        using (ZeroMqMessageServer server = new ZeroMqMessageServer("tcp://127.0.0.1:6669"))
        {
            while (true)
            {
                NetMQMessage message = new NetMQMessage();
                message.Append("A");
                message.Append(random.Next().ToString());

                server.PublishMessage(message);
                Thread.Sleep(200);
            }
        }
    }
}

Program.cs for CLIENT:

/// <summary>
/// Demo host for the client: subscribes to every topic on a background task and keeps
/// the process alive until Enter is pressed.
/// </summary>
class Program
{
    static void Main(string[] args)
    {
        Task.Run(() =>
        {
            using (ZeroMqMessageClient client = new ZeroMqMessageClient("tcp://127.0.0.1:6669"))
            {
                client.Subscribe(String.Empty);

                // Block on the messages the client forwards instead of spinning in an
                // empty loop: this keeps the client alive without burning a core and
                // reads the forwarded messages so they do not accumulate unread.
                while (true)
                {
                    client.ReceiveMessage();
                }
            }
        });
        Console.ReadLine();
    }
}

The client correctly auto-detects dropped connections and reconnects, fantastic little pattern.

However, this pattern out-of-the-box does not allow the client to send messages to the server. So in the client I have added the following code

/// <summary>
/// Sends a "publish" command carrying <paramref name="message"/> to the actor.
/// </summary>
/// <param name="message">Message to send; must contain at least one frame.</param>
/// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
/// <exception cref="ArgumentException"><paramref name="message"/> has no frames.</exception>
public void PublishMessage(NetMQMessage message)
{
    // Validate before the command frame goes out; a failure after that frame was
    // sent would leave an unfinished multipart message on the actor pipe.
    if (message == null)
    {
        throw new ArgumentNullException(nameof(message));
    }

    if (message.FrameCount == 0)
    {
        throw new ArgumentException("message is empty", nameof(message));
    }

    actor.SendMoreFrame(PUBLISH_MESSAGE_TOPIC)
         .SendMultipartMessage(message);
}

and in the client I have changed the publisher.ReceiveReady += DropPublisherSubscriptions; event handler to

    /// <summary>
    /// Reads the message that made the publisher socket readable and logs its first
    /// frame and, when present, its second frame.
    /// </summary>
    private void DropPublisherSubscriptions(object sender, NetMQSocketEventArgs e)
    {
        // This receive already consumes the whole message. A further
        // SkipMultipartMessage() here would wait for the NEXT message, stalling the
        // poller thread, and then discard that message unseen.
        NetMQMessage message = e.Socket.ReceiveMultipartMessage();
        string topic = message[0].ConvertToString();
        Console.WriteLine($"TOPIC = {topic}");

        // Let us see what is in the message.
        if (message.FrameCount > 1)
        {
            string content = message[1].ConvertToString();
            Console.WriteLine($"TEST RECIEVE FROM CLIENT - {topic}:: {content}");
        }
    }

but this does not seem to handle my messages. It receives the heartbeats and welcome messages, but I am not doing this right.

How can I enable/facilitate the client to talk to the server without breaking what I have already?

Thanks for your time.

0

There are 0 answers