并发编程之线程进阶

一、互斥锁

多进程中数据不安全,所以要加锁。

多线程虽然有GIL锁,但是由于GIL锁轮转的策略(多线程之间时间片的轮转),仍存在数据不安全的情况,但是相对几率较低。

GIL锁轮转的策略:早期执行700条指令(不是700行,+= 操作相当于4条指令),现在是执行一个时间片时间,当前线程会让出cpu给其他线程使用。

dis模块中的方法可以查看某个操作对应的cpu指令

 

解决线程之间的数据安全的问题:

①多线程中,不在线程中操作全局变量

②涉及+=,-=,lis[0]+1,类似的操作一定要加锁

③列表、字典自带的方法都是线程安全的

④队列也是数据安全的

 

线程不安全的案例

from threading import Thread

count = 0
def fun_add():
    global count
    for i in range(100000):
        count += 1

def fun_sub():
    global count
    for i in range(100000):
        count -= 1

t_lis = []
for i in range(10):
    t1 = Thread(target=fun_add)
    t1.start()
    t_lis.append(t1)
    t2 = Thread(target=fun_sub)
    t2.start()
    t_lis.append(t2)
for t in t_lis:
    t.join()
print(count)            # -98445

 

使用互斥锁解决线程安全问题(操作的指令都加上锁)

from threading import Thread,Lock

count = 0
def fun_add(lock):
    global count
    for i in range(100000):
        lock.acquire()
        count += 1
        lock.release()

def fun_sub(lock):
    global count
    for i in range(100000):
        lock.acquire()
        count -= 1
        lock.release()

t_lis = []
lock = Lock()                   # 创建锁对象
for i in range(10):
    t1 = Thread(target=fun_add,args=(lock,))
    t1.start()
    t_lis.append(t1)
    t2 = Thread(target=fun_sub,args=(lock,))
    t2.start()
    t_lis.append(t2)
for t in t_lis:
    t.join()                    # 等待所有的子线程执行完
print(count)                    # 0

 

二、递归锁

当在并发的情况下使用两把锁,会造成死锁的现象。一个线程抢占到一把锁,另一个线程抢占到另一把锁,而操作需要同时抢占两把锁才能执行操作。

解决方案:递归锁

多少个acquire上锁,就要有多少个release释放锁,一个线程先acquire后,其他的线程只能等着。这个锁好比像一串钥匙。

 

递归锁和互斥锁的区别:

互斥锁是两把锁多个线程抢占,而递归锁是一把锁多个线程抢占

在一个线程里,用多个锁的时候,用递归锁实例化一个锁,acquire多次

在一个线程里,只用一个所的时候,用互斥锁为了提高效率,在锁多个资源的时候,应该酌情选用互斥锁,用完一个资源应该马上释放

 

递归锁能够快速的解决死锁问题,但是递归锁并不是一个好的解决方案,死锁现象的发生不是互斥锁的问题,而是代码的逻辑问题,递归锁只是临时快速解决死锁的有效方案,解决时只需将递归锁替换互斥锁。后续需要将递归锁重新替换成互斥锁,完善代码的逻辑,并且提高代码的效率

多线程之间,用完一个资源再用另一个资源,应该先释放一个资源再去获取一个资源的锁

 

经典死锁案例:科学家吃面(互斥锁)

from threading import Thread,Lock
import time

noodles_lock = Lock()
fork_lock = Lock()
def eat1(name,i,):
    fork_lock.acquire()
    print('%s%s拿到叉子'%(name,i))
    noodles_lock.acquire()
    print('%s%s拿到面条'%(name,i))
    print('%s%s吃面'%(name,i))
    time.sleep(0.5)
    fork_lock.release()
    print('%s%s放下叉子' % (name, i))
    noodles_lock.release()
    print('%s%s放下面条' % (name, i))

def eat2(name,i,):
    noodles_lock.acquire()
    print('%s%s拿到面条'%(name,i))
    fork_lock.acquire()
    print('%s%s拿到叉子' % (name, i))
    print('%s%s吃面'%(name,i))
    time.sleep(0.5)
    noodles_lock.release()
    print('%s%s放下面条' % (name, i))
    fork_lock.release()
    print('%s%s放下叉子' % (name, i))

for i in range(2):Thread(target=eat1,args=('科学家',i+1)).start()
for i in range(3,5):Thread(target=eat2,args=('科学家',i)).start()

'''
科学家1拿到叉子
科学家1拿到面条
科学家1吃面
科学家1放下叉子
科学家1放下面条
科学家3拿到面条
科学家2拿到叉子
'''

 

递归锁解决方案

from threading import Thread,RLock
import time

