Python说文解字_Python之多任务_05

问:在Py3.5之前yield表现非常好,在Py3.5之后为了将予以变得更加明确,就引入了async和await关键词用于定义原生的协议。

答:async和await原生协程:

async def downloader(url):
    return "bobby"

async def downloader_url(url):
    # do something
    html = await downloader(url)
    return html

if __name__ == '__main__':
    coro = downloader_url("http://www.baidu.com")
    next(None)
    # coro.send(None)

# 1.如果调用send正常
# StopIteration: bobby
# 2.如果调用next草异常,因此原生协程只能用send
# TypeError: 'NoneType' object is not an iterator
# sys:1: RuntimeWarning: coroutine 'downloader_url' was never awaited

  我们发现:原生协程只能用send不能用next。而且发现 原生协程和yield协程差不多,前面加上了async语法,await类似于yield from。Python引入了async和await原生协程是为了我们的语义更加的清晰。如果我们用生成器写出的协程的话,代码非常的凌乱的。因为它又能当生成器又能当协程,显得比较凌乱,将这两种区分开来。因此async里面是不能定义yield的。因此Python加强了我们的区别。因此这两个是一对的。这样我们的协程区分开来。前面说了那么多生成器就是为了加强协程的理解。这样我们在协程里面就用这两个。因此在Python内部依然沿用了生成器的原理,来实现了我们的协程。

  await后跟随的Awaitbale对象。我们可以通过from  collections import Awaitalbe模块。

  其实这个是实现了魔法拿书中的__await__的方法,因此我们还可以使用装饰器的方法来操作,省去asyn,而变换成我们熟悉的生成器的样子。代码如下:

import types

@types.coroutine
def downloader(url):
    yield "bobby"

async def downloader_url(url):
    # do something
    html = await downloader(url)
    return html

if __name__ == '__main__':
    coro = downloader_url("http://www.baidu.com")
    # next(None)
    coro.send(None)

问:生成器是如何变成我们协程的?

答:在开始我么引入过协程的需求,我们的协程是通过单线程调度,协程是我们函数级别的是由我们程序员自己来决定调用的,我们可以写同步代码一样写异步代码。我们的生成器就可以完成我们的协程的这么一个功能。我们现在就可以用协程来模拟我们的需求。

  生成器是可以暂停的函数,实际上生成器是可以有状态的!

  我们看这段代码

import inspect
def gen_func():
    yield 1
    return "bobby"

if __name__ == '__main__':
    gen = gen_func()
    print(inspect.getgeneratorstate(gen)) # GEN_CREATED
    next(gen)
    print(inspect.getgeneratorstate(gen)) # GEN_SUSPENDED
    try:
        next(gen)
    except StopIteration:
        pass
    print(inspect.getgeneratorstate(gen)) # GEN_CLOSED

  通过inspect中的getgeneratorstate我们来观察生成器的状态,实际上我们在定义我们的生成器的时候,生成器可以接收我们的值。这句话有两个意思:第一是返回值给调用方,第二调用方通过send方式返回值跟gen。现在我们生成器由“生产者”变为“消费者”。

  1.我们用同步的方式编写异步的代码。

  2.在适当的时候暂停函数,并在适当的时候启动函数。

  现在我们模式:事件循环+协程模式

  我们在函数当中的子函数,如果出现异常,会抛给这个函数的主函数,是“向上抛”的过程。这个就很好。协程是一个单线程模式

问:异步IO和IO复用,也就是同步IO和异步IO。

答:我们对前面的东西略微做一个小结:

  异步IO和协程:现在我们还没有把协程来用到我们的编码当中,协程是需要事件循环来实现的。单独使用的话作用不是很明显。

  在最开始的时候我么说到了并发、并行、异步、同步、阻塞、非阻塞。

  在IO多路复用(同步IO)当中的select poll epoll,使我们使用的最多的技术。回调+事件循环的方式。这种编程模式和同步IO的编程模式差别很大。

  因此这两种模式:回调+事件循环(IO多路复用)、协程+事件循环(异步IO)

  上面的编码是非常痛苦的:回调之痛。

  我们引入了生成器和协程,协程并不会别上面的方式高,协程主要解决的问题是回调之痛的问题和编码习惯的问题。

  我们可以将生成器编程我们的协程了。

  最后引入了async和await来区别生成器和协程,不容易混乱,进行区分。我们可以用Cororoutine装饰器的方式,就不要用了。

  所以建议使用async和await的方式。

问:async IO并发编程:

答:该模块是在Python3.4后引入的模块,这是Python编程中最难的部分。该模块也是Python最具野心的模块。分几个部分开始讲解:

  1. 事件循环:

  我们可以把async IO看做一个模块也可看做一个框架,它完成了整套异步编程中最核心的内容。它包含各种特定系统实现的模块化事件循环,传输和协议抽象;对TCP,UDP,SSL,子进程,延时调用以及其他的具体支持;模仿futures模块但适用于事件循环使用Future类;基于yield from的协议和任务,可以让你用顺序的方式编写并发代码;必须使用一个将产生阻塞IO的调用时,有接口可以把这个事件转移到线程池。可以将多进程和多线程协调进来。

  协程编码模式都逃离不掉三个要素:事件循环+调用(驱动生成器)+epoll(IO多路复用)

  asyncio 是Python用于解决异步IO编程的一整套解决方案。

  tornado、gevent、twisted(scrapy,django channels)

  tornado:实现了web服务器,djago+flask是Python最传统要搭配(uwsgi,gunicorn+nginx),tornado可以直接部署,nginx+tornado

  使用asyncio

