ZeroMQ Job Distribution


I have the following setup: a single client, multiple workers, and a single sink. The workers receive job requests from the client via ZeroMQ messages, work on the input, and send the answer to another process (the sink). Processing a message takes about 1 ms, and we need to process about 50,000 messages/sec - so at 1 ms of work per message, we need at least 50 workers to keep up with the load.

I tried a simple setup, in which the client creates a single ZeroMQ PUSH socket to which all the workers connect (via a PULL socket). Similarly, the sink creates a single PULL socket to which all the workers connect with a PUSH socket.

IIUC, ZeroMQ distributes the messages to the workers round-robin - each message goes to the next worker in turn. This setup seems to work efficiently enough with ~10 workers (and a proportional load). However, when the number of workers and the load increase further, it breaks down very quickly and the system starts to accumulate delays.

I know there are several patterns that address the load-balancing problem, but they are geared towards multiple clients and require a router in between, which means additional code and CPU cycles. The questions are:

1) What would be the best pattern to use in the case of a single client, multiple workers, and a single sink?

2) Is it possible to do this without a router between the client and the workers, by routing on the client side instead?

3) What kind of ZeroMQ sockets should be used?

Thanks!

[Diagram of the setup]

EDIT: Adding the code.

Client:

    void *context = zmq_ctx_new ();

    //  Socket to send messages on
    void *sender = zmq_socket (context, ZMQ_PUSH);
    zmq_bind (sender, "tcp://*:5557");

    //  Socket to send start of batch message on
    void *sink = zmq_socket (context, ZMQ_PUSH);
    zmq_connect (sink, "tcp://localhost:5558");

    printf ("Press Enter when the workers are ready: ");
    getchar ();
    printf ("Sending tasks to workers\n");

    //  The first message is "0" and signals start of batch
    s_send (sink, "0");

    unsigned long i;
    const int nmsgs = atoi(argv[1]);
    const int nmsgs_sec = atoi(argv[2]);
    const int buff_size = 1024; // 1KB msgs
    unsigned long t, t_start;
    t_start = timestamp();
    for (i = 0; i < nmsgs; i++) {
            t = timestamp();
            // Pace the sending: busy-wait until the achieved rate
            // (i msgs / elapsed usec * 1e6) drops back to nmsgs_sec
            while( i * 1000000 / (t+1-t_start) > nmsgs_sec) {
                    // busy wait
                    t = timestamp();
            }
            char buffer [buff_size];
            memset (buffer, 0, buff_size);  // Zero the padding so we don't
                                            // send uninitialized memory
            // Write the current timestamp at the packet beginning
            sprintf (buffer, "%lu", t);
            zmq_send (sender, buffer, buff_size, 0);
    }
    printf("Total time: %lu ms Planned time: %d ms\n", (timestamp() - t_start)/1000, nmsgs * 1000 / nmsgs_sec);

    zmq_close (sink);
    zmq_close (sender);
    zmq_ctx_destroy (context);
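
s_send and s_recv above are the string helpers from the ZeroMQ Guide's zhelpers.h. The timestamp() helper is not shown; judging by the divisions by 1000 to get milliseconds, it returns microseconds. A minimal sketch, assuming POSIX gettimeofday, might look like:

    #include <sys/time.h>

    //  Wall-clock time in microseconds (sketch; not from the original post)
    unsigned long timestamp (void)
    {
        struct timeval tv;
        gettimeofday (&tv, NULL);
        return (unsigned long) tv.tv_sec * 1000000 + tv.tv_usec;
    }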

Worker:

    //  Socket to receive messages on
    void *context = zmq_ctx_new ();
    void *receiver = zmq_socket (context, ZMQ_PULL);
    zmq_connect (receiver, receiver_addr);

    //  Socket to send messages to
    void *sender = zmq_socket (context, ZMQ_PUSH);
    zmq_connect (sender, sender_addr);

    //  Process tasks forever
    const int buff_size = 1024;
    char buffer[buff_size];
    while (1) {
        int size = zmq_recv (receiver, buffer, buff_size - 1, 0);
        if (size == -1)
            break;                      //  Interrupted or context terminated
        if (size > buff_size - 1)
            size = buff_size - 1;       //  Message was truncated
        buffer[size] = '\0';            //  zmq_recv does not null-terminate
        s_send (sender, buffer);
    }
    zmq_close (receiver);
    zmq_close (sender);
    zmq_ctx_destroy (context);

Sink:

    //  Prepare our context and socket
    void *context = zmq_ctx_new ();
    void *receiver = zmq_socket (context, ZMQ_PULL);
    zmq_bind (receiver, "tcp://*:5558");

    //  Wait for start of batch
    char *string = s_recv (receiver);
    free (string);

    unsigned long t1;
    unsigned long maxdt = 0;
    unsigned long sumdt = 0;

    int task_nbr;
    int nmsgs = atoi(argv[1]);
    printf("nmsgs = %d\n", nmsgs);
    for (task_nbr = 0; task_nbr < nmsgs; task_nbr++) {
        char *string = s_recv (receiver);
        t1 = timestamp();
        unsigned long t0 = atoll(string);
        free (string);

        unsigned long dt = t1-t0;
        maxdt = (maxdt > dt ? maxdt : dt);
        sumdt += dt;

        if(task_nbr % 10000 == 0) {
            printf("%d %lu\n", task_nbr, dt);
        }
    }

    printf("Average time: %lu usec\tMax time: %lu usec\n", sumdt/nmsgs, maxdt);

    zmq_close (receiver);
    zmq_ctx_destroy (context);

1 Answer

Jason answered:

You have multiple options, depending on where the delays are actually cropping up in your current setup (which is impossible to tell from the info you've given).

You definitely don't need another "in-between" node.

If the volume of connections (1 -> 50) is the problem in your current setup, you could set up multiple PUSH sockets on the client, each with a subset of workers, and just load balance internally on the client - roughly as in the sketch below.
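
A minimal sketch of that client-side sharding idea - the shard count, ports, and payload here are illustrative assumptions, not from the original post:

    #include <stdio.h>
    #include <string.h>
    #include <zmq.h>

    #define NSHARDS 4   //  Hypothetical: one PUSH socket per group of workers

    int main (void)
    {
        void *context = zmq_ctx_new ();
        void *senders [NSHARDS];

        //  Bind one PUSH socket per shard; point ~1/NSHARDS of the
        //  workers at each port
        int s;
        for (s = 0; s < NSHARDS; s++) {
            char endpoint [32];
            senders [s] = zmq_socket (context, ZMQ_PUSH);
            snprintf (endpoint, sizeof (endpoint), "tcp://*:%d", 5557 + s);
            zmq_bind (senders [s], endpoint);
        }

        //  Load balance on the client by round-robining across shards
        unsigned long i;
        for (i = 0; i < 1000000; i++) {
            char buffer [1024];
            memset (buffer, 0, sizeof (buffer));
            snprintf (buffer, sizeof (buffer), "%lu", i);
            zmq_send (senders [i % NSHARDS], buffer, sizeof (buffer), 0);
        }

        for (s = 0; s < NSHARDS; s++)
            zmq_close (senders [s]);
        zmq_ctx_destroy (context);
        return 0;
    }

Each shard then behaves like the original single PUSH socket, but carries only a fraction of the connections and traffic.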

If the problem is the PUSH socket itself, you could instead use DEALER sockets on the "push" side and ROUTER sockets on the "pull" side. But I don't expect this is the issue.
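
For completeness, a rough sketch of what the worker's receive side could look like under that change, assuming the client swaps its single PUSH socket for a single DEALER (everything here is illustrative, not from the answer):

    //  Worker receive side with ROUTER instead of PULL. A ROUTER socket
    //  prepends the sending peer's identity as an extra first frame, so
    //  each message now arrives as two frames.
    void *receiver = zmq_socket (context, ZMQ_ROUTER);
    zmq_connect (receiver, receiver_addr);

    char identity [256];
    char buffer [1024];
    while (1) {
        //  Frame 1: the client's identity (added by ROUTER on receipt)
        zmq_recv (receiver, identity, sizeof (identity), 0);
        //  Frame 2: the actual payload
        int size = zmq_recv (receiver, buffer, sizeof (buffer) - 1, 0);
        if (size == -1)
            break;
        if (size > (int) sizeof (buffer) - 1)
            size = sizeof (buffer) - 1;     //  Message was truncated
        buffer [size] = '\0';
        s_send (sender, buffer);
    }

DEALER distributes outgoing messages round-robin among its connected peers just like PUSH does, so the flow of work stays the same; the difference is in the socket internals and framing.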

In general, I would expect your current setup to be the "correct" one, and perhaps there is a bug in your implementation. Do you know where the delays are being introduced - Client -> Worker, or Worker -> Sink? Or perhaps somewhere else?