Why doesn't my epoll-based program get faster when I increase the number of connections?

512 views Asked by At

I have a program that reads a file of domain names (one domain per line), resolves each domain asynchronously, and then downloads its landing page. All network communication is done in an epoll-based event loop. Testing shows that increasing the number of concurrent connections increases neither throughput (Mbit/s) nor the number of pages downloaded per second. The results are below; the four runs use 1024, 2048, 4096 and 8192 concurrent connections respectively:

:~$ ./crawler com.lowercase 
iterations=156965 total domains=5575 elapsed=65.51s domains/s=85.10 KB=28163 Mbit/s=2.86
:~$ ./crawler com.lowercase 
iterations=88339 total domains=10525 elapsed=64.98s domains/s=161.98 KB=52936 Mbit/s=5.41
:~$ ./crawler com.lowercase 
iterations=143989 total domains=9409 elapsed=64.80s domains/s=145.20 KB=48166 Mbit/s=4.94
:~$ ./crawler com.lowercase 
iterations=109597 total domains=10532 elapsed=65.13s domains/s=161.71 KB=51874 Mbit/s=5.29

As you can see, neither the number of domains downloaded per second nor the Mbit/s throughput trends upward, which is contrary to what I expected. Can anyone explain these results, and suggest a fix so that performance scales with the number of connections?

For those interested, the full code listing is below:

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <string.h>
#include <errno.h>
#include <errno.h>
#include <stdarg.h>
#include <ctype.h>
#include <signal.h>
#include <stdint.h>
#include <time.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <resolv.h>
#include <ares.h>

/* ---- DNS (c-ares) settings ---- */
#define MAXWAITING 1000 /* Max. number of parallel DNS queries */
#define MAXTRIES      3 /* Max. number of tries per domain */
#define DNSTIMEOUT    3000 /* Max. number of ms for first try */
#define DNS_MAX_EVENTS 10000 /* capacity of dns_events[] (events per epoll_wait on the DNS epoll instance) */
#define DNS_MAX_SERVERS 2 /* number of resolvers listed in SERVERS (also referenced by the DNS epoll setup) */
#define SERVERS    "1.0.0.1,8.8.8.8" /* DNS servers to use (Cloudflare & Google) */
/* ---- Crawler settings ---- */
#define MAXDOMAINS 8192 /* capacity of the resolved[]/ips[] ring of resolved domains */
#define PORT 80 /* TCP port every HTTP connection goes to */
#define MAXBUF 1024 /* size of each per-slot receive buffer */
#define MAX_EPOLL_EVENTS 8192 /* capacity of events[] for the HTTP epoll instance */
#define MAX_CONNECTIONS 8192 /* number of concurrent connection slots */
#define TIMEOUT 10000 /* ms: epoll_wait() timeout of the HTTP event loop */
/* c-ares sockets registered with the DNS epoll instance (0 = unused entry) */
ares_socket_t dns_client_fds[ARES_GETSOCK_MAXNUM] = {0};
/* registration scratch and result array for the DNS epoll instance */
struct epoll_event ev, dns_events[DNS_MAX_EVENTS];
/* NOTE(review): `i` is declared again on the num_ready line below; at file
 * scope both are tentative definitions of the same object, which C permits. */
int i,bitmask,nfds, epollfd, timeout, fd_count, ret;
/* epoll instance for the HTTP sockets (epollfd above is the one for c-ares sockets) */
int epfd;
/* Per-slot connection state, indexed by slot number */
int sockfd[MAX_CONNECTIONS];
struct epoll_event event[MAX_CONNECTIONS];
struct sockaddr_in dest[MAX_CONNECTIONS];
/* Ring of resolved domains: the DNS callback writes at `current`, consumers
 * read at `next` (both wrap at MAXDOMAINS).  `active` is incremented for each
 * entry written and decremented by get_domain_and_ip(). */
