python协程系列

声明:本文针对的是python3.4以后的版本的,因为从3.4开始才引入asyncio,后面的3.5 3.6 3.7版本是向前兼容的,只不过语法上面有稍微的改变。比如在3.4版本中使用@asyncio.coroutine装饰器和yield from语句,但是在3.5以后的版本中使用async、await两个关键字代替,虽然语法上稍微有所差异,但是原理是一样的。本文用最通俗的语言解释了pythonasyncio背后的一些核心概念,简要解析了asyncio的设计架构,并给出了使用python进行asyncio异步编程的一般模板。

一、一些最重要的概念
1、协程(coroutine)——本质就是一个函数

所谓的“协程”就是一个函数,这个函数需要有两个基本的组成要素,第一,需要使用@asyncio.coroutine进行装饰;第二,函数体内一定要有yield from 返回的的generator,或者是说使用yield from 返回另一个协程对象。

当然,这两个条件并不是硬性规定的,如果没有这两个条件,依然是函数,只不过是普通函数而已。

怎么判断一个函数是不是协程?通过asyncio.iscoroutine(obj)和asyncio.iscoroutinefunction(func)加以判断,返回true,则是。

那么协程函数有什么作用呢?

(1)result = yield from future

作用一:返回future的结果。什么是future?后面会讲到。当协程函数执行到这一句,协程会被悬挂起来,知道future的结果被返回。如果是future被中途取消,则会触发CancelledError异常。由于task是future的子类,后面也会介绍,关于future的所有应用,都同样适用于task

(2)result = yield from coroutine

等候另一个协程函数返回结果或者是触发异常 

(3)result= yield from task

返回一个task的结果

(4)return expression

作为一个函数,他本身也是可以返回某一个结果的

(5)raise exception 

2、事件循环——event_loop

协程函数,不是像普通函数那样直接调用运行的,必须添加到事件循环中,然后由事件循环去运行,单独运行协程函数是不会有结果的,看一个简单的例子:

import time
import asyncio
async def say_after_time(delay,what):
await asyncio.sleep(delay)
print(what)

async def main():
print(f"开始时间为: {time.time()}")
await say_after_time(1,"hello")
await say_after_time(2,"world")
print(f"结束时间为: {time.time()}")

loop=asyncio.get_event_loop() #创建事件循环对象
#loop=asyncio.new_event_loop() #与上面等价,创建新的事件循环
loop.run_until_complete(main()) #通过事件循环对象运行协程函数
loop.close()
在python3.6版本中,如果我们单独像执行普通函数那样执行一个协程函数,只会返回一个coroutine对象(python3.7)如下所示:

>>> main()
<coroutine object main at 0x1053bb7c8>
(1)获取事件循环对象的几种方式:

下面几种方式可以用来获取、设置、创建事件循环对象loop

loop=asyncio.get_running_loop() 返回(获取)在当前线程中正在运行的事件循环,如果没有正在运行的事件循环,则会显示错误;它是python3.7中新添加的

loop=asyncio.get_event_loop() 获得一个事件循环,如果当前线程还没有事件循环,则创建一个新的事件循环loop;

loop=asyncio.set_event_loop(loop) 设置一个事件循环为当前线程的事件循环;

loop=asyncio.new_event_loop() 创建一个新的事件循环

(2)通过事件循环运行协程函数的两种方式:

(1)方式一:创建事件循环对象loop,即asyncio.get_event_loop(),通过事件循环运行协程函数

(2)方式二:直接通过asyncio.run(function_name)运行协程函数。但是需要注意的是,首先run函数是python3.7版本新添加的,前面的版本是没有的;其次,这个run函数总是会创建一个新的事件循环并在run结束之后关闭事件循环,所以,如果在同一个线程中已经有了一个事件循环,则不能再使用这个函数了,因为同一个线程不能有两个事件循环,而且这个run函数不能同时运行两次,因为他已经创建一个了。即同一个线程中是不允许有多个事件循环loop的。

asyncio.run()是python3.7 新添加的内容,也是后面推荐的运行任务的方式,因为它是高层API,后面会讲到它与asyncio.run_until_complete()的差异性,run_until_complete()是相对较低层的API。

注意:到底什么是事件循环?如何理解?

可以这样理解:线程一直在各个协程方法之间永不停歇的游走,遇到一个yield from 或者await就悬挂起来,然后又走到另外一个方法,依次进行下去,知道事件循环所有的方法执行完毕。实际上loop是BaseEventLoop的一个实例,我们可以查看定义,它到底有哪些方法可调用。

3、什么是awaitable对象——即可暂停等待的对象

有三类对象是可等待的,即 coroutines, Tasks, and Futures.

coroutine:本质上就是一个函数,一前面的生成器yield和yield from为基础,不再赘述;

Tasks: 任务,顾名思义,就是要完成某件事情,其实就是对协程函数进一步的封装;

Future:它是一个“更底层”的概念,他代表一个一步操作的最终结果,因为一步操作一般用于耗时操作,结果不会立即得到,会在“将来”得到异步运行的结果,故而命名为Future。

三者的关系,coroutine可以自动封装成task,而Task是Future的子类。

4、什么是task任务

如前所述,Task用来 并发调度的协程,即对协程函数的进一步包装?那为什么还需要包装呢?因为单纯的协程函数仅仅是一个函数而已,将其包装成任务,任务是可以包含各种状态的,异步编程最重要的就是对异步操作状态的把控了。

(1)创建任务(两种方法):

方法一:task = asyncio.create_task(coro())   # 这是3.7版本新添加的

方法二:task = asyncio.ensure_future(coro())

也可以使用

loop.create_future()

loop.create_task(coro)

也是可以的。

备注:关于任务的详解,会在后面的系列文章继续讲解,本文只是概括性的说明。

(2)获取某一个任务的方法:

方法一:task=asyncio.current_task(loop=None)

返回在某一个指定的loop中,当前正在运行的任务,如果没有任务正在运行,则返回None;

如果loop为None,则默认为在当前的事件循环中获取,

方法二:asyncio.all_tasks(loop=None)

返回某一个loop中还没有结束的任务

5、什么是future?

Future是一个较低层的可等待(awaitable)对象,他表示的是异步操作的最终结果,当一个Future对象被等待的时候,协程会一直等待,直到Future已经运算完毕。

Future是Task的父类,一般情况下,已不用去管它们两者的详细区别,也没有必要去用Future,用Task就可以了,

返回 future 对象的低级函数的一个很好的例子是 loop.run_in_executor().

二、asyncio的基本架构
前面介绍了asyncio里面最为核心的几个概念,如果能够很好地理解这些概念,对于学习协程是非常有帮助的,但是按照我个人的风格,我会先说asyncio的架构,理解asyncio的设计架构有助于更好地应用和理解。

asyncio分为高层API和低层API,我们都可以使用,就像我前面在讲matplotlib的架构的时候所讲的一样,我们前面所讲的Coroutine和Tasks属于高层API,而Event Loop 和Future属于低层API。当然asyncio所涉及到的功能远不止于此,我们只看这么多。下面是是高层API和低层API的概览:

High-level APIs

Coroutines and Tasks(本文要写的)
Streams
Synchronization Primitives
Subprocesses
Queues
Exceptions
Low-level APIs

Event Loop(下一篇要写的)
Futures
Transports and Protocols
Policies
Platform Support
所谓的高层API主要是指那些asyncio.xxx()的方法,

1、常见的一些高层API方法

(1)运行异步协程

asyncio.run(coro, *, debug=False)  #运行一个一步程序,参见上面

(2)创建任务

task=asyncio.create_task(coro)  #python3.7  ,参见上面

task = asyncio.ensure_future(coro()) 

(3)睡眠

await asyncio.sleep(delay, result=None, *, loop=None)

