[Python 多线程] Condition (十)

Condition常用于生产者、消费者模型,为了解决生产者消费者速度匹配问题。

构造方法Condition(lock=None),可以传入一个Lock或RLock对象,默认RLock。

方法:

acquire(*args)  获取锁

release()     释放锁

wait(timeout=None)  等待通知或直到发生超时

notify(n=1)  唤醒至多指定个数的等待的线程,没有等待的线程就没有任何操作

notify_all()  唤醒所有等待的线程。wake up

以下例子,不考虑线程安全问题:

例1:

#Condition
import threading,random,logging
logging.basicConfig(level=logging.INFO)

class Dispatcher:
    def __init__(self):
        self.data = 0
        self.event = threading.Event()

    def produce(self):
        for i in range(100):
            self.event.wait(1) #每1秒生成一条数据
            data = random.randint(1,100)
            self.data = data

    def custom(self):
        while True:
            logging.info(self.data)
            self.event.wait(0.5) # 替换为1表示消费者每1秒取一次数据


d = Dispatcher()
p = threading.Thread(target=d.produce)
c = threading.Thread(target=d.custom)

c.start()
p.start()

以下结果:
INFO:root:0
INFO:root:0
INFO:root:0
INFO:root:77
INFO:root:64
INFO:root:64
INFO:root:8
INFO:root:8
INFO:root:85

  生产者每1秒钟生成一条数据,消费者每0.5秒就来取一次数据。

例2:

#Condition 通知机制,解决重复
import threading,random,logging
logging.basicConfig(level=logging.INFO)

class Dispatcher:
    def __init__(self):
        self.data = 0
        self.event = threading.Event()
        self.cond = threading.Condition()

    def produce(self):
        for i in range(100):
            data = random.randint(1,100)
            with self.cond:
                self.data = data
                self.cond.notify_all() #通知所有waiter
            self.event.wait(1) #1秒生产一次数据

    def custom(self):
        while True:
            with self.cond:
                self.cond.wait() #无限等待
                logging.info(self.data) #消费

            self.event.wait(0.5) #0.5秒消费一次数据


d = Dispatcher()
p = threading.Thread(target=d.produce)
c = threading.Thread(target=d.custom)

c.start()
p.start()

运行结果:
INFO:root:64
INFO:root:43
INFO:root:11
INFO:root:28
INFO:root:30
INFO:root:33
INFO:root:93
INFO:root:69
INFO:root:4

  使用with来管理Condition的上下文(acquire/release),利用Condition通知机制解决消费者获取重复数据。

例3:

#Condition 先生成后消费,1对1
import threading,random,logging
logging.basicConfig(level=logging.INFO,format="%(thread)d %(threadName)s %(message)s")

class Dispatcher:
    def __init__(self):
        self.data = 0
        self.event = threading.Event()
        self.cond = threading.Condition()

    def produce(self):
        for i in range(100):
            data = random.randint(1,100)
            logging.info(self.data)
            with self.cond:
                self.data = data
                self.cond.notify(1)
                # self.cond.notify_all()
            self.event.wait(1)

    def custom(self):
        while True:
            with self.cond:
                self.cond.wait()
                logging.info(self.data)

            self.event.wait(0.5)


d = Dispatcher()
p = threading.Thread(target=d.produce,name='produce')
c = threading.Thread(target=d.custom,name='c')
c1 = threading.Thread(target=d.custom,name='c1')
p.start()

e = threading.Event()
e.wait(3)

c1.start()
c.start()

运行结果:
7520 produce 0
7520 produce 78
7520 produce 88
7520 produce 14
7520 produce 83
2508 c1 86
7520 produce 86
1136 c 79
7520 produce 79
2508 c1 77
7520 produce 77
1136 c 47
7520 produce 47
2508 c1 76
7520 produce 76
1136 c 69

  生产者先生产数据,2个消费者一个一个来消费数据。

例4:

#Condition 1对多,2个2个通知
import threading,random,logging
logging.basicConfig(level=logging.INFO,format="%(thread)d %(threadName)s %(message)s")

class Dispatcher:
    def __init__(self):
        self.data = 0
        self.event = threading.Event()
        self.cond = threading.Condition()

    def produce(self):
        for i in range(100):
            data = random.randint(1,100)
            logging.info(self.data)
            with self.cond:
                self.data = data
                self.cond.notify(2)
                # self.cond.notify_all()
            self.event.wait(1)

    def custom(self):
        while True:
            with self.cond:
                self.cond.wait()
                logging.info(self.data)
            # self.event.wait(0.5)


d = Dispatcher()
p = threading.Thread(target=d.produce,name='produce')

for i in range(5):
    threading.Thread(target=d.custom,name='c-{}'.format(i)).start()

p.start()

以下结果:
8688 produce 0
10376 c-0 90
7928 c-1 90
8688 produce 90
7640 c-2 61
10748 c-3 61
8688 produce 61
10376 c-0 73
1344 c-4 73
8688 produce 73
7928 c-1 57
7640 c-2 57
8688 produce 57
10748 c-3 71
10376 c-0 71

  1对多,2个2个通知来处理数据。

以上例子中,程序本身不是线程安全的,程序逻辑有很多瑕疵,但是可以很好的帮助理解Condition的使用,和生产者消费者模型。

Condition总结:

Condition采用通知机制,常用于生产者消费者模型中,解决生产者消费者速度匹配的问题。

使用方法:

使用Condition,必须先acquire,用完之后要release,因为内部使用了锁,默认使用RLock,最好的方法是使用with上下文管理。

生产者wait,会阻塞等待通知,被激活。

生产者生产好消息,对消费者发通知,可以使用notidy_all() 通知所有消费者或者notify()。

原文地址:https://www.cnblogs.com/i-honey/p/8068195.html