协程&异步编程&asyncio

协程&异步编程&asyncio

1.项目示例

1.1在python3.7 之前的版本

主要为

asyncio.ensure_future() 实例化task

使用

loop = asyncio.get_event_loop()

done, deping = loop.run_until_complete(asyncio.wait(tasks, timeout=4))

来运行

import asyncio

async def run():
    print("hello")
    await asyncio.sleep(5)
    print("world")

async def run1():
    print("hello")
    await asyncio.sleep(3)
    print("world")

# 创建并发
tasks = [asyncio.ensure_future(run()), asyncio.ensure_future(run1())]
# 创建应用池
loop = asyncio.get_event_loop()
# done 完成的
# deping 未完成的
# timeout 最长等带时间
done, deping = loop.run_until_complete(asyncio.wait(tasks, timeout=4))
print("done:", done)
print("deping:", deping)
hello
hello
world
done: {<Task finished name='Task-2' coro=<run1() done, defined at c:UsersAdministratorDesktopexam协程旧的版本.py:8> result=None>}
deping: {<Task pending name='Task-1' coro=<run() running at c:UsersAdministratorDesktopexam协程旧的版本.py:5> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x000001EF104A8040>()]>>}

1.2.在python3.7 之后

将 手动创建 task 和 运行方式进行了修改

使用: async.create_task() 来创建task 实例

使用 : asyncio.run() 来运行

示例:

import asyncio

async def run():
    print("hello")
    await asyncio.sleep(2)
    print("world")
    return 1

async def run1():
    print("hello")
    await asyncio.sleep(5)
    print("world")
    
# 主函数
async def main():
    # 创建task, name 任务名称
    tasks = [ asyncio.create_task(run(), name="n1"), 
             asyncio.create_task(run(), name="n2")]
    # done 已完成的
    # pending 未完成的
 	# 这一步主要是为了执行 tasks 里的---> 等待 task里的任务
    done, pending =  await asyncio.wait(tasks, timeout=None)
    return done, pending

# 运行
done, pending = asyncio.run( main() )
print("done:", done)
print("pending:", pending)
C:UsersAdministratorDesktopexam>C:/Python39/python.exe "c:/Users/Administrator/Desktop/exam/协程/1. 简单示例3.4后.py"
hello
hello
world
world
done: {<Task finished name='n1' coro=<run() done, defined at c:UsersAdministratorDesktopexam协程1. 简单示例3.4后.py:3> result=1>, <Task finished name='n2' coro=<run() done, defined at c:UsersAdministratorDesktopexam协程1.  
简单示例3.4后.py:3> result=1>}
pending: set()

2.await ----->等待

​ await + 可等待对象 (协程对象, Future 对象, Task 对象) ---> IO 等待

携程对象: ----> async 修饰的函数 + ()

​ 流程:

​ 如果有其他任务, 在遇到IO等待(await) 的时候去执行其他任务。

示例1:

import asyncio

async def other():
    print("hello")
    await asyncio.sleep(3)
    print("world")
    return "返回值"
asyncio.run(other())

hello
world

示例2:

import asyncio

async def other():
    print("hello")
    await asyncio.sleep(3)
    print("world")
    return "返回值"


async def func():
    print("执行协程函数内部代码")
    #respone = await func()
    task = asyncio.create_task(other())
    print("执行结束")

asyncio.run(func())

结果:

执行协程函数内部代码
执行结束
hello

在这里我们发现, 我们开始执行协程内部代码遇到内部的await 并没有等,而是直接结束了,

示例3:

import asyncio

async def other():
    print("hello")
    await asyncio.sleep(3)
    print("world")
    return "返回值"


async def func():
    print("执行协程函数内部代码")
    respone = await other()
    print("执行结束")

asyncio.run(func())

执行协程函数内部代码
hello
world
执行结束

我们发现这样只有等内部结束后才会执行,外部接下来的代码

示例4:

import asyncio

async def other():
    print("hello")
    await asyncio.sleep(3)
    print("world")
    return "返回值"

async def other2():
    print("other---->开始")
    await asyncio.sleep(5)
    print("other ----> 结束")

async def func():
    print("执行协程函数内部代码")
    respone = await other()
    respone2 = await other2()
    print("执行结束")

asyncio.run(func())

执行协程函数内部代码
hello
world
other---->开始
other ----> 结束
执行结束

同样, await 结束后才执行下一个

3.task

​ 在事件循环中并发的添加多个任务的。可以说解决了上面的 await 的问题。

1. 使用asyncio.create_task()  创建 3.7 之后, 建议
2. loop.create_task() 或者 asyncio.ensure_future() 来创建
async def run():
    print("hello")
    await asyncio.sleep(2)
    print("world")
    return 1

async def run1():
    print("hello")
    await asyncio.sleep(5)
    print("world")
    
    
tasks = [run(), run1()]
done, pending = asyncio.run( asyncio.wait(tasks) )
print("done:", done)
print("pending:", pending)
import asyncio

async def run():
    print("hello")
    await asyncio.sleep(2)
    print("world")
    return 1

async def run1():
    print("hello")
    await asyncio.sleep(5)
    print("world")
    
# 主函数
async def main():
    # 创建task, name 任务名称
    tasks = [ asyncio.create_task(run(), name="n1"), 
             asyncio.create_task(run1(), name="n2")]
    # 或者
    # tasks = [
    #    run(),
    #    run1()
    # ]
    # done 已完成的
    # pending 未完成的
 	# 这一步主要是为了执行 tasks 里的---> 等待 task里的任务
    done, pending =  await asyncio.wait(tasks, timeout=None)
    return done, pending