这个函数表示的是:当前的那个任务(协程函数)睡眠多长时间,而允许其他任务执行。这是它与time.sleep()的区别,time.sleep()是当前线程休息,注意他们的区别哦。

另外如果提供了参数result,当当前任务(协程)结束的时候,它会返回;

loop参数将会在3.10中移除,这里就不再说了。

(4)并发运行多个任务
await asyncio.gather(*coros_or_futures, loop=None, return_exceptions=False)

它本身也是awaitable的。

*coros_or_futures是一个序列拆分操作,如果是以个协程函数,则会自动转换成Task。

当所有的任务都完成之后,返回的结果是一个列表的形式,列表中值的顺序和*coros_or_futures完成的顺序是一样的。

return_exceptions:False,这是他的默认值,第一个出发异常的任务会立即返回,然后其他的任务继续执行;

                             True,对于已经发生了异常的任务,也会像成功执行了任务那样,等到所有的任务执行结束一起将错误的结果返回到最终的结果列表里面。

如果gather()本身被取消了,那么绑定在它里面的任务也就取消了。

(5)防止任务取消

await asyncio.shield(*arg, *, loop=None)

它本身也是awaitable的。顾名思义,shield为屏蔽、保护的意思,即保护一个awaitable 对象防止取消,一般情况下不推荐使用,而且在使用的过程中,最好使用try语句块更好。

try:
res = await shield(something())
except CancelledError:
res = None
(6)设置timeout——一定要好好理解

await asyncio.wait_for(aw, timeout, *, loop=None)

如果aw是一个协程函数,会自动包装成一个任务task。参见下面的例子:

import asyncio

async def eternity():
print('我马上开始执行')
await asyncio.sleep(3600) #当前任务休眠1小时,即3600秒
print('终于轮到我了')

async def main():
# Wait for at most 1 second
try:
print('等你3秒钟哦')
await asyncio.wait_for(eternity(), timeout=3) #休息3秒钟了执行任务
except asyncio.TimeoutError:
print('超时了!')

asyncio.run(main())

'''运行结果为:
等你3秒钟哦
我马上开始执行
超时了!
'''
为什么?首先调用main()函数,作为入口函数,当输出‘等你3秒钟哦’之后,main挂起,执行eternity,然后打印‘我马上开始执行’,然后eternity挂起,而且要挂起3600秒,大于3,这时候出发TimeoutError。修改一下:‘’

import asyncio

async def eternity():
print('我马上开始执行')
await asyncio.sleep(2) #当前任务休眠2秒钟,2<3
print('终于轮到我了')

async def main():
# Wait for at most 1 second
try:
print('等你3秒钟哦')
await asyncio.wait_for(eternity(), timeout=3) #给你3秒钟执行你的任务
except asyncio.TimeoutError:
print('超时了!')

asyncio.run(main())

'''运行结果为:
等你3秒钟哦
我马上开始执行
终于轮到我了
'''
总结:当异步操作需要执行的时间超过waitfor设置的timeout,就会触发异常,所以在编写程序的时候,如果要给异步操作设置timeout,一定要选择合适,如果异步操作本身的耗时较长,而你设置的timeout太短,会涉及到她还没做完,就抛出异常了。

(7)多个协程函数时候的等候

await asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)

与上面的区别是,第一个参数aws是一个集合,要写成集合set的形式,比如:

{func(),func(),func3()}

表示的是一系列的协程函数或者是任务,其中协程会自动包装成任务。事实上,写成列表的形式也是可以的。

注意:该函数的返回值是两个Tasks/Futures的集合:

(done, pending)

其中done是一个集合,表示已经完成的任务tasks;pending也是一个集合,表示还没有完成的任务。

常见的使用方法为:done, pending = await asyncio.wait(aws)

参数解释:

timeout (a float or int), 同上面的含义一样,需要注意的是,这个不会触发asyncio.TimeoutError异常,如果到了timeout还有任务没有执行完,那些没有执行完的tasks和futures会被返回到第二个集合pending里面。

return_when参数,顾名思义,他表示的是,什么时候wait函数该返回值。只能够去下面的几个值。 

Constant Description
FIRST_COMPLETED first_completes.当任何一个task或者是future完成或者是取消,wait函数就返回
FIRST_EXCEPTION 当任何一个task或者是future触发了某一个异常,就返回,.如果是所有的task和future都没有触发异常,则等价与下面的 ALL_COMPLETED.
ALL_COMPLETED 当所有的task或者是future都完成或者是都取消的时候,再返回。
如下面例子所示:

import asyncio
import time

a=time.time()

async def hello1(): #大约2秒
print("Hello world 01 begin")
yield from asyncio.sleep(2)
print("Hello again 01 end")

async def hello2(): #大约3秒
print("Hello world 02 begin")
yield from asyncio.sleep(3)
print("Hello again 02 end")

async def hello3(): #大约4秒
print("Hello world 03 begin")
yield from asyncio.sleep(4)
print("Hello again 03 end")

async def main(): #入口函数
done,pending=await asyncio.wait({hello1(),hello2(),hello3()},return_when=asyncio.FIRST_COMPLETED)
for i in done:
print(i)
for j in pending:
print(j)

asyncio.run(main()) #运行入口函数

b=time.time()
print('---------------------------------------')
print(b-a)

'''运行结果为:
Hello world 02 begin
Hello world 01 begin
Hello world 03 begin
Hello again 01 end
<Task finished coro=<hello1() done, defined at e:Python学习基础入门asyncio3.4详解 est11.py:46> result=None>
<Task pending coro=<hello3() running at e:Python学习基础入门asyncio3.4详解 est11.py:61> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object
at 0x000001FA8D394438>()]>>
<Task pending coro=<hello2() running at e:Python学习基础入门asyncio3.4详解 est11.py:55> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object
at 0x000001FA8D394378>()]>>
---------------------------------------
2.033155679702759
'''
从上面可以看出,hello1()试运行结束了的,hello2()和hello3()还没结束。
(8)asyncio.as_completed()函数

这个函数我没有找到合适的中文名称去描述,所以哪位大神如果知道,望告知,不胜感激!它的函数原型如下:

asyncio.as_completed(aws, *, loop=None, timeout=None)

第一个参数aws:同上面一样,是一个集合{}集合里面的元素是coroutine、task或者future

第三个参数timeout:意义和上面讲的的一样

那到底什么作用呢?其实很简单,个人感觉有点鸡肋,从一个例子看起:

import asyncio
import time
import threading

a=time.time()

@asyncio.coroutine
def hello1():
print("Hello world 01 begin")
yield from asyncio.sleep(5) #大约5秒
print("Hello again 01 end")
return '哈哈1'

@asyncio.coroutine
def hello2():
print("Hello world 02 begin")
yield from asyncio.sleep(3) #大约3秒
print("Hello again 02 end")
return '哈哈2'

@asyncio.coroutine
def hello3():
print("Hello world 03 begin")
yield from asyncio.sleep(4) #大约4秒
print("Hello again 03 end")
return '哈哈3'

async def main():
s=asyncio.as_completed({hello1(),hello2(),hello3()})
for f in s:
result=await f
print(result)

asyncio.run(main())

b=time.time()
print('---------------------------------------')
print(b-a)

'''运行结果为:
Hello world 03 begin
Hello world 01 begin
Hello world 02 begin
Hello again 01 end
哈哈1
Hello again 02 end
哈哈2
Hello again 03 end
哈哈3
---------------------------------------
4.0225794315338135
'''

结论:asyncio.as_completed()函数返回的是一个可迭代(iterator)的对象,对象的每个元素就是一个future对象,很多小伙伴说,这不是相当于没变吗?其实返回的future集合是对参数的future集合重新组合,组合的顺序就是,最先执行完的协程函数(coroutine、task、future)最先返回,从上面的代码可知,参数为

