线程协作之threading.Condition

 
 
 

领会下面这个示例吧,其实跟java中wait/nofity是一样一样的道理

复制代码
import threading


# 条件变量,用于复杂的线程间同步锁
"""
需求:
    男:小姐姐,你好呀!
    女:哼,想泡老娘不成?
    男:对呀,想泡你
    女:滚蛋,门都没有!
    男:切,长这么丑, 还这么吊...
    女:关你鸟事!

"""
class Boy(threading.Thread):
    def __init__(self, name, condition):
        super().__init__(name=name)
        self.condition = condition

    def run(self):
        with self.condition:
            print("{}:小姐姐,你好呀!".format(self.name))
            self.condition.wait()
            self.condition.notify()

            print("{}:对呀,想泡你".format(self.name))
            self.condition.wait()
            self.condition.notify()

            print("{}:切,长这么丑, 还这么吊...".format(self.name))
            self.condition.wait()
            self.condition.notify()


class Girl(threading.Thread):
    def __init__(self, name, condition):
        super().__init__(name=name)
        self.condition = condition

    def run(self):
        with self.condition:
            print("{}:哼,想泡老娘不成?".format(self.name))
            self.condition.notify()
            self.condition.wait()

            print("{}:滚蛋,门都没有!".format(self.name))
            self.condition.notify()
            self.condition.wait()

            print("{}:关你鸟事!".format(self.name))
            self.condition.notify()
            self.condition.wait()


if __name__ == '__main__':
    condition = threading.Condition()
    boy_thread = Boy('男', condition)
    girl_thread = Girl('女', condition)

    boy_thread.start()
    girl_thread.start()
复制代码

Condition的底层实现了__enter__和 __exit__协议.所以可以使用with上下文管理器

由Condition的__init__方法可知,它的底层也是维护了一个RLock锁

  def __enter__(self):
        return self._lock.__enter__()
   def __exit__(self, *args):
        return self._lock.__exit__(*args)
 def __exit__(self, t, v, tb):
        self.release()
复制代码
 def release(self):
        """Release a lock, decrementing the recursion level.

        If after the decrement it is zero, reset the lock to unlocked (not owned
        by any thread), and if any other threads are blocked waiting for the
        lock to become unlocked, allow exactly one of them to proceed. If after
        the decrement the recursion level is still nonzero, the lock remains
        locked and owned by the calling thread.

        Only call this method when the calling thread owns the lock. A
        RuntimeError is raised if this method is called when the lock is
        unlocked.

        There is no return value.

        """
        if self._owner != get_ident():
            raise RuntimeError("cannot release un-acquired lock")
        self._count = count = self._count - 1
        if not count:
            self._owner = None
            self._block.release()
复制代码

上面的源码可知,__enter__方法就是accquire一把锁. __ exit__方法 最终是release锁

至于wait/notify是如何操作的,还是有点懵.....

wait()方法源码中这样三行代码 

waiter = _allocate_lock()  #从底层获取了一把锁,并非Lock锁
waiter.acquire()
self._waiters.append(waiter)  # 然后将这个锁加入到_waiters(deque)中
saved_state = self._release_save()  # 这是释放__enter__时的那把锁???

notify()方法源码

复制代码
all_waiters = self._waiters   
waiters_to_notify = _deque(_islice(all_waiters, n))# 从_waiters中取出n个
if not waiters_to_notify:    # 如果是None,结束
      return
for waiter in waiters_to_notify: # 循环release
      waiter.release()
      try:
          all_waiters.remove(waiter)  #从_waiters中移除
      except ValueError:
          pass
复制代码

大体意思: wait先从底层创建锁,acquire, 放到一个deque中,然后释放掉with锁, notify时,从deque取拿出锁,release

 源:https://www.cnblogs.com/z-qinfeng/p/12057534.html

Condition的处理流程如下:

首先acquire一个条件变量,然后判断一些条件。

  •  如果条件不满足则wait;
  •  如果条件满足,进行一些处理改变条件后,通过notify方法通知其他线程,其他处于wait状态的线程接到通知后会重新判断条件。
  •  不断的重复这一过程,从而解决复杂的同步问题。

Condition的基本原理如下:

可以认为Condition对象维护了一个锁(Lock/RLock)和一个waiting池。线程通过acquire获得Condition对象,当调用wait方法时,线程会释放Condition内部的锁并进入blocked状态,同时在waiting池中记录这个线程。当调用notify方法时,Condition对象会从waiting池中挑选一个线程,通知其调用acquire方法尝试取到锁。

Condition对象的构造函数可以接受一个Lock/RLock对象作为参数,如果没有指定,则Condition对象会在内部自行创建一个RLock。

