day034GIL锁,线程队列,线程池,回调函数

本节内容:

1、GIL锁
2、线程队列
3、线程池
4、线程的回调函数

参考文章1
参考文章2

一、GIL锁

1、GIL锁介绍(Global Interpreter Lock)

首先,一些语言(java、c++、c)是支持同一个进程中的多个线程是可以应用多核CPU的,也就是我们会听到的现在4核8核这种多核CPU技术的牛逼之处。
那么我们之前说过应用多进程的时候如果有共享数据是不是会出现数据不安全的问题啊,就是多个进程同时一个文件中去抢这个数据,大家都把这个数据改了,
但是还没来得及去更新到原来的文件中,就被其他进程也计算了,导致数据不安全的问题啊,所以我们是不是通过加锁可以解决啊,
多线程大家想一下是不是一样的,并发执行就是有这个问题。但是python最早期的时候对于多线程也加锁,
但是python比较极端的(在当时电脑cpu确实只有1核)加了一个GIL全局解释锁,是解释器级别的,锁的是整个线程,而不是线程里面的某些数据操作,
每次只能有一个线程使用cpu,也就说多线程用不了多核,但是他不是python语言的问题,是CPython解释器的特性,
如果用Jpython解释器是没有这个问题的,Cpython是默认的,因为速度快,Jpython是java开发的,在Cpython里面就是没办法用多核,
这是python的弊病,历史问题,虽然众多python团队的大神在致力于改变这个情况,但是暂没有解决。
(这和解释型语言(python,php)和编译型语言有关系吗???待定!,
编译型语言一般在编译的过程中就帮你分配好了,解释型要边解释边执行,所以为了防止出现数据不安全的情况加上了这个锁,这是所有解释型语言的弊端??)


但是有了这个锁我们就不能并发了吗?当我们的程序是偏计算的,也就是cpu占用率很高的程序(cpu一直在计算),就不行了,
但是如果你的程序是I/O型的(一般你的程序都是这个)(input、访问网址网络延迟、打开/关闭文件读写),
在什么情况下用的到高并发呢(金融计算会用到,人工智能(阿尔法狗),但是一般的业务场景用不到,爬网页,多用户网站、聊天软件、处理文件),
I/O型的操作很少占用CPU,那么多线程还是可以并发的,因为cpu只是快速的调度线程,而线程里面并没有什么计算,就像一堆的网络请求,
我cpu非常快速的一个一个的将你的多线程调度出去,你的线程就去执行I/O操作了,

首先需要明确的一点是GIL并不是Python的特性,它是在实现Python解析器(CPython)时所引入的一个概念。
就好比C++是一套语言(语法)标准,但是可以用不同的编译器来编译成可执行代码。
有名的编译器例如GCC,INTEL C++,Visual C++等。Python也一样,同样一段代码可以通过CPython,PyPy,Psyco等不同的Python执行环境来执行。
像其中的JPython就没有GIL。然而因为CPython是大部分环境下默认的Python执行环境。
所以在很多人的概念里CPython就是Python,也就想当然的把GIL归结为Python语言的缺陷。
所以这里要先明确一点:GIL并不是Python的特性,Python完全可以不依赖于GIL

这篇文章透彻的剖析了GIL对python多线程的影响,强烈推荐看一下:

文档链接

2、GIL相关解释

GIL本质就是一把互斥锁,既然是互斥锁,所有互斥锁的本质都一样,都是将并发运行变成串行,
以此来控制同一时间内共享数据只能被一个任务所修改,进而保证数据安全。

可以肯定的一点是:保护不同的数据的安全,就应该加不同的锁。

要想了解GIL,首先确定一点:每次执行python程序,都会产生一个独立的进程。
通过开多个进程就可以利用到多核,进行计算,
例如python test.py,python aaa.py,python bbb.py会产生3个不同的python进程

3、gil锁的工作原理及流程

执行一个python代码,从磁盘读取到内存,会启动一个python.exe(python解释器),开辟一个进程,
在这个进程空间里,从py代码到解释器,要经过GIL锁(互斥锁或同步锁,锁的位置在解释器上),这里就变成串行了,
也就导致python不能利用多核,跟语言没关系,主要是解释器的原因,
然后解释器里,有两个功能,一个是编译,一个是虚拟机,编译成 .pyc,以及生成二进制文件
然后通过操作系统,发给cup执行计算,

4、有了GIL锁为什么还要lock锁

1、三个需要注意的点:

#1.线程抢的是GIL锁,GIL锁相当于执行权限,拿到执行权限后才能拿到互斥锁Lock,其他线程也可以抢到GIL,但如果发现Lock仍然没有被释放则阻塞,即便是拿到执行权限GIL也要立刻交出来

#2.join是等待所有,即整体串行,而锁只是锁住修改共享数据的部分,即部分串行,要想保证数据安全的根本原理在于让并发变成串行,join与互斥锁都可以实现,毫无疑问,互斥锁的部分串行效率要更高