aws={hello1(),hello2(),hello3()},因为hello1大约花费5秒、hello2大约花费3秒、hello3大约花费4秒。返回的结果为

s={hello2()、hello3()、hello(1)},因为hello2时间最短,故而放在前面,hello1时间最长,故而放在最后面。然后对返回的集合s开始迭代。

2、Task 类详解

先来看一下Task类的简单介绍(英文原文文档)。

class asyncio.Task(coro, *, loop=None)

A Future-like object that runs a Python coroutine. Not thread-safe.

Tasks are used to run coroutines in event loops. If a coroutine awaits on a Future, the Task suspends the execution of the coroutine and waits for the completion of the Future. When the Future is done, the execution of the wrapped coroutine resumes.

Event loops use cooperative scheduling: an event loop runs one Task at a time. While a Task awaits for the completion of a Future, the event loop runs other Tasks, callbacks, or performs IO operations.

Use the high-level asyncio.create_task() function to create Tasks, or the low-level loop.create_task() or ensure_future() functions. Manual instantiation of Tasks is discouraged.

To cancel a running Task use the cancel() method. Calling it will cause the Task to throw a CancelledError exception into the wrapped coroutine. If a coroutine is awaiting on a Future object during cancellation, the Future object will be cancelled.

cancelled() can be used to check if the Task was cancelled. The method returns True if the wrapped coroutine did not suppress the CancelledError exception and was actually cancelled.

asyncio.Task inherits from Future all of its APIs except Future.set_result() and Future.set_exception().

Tasks support the contextvars module. When a Task is created it copies the current context and later runs its coroutine in the copied context.

上面的文字描述中推出了几个非常重要的信息,特在此总结如下:

(1)他是作为一个python协程对象,和Future对象很像的这么一个对象,但不是线程安全的;他继承了Future所有的API,,除了Future.set_result()和Future.set_Exception();

(2)使用高层API  asyncio.ccreate_task()创建任务,或者是使用低层API loop.create_task()或者是loop.ensure_future()创建任务对象;

(3)相比于协程函数,任务时有状态的,可以使用Task.cancel()进行取消,这会触发CancelledError异常,使用cancelled()检查是否取消。

下面介绍Task类常见的一些使用函数

(1)cancel()

Request the Task to be cancelled.

其实前面已经有所介绍,最好是使用他会出发CancelledError异常,所以需要取消的协程函数里面的代码最好在try-except语句块中进行,这样方便触发异常,打印相关信息,但是Task.cancel()没有办法保证任务一定会取消,而Future.cancel()是可以保证任务一定取消的。可以参见下面的一个例子:

import asyncio

async def cancel_me():
print('cancel_me(): before sleep')
try:
await asyncio.sleep(3600) #模拟一个耗时任务
except asyncio.CancelledError:
print('cancel_me(): cancel sleep')
raise
finally:
print('cancel_me(): after sleep')

async def main():
#通过协程创建一个任务,需要注意的是,在创建任务的时候,就会跳入到异步开始执行
#因为是3.7版本,创建一个任务就相当于是运行了异步函数cancel_me
task = asyncio.create_task(cancel_me())
#等待一秒钟
await asyncio.sleep(1)
print('main函数休息完了')
#发出取消任务的请求
task.cancel()
try:
await task #因为任务被取消,触发了异常
except asyncio.CancelledError:
print("main(): cancel_me is cancelled now")

asyncio.run(main())

'''运行结果为:
cancel_me(): before sleep
main函数休息完了
cancel_me(): cancel sleep
cancel_me(): after sleep
main(): cancel_me is cancelled now
'''
运行过程分析:

首先run函数启动主函数入口main,在main中,因为第一句话就是调用异步函数cancel_me()函数,所以先打印出了第一句话;

然后进入cancel_me中的try语句,遇到await,暂停,这时候返回main中执行,但是有在main中遇到了await,也会暂停,但是由于main中只需要暂停1秒,而cancel_me中要暂停3600秒,所以等到main的暂停结束后,接着运行main,所以打印出第二句话;

接下来遇到取消任务的请求task.cancel(),然后继续执行main里面的try,又遇到了await,接着main进入暂停,接下来进入到cancel_me函数中,但是由于main中请求了取消任务,所以那个耗时3600秒的任务就不再执行了,直接触发了Cancelled_Error异常,打印出第三句话,接下来又raise一个异常信息;

接下来执行cancel_me的finally,打印出第四句话,此时cancel_me执行完毕,由于他抛出了一个异常,返回到主程序main中,触发异常,打印出第五句话。

(2)done()

当一个被包装得协程既没有触发异常、也没有被取消的时候,意味着它是done的,返回true。

(3)result()

返回任务的执行结果,

当任务被正常执行完毕,则返回结果;

当任务被取消了,调用这个方法,会触发CancelledError异常;

当任务返回的结果是无用的时候,则调用这个方法会触发InvalidStateError;

当任务出发了一个异常而中断,调用这个方法还会再次触发这个使程序中断的异常。

(4)exception()

返回任务的异常信息,触发了什么异常,就返回什么异常,如果任务是正常执行的无异常,则返回None;

当任务被取消了,调用这个方法会触发CancelledError异常;

当任务没有做完,调用这个方法会触发InvalidStateError异常。

下面还有一些不常用的方法,如下:

(5)add_done_callback(callback, *, context=None)

(6)remove_done_callback(callback)

(7)get_stack(*, limit=None)

(8)print_stack(*, limit=None, file=None)

(9)all_tasks(loop=None),这是一个类方法

(10)current_task(loop=None),这是一个类方法

3、异步函数的结果获取

对于异步编程、异步函数而言,最重要的就是异步函数调用结束之后,获取异步函数的返回值,我们可以有以下几种方式来获取函数的返回值,第一是直接通过Task.result()来获取;第二种是绑定一个回调函数来获取,即函数执行完毕后调用一个函数来获取异步函数的返回值。

(1)直接通过result来获取

import asyncio
import time


async def hello1(a,b):
print("Hello world 01 begin")
await asyncio.sleep(3) #模拟耗时任务3秒
print("Hello again 01 end")
return a+b

coroutine=hello1(10,5)
loop = asyncio.get_event_loop() #第一步:创建事件循环
task=asyncio.ensure_future(coroutine) #第二步:将多个协程函数包装成任务列表
loop.run_until_complete(task) #第三步:通过事件循环运行
print('-------------------------------------')
print(task.result())
loop.close()

'''运行结果为
Hello world 01 begin
Hello again 01 end
-------------------------------------
15
'''
(2)通过定义回调函数来获取

import asyncio
import time


async def hello1(a,b):
print("Hello world 01 begin")
await asyncio.sleep(3) #模拟耗时任务3秒
print("Hello again 01 end")
return a+b

def callback(future): #定义的回调函数
print(future.result())

loop = asyncio.get_event_loop() #第一步:创建事件循环
task=asyncio.ensure_future(hello1(10,5)) #第二步:将多个协程函数包装成任务
task.add_done_callback(callback) #并被任务绑定一个回调函数

loop.run_until_complete(task) #第三步:通过事件循环运行
loop.close() #第四步:关闭事件循环


'''运行结果为:
Hello world 01 begin
Hello again 01 end
15
'''
注意:所谓的回调函数,就是指协程函数coroutine执行结束时候会调用回调函数。并通过参数future获取协程执行的结果。我们创建的task和回调里的future对象,实际上是同一个对象,因为task是future的子类。

三、asyncio异步编程的基本模板
事实上,在使用asyncio进行异步编程的时候,语法形式往往是多样性的,虽然理解异步编程的核心思想很重要,但是实现的时候终究还是要编写语句的,本次给出的模板,是两个不同的例子,例子一是三个异步方法,它们都没有参数,没有返回值,都模拟一个耗时任务;例子二是三个异步方法,都有参数,都有返回值。

