asyncio -- 协程库

asyncio -- 协程库

event_loop: 事件循环,相当一个无线循环,将函数注册到这个时间循环中,满足条件时,调用适当的处理方法
    	    每一个线程都有一个事件循环,主线程调用asyncio.get_event_loop()时创建循环,把异步任务放入run_until_complete()方法中,事件循环会安排协同程序的执行
coroutine: 协程对象类型,将协程对象注册到时间循环中,它会被事件循环调用。可以使用async关键字来定义一个方法,这个方法调用时不会立即执行,而是返回一个协程对象
task: 任务,对协程对象的进一步封装,包含了任务的各个状态,running,finished等。
    
# 关键字
async: 定义一个协程
await: 用来挂起阻塞方法的执行

例:

import asyncio
import time

async def do_task(x):
    print('start....')
    start=time.time()
    await asyncio.sleep(x)  # 也是一个协程
    print('共用时%s'%(time.time() - start))
loop = asyncio.get_event_loop()
loop.run_until_complete(do_task(3))
'''
结果: 
start....
共用时3.0001718997955322
'''

回调

# 例如IO操作,需要得到返回的结果,可以往future中添加回调来实现
import asyncio
import time

async def do_task(x):
    print('start....')
    start=time.time()
    await asyncio.sleep(x)
    print('共用时%s'%(time.time() - start))

def done_callback(result):   # result相当于一个对象
    print('执行回调函数')

result = asyncio.ensure_future(do_task(3))
result.add_done_callback(done_callback)

loop = asyncio.get_event_loop()
loop.run_until_complete(result)

多协程

# 先把协程存入列表中
coros = [do_task(1),do_task(2)]
# 然后把多个协程交给loop
loop.run_until_complete(asyncio.gather(*coros))
# 例
async def do_task(x):
    print('start....')
    start=time.time()
    await asyncio.sleep(x)
    print('共用时%s'%(time.time() - start))

coros = [do_task(2),do_task(3)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(*coros))
'''
两个协程并发,结束时间以耗时较长的协程为准
start....
start....
共用时2.0001144409179688
共用时3.003171682357788
'''

多协程结果获取

# 没有loop的情况
async def do_task(x):
    print('start....')
    start=time.time()
    await asyncio.sleep(x)
    # print('共用时%s'%(time.time() - start))
    return '共用时%s'%(time.time() - start)

futus = [asyncio.ensure_future(do_task(2)),asyncio.ensure_future(do_task(3))]
loop = asyncio.get_event_loop()
print(loop.run_until_complete(asyncio.gather(*futus)))

'''
结果:['共用时2.0001144409179688', '共用时3.003171682357788']
'''
# 已经有loop 的情况  ?????
futus = [asyncio.ensure_future(do_task(2)),asyncio.ensure_future(do_task(3))]
result = await asyncio.gather(*futus)
print(str(result))

run_until_complete 和 run_forever

1. run_until_complete: 执行完立即结束 (内部也是调用run_forever)
2. run_forever : 执行完程序不结束,除非在协程调用中加 loop.stop()

import asyncio
import time
async def task(i):
    start = time.time()
    await asyncio.sleep(3)
    print('耗时%s'%(time.time()-start))
    loop.stop()

loop = asyncio.get_event_loop()
coro = task(3)
# loop.run_until_complete(coro)
asyncio.ensure_future(coro)
loop.run_forever()   # 协程中加loop.stop()


# 多个协程使用 run_forever问题: 第二个协程没完,loop 就停止了
# 解决: gather 将多个协程合成一个future,添加回调,里面终止loop
import asyncio
import time
async def task(i):
    start = time.time()
    await asyncio.sleep(3)
    print('耗时%s'%(time.time()-start))

def done_callback(loop,futu):
    print('callback 函数')
    loop.stop()
    
loop = asyncio.get_event_loop()
futus = asyncio.gather(task(2),task(3))  # 用gather合并
futus.add_done_callback(functools.partial(done_callback,loop))  # 调价回调
loop.run_forever()

把容易阻塞的函数task 定义成可调出协程

# 两次task之间相差4秒
async def task():
    time.sleep(4)
    return datetime.datetime.now()
tasks = [
    task(),
    task()
]
loop = asyncio.get_event_loop()
results = loop.run_until_complete(asyncio.gather(*tasks))
print(results)
'''
结果:
[datetime.datetime(2019, 10, 24, 15, 8, 26, 758582), datetime.datetime(2019, 10, 24, 15, 8, 22, 758353)]
'''
# 把容易阻塞的函数task 定义成可调出协程
def task():
    time.sleep(4)
    return datetime.datetime.now()

async def fetch_async(func,*args):
    loop = asyncio.get_event_loop()
    future = loop.run_in_executor(None,func,*args)
    result = await future
    return result

tasks = [
    fetch_async(task),
    fetch_async(task)
]
loop = asyncio.get_event_loop()
results = loop.run_until_complete(asyncio.gather(*tasks))
print(results)

'''
结果:两个task 几乎同时结束
[datetime.datetime(2019, 10, 24, 15, 11, 28, 260963), datetime.datetime(2019, 10, 24, 15, 11, 28, 260963)]
'''

call_later()

# call_soon 来调用display_date
# display_data中用 call_later 每隔一分钟调用dispaly_data,直到条件不满足
import asyncio
import datetime

def display_date(end_time, loop):
    print(datetime.datetime.now())
    if (loop.time() + 1.0) < end_time:
        loop.call_later(1, display_date, end_time, loop) # (delay,func,*args)
    else:
        loop.stop()
loop = asyncio.get_event_loop()

# Schedule the first call to display_date()
end_time = loop.time() + 5.0
loop.call_soon(display_date, end_time, loop)

# Blocking call interrupted by loop.stop()
loop.run_forever()
loop.close()

其他

# loop.close
loop.run_until_complete(do_task(3))
loop.close()
loop.run_until_complete(do_task(2))  # 此处异常,loop已经关闭

原文地址:https://www.cnblogs.com/Afrafre/p/11798575.html