并发编程之异步asyncio

协程简介

协程不是计算机提供的, 而是程序员任务创造的。

协程(Coroutine), 也可以被称为微线程, 是一种用户态上下文切换技术。简而言之, 其实就是通过一个线程实现代码块互相切换执行。

实现协程有这么几种方法

  • greenlet, 早期模块
  • yield关键字
  • asyncio装饰器
  • async、await关键字

greenlet实现协程

from greenlet import greenlet


def func1():
    print(1)        # 第二步: 输出 1
    gr2.switch()    # 第三步: 切换到 func2 函数
    print(2)        # 第六步: 输出 2
    gr2.switch()    # 第七步: 切换到 func2 函数, 从上一次执行的位置继续向后执行


def func2():
    print(3)        # 第四步: 输出 3
    gr1.switch()    # 第五步: 切换到 func1 函数, 从上一次执行的位置继续向后执行
    print(4)        # 第八步: 输出 4


gr1 = greenlet(func1)
gr2 = greenlet(func2)

gr1.switch()    # 第一步: 去执行 func1 函数

yield关键字

def func1():
    yield 1
    yield from func2()
    yield 2


def func2():
    yield 3
    yield 4


f1 = func1()
for item in f1:
    print(item)

asyncio

在Python3.4及之后的版本才能使用asyncio

import asyncio


@asyncio.coroutine
def func1():
    print(1)
    yield from asyncio.sleep(2)  # 遇到IO耗时操作, 自动切换到tasks中的其他任务
    print(2)


@asyncio.coroutine
def func2():
    print(3)
    yield from asyncio.sleep(2)  # 遇到IO耗时操作, 自动切换到tasks中的其他任务
    print(4)


