基于消息队列的UDP并发服务器v1

UDP是无状态的,无法用TCP一样的并发服务器。我们可以用消息队列的方式模拟下。

首先,我们看消息队列节点

typedef struct msg_buf
{
    int sockfd;
    struct sockaddr_in their_addr;  /* 连接对方的地址信息 */
    int sin_size;
    char buf[BUFF_SIZE];    
    size_t len;
    struct msg_buf *next;
}msgbuf_t;

关于分配与释放的接口,比较习惯这样的方式了

msgbuf_t *get_msgbuf()
{
    return (msgbuf_t *)malloc(sizeof(struct msg_buf));
}
void put_msgbuf(msgbuf_t *msg)
{
    free(msg);
}

服务线程相关,只有一个线程

struct msg_buf *msg_mbox;
sem_t recv_sem;
pthread_mutex_t recv_mutex;
pthread_t recv_pid;

上述参数的初始化

void server_init()
{
    sem_init(&recv_sem,0,0);
    pthread_mutex_init(&recv_mutex,NULL);
    pthread_mutex_init(&send_mutex,NULL);
    pthread_create(&recv_pid,NULL,recv_thread,NULL);

}

还有个void server_destroy()处理方法。

通过atexit在main中注册。


消息队列发送

void msg_post(struct msg_buf *msg)
{
    pthread_mutex_lock(&recv_mutex);
    if(msg_mbox)
    {
        //链表插入操作
        msg->next = msg_mbox->next;
        msg_mbox->next = msg;
    }else{
        msg_mbox = msg;
        msg->next = NULL;
    }
    pthread_mutex_unlock(&recv_mutex);
    sem_post(&recv_sem);
}

最后调用

sem_post通知阻塞线程处理。

处理接收数据包线程——线程只是是实现了echo功能。

View Code
void *recv_thread(void *arg)
{
    static int cnt = 0;
    while(1)
    {
        sem_wait(&recv_sem); 
        pthread_mutex_lock(&recv_mutex);
        struct msg_buf *msg = msg_mbox;
        msg_mbox = msg->next;
        pthread_mutex_unlock(&recv_mutex);

        //处理msg消息
        int retval = sendto(msg->sockfd, msg->buf,msg->len, 0,
                        (struct sockaddr *)&(msg->their_addr), msg->sin_size);
        printf("Received %d from %s\n%s\n",++cnt,inet_ntoa((msg->their_addr).sin_addr),msg->buf);
        put_msgbuf(msg);
    }
}

main中循环

View Code
int main(int argc,char *argv[])
{

    int sockfd;                     /* 数据端口 */
    struct sockaddr_in my_addr;     /* 自身的地址信息 */
   
    int  retval;
    if ((sockfd = socket(AF_INET, SOCK_DGRAM, 0)) == -1) {
        perror("socket");
        exit(1);
    }
    /* 设置地址可复用 */
    int option = 1;
    setsockopt( sockfd, SOL_SOCKET, SO_REUSEADDR, &option, sizeof(option) );

    my_addr.sin_family = AF_INET;
    my_addr.sin_port = htons(MYPORT); /* 网络字节顺序 */
    my_addr.sin_addr.s_addr = INADDR_ANY; /* 自动填本机IP */
    bzero(&(my_addr.sin_zero), 8); /* 其余部分置0 */

    if (bind(sockfd, (struct sockaddr *)&my_addr, sizeof(my_addr)) == -1) {
        perror("bind");
        exit(1);
    }

    /**
     * 其它初始化 
     * */

    server_init();

    /* 主循环 */
    while(1) {
        struct msg_buf     *recvmsg = get_msgbuf();
        size_t len = sizeof(recvmsg->buf);
        char *buf  = recvmsg->buf;
        struct sockaddr_in their_addr; 
        int sin_size;
        memset(buf,len,0);

        retval = recvfrom(sockfd, buf, len, 0,
                          (struct sockaddr *)&their_addr, &sin_size);
        printf("%s\t%s\n",inet_ntoa(their_addr.sin_addr),buf);
        if (retval == 0) {
            perror ("recvfrom");
            close(sockfd);
            break;
        }//
        //封装消息
        recvmsg->their_addr = their_addr;
        recvmsg->sin_size = sin_size;
        recvmsg->sockfd = sockfd;
        recvmsg->len = retval;
        //发送给某消息处理
        msg_post(recvmsg);
    }
    return 0;
}

Over!

原文地址:https://www.cnblogs.com/westfly/p/2446461.html