python多线程编程—同步原语入门(锁Lock、信号量(Bounded)Semaphore)

摘录python核心编程

一般的,多线程代码中,总有一些特定的函数或者代码块不希望(或不应该)被多个线程同时执行(比如两个线程运行的顺序发生变化,就可能造成代码的执行轨迹或者行为不相同,或者产生不一致的数据),比如修改数据库、更新文件或其他会产生竞态条件的类似情况。此时就需要同步了。

同步:任意数量的线程可以访问临界区的代码,但在给定的时刻又只有一个线程可以通过时。

这里介绍两个基本的同步类型原语:锁/互斥、信号量

锁有两种状态:锁定和未锁定。与之对应的是两个函数:获得锁和释放锁。

  当多线程争夺锁时,允许第一个获得锁的线程进入临界区,并执行代码;所有之后到达的线程都将被阻塞,直到第一个线程执行结束,退出临界区,并释放锁。此时其他的线程可以获得锁并进入临界区。注意:那些被阻塞的线程是没有顺序的(并不是先到先得),意味着下一个获得锁的线程的顺序并不是确定的。

mtsleepF.py脚本中派生了随机数量的线程(没有使用锁):

from atexit import register
from random import randrange
from threading import Thread,currentThread
from time import ctime,sleep

#自定义一个集合对象,重写__str__方法
class CleanOutputSet(set):
    def __str__(self):
        return ', '.join(x for x in self)

#列表生成式   randrange()用于生成一个随机数,range()返回一个列表
loops = (randrange(2,5) for x in range(randrange(3,7)))
remaining = CleanOutputSet()

def loop(nsec):
    myname = currentThread().name
    remaining.add(myname)
    print('[%s] 开始了 %s' % (ctime(),myname))
    sleep(nsec)
    remaining.remove(myname)
    print('[%s] 结束了 %s (%s second)' % (ctime(),myname,nsec))
    print('  (还存在:%s)' % (remaining or 'NONE'))

def _main():
    #创建3~6个线程,每个线程睡眠2~4秒
    for pause in loops:
        Thread(target = loop,args = (pause,)).start()
#装饰器,在脚本的最后执行       
@register
def _atexit():
    print('所有的完成于:',ctime())
    
if __name__ == '__main__':
    _main()

正常情况下,执行的结果:

PS C:UsersWC> python E:Python3.6.3workspacemtsleepF.py
[Mon Apr 16 17:47:31 2018] 开始了 Thread-1
[Mon Apr 16 17:47:31 2018] 开始了 Thread-2
[Mon Apr 16 17:47:31 2018] 开始了 Thread-3
[Mon Apr 16 17:47:31 2018] 开始了 Thread-4
[Mon Apr 16 17:47:33 2018] 结束了 Thread-1 (2 second)
  (还存在:Thread-4, Thread-3, Thread-2)
[Mon Apr 16 17:47:33 2018] 结束了 Thread-2 (2 second)
  (还存在:Thread-4, Thread-3)
[Mon Apr 16 17:47:34 2018] 结束了 Thread-3 (3 second)
  (还存在:Thread-4)
[Mon Apr 16 17:47:34 2018] 结束了 Thread-4 (3 second)
  (还存在:NONE)
所有的完成于: Mon Apr 16 17:47:34 2018

我们多运行几次,有时会得到下面错乱的结果:

PS C:UsersWC> python E:Python3.6.3workspacemtsleepF.py
[Mon Apr 16 17:50:09 2018] 开始了 Thread-1
[Mon Apr 16 17:50:09 2018] 开始了 Thread-2
[Mon Apr 16 17:50:09 2018] 开始了 Thread-3
[Mon Apr 16 17:50:12 2018] 结束了 Thread-3 (3 second)
  (还存在:Thread-2, Thread-1)
[Mon Apr 16 17:50:13 2018] 结束了 Thread-1 (4 second)
[Mon Apr 16 17:50:13 2018] 结束了 Thread-2 (4 second)
  (还存在:NONE)
  (还存在:NONE)
所有的完成于: Mon Apr 16 17:50:13 2018

