Python之路(第四十七篇) 协程:greenlet模块gevent模块asyncio模块

 

一、协程介绍

协程:是单线程下的并发,又称微线程,纤程。英文名Coroutine。一句话说明什么是线程:协程是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的。

协程相比于线程,最大的区别在于,协程不需要像线程那样来回的中断切换,也不需要线程的锁机制,因为线程中断或者锁机制都会对性能问题造成影响,所以协程的性能相比于线程,性能有明显的提高,尤其在线程越多的时候,优势越明显。

协程的好处:

  1. 无需线程上下文切换的开销

  2. 无需原子操作锁定及同步的开销 "原子操作(atomic operation)是不需要synchronized",所谓原子操作是指不会被线程调度机制打断的操作;这种操作一旦开始,就一直运行到结束,中间不会有任何 context switch (切换到另一个线程)。原子操作可以是一个步骤,也可以是多个操作步骤,但是其顺序是不可以被打乱,或者切割掉只执行部分。视作整体是原子性的核心。

  3. 方便切换控制流,简化编程模型

  4. 高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理。

 

缺点:

  1. 无法利用多核资源:协程的本质是个单线程,它不能同时将单个 CPU 的多个核用上,协程需要和进程配合才能运行在多 CPU 上.当然我们日常所编写的绝大部分应用都没有这个必要,除非是 CPU 集型应用。

  2. 进行阻塞(Blocking)操作(如IO时)会阻塞掉整个程序

 

总结协程特点:

  1. 必须在只有一个单线程里实现并发

  2. 修改共享数据不需加锁

  3. 用户程序里自己保存多个控制流的上下文栈

  4. 附加:一个协程遇到IO操作自动切换到其它协程(如何实现检测IO,yield、greenlet都无法实现,就用到了gevent模块(select机制))

 

Python2.x协程

类库:

  • yield

  • greenlet

  • gevent

 

Python3.x协程

  • asyncio

 

Python3.x系列的gevent用法和python2.x系列是一样的

 

 

在学习前,我们先来理清楚同步/异步的概念

·同步是指完成事务的逻辑,先执行第一个事务,如果阻塞了,会一直等待,直到这个事务完成,再执行第二个事务,顺序执行。。。也称作串行执行。

·异步是和同步相对的,异步是指在处理调用这个事务的之后,不会等待这个事务的处理结果,直接处理第二个事务去了,通过状态、通知、回调来通知调用者处理结果。也称作并行执行。

 

二、greenlet模块

第三方模块,可以在pycharm中选择虚拟环境安装,

也可以通过 pip install greenlet 安装

 

greenlet 通过 greenlet(func) 启动一个协程,通过 switch() 手动切换程序的执行

示例

from greenlet import greenlet
​
def func1(name):
    print("%s from func1"%name) #2执行这一句
    g2.switch("jack")  #3切换执行func2(),第一次执行要传入参数保存现在执行的状态
    print("from func1 end") #6执行这一句
    g2.switch()#7切换执行play(),保存现在执行的状态
​
def func2(name):
    print("%s from func2"%name) #4执行这一句
    g1.switch() #5切换执行func1(),保存现在执行的状态
    print("from func2 end") #8执行这一句
​
g1 = greenlet(func1)
g2 = greenlet(func2)
g1.switch("nick") #1执行func1(),在switch()里传参数 ,注意与一般的线程、进程传参方式的不同
#可以在第一次switch时传入参数,以后都不需要

  



分析:就是通过创建greenlet(func)对象,通过对象的switch()方法转移程序执行的不同步骤,但是这里无法自动识别IO后自动切换。

 

三、gevent模块

gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是greenlet, 它是以C扩展模块形式接入Python的轻量级协程。

安装 pip3 install gevent 或者在pycharm中选择虚拟环境安装

 

用法

#用法
g1=gevent.spawn(func,1,,2,3,x=4,y=5)创建一个协程对象g1,spawn括号内第一个参数是函数名,如func1,后面可以有多个参数,可以是位置实参或关键字实参,都是传给函数func1的
​
g2=gevent.spawn(func2)
​
g1.join() #等待g1结束
​
g2.join() #等待g2结束
​
#或者上述两步合作一步:gevent.joinall([g1,g2])
​
g1.value#拿到func1的返回值

  

 

示例

import gevent
​
​
def func1():
    print('from func1: 1')
    gevent.sleep(0)
    print('from func1: 2')
    gevent.sleep(1)
​
​
def func2():
    print('from func2: 1')
    gevent.sleep(2)
    print('from func2: 2')
​
​
def func3():
    print('from func3: 1')
    gevent.sleep(1)
    print('from func3: 2')
​
​
gevent.joinall([
    gevent.spawn(func1),
    gevent.spawn(func2),
    gevent.spawn(func3),
])
​

  

输出结果

from func1: 1
from func2: 1
from func3: 1
from func1: 2
from func3: 2
from func2: 2

  


分析:可以从输出结果看到程序不断的在三个函数中跳跃执行,遇到IO了就去执行另外的函数,但是请注意一点

