使用互斥量和条件变量实现线程同步控制

管程(monitor)说明

在并发编程中,管程(monitor)是一个同步构件,管程实现了同一时间点,最多只有一个线程可以执行管程的某个子程序。与那些通过修改数据结构实现互斥访问的并发程序设计相比,管程的实现很大程度上简化了程序设计。

管程可以确保一次只有一个进程执行管程中的程序,因此程序员不需要显式地编写同步代码,但是如果需要就某些特定条件上的同步,则需要定义一些条件结构(condition variable)来实现,并且对条件变量的操作仅有wait()和signal(),如下:

condition x, y;

x.wait();

...

x.signal();

调用x.wait()操作可能会使得一个进程挂起,直到另一个进程调用x.signal()操作。与信号量中的signal()操作相比,管程中如果在没有任何进程挂起的情况下调用signal()没有任何作用,而在信号量中,则必然会改变信号量的状态。

一个管程(mointor)的示意图如下所示:

 

一个mointor中的程序运行前必须首先获取mutex,直至程序运行完成或者线程等待的某个条件发生时才释放mutex。当一个线程执行mointor中的一个子程序时,称为占用(occupy)该mointor,因此必须等到没有其他线程执行管程程序时方可调用管程程序,这是互斥保证。在管程的简单实现中,编译器为每个管程对象自动加入一把私有的mutex(lock),初始状态为unlock,管程中的每个对象入口处执行lock操作,出口处执行unlock操作。

因此设计monitor时至少必须包含mutex(lock) object(互斥量)和condition variables(条件变量)。一个条件变量可以看作是等待该条件发生的线程集合。

注:monitor也称为<线程安全对象/类/模块>。

 

条件变量

为何需要条件变量?

考虑如下一个busy waiting loop:

while not(P)

    do skip

如果仅有mutex,则线程必须等待P为真时才能继续执行。如此,将会导致其他线程无法进入临界区使得条件P为真,因此该管程可能发生死锁。

可以用条件变量解决。一个条件变量C可以看作是一个线程队列,其中存放的线程正在等待与之关联的条件变为真。当一个线程等待一个条件变量C时,其将mutex释放,然后其他线程就可以进入该管程中,通过改变C的值可以使得条件C满足,因此对条件变量C可以有如下操作:

(1)wait(c, m):线程调用该操作,等待条件C满足后继续执行,在等待过程中,释放mutex,因此此过程中,该线程不占用管程。

(2)signal(c):线程调用该操作表明此时条件C为真。

一个线程发生signal()后,至少有两个线程想要占用包含条件变量的管程:发出signal()操作的线程P,等待条件变量的线程Q,此时有两种选择:

1.非阻塞式条件变量:Q继续等待直到P完成。

2.阻塞式条件变量:P继续等待直到Q完成。

两种条件变量类型

阻塞式条件变量

也被称为霍尔风格(Hoare-style)管程,如下图所示:

 

每个管程包含两个线程队列e,s,其中:

e:入口队列

s:发出signal的线程队列

对于每个条件变量C,有一个线程队列,用C.q表示,如上图的a.q、b.q,这些队列很多情况下可以实现为FIFO模式。

阻塞式条件变量实现如下:

 

非阻塞式条件变量

也称为Mesa风格管程,如下图所示:

 

该模型中,发出signal()操作的线程不会失去管程的占用权,被notified()的线程将会被移到队列e中,相较于阻塞式条件变量,该模型不需要队列s。例如Pthread中的条件变量就采用这种非阻塞模式,即发出signal()操作的线程优先级高于被notified()的线程,要使用这种条件变量:首先利用pthread_mutex_lock获取互斥锁,然后调用pthread_cond_wait在线程睡眠等待之前先释放互斥锁,在其被唤醒后再重新获取互斥锁。关于pthread条件变量如下会有详细介绍。

非阻塞条件变量实现如下:

 

POSIX同步之互斥锁和条件变量的使用

如下为经典的有界缓冲区问题,可以用生产者/消费者模型描述,示意图如下:

 