char resolved[MAXDOMAINS][254];
char ips[MAXDOMAINS][128];
int current = 0, active = 0, next = 0;
char servers[MAX_CONNECTIONS][128];     /* IPv4 address string of the slot's server */
char domains[MAX_CONNECTIONS][254];     /* host name of the slot's server */
char get_buffer[MAX_CONNECTIONS][1024]; /* HTTP request text for the slot */
char buffer[MAX_CONNECTIONS][MAXBUF];   /* receive scratch buffer for the slot */
int buffer_used[MAX_CONNECTIONS];       /* bytes of get_buffer[] already sent */
/* benchmark clock: start of measurement and the latest sample (see get_time()) */
struct timespec startTime, stopTime;
/* num_ready: events returned by the last HTTP epoll_wait(); count: open HTTP
 * sockets; total_bytes: bytes sent plus received; total_domains and
 * iterations: progress counters printed by get_time() */
int i, num_ready, connections = 0, done = 0, total_bytes = 0, total_domains = 0, iterations = 0, count = 0;
/* NOTE(review): check whether main() uses this FILE handle or declares its own `fp`. */
FILE * fp;
/* result array for epoll_wait() on the HTTP epoll instance */
struct epoll_event events[MAX_EPOLL_EVENTS];
/* number of DNS queries issued whose completion callback has not run yet */
static int nwaiting;

/* c-ares socket-state callback: deliberately does nothing with its arguments. */
static void state_cb(void *data, int s, int read, int write)
{
    (void)data;
    (void)s;
    (void)read;
    (void)write;
}

/*
 * Completion callback for ares_gethostbyname().
 *
 * Every completed query, successful or not, decrements nwaiting.  On success
 * the first address and the host name are stored at index `current` of the
 * resolved[]/ips[] ring, `current` advances (wrapping at MAXDOMAINS) and
 * `active` is incremented.  `arg` and `timeouts` are unused.
 */
static void callback(void *arg, int status, int timeouts, struct hostent *host)
{
    (void)arg;
    (void)timeouts;

    nwaiting--;

    if (!host || status != ARES_SUCCESS || host->h_addr_list[0] == NULL) {
        return;
    }

    /* Ring full: writing now would overwrite an entry that has not been
     * consumed yet (current would lap next), so drop this result instead. */
    if (active >= MAXDOMAINS) {
        return;
    }

    char ip[INET6_ADDRSTRLEN];
    if (inet_ntop(host->h_addrtype, host->h_addr_list[0], ip, sizeof(ip)) == NULL) {
        return;     /* ip[] would be uninitialized */
    }

    /* Bounded copies: h_name comes from a DNS reply and is not length-checked here. */
    snprintf(resolved[current], sizeof(resolved[current]), "%s", host->h_name);
    snprintf(ips[current], sizeof(ips[current]), "%s", ip);
    current = (current < MAXDOMAINS - 1) ? current + 1 : 0;
    active++;
    printf("active %d\r", active);
}

/*
 * Run one round of c-ares socket I/O on the DNS epoll instance `epollfd`.
 *
 * Steps:
 *  - Remove the sockets registered by the previous call.
 *  - Fetch the current set with ares_getsock().
 *  - Register each socket for the directions c-ares asks for.
 *  - Block in epoll_wait() for at most 1000 ms.
 *
 * Each ready socket is passed to ares_process_fd().  If nothing became ready,
 * or c-ares has no sockets, ares_process_fd() is called with no sockets so
 * c-ares can still expire timed-out queries.  Completed queries run their
 * callbacks from inside this call.
 */