gevent.sleep() 是用于模仿 IO 操作的,实际使用中不需要 gevent.sleep(),这里如果单纯执行上述代码的话,gevent模块也是只能识别 gevent.sleep()产生的IO,而对系统产生的IO或者网络IO之类无法识别,所有需要打上补丁,使得gevent模块识别其他IO

 

gevent是不能直接识别的需要用下面一行代码,打补丁

要用gevent,需要将from gevent import monkey;monkey.patch_all()放到文件的开头

 

示例

 

需求:爬取三个网站并打印网页字符串长度

​
from gevent import monkey;monkey.patch_all()
# 把当前程序的所有 IO 操作标记起来,否则模块无法知道 IO 操作
import gevent
import time
import requests
​
​
def get_page(url):
    headers = {
        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36'
    }
    page_text = requests.get(url=url, headers=headers).text
    print('网站长度', len(page_text))
​
​
def main():
    urls = [
        'https://www.sogou.com',
        'https://cn.bing.com',
        'https://cnblogs.com/Nicholas0707/',
    ]
    time_start = time.time()
    for url in urls:
        get_page(url)
​
    print('同步耗时:', time.time() - time_start)
​
    print("-"*50)
    async_time_start = time.time()
    gevent.joinall([
        gevent.spawn(get_page, 'https://www.sogou.com'),
        gevent.spawn(get_page, 'https://cn.bing.com'),
        gevent.spawn(get_page, 'https://cnblogs.com/Nicholas0707/'),
    ])
    print('异步协程耗时:', time.time() - async_time_start)
​
​
if __name__ == '__main__':
    main()

  

输出结果

 

网站长度 23795
网站长度 130248
网站长度 13761
同步耗时: 2.5321450233459473
--------------------------------------------------
网站长度 23795
网站长度 130221
网站长度 13761
异步协程耗时: 0.36602067947387695
​

  

分析:从结果可以看出采用协程异步明显更快

 

 

四、asyncio模块

asyncio是Python3.4(2014年)引进的标准库,直接内置了对IO的支持。

python2x没有加这个库,python3.5又加入了async/await特性,python3.7新增了asyncio.run() api来执行异步函数.

 

 

协程示例

先简单看一个协程示例

运行协程函数的第一种方式(loop.run_until_complete())

#python 3.7+,本次测试环境python3.8
​
import asyncio,time
​
​
async def fun():  #定义一个协程函数
    print('hello')
    await asyncio.sleep(1)  #模拟IO操作,等待调用
    print('word')
​
​
if __name__ == '__main__':
    begin = time.time()
    # 创建一个事件loop
    loop = asyncio.get_event_loop()
    # 将协程函数加入到事件循环loop,并启动事件循环
    loop.run_until_complete(fun())
    loop.close()
    print('用时共计',time.time()-begin)
    print(fun)
    print(loop)

  


输出结果

hello
word
用时共计 1.0010573863983154
<function fun at 0x00000000022CD0D0>
<ProactorEventLoop running=False closed=True debug=False>

  

上面代码等同于下面(不推荐使用,python3.8已经不支持此写法了)

##python 3.7,本次测试环境python3.7
​
import asyncio,time
​
@asyncio.coroutine #这种写法在python3.8之后被抛弃了
def fun():  #定义一个协程函数
    print('hello')
    yield from asyncio.sleep(1)  #模拟IO操作,等待调用
    print('word')
​
​
if __name__ == '__main__':
    begin = time.time()
    # 创建一个事件loop
    loop = asyncio.get_event_loop()
    # 将协程函数加入到事件循环loop,并启动事件循环
    loop.run_until_complete(fun())
    loop.close()
    print('用时共计',time.time()-begin)

  




分析:使用async关键字定义一个协程函数,用asyncio.get_event_loop()创建一个事件循环,然后使用run_until_complete将协程注册到事件循环,并启动事件循环。

 

运行协程函数的第二种方式( asyncio.gather()---asyncio.run())

示例

# ## python 3.7+,本次测试环境python3.8
#
import asyncio,time
​
​
async def foo():
    print('start foo')
    await asyncio.sleep(1)
    print('end foo')
    return 'foo'
​
async def bar():
    print('start bar')
    await asyncio.sleep(2)
    print('end bar')
    return ('1','2')
​
async def main():
    res = await asyncio.gather(foo(), bar())
    #同时将两个异步函数对象加入事件循环,
    # 但并不运行,等待调用。
    print(res)
​
if __name__ == '__main__':
    begin = time.time()
    asyncio.run(main())
    print('共计用时',time.time()-begin)
    # 执行协程事件循环并返回结果。

  

 

输出结果

start foo
start bar
end foo
end bar
['foo', ('1', '2')]
共计用时 2.003114700317383

  


分析:如果要同时异步执行两个异步函数,需要用asyncio.gather(fun1(), fun2())将两个异步函数对象加入事件循环,这里不用显示的创建异步事件循环,因为asyncio.gather()方法中如果检测到你没有创建异步事件循环会自动帮你创建,见源代码

