linux tcp server

这里分析两种模型

A: 来源于网络,http://bbs.chinaunix.net/thread-4067753-1-1.html,号称50万QPS

B: 本人自己写的,我觉得性能上比上述的模型要好

——————————————————————————————————————————

A:

#define _GNU_SOURCE

#include <stdlib.h>

#include <stdio.h>

#include <string.h>

#include <sched.h>

#include <pthread.h>

#include <sys/epoll.h>

#include <sys/socket.h>

#include <netinet/in.h>

#include <arpa/inet.h>

#include <signal.h>

#include <unistd.h>

#include <fcntl.h>

#include <errno.h>

#include <time.h>

typedef struct connection_st {

    int sock;

    int index; /* which epoll fd this conn belongs to*/

    int using;

#define BUF_SIZE 4096

    int roff;

    char rbuf[BUF_SIZE];

    int woff;

    char wbuf[BUF_SIZE];

}*connection_t;

#define CONN_MAXFD 65536

struct connection_st g_conn_table[CONN_MAXFD] = {0};

static sig_atomic_t shut_server = 0;

void shut_server_handler(int signo) {

    shut_server = 1;

}

#define EPOLL_NUM 8

int epfd[EPOLL_NUM];

int lisSock;

#define WORKER_PER_GROUP 1

#define NUM_WORKER (EPOLL_NUM * WORKER_PER_GROUP)

pthread_t worker[NUM_WORKER]; /* echo group has 6 worker threads */

int sendData(connection_t conn, char *data, int len) {

    if (conn->woff){

        if (conn->woff + len > BUF_SIZE) {

            return -1;

        }

        memcpy(conn->wbuf + conn->woff, data, len);

        conn->woff += len;

        return 0;

    } else {

        int ret = write(conn->sock, data, len);

        if (ret > 0){

            if (ret == len) {

                return 0;

            }

            int left = len - ret;

            if (left > BUF_SIZE) return -1;

           

            memcpy(conn->wbuf, data + ret, left);

            conn->woff = left;

        } else {

            if (errno != EINTR && errno != EAGAIN) {

                return -1;

            }

            if (len > BUF_SIZE) {

                return -1;

            }

            memcpy(conn->wbuf, data, len);

            conn->woff = len;

        }

    }

    return 0;

}

int handleReadEvent(connection_t conn) {

    if (conn->roff == BUF_SIZE) {

        return -1;

    }

   

    int ret = read(conn->sock, conn->rbuf + conn->roff, BUF_SIZE - conn->roff);

    if (ret > 0) {

        conn->roff += ret;

       

        int beg, end, len;

        beg = end = 0;

        while (beg < conn->roff) {

            char *endPos = (char *)memchr(conn->rbuf + beg, ' ', conn->roff - beg);

            if (!endPos) break;

            end = endPos - conn->rbuf;

            len = end - beg + 1;

           

            /*echo*/

            if (sendData(conn, conn->rbuf + beg, len) == -1) return -1;

            beg = end + 1;

            printf("request_finish_time=%ld ", time(NULL));

        }

        int left = conn->roff - beg;

        if (beg != 0 && left > 0) {

            memmove(conn->rbuf, conn->rbuf + beg, left);

        }

        conn->roff = left;

    } else if (ret == 0) {

        return -1;

    } else {

        if (errno != EINTR && errno != EAGAIN) {

            return -1;

        }

    }

    return 0;

}

int handleWriteEvent(connection_t conn) {

    if (conn->woff == 0) return 0;

    int ret = write(conn->sock, conn->wbuf, conn->woff);

    if (ret == -1) {

        if (errno != EINTR && errno != EAGAIN) {

            return -1;

        }

    } else {

        int left = conn->woff - ret;

        if (left > 0) {

            memmove(conn->wbuf, conn->wbuf + ret, left);

        }

        conn->woff = left;

    }

    return 0;

}

void closeConnection(connection_t conn) {

    struct epoll_event evReg;

    conn->using = 0;

    conn->woff = conn->roff = 0;

    epoll_ctl(epfd[conn->index], EPOLL_CTL_DEL, conn->sock, &evReg);

    close(conn->sock);

}

void *workerThread(void *arg) {

    int epfd = *(int *)arg;

   

    struct epoll_event event;

    struct epoll_event evReg;

    /* only handle connected socket */

    while (!shut_server) {

        int numEvents = epoll_wait(epfd, &event, 1, 1000);

        

        if (numEvents > 0) {

            int sock = event.data.fd;

            connection_t conn = &g_conn_table[sock];

               

            if (event.events & EPOLLOUT) {

                if (handleWriteEvent(conn) == -1) {

                    closeConnection(conn);

                    continue;

                }

            }

            if (event.events & EPOLLIN) {

                if (handleReadEvent(conn) == -1) {

                    closeConnection(conn);

                    continue;

                }

            }

               

            evReg.events = EPOLLIN | EPOLLONESHOT;

            if (conn->woff > 0) evReg.events |= EPOLLOUT;

            evReg.data.fd = sock;

            epoll_ctl(epfd, EPOLL_CTL_MOD, conn->sock, &evReg);

        }

    }

    return NULL;

}