#3. 一定要看本小节最后的GIL与互斥锁的经典分析
GIL保护的是解释器级的数据,保护用户自己的数据则需要自己加锁处理,如下图

2、GIL VS Lock

机智的同学可能会问到这个问题,就是既然你之前说过了,
Python已经有一个GIL来保证同一时间只能有一个线程来执行了,为什么这里还需要lock?

首先我们需要达成共识:锁的目的是为了保护共享的数据,同一时间只能有一个线程来修改共享的数据

然后,我们可以得出结论:保护不同的数据就应该加不同的锁。

最后,问题就很明朗了,GIL 与Lock是两把锁,保护的数据不一样,
前者是解释器级别的(当然保护的就是解释器级别的数据,比如垃圾回收的数据),
后者是保护用户自己开发的应用程序的数据,很明显GIL不负责这件事,只能用户自定义加锁处理,即Lock

过程分析:所有线程抢的是GIL锁,或者说所有线程抢的是执行权限

线程1抢到GIL锁,拿到执行权限,开始执行,然后加了一把Lock,
还没有执行完毕(这是线程若进入io操作,操作系统会强制回收线程1拿到的GIL锁,释放出去给其他线程抢),
即线程1还未释放Lock,
有可能线程2抢到GIL锁,开始执行,执行过程中发现Lock还没有被线程1释放,
于是线程2进入阻塞,被夺走执行权限,
有可能线程1拿到GIL,然后正常执行到释放Lock。。。这就导致了串行运行的效果

既然是串行,那我们执行

t1.start()

t1.join

t2.start()

t2.join()

这也是串行执行啊,为何还要加Lock呢,需知join是等待t1所有的代码执行完,
相当于锁住了t1的所有代码,而Lock只是锁住一部分操作共享数据的代码。

详解:

因为Python解释器帮你自动定期进行内存回收,你可以理解为python解释器里有一个独立的线程,
每过一段时间它起wake up做一次全局轮询看看哪些内存数据是可以被清空的,
此时你自己的程序 里的线程和 py解释器自己的线程是并发运行的,
假设你的线程删除了一个变量,py解释器的垃圾回收线程在清空这个变量的过程中的clearing时刻,
可能一个其它线程正好又重新给这个还没来及得清空的内存空间赋值了,结果就有可能新赋值的数据被删除了,
为了解决类似的问题,python解释器简单粗暴的加了锁,即当一个线程运行时,
其它人都不能动,这样就解决了上述的问题,这可以说是Python早期版本的遗留问题。

lock的作用

二、线程队列

Import queue
三种队列形式
1 先进先出
2 先进后出 后进先出
3 优先级队列   参数是:元祖(1,’2’) 1是优先级  ‘2’是数据
优先级一样时,后面的数据类型要一样,才可以进行比较,
字典不可以,因为字典是无序的,是通过哈希值存储的
import queue

#先进先出队列
# class queue.Queue(maxsize=0) #先进先出 frist in frist out
# q = queue.Queue(3) #创建了一个长度为3的队列,也就是说,最多只能放3个数据
#
# q.put(2)
# print('>>>>',q.qsize())
# q.put(5)
# print('>>>>',q.qsize())
# q.put(0)
#
# # q.put(1)
# try:
#     q.put_nowait(1)
# except Exception:  # 捕获异常,这样就可以
#     print('队列满了')
# print(q.get())
# print(q.get())
# print(q.get())
# # print(q.get())
# # print(q.get_nowait())

#后进先出,先进后出
# class queue.LifoQueue(maxsize=0) #last in fisrt out

# import queue
#
# q = queue.LifoQueue(3)
#
# q.put(1)
# q.put(2)
# q.put(3)
#
# print(q.get())
# print(q.get())
# print(q.get())

# 优先级队列
# class queue.PriorityQueue(maxsize=0) #存储数据时可设置优先级的队列

# import queue
#
# q = queue.PriorityQueue(5)  # 创建队列
#
# q.put((3,33))
#
# q.put((1,2)) # 优先级一样是,比较后面值的先后顺序,字母为ASCII码的顺序,中文为Unicode顺序
# q.put((1,3))
#
# q.put((2,22))
#
# q.put((4,44))

# print(q.get())

# print(q.get())
# print(q.get())
# print(q.get())
# print(q.get())

三、线程池

到这里就差我们的线程池没有讲了,我们用一个新的模块给大家讲,早期的时候我们没有线程池,
现在python提供了一个新的标准或者说内置的模块,这个模块里面提供了新的线程池和进程池,
之前我们说的进程池是在multiprocessing里面的,现在这个在这个新的模块里面,他俩用法上是一样的。

