chapter11.2、线程同步,Event,Lock,RLock,Condition,Barrier,Semaphore和全局解释器锁GIL

线程同步

概念

线程间的操作,需要协同完成任务,在访问某些数据时,其他线程不能访问这些数据,直到该线程完成对数据的操作。

不同的系统实现技术不同,有临界区(Critical Section)、互斥量(Mutex)、信号量(Semaphore)、事件等

Event

Event事件,是线程间通信机制中最简单的实现,使用一个内部标记flag,通过flag的True或False的变化实现。方法有

set()

clear()

is_set()

wait(timeout=None)

使用event实现Timer的练习:

import threading

class Timer:

    def __init__(self,interval,fun,*args,**kwargs):
        self.interval = interval
        self.fun = fun
        self.e = threading.Event()
        self.args = args
        self.kwargs = kwargs

    def start(self):
        Thread(target=self.__run).start()

    def cancel(self):
        self.e.set()

    def __run(self):
        if not self.e.wait(self.interval):
            self.fun(*self.args,**self.kwargs)
        self.e.set()

def add(x,y):
    logging.info(x + y)

t = Timer(4,add,4,30)
t.start()

e = Event()
e.wait(2)
#t.cancel()

print('++++++')

Lock

锁,资源争抢一定会用到锁,从而保证只有一个使用者使用该资源。

方法:

acquire(self, blocking=True, timeout=-1)  默认阻塞,可以设置超时。非阻塞时,禁止设置timeout。成功获取锁,返回True,否则返回False。

release() 释放锁,可以从任何线程调用释放,已上锁的锁,无法release,无法release一个unlocked lock,会抛RuntimeError异常

from  threading import  Thread,Event,Lock
import logging
import time

FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT,level=logging.INFO)

cups = []
lock = Lock()

def worker(count:int,lock):
    logging.info("I'm working for U.")
    flag = False
    while True:
        lock.acquire()
        if len(cups) >= count:
            flag = True
        #lock.release()   #放在此处,并不能起锁定资源的作用
        time.sleep(0.001)
        if not flag:
            cups.append(1)
        lock.release()
        if flag:
            break
        #lock.release()  #放在此处不会执行后边的释放锁,锁不释放,变成死锁,其他线程全部阻塞
    logging.info(len(cups))

for i in range(10):
    Thread(target=worker,args=(1000,lock)).start()

加锁,解锁

如果加锁后,未解锁前,要执行一段代码,可能会出现异常,有异常时,锁无法释放,当前线程可能会因为异常终止,就会产生死锁。锁对象支持上下文管理可以使用with语句,也可以使用try...finally语句保证语句的释放。

如下代码中,c1取10,100,1000,线程数增加,此时取c2为10,100都会显示正确的结果,一旦c2取值到1000,就会输出非零的结果,且结果不固定,这是因为在执行 += 操作时,是可以被其他线程打断的。

from threading import Thread, Lock,Event
import threading

class Counter:
    def __init__(self):
        self._val = 0

    @property
    def value(self):
        return self._val

    def inc(self):
        self._val += 1

    def dec(self):
        self._val -= 1

def run(c:Counter,count=100):
    for _ in range(count):
        for i in range(-50,50):
            if i < 0:
                c.dec()
            else:
                c.inc()

c = Counter()
c1 = 10
c2 = 1000
threads = []
for i in range(c1):
    t = Thread(target=run,args=(c,c2))
    t.start()
    threads.append(t)

# for t in threads:
#     t.join()

e = Event()
while not e.wait(1):
    if threading.active_count() == 1: ##确保除主线程外所有线程都执行完,再打印c.value
        print(c.value)
        e.set()
    else:
        print(threading.enumerate())

可以改为

import threading
from threading import Thread, Lock,Event
import time

class Counter:
    def __init__(self):
        self._val = 0
        self._lock = Lock()

    @property
    def value(self):
        with self._lock:
            return self._val

    def inc(self):
        with self._lock:
            self._val += 1

    def dec(self):
        try:
            self._lock.acquire()
            self._val -= 1
        finally:
            self._lock.release()

def run(c:Counter,count=100):
    for _ in range(count):
        for i in range(-50,50):
            if i < 0:
                c.dec()
            else:
                c.inc()

c = Counter()
c1 = 10
c2 = 1000
threads = []
for i in range(c1):
    t = Thread(target=run,args=(c,c2))
    t.start()
    threads.append(t)

# for t in threads:
#     t.join()
e = Event()
while not e.wait(1):
    if threading.active_count() == 1:
        print(c.value)
        e.set()
    else:
        print(threading.enumerate())

锁的使用场景

