Linux基础——多线程实现任务

首先,我们要定义程序所需的数据结构:任务(TASK)、保存每个客户端一对读写文件描述符的链表节点(FD),以及存放任务的循环队列(QUEUE)。

声明代码如下:

 1 #ifndef _HEAD_H
 2 #define _HEAD_H
#include <ctype.h>
#include <errno.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <fcntl.h>
#include <pthread.h>
#include <unistd.h>
#include <sys/select.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/types.h>
/* Size in bytes of the message buffer carried by every task. */
#define MSG_LEN 1024
/* Number of tasks the queue can hold; the backing array has one slot more. */
#define TASK_CNT 1024

/*
 * Synchronisation objects used by add_task() and get_task().
 * They are only declared here; exactly one source file defines them.
 */
extern pthread_mutex_t lock;
extern pthread_cond_t cond1, cond2;
/* One node of a singly linked list of descriptor pairs. */
struct tag_fd
{
    int s_rfd;            /* key compared by fd_find() and fd_del() */
    int s_wfd;            /* value fd_find() returns for a matching s_rfd */
    struct tag_fd *next;  /* following node, NULL at the end of the list */
};
typedef struct tag_fd FD;
typedef struct tag_fd *pFD;
25 typedef struct tag_task
26 {
27     char s_msg[MSG_LEN];
28     int s_fd;
29 }TASK,*pTASK;
30 typedef struct tag_que
31 {
32     TASK arr[TASK_CNT+1];
33     int front;
34     int tail;
35 }QUEUE,*pQUEUE;
36 void fd_insert(pFD *phead,int rfd,int wfd);
37 void fd_init(pFD *phead);
38 int  fd_find(pFD phead,int rfd);
39 void fd_del(pFD *phead,int rfd);
40 void add_task(pQUEUE pq,pTASK pt);
41 void get_task(pQUEUE pq,pTASK pt);
42 void excute_task(pTASK pt);
43 #endif
View Code

服务器需要记录每个客户端对应的一对管道描述符(读端和写端),并在客户端退出时将其移除,所以我们把这些描述符保存到一个链表中。
实现代码如下:

 1 #include "head.h"
 2 void fd_init(pFD *phead)
 3 {
 4     *phead= NULL;
 5 }
 6 void fd_insert(pFD *phead,int rfd,int wfd)
 7 {
 8     pFD pnew = (pFD )calloc(1,sizeof(FD));
 9     pnew->s_rfd=rfd;
10     pnew->s_wfd=wfd;
11     pnew->next = *phead;
12     *phead = pnew;
13 }
14 int fd_find(pFD phead,int rfd)
15 {
16     while(phead)
17     {
18         if(phead->s_rfd==rfd)
19             break;
20         else
21             phead = phead->next;
22     }
23     if(phead == NULL)
24         return -1;
25     else
26         return phead->s_wfd;
27 }
28 
29 void fd_del(pFD *phead,int rfd)
30 {
31     pFD pcur,ppre;
32     pcur=*phead;
33     ppre=NULL;
34     while(pcur)
35     {
36         if(pcur->s_rfd == rfd)
37             break;
38         else
39         {
40             ppre=pcur;
41             pcur = pcur ->next;
42         }
43     }
44     if(ppre==NULL)
45     {
46         *phead=pcur->next;
47         free(pcur);
48         pcur=NULL;
49     }
50     else
51     {
52         ppre->next=pcur->next;
53         free(pcur);
54         pcur=NULL;
55     }
56 }
View Code

然后,我们还需要实现对任务的控制,例如任务的添加、获得、执行等。

实现代码如下:

 1 #include "head.h"
 2 static int que_empty(pQUEUE pq)
 3 {
 4     return pq->front == pq->tail;
 5 }
 6 static int que_full(pQUEUE pq)
 7 {
 8     return (pq->tail+1)%(TASK_CNT+1)==pq->front;
 9 }