1、python3.7之前的版本

(1)例子一:无参数、无返回值

import asyncio
import time

a=time.time()

async def hello1():
print("Hello world 01 begin")
await asyncio.sleep(3) #模拟耗时任务3秒
print("Hello again 01 end")

async def hello2():
print("Hello world 02 begin")
await asyncio.sleep(2) #模拟耗时任务2秒
print("Hello again 02 end")

async def hello3():
print("Hello world 03 begin")
await asyncio.sleep(4) #模拟耗时任务4秒
print("Hello again 03 end")

loop = asyncio.get_event_loop() #第一步:创建事件循环
tasks = [hello1(), hello2(),hello3()] #第二步:将多个协程函数包装成任务列表
loop.run_until_complete(asyncio.wait(tasks)) #第三步:通过事件循环运行
loop.close() #第四步:取消事件循环

'''运行结果为:
Hello world 02 begin
Hello world 03 begin
Hello world 01 begin
Hello again 02 end
Hello again 01 end
Hello again 03 end
'''
(2)例子二:有参数、有返回值

import asyncio
import time


async def hello1(a,b):
print("Hello world 01 begin")
await asyncio.sleep(3) #模拟耗时任务3秒
print("Hello again 01 end")
return a+b

async def hello2(a,b):
print("Hello world 02 begin")
await asyncio.sleep(2) #模拟耗时任务2秒
print("Hello again 02 end")
return a-b

async def hello3(a,b):
print("Hello world 03 begin")
await asyncio.sleep(4) #模拟耗时任务4秒
print("Hello again 03 end")
return a*b

loop = asyncio.get_event_loop() #第一步:创建事件循环
task1=asyncio.ensure_future(hello1(10,5))
task2=asyncio.ensure_future(hello2(10,5))
task3=asyncio.ensure_future(hello3(10,5))
tasks = [task1,task2,task3] #第二步:将多个协程函数包装成任务列表
loop.run_until_complete(asyncio.wait(tasks)) #第三步:通过事件循环运行
print(task1.result()) #并且在所有的任务完成之后,获取异步函数的返回值
print(task2.result())
print(task3.result())
loop.close() #第四步:关闭事件循环

'''运行结果为:
Hello world 01 begin
Hello world 02 begin
Hello world 03 begin
Hello again 02 end
Hello again 01 end
Hello again 03 end
15
5
50
'''
(3)总结:四步走(针对python3.7之前的版本)

第一步·:构造事件循环

loop=asyncio.get_running_loop() #返回(获取)在当前线程中正在运行的事件循环,如果没有正在运行的事件循环,则会显示错误;它是python3.7中新添加的

loop=asyncio.get_event_loop() #获得一个事件循环,如果当前线程还没有事件循环,则创建一个新的事件循环loop;

loop=asyncio.set_event_loop(loop) #设置一个事件循环为当前线程的事件循环;

loop=asyncio.new_event_loop() #创建一个新的事件循环
第二步:将一个或者是多个协程函数包装成任务Task

#高层API
task = asyncio.create_task(coro(参数列表))   # 这是3.7版本新添加的
task = asyncio.ensure_future(coro(参数列表))

#低层API
loop.create_future(coro)
loop.create_task(coro)

'''需要注意的是,在使用Task.result()获取协程函数结果的时候,使用asyncio.create_task()却会显示错
但是使用asyncio.ensure_future却正确,本人暂时不知道原因,哪位大神知道,望告知,不胜感激!'''
第三步:通过事件循环运行

loop.run_until_complete(asyncio.wait(tasks)) #通过asyncio.wait()整合多个task

loop.run_until_complete(asyncio.gather(tasks)) #通过asyncio.gather()整合多个task

loop.run_until_complete(task_1) #单个任务则不需要整合

loop.run_forever() #但是这个方法在新版本已经取消,不再推荐使用,因为使用起来不简洁

'''
使用gather或者wait可以同时注册多个任务,实现并发,但他们的设计是完全不一样的,在前面的2.1.(4)中已经讨论过了,主要区别如下:
(1)参数形式不一样
gather的参数为 *coroutines_or_futures,即如这种形式
tasks = asyncio.gather(*[task1,task2,task3])或者
tasks = asyncio.gather(task1,task2,task3)
loop.run_until_complete(tasks)
wait的参数为列表或者集合的形式,如下
tasks = asyncio.wait([task1,task2,task3])
loop.run_until_complete(tasks)
(2)返回的值不一样
gather的定义如下,gather返回的是每一个任务运行的结果,
results = await asyncio.gather(*tasks)
wait的定义如下,返回dones是已经完成的任务,pending是未完成的任务,都是集合类型
done, pending = yield from asyncio.wait(fs)
(3)后面还会讲到他们的进一步使用
'''
简单来说:async.wait会返回两个值:done和pending,done为已完成的协程Task,pending为超时未完成的协程Task,需通过future.result调用Task的result。而async.gather返回的是已完成Task的result。

第四步:关闭事件循环

loop.close()

'''
以上示例都没有调用 loop.close,好像也没有什么问题。所以到底要不要调 loop.close 呢?
简单来说,loop 只要不关闭,就还可以再运行:
loop.run_until_complete(do_some_work(loop, 1))
loop.run_until_complete(do_some_work(loop, 3))
loop.close()
但是如果关闭了,就不能再运行了:
loop.run_until_complete(do_some_work(loop, 1))
loop.close()
loop.run_until_complete(do_some_work(loop, 3)) # 此处异常
建议调用 loop.close,以彻底清理 loop 对象防止误用
'''
2、python3.7版本

在最新的python3.7版本中,asyncio又引进了一些新的特性和API,

(1)例子一:无参数、无返回值

import asyncio
import time


async def hello1():
print("Hello world 01 begin")
await asyncio.sleep(3) #模拟耗时任务3秒
print("Hello again 01 end")

async def hello2():
print("Hello world 02 begin")
await asyncio.sleep(2) #模拟耗时任务2秒
print("Hello again 02 end")

async def hello3():
print("Hello world 03 begin")
await asyncio.sleep(4) #模拟耗时任务4秒
print("Hello again 03 end")

async def main():
results=await asyncio.gather(hello1(),hello2(),hello3())
for result in results:
print(result) #因为没返回值,故而返回None

asyncio.run(main())

'''运行结果为:
Hello world 01 begin
Hello world 02 begin
Hello world 03 begin
Hello again 02 end
Hello again 01 end
Hello again 03 end
None
None
None
'''
(2)例子二:有参数、有返回值

import asyncio
import time


async def hello1(a,b):
print("Hello world 01 begin")
await asyncio.sleep(3) #模拟耗时任务3秒
print("Hello again 01 end")
return a+b

async def hello2(a,b):
print("Hello world 02 begin")
await asyncio.sleep(2) #模拟耗时任务2秒
print("Hello again 02 end")
return a-b

async def hello3(a,b):
print("Hello world 03 begin")
await asyncio.sleep(4) #模拟耗时任务4秒
print("Hello again 03 end")
return a*b

async def main():
results=await asyncio.gather(hello1(10,5),hello2(10,5),hello3(10,5))
for result in results:
print(result)

asyncio.run(main())

'''运行结果为:
Hello world 01 begin
Hello world 02 begin
Hello world 03 begin
Hello again 02 end
Hello again 01 end
Hello again 03 end
15
5
50
'''
(3)总结:两步走(针对python3.7)

第一步:构建一个入口函数main

他也是一个异步协程函数,即通过async定义,并且要在main函数里面await一个或者是多个协程,同前面一样,我可以通过gather或者是wait进行组合,对于有返回值的协程函数,一般就在main里面进行结果的获取。

