Python线程同步(1)

概念

线程同步,线程间协同,通过某种技术,让一个线程访问某些数据时,其他线程不能访问这些数据,直到该线程完成对数据的操作。

不同操作系统实现拘束有所不同,有临界区(Critical Section),互斥量(Mutex)、信号量(Semaphore)、事件event等。

Event(重要)

event事件,是线程间通信机制中最简单的实现,使用一个内部的标记flag,通过flag的true或false的变化来进行操作。

set():标记设置为true

clear():标记设置为false

is_set():标记是否为true

wait(timeout = none):设置等待标记为true的时长,none为无限等待,等到返回true,未等到超时了返回false.

需求

老板雇佣了一个工人,让他生产一个杯子,老板一直等着这个工人,知道生产了10个杯子。

from threading import Event,Thread
import logging
import time

FORMAT = "%(asctime)s %(threadNname)s %(thread)d %(message)s"
logging.basicConfig(format = FORMAT,level = logging.INFO)

def boss(event:Event):
    logging.info("i'm boss,waiting for u")
    #等待
    event.wait()
    logging.info("good job")
    
def worker(event:Event,count = 10):
    logging.info("i'm working for u")
    cups = []
    while True:
        logging.info("make 1")
        time.sleep(0.5)
        cups.append(1)
        if len(cups)>= count:
            #通知
            event.set()
            break
    logging.info("i finished my job. cups = {}".format(cups))
    
event = Event()
w = Thread(target = worker,args = (event,))
b = Thread(target = boss,args = (event,))
w.start()
b.start()

结果为:

2019-11-25 17:13:57,577 Thread-1 9760 i'm working for u
2019-11-25 17:13:57,577 Thread-1 9760 make 1
2019-11-25 17:13:57,577 Thread-2 10592 i'm boss,waiting for u.
2019-11-25 17:13:58,077 Thread-1 9760 make 1
2019-11-25 17:13:58,577 Thread-1 9760 make 1
2019-11-25 17:13:59,077 Thread-1 9760 make 1
2019-11-25 17:13:59,577 Thread-1 9760 make 1
2019-11-25 17:14:00,077 Thread-1 9760 make 1
2019-11-25 17:14:00,577 Thread-1 9760 make 1
2019-11-25 17:14:01,077 Thread-1 9760 make 1
2019-11-25 17:14:01,577 Thread-1 9760 make 1
2019-11-25 17:14:02,077 Thread-1 9760 make 1
2019-11-25 17:14:02,577 Thread-1 9760 i finished my job. cups = [1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
2019-11-25 17:14:02,577 Thread-2 10592 good job

 

总结

使用同一个Event对象的标记flag,谁wait就是等到flag变为true,或等到超时返回false,不限制等待的个数。

wait的作用

from threading import Event,Thread
import logging
logging.basicConfig(level = logging.INFO)

def do(event:Event,interval:int):
    while not event.wait(interval):#条件中使用,返回true或false
        logging.info("do sth")
        
e = Event()
Thread(target = do,args = (e,3)).start()

e.wait(10)#也可以使用time.sleep(10)
e.set()
print("main exit")

Event的wait优于time.sleep,它会更快的切换到其他线程,提高并发效率。

Event练习

实现Timer,延时执行的线程

延时计算add(x,y)

思路

Timer的构造函数中参数得有哪些?

如何实现start启动一个线程执行函数

如何cancel取消待执行任务

思路实现

from threading import Event,Thread
import logging
logging.basicConfig(level = logging.INFO)

def add(x:int,y:int):
    logging.info(x+y)
    
class Timer():
    def __init__(self,interval,fn,*args,**kwargs):
        pass
    
    def start(self):
        pass
    
    def cancel(self):
        pass
    

完整实现

from threading import Event,Thread
import logging
import datetime
logging.basicConfig(level = logging.INFO)

def add(x:int,y:int):
    logging.info(x+y)
    
class Timer():
    def __init__(self,interval,fn,*args,**kwargs):
        self.interval = interval
        self.fn = fn
        self.args = args
        self.kwargs = kwargs
        self.event = Event()
    
    def start(self):
        Thread(target = self.__run).start()
    
    def cancel(self):
        self.event.set()
        
    def __run(self):
        start = datetime.datetime.now()
        logging.info("waiting")
        
        self.event.wait(self.interval)
        if not self.event.is_set():
            self.fn(*self.args,**self.kwargs)
        delta = (datetime.datetime.now()  - start).total_seconds()
        logging.info("finished {} ".format(delta))
        self.event.set()
        
t = Timer(10,add,4,50)
t.start()
e = Event()
e.wait(4)
t.cancel()
print("============")
    

或者

from threading import Event,Thread
import logging
import datetime
logging.basicConfig(level = logging.INFO)

def add(x:int,y:int):
    logging.info(x+y)
    
class Timer():
    def __init__(self,interval,fn,*args,**kwargs):
        self.interval = interval
        self.fn = fn
        self.args = args
        self.kwargs = kwargs
        self.event = Event()
    
    def start(self):
        Thread(target = self.__run).start()
    
    def cancel(self):
        self.event.set()
        
    def __run(self):
        start = datetime.datetime.now()
        logging.info("waiting")
        
        
        if not self.event.wait(self.interval):
            self.fn(*self.args,**self.kwargs)
        delta = (datetime.datetime.now()  - start).total_seconds()
        logging.info("finished {} ".format(delta))
        self.event.set()
        
t = Timer(10,add,4,50)
t.start()
e = Event()
e.wait(4)
t.cancel()
print("============")
    

lock

锁,凡是存在共享资源争抢的地方都可以使用锁,从而保证只有一个使用者可以完全使用这个资源。

需求:

订单要求生产1000个杯子, 组织10个工人生产。

import logging
import threading

logging.basicConfig(level=logging.INFO)

#10个人生产100个杯子

cups = []

def worker(task = 100):
    while True:
        count = len(cups)
        logging.info(count)
        if count>=task:
            break
        cups.append(1)
        logging.info("{} make 1".format(threading.current_thread().name))
    logging.info("{}".format(cups))

for _ in range(10):
    threading.Thread(target = worker,args=(100,)).start()

 从结果截图可以看出,最后的结果是104,并不是100,因为线程都在访问cups,也就是都在访问cups,这就导致了多生产。也就是接近临界点的时候,都在生产。

import threading
from threading import Thread,Lock
import logging
import time

FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(format = FORMAT,level = logging.INFO)

cups = []

FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(format = FORMAT,level = logging.INFO)

cups = []

def worker(count = 10):
    logging.info("i'm working for u")
    while len(cups)<count:
        time.sleep(0.0001)#为了看出线程切换效果
        cups.append(1)
    logging.info("i finished . cups = {}".format(len(cups)))
    
for _ in range(10):
    Thread(target = worker,args = (1000,)).start()

从上例的运行结果来看,多线程调度,导致了判断失效,多生产了杯子。如何修改?加锁

lock

锁,一旦线程获得锁,其他视图获取锁的线程将被阻塞。

import threading

lock = threading.Lock()

lock.acquire()#拿到锁

print("get locker")
lock.release()
print("release locker")

结果为:
get locker
release locker

上面拿到锁了,然后打印,再然后释放锁,再打印。

import threading

lock = threading.Lock()

lock.acquire()#拿到锁

print("get locker1")
lock.acquire()
print("get locker2")
lock.release()
print("release locker")

结果为:
get locker1

上面拿到锁了,没有释放,又在继续拿锁,就被阻塞咯。等待释放。不能在往后面执行。

import logging
import threading


logging.basicConfig(level=logging.INFO)

#10个人生产100个杯子

cups = []
lock = threading.Lock()

def worker(lock:threading.Lock ,task =100):
    while True:
        lock.acquire()
        count = len(cups)
        lock.release()
        logging.info(count)
        if count>=task:
            break
        lock.acquire()
        cups.append(1)
        lock.release()
        logging.info("{} make 1".format(threading.current_thread().name))
    logging.info("{}".format(cups))

for _ in range(10):
    threading.Thread(target = worker,args=(lock,100)).start()

上面这样的代码,最后的执行结果还是不正确,因为从业务上来说是不正确的。

import logging
import threading


logging.basicConfig(level=logging.INFO)

#10个人生产100个杯子

cups = []
lock = threading.Lock()

def worker(lock:threading.Lock ,task =100):
    while True:
        lock.acquire()
        count = len(cups)

        logging.info(count)
        if count>=task:
            break
        
        cups.append(1)
        lock.release()
        logging.info("{} make 1".format(threading.current_thread().name))
    logging.info("{}".format(cups))

for _ in range(10):
    threading.Thread(target = worker,args=(lock,100)).start()

这样写才是正确的。但是效率还是不太高。还有点问题!因为到中间的位置,被break了,这个时候根本没有释放锁。

 看结果,程序根本就没有结束。直接阻塞咯。

import logging
import threading


logging.basicConfig(level=logging.INFO)

#10个人生产100个杯子

cups = []
lock = threading.Lock()

def worker(lock:threading.Lock ,task =100):
    while True:
        lock.acquire()
        count = len(cups)

        logging.info(count)
        if count>=task:
            lock.release()
            break

        cups.append(1)
        lock.release()
        logging.info("{} make 1".format(threading.current_thread().name))
    logging.info("{}".format(cups))

for _ in range(10):
    threading.Thread(target = worker,args=(lock,100)).start()

这样才可以咯。

acquire(blocking = True,timeout = -1):默认阻塞,阻塞可以设置超时时间,非阻塞时,timeout禁止设置。成功获取锁,返回true,否则返回false。

release():释放锁。可以从任何线程调用释放,已上锁的锁,会被重置为unlocked未上锁的锁上调用,抛runtimeerror异常。

上例的锁的实现

import threading
from threading import Thread,Lock
import logging
import time

FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(format = FORMAT,level = logging.INFO)

cups = []
lock = Lock()

def worker(count = 10):
    logging.info("i'm working for u")
    flag = False
    while True:
        lock.acquire()#获取锁
        
        if len(cups)>=count:
            flag = True
        #lock.release()#1 这里释放锁?
        time.sleep(0.0001)#为了看出线程切换效果
        
        if not flag:
            cups.append(1)
        #lock.release()#2 这里释放锁?
        
        if flag:
            break
            
    logging.info("i finished . cups = {}".format(len(cups)))
    
for _ in range(10):
    Thread(target = worker,args = (1000,)).start()

思考,上面的代码中,共有两处可以释放锁,放在何处合适?

假设位置1的lock.release()合适,分析如下:

有一个时刻,在某一个线程中len(cups)正好是999,flag= true,释放锁,正好线程被打断。另一个线程判断发现也是999,flag= true,可能线程被打断。可能另外一个线程也判断是999,flag也设置为true。这3个线程只要继续执行到cups.append(1),一定会导致cups的长度超过1000.

假设位置2的lock.release()合适,分析如下:

在某一个时刻,len(cups)正好是999,flag= true,其他线程试图访问这段代码的线程都阻塞获取不到锁,直到当前线程安全的增加了一个数据,然后释放锁,其他线程有一个抢到锁,但发现已经1000了,只好break打印退出。再其他线程 都一样,发现已经1000咯。都退出了。

所以位置2释放锁是正确的。

但是我们发现锁保证了数据完整性,但是性能下降很多。

上例中if flag:break是为了保证release方法被执行,否则,就出现了死锁,得到的锁永远没有释放锁。

计数器类,可以加,可以减。

import threading
from threading import Thread,Lock
import time

class Counter:
    def __init__(self):
        self._val = 0

    @property
    def value(self):
        return self._val

    def inc(self):
        self._val+=1

    def dec(self):
        self._val-=1

def run(c:Counter,count = 100):#重复了100次,50次加,50次减,最后结果应该0.
    for _ in range(count):
        for i in range(-50,50):
            if i<0:
                c.dec()
            else:
                c.inc()


c = Counter()
c1 = 10 #线程数
c2 = 1000

for i in range(c1):
    Thread(target=run,args =(c,c2)).start()

print(c.value)#这一句有问题,可能线程还没有启动,这一句就完了。可以让主线程睡一会。

c1 取10、100、1000看看

c2取10、100、1000看看。当是1000的时候,结果就不对了。而且应该注意,最后一句打印的结果不一定是最终结果,可能线程还在运行。

self._val+=1或self._val-=1在线程执行的时候,有可能被打断。

要加锁,怎么加?

加锁,解锁

一般来说,加锁就需要解锁,但是加锁后,解锁前,还要有一些代码执行,就有可能会抛异常,一旦出现异常,锁是无法释放,但是当前线程可能因为这个异常被终止了,这就产生了死锁。

加锁,解锁常用语句:

  1. 使用try……finally语句保证锁的释放。
  2. with上下文管理,锁对象支持上下文管理。

改造counter类,如下:

import threading
from threading import Thread,Lock
import time

class Counter:
    def __init__(self):
        self._val = 0
        self.__lock = Lock()

    @property
    def value(self):
        with self.__lock:
            return self._val

    def inc(self):
        try:
            self.__lock.acquire()
            self._val+=1
        finally:
            self.__lock.release()

    def dec(self):
        with self.__lock:
            self._val -= 1


def run(c:Counter,count = 100):
    for _ in range(count):
        for i in range(-50,50):
            if i<0:
                c.dec()
            else:
                c.inc()


c = Counter()
c1 = 10 #线程数
c2 = 1000

for i in range(c1):
    Thread(target=run,args =(c,c2)).start()

print(c.value)#这一句合适吗?

最后一句修改如下:

import threading
from threading import Thread,Lock
import time

class Counter:
    def __init__(self):
        self._val = 0
        self.__lock = Lock()

    @property
    def value(self):
        with self.__lock:#__enter__和__exit__
            return self._val

    def inc(self):
        try:
            self.__lock.acquire()
            self._val+=1
        finally:
            self.__lock.release()

    def dec(self):
        with self.__lock:
            self._val -= 1


def run(c:Counter,count = 100):
    for _ in range(count):
        for i in range(-50,50):
            if i<0:
                c.dec()
            else:
                c.inc()


c = Counter()
c1 = 10 #线程数
c2 = 1000

for i in range(c1):
    Thread(target=run,args =(c,c2)).start()

while True:
    time.sleep(1)
    if threading.active_count() ==1:
        print(threading.enumerate())
        print(c.value)
        break
    else:
        print(threading.enumerate())

print(v.value)这一句在主线程中,很早就执行完了。退出条件是,只剩下主线程的时候。

锁的应用场景

锁使用于访问和修改同一个共享资源的时候,即读写同一个资源的时候。

如果全部都是读取同一个共享资源需要锁吗?

不需要,因为这时可以认为共享资源是不可变的,每一次读取它都是一样的值,所以不用加锁。

使用锁的注意事项:

  • 少用锁,必要时用锁,使用了锁,多线程访问被锁的资源时,就成了串行,要么排队执行,要么争抢执行。
  • 举例,高速公路上车并行跑,可是到了省界只开放了一个收费口,过了这个口,车辆依然可以在多车道上一起跑。过收费口的时候,如果排队一辆辆过,加不加锁一样效率相当,但是一旦出现争抢,就必须加锁一辆辆过。
  • 加锁时间越短越好,不需要就立即释放锁。
  • 一定要避免死锁。

不使用锁,有了效率,但是结果是错的。

使用了锁,效率低下,但是结果是正确的。

所以,我们是为了效率要错误的结果呢?还是为了正确的结果,让计算机去计算?

非阻塞锁使用

import  threading
import time
import logging

FORMAT = "%(asctime)-15s	 [%(threadName)s,%(thread)8d] %(message)s"
logging.basicConfig(level=logging.INFO,format = FORMAT)

def worker(tasks):
    for task in tasks:
        time.sleep(0.001)
        if task.lock.acquire(False):#获取锁则返回true
            logging.info("{} {} begin to start".format(threading.current_thread(),task.name))
        #适当的时机释放锁
        else:
            logging.info(" {} {} is working".format(threading.current_thread(),task.name))

class Task:
    def __init__(self,name):
        self.name = name
        self.lock = threading.Lock()

#构造10个任务
tasks  = [Task("task - {}".format(x)) for x in range(10)]

#启动5个线程
for i in range(5):
    threading.Thread(target=worker,name="worker - {}".format(i),args=(tasks,)).start()

可重入锁RLock

可重入锁,是线程相关的锁。

线程A获得可重复锁,并可以多次成功获取,不会阻塞,最后要在线程A中做和asquire次数相同的release.

import threading

lock = threading.RLock()

ret = lock.acquire()
print(ret)

ret = lock.acquire()
print(ret)

结果为:

True
True

没有释放锁,后面还是拿到了锁。

import threading
import time

lock = threading.RLock()
print(lock.acquire())
print("________________________")
print(lock.acquire(blocking=False))
print(lock.acquire())
print(lock.acquire(timeout = 3.55))
print(lock.acquire(blocking=False))
#print(lock.acquire(blocking=False,timeout=10))#异常
lock.release()
lock.release()
lock.release()
lock.release()
print("main thread {}".format(threading.current_thread().ident))
print("locked in main thread {}".format(lock))#注意观察lock对象的信息
lock.release()
#lock.release()#多了一次
print("========================")
print()

print(lock.acquire(blocking=False))#1次
#threading.Timer(3,lambda  x:x.release(),args=(lock,)).start()#跨线程了,异常
lock.release()
print("```````````````````````````````````")
print()

#测试多线程
print(lock.acquire())
def sub(l):
    print("{}:{}".format(threading.current_thread(),l.acquire()))#阻塞
    print("{}:{}".format(threading.current_thread(), l.acquire(False)))
    print("lock in sub thread {}".format(lock))
    l.release()
    print("sub 1")
    l.release()
    print("sub 2")
    #l.release()#多了一次

threading.Timer(2,sub, args=(lock,)).start()#传入同一个lock对象
print("+++++++++++++++++++++++++++++")
print()

print(lock.acquire())

lock.release()
time.sleep(5)
print("释放主线程锁")
lock.release()

结果为:

True
________________________
True
True
True
True
main thread 12652
locked in main thread <locked _thread.RLock object owner=12652 count=1 at 0x000000000217B418>
========================


True
```````````````````````````````````


True
+++++++++++++++++++++++++++++


True
释放主线程锁
<Timer(Thread-1, started 15028)>:True
<Timer(Thread-1, started 15028)>:True
lock in sub thread <locked _thread.RLock object owner=15028 count=2 at 0x000000000217B418>
sub 1
sub 2

 

可重入锁,与线程相关,可在一个线程中获取锁,并可继续在同一个线程中不阻塞获取锁。当锁未释放完,其他线程获取锁就会阻塞,直到当前持有锁的线程释放完锁。

Condition

构造方法Condition(lock = None),可以传入一个lock或rlock对象,默认是rlock。

acquire(*args):获取锁

wait(self,timeout = None):等待或超时

notify(n = 1):唤醒至多指定数目个数的等待的线程,没有等待的线程就没有任何操作。

notify_all():唤醒所有等待的线程

Conditin用于生产者,消费者模型,为了解决生产者消费者速度匹配问题。

先看一个例子,消费者消费速度大于生产者生产速度。

from threading import Thread,Event
import logging
import random

FORMAT = "%(asctime)s %(threadName)s (thread)d %(message)s"
logging.basicConfig(format = FORMAT,level=logging.INFO)

#此例只是为了演示,不考虑线程安全问题。

class Dispatcher():
    def __init__(self):
        self.data = None
        self.event = Event()#event只是为了使用方便,与逻辑无关

    def produce(self,total):
        for _  in range(total):
            data = random.randint(0,100)
            logging.info(data)
            self.data = data
            self.event.wait(1)
        self.event.set()

    def consum(self):
        while not self.event.is_set():
            data = self.data
            logging.info("recieved {}".format(data))
            self.data = None
            self.event.wait(0.5)

d = Dispatcher()
p = Thread(target=d.produce,args=(10,),name="producer")
c = Thread(target=d.consum,name="consumer")
c.start()
p.start()

结果为:

2019-11-26 16:39:02,354 consumer (thread)d recieved None
2019-11-26 16:39:02,355 producer (thread)d 100
2019-11-26 16:39:02,855 consumer (thread)d recieved 100
2019-11-26 16:39:03,369 producer (thread)d 37
2019-11-26 16:39:03,369 consumer (thread)d recieved 37
2019-11-26 16:39:03,869 consumer (thread)d recieved None
2019-11-26 16:39:04,383 producer (thread)d 20
2019-11-26 16:39:04,383 consumer (thread)d recieved None
2019-11-26 16:39:04,884 consumer (thread)d recieved None
2019-11-26 16:39:05,397 producer (thread)d 65
2019-11-26 16:39:05,397 consumer (thread)d recieved 65
2019-11-26 16:39:05,897 consumer (thread)d recieved None
2019-11-26 16:39:06,411 consumer (thread)d recieved None
2019-11-26 16:39:06,412 producer (thread)d 9
2019-11-26 16:39:06,912 consumer (thread)d recieved 9
2019-11-26 16:39:07,425 producer (thread)d 66
2019-11-26 16:39:07,425 consumer (thread)d recieved 66
2019-11-26 16:39:07,925 consumer (thread)d recieved None
2019-11-26 16:39:08,439 consumer (thread)d recieved None
2019-11-26 16:39:08,439 producer (thread)d 67
2019-11-26 16:39:08,939 consumer (thread)d recieved 67
2019-11-26 16:39:09,453 consumer (thread)d recieved None
2019-11-26 16:39:09,453 producer (thread)d 58
2019-11-26 16:39:09,953 consumer (thread)d recieved 58
2019-11-26 16:39:10,467 producer (thread)d 24
2019-11-26 16:39:10,467 consumer (thread)d recieved 24
2019-11-26 16:39:10,967 consumer (thread)d recieved None
2019-11-26 16:39:11,481 consumer (thread)d recieved None
2019-11-26 16:39:11,481 producer (thread)d 61
2019-11-26 16:39:11,981 consumer (thread)d recieved 61

 

这个例子采用了消费者主动消费,消费者浪费了大量时间,主动来查看有没有数据。

能否换成一种通知机制,有数据通知消费者来消费呢?

使用Condition对象

from threading import Thread, Event, Condition
import logging
import random

FORMAT = "%(asctime)s %(threadName)s (thread)d %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)


# 此例只是为了演示,不考虑线程安全问题。

class Dispatcher():
    def __init__(self):
        self.data = None
        self.event = Event()  # event只是为了使用方便,与逻辑无关
        self.cond = Condition()

    def produce(self, total):
        for _ in range(total):
            data = random.randint(0, 100)
            with self.cond:
                logging.info(data)
                self.data = data
                self.cond.notify_all()
            self.event.wait(1)#模拟产生数据速度
        self.event.set()

    def consum(self):
        while not self.event.is_set():
            with self.cond:
                self.cond.wait()#阻塞等通知
                logging.info("recieved {}".format(self.data))
                self.data = None
            self.event.wait(0.5)


d = Dispatcher()
p = Thread(target=d.produce, args=(10,), name="producer")
c = Thread(target=d.consum, name="consumer")
c.start()
p.start()

结果为:


2019-11-26 16:40:26,151 producer (thread)d 23
2019-11-26 16:40:26,151 consumer (thread)d recieved 23
2019-11-26 16:40:27,158 producer (thread)d 71
2019-11-26 16:40:27,158 consumer (thread)d recieved 71
2019-11-26 16:40:28,172 producer (thread)d 35
2019-11-26 16:40:28,172 consumer (thread)d recieved 35
2019-11-26 16:40:29,186 producer (thread)d 15
2019-11-26 16:40:29,186 consumer (thread)d recieved 15
2019-11-26 16:40:30,200 producer (thread)d 26
2019-11-26 16:40:30,201 consumer (thread)d recieved 26
2019-11-26 16:40:31,215 producer (thread)d 42
2019-11-26 16:40:31,215 consumer (thread)d recieved 42
2019-11-26 16:40:32,229 producer (thread)d 63
2019-11-26 16:40:32,229 consumer (thread)d recieved 63
2019-11-26 16:40:33,243 producer (thread)d 50
2019-11-26 16:40:33,243 consumer (thread)d recieved 50
2019-11-26 16:40:34,257 producer (thread)d 50
2019-11-26 16:40:34,257 consumer (thread)d recieved 50
2019-11-26 16:40:35,271 producer (thread)d 50
2019-11-26 16:40:35,271 consumer (thread)d recieved 50

 

上例中,消费者等待数据等待,如果生产者准备好了会通知消费者消费,省得消费者反复来查看数据是否就绪。

如果是1个生产者,多个消费者怎么改?

from threading import Thread, Event, Condition
import logging
import random

FORMAT = "%(asctime)s %(threadName)s (thread)d %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)


# 此例只是为了演示,不考虑线程安全问题。

class Dispatcher():
    def __init__(self):
        self.data = None
        self.event = Event()  # event只是为了使用方便,与逻辑无关
        self.cond = Condition()

    def produce(self, total):
        for _ in range(total):
            data = random.randint(0, 100)
            with self.cond:
                logging.info(data)
                self.data = data
                self.cond.notify_all()
            self.event.wait(1)#模拟产生数据速度
        self.event.set()

    def consum(self):
        while not self.event.is_set():
            with self.cond:
                self.cond.wait()#阻塞等通知
                logging.info("recieved {}".format(self.data))
            self.event.wait(0.5)#模拟消费速度


d = Dispatcher()
p = Thread(target=d.produce, args=(10,), name="producer")
for i in range(5):
    c = Thread(target=d.consum,name="consumer-{}".format(i))
    c.start()
p.start()

2019-11-26 16:41:38,536 producer (thread)d 26
2019-11-26 16:41:38,536 consumer-4 (thread)d recieved 26
2019-11-26 16:41:38,536 consumer-3 (thread)d recieved 26
2019-11-26 16:41:38,536 consumer-0 (thread)d recieved 26
2019-11-26 16:41:38,537 consumer-2 (thread)d recieved 26
2019-11-26 16:41:38,537 consumer-1 (thread)d recieved 26
2019-11-26 16:41:39,543 producer (thread)d 80
2019-11-26 16:41:39,543 consumer-1 (thread)d recieved 80
2019-11-26 16:41:39,544 consumer-4 (thread)d recieved 80
2019-11-26 16:41:39,544 consumer-2 (thread)d recieved 80
2019-11-26 16:41:39,544 consumer-3 (thread)d recieved 80
2019-11-26 16:41:39,544 consumer-0 (thread)d recieved 80
2019-11-26 16:41:40,557 producer (thread)d 17
2019-11-26 16:41:40,557 consumer-4 (thread)d recieved 17
2019-11-26 16:41:40,557 consumer-2 (thread)d recieved 17
2019-11-26 16:41:40,557 consumer-0 (thread)d recieved 17
2019-11-26 16:41:40,558 consumer-1 (thread)d recieved 17
2019-11-26 16:41:40,558 consumer-3 (thread)d recieved 17
2019-11-26 16:41:41,571 producer (thread)d 73
2019-11-26 16:41:41,571 consumer-1 (thread)d recieved 73
2019-11-26 16:41:41,572 consumer-3 (thread)d recieved 73
2019-11-26 16:41:41,572 consumer-0 (thread)d recieved 73
2019-11-26 16:41:41,573 consumer-2 (thread)d recieved 73
2019-11-26 16:41:41,573 consumer-4 (thread)d recieved 73
2019-11-26 16:41:42,585 producer (thread)d 4
2019-11-26 16:41:42,585 consumer-3 (thread)d recieved 4
2019-11-26 16:41:42,585 consumer-2 (thread)d recieved 4
2019-11-26 16:41:42,586 consumer-0 (thread)d recieved 4
2019-11-26 16:41:42,586 consumer-4 (thread)d recieved 4
2019-11-26 16:41:42,586 consumer-1 (thread)d recieved 4
2019-11-26 16:41:43,599 producer (thread)d 100
2019-11-26 16:41:43,599 consumer-0 (thread)d recieved 100
2019-11-26 16:41:43,599 consumer-4 (thread)d recieved 100
2019-11-26 16:41:43,599 consumer-2 (thread)d recieved 100
2019-11-26 16:41:43,600 consumer-1 (thread)d recieved 100
2019-11-26 16:41:43,600 consumer-3 (thread)d recieved 100
2019-11-26 16:41:44,613 producer (thread)d 68
2019-11-26 16:41:44,614 consumer-1 (thread)d recieved 68
2019-11-26 16:41:44,614 consumer-2 (thread)d recieved 68
2019-11-26 16:41:44,615 consumer-0 (thread)d recieved 68
2019-11-26 16:41:44,615 consumer-4 (thread)d recieved 68
2019-11-26 16:41:44,616 consumer-3 (thread)d recieved 68
2019-11-26 16:41:45,626 producer (thread)d 8
2019-11-26 16:41:45,626 consumer-0 (thread)d recieved 8
2019-11-26 16:41:45,626 consumer-1 (thread)d recieved 8
2019-11-26 16:41:45,626 consumer-2 (thread)d recieved 8
2019-11-26 16:41:45,627 consumer-3 (thread)d recieved 8
2019-11-26 16:41:45,627 consumer-4 (thread)d recieved 8
2019-11-26 16:41:46,640 producer (thread)d 15
2019-11-26 16:41:46,640 consumer-0 (thread)d recieved 15
2019-11-26 16:41:46,641 consumer-4 (thread)d recieved 15
2019-11-26 16:41:46,641 consumer-3 (thread)d recieved 15
2019-11-26 16:41:46,641 consumer-2 (thread)d recieved 15
2019-11-26 16:41:46,641 consumer-1 (thread)d recieved 15
2019-11-26 16:41:47,661 producer (thread)d 48
2019-11-26 16:41:47,662 consumer-2 (thread)d recieved 48
2019-11-26 16:41:47,662 consumer-1 (thread)d recieved 48
2019-11-26 16:41:47,663 consumer-3 (thread)d recieved 48
2019-11-26 16:41:47,663 consumer-4 (thread)d recieved 48
2019-11-26 16:41:47,663 consumer-0 (thread)d recieved 48

 

self.cond.notify_all()#发通知

修改为self.cond.notify(2)

试一试看看效果?

这个例子,可以看到实现了消息的一对多,这其实就是广播模式。

注:上例中,程序本身不是线程安全的,程序逻辑有很多瑕疵,但是可以很好的帮助理解condition的使用,和生产者消费者模型。

Condition总结

Condition用于生产者消费者模型中,解决生产者消费者速度匹配的问题。

采用了通知机制,非常有效率。

使用方式

使用Condition,必须先acquire,用完了要release,因为内部使用了锁,默认使用RLock锁,最好的方式是使用with上下文。

消费者wait,等待通知。

生产者生产好消息,对消费者发通知,可以使用notify或者notify_all方法。

 
原文地址:https://www.cnblogs.com/xpc51/p/11925053.html