python----asyncio模块

1. 简介

说到asyncio就不得不说async、await,当一个函数前加了async之后就不在是一个函数了, 而是一个协程

2. 示例

两个协程平级并发调用

import asyncio

# 协程
async def print_hello():
    while True:
        print("hello world")
        await asyncio.sleep(1) # 暂停, cpu呗空出来

async def print_bye():
    while True:
        print("good bye")
        await asyncio.sleep(2) # 暂停, cpu呗空出来


co1 = print_hello() # 创建一个协程对象 并没有执行
co2 = print_bye() # 创建一个协程对象 并没有执行
# 创建事件循环
loop = asyncio.get_event_loop() # epoll
# loop.run_forever()
# loop.run_until_complete(co) # 监听事件, 调度一个
loop.run_until_complete(asyncio.gather(co1, co2)) # 监听事件, 如果调度多个需要使用gather

两个协程嵌套调用

import asyncio
import random
import functools

# 用于回调使用
def on_job_done(url, task): # 第二个是默认的task
    print("下载结束", url, task.result()) # task不仅仅有result(用于接收任务的返回值), 还有异常捕获等参数


async def crontab_scheduler():
    page = 1
    while True:
        url = '{}/{}'.format("https://www.baidu.com", page)
        await asyncio.sleep(1)
        job = cron_job(url)
        # await job # 错误,如果直接await 等待上一个job完成, 就会变成同步, 解决:直接交给Event loop,我们必须将新协程分离出去,让他和当前协程并发
        task = asyncio.create_task(job) # 注册到事件循环, 但是并没有将协程让出去
        task.add_done_callback(functools.partial(on_job_done, url)) # 完成回调
        # future = asyncio.ensure_future(job) # 与上边的区别是可以拿到结果 其中文档中更建议task
        # future.add_done_callback() #
        await asyncio.sleep(0) # 主动让出线程
        page +=1

async def cron_job(url):
    n = random.randint(1,3)
    await asyncio.sleep(n)
    print("下载结束", url)
    return "hhahaha"

co1 = crontab_scheduler() # 创建一个协程对象 并没有执行
loop = asyncio.get_event_loop()
loop.run_until_complete(co1)
# asyncio.run(crontab_scheduler()) # run方法在python3.7之后才会出现

 使用asyncio处理阻塞函数

使用事件循环对象的run_in_executor方法。asyncio的事件循环在背后维护着一个ThreadPoolExecutor对象,我们可以调用run_in_executor方法,把可调用对象发给它执行。

Loop.run_in_executor(executor, func, *args)
# executor : Executor 实例。如果为 None,则使用默认 executor。
# func 就是要执行的函数
# *args 就是传递给 func 的参数

该方法返回一个协程

示例

版本一 使用async

import asyncio
from time import sleep, strftime
from concurrent import futures
executor
= futures.ThreadPoolExecutor(max_workers=5) # 创建executor实例
async def blocked_sleep(name, t): print(strftime('[%H:%M:%S]'),end=' ') print('sleep {} is running {}s'.format(name, t)) loop = asyncio.get_event_loop() await loop.run_in_executor(executor, sleep, t) # 使用run_in_executor执行耗时方法 print(strftime('[%H:%M:%S]'),end=' ') print('sleep {} is end'.format(name)) return t
async
def main(): future = (blocked_sleep(i, i) for i in range(1, 6)) fs = asyncio.gather(*future) return await fs
loop
= asyncio.get_event_loop() results = loop.run_until_complete(main()) print('results: {}'.format(results))

调用协程不会使协程中的代码运行,仅仅是创建了一个协程对象, 可以用一下两种方式运行协程

  1. 在另一个协程中调用  await coroutine  和  yield from coroutine  (假定另一个协程已经在执行,即在事件循环中)
  2. 使用 ensure_future 函数或 AbstractEventLoop.create_task 方法来排定执行时间。

版本二 使用coroutine装饰器

import asyncio
from time import sleep, strftime
from concurrent import futures
def blocked(t):
    print(strftime('[%H:%M:%S]'),end=' ')
    print('{} sleep:{}s....'.format(t, t))
    sleep(t)
    print(strftime('[%H:%M:%S]'),end=' ')
    print('{} finished'.format(t))
    return t
    
@asyncio.coroutine
def main():
    with futures.ThreadPoolExecutor(max_workers=5) as executor:
        loop = asyncio.get_event_loop()
        future = [loop.run_in_executor(executor,blocked, i) for i in range(1, 6)]
        fs = asyncio.wait(future) # wait函数需要传入一个list,并且返回两组Futures,(done, pending)
        return (yield from fs)
        
loop = asyncio.get_event_loop()
results, _ = loop.run_until_complete(main())
print('results: {}'.format([result.result() for result in results]))

 在第二份代码里,使用wait函数来等待任务结束,是为了记录一下不同的函数调用方法,和gather函数不同,wait函数需要传入一个list,并且返回两组Futures,(done, pending)。这就是为什么代码里使用 results, _ = loop.run_until_complete(main())的原因了。

下面是一个使用asyncio.as_comleted方法的例子,该方法返回一个协程迭代器。迭代时迭代器只返回已经完成的future。内部维护一个队列,每次迭代都从队列中返回已经完成的future的结果(result or exception),可以注意到在输出结果中,7秒后,所以任务才完成。因为executor大小设置为5,每次只有5个线程在跑,所以在第一个block运行结束后,我们可以看到第6个block立即执行。

import asyncio
from time import sleep, strftime
from concurrent import futures
def blocked(t):
    print(strftime('[%H:%M:%S]'),end=' ')
    print('{} sleep:{}s....'.format(t, t))
    sleep(t)
    print(strftime('[%H:%M:%S]'),end=' ')
    print('{} finished'.format(t))
    return t
@asyncio.coroutine
def main():
    with futures.ThreadPoolExecutor(max_workers=5) as executor:
        loop = asyncio.get_event_loop()
        future = [loop.run_in_executor(executor,blocked, i) for i in range(1, 7)]
        fs = asyncio.as_completed(future)
        results = []
        for f in fs:
            result = yield from f
            results.append(result)
        return results
  
loop = asyncio.get_event_loop()
results= loop.run_until_complete(main())
print('results: {}'.format(results))

总结

在asyncio中调用阻塞函数时,需要使用asyncio维护的线程池来另开线程运行阻塞函数,防止阻塞事件循环所在的线程。

函数传参返回值返回值顺序 函数意义
asyncio.gather 可以传递多个协程或者Futures,函数会自动将协程包装成task,例如协程生成器。 包含Futures结果的list 按照原始顺序排列   注重收集结果,等待一堆Futures并按照顺序返回结果
asyncio.wait a list of futures 返回两个Future集合 (done, pending) 无序(暂定)   是一个协程等传给他的所有协程都运行完之后结束,并不直接返回结果
asyncio.as_completed a list of futures 返回一个协程迭代器 按照完成顺序   返回的迭代器每次迭代只返回已经完成的Futures
原文地址:https://www.cnblogs.com/musl/p/13224561.html