static void wait_ares(ares_channel channel)
{
    struct epoll_event want;
    int watched = 0;

    /* Drop last round's registrations.  Failures are expected and harmless:
     * c-ares may already have closed a socket, which also removes it from epoll. */
    for (int k = 0; k < ARES_GETSOCK_MAXNUM; k++) {
        if (dns_client_fds[k] > 0) {
            epoll_ctl(epollfd, EPOLL_CTL_DEL, dns_client_fds[k], NULL);
        }
    }
    memset(dns_client_fds, 0, sizeof(dns_client_fds));

    /* Watch every socket c-ares reports (up to ARES_GETSOCK_MAXNUM), not only
     * DNS_MAX_SERVERS of them.  c-ares may hold more sockets than configured
     * servers, and a reply on an unwatched socket would presumably only be
     * noticed when its query times out. */
    int bits = ares_getsock(channel, dns_client_fds, ARES_GETSOCK_MAXNUM);
    for (int k = 0; k < ARES_GETSOCK_MAXNUM; k++) {
        ares_socket_t fd = dns_client_fds[k];

        if (fd <= 0) {
            continue;
        }
        want.events = 0;
        if (ARES_GETSOCK_READABLE(bits, k)) {
            want.events |= EPOLLIN;
        }
        if (ARES_GETSOCK_WRITABLE(bits, k)) {
            want.events |= EPOLLOUT;
        }
        want.data.fd = fd;
        /* EEXIST only means the socket is already registered, which is the goal. */
        if (epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &want) == 0 || errno == EEXIST) {
            watched++;
        }
    }

    int ready = 0;
    if (watched > 0) {
        ready = epoll_wait(epollfd, dns_events, DNS_MAX_EVENTS, 1000 /* ms */);
        if (ready < 0) {
            return;
        }
    }
    if (ready == 0) {
        ares_process_fd(channel, ARES_SOCKET_BAD, ARES_SOCKET_BAD);
        return;
    }

    for (int k = 0; k < ready; k++) {
        unsigned int what = dns_events[k].events;
        ares_socket_t fd = dns_events[k].data.fd;
        /* Report errors and hangups as "readable".  c-ares then reads the
         * socket, sees the failure and cleans up; otherwise nothing consumes
         * the condition and level-triggered epoll keeps reporting it. */
        ares_socket_t rfd = (what & (EPOLLIN | EPOLLERR | EPOLLHUP)) ? fd : ARES_SOCKET_BAD;
        ares_socket_t wfd = (what & EPOLLOUT) ? fd : ARES_SOCKET_BAD;

        ares_process_fd(channel, rfd, wfd);
    }
}

/*
 * Open a connection for slot `sock` to servers[sock] on PORT.
 *
 * The new socket is non-blocking TCP and is registered with the HTTP epoll
 * instance `epfd` for EPOLLIN|EPOLLOUT.  buffer_used[sock] is reset so the
 * request is sent from its start, then a non-blocking connect() is started.
 *
 * Exits the process if socket() fails or servers[sock] is not a dotted IPv4
 * address.  A connect() error other than EINPROGRESS is only printed; the
 * socket stays registered with epoll.
 */
void make_socket_and_connect (int sock)
{
    if ( (sockfd[sock] = socket(AF_INET, SOCK_STREAM|SOCK_NONBLOCK, 0)) < 0 ) {
        perror("Socket");
        exit(errno);
    }
    count++;

    /* Reset unconditionally: connect() may also complete immediately, and the
     * new connection must still send its request from the first byte. */
    buffer_used[sock] = 0;

    event[sock].events = EPOLLIN|EPOLLOUT;
    event[sock].data.fd = sockfd[sock];
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd[sock], &event[sock]) < 0) {
        perror("epoll_ctl");
    }

    memset(&dest[sock], 0, sizeof(dest[sock]));
    dest[sock].sin_family = AF_INET;
    dest[sock].sin_port = htons(PORT);
    /* inet_pton() returns 1 on success, 0 for an unparsable string (errno is
     * not set then) and -1 for an unsupported family. */
    if ( inet_pton(AF_INET, servers[sock], &dest[sock].sin_addr.s_addr) != 1 ) {
        printf("\n");
        fprintf(stderr, "%s: not a valid IPv4 address\n", servers[sock]);
        exit(EXIT_FAILURE);
    }
    if ( connect(sockfd[sock], (struct sockaddr*)&dest[sock], sizeof(dest[sock])) != 0
         && errno != EINPROGRESS ) {
        printf("%s\n", servers[sock]);
        perror("Connect again ");
    }
}

/*
 * Decide whether the crawler may connect to the IP address string `domain`.
 * Returns 0 for an exact match of one of the blocked strings below, 1 for
 * anything else (only exact string comparison is done; nothing is parsed).
 */
int is_valid_ip(char *domain)
{
    static const char *const blocked[] = {
        "255.255.255.255",
        "192.168.1.0",
        "127.0.0.0",
    };

    for (size_t k = 0; k < sizeof blocked / sizeof blocked[0]; k++) {
        if (strcmp(domain, blocked[k]) == 0) {
            return 0;
        }
    }
    return 1;
}

void close_socket (int socket)
{
    close(sockfd[socket]);
    count--;
    epoll_ctl(epfd, EPOLL_CTL_DEL, sockfd[socket], &event[socket]);
}