void *listenThread(void *arg) {

    int lisEpfd = epoll_create(5);

    struct epoll_event evReg;

    evReg.events  = EPOLLIN;

    evReg.data.fd = lisSock;

    epoll_ctl(lisEpfd, EPOLL_CTL_ADD, lisSock, &evReg);

   

    struct epoll_event event;

    int rrIndex = 0; /* round robin index */

   

    /* only handle listen socekt */

    while (!shut_server) {

        int numEvent = epoll_wait(lisEpfd, &event, 1, 1000);

        if (numEvent > 0) {

            int sock = accept(lisSock, NULL, NULL);

            if (sock > 0) {

                g_conn_table[sock].using = 1;

                   

                int flag;

                flag = fcntl(sock, F_GETFL);

                fcntl(sock, F_SETFL, flag | O_NONBLOCK);

                   

                evReg.data.fd = sock;

                evReg.events = EPOLLIN | EPOLLONESHOT;

                           

                /* register to worker-pool's epoll,

                 * not the listen epoll */

                g_conn_table[sock].index= rrIndex;

                epoll_ctl(epfd[rrIndex], EPOLL_CTL_ADD, sock, &evReg);

                rrIndex = (rrIndex + 1) % EPOLL_NUM;

            }

        }

    }

    close(lisEpfd);

    return NULL;

}

int main(int argc, char *const argv[]) {

    int c;

    for (c = 0; c < CONN_MAXFD; ++c) {

        g_conn_table[c].sock = c;

    }

    struct sigaction act;

    memset(&act, 0, sizeof(act));

    act.sa_handler = shut_server_handler;

    sigaction(SIGINT, &act, NULL);

    sigaction(SIGTERM, &act, NULL);

    /* create 2 different epoll fd */

   

    lisSock = socket(AF_INET, SOCK_STREAM, 0);

   

    int reuse = 1;

    setsockopt(lisSock, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));

   

    int flag;

    flag = fcntl(lisSock, F_GETFL);

    fcntl(lisSock, F_SETFL, flag | O_NONBLOCK);

    struct sockaddr_in lisAddr;

    lisAddr.sin_family = AF_INET;

    lisAddr.sin_port = htons(9876);

    lisAddr.sin_addr.s_addr = htonl(INADDR_ANY);

   

    if (bind(lisSock, (struct sockaddr *)&lisAddr, sizeof(lisAddr)) == -1) {

        perror("bind");

        return -1;

    }

    listen(lisSock, 4096);

   

    pthread_t lisTid;

    pthread_create(&lisTid, NULL, listenThread, NULL);

    int epi;

    for (epi = 0; epi < EPOLL_NUM; ++ epi) {

        epfd[epi] = epoll_create(20);

    }

    int i;

    cpu_set_t mask;

    for (i = 0; i < EPOLL_NUM; ++i) {

        int j;

        for (j = 0; j < WORKER_PER_GROUP; ++j) {

            pthread_create(worker + (i * WORKER_PER_GROUP + j), NULL, workerThread, epfd + i);

            CPU_ZERO(&mask);

            CPU_SET(i, &mask);

            if (pthread_setaffinity_np(*(worker + (i * WORKER_PER_GROUP + j)), sizeof(mask), &mask) < 0)

            {

                fprintf(stderr, "set thread affinity failed ");

            }

        }

    }

   

    for (i = 0; i < NUM_WORKER; ++i) {

        pthread_join(worker[i], NULL);

    }

    pthread_join(lisTid, NULL);

   

    struct epoll_event evReg;

    for (c = 0; c < CONN_MAXFD; ++c) {

        connection_t conn = g_conn_table + c;

        if (conn->using) {

            epoll_ctl(epfd[conn->index], EPOLL_CTL_DEL, conn->sock, &evReg);

            close(conn->sock);

        }

    }   

    for (epi = 0; epi < EPOLL_NUM; ++epi) {

        close(epfd[epi]);

    }

    close(lisSock);

    return 0;

}

B:

#define _GNU_SOURCE

#include <stdio.h>

#include <stdlib.h>

#include <string.h>

#include <sys/types.h>

#include <sys/socket.h>

#include <netdb.h>

#include <unistd.h>

#include <fcntl.h>

#include <sys/epoll.h>

#include <error.h>

#include <errno.h>

#include <signal.h>