锁适用于访问和修改同一个共享资源的时候,即读写同一个资源的时候。
如果全部都是读取同一个共享资源时不需要锁。因为这时可以认为共享资源是不可变的,每一次读取它都是一样的值,所以不用加锁
使用锁的注意事项:
  1、少用锁,必要时用锁。使用了锁,多线程访问被锁的资源时,就成了串行,要么排队执行,要么争抢执行
    举例,高速公路上车并行跑,可是到了省界只开放了一个收费口,过了这个口,车辆依然可以在多车道上一起跑。过收费口的时候,如果排队一辆辆过,加不加锁一样效率相当,但是一旦出现争抢,就必须加锁一辆辆过。
  2、加锁时间越短越好,不需要就立即释放锁
  3、一定要避免死锁
不使用锁,有了效率,但是结果是错的。使用了锁,效率低下,但是结果是对的。我们要正确的结果。

非阻塞的锁的使用

import threading
import logging
import time

FORMAT = '%(asctime)-15s	 [%(threadName)s, %(thread)8d] %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)

def worker(tasks):
    for task in tasks:
        time.sleep(0.001)
        if task.lock.acquire(False): # 获取锁则返回True
            logging.info('{} {} begin to start'.format(threading.current_thread(), task.name))
            # 适当的时机释放锁,为了演示不释放
        else:
            logging.info('{} {} is working'.format(threading.current_thread(), task.name))
class Task:
    def __init__(self, name):
        self.name = name
        self.lock = threading.Lock()

tasks = [Task('task-{}'.format(x)) for x in range(10)]

for i in range(5):
    threading.Thread(target=worker, name='worker-{}'.format(i), args=(tasks,)).start()

可重入的锁Rlock

线程相关的锁,也有叫递归锁的。

线程A获得可重复锁,并可以多次不阻塞的获取,其他线程不能获取,获取时就会阻塞,直到持有该锁的线程释放完锁。

condition

构造方法Condition(lock=None),可以传入一个Lock或者RLock对象,默认是RLock。

acquire(*args)  获取锁

wait(self,timeout=None)  等待或超时

notify(n=1)  唤醒至多指定数目的等待的线程,没有等待线程就没有任何操作

notify_all()  唤醒所有等待的程序

Condition 用于生产者,消费者模型,解决速度匹配问题。可以解决缓冲问题,不解决解耦问题。

消息一对全部,广播机制;消息一对多,多播机制;一对一,单播机制

以下的例子是为了演示,不考虑线程安全问题。

from threading import Thread, Event
import logging
import random

FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)

class Dispatcher:
    def __init__(self):
        self.data = None
        self.event = Event()  

    def produce(self, total):
        for _ in range(total):
            data = random.randint(0, 100)
            logging.info(data)
            self.data = data
            self.event.wait(1)
        self.event.set()

    def consume(self):
        while not self.event.is_set():
            data = self.data
            logging.info("recieved {}".format(data))
            self.data = None
            self.event.wait(0.5)
d = Dispatcher()
p = Thread(target=d.produce, args=(10,), name='producer')
c = Thread(target=d.consume, name='consumer')
c.start()
p.start()

该方法中消费者处于一直自己主动取数据的模式中,浪费时间。

可以改为通知模式,由生产者来通知消费者。

from threading import Thread, Event,Condition
import logging
import random

FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)

class Dispatcher:
    def __init__(self):
        self.data = None
        self.event = Event()  # event只是为了使用方便,与逻辑无关
        self.cond = Condition()

    def produce(self, total):
        for _ in range(total):
            data = random.randint(0, 100)
            with self.cond:
                logging.info(data)
                self.data = data
                self.cond.notify(5)
            self.event.wait(1)
        self.event.set()

    def consume(self):
        while not self.event.is_set():
            with self.cond:
                self.cond.wait()
                logging.info("received {}".format(self.data))
                #self.data = None    ##只生产一份
            self.event.wait(0.5)
d = Dispatcher()
p = Thread(target=d.produce, args=(10,), name='producer')
for i in range(5):
    c = Thread(target=d.consume, name='consumer -{}'.format(i) )
    c.start()
p.start()

Condition总结

在生产者消费者模型中,要注意消费速度要大于生产速度,否者会造成数据积压

Condition用于生产者消费者模型中,解决生产者消费者速度匹配的问题。

采用了通知机制,非常有效率。

使用方式

  使用Condition,必须先acquire,用完了要release,因为内部使用了锁,默认使用RLock锁,最好的方式是使用with上下文。

消费者wait,等待通知。

生产者生产好消息,对消费者发通知,可以使用notify或者notify_all方法

Barrier

翻译为栅栏,理解为屏障,路障,道闸都可以,3.2引入python

Barrier(parties, action=None, timeout=None)  构建Barrier对象,指定参与方数目。timeout是wait方法未指定超时的默认值

n_waiting   当前在屏障中等待的线程数

parties   各方数,就是需要多少个等待

