条件变量 避免惊群 虚假唤醒

1. 为什么调用pthread_cond_wait之前需要检查条件

在线程调用 pthread_cond_signal() 之前,如果没有线程调用 pthread_cond_wait() 处于阻塞状态,那么什么都不会发生;

在线程调用 pthread_cond_signal() 之后,线程调用了 pthread_cond_wait() ,那么这个线程将永远被阻塞,所以要在调用了 pthread_cond_wait() 之前检查条件

比如下面的例子线程B如果是在线程A执行过pthread_cond_signal()之后调用,那么将一直被阻塞

如果线程A

pthread_mutex_lock(&m);
cond = true;
pthread_cond_signal(&c);
pthread_mutex_unlock(&m);

线程B不判断条件直接wait

pthread_mutex_lock(&m);
pthread_cond_wait(&c, &m);
/* cond now true */​
pthread_mutex_unlock(&m);​

2. 为什么条件变量要和互斥锁一块使用

如果不配合互斥锁使用,会导致丢失唤醒

线程A

pthread_mutex_lock(&m);
while (cond == FALSE)
    pthread_cond_wait(&c, &m);
pthread_mutex_unlock(&m);​

线程B

condition = TRUE;
pthread_cond_signal(&cond);​

考虑下面的执行序列,在线程A判断过 cond == FALSE后,线程B修改了cond的值为TRUE并且调用了pthread_cond_signal 然后线程A才调用pthread_cond_wait 那么线程B的signal就被丢失了,线程A会被一直阻塞

Thread A                       Thread B
 
pthread_mutex_lock(&m);
while (cond == FALSE) {
 
                               cond = TRUE;
                               pthread_cond_signal(&c);
 
    pthread_cond_wait(&c, &m);​

如果将线程B修改为如下,保证了线程A的cond == FALSE和pthread_cond_wait是原子操作,那么此时线程B因为拿不到锁,就无法调用pthread_cond_signal

pthread_mutex_lock(&m);
cond = TRUE;
pthread_cond_signal(&c);
pthread_mutex_unlock(&m);​

3. 为什么要用while来检查条件

在wait前必须使用while来等待条件变量而不使用if语句,原因在于要避免spurious wakeups,即虚假唤醒。

1)什么是虚假唤醒:  即使没有线程broadcast 或者signal条件变量,pthread_cond_wait也可能偶尔返回

2)出现虚假唤醒的两种case

a. 函数 pthread_cond_wait() 底层实现是用的  futex 系统调用,每一个阻塞的系统调用在进程收到信号后立刻返回一个 EINTR 错误,然后 pthread_cond_wait() 不能重启调用futex等待,因为在收到EINTR错误到重新调用futex 期间pthread_cond_signal有可能被调用,那么就错过了真正的唤醒,所以直接返回,造成了虚假唤醒,并不是pthread_cond_signal 唤醒的

b. 函数 pthread_cond_wait()是被pthread_cond_signal唤醒的,但是发现条件不成立。这是可能因为线程调度,被条件变量唤醒的线程在本线程内真正执行「加锁并返回」前,另一个线程插了进来,完整地进行了一套「加锁、改条件、释放锁锁」的操作。

3)为什么while可以避免虚假唤醒

case 1:线程A,出现虚假唤醒后,重新进入while循环判断cond是FALSE,重新进入wait等待

Thread A                        Thread B
 
pthread_mutex_lock(&m);
while (cond == FALSE) {
    //spurious wakeups
    pthread_cond_wait(&c, &m);​
}                                        
                                pthread_mutex_lock(&m);
                                cond = TRUE;
                                pthread_cond_signal(&c);
                                pthread_mutex_unlock(&m);​
pthread_mutex_unlock(&m);​

case2:线程A调用pthread_cond_wait后,线程B拿到了锁,此时发生了虚假唤醒,线程B设置条件为TRUE,触发唤醒并释放锁,线程Apthread_cond_wait拿到锁后返回,while循环判断cond 已经变成TRUE,不会进入pthread_cond_wait,这种情况其实是丢失了真正的唤醒,虚假唤醒起了作用,但是程序却没有问题

Thread A                          Thread B
 
