UIUC 系统编程 assignment 3 线程间信号量通信

题意是由main thread产生一个server thread 和多个 client threads
./smp3 client1.txt client2.txt client3.txt
则产生3个client threads,每个对应一个文件。client thread每次读取文件中的一行,然后将该行内容和它的线程编号作为queue element
加入到queue中。

注意所有client 线程并行工作。

server thread每次从queue中取出一个queue element并加入到notice board queue中去,同时计算出当前notice board中一共有多少字符,作为返回
给对应client thread的信息。

client thread等待server thread处理完它发出的上一行信息,并得到server thread反馈的当前notice board一共有多少字符,然后打印
Server returned X to client Y after request Z 到 Y.out 文件,
X 是notice board当前的字符数目,Y对应该线程处理的文件名,Z对应该线程处理的当前行号,如1,2,3...
然后继续读下一行,重复上述工作。

main thread等待所有的client thread处理完对应文本的所有行,然后通知server thread可以结束,server thread接到通知后结束。


需要注意的是很多资源(如queue element中message指针所指的内容)在client thread中分配并填充内容,在server thread中free,所以必须在堆上动态分配。

char *message = (char *)malloc(256 * sizeof(char)); 
 while (fgets(message, 256, pFile_read)) {  
      ......
       message = (char *)malloc(256 * sizeof(char)); 
}
free(message);
这里注意:当前行的message已经交给queue,并会在处理过程中被server thread free掉,所以循环末尾一定要重新分配内存;最后一次分配的缓冲区没有被使用,需要在循环结束后free。

added = 0  queue = 1  read[i] = 0
client thread
while ( can read one line)
      wait(queue)
      add_element_to_queue()
      signal(added)  // tell server has added

      signal(queue)
      wait(read[i])   //wait server to read the message,i is client num
      print(result)
  
server thread
while(1)
      wait(added)
      wait(queue)
      if (queue is empty)
            print info, finish running
      else
            read message from queue head, get client num iClientID, free resources of the queue element
      signal(queue)
      push message to notice board queue
      calculate current total character num,  client_result[iClientID] = char_count(); //通过全局变量返回信息给线程iClientID
      signal(read[iClientID])

main thread
      init semaphores
      init global resources
      launch server thread, client threads
      wait client threads finish
      signal(added)
      wait server thread finish
      free resources main thread allocated

 1 //smp3.h
 2 #ifndef __SMP3_H
 3 #define __SMP3_H
 4 
#include <errno.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
 9 
/* One pending request: a line of text and the id of the client that sent it. */
typedef struct __queue_element {
    char *message;                  /* line text; the element's consumer frees it */
    int client_id;                  /* index of the sending client */
    struct __queue_element *next;   /* next request, or NULL at the tail */
} queue_element;

/* Head of the singly linked request queue; NULL while the queue is empty. */
queue_element *queue;

/* Lock taken around every access to `queue`. */
sem_t queue_semaphore;
21 
22 
23 #endif /* __SMP3_H */

//smp3.c
 1 /*
 2 Small Machine Problem #3
 3  Spring 2009
 4 */
 5 
 6 /*Start: include appropriate header files*/
 7 // Include appropriate header files here
 8 /*End: include appropriate header files*/
 9 
