Socket通信过程中环形缓冲区应用

  实验室的项目需求,需要做一个转发服务器,简单来说服务器由客户端A接收数据,转发给客户端B,可以有多个A与多个B存在,他们之间存在一个对应关系,主要是通过查询数据库来得到对应的关系,这里不细说。但是当客户端A的数量到达100个时,B接收到的数据会有延迟,这里说一下A每秒发送的数据量在256B左右。通过netstat命令,发现数据大量缓存在服务器的接收缓冲区中。

  经过网上的查询,发现瓶颈在于:我用同一个线程去接收数据与转发数据(用epoll实现,通过epoll获得读数据事件,然后读取数据转发)。但是这样效率不高,应该用两个线程来实现,一个线程用来转发数据,另一个用来接收数据,中间通过一个缓冲区来进行沟通。这个模型就与我们操作系统中学习的消费者和生产者模式相似。

  由于以前也没有写过环形缓冲区的例子,这里先用一个小的实例来做实验,实例中同样包括服务器和客户端。服务器一个线程用来接收数据放入缓冲区,一个线程用来读取缓冲区中的数据并进行打印。

  至于环形缓冲区的代码,我是参考先人的代码,在此基础上加以应用,先人代码网址如下:

  http://www.cnblogs.com/iyjhabc/p/3143264.html

  先人的代码写的真心不错,代码风格也很规范,将条件同步应用到了缓冲区的同步问题中。

  现在奉上在下的代码:

  这是服务器端的代码,其中有两个线程,一个接收数据,另一个打印接收的数据。

#include<stdio.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<string.h>
#include<netinet/in.h>
#include<pthread.h>

#include"ytsSocket.h"

#define SERV_IP "127.0.0.1"
#define CLIENT_PORT 6666
#define MONITOR_PORT 5555

int Bind_socket();
void *Client_Service(void*);
void *Output_Service(void*);

struct prodcons buffer;         /* 全局的环形缓冲区 */