wait(timeout=None)  等待通过屏障。返回0到线程数-1的整数,每个线程返回不同。如果wait方法设置了超时,并超时发送,屏障将处于broken状态

import threading
import logging

# 输出格式定义
FORMAT = '%(asctime)-15s	 [%(threadName)s, %(thread)8d] %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)


def worker(barrier: threading.Barrier):
    logging.info('waiting for {} threads.'.format(barrier.n_waiting))
    try:
        barrier_id = barrier.wait()
        logging.info('after barrier {}'.format(barrier_id))
    except threading.BrokenBarrierError:
        logging.info('Broken Barrier')

barrier = threading.Barrier(4)

for x in range(3):  # 改成4、5、6试一试
    threading.Thread(target=worker, name='worker-{}'.format(x), args=(barrier,)).start()
logging.info('started')

上例中,屏障后等待的线程到达parties的值时,就会运行在等待的一批线程,然后继续等待下一批的线程数目足够。

类似于赛马比赛的闸,等所有马匹就位,就开闸。下一批陆续来到闸门前等待。

broken   如果屏障处于打破的状态,返回True
abort()  将屏障置于broken状态,等待中的线程或者调用等待方法的线程中都会抛出brokenBarrierError异常,直到reset方法来恢复屏障
reset()   恢复屏障,重新开始拦截

import threading
import logging

# 输出格式定义
FORMAT = '%(asctime)-15s	 [%(threadName)s, %(thread)8d] %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)

def worker(barrier: threading.Barrier):
    logging.info('waiting for {} threads.'.format(barrier.n_waiting))
    try:
        barrier_id = barrier.wait()
        logging.info('after barrier {}'.format(barrier_id))
    except threading.BrokenBarrierError:
        logging.info('Broken Barrier. run.')

barrier = threading.Barrier(3)
for x in range(0, 10):
    if x == 2:
        barrier.abort()
    elif x == 6:
        barrier.reset()
    threading.Event().wait(1)
    threading.Thread(target=worker, name='worker-{}'.format(x), args=(barrier,)).start()

上例中,屏障中等待了2个,屏障就被break了,waiting的线程抛了BrokenBarrierError异常,新wait的线程也抛异常,直到屏障恢复,才继续按照parties数目要求继续拦截线程。

wait方法超时发生时,屏障将会处于broken状态,直到被reset

import threading
import logging

# 输出格式定义
FORMAT = '%(asctime)-15s	 [%(threadName)s, %(thread)8d] %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)

def worker(barrier: threading.Barrier, i: int):
    logging.info('waiting for {} threads.'.format(barrier.n_waiting))
    try:
        logging.info(barrier.broken)  # 是否broken
        if i < 3:
            barrier_id = barrier.wait(1)  # 超时后,屏障broken
        else:
            if i == 6:
                barrier.reset()  # 恢复屏障
            barrier_id = barrier.wait()  # 永久等待
        logging.info('after barrier {}'.format(barrier_id))
    except threading.BrokenBarrierError:
        logging.info('Broken Barrier. run.')

barrier = threading.Barrier(3)

for x in range(0, 9):
    threading.Event().wait(2)
    threading.Thread(target=worker, name='worker-{}'.format(x), args=(barrier, x)).start()

Barrier应用

并发初始化

所有线程都必须初始化完成后,才能继续工作,例如运行前加载数据、检查,如果这些工作没完,就开始运行,将不能正常工作。
10个线程做10种工作准备,每个线程负责一种工作,只有这10个线程都完成后,才能继续工作,先完成的要等待后完成的线程。
例如,启动一个程序,需要先加载磁盘文件、缓存预热、初始化连接池等工作,这些工作可以齐头并进,不过只有都满足了,程序才能继续向后执行。假设数据库连接失败,则初始化工作失败,就要abort,barrier置为broken,所有线程收到异常退出。

工作量
有10个计算任务,完成6个,就算工作完成。

semaphore信号量

semaphore也有一个到计数器,每次acquire都会减一,当acquire方法发现计数为0及阻塞请求的线程,直到其他的线程释放后,计数大于0,才恢复阻塞的线程。

Semaphore(value=1)   构造方法。value小于0,抛ValueError异常

acquire(blocking=True, timeout=None)   获取信号量,计数器减1,获取成功返回True

release()   释放信号量,计数器加1

计数器永远不会小于0,到0就阻塞了

import threading
import logging
import time

# 输出格式定义
FORMAT = '%(asctime)-15s	 [%(threadName)s, %(thread)8d] %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)


def worker(s: threading.Semaphore):
    logging.info('in sub thread')
    logging.info(s.acquire())  # 阻塞
    logging.info('sub thread over')

# 信号量
s = threading.Semaphore(2)
logging.info(s.acquire())
print(s._value)
logging.info(s.acquire())
print(s._value)

threading.Thread(target=worker, args=(s,)).start()