fork_lock = noodles_lock = RLock()          # 递归锁
def eat1(name,i,):
    fork_lock.acquire()
    print('%s%s拿到叉子'%(name,i))
    noodles_lock.acquire()
    print('%s%s拿到面条'%(name,i))
    print('%s%s吃面'%(name,i))
    time.sleep(0.5)
    fork_lock.release()
    print('%s%s放下叉子' % (name, i))
    noodles_lock.release()
    print('%s%s放下面条' % (name, i))

def eat2(name,i,):
    noodles_lock.acquire()
    print('%s%s拿到面条'%(name,i))
    fork_lock.acquire()
    print('%s%s拿到叉子' % (name, i))
    print('%s%s吃面'%(name,i))
    time.sleep(0.5)
    noodles_lock.release()
    print('%s%s放下面条' % (name, i))
    fork_lock.release()
    print('%s%s放下叉子' % (name, i))

for i in range(2):Thread(target=eat1,args=('科学家',i+1)).start()
for i in range(3,5):Thread(target=eat2,args=('科学家',i)).start()

'''
科学家1拿到叉子
科学家1拿到面条
科学家1吃面
科学家1放下叉子
科学家1放下面条
科学家2拿到叉子
科学家2拿到面条
科学家2吃面
科学家2放下叉子
科学家2放下面条
科学家3拿到面条
科学家3拿到叉子
科学家3吃面
科学家3放下面条
科学家3放下叉子
科学家4拿到面条
科学家4拿到叉子
科学家4吃面
科学家4放下面条
科学家4放下叉子
'''

 

三、信号量

信号量是基于锁+计数器实现的,使用方式跟进程的信号量一样使用

from threading import Semaphore,Thread

from threading import Semaphore,Thread
import time

def func(index,sem):
    sem.acquire()
    print(index)
    time.sleep(2)
    sem.release()

sem = Semaphore(4)          
for i in range(12):
    Thread(target=func,args=(i,sem)).start()

 

四、事件

事件的应用:检测数据库连接

from threading import Event,Thread

方法:

wait()    # 可以设置阻塞的时间

set()     # 将信号设置为true

clear()       # 将信号设置为False

is_set()   # 查看信号的状态

事件的默认状态时False

# 检测数据库连接

from threading import Event,Thread
import time

def check(e):
    time.sleep(2)
    e.set()             # 将信号设置为True

def connect(e):
    for i in range(3):
        e.wait(1)       # 阻塞一秒
        if e.is_set():  # 查看信号的状态        
            print('连接成功')
            break
    else:print('连接失败')

e = Event()
Thread(target=check,args=(e,)).start()
Thread(target=connect,args=(e,)).start()

 

五、条件

方法:

notify()      # 控制流量,通知多少个可以通过,有参数。

wait()        # 阻塞所有进程

notify_all()      # 全部放行,一般配合notify()使用

这两个方法都是线程不安全的,每个方法使用的前后都需要加锁,条件里面有锁的方法。

# 条件

from threading import Condition,Thread
import time

def get_through(name,c):
    print('%s在等待'%name)
    c.acquire()
    c.wait()                # 阻塞,等待通过线程的命令
    print('%s通过'%name)
    c.release()

name_list = ['刘一','陈二','张三','李四','王五','赵六','孙七','周八']
c = Condition()
for i in name_list:
    t = Thread(target=get_through,args=(i,c))
    t.start()

for k in range(4):
    c.acquire()
    c.notify(2)         # 设置每次通过的线程数
    c.release()
    time.sleep(5)

 

六、定时器

 使用场景:定时任务

Timer(n,函数)    实例化时接收两个参数,(执行的m秒数,执行的函数)

不影响主线程

# 定时器

from threading import Timer

def func():
    print('action')

t = Timer(5,func)           # 创建子线程,并且设置开启子线程的时间
t.start()

七、队列

qps概念:每秒钟接收到的请求数

队列的线程是安全的,队列用于做排队相关的逻辑,帮助维持相应的顺序

特点:先进先出

方法:

get()

put()

get_nowait()

put_nowait()

import queue

q = queue.Queue()
q.put(1)
print(q.get())

八、新的队列

from queue import LifoQueue

类似于栈,特点是后进先出,并且不允许插队

应用:算法的完成,有点类似分布式的思想,例如:三级菜单

from queue import LifoQueue

q = LifoQueue()
for i in range(1,6):
    q.put(i)

for i in range(1,6):
    print(q.get(),end=' ')      # 5 4 3 2 1 

九、优先级队列

只能放同一种类似的值

应用场景:会员服务

①如果是数值,按照数值从小到大取值

from queue import PriorityQueue

q = PriorityQueue()
q.put(10)
q.put(5)
q.put(20)

for i in range(3):
    print(q.get(),end=' ')      # 5 10 20 

②如果是字符串,按照ASICC编码来取值

from queue import PriorityQueue

q = PriorityQueue()
q.put('c')
q.put('a')
q.put('b')