int main(int argc, char *argv[])
{
    
    int clifd,clientfd;
    socklen_t clilen;
    pthread_t ntid;
    int err;
    char buf[32];

    // struct prodcons buffer;         /* 全局的环形缓冲区 */
    
    /* 初始化全局的缓冲区 */
    init_buffer(&buffer);

    struct sockaddr_in cliaddr;
    if((clifd=Bind_socket())==-1) /* 绑定套接字 */
    {
        printf ("创建套接字失败
");
        pthread_exit((void*)-1);
    }
    if((err=pthread_create(&ntid,NULL,Output_Service,NULL))!=0)
        {
            printf("can not create thread:%s
",strerror(err));
            return -1;
        }
    
    for( ; ; )
    {

        // printf ("waiting for connect...
");
         clilen=sizeof(cliaddr);
         if((clientfd=accept(clifd,(struct sockaddr*)&cliaddr,&clilen))<0)
         {
             perror("accept");
             pthread_exit((void*)-1);
         }
         inet_ntop(AF_INET,&cliaddr.sin_addr,buf,sizeof(buf));
         printf ("Recieve connection from %s
",buf);
        
        
         if((err=pthread_create(&ntid,NULL,Client_Service,(void*)&clientfd))!=0)
        {
            printf("can not create thread:%s
",strerror(err));
            return -1;
        }

        

        int rst;
    }
    
    
    
    return 0;
}

/* 为客户端服务的线程 */
void* Client_Service(void*arg)
{
    int clientfd;
    
    clientfd=*(int *)arg;

         int rstn;
         
         for( ;  ; )
         {
             rstn=receive_message(clientfd,&buffer);
             
             if(rstn==-1)
             {
                 close(clientfd);
                 pthread_exit((void*)-1);
                 
             }else if(rstn==0)
             {
                 close(clientfd);
                 printf ("peer close connection
");
                 pthread_exit((void*)-1);
             }
             else
             {
                 // buffer[rstn]=0;
                 //printf("receive from client data: %s
",buffer);
             }

             sleep(1);
         }
    
    
   
}

/* 建立套接字 */
int Bind_socket()
{
    int clifd;

    socklen_t clilen;
    
    struct sockaddr_in servaddr_cli,servaddr_mon,cliaddr;
    
    if((clifd=socket(AF_INET,SOCK_STREAM,0))<0)
    {
         perror("socket");
         return -1;
    }
       

    bzero(&servaddr_cli,sizeof(servaddr_cli));
    servaddr_cli.sin_family=AF_INET;
    servaddr_cli.sin_port=htons(CLIENT_PORT);
    servaddr_cli.sin_addr.s_addr=htonl(INADDR_ANY);

   

    if( bind(clifd,(struct sockaddr*)&servaddr_cli,sizeof(servaddr_cli))<0)
    {
        perror("bind");
        return -1;
    }

    if(listen(clifd,5)<0)
    {
        perror("listen");
        return -1;
    }

    return clifd;
}

void* Output_Service(void*arg)
{

    for( ; ;)
    {
        print_buffer(&buffer);
    }
    
}

  客户端代码,客户端十分简单,就是不断向服务器发送数据:

#include<stdio.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<string.h>
#include<netinet/in.h>
#include<pthread.h>
#include<unistd.h>

#include "ytsSocket.h"
#define SERV_IP "127.0.0.1"
#define CLIENT_PORT 6666
#define MONITOR_PORT 5555


int main(int argc, char *argv[])
{
    int clifd;
    struct sockaddr_in servaddr;

    

    if( (clifd=socket(AF_INET,SOCK_STREAM,0))<0)
        perror("socket");
    
    bzero(&servaddr,sizeof(servaddr));
    servaddr.sin_family=AF_INET;
    servaddr.sin_port=htons(CLIENT_PORT);
    inet_pton(AF_INET,SERV_IP,&servaddr.sin_addr);

    

    if(connect(clifd,(struct sockaddr*)&servaddr,sizeof(servaddr))<0)
        perror("connect");
    
    char buffer[32];
    int i,rstn;
    for( i=0;i<10;i++)
    {
        snprintf(buffer,sizeof(buffer),"Data Id is : %d",i);
        printf ("%s
",buffer);
        
        /* if((rstn=write(clifd,buffer,strlen(buffer)))<0) */
        /*     perror("write"); */
        SendMessageClient(clifd,buffer,strlen(buffer));
        usleep(1000000);
    }
    
    return 0;
}

  环形缓冲区主要代码和socket通信粘包问题处理的代码:

#include"ytsSocket.h"


/* 向对端发送数据,由共享缓冲区中读取数据 */
int SendMessage(int socketFd,char * sendBuffer)
{
    

    char len[5];
    int sendLen=strlen(sendBuffer);
    snprintf(len,sizeof(len),"%04d",sendLen);
    writen(socketFd,len,sizeof(len)-1);

    int sendRst=writen(socketFd,sendBuffer,sendLen);
    return sendRst;
}

int SendMessageClient(int socketFd,char * sendBuffer,int sendLen)
{
    char len[5];
    snprintf(len,sizeof(len),"%04d",sendLen);
    //printf("len:%s
",len);
    if(send(socketFd,len,sizeof(len)-1,0)<0)
    {
        perror("send :");
        return -1;
    }

    int sendRecv;
    if((sendRecv=send(socketFd,sendBuffer,sendLen,0))<0)
    {
        perror("send sendBuffer:");
        return -1;
    }

    return sendRecv;
}



int ReceiveMessage(int sockFd,char recvMsg[])
{
    
    char msgSize[5];
    int sizeLen=sizeof(msgSize);
    readn(sockFd,msgSize,sizeLen-1);
    msgSize[sizeLen-1]=0;
    int msgLen=atoi(msgSize);
    
    int rcvLen=readn(sockFd,recvMsg,msgLen);
    if(rcvLen>0)
    {
        recvMsg[rcvLen]=0;      /* 如果成功接收数据,在数据结尾后加入‘0/’ */
    }
    return rcvLen;
        
}

int readn(int fd,void *vptr,int n)
{
    int nleft;
    int nread;
    char *ptr;
    ptr=(char*)vptr;
    nleft=n;
    while(nleft>0)
    {
        if( (nread=read(fd,ptr,nleft))<0)
        {
            if(errno==EINTR)//read again
            {
                nread=0;
            }
            else
            {
                
                return -1;
                
                //pthread_exit((void*)0);//end thread
            }
        }
        else if (nread==0)//receive FIN
        {
            close(fd);   //close socket
            //pthread_exit((void*)0);//end thread
            return 0;
        }
            

        nleft-=nread;
        ptr+=nread;
    }

    return (n-nleft);
}

int writen(int fd,const void * vptr,int n)
{
    int nleft;
    int nwritten;
    const char *ptr;

    ptr=(char*)vptr;
    nleft=n;
    while(nleft>0)
    {
        if( (nwritten=write(fd,ptr,nleft))<=0)
        {
            if(nwritten<0 && errno==EINTR)
                nwritten=0;
            else
            {
                return -1;
                //pthread_exit((void*)0);//end thread
            }
                
            
        }

        nleft-=nwritten;
        ptr+=nwritten;
    }
}

/* 接收数据,并向共享缓冲区中写入数据 */
int receive_message(int fd,struct prodcons*buffer)
{
    // printf ("recevie message
");
    int rstn;                   /* 接收数据 */
    pthread_mutex_lock(&buffer->lock);
    //printf ("size:%d
",buffer->size);
    /* 等待缓冲区未满 */
    while(buffer->size==BUFFER_SIZE)
    {
        pthread_cond_wait(&buffer->notfull,&buffer->lock);
    }

    if((rstn=ReceiveMessage(fd,buffer->buffer[buffer->writepos]))<=0)
    {
        pthread_mutex_unlock(&buffer->lock);
        return rstn;
    }
    else
    {
        buffer->writepos++;

        if(buffer->writepos>=BUFFER_SIZE)
        {
            buffer->writepos=0;
        }
        buffer->size++;
        pthread_cond_signal(&buffer->notempty);
        pthread_mutex_unlock(&buffer->lock);

        return rstn;
    }
    
        
}

/* 发送数据,读共享缓冲区 */
int send_message(int fd,struct prodcons *buffer)
{
    int rstn;                   /* 记录发送数据的个数 */
    pthread_mutex_lock(&buffer->lock);

    while(buffer->size==0)
    {
        pthread_cond_wait(&buffer->notempty,&buffer->lock);
    }

    /* 读取缓冲区中的数据,进行发送 */
    if((rstn=SendMessage(fd,buffer->buffer[buffer->readpos]))<=0)
    {
        pthread_mutex_unlock(&buffer->lock);
        return rstn;
    }  
    else
    {
        buffer->readpos++;
        if(buffer->readpos>=BUFFER_SIZE)
        {
            buffer->readpos=0;
        }
        buffer->size--;
        
        pthread_cond_signal(&buffer->notfull);
        pthread_mutex_unlock(&buffer->lock);
        return rstn;
    }
}

/* 初始化缓冲区相关数据 */
void init_buffer(struct prodcons*buffer)
{
    
    pthread_mutex_init(&buffer->lock,NULL);
    pthread_cond_init(&buffer->notempty,NULL);
    pthread_cond_init(&buffer->notfull,NULL);
    buffer->readpos=0;
    buffer->writepos=0;
    buffer->size=0;
        
}

//输出共享buffer中的信息
void print_buffer(struct prodcons*buffer)
{
                      /* 记录发送数据的个数 */
    pthread_mutex_lock(&buffer->lock);

    while(buffer->size==0)
    {
        pthread_cond_wait(&buffer->notempty,&buffer->lock);
    }

    /* 读取缓冲区中的数据,进行发送 */
    printf ("%s
",buffer->buffer[buffer->readpos]);
    buffer->readpos++;
    if(buffer->readpos>=BUFFER_SIZE)
    {
       buffer->readpos=0;
    }
    
    buffer->size--;

    pthread_cond_signal(&buffer->notfull);
    pthread_mutex_unlock(&buffer->lock);
    
}

下面是运行结果:

我编译了两个客户端,同时向服务器发送数据,一个发送Data Number is [0-9] ,另一个发送Data Id is [0-9];

服务器的输出如下:

[yts@ytsCentos ringBuffer]$ ./main
Recieve connection from 127.0.0.1
Data Id is : 0
Recieve connection from 127.0.0.1
Data Number is : 0
Data Id is : 1
Data Number is : 1
Data Id is : 2
Data Number is : 2
Data Id is : 3
Data Number is : 3
Data Id is : 4
Data Number is : 4
Data Id is : 5
Data Number is : 5
Data Id is : 6
Data Number is : 6
Data Id is : 7
Data Number is : 7
Data Id is : 8
Data Number is : 8
Data Id is : 9
Data Number is : 9
peer close connection
peer close connection

客户端输出:

[yts@ytsCentos ringBuffer]$ ./client
Data Number is : 0
Data Number is : 1
Data Number is : 2
Data Number is : 3
Data Number is : 4
Data Number is : 5
Data Number is : 6
Data Number is : 7
Data Number is : 8
Data Number is : 9
[yts@ytsCentos ringBuffer]$ ./client1
Data Id is : 0
Data Id is : 1
Data Id is : 2
Data Id is : 3
Data Id is : 4
Data Id is : 5
Data Id is : 6
Data Id is : 7
Data Id is : 8
Data Id is : 9

达到了预期的效果,下阶段的目标是把环形缓冲区应用到服务器中,希望一且顺利。。。

源代码:

ringBuffer.zip

原文地址:https://www.cnblogs.com/yts1dx/p/3418443.html