除了notify方法外,Condition对象还提供了notifyAll方法,可以通知waiting池中的所有线程尝试acquire内部锁。由于上述机制,处于waiting状态的线程只能通过notify方法唤醒,所以notifyAll的作用在于防止有的线程永远处于沉默状态。

演示条件变量同步的经典问题是生产者与消费者问题:假设有一群生产者(Producer)和一群消费者(Consumer)通过一个市场来交互产品。生产者的”策略“是如果市场上剩余的产品少于1000个,那么就生产100个产品放到市场上;而消费者的”策略“是如果市场上剩余产品的数量多余100个,那么就消费3个产品。用Condition解决生产者与消费者问题的代码如下:

# -*- coding: utf-8 -*-
"""
Created on Wed Nov 28 17:15:29 2018
 
@author: 18665
"""
 
import threading
import time
 
class Producer(threading.Thread):
  # 生产者函数
  def run(self):
    global count
    while True:
      if con.acquire():
        # 当count 小于等于1000 的时候进行生产
        if count > 1000:
          con.wait()
        else:
          count = count+100
          msg = self.name+' produce 100, count=' + str(count)
          print(msg)
          # 完成生成后唤醒waiting状态的线程,
          # 从waiting池中挑选一个线程,通知其调用acquire方法尝试取到锁
          con.notify()
        con.release()
        time.sleep(1)
 
class Consumer(threading.Thread):
  # 消费者函数
  def run(self):
    global count
    while True:
      # 当count 大于等于100的时候进行消费
      if con.acquire():
        if count < 100:
          con.wait()
        
        else:
          count = count-5
          msg = self.name+' consume 5, count='+str(count)
          print(msg)
          con.notify()
          # 完成生成后唤醒waiting状态的线程,
          # 从waiting池中挑选一个线程,通知其调用acquire方法尝试取到锁
        con.release()
        time.sleep(1)
 
count = 500
con = threading.Condition()
 
def test():
  for i in range(2):
    p = Producer()
    p.start()
  for i in range(5):
    c = Consumer()
    c.start()
if __name__ == '__main__':
  test()

Python下有很多的Lock锁,比如Mutex,Rlock,semaphore…  这些都是比较常用的Lock锁。  然而很多时候我们都忘记threading下还有一个叫做condition的条件变量。   condition内部是含有锁的逻辑,不然也没法保证线程之间的同步。 

那么condition一般用于什么场景?  最多的场景是什么?

线程A需要等某个条件成立才能继续往下执行,现在这个条件不成立,线程A就阻塞等待,而线程B在执行过程中使这个条件成立了,就唤醒线程A继续执行。在pthread库中通过条件变量(Condition Variable)来阻塞等待一个条件,或者唤醒等待这个条件的线程。

通俗的讲,生产者,消费者的模型。 condition很适合那种主动休眠,被动唤醒的场景。 condition使用难度要高于mutex,一不注意就会被死锁,SO一定要理解condition实现后,再用。

     首先我们知道python下的线程是真实的线程,底层用的是pthread。pthread内部Condition条件变量有两个关键函数, await和signal方法,对应python threading Condition是wait和notify方法。 

 
     一个Condition实例的内部实际上维护了两个队列,一个是等待锁队列,mutex内部其实就是维护了一个队列。 另一个队列可以叫等待条件队列,在这队列中的节点都是由于(某些条件不满足而)线程自身调用wait方法阻塞的线程,记住是自身阻塞。最重要的Condition方法是wait和 notify方法。另外condition还需要lock的支持, 如果你构造函数没有指定lock,condition会默认给你配一个rlock。   
 
下面是这两个方法的执行流程。
 
          await方法:
 
                            1. 入列到条件队列(注意这里不是等待锁的队列)
 
                            2. 释放锁
 
                            3. 阻塞自身线程
 
                             ————被唤醒后执行————-
 
                            4. 尝试去获取锁(执行到这里时线程已不在条件队列中,而是位于等待(锁的)队列中,参见signal方法)
 
                                4.1 成功,从await方法中返回,执行线程后面的代码
 
                                4.2 失败,阻塞自己(等待前一个节点释放锁时将它唤醒)
 
         注意: 调用wait可以让当前线程休眠,等待其他线程的唤醒,也就是等待signal,这个过程是阻塞的。 当队列首线程被唤醒后,会继续执行await方法中后面的代码。

         signal (notify)方法:

                           1. 将条件队列的队首节点取出,放入等待锁队列的队尾
 
                           2. 唤醒节点对应的线程.
 

注: signal ( notify ) 可以把wait队列的那些线程给唤醒起来。  

下面是一个python测试代码.