import asyncio # 可以当做协程池来理解比较容易

import time


async def get_html(url):
    print("start get url")
    # time.sleep(2) # 阻塞式的IO不能写在里面
    await asyncio.sleep(2) # 不能使用import time,必须要加await
    print("end get url")

if __name__ == '__main__':
    start_time = time.time()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(get_html("htttp://www.baidu.com"))
    print(time.time() - start_time)

# start get url
# end get url
# 2.0150375366210938

   get_event_loop市价循环

  run_until_complete去执行

  这里不能用time.sleep这是阻塞式的方法。因此会单独的一个一个执行非常慢,所以要使用asynic中的sleep

import asyncio # 可以当做协程池来理解比较容易

import time


async def get_html(url):
    print("start get url")
    # time.sleep(2) # 阻塞式的IO不能写在里面
    await asyncio.sleep(2) # 不能使用import time,必须要加await
    print("end get url")

if __name__ == '__main__':
    start_time = time.time()
    loop = asyncio.get_event_loop()
    tasks = [get_html("htttp://www.baidu.com") for i in range(100)]
    loop.run_until_complete(asyncio.wait(tasks))
    print(time.time() - start_time)

# start get url
# end get url
# 2.0150375366210938
# time编程顺序执行。asyncio.sleep()可以立即执行。只要一个地方阻塞了其他方面都实现不了。

  我们发现更改后就会阻塞。

import asyncio # 可以当做协程池来理解比较容易

import time


async def get_html(url):
    print("start get url")
    # time.sleep(2) # 阻塞式的IO不能写在里面
    await asyncio.sleep(2) # 不能使用import time,必须要加await
    return "bobby"
if __name__ == '__main__':
    start_time = time.time()
    loop = asyncio.get_event_loop()
    # get_future = asyncio.ensure_future(get_html("htttp://www.baidu.com"))
    # loop.create_task()
    # tasks = [get_html("htttp://www.baidu.com") for i in range(100)]
    task = loop.create_task(get_html("htttp://www.baidu.com"))
    loop.run_until_complete(task)
    print(time.time() - start_time)
    print(task.result())


# 获取协程的返回值

  我们用协程调用线程池:ensure_funture

  使用方法还有create_task这两种都是比较好理解的。

import asyncio # 可以当做协程池来理解比较容易

import time
from functools import partial

async def get_html(url):
    print("start get url")
    # time.sleep(2) # 阻塞式的IO不能写在里面
    await asyncio.sleep(2) # 不能使用import time,必须要加await
    return "bobby"

def callback(url,future):
    print("send email to bobby")

if __name__ == '__main__':
    start_time = time.time()
    loop = asyncio.get_event_loop()
    # get_future = asyncio.ensure_future(get_html("htttp://www.baidu.com"))
    # loop.create_task()
    # tasks = [get_html("htttp://www.baidu.com") for i in range(100)]
    task = loop.create_task(get_html("htttp://www.baidu.com"))
    # task.add_done_callback(callback)
    task.add_done_callback(partial(callback,"htttp://www.baidu.com"))
    loop.run_until_complete(task)
    print(time.time() - start_time)
    print(task.result())


# 获取协程的返回值

  我们也可以使用回调,在task中的重写add_done_callback方法。

import asyncio # 可以当做协程池来理解比较容易

import time

async def get_html(url):
    print("start get url")
    # time.sleep(2) # 阻塞式的IO不能写在里面
    await asyncio.sleep(2) # 不能使用import time,必须要加await
    print("end get url")

if __name__ == '__main__':
    start_time = time.time()
    loop = asyncio.get_event_loop()
    tasks = [get_html("htttp://www.baidu.com") for i in range(100)]
    # loop.run_until_complete(asyncio.wait(tasks))
    loop.run_until_complete(asyncio.gather(*tasks))
    print(time.time() - start_time)

# wait 和 gather 的区别
# gather更加高层,可以将我们task分组
    group1 = [get_html("htttp://www.baidu1.com") for i in range(100)]
    group2 = [get_html("htttp://www.baidu2.com") for i in range(100)]
    loop.run_until_complete(asyncio.gather(*group1,*group2))

    group1 = asyncio.gather(*group1)
    group2 = asyncio.gather(*group2)

    group2.cancel()

  我们尽量使用gather方法,注意他是可以将我们task进行分组,后面要加上*参数的形式。

  2. task取消、嵌套、字写成调用原理

# import asyncio
#
# loop = asyncio.get_event_loop()
# loop.run_forever()
# loop.run_until_complete()
# 1.loop会被放到future中。
# 2.取消future(task)
import asyncio
import time

async def get_html(sleep_times):
    print("waiting")
    await asyncio.sleep(sleep_times)
    print("done after {}s".format(sleep_times))

if __name__ == '__main__':
    task1 = get_html(2)
    task2 = get_html(3)
    task3 = get_html(2)
    tasks = [task1,task2,task3]

    loop = asyncio.get_event_loop()

    try:
        loop.run_until_complete(asyncio.wait(tasks))
    except KeyboardInterrupt as e:
        all_task = asyncio.Task.all_tasks()
        for task in all_task:
            print("cancel task")
            task.cancel()
        loop.stop()
        loop.run_forever()
    finally:
        loop.close()

  3. call_soon() 即刻执行,call_later(),call_at()

  4.ThreadPoolExector   + asyncio

  使用多线程:在协程中继承阻塞io

  生成器 = ThreadPoolExecutor

  run_in+executor(生成器,函数,参数)

11111

原文地址:https://www.cnblogs.com/noah0532/p/11025187.html