我们发现输出存在部分混乱的情况(多个线程可能并行执行IO),还有就是两个线程修改同一个变量(剩余线程名集合)。IO和访问相同的数据结构都属于临界区,因此需要引入锁防止多个线程同时进入临界区。

下面是引入锁的脚本实例(mtsleepG.py):

# python 3.6
from atexit import register
from random import randrange
from threading import Thread,Lock,currentThread #2.6版本后重命名为current_thread()
from time import ctime,sleep

#自定义一个集合类,重写—__str__方法,将默认输出改变为将其所有元素按照逗号分隔的字符串
class CleanOutputSet(set):
    def __str__(self):
        return ', '.join(x for x in self)
#三个全局变量   
lock = Lock()#
loops = (randrange(2,5) for x in range(randrange(3,7)))#随机数量的线程(3~6个),每个线程暂停2~4秒
remaining = CleanOutputSet()#自定义集合类的实例

def loop(nsec):
    myname = currentThread().name#获得当前线程的名称
    lock.acquire()#获取锁,阻止其他线程进入到临界区
    remaining.add(myname)#将线程名添加到集合中
    print('[%s] 开始 %s' % (ctime(),myname))
    lock.release()#释放锁
    sleep(nsec)#线程睡眠操作
    lock.acquire()#重新获得锁
    remaining.remove(myname)#从集合中删除当前线程
    print('[%s] 完成 %s (%s secs)' % (ctime(),myname,nsec))
    print('   (remaining: %s )' % (remaining or 'NONE'))
    lock.release()#最后释放锁
    
def _main():    #main函数前面添加‘_’是为了不在其他地方使用而导入。_main只能在命令行模式下才能执行
    for pause in loops:
        Thread(target = loop,args = (pause,)).start()   #循环派生并执行每个线程
#装饰器,注册_atexit()函数,使得解释器在脚本退出的时候执行此函数      
@register
def _atexit():
    print('所有线程完成于:',ctime())
    
if __name__ == '__main__':
    _main()

多次执行,结果没有再出现混乱的情况:

PS C:UsersWC> python E:Python3.6.3workspacemtsleepG.py
[Tue Apr 17 19:54:31 2018] 开始 Thread-1
[Tue Apr 17 19:54:31 2018] 开始 Thread-2
[Tue Apr 17 19:54:31 2018] 开始 Thread-3
[Tue Apr 17 19:54:31 2018] 开始 Thread-4
[Tue Apr 17 19:54:31 2018] 开始 Thread-5
[Tue Apr 17 19:54:31 2018] 开始 Thread-6
[Tue Apr 17 19:54:33 2018] 完成 Thread-1 (2 secs)
   (remaining: Thread-5, Thread-3, Thread-4, Thread-6, Thread-2 )
[Tue Apr 17 19:54:33 2018] 完成 Thread-5 (2 secs)
   (remaining: Thread-3, Thread-4, Thread-6, Thread-2 )
[Tue Apr 17 19:54:34 2018] 完成 Thread-3 (3 secs)
   (remaining: Thread-4, Thread-6, Thread-2 )
[Tue Apr 17 19:54:34 2018] 完成 Thread-2 (3 secs)
   (remaining: Thread-4, Thread-6 )
[Tue Apr 17 19:54:35 2018] 完成 Thread-4 (4 secs)
   (remaining: Thread-6 )
[Tue Apr 17 19:54:35 2018] 完成 Thread-6 (4 secs)
   (remaining: NONE )
所有线程完成于: Tue Apr 17 19:54:35 2018

信号量

当情况更加复杂的时候,还可以考虑使用信号量这个同步原语来代替锁。

信号量(Semaphore),是一个计数器,当资源消耗时递减(调用acquire),计数器会减1;当资源释放是递增(调用release),计数器会加1。计数器的值不会小于0;当等于0的时候,再调用acquire会阻塞,直到其他线程调用release为止。可以认为信号量代表他们的资源可用或不可用。

两个函数简介如下:

acquire(blocking=布尔值,timeout=None):

  • 本方法用于获得Semaphore
  • blocking默认值是True,此时,如果内部计数器值大于0,则减一,并返回;如果等于0,则阻塞,等待其他线程调用release()以使计数器加1;本方法返回True,或无线阻塞
  • 如果blocking=False,则不阻塞,如若获取失败,则返回False
  • 当设定了timeout的值,最多阻塞timeout秒,如果超时,返回False。

release():

  • 释放Semaphore,内部计数器加1,可以唤醒等待的线程

BoundedSemaphore正好和Semaphore相反:一个工厂函数,返回一个新的有界信号量对象。有界信号量会确保他的值不会超过初始值;如果超出则会抛出ValueError异常。初始值默认为1。

消耗资源使计数器递减的操作习惯上成为P(),也称为wait、try、acquire、pend、procure.

相对的,当一个线程对一个资源完成操作时,该资源需要返回资源池中,这种操作一般称为V(),也称为signal、increment、release、post、Vacate。

python简化了所有的命名,使用和锁的函数一样的名字:acquire和release。信号量比锁更加灵活,因为可以有多个线程,每个线程拥有有限资源的一个实例

下面,我们模仿一个简化的糖果机:该糖果机中只有5个可用的槽来保持库存(糖果),如果所有的槽都满了,糖果就不能再加到这个机器中了;相似的,如果每个槽都空了,消费者就不能再购买到了。我们使用信号量来追踪这些有限的资源(糖果槽)。

脚本实例candy.py:

#python 3.6
from atexit import register
from random import randrange
from threading import BoundedSemaphore,Lock,Thread#增加了信号量
from time import sleep,ctime
#3个全局变量
lock = Lock()#
MAX = 5 #表示库存糖果最大值的常量
candytray = BoundedSemaphore(MAX)#‘糖果托盘’,一个信号量
#向库存中添加糖果。这段代码是一个临界区,输出用户的行动,并在糖果超过最大库存的时候给出警告
def refill():
    lock.acquire()
    print('重装糖果……')
    try:
        candytray.release()
    except ValueError:
        print('满了,跳过')
    else:
        print('成功')
    lock.release()
#购买糖果。也是一个临界区,效果和refill函数相反   
def buy():
    lock.acquire()
    print('购买糖果中………')
    #检查是否所有的资源都已经消费完。
    #计数器的值不能小于0,所以这个调用一般会在计数器再次增加之前被阻塞。传入非阻塞标志False,让调用不再阻塞,而在应当阻塞的时候返回一个false,表示没有更多资源了。
    if candytray.acquire(False): 
        print('成功')
    else:
        print('空,跳过')
    lock.release()
#模拟糖果机的所有者   
def producer(loops):
    for i  in range(loops):
        refill()
        sleep(randrange(3))
#模拟消费者    
def consumer(loops):
    for i in range(loops):
        buy()
        sleep(randrange(3))
#_main表示从命令行执行    
def _main():
    print('开始于:',ctime())
    nloops = randrange(2,6)
    print('糖果机(一共 %s 个槽)' % MAX)
    #创建消费者和所有者线程
    #其中消费者线程中,增加了额外的操作,用于随机给出正偏差,使得消费者真正消费的糖果数可能会比供应者放入机器的更多;否则代码永远不会进入消费者尝试从空机器购买糖果的情况
    Thread(target = consumer,args = (randrange(nloops,nloops+MAX+2),)).start()#
    Thread(target = producer,args = (nloops,)).start()
#注册退出函数
@register
def _atexit():
    print('结束于:',ctime())
    
if __name__ == '__main__':
    _main()

执行结果类似:

PS C:UsersWC> python E:Python3.6.3workspacecandy.py
开始于: Wed Apr 18 19:56:19 2018
糖果机(一共 5 个槽)
购买糖果中………
成功
购买糖果中………
成功
重装糖果……
成功
重装糖果……
成功
重装糖果……
满了,跳过
购买糖果中………
成功
购买糖果中………
成功
购买糖果中………
成功
购买糖果中………
成功
购买糖果中………
成功
购买糖果中………
空,跳过
购买糖果中………
空,跳过
结束于: Wed Apr 18 19:56:27 2018
原文地址:https://www.cnblogs.com/hiwuchong/p/8858857.html