#xiaorui.cc
!/usr/bin/env python
 -*- coding:utf-8 -*-
mport threading
mport time

ask = []

lass Producer(threading.Thread):
   def run(self):
       while True:
           if con.acquire():
               print "producer role ---------------------"
               if len(task) > 1000:
                   con.wait()
               else:
                   for i in range(100):
                       task.append(i)
                   msg = self.name+' produce 100, count=' + str(len(task))
                   print msg
                   con.notify()
                   time.sleep(5)
                   con.release()

lass Consumer(threading.Thread):
   def run(self):
       while True:
           if con.acquire():
               print "consumer role"
               if len(task) < 100:
                   con.wait()
               else:
                   for i in range(10):
                       task.pop()
                   msg = self.name+' consume 3, count='+str(len(task))
                   print msg
                   con.notify()
                time.sleep(3)
                con.release()

con = threading.Condition()

def test():
    for i in range(2):
        p = Producer()
        p.start()
    for i in range(5):
        c = Consumer()
        c.start()
if __name__ == '__main__':
    test()

Python提供的Condition对象提供了对复杂线程同步问题的支持。Condition被称为条件变量,除了提供与Lock类似的acquire和release方法外,还提供了wait和notify方法。线程首先acquire一个条件变量,然后判断一些条件。如果条件不满足则wait;如果条件满足,进行一些处理改变条件后,通过notify方法通知其他线程,其他处于wait状态的线程接到通知后会重新判断条件。不断的重复这一过程,从而解决复杂的同步问题。

除了上面画的acquire方法、 release方法、notify方法、wait方法外还有notifyAll方法,不过notifyAll方法不常用。

51cto博客上我看到一篇博文中,形象的以二人对话 (生产者-消费者模)来解释上面的具体理论。

其中空格哥对应原理图中的A函数 ,西米对应的B 函数,每句话是doing操作,空格哥未“doing” 前,西米需要一直等待。最后,你来我往,直到最后都release掉,对话结束。由于代码太长,我给个精简版的,模拟上面的对话:

# coding:utf-8
# ---- Condition
# ---- 捉迷藏的游戏
import threading, time


class Hider(threading.Thread):
    def __init__(self, cond, name):
        super(Hider, self).__init__()
        self.cond = cond
        self.name = name

    def run(self):
        time.sleep(1)  # 确保先运行Seeker中的方法
        self.cond.acquire()
        print(self.name + ': 我已经把眼睛蒙上了')
        self.cond.notify()
        self.cond.wait()
        print(self.name + ': 我找到你了 ~_~')
        self.cond.notify()
        self.cond.release()
        print(self.name + ': 我赢了')


class Seeker(threading.Thread):
    def __init__(self, cond, name):
        super(Seeker, self).__init__()
        self.cond = cond
        self.name = name

    def run(self):
        self.cond.acquire()
        self.cond.wait()  # a    #释放对琐的占用,同时线程挂起在这里,直到被notify并重新占有琐。
        print(self.name + ': 我已经藏好了,你快来找我吧')
        self.cond.notify()
        self.cond.wait()
        self.cond.release()
        print(self.name + ': 被你找到了,哎~~~')


cond = threading.Condition()
seeker = Seeker(cond, 'seeker')
hider = Hider(cond, 'hider')
seeker.start()
hider.start()

执行结果如下:

便于对比,这里再给一个无限循环的例子。经典的生产者与消费者问题:假设有一群生产者(Producer)和一群消费者(Consumer)通过一个市场来交互产品。生产者的”策略“是如果市场上剩余的产品少于1000个,那么就生产100个产品放到市场上;而消费者的”策略“是如果市场上剩余产品的数量多余100个,那么就消费3个产品。用Condition解决生产者与消费者问题的代码如下:

import threading
import time


class Producer(threading.Thread):
    def run(self):
        global count
        while True:
            if con.acquire():
                if count > 1000:
                    con.wait()
                else:
                    count = count + 100
                    msg = self.name + ' produce 100, count=' + str(count)
                    print('produce 100', msg)
                    con.notify()
                con.release()
                time.sleep(1)


class Consumer(threading.Thread):
    def run(self):
        global count
        while True:
            if con.acquire():
                if count < 100:
                    con.wait()
                else:
                    count = count - 3
                    msg = self.name + ' consume 3, count=' + str(count)
                    print('consume 3', msg)
                    con.notify()
                con.release()
                time.sleep(1)


count = 500
con = threading.Condition()


def test():
    for i in range(2):
        p = Producer()
        p.start()
    for i in range(5):
        c = Consumer()
        c.start()


if __name__ == '__main__':
    test()
 
 
原文地址:https://www.cnblogs.com/a00ium/p/13944154.html