采用互斥量的生产者/消费者代码如下:

  1 [root@bogon unp]# cat producer_consumer_mutex.c
  2 #include <unistd.h>
  3 #include <sys/types.h>
  4 #include <pthread.h>
  5 #include <stdlib.h>
  6 #include <string.h>
  7 #include <errno.h>
  8 #include <stdio.h>
  9 
 10 #define CONSUMER_COUNT 1        /* 1个消费者线程 */
 11 #define PRODUCER_COUNT 3        /* 3个生产者线程 */
 12 #define BUFFERSIZE 10
 13 
 14 int g_buffer[BUFFERSIZE];
 15 
 16 unsigned short in = 0;
 17 unsigned short out = 0;
 18 
 19 pthread_mutex_t g_mutex;
 20 
 21 pthread_t g_thread[CONSUMER_COUNT + PRODUCER_COUNT];    /* 存放生产者和消费者的线程号 */
 22 
 23 void* consumer(void* arg)
 24 {
 25         int num = (int)arg;
 26         /* 不断消费 */
 27         while (1)
 28         {
 29                 pthread_mutex_lock(&g_mutex);
 30 
 31                 /* 打印仓库当前状态 */
 32                 int i;
 33                 for (i = 0; i < BUFFERSIZE; i++) 
 34                 {
 35                         if (g_buffer[i] == -1)
 36                                 printf("g_buffer[%d] = %s
", i, "null");
 37                         else
 38                                 printf("g_buffer[%d] = %d
", i, g_buffer[i]);
 39 
 40                         if (i == out)
 41                                 printf("g_buffer[%d]可以消费
", i);
 42                 }
 43 
 44                 /* 消费产品 */
 45                 printf("thread %d 开始消费产品 %d
", num, g_buffer[out]);
 46 sleep(4);       /* 消费一个产品需要4秒 */
 47                 g_buffer[out] = -1;
 48                 printf("消费完毕
");
 49                 out = (out + 1) % BUFFERSIZE;
 50 
 51                 pthread_mutex_unlock(&g_mutex);
 52         }
 53 
 54         return NULL;
 55 }
 56 
 57 void* producer(void* arg)
 58 {
 59         int num = (int)arg;
 60         /* 不断生产 */
 61         while (1)
 62         {
 63                 pthread_mutex_lock(&g_mutex);
 64 
 65                 /* 打印仓库当前状态 */
 66                 int i;
 67                 for (i = 0; i < BUFFERSIZE; i++)
 68         {
 69                 if (g_buffer[i] == -1)
 70                 printf("g_buffer[%d] = %s
", i, "null");
 71             else
 72                 printf("g_buffer[%d] = %d
", i, g_buffer[i]);
 73   
 74             if (i == in)
 75                 printf("g_buffer[%d]可以生产
", i);
 76         }
 77 
 78                 /* 生产产品 */
 79                 g_buffer[in]++;
 80                 printf("thread %d 开始生产产品 %d
", num, g_buffer[in]);
 81                 sleep(2);       /* 生产一个产品需要2秒 */
 82                 printf("生产完毕
");
 83                 in = (in + 1) % BUFFERSIZE;
 84 
 85                 pthread_mutex_unlock(&g_mutex);
 86         }
 87 
 88         return NULL;
 89 }
 90 
 91 int main(void)
 92 {
 93         /* 初始化仓库 */
 94         int i;
 95         for (i = 0; i < BUFFERSIZE; i++)
 96                 g_buffer[i] = -1;
 97 
 98         /* 创建消费者线程,线程号为:[0, CONSUMER_COUNT) */
 99         for (i = 0; i < CONSUMER_COUNT; i++)
100         {
101                 pthread_create(&g_thread[i], NULL, consumer, (void*)i);
102         }
103 
104         /* 创建生产者线程,线程号为:[CONSUMER_COUNT, CONSUMER_COUNT + PRODUCER_COUNT) */
105         for (i = 0; i < PRODUCER_COUNT; i++)
106         {
107                 pthread_create(&g_thread[i + CONSUMER_COUNT], NULL, producer, (void*)(i + CONSUMER_COUNT));
108         }
109 
110         /* 等待创建的所有线程退出 */
111         for (i = 0; i < CONSUMER_COUNT + PRODUCER_COUNT; i++)
112         {
113                 pthread_join(g_thread[i], NULL);
114         }
115 
116         return 0;
117 }
118 
119 // output
120 ...
121 thread 2 开始生产产品 4
122 生产完毕
123 g_buffer[0] = 4
124 g_buffer[1] = 4
125 g_buffer[2] = 4
126 g_buffer[3] = 2
127 g_buffer[3]可以生产
128 g_buffer[4] = 2
129 g_buffer[5] = 1
130 g_buffer[6] = 1
131 g_buffer[7] = 0
132 g_buffer[8] = 0
133 g_buffer[9] = 4
134 thread 1 开始生产产品 3
135 生产完毕
136 g_buffer[0] = 4
137 g_buffer[1] = 4
138 g_buffer[2] = 4
139 g_buffer[3] = 3
140 g_buffer[4] = 2
141 g_buffer[5] = 1
142 g_buffer[6] = 1
143 g_buffer[7] = 0
144 g_buffer[8] = 0
145 g_buffer[9] = 4
146 g_buffer[9]可以消费
147 thread 0 开始消费产品 4
148 消费完毕
149 ...
View Code

但是上述程序中存在一个问题,就是当生产者线程未准备好产品时,消费者线程却在不断执行循环,这种被称为轮转(spinning)或者轮询(polling)的现象是对CPU资源的一大浪费。如下引入条件变量与互斥锁共同工作,互斥锁用于加锁互斥,而条件变量则专注于等待,每个条件变量总是和一个互斥锁关联。

采用条件变量的生产者/消费者代码如下:

  1 [root@bogon unp]# cat producer_consumer_condition.c
  2 #include <unistd.h>
  3 #include <sys/types.h>
  4 #include <pthread.h>
  5 #include <stdlib.h>
  6 #include <string.h>
  7 #include <errno.h>
  8 #include <stdio.h>
  9 
 10 #define CONSUMER_COUNT 1        /* 1个消费者线程 */
 11 #define PRODUCER_COUNT 3        /* 3个生产者线程 */
 12 #define BUFFERSIZE 10
 13 
 14 int g_buffer[BUFFERSIZE];
 15 
 16 unsigned short in = 0;
 17 unsigned short out = 0;
 18 
 19 pthread_mutex_t g_mutex;
 20 
 21 typedef struct
 22 {
 23         pthread_mutex_t mutex;
 24         pthread_cond_t cond;
 25 } Condition;
 26 
 27 Condition not_empty = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER};
 28 Condition not_full = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER};
 29 
 30 int nready;             /* 可以消费的产品数量 */
 31 
 32 pthread_t g_thread[CONSUMER_COUNT + PRODUCER_COUNT];    /* 存放生产者和消费者的线程号 */
 33 
 34 void* consumer(void* arg)
 35 {
 36         int num = (int)arg;
 37         /* 不断消费 */
 38         while (1)
 39         {
 40                 pthread_mutex_lock(&g_mutex);
 41 
 42                 /* 打印仓库当前状态,(为了便于比较,这段打印临界区依然只使用互斥锁保护) */
 43                 int i;
 44         for (i = 0; i < BUFFERSIZE; i++) 
 45         {
 46                 if (g_buffer[i] == -1)
 47                 printf("g_buffer[%d] = %s
", i, "null");
 48             else
 49                 printf("g_buffer[%d] = %d
", i, g_buffer[i]);
 50 
 51             if (i == out)
 52                 printf("g_buffer[%d]可以消费
", i);
 53         }
 54 
 55                 pthread_mutex_unlock(&g_mutex);
 56 
 57                 /* 消费产品 */
 58                 pthread_mutex_lock(&not_empty.mutex);
 59 
 60                 while (nready == 0)
 61                         pthread_cond_wait(&not_empty.cond, &not_empty.mutex);
 62                 printf("thread %d 开始消费产品 %d
", num, g_buffer[out]);
 63                 sleep(4);       /* 消费一个产品需要4秒 */
 64                 g_buffer[out] = -1;
 65                 printf("消费完毕
");
 66                 --nready;
 67                 out = (out + 1) % BUFFERSIZE;
 68 
 69                 pthread_cond_signal(&not_full.cond);
 70                pthread_mutex_unlock(&not_empty.mutex);
 71         }
 72 
 73         return NULL;
 74 }
 75 
 76 void* producer(void* arg)
 77 {
 78         int num = (int)arg;
 79         /* 不断生产 */
 80         while (1)
 81         {
 82                 pthread_mutex_lock(&g_mutex);
 83 
 84                 /* 打印仓库当前状态 */
 85                 int i;
 86                 for (i = 0; i < BUFFERSIZE; i++)
 87         {
 88                 if (g_buffer[i] == -1)
 89                 printf("g_buffer[%d] = %s
", i, "null");
 90             else
 91                 printf("g_buffer[%d] = %d
", i, g_buffer[i]);
 92   
 93             if (i == in)
 94                 printf("g_buffer[%d]可以生产
", i);
 95         }
 96 
 97                 pthread_mutex_unlock(&g_mutex);
 98 
 99                 /* 生产产品 */
100                 pthread_mutex_lock(&not_full.mutex);
101 
102                 while (nready == BUFFERSIZE)
103                         pthread_cond_wait(&not_full.cond, &not_full.mutex);
104                 g_buffer[in]++;
105                 printf("thread %d 开始生产产品 %d
", num, g_buffer[in]);
106                 sleep(2);       /* 生产一个产品需要2秒 */
107                 printf("生产完毕
");
108                 ++nready;
109                 in = (in + 1) % BUFFERSIZE;
110 
111                 pthread_cond_signal(&not_empty.cond);
112                 pthread_mutex_unlock(&not_full.mutex);
113         }
114 
115         return NULL;
116 }
117 
118 int main(void)
119 {
120         /* 初始化仓库 */
121         int i;
122         for (i = 0; i < BUFFERSIZE; i++)
123                 g_buffer[i] = -1;
124 
125         /* 创建消费者线程,线程号为:[0, CONSUMER_COUNT) */
126         for (i = 0; i < CONSUMER_COUNT; i++)
127         {
128                 pthread_create(&g_thread[i], NULL, consumer, (void*)i);
129         }
130 
131         /* 创建生产者线程,线程号为:[CONSUMER_COUNT, CONSUMER_COUNT + PRODUCER_COUNT) */
132         for (i = 0; i < PRODUCER_COUNT; i++)
133         {
134                 pthread_create(&g_thread[i + CONSUMER_COUNT], NULL, producer, (void*)(i + CONSUMER_COUNT));
135         }
136 
137         /* 等待创建的所有线程退出 */
138         for (i = 0; i < CONSUMER_COUNT + PRODUCER_COUNT; i++)
139         {
140                 pthread_join(g_thread[i], NULL);
141         }
142 
143         return 0;
144 }
145 
146 // output is the same as above
View Code

条件变量使用说明:

一个条件变量的改变是原子性的,因此需要一个互斥锁来保证,因此,条件变量的使用代码可以如下:

1 typedef struct
2 {
3     pthread_mutex_t mutex;
4     pthread_cond_t cond;
5     // 与条件变量相关的变量声明
6 } Condition;
7 Condition cond_a = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, ...};
8 Condition cond_b = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, ...};
9 ...

1.执行signal操作的线程中流程如下:

pthread_mutex_lock(&cond_a.mutex);
// 设置条件为真
pthread_cond_signal(&cond_a.cond);
pthread_mutex_unlock(&cond_a.mutex);

说明:

pthread_cond_signal与pthread_mutex_unlock的顺序:如果先signal后unlock,则可以确定signal操作是由lock住cond_a.mutex的线程调用的;如果先unlock后signal,则任一线程都可调用signal操作。如果需要可预见的调度行为,最好先signal后unlock,就像上面那样。

2.执行wait操作的线程中流程如下:

pthread_mutex_lock(&cond_a.mutex);
while (条件为假)
    pthread_cond_wait(&cond_a.cond, &cond_a.mutex);
// 修改条件
pthread_mutex_unlock(&cond_a.mutex);

说明:

(1)pthread_cond_wait执行如下3个操作:

  • 解锁cond_a.mutex,使得其他线程可以进入以便改变条件
  • 将调用线程阻塞在条件变量cond_a上(睡眠了),直到某个线程将条件设为真
  • 成功返回后(此时某个线程调用了pthread_cond_signal/broadcast)重新对cond_a.mutex加锁。

(2)是否可以将:

while (条件为假)
    pthread_cond_wait(&cond_a.cond, &cond_a.mutex);

替换为:

if (条件为假)
    pthread_cond_wait(&cond_a.cond, &cond_a.mutex);

答案是如果将while替换为if,可以发生虚假(spurious)唤醒:即发出signal的线程并为将条件设为真就调用了pthread_cond_signal,此时pthread_cond_wait却成功返回了,如此将导致后续的代码执行失败。因此必须在pthread_cond_wait返回后再次判断条件是否确实为真,即必须使用循环而非条件判断。

 

 

 

原文地址:https://www.cnblogs.com/benxintuzi/p/4874516.html