time.sleep(2)

logging.info(s.acquire(False))
logging.info(s.acquire(timeout=3))  # 阻塞3秒

# 释放
logging.info('released')
s.release()

可以测试,多释放几次信号量,value的值就变得比设置的value值大了。

解决这个问题,就可以使用

BoundedSemaphore类

有界的信号量,不允许使用release超出初始值的范围,否则,抛出ValueError异常。

应用举例

连接池,资源有限,且开启一个连接成本较高,所以,使用连接池。

实现一个简单的连接池,有容量,有工厂方法获得链接,能够返回不用的连接,供其他调用者使用。正真的连接池很复杂,所以只是为了实验,不考虑很多问题。

import threading,logging,random

class Conn:
    def __init__(self, name):
        self.name = name

    def __repr__(self):
        return self.name

class Pool:
    def __init__(self, count: int):
        self.count = count
        # 池中是连接对象的列表
        self.pool = [self._connect("conn-{}".format(x)) for x in range(self.count)]
        self.semaphore = threading.Semaphore(count)

    def _connect(self, conn_name):
        # 创建连接的方法,返回一个名称
        return Conn(conn_name)

    def get_conn(self):
        # 从池中拿走一个连接
        print("----------------")
        self.semaphore.acquire()
        print("++++++++++++++++++++")
        conn = self.pool.pop()
        return conn

    def return_conn(self, conn: Conn):
        # 向池中添加一个连接
        self.pool.append(conn)
        self.semaphore.release()

pool = Pool(3)

def worker(pool:Pool):
    conn = pool.get_conn()
    logging.info(conn)
    threading.Event().wait(random.randint(1,4))
    pool.return_conn(conn)

for i in range(6):
    threading.Thread(target=worker,name="worker-{}".format(i),args=(pool,)).start()

上例中,使用信号量解决资源有限的问题。

如果池中有资源,请求者获取资源时信号量减1,拿走资源。当请求超过资源数,请求者只能等待。当使用者用完归还资源后信号量加1,等待线程就可以被唤醒拿走资源。

注意:这个连接池的例子不能用到生成环境,只是为了说明信号量使用的例子,连接池还有很多未完成功能。

self.pool.append(conn) 这一句有哪些问题考虑?

1、边界问题分析

return_conn方法可以单独执行,有可能多归还连接,也就是会多release,所以,要用有界信号量
BoundedSemaphore类。
这样用有界信号量修改源代码,保证如果多return_conn就会抛异常。

假设一种极端情况,计数器还差1就归还满了,有三个线程A、B、C都执行了第一句,都没有来得及release,这时候轮到线程A release,正常的release,然后轮到线程C先release,一定出问题,超界了,直接抛异常。

因此信号量,可以保证,一定不能多归还。
如果归还了同一个连接多次怎么办,重复很容易判断。
这个程序还不能判断这些连接是不是原来自己创建的,这不是生成环境用的代码,只是简单演示。

2、正常使用分析

正常使用信号量,都会先获取信号量,然后用完归还。
创建很多线程,都去获取信号量,没有获得信号量的线程都阻塞。能归还的线程都是前面获取到信号量的线程,其他没有获得线程都阻塞着。非阻塞的线程append后才release,这时候等待的线程被唤醒,才能pop,也就是没有获取信号量就不能pop,这是安全的。
经过上面的分析,信号量比计算列表长度好,线程安全。

信号量和锁

锁,只允许同一个时间一个线程独占资源。它是特殊的信号量,即信号量计数器初值为1。

信号量,可以多个线程访问共享资源,但这个共享资源数量有限。

锁,可以看做特殊的信号量

GIL全局解释器锁

CPython 在解释器进程级别有一把锁,叫做GIL 全局解释器锁。

GIL 保证CPython进程中,只有一个线程执行字节码。甚至是在多核CPU的情况下,也是只能允许一个CPU上的一个线程在运行。

CPython中

IO密集型,由于线程阻塞,就会调度其他线程;

CPU密集型,当前线程可能会连续的获得GIL,导致其它线程几乎无法使用CPU。

在CPython中由于有GIL存在,IO密集型,使用多线程较为合算;CPU密集型,使用多进程,要绕开GIL。

新版CPython正在努力优化GIL的问题,但不是移除。

如果在意多线程的效率问题,请绕行,选择其它语言erlang、Go等。

Python中绝大多数内置数据结构的读、写操作都是原子操作。
由于GIL的存在,Python的内置数据类型在多线程编程的时候就变成了安全的了,但是实际上它们本身 不是线程安全类型。
保留GIL的原因:
Guido坚持的简单哲学,对于初学者门槛低,不需要高深的系统知识也能安全、简单的使用Python。
而且移除GIL,会降低CPython单线程的执行效率。

原文地址:https://www.cnblogs.com/rprp789/p/9784352.html