第二步:启动主函数main

这是python3.7新添加的函数,就一句话,即

asyncio.run(main())

注意:

不再需要显式的创建事件循环,因为在启动run函数的时候,就会自动创建一个新的事件循环。而且在main中也不需要通过事件循环去掉用被包装的协程函数,只需要向普通函数那样调用即可 ,只不过使用了await关键字而已。

四、协程编程的优点:
1、无cpu分时切换线程保存上下文问题(协程上下文怎么保存)

2、遇到io阻塞切换(怎么实现的)

3、无需共享数据的保护锁(为什么)

4、系列文章下篇预告——介绍低层的API,事件循环到底是怎么实现的以及future类的实现。
---------------------

原文:https://blog.csdn.net/qq_27825451/article/details/86218230

=============================================================================================================================

一、事件循环EventLoop
事件循环是asyncio的核心,异步任务的运行、任务完成之后的回调、网络IO操作、子进程的运行,都是通过事件循环完成的。在前一篇文章中,已经提到过,在python3.7中,我们甚至完全不用管事件循环,只需要使用高层API,即asyncio中的方法,我们很少直接与事件循环打交道,但是为了更加熟悉asyncio的运行原理,最好还是了解EventLoop的设计原理。

1、事件循环的创建、获取、设置(上文已经介绍过了)

(1)asyncio.get_running_loop()。python3.7新添加的

(2)asyncio.get_event_loop()

(3)asyncio.set_event_loop(loop)

(4)asyncio.new_event_loop()

2、运行和停止事件循环

(1)loop.run_until_complete(future)。运行事件循环,直到future运行结束

(2)loop.run_forever()。在python3.7中已经取消了,表示事件循环会一直运行,直到遇到stop。

(3)loop.stop()。停止事件循环

(4)loop.is_running()。如果事件循环依然在运行,则返回True

(5)loop.is_closed()。如果事件循环已经close,则返回True

(6)loop.close()。关闭事件循环

3、创建Future和Task

(1)loop.create_future(coroutine) ,返回future对象

(2)loop.create_task(corootine) ,返回task对象

(3)loop.set_task_factory(factory)

(4)loop.get_task_factory()

4、事件循环的时钟

loop.time()。可以这么理解,事件循环内部也维护着一个时钟,可以查看事件循环现在运行的时间点是多少,就像普通的time.time()类似,它返回的是一个浮点数值,比如下面的代码。

import asyncio

async def hello1(a,b):
print('准备做加法运算')
await asyncio.sleep(3)
return a+b

loop=asyncio.get_event_loop()
t1=loop.time() #开始时间
print(t1)
loop.run_until_complete(hello1(3,4))
t2=loop.time() #结束时间
print(t2)
print(t2-t1) #时间间隔
'''运行结果为:
28525.671
准备做加法运算
28528.703
3.0320000000028813
'''
5、计划执行回调函数(CallBacks)

(1)loop.call_later(delay, callback, *args, context=None)

首先简单的说一下它的含义,就是事件循环在delay多长时间之后才执行callback函数,它的返回值是asyncio.TimerHandle类的一个实例对象。

(2)loop.call_at(when, callback, *args, context=None)

即在某一个时刻进行调用计划的回调函数,第一个参数不再是delay而是when,表示一个绝对的时间点,结合前面的loop.time使用,它的使用方法和call_later()很类似。它的返回值是asyncio.TimerHandle类的一个实例对象。

(3)loop.call_soon(callback, *args, context=None)

在下一个迭代的时间循环中立刻调用回调函数,用法同上面。它的返回值是asyncio.Handle类的一个实例对象。

(4)loop.call_soon_threadsafe(callback, *args, context=None)

这是call_soon()函数的线程安全版本,计划回调函数必须在另一个线程中使用。

需要注意的是:上面的几个回调函数都只使用了“位置参数”哦,asyncio中,大部分的计划回调函数都不支持“关键字参数”,如果是想要使用关键字参数,则推荐使用functools.aprtial()对方法进一步包装,详细可以参考前面的python标准库系列文章。

如:

# will schedule "print("Hello", flush=True)"
loop.call_soon(
functools.partial(print, "Hello", flush=True))
下面来看一下具体的使用例子。

import asyncio

def callback(n):
print('我是回调函数,参数为: {0} '.format(n))


async def main(loop):
print('在异步函数中注册回调函数')
loop.call_later(2, callback, 1)
loop.call_later(1, callback, 2)
loop.call_soon(callback, 3)

await asyncio.sleep(4)


loop = asyncio.get_event_loop()
print('进入事件循环')
loop.run_until_complete(main(loop))
print('关闭事件循环')
loop.close()

'''运行结果为:
进入事件循环
在异步函数中注册回调函数
我是回调函数,参数为: 3
我是回调函数,参数为: 2
我是回调函数,参数为: 1
关闭事件循环
'''
再看一个简单的例子:

import asyncio

def callback(a, loop):
print("我的参数为 {0},执行的时间为{1}".format(a,loop.time()))


#call_later, call_at
if __name__ == "__main__":
try:
loop = asyncio.get_event_loop()
now = loop.time()
loop.call_later(5, callback, 5, loop) #第一个参数设置的时间5.5秒后执行,
loop.call_at(now+2, callback, 2, loop) #在指定的时间,运行,当前时间+2秒
loop.call_at(now+1, callback, 1, loop)
loop.call_at(now+3, callback, 3, loop)
loop.call_soon(callback, 4, loop)
loop.run_forever() #要用这个run_forever运行,因为没有传入协程,这个函数在3.7中已经被取消
except KeyboardInterrupt:
print("Goodbye!")

'''运行结果为:
我的参数为 4,执行的时间为266419.843
我的参数为 1,执行的时间为266420.843
我的参数为 2,执行的时间为266421.859
我的参数为 3,执行的时间为266422.859
我的参数为 5,执行的时间为266424.843
'''
总结注意事项:

(1)CallBack函数只能够定义为同步方法,不能够定义为async方法,及不能使用async和@asyncio.coroutine修饰;

(2)每一个CallBack方法只会调用一次,如果在同一个时刻有另个CallBack方法需要调用,则他们的执行顺序是不确定的;

(3)注意使用functools.partial()去修饰带有关键字参数的CallBack方法;

(4)如何理解?对于一般的异步函数,我们需要将它放在时间循环里面,然后通过事件循环去循环调用它,而因为CallBack并不是异步函数,它是定义为普通的同步方法,所以不能够放在时间循环里面,但是如果我依然想要让事件循环去执行它怎么办呢?那就不放进事件循环,直接让事件循环“立即、稍后、在什么时候”去执行它不就行了嘛,call的含义就是“执行”。

二、底层API之Future
1、Future的定义概览

Future的本质是一个类。他表示的是异步操作的最终将要返回的结果,故而命名为Future,它不是线程安全的。Future对象是awaitable的,参见系类文章的前面,

class asyncio.Future(*, loop=None)

2、asyncio中关于Future的几个方法

(1)asyncio.isfuture(obj) 。判断一个对象是不是Future,注意python中一切皆对象哦,包括函数,当obj是下面几种情况时返回true:

asyncio.Future的实例对象
asyncio.Task的实例对象
一个具有 _asyncio_future_blocking 属性的对象
(2)asyncio.ensure_future(obj, *, loop=None)。将一个obj包装成Future

(3)asyncio.wrap_future(future, *, loop=None)

将concurrent.futures.Future对象包装成一个 asyncio.Future 对象。

3、Future对象的常用方法

(1)result()。返回Future执行的结果返回值

如果Future被执行完成,如果使用set_result()方法设置了一个结果,那个设置的value就会被返回;