#include <sched.h>

#include "thread-pool.h"

#define MAXEVENTS 64

static int make_socket_non_blocking(int sfd)

{

    int flags, s;

    flags = fcntl(sfd, F_GETFL, 0);

    if (-1==flags)

    {

        perror("fcntl");

         return -1;

    }

    flags |= O_NONBLOCK;

    s = fcntl(sfd, F_SETFL, flags);

    if (-1==s)

    {

        perror("fcntl");

         return -1;

    }

    return 0;

}

static int create_and_bind(char *port)

{

    struct addrinfo hints;

    struct addrinfo *result, *rp;

    int s, sfd;

    memset(&hints, 0, sizeof(struct addrinfo));

    hints.ai_family = AF_UNSPEC;//return IPv4 and IPv6 choices

    hints.ai_socktype = SOCK_STREAM;//we want a TCP socket

    hints.ai_flags = AI_PASSIVE;//all interfaces

    s = getaddrinfo(NULL, port, &hints, &result);

    if (0!=s)

    {

        fprintf(stderr, "getaddrinfo:%s ", gai_strerror(s));

         return -1;

    }

    for(rp=result; NULL!=rp; rp=rp->ai_next)

    {

        sfd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);

         if (-1==sfd)

        {

             continue;

         }

         s = bind(sfd, rp->ai_addr, rp->ai_addrlen);

         if (0==s)

         {

             //we managed to bind successfully

             break;

         }

         close(sfd);

    }

    if (NULL==rp)

    {

         fprintf(stderr, "could not bind");

             return -1;

    }

    freeaddrinfo(result);

    return sfd;

}

int run = 1;

void SignalHandler(int iSignNum)

{

    printf("capture signal number:%d ", iSignNum);

    run = 0;

}

void *handler(void *arg)

{

    int s;

    int fd = *((int*)arg);

    /*we have data on the fd waiting to be read. read and

    display it. we must read whatever data is available

    completely, as we are running in edge-triggered mode

    and won't get notification again for the same data.*/

    int done = 0;

    while(1)

    {

        ssize_t count;

        char buf[512];

        count = read(fd, buf, sizeof(buf));

        if (-1==count)

        {

            /*if errno==EAGAIN, that means we have read all

            data. so go back to the main loop*/

            if (errno==EAGAIN||errno==EWOULDBLOCK)

            {

                done = 1;

            }

            else

            {

                fprintf(stderr, "fd:%d ", fd);

                perror("read client data");

            }

            break;

        }

        else if (0==count)

        {

            /*end of file. the remote has closed the connection*/

            done = 1;

            break;

        }

        //write the buffer to standard output

        s = write(1, buf, count);

        if (-1==s)

        {

            perror("write");

            abort();

        }

    }

    if (done)

    {

        write(fd, "fine, thank you", strlen("fine, thank you")+1);

        printf("closed connection on descriptor %d ", fd);

        /*closing the descriptor will make epoll remove it

        from the set of descriptors which are monitored.*/

        close(fd);

    }

}

int main(int argc, char *argv[])