/*
 * Give slot `id` the next usable entry of the resolved[]/ips[] ring and connect it.
 *
 * Entries are consumed from `next` onward (wrapping at MAXDOMAINS).  Each
 * consumed entry decrements `active`, including entries skipped because
 * is_valid_ip() rejects them, so `active` keeps matching what the ring holds.
 *
 * On success the slot's servers[]/domains[] are filled,
 * make_socket_and_connect() is called and total_domains is incremented.  If
 * `active` reaches 0 before a usable entry is found, nothing is connected and
 * sockfd[id] is left unchanged.  The caller is responsible for closing any
 * previous socket of the slot.
 */
void get_domain_and_ip(int id)
{
    while (active > 0) {
        active--;
        strcpy(servers[id], ips[next]);
        strcpy(domains[id], resolved[next]);
        next = (next < MAXDOMAINS - 1) ? next + 1 : 0;

        if (is_valid_ip(servers[id])) {
            make_socket_and_connect(id);
            total_domains++;
            return;
        }
    }
}

/*
 * Fill servers[id]/domains[id] from the resolved[]/ips[] ring without opening
 * a socket.
 *
 * Entries are read from `next` onward, skipping those rejected by
 * is_valid_ip(); every entry read advances `next` (wrapping at MAXDOMAINS).
 * Unlike get_domain_and_ip(), `active` is not changed here.
 */
void get_domain_and_ip_without_connect(int id)
{
    do {
        strcpy(servers[id], ips[next]);
        strcpy(domains[id], resolved[next]);
        next = (next == MAXDOMAINS - 1) ? 0 : next + 1;
    } while (!is_valid_ip(servers[id]));
}

/*
 * Sample the benchmark clock (CLOCK_MONOTONIC into stopTime) and print a
 * one-line progress report to stderr.  The line ends in '\r', so successive
 * reports overwrite each other.
 *
 * Also increments `iterations`.  Rates are reported as 0 until some time has
 * elapsed since startTime.
 */
void get_time()
{
    clock_gettime(CLOCK_MONOTONIC, &stopTime);

    /* Full-resolution elapsed time; the signed tv_nsec difference may be
     * negative, which the addition absorbs. */
    double seconds = (double)(stopTime.tv_sec - startTime.tv_sec)
                   + (double)(stopTime.tv_nsec - startTime.tv_nsec) / 1e9;
    iterations++;

    double domains_per_s = 0.0;
    double mbit_per_s = 0.0;
    if (seconds > 0.0) {
        domains_per_s = total_domains / seconds;
        /* Computed in floating point so 8 * total_bytes cannot overflow int.
         * Uses 1024 * 1024 bits per Mbit, consistent with KB = bytes / 1024. */
        mbit_per_s = 8.0 * total_bytes / seconds / (1024.0 * 1024.0);
    }

    fprintf(stderr, "iterations=%d total domains=%d elapsed=%2.2fs domains/s=%2.2f KB=%d Mbit/s=%2.2f num_ready=%d count=%d active=%d next=%d current=%d.....\r"
            , iterations, total_domains, seconds, domains_per_s, total_bytes/1024, mbit_per_s, num_ready, count, active, next, current);
}

/*
 * Send the not-yet-sent tail of slot `id`'s request on sockfd[id].  The tail
 * is get_buffer[id] starting at offset buffer_used[id].
 *
 * Returns send()'s result: bytes sent, or -1 with errno set.  Returns 0
 * without calling send() when nothing is left to send.
 */
ssize_t send_data(int id)
{
    size_t request_len = strlen(get_buffer[id]);

    /* Nothing left (or a bogus offset): avoid the size_t underflow that would
     * make send() read far past the end of the buffer. */
    if (buffer_used[id] < 0 || (size_t)buffer_used[id] >= request_len) {
        return 0;
    }

    size_t offset = (size_t)buffer_used[id];
    /* MSG_NOSIGNAL: a peer that already closed yields EPIPE, never SIGPIPE. */
    return send(sockfd[id], get_buffer[id] + offset, request_len - offset, MSG_NOSIGNAL);
}

/*
 * Read whatever is available on slot `id`'s socket into buffer[id], up to
 * the buffer's full size (MAXBUF).  Returns recv()'s result unchanged:
 * bytes read, 0 on EOF, or -1 with errno set.
 */