如果Future被执行完成,如果使用set_exception()方法设置了一个异常,那么使用这个方法也会触发异常;

如果Future被取消了,那么使用这个方法会触发CancelledError异常;

如果Future的结果不可用或者是不可达,那么使用这个方法也会触发InvalidStateError异常;

(2)set_result(result)

标记Future已经执行完毕,并且设置它的返回值。

(3)set_exception(exception)

标记Future已经执行完毕,并且触发一个异常。

(4)done()

如果Future1执行完毕,则返回 True 。

(5)cancelled()

判断任务是否取消。

(6)add_done_callback(callback, *, context=None)

在Future完成之后,给它添加一个回调方法,这个方法就相当于是loop.call_soon()方法,参见前面,如下例子:

如果要回调带有关键字参数的函数,也需要使用partial方法哦。

(7)remove_done_callback(callback)

(8)cancel()

(9)exception()

(10)get_loop()。返回Future所绑定的事件循环

三、集中回答以下几个问题
通过前面的讲解,已经讲清楚了asyncio架构里面的一些基本东西,现在可以来集中回答以下一些常见的问题了,弄清楚这希尔问题,可以方便我们更加深入的理解协程。

1、很多个协程一起运行有创建新的线程吗?

协程运行时,都是在一个线程中运行的,没有创建新的线程。如下

import asyncio
import time
import threading

a=time.time()

async def hello1():
    print(f"Hello world 01 begin,my thread is:{threading.currentThread()}")
    await asyncio.sleep(3)
    print("Hello again 01 end")

async def hello2():
    print(f"Hello world 02 begin,my thread is:{threading.currentThread()}")
    await asyncio.sleep(2)
    print("Hello again 02 end")

async def hello3():
    print(f"Hello world 03 begin,my thread is:{threading.currentThread()}")
    await asyncio.sleep(1)
    print("Hello again 03 end")

loop = asyncio.get_event_loop()
tasks = [hello1(), hello2(),hello3()]
loop.run_until_complete(asyncio.wait(tasks))

loop.close()


b=time.time()
print('---------------------------------------')
print(b-a)
'''运行结果为:
Hello world 03 begin,my thread is:<_MainThread(MainThread, started 4168)>
Hello world 02 begin,my thread is:<_MainThread(MainThread, started 4168)>
Hello world 01 begin,my thread is:<_MainThread(MainThread, started 4168)>
Hello again 03 end
Hello again 02 end
Hello again 01 end
---------------------------------------
2.994506597518921
'''
从上面那个可以看出,三个不同的协程函数都是在一个线程完成的。但是并不是意味着,多个协程函数只能在一个线程中执行,同样可以创建新的线程,其实我们完全可以在新的线程中重新创建一个事件循环,具体的实例参见后面。

2、线程一定效率更高吗?

也不是绝对的,当然在一般情况下,异步方式的执行效率是更高的,就比如上面的三个函数,如果按照同步的方式执行,则一共需要6秒的时间,但是采用协程则只需要最长的那个时间3秒,这自然是提高了工作效率,那是不是一定会提高呢?也不一定,这与协程的调用方式是由密切关系的。如下所示:

import asyncio
import time
import threading

a=time.time()

async def hello1():
print(f"Hello world 01 begin,my thread is:{threading.currentThread()}")
await asyncio.sleep(3)
print("Hello again 01 end")

async def hello2():
print(f"Hello world 02 begin,my thread is:{threading.currentThread()}")
await asyncio.sleep(2)
print("Hello again 02 end")

async def hello3():
print(f"Hello world 03 begin,my thread is:{threading.currentThread()}")
await hello2()
await hello1()
print("Hello again 03 end")

loop = asyncio.get_event_loop()
tasks = [hello3()]
loop.run_until_complete(asyncio.wait(tasks))

loop.close()

b=time.time()
print('---------------------------------------')
print(b-a)

'''运行结果为:
Hello world 03 begin,my thread is:<_MainThread(MainThread, started 13308)>
Hello world 02 begin,my thread is:<_MainThread(MainThread, started 13308)>
Hello again 02 end
Hello world 01 begin,my thread is:<_MainThread(MainThread, started 13308)>
Hello again 01 end
Hello again 03 end
---------------------------------------
5.008373498916626
'''
我们发现一个问题,上面执行的顺序完全不是异步执行,执行的时间也没有得到改善,究其原因,是因为上面是通过hello3去调用hello1和hello2的,这和同步调用的方式完全是一样的,即使我定义的都是异步方法,它既没有提高执行效率,还会有阻塞。

结论:在有很多个异步方式的时候,一定要尽量避免这种异步函数的直接调用,这和同步是没什么区别的,一定要通过事件循环loop,“让事件循环在各个异步函数之间不停游走”,这样才不会造成阻塞。

3、协程会不会有阻塞呢?

异步方式依然会有阻塞的,当我们定义的很多个异步方法彼此之间有一来的时候,比如,我必须要等到函数1执行完毕,函数2需要用到函数1的返回值,如上面的例子2所示,就会造成阻塞,这也是异步编程的难点之一,如何合理配置这些资源,尽量减少函数之间的明确依赖,这是很重要的。

4、协程的4种状态

协程函数相比于一般的函数来说,我们可以将协程包装成任务Task,任务Task就在于可以跟踪它的状态,我就知道它具体执行到哪一步了,一般来说,协程函数具有4种状态,可以通过相关的模块进行查看,请参见前面的文章,他的四种状态为:

Pending
Running
Done
Cacelled
 创建future的时候,task为pending,事件循环调用执行的时候当然就是running,调用完毕自然就是done,如果需要停止事件循环,中途需要取消,就需要先把task取消,即为cancelled。

四、多任务实现并发
python异步协程函数的最终目的是实现并发,这样才能提高工作效率。

我们经常看见下面这样的代码,即:

tasks = asyncio.gather(*[task1,task2,task3])
loop.run_until_complete(tasks)

#或者是
tasks = asyncio.wait([task1,task2,task3])
loop.run_until_complete(tasks)

#甚至可以写在一起,即
loop.run_until_complete(asyncio.gather(*[task1,task2,task3])
#或者是
asyncio.gather(asyncio.wait([task1,task2,task3]))
上面这些都是一些简单的应用,可以同时进行多任务,进行并发,但是如果我们每一个任务都有返回值,而且需要获取这些返回值,这样做显然还不够,还需要做进一步的处理。

asyncio实现并发的思想是一样的,只是实现的手段稍有区别,主要有以下几种实现方式:

(1)使用gather同时注册多个任务,实现并发

awaitable asyncio.gather(*aws, loop=None, return_exceptions=False)

注意事项:

gather的返回值是它所绑定的所有任务的执行结果,而且顺序是不变的,即返回的result的顺序和绑定的顺序是保持一致的。

除此之外,它是awaitable的,所以,如果需要获取多个任务的返回值,既然是awaitable的,就需要将它放在一个函数里面,所以我们引入一个包装多个任务的入口main,这也是python3.7的思想。如下:

# import asyncio
# import time
# import threading

# a=time.time()

# async def hello1():
# print(f"Hello world 01 begin,my thread is:{threading.currentThread()}")
# await asyncio.sleep(3)
# print("Hello again 01 end")

# async def hello2():
# print(f"Hello world 02 begin,my thread is:{threading.currentThread()}")
# await asyncio.sleep(2)
# print("Hello again 02 end")

# async def hello3():
# print(f"Hello world 03 begin,my thread is:{threading.currentThread()}")
# await hello2()
# await hello1()
# print("Hello again 03 end")

# loop = asyncio.get_event_loop()
# tasks = [hello3()]
# loop.run_until_complete(asyncio.wait(tasks))

# loop.close()


# b=time.time()
# print('---------------------------------------')
# print(b-a)

import asyncio
import time


async def hello1(a,b):
print("Hello world 01 begin")
await asyncio.sleep(3) #模拟耗时任务3秒
print("Hello again 01 end")
return a+b

async def hello2(a,b):
print("Hello world 02 begin")
await asyncio.sleep(2) #模拟耗时任务2秒
print("Hello again 02 end")
return a-b

async def hello3(a,b):
print("Hello world 03 begin")
await asyncio.sleep(4) #模拟耗时任务4秒
print("Hello again 03 end")
return a*b

async def main(): #封装多任务的入口函数
task1=asyncio.ensure_future(hello1(10,5))
task2=asyncio.ensure_future(hello2(10,5))
task3=asyncio.ensure_future(hello3(10,5))
results=await asyncio.gather(task1,task2,task3)
for result in results: #通过迭代获取函数的结果,每一个元素就是相对应的任务的返回值,顺序都没变
print(result)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

'''运行结果为:
Hello world 01 begin
Hello world 02 begin
Hello world 03 begin
Hello again 02 end
Hello again 01 end
Hello again 03 end
15
5
50
'''
(2)使用wait可以同时注册多个任务,实现并发

await asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)

