TCP Server workers with kqueue


I recently did some testing with kernel events and I came up with the following:

  • Does it make sense to use a kernel event for accepting sockets? My testing showed that I was only able to handle one accept at a time (even if the eventlist array is bigger). (That makes sense to me, because .ident == sockfd is only true for one socket.)

  • I thought the main use of kevent is to read from multiple sockets at once. Is that true?

Is this how a TCP server is done with a kqueue implementation? :


  • Listening Thread (without kqueue)
    • Accepts new connections and adds the FD to a worker kqueue. QUESTION: Is this even possible? My testing showed yes, but is it guaranteed that the worker thread will be aware of the changes, and is kevent really thread-safe?

  • Worker thread (with kqueue)

    • Waits on reads on file descriptors added from the listening thread.

    QUESTION: How many sockets at once would it make sense to check for updates?


Thanks


There are 2 answers

9
rici BEST ANSWER

Normally, you use kqueue as an alternative to threads. If you're going to use threads, you can just set up a listening thread and a worker threadpool with one thread per accepted connection. That's a much simpler programming model.

In an event-driven framework, you would put both the listening socket and all the accepted sockets into the kqueue, and then handle events as they occur. When you accept a socket, you add it to the kqueue, and when a socket handler finishes its work, it could remove the socket from the kqueue. (The latter is not normally necessary because closing a fd automatically removes any associated events from any kqueue.)

Note that every event registered with a kqueue has a void* userdata, which can be used to identify the desired action when the event fires. So it's not necessary that every event queue have a unique event handler; in fact, it is common to have a variety of handlers. (For example, you might also want to handle a control channel set up through a named pipe.)

Hybrid event/thread models are certainly possible; otherwise, you cannot take advantage of multicore CPUs. One possible strategy is to use the event queue as a dispatcher in a producer-consumer model. The queue handler would directly handle events on the listening socket, accepting the connection and adding the accepted fd into the event queue. When a client connection event occurs, the event would be posted into the workqueue for later handling. It's also possible to have multiple workqueues, one per thread, and have the accepter guess which workqueue a new connection should be placed in, presumably on the basis of that thread's current load.

8
Marco

This is not really an answer, but I made a little server program with kqueue demonstrating the problem:

#include <stdio.h>          // fprintf
#include <sys/event.h>      // kqueue
#include <netdb.h>          // addrinfo
#include <arpa/inet.h>      // AF_INET
#include <sys/socket.h>     // socket
#include <assert.h>         // assert
#include <string.h>         // bzero
#include <stdbool.h>        // bool
#include <unistd.h>         // close

int main(int argc, const char * argv[])
{

    /* Initialize server socket */
    struct addrinfo hints, *res;
    int sockfd;

    bzero(&hints, sizeof(hints));
    hints.ai_family     = AF_INET;
    hints.ai_socktype   = SOCK_STREAM;

    assert(getaddrinfo("localhost", "9090", &hints, &res) == 0);

    sockfd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);

    assert(sockfd > 0);

    {
        unsigned opt = 1;

        assert(setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) == 0);

        #ifdef SO_REUSEPORT
        assert(setsockopt(sockfd, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)) == 0);
        #endif
    }

    assert(bind(sockfd, res->ai_addr, res->ai_addrlen) == 0);

    freeaddrinfo(res);

    /* Start to listen */
    (void)listen(sockfd, 5);

    {
        /* kevent set */
        struct kevent kevSet;
        /* events */
        struct kevent events[20];
        /* nevents */
        int nevents;
        /* kq */
        int kq;
        /* buffer */
        char buf[20];
        /* length */
        ssize_t readlen;

        kevSet.data     = 0;    // data is an output field for EVFILT_READ; ignored on registration
        kevSet.fflags   = 0;
        kevSet.filter   = EVFILT_READ;
        kevSet.flags    = EV_ADD;
        kevSet.ident    = sockfd;
        kevSet.udata    = NULL;

        assert((kq = kqueue()) >= 0);

        /* Update kqueue */
        assert(kevent(kq, &kevSet, 1, NULL, 0, NULL) == 0);

        /* Enter loop */
        while (true) {
            /* Wait for events to happen */
            nevents = kevent(kq, NULL, 0, events, 20, NULL);

            assert(nevents >= 0);

            fprintf(stderr, "Got %d events to handle...\n", nevents);

            for (int i = 0; i < nevents; ++i) {
                struct kevent event = events[i];
                int clientfd        = (int)event.ident;

                /* Handle disconnect */
                if (event.flags & EV_EOF) {

                    /* Simply close socket */
                    close(clientfd);

                    fprintf(stderr, "A client has left the server...\n");
                } else if (clientfd == sockfd) {
                    int nclientfd = accept(sockfd, NULL, NULL);

                    assert(nclientfd > 0);

                    /* Add to event list */
                    kevSet.data     = 0;
                    kevSet.fflags   = 0;
                    kevSet.filter   = EVFILT_READ;
                    kevSet.flags    = EV_ADD;
                    kevSet.ident    = nclientfd;
                    kevSet.udata    = NULL;

                    assert(kevent(kq, &kevSet, 1, NULL, 0, NULL) == 0);

                    fprintf(stderr, "A new client connected to the server...\n");

                    (void)write(nclientfd, "Welcome to this server!\n", 24);
                } else if (event.filter == EVFILT_READ) {

                    /* Read and null-terminate; leave room for the terminator */
                    readlen = read(clientfd, buf, sizeof(buf) - 1);

                    if (readlen <= 0) {
                        /* Error or EOF without EV_EOF; drop the client */
                        close(clientfd);
                        continue;
                    }

                    buf[readlen] = 0;

                    fprintf(stderr, "bytes %zu are available to read... %s \n", (size_t)event.data, buf);

                    /* sleep to simulate "processing" time */
                    sleep(4);
                } else {
                    fprintf(stderr, "unknown event: %8.8X\n", event.flags);
                }
            }
        }
    }

    return 0;
}

Every time a client sends something, the server experiences a "lag" of 4 seconds. (I exaggerated a bit, but it's quite reasonable for testing.) So how do I get around that problem? I see worker threads (a pool), each with its own kqueue, as a possible solution; then no connection lag would occur. (Each worker thread reads a certain "range" of file descriptors.)