{

    int sfd, s;

    int efd;

    struct epoll_event event;

    struct epoll_event *events;

    if (2!=argc)

    {

        fprintf(stderr, "Usage:%s [port] ", argv[0]);

             exit(EXIT_FAILURE);

    }

    // init thread-pool

    unsigned count = 1;

    count = sysconf(_SC_NPROCESSORS_ONLN);

    pool_init(count);

    thread_pool *pool = (thread_pool*)pool_instance();

    // wait thread to run

    sleep(5);

    // thread cpu affinity

    cpu_set_t mask;

    cpu_set_t get;

    int thread_ccore = 0;

    for(thread_ccore=0; thread_ccore<count; thread_ccore++)

    {

        CPU_ZERO(&mask);

        CPU_SET(thread_ccore, &mask);

        if (pthread_setaffinity_np(pool->threadid[thread_ccore], sizeof(mask), &mask) < 0)

        {

            fprintf(stderr, "set thread affinity failed ");

        }

        CPU_ZERO(&get);

        if (pthread_getaffinity_np(pool->threadid[thread_ccore], sizeof(get), &get) < 0)

        {

            fprintf(stderr, "get thread affinity failed ");

        }

        if (CPU_ISSET(thread_ccore, &get))

        {

            printf("thread %ld is running in processor %d ", pool->threadid[thread_ccore], thread_ccore);

        }

    }

    // listen

    sfd = create_and_bind(argv[1]);

    if (-1==sfd)

    {

        abort();

    }

    s = make_socket_non_blocking(sfd);

    if (-1==s)

    {

        abort();

    }

    s = listen(sfd, SOMAXCONN);

    if (-1==s)

    {

             perror("listen");

             abort();

    }

    efd = epoll_create1(0);

    if (-1==efd)

    {

        perror("epoll_create");

        abort();

    }

    event.data.fd = sfd;

    event.events = EPOLLIN|EPOLLET;

    s = epoll_ctl(efd, EPOLL_CTL_ADD, sfd, &event);

    if (-1==s)

    {

        perror("epoll_ctl");

        abort();

    }

   

    //buffer where events are returned

    events = calloc(MAXEVENTS, sizeof event);

    //the event loop

    while(1)

    {

        signal(SIGINT, SignalHandler);

        if (!run)

        {

            break;

        }

        int n, i;

       

        n = epoll_wait(efd, events, MAXEVENTS, -1);

        for(i=0; i<n; i++)

        {

            if ((events[i].events&EPOLLERR)||

                (events[i].events&EPOLLHUP)||

                (!(events[i].events&EPOLLIN)))

            {

                /*an error has occured on this fd, or the socet is not

                 ready for reading (whe were we notified then?) */

                fprintf(stderr, "epoll error ");

                close(events[i].data.fd);

                continue;

            }

            else if (sfd!=events[i].data.fd)

            {

                pool_add_job(handler, (void*)&(events[i].data.fd));

            }

            else

            {

                /*we have a notification on the listening socket, which

                 means one or more incoming connections*/

                while(1)

                {

                    struct sockaddr in_addr;

                    socklen_t in_len;

                    int infd;

                    char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];

                    in_len = sizeof in_addr;

                    infd = accept(sfd, &in_addr, &in_len);

                    if (-1==infd)

                    {

                        if ((errno==EAGAIN)||

                            (errno==EWOULDBLOCK))

                        {

                            //we have processed all incoming connections

                            break;

                        }

                        else

                        {

                            perror("accept");

                            break;

                        }

                    }

                    s = getnameinfo(&in_addr, in_len, hbuf, sizeof hbuf,

                                    sbuf, sizeof sbuf,

                                    NI_NUMERICHOST|NI_NUMERICSERV);

                    if (0==s)

                    {

                        printf("accepted connection on descriptor %d"

                                "(host=%s, port=%s) ", infd, hbuf, sbuf);

                    }

                    /*make the incoming socket non-blocking and add it to the

                     list of fds to monitor*/

                    s = make_socket_non_blocking(infd);

                    if (-1==s)

                    {

                        abort();

                    }

                    event.data.fd = infd;

                    event.events = EPOLLIN|EPOLLET|EPOLLONESHOT;

                    s = epoll_ctl(efd, EPOLL_CTL_ADD, infd, &event);

                    if (-1==s)

                    {

                        perror("epoll_ctl");

                        abort();

                    }

                }

            }//else

        }

    }

    free(events);

    close(sfd);

    close(efd);

    pool_destroy();

    printf("process exit ");

}

————————————————————————————————————————————————————————————————————

本机环境,CPU八核(虚拟机)

8 工作者 线程

A: 测试

//LoadRunner

#include "lrs.h"


Action()
{
    lrs_create_socket("socket0", "TCP", "RemoteHost=10.20.61.117:9876",  LrsLastArg);

    lr_think_time(7);

    lrs_send("socket0", "buf0", LrsLastArg);

    lrs_receive("socket0", "buf1", LrsLastArg);

    lrs_close_socket("socket0");

    return 0;
}

B: 测试

#include "lrs.h"


Action()
{
    lrs_create_socket("socket0", "TCP", "RemoteHost=10.20.61.117:8093",  LrsLastArg);

    lr_think_time(6);

    lrs_send("socket0", "buf0", LrsLastArg);

    lrs_receive("socket0", "buf1", LrsLastArg);

    lrs_close_socket("socket0");

    return 0;
}

 Finally:

A模式:多epoll, 多线程。accept后,将socket fd分配给各个epoll fd,各个线程epoll_wait各自的epoll fd,不设置锁。

             以大多数开发者的想法,这种不设置锁的多线程应该高效。但其实不然!!!!

             首先,这个模型里,各个线程没有休眠,再有,connnect结构占用内容偏高。

     结果,造成系统响应迟钝,退出缓慢,网络吞吐并不高。

             

     

B模式:单epoll,启用工作者线程池

     大多数开发者看见了线程池有锁,就认为效率低下。其实不然!!!!

             有人分析过,内核锁的效率不是应用效率的主障碍!!!!!!!

             首先,这个模型里,cpu和内存占用极低,所有耗时都费在了应该费时的I/O上。

             结果,系统响应极快,退出正常,网络吞吐是上个模型的2.5倍

             

             

有时候,新生事务是要比老事务先进的多的。因为A模型实在2013年提出的!!!

原文地址:https://www.cnblogs.com/woodzcl/p/7987249.html