tasks = [
    asyncio.ensure_future(func1()),
    asyncio.ensure_future(func2())
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

注意: asyncio遇到IO阻塞会自动切换, 而上述其他的操作遇到IO阻塞是手动切换的

async&await关键字

在Python3.5及之后的版本

import asyncio


async def func1():
    print(1)
    await asyncio.sleep(2)  # 遇到IO耗时操作, 自动切换到tasks中的其他任务
    print(2)


async def func2():
    print(3)
    await asyncio.sleep(2)  # 遇到IO耗时操作, 自动切换到tasks中的其他任务
    print(4)


tasks = [
    asyncio.ensure_future(func1()),
    asyncio.ensure_future(func2())
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

协程的意义

在一个线程中如果遇到IO等待时间, 线程不会傻傻的等待, 会利用空闲的时候再去干其他的事情

案例: 下载三张图片(网络IO)

  • 普通方式(同步)
import requests


def download_image(url):
    print('开始下载', url)

    response = requests.get(url)
    print('下载完成')

    # 图片保存到本地
    file_name = url.rsplit('_')[-1]
    with open(file_name, mode='wb') as file_object:
        file_object.write(response.content)


if __name__ == '__main__':
    url_list = [
        'https://car2.autoimg.cn/cardfs/product/g28/M0A/19/BF/1024x0_1_q95_autohomecar__ChcCR13C4O2AHX-VAAhFPllxCw8142.jpg',
        'https://car3.autoimg.cn/cardfs/product/g28/M07/17/A2/1024x0_1_q95_autohomecar__ChsEfV3C4OyAYyFGAAkPNXso9Ts320.jpg',
        'https://car2.autoimg.cn/cardfs/product/g28/M02/17/A2/1024x0_1_q95_autohomecar__ChsEfV3C4OqAcAzoAAhDQ5rjm3k807.jpg'
    ]
    for item in url_list:
        download_image(item)
View Code
  • 协程方式(异步)
import aiohttp
import asyncio


async def fetch(session, url):
    print('发送请求', url)
    async with session.get(url, verify_ssl=False) as response:
        content = await response.content.read()
        file_name = url.rsplit('_')[-1]
        with open(file_name, mode='wb') as file_object:
            file_object.write(content)
    print('下载完成')


async def main():
    async with aiohttp.ClientSession() as session:
        url_list = [
            'https://car2.autoimg.cn/cardfs/product/g28/M0A/19/BF/1024x0_1_q95_autohomecar__ChcCR13C4O2AHX-VAAhFPllxCw8142.jpg',
            'https://car3.autoimg.cn/cardfs/product/g28/M07/17/A2/1024x0_1_q95_autohomecar__ChsEfV3C4OyAYyFGAAkPNXso9Ts320.jpg',
            'https://car2.autoimg.cn/cardfs/product/g28/M02/17/A2/1024x0_1_q95_autohomecar__ChsEfV3C4OqAcAzoAAhDQ5rjm3k807.jpg'
        ]
        tasks = [asyncio.create_task(fetch(session, url)) for url in url_list]

        await asyncio.wait(tasks)


if __name__ == '__main__':
    asyncio.run(main())
View Code

异步编程

事件循环

事件循环可以理解成为一个死循环, 会去检查并执行某些代码。

# 伪代码

任务列表 = [ 任务1, 任务2, 任务3, ...]

while True:
    可执行的任务列表, 已完成的任务列表 = 去任务列表中检查所有的任务, 将'可执行''已完成'的任务返回
    
    for 就绪任务 in 可执行的任务列表:
        执行就绪任务
    
    for 已完成的任务 in 已完成的任务列表:
        在任务列表中移除已完成的任务

    如果 任务列表中的的任务都已完成, 则终止循环
import asyncio

# 去生成或获取一个事件循环
loop = asyncio.get_event_loop()

# 将任务放到"任务列表"
loop.run_until_complete(任务)

快速上手

协程函数, 定义函数的时候async def 函数名, 则称该函数为协程函数

协程对象, 执行协程函数()可以得到协程对象

async def func():
    pass

result = func()

注意: 执行协程函数创建协程对象的时候, 函数内部代码不会执行

如果想要运行协程函数内部代码, 必须要将协程对象交给事件循环来处理

import asyncio


async def func():
    print('gogogo')


result = func()

loop = asyncio.get_event_loop()
loop.run_until_complete(result)

await关键字

await关键字 + 可等待的对象(协程对象、Future、Task对象 -> IO等待)

示例1

import asyncio

async def func():
    print('来玩啊')
    response = await asyncio.sleep(2)
    print('结束', response)


asyncio.run( func() )

示例2

import asyncio

"""
async def func():
    print('来玩啊')
    response = await asyncio.sleep(2)
    print('结束', response)


asyncio.run(func())
"""


async def others():
    print('start')
    await asyncio.sleep(2)
    print('end')

    return '返回值'


async def func():
    print('执行协程函数内部代码')

    # 遇到IO操作挂起当前协程(任务), 等IO操作完成之后再继续往下执行, 当前协程挂起时, 事件循环可以去执行其他协程(任务)
    response = await others()

    print('IO请求结束, 结果为:', response)


asyncio.run(func())

示例3

async def others():
    print('start')
    await asyncio.sleep(2)
    print('end')

    return '返回值'


async def func():
    print('执行协程函数内部代码')

    # 遇到IO操作挂起当前协程(任务), 等IO操作完成之后再继续往下执行, 当前协程挂起时, 事件循环可以去执行其他协程(任务)
    response1 = await others()
    print('IO请求结束, 结果为:', response1)

    response2 = await others()
    print('IO请求结束, 结果为:', response2)


asyncio.run(func())

await就是等待对象的值得到结果之后再继续向下执行。

Task对象

Tasks are used to schedule coroutines concurrently

When a coroutine is wrapped into a Task with functions like asyncio.create_task() the coroutine is automatically scheduled to run soon

Task对象是用来在事件循环中添加多个任务的

Task用于并发调度协程, 通过asyncio.create_task(协程对象)的方式创建Task对象, 这样可以让协程加入事件循环中等待被调度执行。除了使用asyncio.create_task()函数以外, 还可以用低层级的loop.create_task()或ensure_future()函数。不建议手动实例化Task对象

注意: asyncio.create_task()函数在Python3.7中被加入。在Python3.7之前, 可改用低层级的asyncio.ensure_future()函数

示例1:

import asyncio


async def func():
    print(1)
    await asyncio.sleep(2)
    print(2)
    return '返回值'


async def main():
    print('main开始')

    # 创建Task对象, 将当前执行func函数的任务添加到事件循环
    task1 = asyncio.create_task(func())

    # 创建Task对象, 将当前执行func函数的任务添加到事件循环
    task2 = asyncio.create_task(func())

    print('main结束')

    # 当执行某协程遇到IO的时候, 会自动化切换执行其他任务
    # 此处的await是等待相对于的协程全都执行完毕并获取数据
    ret1 = await task1
    ret2 = await task2
    print(ret1, ret2)


asyncio.run(main())

示例2:

async def func():
    print(1)
    await asyncio.sleep(2)
    print(2)
    return '返回值'


async def main():
    print('main开始')

    task_list = [
        asyncio.create_task(func()),
        asyncio.create_task(func())
    ]

    print('main结束')

    done, pending = await asyncio.wait(task_list, timeout=1)
    print(done)


asyncio.run(main())

示例3:

async def func():
    print(1)
    await asyncio.sleep(2)
    print(2)
    return '返回值'


task_list = [
    func(),
    func()
]

done, pending = asyncio.run(asyncio.wait(task_list))
print(done)

asyncio.Future对象

A Future is a special low-level awaitable object that represents an eventual result of an asynchronous operation.

Task继承Future, Task对象内部await结果的处理基于Future对象来的

示例1:

import asyncio

async def main():
    # 获取当前事件循环
    loop = asyncio.get_running_loop()
    
    # 创建一个任务(Future对象), 这个任务什么都不干
    fut = loop.create_future()

    # 等待任务最终结果(Future对象), 没有结果则会一直等待下去
    data = await fut
    print(data)

asyncio.run( main )

示例2

async def set_after(fut):
    await asyncio.sleep(2)
    fut.set_result('666')


async def main():
    # 获取当前事件循环
    loop = asyncio.get_running_loop()

    # 创建一个任务(Future对象), 没绑定任何行为, 这个任务永远不知道什么时候结束.
    fut = loop.create_future()

    # 创建一个任务(Task对象), 绑定了set_after函数, 函数内部在2s之后会给fut赋值.
    # 即受到设置future任务的最终结果, 那么fut就可以结束了
    await loop.create_task(set_after(fut))

    # 等待Future对象获取最终结果, 否则一直等下去
    data = await fut
    print(data)


asyncio.run(main())

concurrent.futures.Future对象

concurrent.funtures.Future对象是使用线程池、进程池实现异步操作时用到的对象。

import time
from concurrent.futures.thread import ThreadPoolExecutor
from concurrent.futures.process import ProcessPoolExecutor

def func(value):
    time.sleep(1)
    print(value)
    return 123

# 创建线程池
pool = ProcessPoolExecutor(max_workers=5)

# 创建进程池
# pool = ThreadPoolExecutor(max_workers=5)

for i in range(10):
    fut = pool.submit(func, i)
    print(fut)
import time
import asyncio
import concurrent.futures


def func1():
    # 某个耗时操作
    time.sleep(2)
    return 'hello world'


async def main():
    loop = asyncio.get_running_loop()

    # 1. Run in the default loop's executor(默认ThreadPoolExecutor)
    # 第一步: 内部会先调用ThreadPoolExecutor的submit方法去线程池中申请一个线程去执行fun1函数, 并返回一个concurrent.future.Future对象
    # 第二步: 调用asyncio.wrap_future对象将concurrent.futures.Future对象包装成asycio.Future对象
    # 因为concurrent.futures.Future对象不支持await语法, 所以需要包装成asyncio.Future对象才能使用
    fut = loop.run_in_executor(None, func1)
    result = await fut
    print('default thread pool', result)

    # 2. Run in a custom thread pool:
    # with concurrent.futures.ThreadPoolExecutor() as pool:
    #     result = await loop.run_in_executor(
    #         pool, func1
    #
    #     )
    #     print('custom thread pool', result)

    # 3. Run in a custom process pool:
    # with concurrent.futures.ProcessPoolExecutor() as pool:
    #     result = await loop.run_in_executor(
    #         pool, func1
    #     )
    #     print('custom process pool', result)

asyncio.run(main())

案例: asyncio + 不支持异步模块

异步迭代器

什么是异步迭代器

实现了__aiter__()和__anext__()方法的对象。__anext__必须返回一个awaitable对象。async for会处理异步迭代器的__anext__()方法所返回的可迭代对象, 直到其引发一个StopAsyncIteration异常。

什么是异步可迭代对象

可在async for语句中被使用的对象。必须通过它的__aiter__()方法返回一个asyncchronous iterator。

import asyncio


class Reader(object):
    """ 自定义异步迭代器(同时也是异步可迭代对象) """

    def __init__(self):
        self.count = 0

    async def readline(self):
        # await asyncio.sleep(1)
        self.count += 1
        if self.count == 100:
            return None
        return self.count

    def __aiter__(self):
        return self

    async def __anext__(self):
        val = await self.readline()
        if val == None:
            raise StopAsyncIteration
        return val

async def func():
    obj = Reader()
    async for item in obj:
        print(item)

asyncio.run(func())

异步上下文管理器

异步上下文管理器对象通过定义__aenter__()和__aexit__()方法来对async with语句中的环境进行控制

import asyncio


class AsyncContextManager(object):
    # def __init__(self):
        # self.conn = conn

    async def do_something(self):
        # 异步操作数据库
        return 666

    async def __aenter__(self):
        # 异步链接数据库
        # self.conn = await asyncio.sleep(1)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # 异步关闭数据库
        await asyncio.sleep(1)


async def main():
    async with AsyncContextManager() as f:
        result = await f.do_something()
        print(result)


if __name__ == '__main__':
    asyncio.run(main())

uvloop

是asyncio的事件循环的替代方案。uvloop事件循环大于默认asyncio的事件循环

pip install uvloop
import asyncio
import uvloop

asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

# 编写asyncio的代码, 与之前的代码一致

# 内部的事件循环自动化会变成uvloop
asyncio.run(...)

案例

异步操作Redis

在使用Python代码操作Redis时, 链接/操作/断开都是网络IO

pip install ailredis

示例1:

import asyncio
import aioredis


async def execute(address, password):
    print('开始执行: ', address)
    # 网络IO操作: 创建Redis连接
    redis = aioredis.create_redis(address, password=password)

    # 网络IO操作: 在Redis中设置哈希值car, 内部再设置三个键值对, 即: redis = {car: {key1:1, key2:2, key3:3}}
    await redis.hmset_dict('car', key1=1, key2=2, key3=3)

    # 网络IO操作: 去Redis中获取值
    result = await redis.hgetall('car', encoding='utf-8')
    print(result)

    redis.close()
    # 网络IO操作: 关闭Redis连接
    await redis.wait_closed()

    print('结束: ', address)

asyncio.run(execute('redis://127.0.0.1:6379', 'redis'))

示例2:

import asyncio
import aioredis


async def execute(address, password):
    print('开始执行: ', address)
    # 网络IO操作: 创建Redis连接
    redis = aioredis.create_redis(address, password=password)

    # 网络IO操作: 在Redis中设置哈希值car, 内部再设置三个键值对, 即: redis = {car: {key1:1, key2:2, key3:3}}
    await redis.hmset_dict('car', key1=1, key2=2, key3=3)

    # 网络IO操作: 去Redis中获取值
    result = await redis.hgetall('car', encoding='utf-8')
    print(result)

    redis.close()
    # 网络IO操作: 关闭Redis连接
    await redis.wait_closed()

    print('结束: ', address)

task_list = [
    execute('redis://127.0.0.1:6379', 'redis'),
    execute('redis://127.0.0.1:6379', 'redis'),
]

asyncio.run(asyncio.wait(task_list))

异步MySQL

pip install aiomysql

示例1

import asyncio
import aiomysql


async def execute():
    # 网络IO操作: 连接MySQL
    conn = await aiomysql.connect(host='127.0.0.1', port=3306, user='root', password='123', db='mysql')

    # 网络IO操作: 创建CURSOR
    cursor = await conn.cursor()

    # 网络IO操作: 执行SQL
    await cursor.execute('SELECT HOST,User FROM user')

    # 网络IO操作: 获取SQL结果
    result = await cursor.fetchall()
    print(result)

    # 网络IO操作: 关闭连接
    await cursor.close()
    conn.close()


asyncio.run(execute())

示例2

import asyncio
import aiomysql


async def execute(host, port, user, password, db):
    # 网络IO操作: 连接MySQL
    conn = await aiomysql.connect(host='127.0.0.1', port=3306, user='root', password='123', db='mysql')

    # 网络IO操作: 创建CURSOR
    cursor = await conn.cursor()

    # 网络IO操作: 执行SQL
    await cursor.execute('SELECT HOST,User FROM user')

    # 网络IO操作: 获取SQL结果
    result = await cursor.fetchall()
    print(result)

    # 网络IO操作: 关闭连接
    await cursor.close()
    conn.close()


task_list = [
    execute(host='127.0.0.1', port=3306, user='root', password='123', db='mysql'),
    execute(host='127.0.0.1', port=3306, user='root', password='123', db='mysql'),
]

asyncio.run(asyncio.wait(task_list))

异步爬虫

import aiohttp
import asyncio


async def fetch(session, url):
    print('发送请求:', url)
    async with session.get(url, verify_ssl=False) as response:
        text = await response.text()
        print('得到结果:', url, len(text))


async def main():
    async with aiohttp.ClientSession() as session:
        url_list = [
            'https://python.org',
            'https://www.baidu.com',
            'http://www.pythonav.com'
        ]
        tasks = [asyncio.create_task(fetch(session, url)) for url in url_list]

        await asyncio.wait(tasks)


if __name__ == '__main__':
    asyncio.run(main())
原文地址:https://www.cnblogs.com/featherwit/p/13404115.html