pthread_mutex_lock(&m);
while (cond == FALSE) {              
    // unlock锁,阻塞wait函数
    // 发生spurious wakeups       
    pthread_cond_wait(&c, &m);    pthread_mutex_lock(&m);
                                  cond = TRUE;
                                  pthread_cond_signal(&c);
                                  pthread_mutex_unlock(&m);​
    //函数返回,加锁等B释放锁   
    pthread_cond_wait(&c, &m);​                                 
}
pthread_mutex_unlock(&m);​

注意:pthread_cond_wait函数会先unlock锁,返回时会lock锁(不管是否是虚假唤醒还是真的唤醒)

typedef union
{
  struct
  {
    int __lock;保护多线程中cond结构本身的变量操作不会并发,例如对于total_seq进而wakup_seq的使用和递增操作。
    unsigned int __futex;另一个线程和这个线程之间在条件点上同步的方式,也就是如果需要和其它线程同步的话,使用这个互斥锁替换pthread_cond_wait传入的互斥锁进行同步。
    __extension__ unsigned long long int __total_seq;这个表示在这个条件变量上有多少个线程在等待这个信号。
    __extension__ unsigned long long int __wakeup_seq;已经在这个条件变量上执行了多少次唤醒操作。
    __extension__ unsigned long long int __woken_seq;这个条件变量中已经被真正唤醒的线程数目。
    void *__mutex;保存pthread_cond_wait传入的互斥锁,需要保证pthread_cond_wait和pthread_cond_signal传入的值都是相同值。
    unsigned int __nwaiters;表示这个cond结构现在还有多少个线程在使用,当有人在使用的时候,pthread_cond_destroy需要等待所有的操作完成
    unsigned int __broadcast_seq; 广播动作发生了多少次,也就是执行了多少次broadcast
  } __data;
  char __size[__SIZEOF_PTHREAD_COND_T];
  __extension__ long long int __align;
} pthread_cond_t;

pthread_cond_wait的操作

 

pthread_cond_wait :1.首先解锁相当于pthread_mutex_unlock。2.然后建立锁与条件变量的联系,3.等待唤醒,4.唤醒后第一件事情是上锁相当于pthread_mutex_lock

