ZeroMQ pattern for load balancing work across workers based on idleness


I have a single producer and n workers. I only want to give a worker work when it is not already processing a unit of work, and I'm struggling to find a good ZeroMQ pattern for this.

1) REQ/REP

The producer is the requester and creates a connection to each worker. It tracks which workers are busy and round-robins work to the idle ones.

Problem:

  • How to be notified of responses and still able to send new work to idle workers without dedicating a thread in the producer to each worker?

2) PUSH/PULL

Producer pushes into one socket that all workers feed off, and workers push into another socket that the producer listens to.

Problem:

  • Has no concept of worker idleness, i.e. work gets stuck behind long units of work

3) PUB/SUB

Non-starter, since there is no way to make sure work doesn't get lost

4) Reverse REQ/REP

Each worker is the REQ end and requests work from the producer and then sends another request when it completes the work

Problem:

  • Producer has to block on a request for work until there is work (since each recv has to be paired with a send). This prevents workers from responding with work completion.
  • Could be fixed with a separate completion channel, but the producer still needs some polling mechanism to detect new work and stay on the same thread.

5) PAIR per worker

Each worker has its own PAIR connection allowing independent sending of work and receipt of results

Problem:

  • Same problem as REQ/REP with requiring a thread per worker

As much as ZeroMQ is non-blocking/async under the hood, I cannot find a pattern that allows my code to be asynchronous as well, rather than blocking in many dedicated threads or spinning in polling loops in fewer. Is this just not a good use case for ZeroMQ?


There are 2 answers

John Jefferies On BEST ANSWER

Your problem is solved with the Load Balancing Pattern in the ZMQ Guide. It's all about flow control whilst also being able to send and receive messages. The producer will only send work requests to idle workers, whilst the workers are able to send and receive other messages at all times, e.g. abort, shutdown, etc.
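A minimal sketch of that pattern, assuming pyzmq is installed: the producer owns a ROUTER socket, each worker is a REQ socket that announces itself with a READY message, and every later worker message is both a result and a signal that the worker is idle again. The endpoint name, the `READY`/`STOP` markers, and the function names are my own choices for illustration, not anything defined by ZeroMQ or the Guide.

```python
import threading
import zmq

ENDPOINT = "inproc://lb-demo"  # hypothetical endpoint name for this sketch


def worker(ctx, name):
    """REQ worker: ask for work, process it, and ask again by sending the result."""
    sock = ctx.socket(zmq.REQ)
    sock.setsockopt(zmq.IDENTITY, name)
    sock.connect(ENDPOINT)
    sock.send(b"READY")              # announce idleness
    while True:
        task = sock.recv()
        if task == b"STOP":
            break
        sock.send(task.upper())      # the result doubles as "I am idle again"
    sock.close()


def run_producer(tasks, n_workers=3):
    """Single-threaded producer: hands a task only to a worker that just spoke."""
    ctx = zmq.Context()
    router = ctx.socket(zmq.ROUTER)
    router.bind(ENDPOINT)            # bind before the workers connect (inproc)
    threads = [
        threading.Thread(target=worker, args=(ctx, b"w%d" % i))
        for i in range(n_workers)
    ]
    for t in threads:
        t.start()

    pending = list(tasks)
    results = []
    stopped = 0
    while stopped < n_workers:
        # ROUTER prepends the sender identity; REQ adds the empty delimiter.
        ident, _empty, payload = router.recv_multipart()
        if payload != b"READY":
            results.append(payload)
        if pending:
            router.send_multipart([ident, b"", pending.pop(0)])
        else:
            router.send_multipart([ident, b"", b"STOP"])
            stopped += 1

    for t in threads:
        t.join()
    router.close()
    ctx.term()
    return results


if __name__ == "__main__":
    print(run_producer([b"job-%d" % i for i in range(10)]))
```

The producer never needs a thread per worker: it only reacts to whichever worker speaks next. In a real producer the ROUTER socket could be registered with `zmq.Poller` next to the source of new work, so that one thread waits on both.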

bazza On

Push/Pull is your answer.

When you send a message in ZeroMQ, all that happens initially is that it sits in a queue waiting to be delivered to the destination(s). When it has been successfully transferred it is removed from the queue. The queue has a limited length, which can be set by changing the socket's high water mark.

One or more background threads manage all this on your behalf, and your calls to the ZeroMQ API simply issue instructions to those threads. The threads at either end of a socket connection collaborate to marshal the transfer of messages, i.e. a sender won't send a message unless the recipient can receive it.

Consider what this means in a push/pull set up. Suppose one of your pull workers is falling behind. It won't then be accepting messages. That means that messages being sent to it start piling up until the high water mark is reached. ZeroMQ will then stop sending messages to that pull worker. In fact, AFAIK, a PUSH socket round-robins only across the pull workers that still have room in their queues, so a worker whose queue is full is skipped and receives fewer messages, and the workload is evened out across all workers.

So What Does That Mean?

Just send the messages. Let 0MQ sort it out for you.

Whilst there's no explicit flag saying 'already busy', if messages can be sent at all then that means that some pull worker somewhere is able to receive it solely because it has kept up with the workload. It will therefore be best placed to process new messages.

There are limitations. If all the workers are full up then no messages are sent and you get blocked in the push when it tries to send another message. You can discover this by timing how long the zmq_send() took, or more directly by sending with the ZMQ_DONTWAIT flag, in which case the call fails with EAGAIN instead of blocking.

Don't Forget the Network

There's also the matter of network bandwidth to consider. Messages queued in the push will transfer at the rate at which they're consumed by the recipients, or at the speed of the network (whichever is slower). If your network is fundamentally too slow, then it's the Wrong Network for the job.

Latency

Of course, messages piling up in buffers represents latency. This can be restricted by setting the high water mark to be quite low.

This won't cure a high latency problem, but it will allow you to find out that you have one. If you have an inadequate number of pull workers, a low high water mark will result in message sending failing/blocking sooner.

By default zmq_send() on a PUSH socket blocks in that situation. To have it fail instead, so that you can tell things have got bottled up, pass ZMQ_DONTWAIT or set a send timeout with ZMQ_SNDTIMEO.
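A small sketch of detecting that bottleneck without timing the call, assuming pyzmq: the PUSH and PULL sockets get a low high water mark, the PULL side never reads, and the PUSH side sends with `zmq.DONTWAIT` until pyzmq raises `zmq.Again`. The endpoint name and the function name are made up for this example, and the exact number of messages accepted before the queue fills depends on the transport and library version.

```python
import zmq


def count_sends_until_full(hwm=2, limit=1000):
    """Send without blocking until the queues are full; return (sent, blocked)."""
    ctx = zmq.Context()

    pull = ctx.socket(zmq.PULL)
    pull.setsockopt(zmq.RCVHWM, hwm)      # must be set before bind/connect
    pull.bind("inproc://backpressure-demo")

    push = ctx.socket(zmq.PUSH)
    push.setsockopt(zmq.SNDHWM, hwm)
    push.connect("inproc://backpressure-demo")

    sent = 0
    blocked = False
    try:
        for _ in range(limit):
            # DONTWAIT turns "would block" into a zmq.Again exception.
            push.send(b"work", flags=zmq.DONTWAIT)
            sent += 1
    except zmq.Again:
        blocked = True                    # every peer queue is full
    finally:
        push.close(linger=0)              # discard anything still queued
        pull.close(linger=0)
        ctx.term()
    return sent, blocked


if __name__ == "__main__":
    print(count_sends_until_full())
```

With a low high water mark the producer finds out after only a handful of messages that nobody is keeping up, which is the point made above about latency.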

Thought about Nanomsg?

Nanomsg is a reboot of ZeroMQ; one of the same guys is involved. There are many things I prefer about it, and ultimately I think it will replace ZeroMQ. It has some fancier patterns which are more universally usable (PAIR works on all transports, unlike in ZeroMQ). Also the patterns are essentially a pluggable component in the source code, so it is far simpler for patterns to be developed and integrated than in ZeroMQ. There is a discussion on the differences here

Philosophical Discussion

Actor Model

ZeroMQ is definitely in the realms of Actor Model programming. Messages get stuffed into queues / channels / sockets, and at some undetermined point in time later they emerge at the recipient end to be processed.

The danger of this type of architecture is that a system can have the potential for deadlock without you knowing it.

Suppose you have a system where messages pass both ways down a chain of processes, say instructions in one way and results in the other. It is possible that one of the processes will be trying to send a message whilst the recipient is actually also trying to send a message back to it.

That only works so long as the queues aren't full and can (temporarily) absorb the messages, allowing everyone to move on.

But suppose the network briefly became a little busy for some reason, and that delayed message transfer. The message send might then fail because the high water mark had been reached. Whoops! No one is then sending anything to anyone anymore!

CSP

A development of the Actor Model, called Communicating Sequential Processes, was invented to solve this problem. It has a restriction; there is no buffering of messages at all. No process can complete sending a message until the recipient has received all the data.

The theoretical consequence of this was that it was then possible to mathematically analyse a system design and pronounce it to be free of deadlock. The practical consequence is that if you've built a system that can deadlock, it will do so every time. That's actually not so bad; it'll show up in testing, not post-deployment.

Curiously this is hinted at in the documentation of Microsoft's Task Parallel Library, where they advocate setting buffer lengths to zero in the interests of achieving a more robust application.

It'd be like setting the ZeroMQ high water mark to zero, but in zmq_setsockopt() a high water mark of 0 means "no limit", not nought. The default is non-zero (1000 messages in ZeroMQ 3.x and later)...
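For reference, a short sketch of reading and changing the send high water mark with pyzmq. As far as I know, libzmq 3.x and later default it to 1000 messages and treat 0 as "no limit", so there is no setting that gives a truly unbuffered socket. The function name is just for this example.

```python
import zmq


def inspect_send_hwm():
    """Return (default send HWM, value read back after setting it to 0)."""
    ctx = zmq.Context()
    sock = ctx.socket(zmq.PUSH)
    default_hwm = sock.getsockopt(zmq.SNDHWM)   # library default, non-zero
    sock.setsockopt(zmq.SNDHWM, 0)              # 0 requests "no limit", not "no buffer"
    after = sock.getsockopt(zmq.SNDHWM)
    sock.close()
    ctx.term()
    return default_hwm, after


if __name__ == "__main__":
    print(inspect_send_hwm())
```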

CSP is much more suited to real time applications. Any shortage of available workers immediately results in an inability to send messages (so your system knows it's failed to keep up with the real time demand) instead of resulting in an increased latency as data is absorbed by sockets, etc. (which is far harder to discover).
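To make the "no buffering" idea concrete, here is a rough sketch of a CSP-style rendezvous channel using only the Python standard library. It is not a ZeroMQ or nanomsg feature; `RendezvousChannel` and its methods are names invented for this illustration. A send only succeeds once a receiver has actually taken the item, so a shortage of receivers shows up immediately as a failed send rather than as growing latency.

```python
import queue
import threading


class RendezvousChannel:
    """Unbuffered channel: send() returns True only after a receiver took the item."""

    def __init__(self):
        self._slot = queue.Queue(maxsize=1)   # holds at most one pending offer
        self._send_lock = threading.Lock()    # one sender offers at a time

    def send(self, item, timeout):
        with self._send_lock:
            taken = threading.Event()
            self._slot.put((item, taken))     # slot is always empty here
            if taken.wait(timeout):
                return True                   # a receiver has the item
            try:
                self._slot.get_nowait()       # withdraw the undelivered offer
                return False
            except queue.Empty:
                taken.wait()                  # receiver grabbed it at the last moment
                return True

    def recv(self, timeout=None):
        item, taken = self._slot.get(timeout=timeout)
        taken.set()                           # release the waiting sender
        return item


if __name__ == "__main__":
    ch = RendezvousChannel()
    print(ch.send("nobody listening", timeout=0.05))   # no receiver yet
    t = threading.Thread(target=lambda: print(ch.recv(timeout=2)))
    t.start()
    print(ch.send("hello", timeout=2))
    t.join()
```

The sender learns straight away whether anyone was available, which is the property that makes this style attractive for real-time systems.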

Unfortunately almost every communications technology we have (Ethernet, TCP/IP, ZeroMQ, nanomsg, etc) leans towards Actor Model. Everything has some sort of buffer somewhere, be it a packet buffer on a NIC or a socket buffer in an operating system.

Thus to implement CSP in the real world one has to implement flow control on top of the existing transports. This takes work, and it's slightly inefficient. But if a system needs it, it's definitely the way to go.

Personally I'd love to see 0MQ and Nanomsg adopt it as a behavioural option.