epoll: losing some EPOLLOUT events?


This is what my server looks like:

-WorkerThread(s):

  • calls epoll_wait, accepts connections, makes each accepted fd non-blocking and registers it for (EPOLLIN | EPOLLOUT | EPOLLET | EPOLLRDHUP)
  • on an EPOLLIN event, calls recv until EAGAIN and pushes all data into the global RecvBuffer (synchronized with a pthread_mutex)
  • on an EPOLLOUT event, accesses the global SendBuffer and, if there is data to be sent for the ready fd, sends it (in a while loop until EAGAIN or until all data is sent; when a whole packet has been sent, it is popped from SendBuffer)

-IOThread(s):

  • takes data from the global RecvBuffer and processes it
  • sends the response by first trying to call send right away; if not all data is sent, pushes the rest onto the global SendBuffer to be sent from the WorkerThread

The problem is that the server doesn't send all queued data (some of it is left in SendBuffer), and the amount of unsent data grows as the number of clients increases. For testing I'm using only 1 WorkerThread and 1 IOThread, but using more doesn't seem to make any difference. Access to the global buffers is protected with a pthread_mutex. My response size is 130k bytes (it needs at least 3 send calls to send that much data). On the other side is a Windows client using blocking sockets.

Thank you very much! MJ

edit:

Yes, by default I'm waiting for EPOLLOUT events even though I have nothing to send. I did it this way for implementation simplicity and because I was following the man page. My understanding was this:

Even if I "miss" an EPOLLOUT event at a time when I don't want to send anything, it's no problem, because when I do want to send data I'll call send until EAGAIN, and EPOLLOUT should be triggered in the future (and most of the time it is).
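That reasoning matches how edge-triggered EPOLLOUT behaves on Linux, provided EPOLLOUT stays in the fd's interest mask while send() runs into EAGAIN: the event is reported on a transition to writable, not continuously. A small sketch to check it, assuming Linux, a socketpair standing in for the TCP connection, and hypothetical helper names (send_until_eagain, edge_triggered_epollout_refires) that are not part of the asker's code:

```cpp
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

// Sends until everything is written or the kernel send buffer is full.
// Returns bytes written, or -1 on an error other than EAGAIN/EWOULDBLOCK.
ssize_t send_until_eagain(int fd, const char* buf, size_t len) {
    size_t total = 0;
    while (total < len) {
        ssize_t n = send(fd, buf + total, len - total, MSG_DONTWAIT | MSG_NOSIGNAL);
        if (n > 0) { total += static_cast<size_t>(n); continue; }
        if (n == -1 && errno == EINTR) continue;
        if (n == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) break;
        return -1;
    }
    return static_cast<ssize_t>(total);
}

// With EPOLLET, checks for: one EPOLLOUT report at registration, silence
// while nothing changes, and a new report once send() has hit EAGAIN and
// the peer has drained the data.
bool edge_triggered_epollout_refires() {
    int sv[2];
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1) return false;
    int ep = epoll_create1(0);
    assert(ep != -1);

    epoll_event ev{};
    ev.events = EPOLLOUT | EPOLLET;
    ev.data.fd = sv[0];
    epoll_ctl(ep, EPOLL_CTL_ADD, sv[0], &ev);

    epoll_event out{};
    bool first = epoll_wait(ep, &out, 1, 0) == 1 && (out.events & EPOLLOUT);
    bool silent = epoll_wait(ep, &out, 1, 0) == 0;  // still writable, but no new edge

    static char chunk[64 * 1024];
    while (send_until_eagain(sv[0], chunk, sizeof chunk) == static_cast<ssize_t>(sizeof chunk)) {
        // keep filling until a send() stops short because of EAGAIN
    }
    static char sink[64 * 1024];
    while (recv(sv[1], sink, sizeof sink, MSG_DONTWAIT) > 0) {
        // the peer drains everything it has received
    }
    bool refired = epoll_wait(ep, &out, 1, 1000) == 1 && (out.events & EPOLLOUT);

    close(ep);
    close(sv[0]);
    close(sv[1]);
    return first && silent && refired;
}
```

The flip side is that an edge reported while the sending side has nothing queued is consumed; nothing re-reports it until the buffer goes from full to writable again.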

Now I have modified the code to switch between IN and OUT events:

On accept:

event.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
epoll_ctl (pNetServer->m_EventFD, EPOLL_CTL_ADD, infd, &event);

When all data has been sent:

event.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
epoll_ctl (pNetServer->m_EventFD, EPOLL_CTL_MOD, events[i].data.fd, &event);

When send returns EAGAIN in the IOThread:

event.events = EPOLLOUT | EPOLLET | EPOLLRDHUP;
epoll_ctl (pNetServer->m_EventFD, EPOLL_CTL_MOD, events[i].data.fd, &event);
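In the EAGAIN snippet above, EPOLLIN is left out of the mask, so while that registration is in effect epoll does not report incoming data on the fd; the snippets also don't show event.data being filled in, and EPOLL_CTL_MOD stores exactly the epoll_event it is given. A helper that rebuilds the full mask and data on every call covers both points. A minimal sketch, level-triggered (an assumption, in line with the answer's first point), where set_write_interest and demo_toggle_write_interest are hypothetical names and a socketpair stands in for an accepted connection:

```cpp
#include <cassert>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

// Hypothetical helper: keep EPOLLIN | EPOLLRDHUP registered at all times and
// add EPOLLOUT only while the connection has queued output. EPOLL_CTL_MOD
// stores the whole epoll_event, so data.fd is filled in on every call.
bool set_write_interest(int epfd, int fd, bool want_write) {
    epoll_event ev{};
    ev.events = EPOLLIN | EPOLLRDHUP;
    if (want_write) ev.events |= EPOLLOUT;
    ev.data.fd = fd;
    return epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev) == 0;
}

// With EPOLLOUT on, an idle writable socket is reported; with it off, it is not.
bool demo_toggle_write_interest() {
    int sv[2];
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1) return false;
    int ep = epoll_create1(0);
    assert(ep != -1);

    epoll_event ev{};
    ev.events = EPOLLIN | EPOLLRDHUP;   // registration on accept (level-triggered here)
    ev.data.fd = sv[0];
    epoll_ctl(ep, EPOLL_CTL_ADD, sv[0], &ev);

    epoll_event out{};
    bool on = set_write_interest(ep, sv[0], true) &&
              epoll_wait(ep, &out, 1, 0) == 1 &&
              (out.events & EPOLLOUT) && out.data.fd == sv[0];
    bool off = set_write_interest(ep, sv[0], false) &&
               epoll_wait(ep, &out, 1, 0) == 0;

    close(ep);
    close(sv[0]);
    close(sv[1]);
    return on && off;
}
```

If EPOLLET is kept, it would be ORed into the mask in the same place.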

...and I get the same behavior. I also tried removing the EPOLLET flag, and nothing changed.

One side question: does epoll_ctl with the EPOLL_CTL_MOD flag replace the events member, or just OR it with the given argument?
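On the side question: the epoll_ctl(2) man page describes EPOLL_CTL_MOD as changing the settings associated with fd to the new settings given in event, so the mask is replaced rather than ORed, and the data field is replaced as well. A small Linux check of that behavior, using a socketpair and the hypothetical function name mod_replaces_mask_and_data:

```cpp
#include <cassert>
#include <cstdint>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

// Returns true if EPOLL_CTL_MOD replaced (rather than ORed) the event mask
// and also replaced the user data stored for the fd.
bool mod_replaces_mask_and_data() {
    int sv[2];
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1) return false;
    int ep = epoll_create1(0);
    assert(ep != -1);

    epoll_event ev{};
    ev.events = EPOLLOUT;          // initial interest: writable only
    ev.data.u64 = 1;
    epoll_ctl(ep, EPOLL_CTL_ADD, sv[0], &ev);

    ev.events = EPOLLIN;           // new interest: readable only
    ev.data.u64 = 2;
    epoll_ctl(ep, EPOLL_CTL_MOD, sv[0], &ev);

    epoll_event out{};
    // sv[0] is writable but has nothing to read. If MOD had ORed the masks,
    // EPOLLOUT would still be reported here.
    bool no_out = epoll_wait(ep, &out, 1, 0) == 0;

    ssize_t w = write(sv[1], "x", 1);   // make sv[0] readable
    bool in_with_new_data = w == 1 &&
        epoll_wait(ep, &out, 1, 100) == 1 &&
        (out.events & EPOLLIN) && out.data.u64 == 2;

    close(ep);
    close(sv[0]);
    close(sv[1]);
    return no_out && in_with_new_data;
}
```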

EDIT3: I updated the IOThread function to send continuously until all data has been sent or until EAGAIN. I also tried sending even after all data had been sent, but most of the time I got errno 88 (ENOTSOCK, "Socket operation on non-socket").
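The post doesn't show where errno 88 came from. One way ENOTSOCK can arise, offered as a guess rather than a diagnosis of this case, is a send() aimed at an fd number that was closed and then handed out again: POSIX open() returns the lowest unused descriptor, so a closed socket's number is quickly reused, possibly by a regular file. A minimal sketch reproducing that on Linux, with the hypothetical function name errno_after_send_to_reused_fd:

```cpp
#include <cassert>
#include <cerrno>
#include <fcntl.h>
#include <sys/socket.h>
#include <unistd.h>

// Closes a socket, lets open() reuse its number for a file, then sends to the
// stale number. Returns the resulting errno (0 if send() unexpectedly succeeds).
int errno_after_send_to_reused_fd() {
    int sock = socket(AF_INET, SOCK_STREAM, 0);
    assert(sock != -1);
    close(sock);                                   // the connection is gone...
    int file = open("/dev/null", O_WRONLY);        // ...and its number is reused
    assert(file == sock);                          // lowest free fd, single-threaded
    errno = 0;
    ssize_t n = send(sock, "x", 1, MSG_NOSIGNAL);  // stale number, now a file
    int err = (n == -1) ? errno : 0;
    close(file);
    return err;
}
```

If send queues are keyed by fd, dropping an fd's queued data when that fd is closed keeps a stale entry from being sent to whatever reuses the number.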

EDIT4: I fixed some bugs in my sending code, so now no queued data is left unsent. But I don't receive as much data as I should :)) The largest amount of missed (not received) data occurs when the client calls recv right after sending completes, and it grows with the number of clients. When I put a 2-second delay between the send and recv calls on the client (blocking calls), I lose little to no data on the server, depending on how many clients I'm running (the client test code is a simple for loop with 1 send call and 1 recv call after it). Again, I tried with and without ET mode. Below is the updated WorkerThread function, which is responsible for receiving data. @Admins/Mods: maybe I should open a new topic, since the problem is now a bit different?

void CNetServer::WorkerThread(void* param)
{
    CNetServer* pNetServer =(CNetServer*)param;
    struct epoll_event event;
    struct epoll_event *events;
    int s = 0;

//  events = (epoll_event*)calloc (MAXEVENTS, sizeof event);

    while (1)
    {
        int n, i;

//      printf ("BLOCKING NOW! epoll_wait thread %d\n",pthread_self());
        n = pNetServer->m_epollCtrl.Wait(-1);
//      printf ("epoll_wait thread %d\n",pthread_self());
        pthread_mutex_lock(&g_mtx_WorkerThd);
        for (i = 0; i < n; i++)
        {
            if ((pNetServer->m_epollCtrl.Event(i)->events & EPOLLERR))
            {
                // An error has occurred on this fd, or the socket is not ready for reading (why were we notified then?)

            //  g_SendBufferArray.RemoveAll( 0 );

                char szFileName[30] = {0};
                sprintf( (char*)szFileName,"fd_%d.txt",pNetServer->m_epollCtrl.Event(i)->data.fd );
                remove(szFileName);

            /*  printf( "\n\n\n");
                printf( "\tDATA LEFT COUNT:%d\n",g_SendBufferArray.size());
                for (int k=0;k<g_SendBufferArray.size();k++)
                    printf( "\tSD: %d DATA LEFT:%d\n",g_SendBufferArray[k]->sd,g_SendBufferArray[k]->nBytesSent );
*/

            //  fprintf (stderr, "epoll error\n");
            //  fflush(stdout);
                close (pNetServer->m_epollCtrl.Event(i)->data.fd);
                continue;
            }
            else if (pNetServer->m_ListenSocket == pNetServer->m_epollCtrl.Event(i)->data.fd)
            {
                // We have a notification on the listening socket, which means one or more incoming connections.
                while (1)
                {
                    struct sockaddr in_addr;
                    socklen_t in_len;
                    int infd;
                    char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];

                    in_len = sizeof in_addr;
                    infd = accept (pNetServer->m_ListenSocket, &in_addr, &in_len);
                    if (infd == -1)
                    {
                        if ((errno == EAGAIN) ||
                            (errno == EWOULDBLOCK))
                        {
                            // We have processed all incoming connections.
                            break;
                        }
                        else
                        {
                            perror ("accept");
                            break;
                        }
                    }

                    s = getnameinfo (&in_addr, in_len,
                        hbuf, sizeof hbuf,
                        sbuf, sizeof sbuf,
                        NI_NUMERICHOST | NI_NUMERICSERV);
                    if (s == 0)
                    {
                        printf("Accepted connection on descriptor %d "
                            "(host=%s, port=%s) thread %lu\n", infd, hbuf, sbuf, (unsigned long)pthread_self());
                    }

                    // Make the incoming socket non-blocking and add it to the list of fds to monitor.
                    CEpollCtrl::SetNonBlock(infd,true);
                    if ( !pNetServer->m_epollCtrl.Add( infd, EPOLLIN, NULL ))
                    {
                        perror ("epoll_ctl");
                        abort ();
                    }

                }
                continue;
            }
            if( (pNetServer->m_epollCtrl.Event(i)->events & EPOLLOUT) )
            {

                pNetServer->DoSend( pNetServer->m_epollCtrl.Event(i)->data.fd );
            }
            if( pNetServer->m_epollCtrl.Event(i)->events & EPOLLIN )
            {
                printf("EPOLLIN TRIGGERED FOR SD: %d\n",pNetServer->m_epollCtrl.Event(i)->data.fd);
                // We have data on the fd waiting to be read. 
                int done = 0;
                ssize_t count = 0;
                char buf[512];
                while (1)
                {
                    count = read (pNetServer->m_epollCtrl.Event(i)->data.fd, buf, sizeof buf);
                    printf("recv sd %d size %zd thread %lu\n",pNetServer->m_epollCtrl.Event(i)->data.fd,count,(unsigned long)pthread_self());
                    if (count == -1)
                    {
                        // If errno == EAGAIN, that means we have read all data. So go back to the main loop.
                        if ( errno != EAGAIN )
                        {
                            perror ("read");
                            done = 1;
                        }
                        break;
                    }
                    else if (count == 0)
                    {
                        //connection is closed by peer.. do a cleanup and close
                        done = 1;
                        break;
                    }
                    else if (count > 0)
                    {
                        static int nDataCounter = 0;
                        nDataCounter+=count;
                        printf("RECVDDDDD %d\n",nDataCounter);
                        CNetServer::s_pRecvContainer->OnData( pNetServer->m_epollCtrl.Event(i)->data.fd, buf, count );
                    }
                }

                if (done)
                {
                    printf ("Closed connection on descriptor %d\n",pNetServer->m_epollCtrl.Event(i)->data.fd);
                    // Closing the descriptor will make epoll remove it from the set of descriptors which are monitored. 
                    close (pNetServer->m_epollCtrl.Event(i)->data.fd);
                }
            }

        }
//      

        pNetServer->IOThread( (void*)pNetServer );

        pthread_mutex_unlock(&g_mtx_WorkerThd);
    }

}

void CNetServer::IOThread(void* param)
{

    BYTEARRAY* pbPacket = new BYTEARRAY;
    int fd;
    struct epoll_event event;
    CNetServer* pNetServer =(CNetServer*)param;

    printf("IOThread startin' !\n");

    for (;;)
    {
        bool bGotIt = CNetServer::s_pRecvContainer->GetPacket( pbPacket, &fd );

        if( bGotIt )
        {

            //process packet here
            printf("Got 'em packet yo !\n");
            BYTE* re = new BYTE[128000];
            memset((void*)re,0xCC,128000);
            buffer_t* responsebuff = new buffer_t( fd, re, 128000 ) ;

            pthread_mutex_lock(&g_mtx_WorkerThd);

            while( 1 )
            {
                    int s;
                    int nSent = send( responsebuff->sd, ( responsebuff->pbBuffer + responsebuff->nBytesSent ),responsebuff->nSize - responsebuff->nBytesSent,0 );
                    printf ("IOT: Trying to send nSent: %d buffsize: %d \n",nSent,responsebuff->nSize - responsebuff->nBytesSent);

                    if (nSent == -1)
                    {

                        if (errno == EAGAIN || errno == EWOULDBLOCK )
                        {
                                g_vSendBufferArray.push_back( *responsebuff );
                                printf ("IOT: now waiting for EPOLLOUT\n");
                                event.data.fd = fd;
                                event.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLRDHUP;
                                s = epoll_ctl (pNetServer->m_EventFD, EPOLL_CTL_MOD, fd, &event);
                                // Check the result before leaving the loop; a break placed first made this check unreachable.
                                if (s == -1)
                                {
                                    perror ("epoll_ctl");
                                    abort ();
                                }
                                break;

                        }
                        else
                        {
                            printf( "%d\n",errno );
                            perror ("send");
                            break;
                        }
                        printf ("IOT: WOOOOT\n");
                        break;
                    }
                    else if (nSent == responsebuff->nSize - responsebuff->nBytesSent)
                    {
                        printf ("IOT:all is sent! wOOhOO\n");
                        responsebuff->sd = 0;
                        responsebuff->nBytesSent += nSent;
                        delete responsebuff;
                        break;
                    }
                    else if (nSent < responsebuff->nSize - responsebuff->nBytesSent)
                    {
                        printf ("IOT: partial send!\n");
                        responsebuff->nBytesSent += nSent;

                    }

            }
            delete [] re;

            pthread_mutex_unlock(&g_mtx_WorkerThd);

        }
    }

}

David Schwartz answered:
  1. Stop using EPOLLET. It's almost impossible to get right.

  2. Don't ask for EPOLLOUT events if you have nothing to send.

  3. When you have data to send on a connection, follow this logic:

    A) If there's already data in your send queue for that connection, just add the new data. You're done.

    B) Try to send the data immediately. If you send it all, you're done.

    C) Save the leftover data in the send queue for this connection. Now ask for EPOLLOUT for this connection.
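Steps A to C can be sketched as follows. This is a minimal sketch, not the answerer's code, assuming: a single-threaded caller (in the asker's design these calls would run under the mutex that guards send state), level-triggered epoll as point 1 suggests, and a socketpair with a deliberately small SO_SNDBUF standing in for a TCP connection so that queuing actually happens. g_pending, set_write_interest, write_some, queue_send, on_writable and demo_send_queue are all hypothetical names:

```cpp
#include <cassert>
#include <cerrno>
#include <string>
#include <unordered_map>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

// Hypothetical per-connection output queues, keyed by fd.
std::unordered_map<int, std::string> g_pending;

// Level-triggered interest: always EPOLLIN, EPOLLOUT only when asked for.
static bool set_write_interest(int epfd, int fd, bool want_write) {
    epoll_event ev{};
    ev.events = EPOLLIN | EPOLLRDHUP;
    if (want_write) ev.events |= EPOLLOUT;
    ev.data.fd = fd;
    return epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev) == 0;
}

// Writes as much as the socket accepts without blocking.
// Returns bytes written, or -1 on a hard error.
static ssize_t write_some(int fd, const char* data, size_t len) {
    size_t total = 0;
    while (total < len) {
        ssize_t n = send(fd, data + total, len - total, MSG_DONTWAIT | MSG_NOSIGNAL);
        if (n > 0) { total += static_cast<size_t>(n); continue; }
        if (n == -1 && errno == EINTR) continue;
        if (n == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) break;
        return -1;
    }
    return static_cast<ssize_t>(total);
}

// Steps A-C. Returns false on a hard send error.
bool queue_send(int epfd, int fd, const char* data, size_t len) {
    std::string& q = g_pending[fd];
    if (!q.empty()) {                                  // A) data already queued: append
        q.append(data, len);
        return true;
    }
    ssize_t n = write_some(fd, data, len);             // B) try to send right away
    if (n < 0) return false;
    if (static_cast<size_t>(n) == len) return true;
    q.append(data + n, len - static_cast<size_t>(n));  // C) queue the rest...
    return set_write_interest(epfd, fd, true);         // ...and ask for EPOLLOUT
}

// Call when epoll reports EPOLLOUT for fd.
bool on_writable(int epfd, int fd) {
    std::string& q = g_pending[fd];
    ssize_t n = write_some(fd, q.data(), q.size());
    if (n < 0) return false;
    q.erase(0, static_cast<size_t>(n));
    // Once the queue is empty, stop asking for EPOLLOUT.
    return q.empty() ? set_write_interest(epfd, fd, false) : true;
}

// Demo: push 1 MiB through a socketpair whose peer reads it back.
bool demo_send_queue() {
    int sv[2];
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1) return false;
    int small = 4096;
    setsockopt(sv[0], SOL_SOCKET, SO_SNDBUF, &small, sizeof small);
    int ep = epoll_create1(0);
    assert(ep != -1);
    epoll_event ev{};
    ev.events = EPOLLIN | EPOLLRDHUP;
    ev.data.fd = sv[0];
    epoll_ctl(ep, EPOLL_CTL_ADD, sv[0], &ev);

    const std::string msg(1 << 20, 'x');
    bool ok = queue_send(ep, sv[0], msg.data(), msg.size());
    bool queued = !g_pending[sv[0]].empty();   // did not fit in one go

    size_t received = 0;
    char buf[64 * 1024];
    for (int iter = 0; ok && received < msg.size() && iter < 10000; ++iter) {
        ssize_t r = recv(sv[1], buf, sizeof buf, MSG_DONTWAIT);   // the "client"
        if (r > 0) received += static_cast<size_t>(r);
        epoll_event out{};
        if (epoll_wait(ep, &out, 1, 10) == 1 && (out.events & EPOLLOUT))
            ok = on_writable(ep, out.data.fd);
    }
    g_pending.erase(sv[0]);   // drop the queue before the fd number can be reused
    close(ep);
    close(sv[0]);
    close(sv[1]);
    return ok && queued && received == msg.size();
}
```

Step A is what keeps bytes in order: once anything is queued, new data must go behind it rather than straight to send(). Erasing from the front of a std::string is O(n); a real server would more likely keep a list of buffers plus an offset.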