__pthread_cond_wait (cond, mutex)
     pthread_cond_t *cond;
     pthread_mutex_t *mutex;
{
  struct _pthread_cleanup_buffer buffer;
  struct _condvar_cleanup_buffer cbuffer;
  int err;
  int pshared = (cond->__data.__mutex == (void *) ~0l)
    ? LLL_SHARED : LLL_PRIVATE;
  /* Make sure we are along.  */
  lll_lock (cond->__data.__lock, pshared);即将对cond结构的成员进行操作和判断,所以首先获得结构本身保护互斥锁。
  /* Now we can release the mutex.  */
  err = __pthread_mutex_unlock_usercnt (mutex, 0);
释放用户传入的互斥锁,此时另外一个执行pthread_cond_signal的线程可以通过pthread_mutex_lock执行可能的signal判断,
但是我们还没有释放数据操作互斥锁,所以另一方执行pthread_cond_signal的时候依然可能会等待。
if (__builtin_expect (err, 0)) { lll_unlock (cond->__data.__lock, pshared); return err; } /* We have one new user of the condvar. */ ++cond->__data.__total_seq;增加系统中所有需要执行的唤醒次数。 ++cond->__data.__futex;增加futex,主要是为了保证用户态数据一致性。 cond->__data.__nwaiters += 1 << COND_NWAITERS_SHIFT;增加cond结构的使用次数。 /* Remember the mutex we are using here. If there is already a different address store this is a bad user bug. Do not store anything for pshared condvars. */ if (cond->__data.__mutex != (void *) ~0l) cond->__data.__mutex = mutex; /* Prepare structure passed to cancellation handler. */ cbuffer.cond = cond; cbuffer.mutex = mutex; /* Before we block we enable cancellation. Therefore we have to install a cancellation handler. */ __pthread_cleanup_push (&buffer, __condvar_cleanup, &cbuffer);注册撤销点。 /* The current values of the wakeup counter. The "woken" counter must exceed this value. */ unsigned long long int val; unsigned long long int seq; val = seq = cond->__data.__wakeup_seq; /* Remember the broadcast counter. */ cbuffer.bc_seq = cond->__data.__broadcast_seq; do { unsigned int futex_val = cond->__data.__futex; /* Prepare to wait. Release the condvar futex. */ lll_unlock (cond->__data.__lock, pshared);此处真正释放cond操作互斥锁,我们已经不再对其中的变量进行操作。 /* Enable asynchronous cancellation. Required by the standard. */ cbuffer.oldtype = __pthread_enable_asynccancel (); /* Wait until woken by signal or broadcast. */ lll_futex_wait (&cond->__data.__futex, futex_val, pshared);
等待在futex变量上,由于我们刚才保存了futex的原始值,所以如果在上面我们释放了data.lock之后另一个线程修改了这个变量的值,那么这里的lll_futex_wait将会返回失败,
所以会继续进行下一轮的while循环,直到连个执行相同,说明我们做的判断时正确的。
/* Disable asynchronous cancellation. */如果执行到这里,说明我们已经被signal唤醒。 __pthread_disable_asynccancel (cbuffer.oldtype); /* We are going to look at shared data again, so get the lock. */ lll_lock (cond->__data.__lock, pshared);访问变量,需要获得互斥锁。 /* If a broadcast happened, we are done. */ if (cbuffer.bc_seq != cond->__data.__broadcast_seq) goto bc_out; /* Check whether we are eligible for wakeup. */ val = cond->__data.__wakeup_seq; } while (val == seq || cond->__data.__woken_seq == val); 当val!=seq&&cond->data.wokenup!=val的时候可以进行唤醒,也就是另一个放修改了已经执行了唤醒的次数并且已经被唤醒的线程还有名额的时候。 /* Another thread woken up. */ ++cond->__data.__woken_seq;增加系统中已经被唤醒的线程的数目。 bc_out: broadcast跳转到这里。 cond->__data.__nwaiters -= 1 << COND_NWAITERS_SHIFT; /* If pthread_cond_destroy was called on this varaible already, notify the pthread_cond_destroy caller all waiters have left and it can be successfully destroyed. */ if (cond->__data.__total_seq == -1ULL && cond->__data.__nwaiters < (1 << COND_NWAITERS_SHIFT)) lll_futex_wake (&cond->__data.__nwaiters, 1, pshared); /* We are done with the condvar. */ lll_unlock (cond->__data.__lock, pshared); /* The cancellation handling is back to normal, remove the handler. */ __pthread_cleanup_pop (&buffer, 0); /* Get the mutex before returning. */ return __pthread_mutex_cond_lock (mutex);
//再次获得mutex互斥锁,可能会睡眠,因为我们的这个释放是对上层透明的,而在进入函数的时候我们已经释放了这个互斥锁,所以此时还要进行一次获得操作,从而配对。 }
 

1. 为什么是pthread_cond_wait(cond, mutex)而不是pthread_cond_wait(cond)

我当初学习条件变量时,也有过和楼主相同的疑问,在上操作系统实践课程时,班上的个别学生也问过这个问题。相信这是一个初学者的共性问题,但很少有书籍仔细解释这个问题。

为什么pthread_cond_wait的api被设计为

int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);

而不是被设计为

int pthread_cond_wait(pthread_cond_t *cond);

pthread_cond_wait(cond, mutex)的功能有3个:

  • 调用者线程首先释放mutex
  • 然后阻塞,等待被别的线程唤醒
  • 当调用者线程被唤醒后,调用者线程会再次获取mutex

pthread_cond_wait(cond)的功能只有1个:

  • 调用者线程阻塞,等待被别的线程唤醒。

这里首先给一个简洁的回答:

  • 通常的应用场景下,当前线程执行pthread_cond_wait时,处于临界区访问共享资源,存在一个mutex与该临界区相关联,这是理解pthread_cond_wait带有mutex参数的关键
  • 当前线程执行pthread_cond_wait前,已经获得了和临界区相关联的mutex;执行pthread_cond_wait会阻塞,但是在进入阻塞状态前,必须释放已经获得的mutex,让其它线程能够进入临界区
  • 当前线程执行pthread_cond_wait后,阻塞等待的条件满足,条件满足时会被唤醒;被唤醒后,仍然处于临界区,因此被唤醒后必须再次获得和临界区相关联的mutex

综上,调用pthread_cond_wait时,线程总是位于某个临界区,该临界区与mutex相关,pthread_cond_wait需要带有一个参数mutex,用于释放和再次获取mutex。

while (predicates do not hold) {
    /* 1 */
    pthread_cond_wait(&cond);
}