ssize_t recv_data(int id)
{
    char *dst = buffer[id];
    return recv(sockfd[id], dst, sizeof(buffer[id]), 0);
}

/*
 * Wait up to TIMEOUT ms for I/O on the HTTP epoll instance `epfd`.  At most
 * MAX_EPOLL_EVENTS results are stored in the global events[] array.
 *
 * Returns epoll_wait()'s result: the number of ready entries, 0 on timeout,
 * or -1 on error.
 * NOTE(review): the name collides with POSIX wait(2) if <sys/wait.h> is ever included.
 */
int wait()
{
    return epoll_wait(epfd, events, MAX_EPOLL_EVENTS, TIMEOUT);
}
                
int main(int argc, char *argv[]) {
        
    sigaction(SIGPIPE, &(struct sigaction){SIG_IGN}, NULL);
    FILE * fp;
    char domain[254];
    size_t len = 0;
    ssize_t read;
    ares_channel channel;
    int status, dns_done = 0;
    int optmask;
    
    status = ares_library_init(ARES_LIB_INIT_ALL);
    if (status != ARES_SUCCESS) {
        printf("ares_library_init: %s\n", ares_strerror(status));
        return 1;
    }

    struct ares_options options = {
        .timeout = DNSTIMEOUT,     /* set first query timeout */
        .tries = MAXTRIES       /* set max. number of tries */
    };
    optmask = ARES_OPT_TIMEOUTMS | ARES_OPT_TRIES;

    status = ares_init_options(&channel, &options, optmask);
    if (status != ARES_SUCCESS) {
        printf("ares_init_options: %s\n", ares_strerror(status));
        return 1;
    }

    status = ares_set_servers_csv(channel, SERVERS);
    if (status != ARES_SUCCESS) {
        printf("ares_set_servers_csv: %s\n", ares_strerror(status));
        return 1;
    }
    
    memset(dns_client_fds, 0, sizeof(dns_client_fds));
    memset((char *)&ev, 0, sizeof(struct epoll_event));
    memset((char *)&dns_events[0], 0, sizeof(dns_events));

    epollfd = epoll_create(DNS_MAX_SERVERS);
    
    fp = fopen(argv[1], "r");
    if (!fp)
        exit(EXIT_FAILURE);

    do{
        if (nwaiting >= MAXWAITING || dns_done) {
            do {
                wait_ares(channel);
                
            } while (nwaiting > MAXWAITING);
        }
        if (!dns_done) {
            if (fscanf(fp, "%253s", domain) == 1) {
                ares_gethostbyname(channel, domain, AF_INET, callback, NULL);
                nwaiting++;
            } else {
                //fprintf(stderr, "done sending\n");
                dns_done = 1;
            }
        }
    } while (active < MAX_CONNECTIONS);
    
    /*---Open sockets for streaming---*/
    for (i = 0; i < MAX_CONNECTIONS; i++)
    { 
        if ( (sockfd[i] = socket(AF_INET, SOCK_STREAM|SOCK_NONBLOCK, 0)) < 0 ) {
            perror("Socket");
            exit(errno);
        }
        count++;
    }

    /*---Add sockets to epoll---*/
    epfd = epoll_create1(0);
    for (i = 0; i < MAX_CONNECTIONS; i++)
    {
        event[i].events = EPOLLIN|EPOLLOUT; 
        event[i].data.fd = sockfd[i];
        epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd[i], &event[i]);
    }
    
    /*---Initialize server address/port structs---*/
    for (i = 0; i < MAX_CONNECTIONS; i++)
    {
        get_domain_and_ip_without_connect(i);
        //printf("%s %s\n", servers[i], domains[i]);
        bzero(&dest[i], sizeof(dest[i]));
        dest[i].sin_family = AF_INET;
        dest[i].sin_port = htons(PORT);
        if ( inet_pton(AF_INET, servers[i], &dest[i].sin_addr.s_addr) == 0 ) {
           perror(servers[i]);
           exit(errno);
        }
    }
    
    /*---Connect to servers---*/
    for (i = 0; i < MAX_CONNECTIONS; i++)
    {
        if ( connect(sockfd[i], (struct sockaddr*)&dest[i], sizeof(dest[i])) != 0 ) {
            if(errno != EINPROGRESS) {
                perror("Connect ");
                //exit(errno);
            }
            buffer_used[i] = 0;
        }
    }
    clock_gettime(CLOCK_MONOTONIC, &startTime);
    while (1)
    {
        /*---Do async DNS---*/
        while (active < MAXDOMAINS && nwaiting > 0) {
            //printf("active = %d MAXDOMAINS = %d nwaiting = %d MAXWAITING = %d\n", active, MAXDOMAINS, nwaiting, MAXWAITING);
            if (nwaiting >= MAXWAITING || dns_done) {
                do {
                    wait_ares(channel);
                } while (nwaiting > MAXWAITING);
            }
            if (!dns_done) {
                if (fscanf(fp, "%253s", domain) == 1) {
                    ares_gethostbyname(channel, domain, AF_INET, callback, NULL);
                    nwaiting++;
                } else {
                    //fprintf(stderr, "done sending\n");
                    dns_done = 1;
                }
            }
        } //while (active < MAXDOMAINS);
        /*---Wait to be able to send---*/
        num_ready = wait();
        get_time();
        if (!num_ready) break;
        for(i = 0; i < num_ready; i++) {
            int index;
            if(events[i].events & EPOLLOUT) {
                for (int j = 0; j < MAX_CONNECTIONS; j++)
                {
                    if (events[i].data.fd == sockfd[j])
                    {
                        index = j;
                        break;
                    }
                }
                snprintf(get_buffer[index], sizeof(get_buffer[index]), 
                "GET %s HTTP/1.1\r\nHost: %s\r\nUser-Agent: Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.103 Safari/537.36\r\n\r\n", "/", domains[i]);
                ssize_t nByte = 0;
                if (buffer_used[index] < strlen(get_buffer[index]))
                    nByte = send_data(index);
                if (nByte > 0)
                {
                    buffer_used[index] += nByte;
                    total_bytes += nByte;
                }
                if (nByte == -1 && errno == EPIPE)
                {
                    get_domain_and_ip(index);
                }
            }
            if(events[i].events & EPOLLIN) {
                for (int j = 0; j < MAX_CONNECTIONS; j++)
                {
                    if (events[i].data.fd == sockfd[j])
                    {
                        index = j;
                        break;
                    }
                }
                bzero(buffer[index], MAXBUF);
                ssize_t nByte = recv_data(index);
                //if (nByte > 0) printf("Received: %s from %s at %s \n", buffer[index], domains[index], servers[index]);
                if (nByte > 0) total_bytes += nByte;
                if (nByte == 0)
                {
                    close_socket(index);
                    if (!done)
                    {
                        get_domain_and_ip(index);
                    }
                }
            }
        }
        get_time();
        if (done && count == 0) break;
    }
    ares_destroy(channel);
    ares_library_cleanup();
    fclose(fp);
    printf("\nFinished without errors\n");
    return 0;
}