# 运行
done, pending = asyncio.run( main() )
print("done:", done)
print("pending:", pending)

4. asyncio.Future 对象

他是 task 的基类,一个更低级的接口帮助我们等待task的结果

Task 继承 Future, Task 对象内部 await 结果的处理基于Future 对象来的。

示例1:

async def main():
    # 获取当前事件循环
    loop = asyncio.get_running_loop()
    # 创建一个任务(Future对象), 这个任务什么都不干
    fut = loop.create_future()
    # 这个测试中这里始终等不到对象
    await fut
asyncio.run(main())

示例2:

import asyncio

async def set_after(fut):
    await asyncio.sleep(2)
    # 给fut 设置结果
    fut.set_resukt("666")

async def main():
    # 获取当前事件循环
    loop = asyncio.get_running_loop()
    # 创建一个任务(Future对象), 这个任务什么都不干
    fut = loop.create_future()
    await loop.create_task(set_after(fut))
    # 这里会等到set_after 为fut设置结果, 等待结束
    data = await fut
    print(data)
asyncio.run(main())

5. concurrent.futures.Future 对象

使用线程池、进程池实现异步操作时用到的对象。

帮助我们来获取结果

https://docs.python.org/zh-cn/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor

示例1:

import asyncio
import time
import concurrent.futures

def blocking_io():
    # File operations (such as logging) can block the
    # event loop: run them in a thread pool.
    print("hello io")
    time.sleep(5) # 这时候这里可以是任何需要等待结果的操作
    print("world io")
    return "1111"

def cpu_bound():
    # CPU-bound operations will block the event loop:
    # in general it is preferable to run them in a
    # process pool.
    print("hello cpu")
    time.sleep(5)
    print("world cpu")
    return "2222"

async def f(fun):
    # 重点, 以线程池或进程池的方式启动
    loop = asyncio.get_event_loop()
    # run_in_executor 的方式启动,则可变成可等待对象。
    result =  loop.run_in_executor(
        None, fun)
    # 等待结果
    response = await result
    print(response)

async def main():
    task = [ f(blocking_io), f(cpu_bound)]
    await asyncio.wait(task)

asyncio.run( main())


# 注
# get_event_loop  只会在主线程帮您创建新的 event loop
# get_running_loop 用于获取当前正在运行的loop,如果当前主线程中没有正在运行的loop,如果没有就会报RuntimeError 错误。

本来这种方式是不支持 async, 目前的实现结合了async 实际上是通过 线程池来实现的

loop.run_in_executor( None, fun)

第一个参数传None 使用默认, 在上边连接官网中有介绍

同时也可以把 asyncio.run(main()) 改成

task = [ f(blocking_io), f(cpu_bound)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(task))

6.异步上下文管理器

此种对象通过定义 __aenter__()__aexit__()方法来对 async_with 语句中的环境进行控制。

import asyncio

class AsyncContentManager:
    def __init__(self):
        pass
    
    async def do_something(self):
        return 666
    
    async def __aenter__(self):
        # 连接数据库   
        return self
    
    async def __aexit__(self, exc_type, exc, tb):
        # 异步关闭数据库
        await asyncio.sleep(1)

async def func():
    async with AsyncContentManager() as f:
        result = await f.do_something()
asyncio.run( func() )

7. uvloop

​ 是asyncio的事件循环的替代方案 > 默认asyncio的事件循环。

pip install uvloop	
import asyncio
import uvloop

asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

# 编写asyncio的代码,与之前写的代码一致
# 内部的事件循环自动化会变成uvloop
asyncio.run()

8. 实操

​ 8.1 异步redis

​ 在python 操作redis, 连接/操作/断开都是网络IO

pip install aioredis

示例1:

import asyncio
import aioredis

async def execute(address, passwd, test):
    """
    	@address: redis 连接地址
    	@passwd: redis 密码
    """
    print("开始执行", test)
    
    redis = await aioredis.create_redis_pool(address=address, password=passwd)
    # 网络io操作
    await redis.hmset_dict("car", key1=1, key2=2, key3=3)
    
    # 网络io操作、遇到io自动切换任务
    result = await redis.hgetall("car", encoding="utf-8")
    print(result)
    
    redis.close()
    #  网络io操作, 遇到io会自动切换任务
    await redis.wait_closed()
    print("结束", test)

tasks = [
    execute("redis://127.0.0.1:6379", "295213", "测试1"),
    execute("redis://127.0.0.1:6379", "295213", "测试2")
]

asyncio.run(asyncio.wait(tasks))

结果

开始执行 测试2
开始执行 测试1
{'key1': '1', 'key2': '2', 'key3': '3'}
{'key1': '1', 'key2': '2', 'key3': '3'}
结束 测试2
结束 测试1

8.2 异步mysql

pip install aiomysql

示例1:

import asyncio
import aiomysql

async def execute(host, password, test):
    print("开始", test)
    # 建立连接
    conn = await aiomysql.connect(host=host, port=3306, user="root", password=password, db="work")
    # 创建指针
    cur = await conn.cursor()
    
    # 查询
    await cur.execute("select * from users limit 1")
    data = await cur.fetchall()
    print(data)
    await cur.close()
    conn.close()
    print("结束", test)
    
tasks = [
    execute("127.0.0.1", "295213", "测试1"),
    execute("xxx.xx.xx.xx", "295213", "测试2")
]

asyncio.run(asyncio.wait(tasks))
原文地址:https://www.cnblogs.com/ShanCe/p/14550146.html