posix thread条件变量

概述

  • 等待条件变量总是返回锁住的互斥量。
  • 条件变量的作用是发送信号,而不是互斥。
  • 与条件变量相关的共享数据是“谓词”,如队列满或队列空条件。
  • 一个条件变量应该与一个谓词相关。如果一个条件变量与多个谓词相关,或者多个条件变量与一个谓词相关,有可能死锁。

主线程(Main Thread  

  1. 声明和初始化需要同步的全局数据/变量(如“count”) 
  2. 生命和初始化一个条件变量对象 
  3. 声明和初始化一个相关的互斥量 
  4. 创建工作线程A和B 

Thread A  

  • 工作,一直到一定的条件满足(如“count”等于一个指定的值) 
  •  锁定相关互斥量并检查全局变量的值 
  • 调用pthread_cond_wait()阻塞等待Thread-B的信号。注意pthread_cond_wait()能够自动地并且原子地解锁相关的互斥量,以至于它可以被Thread-B使用。 
  • 当收到信号,唤醒线程,互斥量被自动,原子地锁定。 
  • 显式解锁互斥量 
  •  继续 

Thread B  

  • 工作 
  • 锁定相关互斥量 
  • 改变Thread-A所等待的全局变量 
  • 检查全局变量的值,若达到需要的条件,像Thread-A发信号。 
  • 解锁互斥量 
  • 继续 

  

创建和销毁条件变量

pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
int pthread_cond_init (pthread_cond_t *cond, pthread_condattr_t *condattr);
int pthread_cond_destroy(pthread_cond_t *cond);
  • 永远不要copy一个条件变量,因为使用条件变量的备份是不可知的。不过,可以传递条件变量的指针以使不同函数和线程可以使用它来同步。
  • 为了获得最好的结果,应该将条件变量与相关的谓词“链接”在一起。
 
静态初始化条件变量:
 1 #include<pthread.h>
 2 #include "errors.h"
 3 
 4 typedef struct my_struct_tag {
 5     pthread_mutex_t mutex;
 6     pthread_cond_t cond;
 7     int value;
 8 } my_struct_t;
 9 
10 my_struct_t data = {
11     PTHREAD_MUTEX_INITIALIZER,
12     PTHREAD_COND_INITIALIZER,
13     0
14 };
15 
16 int main(int argc, char* argv[])
17 {
18     return 0;
19 }
cond_static.c
动态初始化条件变量:
 1 #include<pthread.h>
 2 #include "errors.h"
 3 
 4 typedef struct my_struct_tag {
 5     pthread_mutex_t mutex;
 6     pthread_cond_t cond;
 7     int value;
 8 } my_struct_t;
 9 
10 int main(int argc, char* argv[])
11 {
12     my_struct_t *data;
13     int status;
14 
15     data = malloc(sizeof(my_struct_t));
16     if ( data ==NULL ) {
17         errno_abort("malloc");
18     }
19 
20     /*
21      * init
22      */
23     status = pthread_mutex_init(&data->mutex, NULL);
24     if ( status != 0 ) {
25         err_abort(status, "mutex init");
26     }
27     status = pthread_cond_init(&data->cond);
28     if ( status != 0 ) {
29         err_abort(status, "cond init");
30     }
31 
32     /*
33      * destroy
34      */
35     status = pthread_cond_destroy(&data->mutex);
36     if ( status != 0 ) {
37         err_abort(status, "destroy cond");
38     }
39 
40     status = pthread_mutex_destroy(&data->mutex);
41     if ( status != 0 ) {
42         err_abort(status, "destroy mutex");
43     }
44     free(data);
45     return status;
46 }
cond_dynamic.c

 

 等待条件变量

  • 在阻塞线程之前,条件变量等待操作将解锁互斥量;而在重新返回线程之前,将再次锁住互斥量。
  • 所有并发等待同一个条件变量的线程必须指定同一个相关互斥量。
  • 任何条件变量在特定时刻只能与一个互斥量相关联,而互斥量则可以同时与多个条件变量关联。
  • 在锁住相关的互斥量之后和在等待条件变量之前,测试谓词是很重要的。
int pthread_cond_wait(pthread_cond_t* cond, pthread_mutex_t* mutex);
int pthread_cond_timedwait(pthread_cond_t* cond, pthread_mutex_t* mutex, struct timespec* expiration)
 1 #include<time.h>
 2 #include<pthread.h>
 3 #include "errors.h"
 4 
 5 typedef struct my_struct_tag {
 6     pthread_cond_t cond;
 7     pthread_mutex_t mutex;
 8     int value;
 9 } my_struct_t;
10 
11 my_struct_t data = {
12     PTHREAD_COND_INITIALIZER,
13     PTHREAD_MUTEX_INITIALIZER,
14     0
15 };
16 
17 int hibernation = 1;
18 
19 void* wait_thread(void* arg)
20 {
21     int status;
22     sleep(hibernation);
23     status = pthread_mutex_lock(&data.mutex);
24     if (status != 0)
25         err_abort(status, "Lock mutex");
26     data.value = 1;
27     status = pthread_cond_signal(&data.cond);
28     if (status != 0)
29         err_abort(status, "Signal condition");
30     status = pthread_mutex_unlock(&data.mutex);;
31     if (status != 0)
32         err_abort(status, "Unlock mutex");
33     return NULL;
34 }
35 
36 int main(int argc, char* argv[])
37 {
38     int status;
39     pthread_t wait_thread_id;
40     struct timespec timeout;
41 
42     if (argc > 1)
43         hibernation = atoi(argv[1]);
44 
45     status = pthread_create(&wait_thread_id, NULL, wait_thread, NULL);
46     if (status != 0)
47         err_abort(status, "Create wait thread");
48 
49     timeout.tv_sec = time(NULL) + 2;
50     timeout.tv_nsec = 0;
51 
52     status = pthread_mutex_lock(&data.mutex);
53     if (status != 0)
54         err_abort(status, "Lock mutex");
55 
56     while(data.value == 0) {
57         status = pthread_cond_timedwait(&data.cond, &data.mutex, &timeout);
58         if (status == ETIMEDOUT) {
59             printf("Condition wait timed out.
");
60             break;
61         }
62         else if (status != 0)
63             err_abort(status,  "wait on condition");
64     }
65     if (data.value != 0)
66         printf("Condition was signaled.
");
67     status = pthread_mutex_unlock(&data.mutex);
68     if (status != 0)
69         err_abort(status, "Unlock mutex");
70     return 0;
71 }
cond.c

 

唤醒条件变量等待线程

int pthread_cond_signal(pthread_cond_t* cond);
int pthread_cond_broadcast(pthread_cond_t* cond);
  • pthread_cond_wait()阻塞调用线程直到指定的条件受信(signaled)。该函数应该在互斥量锁定时调用,当在等待时会自动解锁互斥量。在信号被发送,线程被激活后,互斥量会自动被锁定,当线程结束时,由程序员负责解锁互斥量。
  • pthread_cond_signal()函数用于向其他等待在条件变量上的线程发送信号(激活其它线程)。应该在互斥量被锁定后调用。
  • 若不止一个线程阻塞在条件变量上,则应用pthread_cond_broadcast()向其它所以线程发生信号。 
  • 在调用pthread_cond_wait()前调用pthread_cond_signal()会发生逻辑错误。
  • 使用这些函数时适当的锁定和解锁相关的互斥量是非常重要的。如:
    1. 调用pthread_cond_wait()前锁定互斥量失败可能导致线程不会阻塞。
    2. 调用pthread_cond_signal()后解锁互斥量失败可能会不允许相应的pthread_cond_wait()函数结束(保存阻塞)。 

 

Alarm最终版本

  1 #include<pthread.h>
  2 #include<time.h>
  3 #include"errors.h"
  4 
  5 typedef struct alarm_tag {
  6     struct alarm_tag *link;
  7     int seconds;
  8     time_t time;
  9     char message[64];
 10 } alarm_t;
 11 
 12 pthread_mutex_t alarm_mutex = PTHREAD_MUTEX_INITIALIZER;
 13 pthread_cond_t alarm_cond = PTHREAD_COND_INITIALIZER;
 14 alarm_t* alarm_list = NULL;
 15 time_t current_alarm = 0;
 16 
 17 /*
 18  * insert alarm entry on list, in order.
 19  */
 20 void alarm_insert(alarm_t* alarm)
 21 {
 22     int status;
 23     alarm_t **last, *next;
 24 
 25     /*
 26      * this routine requires that the caller have locker the alarm_mutex.
 27      */
 28     last = &alarm_list;
 29     next = *last;
 30     next = *last;
 31     while (next != NULL) {
 32         if (next->time >= alarm->time) {
 33             alarm->link = next;
 34             *last = alarm;
 35             break;
 36         }
 37         last = &next->link;
 38         next = next->link;
 39     }
 40     if (next == NULL) {
 41         *last = alarm;
 42         alarm->link = NULL;
 43     }
 44 #ifdef DEBUG
 45     printf("[list: ");
 46     for (next = alarm_list; next != NULL; next = next->link)
 47         printf("%d(%d)["%s"]", next->time, next->time - time(NULL), next->message);
 48     printf("]
");
 49 #endif
 50     /*
 51      * wake the alarm thread if it is not busy(
 52      * that is, if current alarm is 0, signifying that it's waiting for work),
 53      * or if the new alarm comes before the one on which the alarm thread is waiting.
 54      */
 55     if (current_alarm == 0 || alarm->time < current_alarm) {
 56         current_alarm = alarm->time;
 57         status = pthread_cond_signal(&alarm_cond);
 58         if (status != 0)
 59             err_abort(status, "Signal cond");
 60     }
 61 }
 62 
 63 void *alarm_thread(void* arg)
 64 {
 65     alarm_t* alarm;
 66     struct timespec cond_time;
 67     time_t now;
 68     int status, expired;
 69 
 70     /*
 71      * loop forever, processing commands.
 72      * the alarm thread will be disintegrated when the process exits.
 73      * lock the mutex at the start -- it while be unlocked during condi
 74      * -tion waits, so the main thread can insert alarms.
 75      */
 76     status = pthread_mutex_lock(&alarm_mutex);
 77     if (status != 0)
 78         err_abort(status, "Lock mutex");
 79     while (1) {
 80         /*
 81          * if the alarm list is enpty, wait until an alarm is added.
 82          * setting current_alarm to 0 informs the insert routine that
 83          * the trhead is not busy.
 84          */
 85         current_alarm = 0;
 86         while (alarm_list == NULL) {
 87             status = pthread_cond_wait(&alarm_cond, &alarm_mutex);
 88             if (status != 0)
 89                 err_abort(status, "wait on cond");
 90         }
 91         alarm = alarm_list;
 92         alarm_list = alarm->link;
 93         now = time(NULL);
 94         expired = 0;
 95         if (alarm->time > now) {
 96 #ifdef DEBUG
 97             printf("[waiting: %d(%d)"%s"]
",alarm->time, 
 98                     alarm->time - time(NULL), alarm->message);
 99 #endif
100             cond_time.tv_sec = alarm->time;
101             cond_time.tv_nsec = 0;
102             current_alarm = alarm->time;
103             while (current_alarm == alarm->time) {
104                 status = pthread_cond_timedwait(
105                         &alarm_cond, &alarm_mutex, &cond_time);
106                 if (status == ETIMEDOUT) {
107                     expired = 1;
108                     break;
109                 }
110                 if (status != 0)
111                     err_abort(status, "cond timedwait");
112             }
113             if (!expired)
114                 alarm_insert(alarm);
115         }
116         else
117             expired = 1;
118         if (expired) {
119             printf("(%d) %s
", alarm->seconds, alarm->message);
120             free(alarm);
121         }
122     }
123 
124 }
125 
126 int main(int argc, char* argv[])
127 {
128     int status;
129     char line[128];
130     alarm_t *alarm;
131     pthread_t thread;
132 
133     status = pthread_create(
134             &thread, NULL, alarm_thread, NULL);
135     if (status != 0)
136         err_abort(status, "create alarm thread");
137     while (1) {
138         printf("Alarm> ");
139         if (fgets(line, sizeof(line), stdin) == NULL) exit(0);
140         if (strlen(line) <= 1) continue;
141         alarm = (alarm_t*)malloc(sizeof(alarm_t));
142         if (alarm == NULL)
143             errno_abort("Allocate alarm");
144 
145         if (sscanf(line, "%d %64[^
]", &alarm->seconds, alarm->message) < 2) {
146             fprintf(stderr, "Bad command
");
147             free(alarm);
148         } else {
149             status = pthread_mutex_lock(&alarm_mutex);
150             if (status != 0)
151                 err_abort(status, "Lock mutex");
152             alarm->time = time(NULL) + alarm->seconds;
153 
154             alarm_insert(alarm);
155             status = pthread_mutex_unlock(&alarm_mutex);
156             if (status != 0)
157                 err_abort(status, "Unlock mutex");
158         }
159     }
160 }
alarm_cond.c
原文地址:https://www.cnblogs.com/licongyu/p/5055558.html