EDIT

As requested, I've run more tests with smaller numbers of concurrent connections. The results below are for 2, 4, 8, 16, 32, 64, 128, 256 and 512 concurrent connections, in that order.

total domains=4 elapsed=60.37s domains/s=0.07 KB=5 Mbit/s=0.00
total domains=0 elapsed=60.39s domains/s=0.00 KB=5 Mbit/s=0.00
total domains=17 elapsed=60.40s domains/s=0.28 KB=73 Mbit/s=0.01
total domains=17 elapsed=60.48s domains/s=0.28 KB=106 Mbit/s=0.01
total domains=40 elapsed=60.45s domains/s=0.66 KB=189 Mbit/s=0.02
total domains=172 elapsed=60.37s domains/s=2.85 KB=907 Mbit/s=0.10
total domains=312 elapsed=60.56s domains/s=5.15 KB=1272 Mbit/s=0.14
total domains=1165 elapsed=60.65s domains/s=19.21 KB=5687 Mbit/s=0.62
total domains=1966 elapsed=60.34s domains/s=32.58 KB=9884 Mbit/s=1.09

These tests show that, at smaller connection counts, performance does increase as the number of concurrent connections increases.

EDIT

As requested in chat, here is a link to the file of domains I am testing with: domains. Warning: the file is over 2 GB in size and contains over 130M domains. If you just want something to play with, here is a smaller file with 100,000 domains: 100,000 domains

0

There are 0 answers