它与gather不同的地方是他的参数是集合类型,而且他的返回类型是这样一个形式,即

 (done, pending).   #返回dones是已经完成的任务,pending是未完成的任务,都是集合类型,不同的是每一个元素不再是返回值,而是某一个task哦,

相同的是它依然也是awaitable的,故而也需要定义在一个异步函数main()中,如下。

#前面的代码和上面一样
async def main(): #封装多任务的入口函数
task1=asyncio.ensure_future(hello1(10,5))
task2=asyncio.ensure_future(hello2(10,5))
task3=asyncio.ensure_future(hello3(10,5))
done,pending=await asyncio.wait([task1,task2,task3])
for done_task in done:
print(done_task.result()) #这里返回的是一个任务,不是直接的返回值,故而需要使用result函数进行获取


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

#运行结果也一样
(3)使用as_completed可以同时注册多个任务,实现并发

这个方法使用的比较少,与前面的两个gather和wait不同的是,它不是awaitable。使用实例参见前面的一篇文章,参见如下:

(4)主调方获取任务的运行结果

上面的运行结果,都是在main()函数里面获取的运行结果,那可不可以不再main()里面获取结果呢,,当然是可以的,我们可以这样做,

async def main(): #封装多任务的入口函数
task1=asyncio.ensure_future(hello1(10,5))
task2=asyncio.ensure_future(hello2(10,5))
task3=asyncio.ensure_future(hello3(10,5))

return await asyncio.gather(task1,task2,task3) #不在这里获取结果,只是返回


loop = asyncio.get_event_loop()
results=loop.run_until_complete(main()) #在这里再获取返回函数值,然后迭代获取
for result in results:
print(result)
loop.close()

#y运行结果同上
或者是如下:

async def main(): #封装多任务的入口函数
task1=asyncio.ensure_future(hello1(10,5))
task2=asyncio.ensure_future(hello2(10,5))
task3=asyncio.ensure_future(hello3(10,5))

return await asyncio.wait([task1,task2,task3]) #不在这里获取结果,只是返回


loop = asyncio.get_event_loop()
done,pending=loop.run_until_complete(main()) #在这里再获取返回函数值,然后迭代获取
for done_task in done:
print(done_task.result())
loop.close()
五、Future补充下一篇预告
1、Future补充

asyncio中的Future类是模仿concurrent.futures.Future类而设计的,关于concurrent.futures.Future,可以查阅相关的文档。它们之间的主要区别是:

(1)asyncio.Future对象是awaitable的,但是concurrent.futures.Future对象是不能够awaitable的;

(2)asyncio.Future.result()和asyncio.Future.exception()是不接受关键字参数timeout的;

(3)当Future没有完成的时候,asyncio.Future.result()和asyncio.Future.exception()将会触发一个InvalidStateError异常;

(4)使用asyncio.Future.add_done_callback()注册的回调函数不会立即执行,它可以使用loop.call_soon代替;

(5)asyncio里面的Future和concurrent.futures.wait()以及concurrent.futures.as_completed()是不兼容的。

有兴趣的小伙伴可以自己学一下concurrent.futures哦!

=============================================================================================================================

本文为系列文章的第七篇,将介绍如何使用多线程结合异步编程asyncio,开发出真正“不假死”的应用程序;以及如何模拟一个timer,实现定时操作。

一、异步方法依然会假死(freezing)
什么是程序的假死,这里不再多描述,特别是在编写桌面程序的时候,如果是使用单个线程,同步函数的方式,假死是不可避免的,但是有时候我们即使是使用了异步函数的方式依然是不可避免的,依然会假死,这是为什么呢,下面会通过几个例子来详细说明。

1、一般程序的调用方“假死”


import asyncio
import time
import threading

#定义一个异步操作
async def hello1(a,b):
print(f"异步函数开始执行")
await asyncio.sleep(3)
print("异步函数执行结束")
return a+b

#在一个异步操作里面调用另一个异步操作
async def main():
c=await hello1(10,20)
print(c)
print("主函数执行")

loop = asyncio.get_event_loop()
tasks = [main()]
loop.run_until_complete(asyncio.wait(tasks))

loop.close()

'''运行结果为:
异步函数开始执行(在此处要等待3秒)
异步函数执行结束
30
主函数执行
'''
注意一个问题:我们前面所讲的例子中,没有出现等待,是因为各个异步方法之间是“完全并列”关系,彼此之间没有依赖,所以,我可以将所有的异步操作“gather”起来,然后通过事件循环,让事件循环在多个异步方法之间来回调用,永不停止,故而没有出现等待。

但是,现实中不可能所有的异步方法都是完全独立的,没有任何关系的,在上面的这个例子中,就是很好的说明,hello1是一个耗时任务,耗时大约3秒,main也是一个异步方法,但是main中需要用到hello1中的返回结果,所以他必须要等待hello1运行结束之后再才能继续执行,这就是为什么会得到上面结果的原因。这也再一次说明,异步依然是会有阻塞的。

我们也可以这样理解,因为我给事件循环只注册了一个异步方法,那就是main,当在main里面遇到了await,事件循环挂起,转而寻找其他的异步方法,但是由于只注册了一个异步方法给事件循环,他没有其他的方法可执行了,所以只能等待,让hello1执行完了,再继续执行。

2、窗体程序的假死

(1)同步假死

import tkinter as tk # 导入 Tkinter 库
import time

class Form:
def __init__(self):
self.root=tk.Tk()
self.root.geometry('500x300')
self.root.title('窗体程序') #设置窗口标题

self.button=tk.Button(self.root,text="开始计算",command=self.calculate)
self.label=tk.Label(master=self.root,text="等待计算结果")

self.button.pack()
self.label.pack()
self.root.mainloop()

def calculate(self):
time.sleep(3) #模拟耗时计算
self.label["text"]=300

if __name__=='__main__':
form=Form()
运行的结果就是,我单机一下“开始计算”按钮,然后窗体会假死,这时候无法移动窗体、也无法最大化最小化、3秒钟之后,“等待计算结果”的label会显示出3,然后前面移动的窗体等操作接着发生,最终效果如下:

上面的窗体会假死,这无可厚非,因为,所有的操作都是同步方法,只有一个线程,负责维护窗体状态的线程和执行好使计算的线程是同一个,当遇到time.sleep()的时候自然会遇到阻塞。那如果我们将耗时任务换成异步方法呢?代码如下:

(2)异步假死

import tkinter as tk # 导入 Tkinter 库
import asyncio

class Form:
def __init__(self):
self.root=tk.Tk()
self.root.geometry('500x300')
self.root.title('窗体程序') #设置窗口标题