​
def gather(*coros_or_futures, loop=None, return_exceptions=False):
    """..."""
    if not coros_or_futures:
        if loop is None:
            loop = events.get_event_loop()
        else:
            warnings.warn("The loop argument is deprecated since Python 3.8, "
                          "and scheduled for removal in Python 3.10.",
                          DeprecationWarning, stacklevel=2)

  

启动事件循环是通过 asyncio.run()方法进行启动

 

运行协程函数的第三种方式( asyncio.create_task()---asyncio.run())

## python 3.7+,本次测试环境python3.8
import asyncio,time
​
​
async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(f"{what} at {time.strftime('%X')}")
​
​
async def main():
    task1 = asyncio.create_task(
        say_after(1, 'hello'))  #创建任务事件,异步函数加入参数,
    task2 = asyncio.create_task(
        say_after(2, 'world'))
    print(f"started at {time.strftime('%X')}")
    await task1 #将任务事件加入异步事件循环,等待调用
    await task2
​
    print(f"finished at {time.strftime('%X')}")
​
​
if __name__ == '__main__':
    begin = time.time()
    asyncio.run(main()) #启动异步事件循环
    print('共计用时',time.time()-begin)

  

输出结果

started at 20:01:51
hello at 20:01:52
world at 20:01:53
finished at 20:01:53
共计用时 2.002114772796631

  

分析:通过asyncio.create_task()创建等待异步执行的任务事件,这里也是自动创建了事件循环loop,

源码

def create_task(coro, *, name=None):
    """...
    """
    loop = events.get_running_loop()

  

然后使用await将任务事件加入异步事件循环。

 

 

关于asyncio的一些关键字的说明:

  • event_loop 事件循环:程序开启一个无限循环,把一些函数注册到事件循环上,当满足事件发生的时候,调用相应的协程函数

  • coroutine 协程:协程对象,指一个使用async关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环,由事件循环调用。

  • task 任务:一个协程对象就是一个原生可以挂起的函数,任务则是对协程进一步封装,其中包含了任务的各种状态

  • future: 代表将来执行或没有执行的任务的结果。它和task上没有本质上的区别

  • async/await 关键字:python3.5用于定义协程的关键字,async定义一个协程,await用于挂起阻塞的异步调用接口,等待调用。

  • sleep:暂停执行此任务,为事件循环分配要竞争的任务,并且它(事件循环)监视其所有任务的状态并从一个任务切换到另一个,这里是模拟io任务花费的时间。

 

asyncio方法


"""
Asyncio.get_event_loop()
​
返回一个事件循环对象,是asyncio.Baseeventloop的实例
​
Abstracteventloop.stop()
​
停止运行事件循环
​
​
Abstracteventloop.run_forever()
​
一直运行直到stop()
​
Abstracteventloop.run_until_complete(future)
​
运行直至future对象运行完
​
Abstracteventloop.close()
​
关闭事件循环
​
Abstracteventloop.is_running()
​
返回事件循环的是否运行
​
asyncio.gather(*aws, loop=None, return_exceptions=False)
同时在协程事件循环中运行定义的异步函数对象
​
​
task = asyncio.create_task(func());task.cancel()
请求取消任务。调用它将导致Task将CancelledError异常抛出到协程事件循环中。
​
"""

  


 

 

为异步函数绑定回调函数

## python 3.7+,本次测试环境python3.8
import asyncio
​
async def fun():
    print('hello word')
    return 'nick'
​
​
def callback(future):
    print('Callback: ', future.result())  # 通过result()方法获得异步函数的返回值
​
​
loop = asyncio.get_event_loop()  # 创建异步事件循环
task = loop.create_task(fun())  # 将异步函数加入loop
task.add_done_callback(callback)  # 添加回调函数
loop.run_until_complete(task)
​

  

输出结果

hello word
Callback:  nick

  


示例二

## python 3.7+,本次测试环境python3.8
import asyncio
​
async def fun():
    print('hello')
    await asyncio.sleep(1)
    print('fun --end')
    return 'nick'
async def bar():
    print('word')
    await asyncio.sleep(2)
    print('bar --end')
    return 'jack'
​
​
def callback(future):
    print('Callback: ', future.result())  # 通过result()方法获得异步函数的返回值
​
​
async def main():
    loop = asyncio.get_event_loop()  # 创建异步事件循环
    task1 = loop.create_task(fun())  # 将异步函数加入loop
    task2 = loop.create_task(bar())  # 将异步函数加入loop
    task1.add_done_callback(callback)  # 添加回调函数
    task2.add_done_callback(callback)  # 添加回调函数
    await task1
    await task2
​
if __name__ == '__main__':
    asyncio.run(main())

  

输出结果

hello
word
fun --end
Callback:  nick
bar --end
Callback:  jack

  

分析:通过add_done_callback方法给task任务添加回调函数,当task(也可以说是coroutine)执行完成的时候,就会调用回调函数,通过result()方法获得异步函数的返回值。

 

原文地址:https://www.cnblogs.com/Nicholas0707/p/11774525.html