for i in range(3):
    print(q.get(), end=' ')     # a b c 

③如果是数字、字母组成的元组,按第一个元素来取值,从小到大取值

from queue import PriorityQueue

q = PriorityQueue()

q.put((3,'zxc'))
q.put((3,'abc'))
q.put((1,'asd'))
q.put((2,'qwe'))

for i in range(4):
    print(q.get(),end=' ')      # (1, 'asd') (2, 'qwe') (3, 'abc') (3, 'zxc') 

十、线程池

concurrent.futures 模块不仅提供线程池,还提供进程池。

from concurrent.futures import ThreadPoolExecutor         # 线程池
from concurrent.futures import ProcessPoolExecutor            # 进程池

实例化的线程池数量 = 5 * cpu_count

 

方法:

submit(函数,参数)    异步提交任务,只能按位置传参,不用加args=

ret = submit()         获取返回值,需要通过result()方法取值

ret.result()        获取值

map(函数,iterable)     取代for循环submit操作

shutdown()         等于进程池的close()和join()方法,阻塞直到任务完成

 

① 有返回值

from concurrent.futures import ThreadPoolExecutor
from threading import currentThread

def func(i):
    print('子线程号:',currentThread().ident)        # 打印子线程的线程号
    return i * '*'

tp = ThreadPoolExecutor(5)          # 创建线程池,创建5个线程
ret_lis = []
for i in range(15):
    ret = tp.submit(func,i)         # 异步提交任务
    ret_lis.append(ret)             # 将返回值存到列表
for ret in ret_lis:
    print(ret.result())               # 通过result()方法获取返回值的值
print('主线程',currentThread().ident)   # 打印主线程的线程号

 

 

② 无返回值

from concurrent.futures import ThreadPoolExecutor
from threading import currentThread

def func():
    print('子进程',currentThread().ident)      # 打印子线程的线程号

tp = ThreadPoolExecutor(3)                    # 创建线程池,开启3个线程
for i in range(9):
    tp.submit(func)                           # 异步提交任务
tp.shutdown()                                 # 阻塞主线程,待所有的子线程运行完
print('主线程',currentThread().ident)

 

③ map方法

使用may方法必须传入参数

from concurrent.futures import ThreadPoolExecutor
from threading import currentThread
import time

def func(n):                            # 使用map必须有一个参数
    print('子线程号:',currentThread().ident)
    time.sleep(1)

tp = ThreadPoolExecutor(3)
ret = tp.map(func,range(15))            # map函数会传入一个参数
# for i in range(15):
#     tp.submit(func,i)
#  异步提交任务
print('主线程:',currentThread().ident)

 十一、回调函数

线程池和进程池的回调函数通过submit实现的,

add_done_callback调用回调函数,不需要传参,回调函数需要通过result()取值

线程池的回调函数由子线程完成

from concurrent.futures import ThreadPoolExecutor
from threading import currentThread
import time

def func(i):
    print('子线程:',currentThread().ident)         # 获取子线程的线程号
    time.sleep(1)
    return i

def call_back(ret):
    print('ret>',ret.result())                      # 通过result()方法取值
    print('callback线程号:',currentThread().ident)   # 获取回调函数的线程号

tp = ThreadPoolExecutor(3)
for i in range(9):
    tp.submit(func,(i+1)).add_done_callback(call_back)  # add_done_callback回调函数的方法,函数不需要传入参数
tp.shutdown()                                           # 阻塞主线程,等待所有子线程执行完
print('主线程号:',currentThread().ident)

进程池的回调函数由主进程完成

from concurrent.futures import ProcessPoolExecutor
import time,os

def func(i):
    print('子进程:',os.getpid())         # 获取子进程的进程号
    time.sleep(1)
    return i

def call_back(ret):
    print('ret>',ret.result())            # 通过result()方法取值
    print('callback进程号:',os.getpid())   # 获取回调函数的进程号

if __name__ == '__main__':

    tp = ProcessPoolExecutor(3)
    for i in range(9):
        tp.submit(func,(i+1)).add_done_callback(call_back)  # add_done_callback回调函数的方法,函数不需要传入参数
    tp.shutdown()                           # 阻塞主进程
    print('主线程号:',os.getpid())

十二、local模块

from threading import local

不同线程的ID存储的值和取到的值是不同的

多个线程之间使用threading.local对象,可以实现多个线程之间的数据隔离

import time
import random
from threading import local,Thread

loc = local()
def func2():
    global loc
    print(loc.name,loc.age)

def func1(name,age):
    global loc
    loc.name = name
    loc.age = age
    time.sleep(random.random())
    func2()

Thread(target=func1,args=('xiaobai',20)).start()
Thread(target=func1,args=('xiaohei',25)).start()

'''
xiaobai 20
xiaohei 25
'''
原文地址:https://www.cnblogs.com/st-st/p/9700265.html