self.button=tk.Button(self.root,text="开始计算",command=self.get_loop)
self.label=tk.Label(master=self.root,text="等待计算结果")

self.button.pack()
self.label.pack()

self.root.mainloop()

#定义一个异步方法,模拟耗时计算任务
async def calculate(self):
await asyncio.sleep(3)
self.label["text"]=300

#asyncio任务只能通过事件循环运行,不能直接运行异步函数
def get_loop(self):
self.loop=asyncio.get_event_loop()
self.loop.run_until_complete(self.calculate())
self.loop.close()


if __name__=='__main__':
form=Form()
我们发现,窗体依然会造成阻塞,情况和前面的同步方法是一样的,为什么会这样呢?因为这个地方虽然启动了事件循环,但是拥有事件循环的那个线程同时还需要维护窗体的状态,始终只有一个线程在运行,当单击“开始计算”按钮,开始执行get_loop函数,在get_loop里面启动异步方法calculate,然后遇到await,这个时候事件循环暂停,但是由于事件循环只注册了calculate一个异步方法,也没其他事情干,所以只能等待,造成假死阻塞。

解决办法就是我专门再创建一个线程去执行一些计算任务,维护窗体状态的线程就之专门负责维护状态,后面再详说。

二、多线程结合asyncio解决调用时的假死
1、asyncio专门实现Concurrency and Multithreading(多线程和并发)的函数介绍

为了让一个协程函数在不同的线程中执行,我们可以使用以下两个函数

(1)loop.call_soon_threadsafe(callback, *args),这是一个很底层的API接口,一般很少使用,本文也暂时不做讨论。

(2)asyncio.run_coroutine_threadsafe(coroutine,loop)

第一个参数为需要异步执行的协程函数,第二个loop参数为在新线程中创建的事件循环loop,注意一定要是在新线程中创建哦,该函数的返回值是一个concurrent.futures.Future类的对象,用来获取协程的返回结果。

future = asyncio.run_coroutine_threadsafe(coro_func(), loop)   # 在新线程中运行协程

result = future.result()   #等待获取Future的结果

2、不阻塞的多线程并发实例

asyncio.run_coroutine_threadsafe(coroutine,loop)的意思很简单,就是我在新线程中创建一个事件循环loop,然后在新线程的loop中不断不停的运行一个或者是多个coroutine。参考下面代码:

import asyncio

import asyncio,time,threading

#需要执行的耗时异步任务
async def func(num):
print(f'准备调用func,大约耗时{num}')
await asyncio.sleep(num)
print(f'耗时{num}之后,func函数运行结束')

#定义一个专门创建事件循环loop的函数,在另一个线程中启动它
def start_loop(loop):
asyncio.set_event_loop(loop)
loop.run_forever()

#定义一个main函数
def main():
coroutine1 = func(3)
coroutine2 = func(2)
coroutine3 = func(1)

new_loop = asyncio.new_event_loop() #在当前线程下创建时间循环,(未启用),在start_loop里面启动它
t = threading.Thread(target=start_loop,args=(new_loop,)) #通过当前线程开启新的线程去启动事件循环
t.start()

asyncio.run_coroutine_threadsafe(coroutine1,new_loop) #这几个是关键,代表在新线程中事件循环不断“游走”执行
asyncio.run_coroutine_threadsafe(coroutine2,new_loop)
asyncio.run_coroutine_threadsafe(coroutine3,new_loop)

for i in "iloveu":
print(str(i)+" ")

if __name__ == "__main__":
main()

'''运行结果为:
i 准备调用func,大约耗时3
l 准备调用func,大约耗时2
o 准备调用func,大约耗时1
v
e
u
耗时1之后,func函数运行结束
耗时2之后,func函数运行结束
耗时3之后,func函数运行结束
'''
我们发现,main是在主线程中的,而三个协程函数是在新线程中的,它们是在一起执行的,没有造成主线程main的阻塞。下面再看一下窗体函数中的实现。

3、tkinter+threading+asyncio

import tkinter as tk # 导入 Tkinter 库
import time
import asyncio
import threading

class Form:
def __init__(self):
self.root=tk.Tk()
self.root.geometry('500x300')
self.root.title('窗体程序') #设置窗口标题

self.button=tk.Button(self.root,text="开始计算",command=self.change_form_state)
self.label=tk.Label(master=self.root,text="等待计算结果")

self.button.pack()
self.label.pack()

self.root.mainloop()

async def calculate(self):
await asyncio.sleep(3)
self.label["text"]=300

def get_loop(self,loop):
self.loop=loop
asyncio.set_event_loop(self.loop)
self.loop.run_forever()
def change_form_state(self):
coroutine1 = self.calculate()
new_loop = asyncio.new_event_loop() #在当前线程下创建时间循环,(未启用),在start_loop里面启动它
t = threading.Thread(target=self.get_loop,args=(new_loop,)) #通过当前线程开启新的线程去启动事件循环
t.start()

asyncio.run_coroutine_threadsafe(coroutine1,new_loop) #这几个是关键,代表在新线程中事件循环不断“游走”执行


if __name__=='__main__':
form=Form()
运行上面的代码,我们发现,此时点击“开始计算”按钮执行耗时任务,没有造成窗体的任何阻塞,我可以最大最小化、移动等等,然后3秒之后标签会自动显示运算结果。为什么会这样?

上面的代码中,get_loop()、change_form_state()、__init__()都是定义在主线程中的,窗体的状态维护也是主线程,二耗时计算calculate()是一个异步协程函数。

现在单击“开始计算按钮”,这个事件发生之后,会触发主线程的chang_form_state函数,然后在该函数中,会创建新的线程,通过新的线程创建一个事件循环,然后将协程函数注册到新线程中的事件循环中去,达到的效果就是,主线程做主线程的,新线程做新线程的,不会造成任何阻塞。

4、multithreading+asyncio总结

第一步:定义需要异步执行的一系列操作,及一系列协程函数;

第二步:在主线程中定义一个新的线程,然后在新线程中产生一个新的事件循环;

第三步:在主线程中,通过asyncio.run_coroutine_threadsafe(coroutine,loop)这个方法,将一系列异步方法注册到新线程的loop里面去,这样就是新线程负责事件循环的执行。

三、使用asyncio实现一个timer
所谓的timer指的是,指定一个时间间隔,让某一个操作隔一个时间间隔执行一次,如此周而复始。很多编程语言都提供了专门的timer实现机制、包括C++、C#等。但是 Python 并没有原生支持 timer,不过可以用 asyncio.sleep 模拟。

大致的思想如下,将timer定义为一个异步协程,然后通过事件循环去调用这个异步协程,让事件循环不断在这个协程中反反复调用,只不过隔几秒调用一次即可。

简单的实现如下(本例基于python3.7):

async def delay(time):
await asyncio.sleep(time)

async def timer(time,function):
while True:
future=asyncio.ensure_future(delay(time))
await future
future.add_done_callback(function)

def func(future):
print('done')

if __name__=='__main__':
asyncio.run(timer(2,func))

'''运行结果为:
done
done
done
done
done
done
done
done
done
done
done
.
.
.
.每隔2秒打印一个done
'''
几个注意点:asyncio.sleep()本身就是一个协程函数,故而可以将它封装成一个Task或者是Future,等待时间结束也就是任务完成,绑定回调函数。当然,本身python语法灵活,上面只是其中一种实现而已。
---------------------
作者:LoveMIss-Y
来源:CSDN
原文:https://blog.csdn.net/qq_27825451/article/details/86483493
版权声明:本文为博主原创文章,转载请附上博文链接!


---------------------
原文:https://blog.csdn.net/qq_27825451/article/details/86292513

原文地址:https://www.cnblogs.com/sea520/p/10823510.html