10 static int que_cnt(pQUEUE pq)
11 {
12     return (pq->tail - pq->front +TASK_CNT+1)%(TASK_CNT + 1);
13 }
14 void add_task(pQUEUE pq ,pTASK pt)
15 {
16     pthread_mutex_lock(&lock);
17     while(que_full(pq))
18         pthread_cond_wait(&cond1,&lock);
19     pq->arr[pq->tail]=*pt;
20     pq->tail = (pq->tail+1)%(TASK_CNT+1);
21     if(que_cnt(pq)==1)
22         pthread_cond_broadcast(&cond2);
23     printf("添加了一个任务!!
");
24     pthread_mutex_unlock(&lock);
25 }
26 void get_task(pQUEUE pq ,pTASK pt)
27 {
28     pthread_mutex_lock(&lock);
29     while(que_empty(pq))
30         pthread_cond_wait(&cond2,&lock);
31     *pt=pq->arr[pq->front];
32     pq->front = (pq->front+1)%(TASK_CNT+1);
33     if(que_cnt(pq)== TASK_CNT -1)
34         pthread_cond_broadcast(&cond1);
35     printf("获得了一个任务!!
");
36     pthread_mutex_unlock(&lock);
37 }
38 
39 
40 void excute_task(pTASK pt)
41 {
42     char buf[1024];
43     memset(buf,0,1024);
44     strcpy(buf,pt->s_msg);
45     int index;
46     for(index=0;index < strlen(buf);index++)
47         buf[index]=toupper(buf[index]);
48     buf[index]='';
49     write(pt -> s_fd,buf,strlen(buf));
50 }
View Code

最后,我们只需在服务器端应用select循环查询是否有任务,再执行相应的操作。

服务器实现代码如下:

  1 #include "head.h"
  2 pthread_mutex_t lock;
  3 pthread_cond_t cond1,cond2;
  4 void* hand(void* arg)
  5 {
  6     pthread_detach(pthread_self());
  7     TASK task;
  8     pQUEUE pq = (pQUEUE)arg;
  9     while(1)
 10     {
 11         get_task(pq,&task);
 12         excute_task(&task);
 13         sleep(1);
 14     }
 15 }
 16 int main(int argc,char *argv[])
 17 {
 18     if(argc != 3)
 19     {
 20         perror("参数错误!!
");
 21         exit(1);
 22     }
 23     signal(SIGINT,SIG_IGN);
 24     signal(SIGPIPE,SIG_IGN);
 25     signal(SIGQUIT,SIG_IGN);
 26     QUEUE que;
 27     int fd;
 28     fd_set read_set,revc;
 29     pFD list;
 30     memset(&que,0,sizeof(QUEUE));
 31     fd_init(&list);
 32     int cnt = atoi(argv[2]);
 33     pthread_t *arr=(pthread_t *)calloc(cnt,sizeof(pthread_t));
 34     pthread_mutex_init(&lock,NULL);
 35     pthread_cond_init(&cond1,NULL);
 36     pthread_cond_init(&cond2,NULL);
 37     int index=0;
 38     while(cnt > 0)
 39     {
 40         pthread_create(arr+index,NULL,hand,(void*)&que);
 41         cnt--;
 42         index++;
 43     }
 44     fd = open(argv[1],O_RDONLY);
 45     if(fd == -1)
 46     {
 47         perror("管道打开失败!!
");
 48         exit(1);
 49     }
 50     struct timeval tm;
 51     int ret;
 52     FD_ZERO(&read_set);
 53     FD_ZERO(&revc);
 54     FD_SET(fd,&read_set);
 55     while(1)
 56     {
 57         tm.tv_sec=0;
 58         tm.tv_usec=1000;
 59         revc = read_set;
 60         ret=select(1024,&revc,NULL,NULL,&tm);
 61         if(ret == 0)
 62             continue;
 63         else if(ret > 0)
 64         {
 65             if(FD_ISSET(fd,&revc))
 66             {
 67                 char buf[32];
 68                 memset(buf,0,32);
 69                 if(read(fd,buf,32)==0)
 70                     continue;
 71                 else
 72                 {
 73                     char name[32];
 74                     int r_fd,w_fd;
 75                     buf[strlen(buf)-1]='';
 76                     memset(name,0,32);
 77                     sprintf(name,"r.%s",buf);
 78                     w_fd=open(name,O_WRONLY);
 79                     memset(name,0,32);
 80                     sprintf(name,"w.%s",buf);
 81                     r_fd=open(name,O_RDONLY);
 82                     fd_insert(&list,r_fd,w_fd);
 83                     FD_SET(r_fd,&read_set);
 84                 }
 85             }
 86         }
 87         pFD pcur=list;
 88         while(pcur)
 89         {
 90             if(FD_ISSET(pcur->s_rfd,&revc))
 91             {
 92                 char buf[1024];
 93                 memset(buf,0,1024);
 94                 if(read(pcur->s_rfd,buf,1024)==0)
 95                 {
 96                     FD_CLR(pcur->s_rfd,&read_set);
 97                     int i=pcur->s_rfd;
 98                     pcur=pcur->next;
 99                     fd_del(&list,i);
100                 }
101                 else
102                 {
103                     TASK tk;
104                     memset(&tk,0,sizeof(TASK));
105                     tk.s_fd=pcur->s_wfd;
106                     strcpy(tk.s_msg,buf);
107                     add_task(&que,&tk);
108                     pcur=pcur->next;
109                 }
110             }
111             else
112                 pcur=pcur->next;
113         }
114     }
115     pthread_mutex_destory(&lock);
116     pthread_cond_destory(&cond1);
117     pthread_cond_destory(&cond2);
118     return 0;    
119 }
View Code

客户端实现代码如下:

 1 #include <stdio.h>
 2 #include <stdlib.h>
 3 #include <fcntl.h>
 4 #include <unistd.h>
 5 #include <string.h>
 6 #include <sys/stat.h>
 7 #include <sys/types.h>
 8 int main(int argc,char *argv[])
 9 {
10     int fd_server,send,revc;
11     char rname[32],wname[32];
12     memset(rname,0,32);
13     memset(wname,0,32);
14     sprintf(rname,"r.%d",getpid());
15     sprintf(wname,"w.%d",getpid());
16     mkfifo(rname,0666);
17     mkfifo(wname,0666);
18     fd_server=open(argv[1],O_WRONLY);
19     char msg[1024];
20     memset(msg,0,1024);
21     sprintf(msg,"%d
",getpid());
22     write(fd_server,msg,strlen(msg));
23     revc=open(rname,O_RDONLY);
24     send=open(wname,O_WRONLY);
25     while(memset(msg,0,1024),fgets(msg,1024,stdin))
26     {
27         write(send,msg,strlen(msg));
28         memset(msg,0,1024);
29         read(revc,msg,1024);
30         write(1,msg,strlen(msg));
31     }
32     close(fd_server);
33     close(send);
34     close(revc);
35     unlink(rname);
36     unlink(wname);
37     return 0;
38 }
View Code
原文地址:https://www.cnblogs.com/gjn135120/p/4009313.html