线程同步
概念
线程间的操作,需要协同完成任务,在访问某些数据时,其他线程不能访问这些数据,直到该线程完成对数据的操作。
不同的系统实现技术不同,有临界区(Critical Section)、互斥量(Mutex)、信号量(Semaphore)、事件等
Event
Event事件,是线程间通信机制中最简单的实现,使用一个内部标记flag,通过flag的True或False的变化实现。方法有
set()
clear()
is_set()
wait(timeout=None)
使用event实现Timer的练习:
import threading class Timer: def __init__(self,interval,fun,*args,**kwargs): self.interval = interval self.fun = fun self.e = threading.Event() self.args = args self.kwargs = kwargs def start(self): Thread(target=self.__run).start() def cancel(self): self.e.set() def __run(self): if not self.e.wait(self.interval): self.fun(*self.args,**self.kwargs) self.e.set() def add(x,y): logging.info(x + y) t = Timer(4,add,4,30) t.start() e = Event() e.wait(2) #t.cancel() print('++++++')
Lock
锁,资源争抢一定会用到锁,从而保证只有一个使用者使用该资源。
方法:
acquire(self, blocking=True, timeout=-1) 默认阻塞,可以设置超时。非阻塞时,禁止设置timeout。成功获取锁,返回True,否则返回False。
release() 释放锁,可以从任何线程调用释放,已上锁的锁,无法release,无法release一个unlocked lock,会抛RuntimeError异常
from threading import Thread,Event,Lock import logging import time FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s' logging.basicConfig(format=FORMAT,level=logging.INFO) cups = [] lock = Lock() def worker(count:int,lock): logging.info("I'm working for U.") flag = False while True: lock.acquire() if len(cups) >= count: flag = True #lock.release() #放在此处,并不能起锁定资源的作用 time.sleep(0.001) if not flag: cups.append(1) lock.release() if flag: break #lock.release() #放在此处不会执行后边的释放锁,锁不释放,变成死锁,其他线程全部阻塞 logging.info(len(cups)) for i in range(10): Thread(target=worker,args=(1000,lock)).start()
加锁,解锁
如果加锁后,未解锁前,要执行一段代码,可能会出现异常,有异常时,锁无法释放,当前线程可能会因为异常终止,就会产生死锁。锁对象支持上下文管理可以使用with语句,也可以使用try...finally语句保证语句的释放。
如下代码中,c1取10,100,1000,线程数增加,此时取c2为10,100都会显示正确的结果,一旦c2取值到1000,就会输出非零的结果,且结果不固定,这是因为在执行 += 操作时,是可以被其他线程打断的。
from threading import Thread, Lock,Event import threading class Counter: def __init__(self): self._val = 0 @property def value(self): return self._val def inc(self): self._val += 1 def dec(self): self._val -= 1 def run(c:Counter,count=100): for _ in range(count): for i in range(-50,50): if i < 0: c.dec() else: c.inc() c = Counter() c1 = 10 c2 = 1000 threads = [] for i in range(c1): t = Thread(target=run,args=(c,c2)) t.start() threads.append(t) # for t in threads: # t.join() e = Event() while not e.wait(1): if threading.active_count() == 1: ##确保除主线程外所有线程都执行完,再打印c.value print(c.value) e.set() else: print(threading.enumerate())
可以改为
import threading from threading import Thread, Lock,Event import time class Counter: def __init__(self): self._val = 0 self._lock = Lock() @property def value(self): with self._lock: return self._val def inc(self): with self._lock: self._val += 1 def dec(self): try: self._lock.acquire() self._val -= 1 finally: self._lock.release() def run(c:Counter,count=100): for _ in range(count): for i in range(-50,50): if i < 0: c.dec() else: c.inc() c = Counter() c1 = 10 c2 = 1000 threads = [] for i in range(c1): t = Thread(target=run,args=(c,c2)) t.start() threads.append(t) # for t in threads: # t.join() e = Event() while not e.wait(1): if threading.active_count() == 1: print(c.value) e.set() else: print(threading.enumerate())
锁的使用场景
锁适用于访问和修改同一个共享资源的时候,即读写同一个资源的时候。
如果全部都是读取同一个共享资源时不需要锁。因为这时可以认为共享资源是不可变的,每一次读取它都是一样的值,所以不用加锁
使用锁的注意事项:
1、少用锁,必要时用锁。使用了锁,多线程访问被锁的资源时,就成了串行,要么排队执行,要么争抢执行
举例,高速公路上车并行跑,可是到了省界只开放了一个收费口,过了这个口,车辆依然可以在多车道上一起跑。过收费口的时候,如果排队一辆辆过,加不加锁一样效率相当,但是一旦出现争抢,就必须加锁一辆辆过。
2、加锁时间越短越好,不需要就立即释放锁
3、一定要避免死锁
不使用锁,有了效率,但是结果是错的。使用了锁,效率低下,但是结果是对的。我们要正确的结果。
非阻塞的锁的使用
import threading import logging import time FORMAT = '%(asctime)-15s [%(threadName)s, %(thread)8d] %(message)s' logging.basicConfig(level=logging.INFO, format=FORMAT) def worker(tasks): for task in tasks: time.sleep(0.001) if task.lock.acquire(False): # 获取锁则返回True logging.info('{} {} begin to start'.format(threading.current_thread(), task.name)) # 适当的时机释放锁,为了演示不释放 else: logging.info('{} {} is working'.format(threading.current_thread(), task.name)) class Task: def __init__(self, name): self.name = name self.lock = threading.Lock() tasks = [Task('task-{}'.format(x)) for x in range(10)] for i in range(5): threading.Thread(target=worker, name='worker-{}'.format(i), args=(tasks,)).start()
可重入的锁Rlock
线程相关的锁,也有叫递归锁的。
线程A获得可重复锁,并可以多次不阻塞的获取,其他线程不能获取,获取时就会阻塞,直到持有该锁的线程释放完锁。
condition
构造方法Condition(lock=None),可以传入一个Lock或者RLock对象,默认是RLock。
acquire(*args) 获取锁
wait(self,timeout=None) 等待或超时
notify(n=1) 唤醒至多指定数目的等待的线程,没有等待线程就没有任何操作
notify_all() 唤醒所有等待的程序
Condition 用于生产者,消费者模型,解决速度匹配问题。可以解决缓冲问题,不解决解耦问题。
消息一对全部,广播机制;消息一对多,多播机制;一对一,单播机制
以下的例子是为了演示,不考虑线程安全问题。
from threading import Thread, Event import logging import random FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s' logging.basicConfig(format=FORMAT, level=logging.INFO) class Dispatcher: def __init__(self): self.data = None self.event = Event() def produce(self, total): for _ in range(total): data = random.randint(0, 100) logging.info(data) self.data = data self.event.wait(1) self.event.set() def consume(self): while not self.event.is_set(): data = self.data logging.info("recieved {}".format(data)) self.data = None self.event.wait(0.5) d = Dispatcher() p = Thread(target=d.produce, args=(10,), name='producer') c = Thread(target=d.consume, name='consumer') c.start() p.start()
该方法中消费者处于一直自己主动取数据的模式中,浪费时间。
可以改为通知模式,由生产者来通知消费者。
from threading import Thread, Event,Condition import logging import random FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s' logging.basicConfig(format=FORMAT, level=logging.INFO) class Dispatcher: def __init__(self): self.data = None self.event = Event() # event只是为了使用方便,与逻辑无关 self.cond = Condition() def produce(self, total): for _ in range(total): data = random.randint(0, 100) with self.cond: logging.info(data) self.data = data self.cond.notify(5) self.event.wait(1) self.event.set() def consume(self): while not self.event.is_set(): with self.cond: self.cond.wait() logging.info("received {}".format(self.data)) #self.data = None ##只生产一份 self.event.wait(0.5) d = Dispatcher() p = Thread(target=d.produce, args=(10,), name='producer') for i in range(5): c = Thread(target=d.consume, name='consumer -{}'.format(i) ) c.start() p.start()
Condition总结
在生产者消费者模型中,要注意消费速度要大于生产速度,否者会造成数据积压
Condition用于生产者消费者模型中,解决生产者消费者速度匹配的问题。
采用了通知机制,非常有效率。
使用方式
使用Condition,必须先acquire,用完了要release,因为内部使用了锁,默认使用RLock锁,最好的方式是使用with上下文。
消费者wait,等待通知。
生产者生产好消息,对消费者发通知,可以使用notify或者notify_all方法
Barrier
翻译为栅栏,理解为屏障,路障,道闸都可以,3.2引入python
Barrier(parties, action=None, timeout=None) 构建Barrier对象,指定参与方数目。timeout是wait方法未指定超时的默认值
n_waiting 当前在屏障中等待的线程数
parties 各方数,就是需要多少个等待
wait(timeout=None) 等待通过屏障。返回0到线程数-1的整数,每个线程返回不同。如果wait方法设置了超时,并超时发送,屏障将处于broken状态
import threading import logging # 输出格式定义 FORMAT = '%(asctime)-15s [%(threadName)s, %(thread)8d] %(message)s' logging.basicConfig(level=logging.INFO, format=FORMAT) def worker(barrier: threading.Barrier): logging.info('waiting for {} threads.'.format(barrier.n_waiting)) try: barrier_id = barrier.wait() logging.info('after barrier {}'.format(barrier_id)) except threading.BrokenBarrierError: logging.info('Broken Barrier') barrier = threading.Barrier(4) for x in range(3): # 改成4、5、6试一试 threading.Thread(target=worker, name='worker-{}'.format(x), args=(barrier,)).start() logging.info('started')
上例中,屏障后等待的线程到达parties的值时,就会运行在等待的一批线程,然后继续等待下一批的线程数目足够。
类似于赛马比赛的闸,等所有马匹就位,就开闸。下一批陆续来到闸门前等待。
broken 如果屏障处于打破的状态,返回True
abort() 将屏障置于broken状态,等待中的线程或者调用等待方法的线程中都会抛出brokenBarrierError异常,直到reset方法来恢复屏障
reset() 恢复屏障,重新开始拦截
import threading import logging # 输出格式定义 FORMAT = '%(asctime)-15s [%(threadName)s, %(thread)8d] %(message)s' logging.basicConfig(level=logging.INFO, format=FORMAT) def worker(barrier: threading.Barrier): logging.info('waiting for {} threads.'.format(barrier.n_waiting)) try: barrier_id = barrier.wait() logging.info('after barrier {}'.format(barrier_id)) except threading.BrokenBarrierError: logging.info('Broken Barrier. run.') barrier = threading.Barrier(3) for x in range(0, 10): if x == 2: barrier.abort() elif x == 6: barrier.reset() threading.Event().wait(1) threading.Thread(target=worker, name='worker-{}'.format(x), args=(barrier,)).start()
上例中,屏障中等待了2个,屏障就被break了,waiting的线程抛了BrokenBarrierError异常,新wait的线程也抛异常,直到屏障恢复,才继续按照parties数目要求继续拦截线程。
wait方法超时发生时,屏障将会处于broken状态,直到被reset
import threading import logging # 输出格式定义 FORMAT = '%(asctime)-15s [%(threadName)s, %(thread)8d] %(message)s' logging.basicConfig(level=logging.INFO, format=FORMAT) def worker(barrier: threading.Barrier, i: int): logging.info('waiting for {} threads.'.format(barrier.n_waiting)) try: logging.info(barrier.broken) # 是否broken if i < 3: barrier_id = barrier.wait(1) # 超时后,屏障broken else: if i == 6: barrier.reset() # 恢复屏障 barrier_id = barrier.wait() # 永久等待 logging.info('after barrier {}'.format(barrier_id)) except threading.BrokenBarrierError: logging.info('Broken Barrier. run.') barrier = threading.Barrier(3) for x in range(0, 9): threading.Event().wait(2) threading.Thread(target=worker, name='worker-{}'.format(x), args=(barrier, x)).start()
Barrier应用
并发初始化
所有线程都必须初始化完成后,才能继续工作,例如运行前加载数据、检查,如果这些工作没完,就开始运行,将不能正常工作。
10个线程做10种工作准备,每个线程负责一种工作,只有这10个线程都完成后,才能继续工作,先完成的要等待后完成的线程。
例如,启动一个程序,需要先加载磁盘文件、缓存预热、初始化连接池等工作,这些工作可以齐头并进,不过只有都满足了,程序才能继续向后执行。假设数据库连接失败,则初始化工作失败,就要abort,barrier置为broken,所有线程收到异常退出。
工作量
有10个计算任务,完成6个,就算工作完成。
semaphore信号量
semaphore也有一个到计数器,每次acquire都会减一,当acquire方法发现计数为0及阻塞请求的线程,直到其他的线程释放后,计数大于0,才恢复阻塞的线程。
Semaphore(value=1) 构造方法。value小于0,抛ValueError异常
acquire(blocking=True, timeout=None) 获取信号量,计数器减1,获取成功返回True
release() 释放信号量,计数器加1
计数器永远不会小于0,到0就阻塞了
import threading import logging import time # 输出格式定义 FORMAT = '%(asctime)-15s [%(threadName)s, %(thread)8d] %(message)s' logging.basicConfig(level=logging.INFO, format=FORMAT) def worker(s: threading.Semaphore): logging.info('in sub thread') logging.info(s.acquire()) # 阻塞 logging.info('sub thread over') # 信号量 s = threading.Semaphore(2) logging.info(s.acquire()) print(s._value) logging.info(s.acquire()) print(s._value) threading.Thread(target=worker, args=(s,)).start() time.sleep(2) logging.info(s.acquire(False)) logging.info(s.acquire(timeout=3)) # 阻塞3秒 # 释放 logging.info('released') s.release()
可以测试,多释放几次信号量,value的值就变得比设置的value值大了。
解决这个问题,就可以使用
BoundedSemaphore类
有界的信号量,不允许使用release超出初始值的范围,否则,抛出ValueError异常。
应用举例
连接池,资源有限,且开启一个连接成本较高,所以,使用连接池。
实现一个简单的连接池,有容量,有工厂方法获得链接,能够返回不用的连接,供其他调用者使用。正真的连接池很复杂,所以只是为了实验,不考虑很多问题。
import threading,logging,random class Conn: def __init__(self, name): self.name = name def __repr__(self): return self.name class Pool: def __init__(self, count: int): self.count = count # 池中是连接对象的列表 self.pool = [self._connect("conn-{}".format(x)) for x in range(self.count)] self.semaphore = threading.Semaphore(count) def _connect(self, conn_name): # 创建连接的方法,返回一个名称 return Conn(conn_name) def get_conn(self): # 从池中拿走一个连接 print("----------------") self.semaphore.acquire() print("++++++++++++++++++++") conn = self.pool.pop() return conn def return_conn(self, conn: Conn): # 向池中添加一个连接 self.pool.append(conn) self.semaphore.release() pool = Pool(3) def worker(pool:Pool): conn = pool.get_conn() logging.info(conn) threading.Event().wait(random.randint(1,4)) pool.return_conn(conn) for i in range(6): threading.Thread(target=worker,name="worker-{}".format(i),args=(pool,)).start()
上例中,使用信号量解决资源有限的问题。
如果池中有资源,请求者获取资源时信号量减1,拿走资源。当请求超过资源数,请求者只能等待。当使用者用完归还资源后信号量加1,等待线程就可以被唤醒拿走资源。
注意:这个连接池的例子不能用到生成环境,只是为了说明信号量使用的例子,连接池还有很多未完成功能。
self.pool.append(conn) 这一句有哪些问题考虑?
1、边界问题分析
return_conn方法可以单独执行,有可能多归还连接,也就是会多release,所以,要用有界信号量
BoundedSemaphore类。
这样用有界信号量修改源代码,保证如果多return_conn就会抛异常。
假设一种极端情况,计数器还差1就归还满了,有三个线程A、B、C都执行了第一句,都没有来得及release,这时候轮到线程A release,正常的release,然后轮到线程C先release,一定出问题,超界了,直接抛异常。
因此信号量,可以保证,一定不能多归还。
如果归还了同一个连接多次怎么办,重复很容易判断。
这个程序还不能判断这些连接是不是原来自己创建的,这不是生成环境用的代码,只是简单演示。
2、正常使用分析
正常使用信号量,都会先获取信号量,然后用完归还。
创建很多线程,都去获取信号量,没有获得信号量的线程都阻塞。能归还的线程都是前面获取到信号量的线程,其他没有获得线程都阻塞着。非阻塞的线程append后才release,这时候等待的线程被唤醒,才能pop,也就是没有获取信号量就不能pop,这是安全的。
经过上面的分析,信号量比计算列表长度好,线程安全。
信号量和锁
锁,只允许同一个时间一个线程独占资源。它是特殊的信号量,即信号量计数器初值为1。
信号量,可以多个线程访问共享资源,但这个共享资源数量有限。
锁,可以看做特殊的信号量
GIL全局解释器锁
CPython 在解释器进程级别有一把锁,叫做GIL 全局解释器锁。
GIL 保证CPython进程中,只有一个线程执行字节码。甚至是在多核CPU的情况下,也是只能允许一个CPU上的一个线程在运行。
CPython中
IO密集型,由于线程阻塞,就会调度其他线程;
CPU密集型,当前线程可能会连续的获得GIL,导致其它线程几乎无法使用CPU。
在CPython中由于有GIL存在,IO密集型,使用多线程较为合算;CPU密集型,使用多进程,要绕开GIL。
新版CPython正在努力优化GIL的问题,但不是移除。
如果在意多线程的效率问题,请绕行,选择其它语言erlang、Go等。
Python中绝大多数内置数据结构的读、写操作都是原子操作。
由于GIL的存在,Python的内置数据类型在多线程编程的时候就变成了安全的了,但是实际上它们本身 不是线程安全类型。
保留GIL的原因:
Guido坚持的简单哲学,对于初学者门槛低,不需要高深的系统知识也能安全、简单的使用Python。
而且移除GIL,会降低CPython单线程的执行效率。