为什么要将进程池和线程池放到一起呢,是为了统一使用方式,
使用threadPollExecutor和ProcessPollExecutor的方式一样,
而且只要通过这个concurrent.futures导入就可以直接用他们两个了
两者的切换很简单,只要更改实例化对象就行了,就可以实现两者间的转换

1、基本用法

concurrent.futures模块提供了高度封装的异步调用接口
ThreadPoolExecutor:线程池,提供异步调用
ProcessPoolExecutor: 进程池,提供异步调用
Both implement the same interface, which is defined by the abstract Executor class.

#2 基本方法
submit(fn, *args, **kwargs)
异步提交任务

map(func, *iterables, timeout=None, chunksize=1)
取代for循环 + submit的操作

shutdown(wait=True)
相当于进程池的pool.close()+pool.join()操作
wait=True,等待池内所有任务执行完毕回收完资源后才继续
wait=False,立即返回,并不会等待池内的任务执行完毕
但不管wait参数为何值,整个程序都会等到所有任务执行完毕
submit和map必须在shutdown之前

result(timeout=None)
取得结果

add_done_callback(fn)
回调函数

2、ThreadPoolExecutor的简单使用

import time
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor  # 可以直接引入线程池和进程池(推荐)
from multiprocessing import Pool  # 引入进程池的一种方法,

def func(n):
    # print(n)
    time.sleep(1)
    return n * n

if __name__ == '__main__':
    t_p = ThreadPoolExecutor(max_workers=4)  # 实例化线程池
    # t_p = ProcessPoolExecutor(max_workers=4)  # 只需要将变量变成实例化进程池就行,就完成线程池 -- 》 进程池
    # p_pool = Pool(4)
    res_list = []

    for i in range(10):
        res = t_p.submit(func,i)  #异步提交任务
        # print(res.result())  #等待任务的执行结果,拿不到就阻塞,拿到了再运行
        res_list.append(res)  # 加入到一个列表,等待所有子线程或子进程完成再往下执行

    t_p.shutdown()  # 等待

    print('主线程结束')

    for res1 in res_list:
        print(res1.result())  # 拿到结果

    # for i in range(10):
    #     res = p_pool.apply_async(func,args=(i,))
    #     # print(res)
    #     print(res.get())

3、ProcessPoolExecutor的使用:

只需要将这一行代码改为下面这一行就可以了,其他的代码都不用变
tpool = ThreadPoolExecutor(max_workers=5) #默认一般起线程的数据不超过CPU个数*5
# tpool = ProcessPoolExecutor(max_workers=5)

你就会发现为什么将线程池和进程池都放到这一个模块里面了,用法一样

4、map的使用

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import threading
import os,time,random
def task(n):
    print('%s is runing' %threading.get_ident())
    time.sleep(random.randint(1,3))
    return n**2

if __name__ == '__main__':

    executor=ThreadPoolExecutor(max_workers=3)

    # for i in range(11):
    #     future=executor.submit(task,i)

    s = executor.map(task,range(1,5)) #map取代了for+submit
    print([i for i in s])

四、回调函数

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
用.submit(func,10).add_done_callback(call_back)  # 执行命令是调用,回调函数

from multiprocessing import Pool
用p_pool.apply_async(func,args=(10,),callback=call_back),创建时直接指定返回值,调用回调函数

fe:普通示例

import time
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
from multiprocessing import Pool

def func(n):
    time.sleep(1)
    return n * n

def call_back(m):
    print('>>>>>',m)
    print(m.result())

if __name__ == '__main__':
    # t_p = ThreadPoolExecutor(max_workers=4)  # 实例化线程池
    t_p = ProcessPoolExecutor(max_workers=4)  # 实例化进程池
    p_pool = Pool(4)
    res_list = []

    res = t_p.submit(func,10).add_done_callback(call_back)  # 命令调用回调函数

    # p_pool.apply_async(func,args=(10,),callback=call_back)
    # for i in range(10):
    #     res = p_pool.apply_async(func,args=(i,))
    #     # print(res)
    #     print(res.get())
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
from multiprocessing import Pool
import requests
import json
import os

def get_page(url):
    print('<进程%s> get %s' %(os.getpid(),url))
    respone=requests.get(url)
    if respone.status_code == 200:
        return {'url':url,'text':respone.text}

def parse_page(res):
    res=res.result()
    print('<进程%s> parse %s' %(os.getpid(),res['url']))
    parse_res='url:<%s> size:[%s]
' %(res['url'],len(res['text']))
    with open('db.txt','a') as f:
        f.write(parse_res)


if __name__ == '__main__':
    urls=[
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://help.github.com/',
        'http://www.sina.com.cn/'
    ]

    # p=Pool(3)
    # for url in urls:
    #     p.apply_async(get_page,args=(url,),callback=pasrse_page)
    # p.close()
    # p.join()

    p=ProcessPoolExecutor(3)
    for url in urls:
        p.submit(get_page,url).add_d
原文地址:https://www.cnblogs.com/yipianshuying/p/10059587.html