10 #include "smp3.h"
11 #include "smp3.server.h"
12 #include "smp3.client.h"
13 
14 void init_semaphores(int num_clients)
15 {
16     if(sem_init(&queue_semaphore, 01== -1 ||
17        sem_init(&server_notify_semaphore, 00== -1) {
18         printf("Failed to init semaphore!\n");
19         exit(1);
20     }
21     int i;
22     for (i = 0; i < num_clients; i++) {
23         if (sem_init(&client_notify_semaphore[i], 00== -1) {
24             printf("Failed to init semaphore\n");
25             exit(1);
26         }
27     }
28 }
29 
30 void free_resources(int num_clients) 
31 {
32     int i;
33     sem_destroy(&queue_semaphore);
34     sem_destroy(&server_notify_semaphore);
35     for (i = 0; i < num_clients; i++
36         sem_destroy(&client_notify_semaphore[i]);
37 
38     free(client_notify_semaphore);
39     free(client_result);
40 }
41 
42 int main(int argc, char **argv)
43 {
44     /* Start your code */
45     if (argc == 1) {
46         printf("You should at least give one input file\n");
47         exit(1);
48     }
49     
50     int i;
51     int num_clients = argc - 1;
52     //init resouce
53     pthread_t tid_client[num_clients];
54     pthread_t tid_server; 
55     client_notify_semaphore = (sem_t *) malloc(num_clients * sizeof(sem_t));
56     client_result = (int *) malloc(num_clients * sizeof(int));
57 
58     init_semaphores(num_clients);
59 
60     //launch server thread
61     pthread_create(&tid_server, NULL, server_thread, NULL);
62     //launch num_clients client threads
63     client_info client_info[num_clients];
64     for (i = 0; i < num_clients; i++) {
65         client_info[i].client_id = i;
66         client_info[i].file_name = argv[i + 1]; 
67         pthread_create(&tid_client[i], NULL, client_thread, (void *)&client_info[i]);
68     }
69 
70     //wait all clients thread to finish
71     for (i = 0; i < num_clients; i++
72         pthread_join(tid_client[i], NULL);
73 
74     //notify server it should print info and exit,now the queue is empty
75     if (sem_post(&server_notify_semaphore) == -1) {
76             fprintf(stderr, "Main thread failed to post sever_notify_semaphore");
77             exit(1);
78     }
79 
80     //wait server finish
81     pthread_join(tid_server, NULL);
82 
83     free_resources(num_clients);
84     /* End your code */
85 
86     return 0
87 }
88 


 1 //smp3.client.h
 2 #ifndef __SMP3CLIENT_H
 3 #define __SMP3_CLIENT_H
 4 
 5 #include <semaphore.h>
 6 #include "smp3.h"
 7 
 8 sem_t *client_notify_semaphore;
 9 int *client_result;
10 
11 typedef struct __client_info
12 {
13     int client_id;
14     char *file_name;
15 } client_info;
16 
17 
18 void add_element_to_end_of_queue(queue_element *);
19 void *client_thread(void *);
20 
21 
22 #endif /* __SMP3_CLIENT_H */


  1 //smp3.client.c
  2 /*Start: include appropriate header files*/
  3 // Include appropriate header files here
  4 /*End: include appropriate header files*/
  5 
  6 #include "smp3.client.h"
  7 #include "smp3.server.h"
  8 #include "smp3.h"
  9 
 10 void add_element_to_end_of_queue(queue_element *ptr_queue_element)
 11 {
 12     /* Sanity to ensure the queue ends */
 13     ptr_queue_element->next = NULL;
 14 
 15     /* Insert into queue */
 16     if (queue == NULL)
 17         queue = ptr_queue_element;
 18     else
 19     {
 20         queue_element *thru = queue;
 21 
 22         while (thru->next != NULL)
 23             thru = thru->next;
 24 
 25         thru->next = ptr_queue_element;
 26     }
 27 }
 28 
 29 void *client_thread(void *vptr_client_info)
 30 {
 31     int my_client_id = ((client_info *)vptr_client_info)->client_id;
 32     char *my_file_name = ((client_info *)vptr_client_info)->file_name;
 33     /* Start your code */
 34     
 35     //You can safely assume that each line will contain less than 256 characters.
 36     FILE *pFile_read, *pFile_write;
 37     char *message = (char *)malloc(256 * sizeof(char)); 
 38 
 39     pFile_read = fopen (my_file_name , "r");
 40     if (pFile_read == NULL) {
 41         perror ("Error opening file");
 42         return
 43     }
 44    
 45     //the outfile name is inpufile.out
 46     char out_file_name[strlen(my_file_name) + 4];
 47     strcpy(out_file_name, my_file_name);
 48     strcat(out_file_name,".out");
 49 
 50     pFile_write = fopen(out_file_name, "w");
 51     if (pFile_write == NULL) {
 52         perror("Error writting file");
 53         return;
 54     }
 55   
 56     queue_element *p_queue_element;
 57     int request = 1;
 58     while (fgets(message, 256, pFile_read)) {  //fgets will incluce '\n'
 59         
 60         message[strlen(message) - 1= '\0'//remove the last '\n'
 61         //printf("%s\n", message);
 62         p_queue_element = (queue_element *) malloc(sizeof(queue_element));
 63         p_queue_element->client_id = my_client_id;
 64         p_queue_element->message = message;
 65         
 66         /*  acquire a lock on the queue. */
 67         if (sem_wait(&queue_semaphore) == -1) {
 68             fprintf(stderr, "The client thread failed to acquire a lock on the queue. (Error Value = %d)\n", errno);
 69             return;
 70         }
 71         
 72         add_element_to_end_of_queue(p_queue_element);
 73 
 74         //notify sever I have add element to queue
 75         if (sem_post(&server_notify_semaphore) == -1) {
 76             fprintf(stderr, "The client thread failed to post server_notify_semaphore. (Error Value = %d)\n", errno);
 77             return;
 78         }
 79 
 80         //release the queue lock
 81         if (sem_post(&queue_semaphore) == -1) {
 82             fprintf(stderr, "The client thread failed to post queue_semaphore. (Error Value = %d)\n", errno);
 83             return;
 84         }
 85         
 86         //waiting sever finish dealing with this message this particular clinet thread sent
 87         if (sem_wait(&client_notify_semaphore[my_client_id]) == -1) {
 88             fprintf(stderr, "The client thread failed to wait client_notify_semaphore . (Error Value = %d)\n", errno);
 89             return;
 90         }
 91 
 92         //print the severt returned result and then loop to deal the next line
 93         fprintf(pFile_write, "Server returned %d to client %s after request %d\n"
 94                 client_result[my_client_id], my_file_name, request++);
 95         message = (char *)malloc(256 * sizeof(char));  //do not forget this!!Since it has been freed by server here must realloc
 96 
 97     }
 98     
 99     free(message); // the last malloc need to be freed
100     /* End your code */
101     
102     return NULL;
103 }
104 
105 


//smp3.server.h
#ifndef __SMP3_SERVER_H
#define __SMP3_SERVER_H

#include <semaphore.h>

/* The server thread blocks on this semaphore; it is posted each time the
   server should look at the request queue. */
sem_t server_notify_semaphore;

/* One entry of the notice board: a message and the client that sent it. */
typedef struct __notice
{
    char *message;          /* heap-allocated text, released by free_notice_memory() */
    int client_id;
    struct __notice *next;  /* next notice, or NULL at the tail */
} notice;

/* Head of the notice board list; NULL while the board is empty. */
notice *start_notice;

/* Append ptr_notice to the tail of the notice board. */
void add_element_to_end_of_notice(notice *ptr_notice);

/* Free every notice on the board together with its message. */
void free_notice_memory(void);

/* Write every notice to stderr as "CLIENT <id>> <message>". */
void print_notice(void);

/* Length of str, not counting the terminating NUL. */
int my_strlen(const char *str);

/* Total number of characters in all messages on the notice board. */
int char_count(void);

/* Server thread body; always returns NULL. */
void *server_thread(void *vptr);


#endif /* __SMP3_SERVER_H */


  1 //smp3.server.c
  2 /*
  3 Small Machine Problem #3
  4  Spring 2009
  5 */
  6 
  7 /*Start: include appropriate header files*/
  8 // Include appropriate header files here
  9 /*End: include appropriate header files*/
 10 
 11 #include "smp3.server.h"
 12 #include "smp3.client.h"
 13 #include "smp3.h"
 14 
 15 void add_element_to_end_of_notice(notice *ptr_notice)
 16 {
 17     /* Sanity to ensure the notice queue ends */
 18     ptr_notice->next = NULL;
 19 
 20     /* Insert into queue */
 21     if (start_notice == NULL)
 22         start_notice = ptr_notice;
 23     else
 24     {
 25         notice *thru = start_notice;
 26 
 27         while (thru->next != NULL)
 28             thru = thru->next;
 29 
 30         thru->next = ptr_notice;
 31     }
 32 }
 33 
 34 void free_notice_memory()
 35 {
 36     notice *thru = start_notice;
 37     while(thru)
 38     {
 39         notice *temp = thru;
 40         thru = thru->next;
 41         free(temp->message);
 42         free(temp);
 43     }
 44 }
 45 void print_notice()
 46 {
 47     notice *thru = start_notice;
 48     while(thru) 
 49     {
 50         fprintf(stderr, "CLIENT %d> %s\n", thru->client_id, thru->message);
 51         thru = thru->next;
 52     }
 53 }
 54 
/*
 * Return the number of characters in str before the terminating NUL.
 *
 * str must be a valid NUL-terminated string. Implemented as a loop so the
 * stack use does not grow with the length of the string.
 */
int my_strlen(const char *str)
{
    int length = 0;

    while (str[length] != '\0')
        length++;

    return length;
}
 63 
 64 int char_count()
 65 {
 66     notice *thru = start_notice;
 67     int count = 0;
 68     while(thru) 
 69     {
 70         count += my_strlen(thru->message);
 71         thru = thru->next;
 72     }
 73     return count;
 74 }
 75 
 76 void *server_thread(void *vptr)
 77 {
 78 
 79     while (sem_wait(&server_notify_semaphore) == 0)
 80     {
 81         /* 1. As soon as the server has been notified that there is data in its queue, it will
 82            acquire a lock on the queue. */
 83 
 84         /*  acquire a lock on the queue. */
 85         if (sem_wait(&queue_semaphore) == -1)
 86         {
 87             fprintf(stderr, "The server thread failed to acquire a lock on the queue. (Error Value = %d)\n", errno);
 88             free_notice_memory();
 89             exit(1);
 90         }
 91 
 92         /*  NULL check the queue for exit (and sanity). */
 93         if (queue == NULL)
 94         {
 95             print_notice();
 96             free_notice_memory();
 97             return NULL;
 98         }
 99 
100         /*  store the information needed from the queue for processing. */
101         int iClientID = queue->client_id;
102         char *message = (char *)malloc(strlen(queue->message) + 1);
103         strcpy(message, queue->message);
104         
105         /*  remove the element from the queue. */
106         queue_element *ptrElementToFree = queue;
107         free(queue->message);
108         queue = queue->next;
109         free(ptrElementToFree);
110 
111         /*  finally, release the lock on the queue. */
112         if (sem_post(&queue_semaphore) == -1)
113         {
114             fprintf(stderr, "The server thread failed to release the lock on the queue. (Error Value = %d)\n", errno);
115             free_notice_memory();
116             exit(1);
117         }
118 
119 
120         /* 2. Insert data into the notice queue */
121         notice *ptr_notice = (notice *)malloc(sizeof(notice));
122         ptr_notice->client_id = iClientID;
123         ptr_notice->message = (char *)malloc(strlen(message) + 1);
124         strcpy(ptr_notice->message, message);
125         add_element_to_end_of_notice(ptr_notice);
126         
127         free(message);
128         
129         /* 3. Store the result of the processed data in the client's queue. */
130 
131         /*  update the child's communications value. */
132         client_result[iClientID] = char_count();
133 
134         /*  and, notify the client that data is waiting. */
135         if (sem_post(&client_notify_semaphore[iClientID]) == -1)
136         {
137             fprintf(stderr, "The server thread failed to release post to a client notify queue. (Client ID = %d, Error Value = %d)\n", iClientID, errno);
138             free_notice_memory();
139             exit(1);
140         }
141 
142     }
143     print_notice();
144     free_notice_memory();
145     return NULL;
146 }
147 

#Makefile
CC = gcc
INC = -I.
# -fcommon: the project headers define globals (queue, the semaphores, ...)
# and are included by all three .c files. Compilers that default to
# -fno-common reject that at link time as multiple definitions.
FLAGS = -W -Wall -fcommon
LIBS = -lpthread

OBJECTS = smp3.client.o smp3.server.o smp3.o
# Every .c file includes all three headers, so every object depends on them.
HEADERS = smp3.h smp3.client.h smp3.server.h

.PHONY: all clean

all: smp3

# Libraries are listed after the objects that use them so the linker
# resolves the pthread/semaphore symbols.
smp3: $(OBJECTS)
	$(CC) $(OBJECTS) $(LIBS) -o $@

smp3.client.o: smp3.client.c $(HEADERS)
	$(CC) $(INC) $(FLAGS) -c smp3.client.c -o $@

smp3.server.o: smp3.server.c $(HEADERS)
	$(CC) $(INC) $(FLAGS) -c smp3.server.c -o $@

smp3.o: smp3.c $(HEADERS)
	$(CC) $(INC) $(FLAGS) -c smp3.c -o $@

clean:
	$(RM) *.o smp3



allen:~/study/system_programming/uiuc_assignment/smp3$ valgrind --tool=memcheck --leak-check=full  --show-reachable=yes ./smp3 client1.txt client2.txt client3.txt
==25025== Memcheck, a memory error detector.
==25025== Copyright (C) 2002-2007, and GNU GPL'd, by Julian Seward et al.
==25025== Using LibVEX rev 1804, a library for dynamic binary translation.
==25025== Copyright (C) 2004-2007, and GNU GPL'd, by OpenWorks LLP.
==25025== Using valgrind-3.3.0-Debian, a dynamic binary instrumentation framework.
==25025== Copyright (C) 2000-2007, and GNU GPL'd, by Julian Seward et al.
==25025== For more details, rerun with: -v
==25025==
CLIENT 0> Hi all!
CLIENT 1> Hello, World!
CLIENT 1> I am client2!
CLIENT 0> I am client1!
CLIENT 0> I do nothing ..
CLIENT 0> Happy valentines day.
CLIENT 0> I can program in C.
CLIENT 0> I can write semaphores too!
CLIENT 0> Bye, everyone.
CLIENT 2> Hi all!
CLIENT 1> Done.
CLIENT 2> I am client3!
CLIENT 2> I do nothing...
CLIENT 2> Done.
CLIENT 2> Huh, I'm not really done :)
==25025==
==25025== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 13 from 1)
==25025== malloc/free: in use at exit: 2,112 bytes in 6 blocks.
==25025== malloc/free: 90 allocs, 84 frees, 8,142 bytes allocated.
==25025== For counts of detected errors, rerun with: -v
==25025== searching for pointers to 6 not-freed blocks.
==25025== checked 94,164 bytes.
==25025==
==25025== 2,112 bytes in 6 blocks are still reachable in loss record 1 of 1
==25025==    at 0x4022AB8: malloc (vg_replace_malloc.c:207)
==25025==    by 0x40ADD2E: (within /lib/tls/i686/cmov/libc-2.7.so)
==25025==    by 0x40ADDFB: fopen (in /lib/tls/i686/cmov/libc-2.7.so)
==25025==    by 0x804891E: client_thread (in /home/allen/study/system_programming/uiuc_assignment/smp3/smp3)
==25025==    by 0x40404FA: start_thread (in /lib/tls/i686/cmov/libpthread-2.7.so)
==25025==    by 0x4129E5D: clone (in /lib/tls/i686/cmov/libc-2.7.so)
==25025==
==25025== LEAK SUMMARY:
==25025==    definitely lost: 0 bytes in 0 blocks.
==25025==      possibly lost: 0 bytes in 0 blocks.
==25025==    still reachable: 2,112 bytes in 6 blocks.
==25025==         suppressed: 0 bytes in 0 blocks.

allen:~/study/system_programming/uiuc_assignment/smp3$ ./smp3 client1.txt client2.txt client3.txt
CLIENT 2> Hi all!
CLIENT 2> I am client3!
CLIENT 2> I do nothing...
CLIENT 1> Hello, World!
CLIENT 0> Hi all!
CLIENT 2> Done.
CLIENT 2> Huh, I'm not really done :)
CLIENT 1> I am client2!
CLIENT 0> I am client1!
CLIENT 1> Done.
CLIENT 0> I do nothing ..
CLIENT 0> Happy valentines day.
CLIENT 0> I can program in C.
CLIENT 0> I can write semaphores too!
CLIENT 0> Bye, everyone.
allen:~/study/system_programming/uiuc_assignment/smp3$ more client1.txt.out
Server returned 55 to client client1.txt after request 1
Server returned 113 to client client1.txt after request 2
Server returned 133 to client client1.txt after request 3
Server returned 154 to client client1.txt after request 4
Server returned 173 to client client1.txt after request 5
Server returned 200 to client client1.txt after request 6
Server returned 214 to client client1.txt after request 7
原文地址:https://www.cnblogs.com/rocketfan/p/1535876.html