若在1这个点正好条件满足且signal了一下,那么这个signal就丢了,上面的代码可能会陷入永久的等待。mutex就是确保1这个位置不可能穿插signal代码。当然相应地,signal一端也要加锁,否则仍然无法确保这点,这也是一个常见错误,因为pthread_cond_signal并不要求mutex。

一种特殊情况是,当你的条件简单到只有一条原子指令时,就可以直接使用futex了。事实上condition主要是简化多线程用户代码的开发,当需要编写较为底层且性能关键的代码时,你需要深入地了解atomicMemory barrier


 
pthread_cond_signal的操作
int
__pthread_cond_signal (cond)
     pthread_cond_t *cond;
{
  int pshared = (cond->__data.__mutex == (void *) ~0l)
  ? LLL_SHARED : LLL_PRIVATE;
  /* Make sure we are alone.  */
  lll_lock (cond->__data.__lock, pshared);
  /* Are there any waiters to be woken?  */
  if (cond->__data.__total_seq > cond->__data.__wakeup_seq)如果待唤醒次数比已经唤醒的次数多,那么此时就进行一个唤醒操作。
    {
      /* Yes.  Mark one of them as woken.  */
      ++cond->__data.__wakeup_seq;
      ++cond->__data.__futex;改变futex的值,这个值的具体意义并不重要,只是为了告诉另一方,这个值已经变化,如果另一方使用的是原始值,那么对futex的wait操作将会失败。
      /* Wake one.  */
      if (! __builtin_expect (lll_futex_wake_unlock (&cond->__data.__futex, 1,
           1, &cond->__data.__lock,
           pshared), 0))
 return 0;
      lll_futex_wake (&cond->__data.__futex, 1, pshared);
    }
  /* We are done.  */
  lll_unlock (cond->__data.__lock, pshared);
  return 0;
}
5、__pthread_cond_broadcast 
int
__pthread_cond_broadcast (cond)
     pthread_cond_t *cond;
{
  int pshared = (cond->__data.__mutex == (void *) ~0l)
  ? LLL_SHARED : LLL_PRIVATE;
  /* Make sure we are alone.  */
  lll_lock (cond->__data.__lock, pshared);
  /* Are there any waiters to be woken?  */
  if (cond->__data.__total_seq > cond->__data.__wakeup_seq)判断是否有等待唤醒的线程。
    {
      /* Yes.  Mark them all as woken.  */
      cond->__data.__wakeup_seq = cond->__data.__total_seq;
      cond->__data.__woken_seq = cond->__data.__total_seq;
      cond->__data.__futex = (unsigned int) cond->__data.__total_seq * 2;
      int futex_val = cond->__data.__futex;
      /* Signal that a broadcast happened.  */
      ++cond->__data.__broadcast_seq;
      /* We are done.  */
      lll_unlock (cond->__data.__lock, pshared);
      /* Do not use requeue for pshared condvars.  */
      if (cond->__data.__mutex == (void *) ~0l)
 goto wake_all;
      /* Wake everybody.  */
      pthread_mutex_t *mut = (pthread_mutex_t *) cond->__data.__mutex;
      /* XXX: Kernel so far doesn't support requeue to PI futex.  */
      /* XXX: Kernel so far can only requeue to the same type of futex,
  in this case private (we don't requeue for pshared condvars).  */
      if (__builtin_expect (mut->__data.__kind
       & (PTHREAD_MUTEX_PRIO_INHERIT_NP
          | PTHREAD_MUTEX_PSHARED_BIT), 0))
 goto wake_all;
      /* lll_futex_requeue returns 0 for success and non-zero
  for errors.  */
      if (__builtin_expect (lll_futex_requeue (&cond->__data.__futex, 1,
            INT_MAX, &mut->__data.__lock,
            futex_val, LLL_PRIVATE), 0))把futex上的转移到data.lock中并唤醒,如果失败则直接唤醒而不转移。
 {
   /* The requeue functionality is not available.  */
 wake_all:
   lll_futex_wake (&cond->__data.__futex, INT_MAX, pshared);这里的INT_MAX就是告诉内核唤醒所有在这个变量上等待的线程。
 }
      /* That's all.  */
      return 0;
    }
  /* We are done.  */
  lll_unlock (cond->__data.__lock, pshared);
  return 0;
}
原文地址:https://www.cnblogs.com/dream397/p/14690724.html