Python 协程

迭代器

  • 可迭代 (Iterable):直接作用于for循环变量
  • 迭代器 (Iterator):直接作用于for循环变量,并且可以用next调用
  • 鉴别,可用isinstancle()

生成器

  • 不用for,占内存小,一边循环一边计算——时间换空间

  • next函数调用,到最后一个,报StopIteration异常

  • 生成:

    1. 直接使用
    g = (x * x for x in range(10)) # 中括号是列表生成器,小括号是生成器
    
    1. 函数包含yield,则叫生成器,next调用函数,遇yield返回
    def odd():
        print('step 1')
        yield 1
        print('step 2')
        yield(3)
        print('step 3')
        yield(5)
        
        
    g = odd()
    one = next(g)
    print(one)
    two = next(g)
    print(two)
    three = next(g)
    print(three)
    

    Step 1
    1
    Step 2
    2
    Step 3
    3

    注意:此时g是一个generator, next调用从上次yield后开始

    1. for循环调用生成器
    def fib(max):
        n, a, b = 0, 0, 1
        while n < max:
            yield b
            a, b = b, a+b
            n += 1
        return 'Done'
    
    
    g = fib(5)
    
    for i in g:
        print(i)
    

    1
    1
    2
    3
    5

协程

  • 定义:为非抢占式多任务产生子程序,可以暂停执行——像generator一样

  • 关键字:yieldsend

    def simple_coroutine(a):
        print('-> start')
    
        b = yield a
        print('-> recived', a, b)
    
        c = yield a + b
        print('-> recived', a, b, c)
    
    
    # runc
    sc = simple_coroutine(5)
    
    aa = next(sc)  # 预激
    print(aa)
    bb = sc.send(6)  # 5, 6
    print(bb)
    cc = sc.send(7)  # 5, 6, 7
    print(cc)
    

    -> start
    5
    -> recived 5 6
    11
    -> recived 5 6 7

    分析:动脑子

  • 协程终止:向上冒泡,发送哨符值,让协程退出

  • yield from:相当于加一个通道(协程与主线程间)

    def gen():
        for c in 'AB':
            yield c
    
    
    print(list(gen()))
    
    
    def gen_new():
        yield from 'AB'
    
    
    print(list(gen_new()))
    
  • 委派生成器:包含yield from的生成器函数

    from collections import namedtuple
    
    ResClass = namedtuple('Res', 'count average')
    
    
    # 子生成器
    def averager():
        total = 0.0
        count = 0
        average = None
    
        while True:
            term = yield
            if term is None:
                break
            total += term
            count += 1
            average = total / count
    
        return ResClass(count, average)
    
    
    # 委派生成器
    def grouper(storages, key):
        while True:
            # 获取averager()返回的值
            storages[key] = yield from averager()
    
    
    # 客户端代码
    def client():
        process_data = {
            'boys_2': [39.0, 40.8, 43.2, 40.8, 43.1, 38.6, 41.4, 40.6, 36.3],
            'boys_1': [1.38, 1.5, 1.32, 1.25, 1.37, 1.48, 1.25, 1.49, 1.46]
        }
    
        storages = {}
        for k, v in process_data.items():
            # 获得协程
            coroutine = grouper(storages, k)
    
            # 预激协程
            next(coroutine)
    
            # 发送数据到协程
            for dt in v:
                coroutine.send(dt)
    
            # 终止协程
            coroutine.send(None)
        print(storages)
    
    # run
    client()
    

    {'boys_2': Res(count=9, average=40.422222222222224), 'boys_1': Res(count=9, average=1.3888888888888888)}

    解释:

    1. client()函数开始,for k, v 循环内,每次创建一个新的grouper实例coroutine
    2. next(coroutine)预激协程,进入while True循环,调用averager(),yield from处暂停
    3. 内层for dt in v 结束后,grouper实例仍暂停,所以storages[key]的赋值还未完成
    4. coroutine.send(None)后,term变为None,averager子生成器中止,抛出StopIteration,并将返回的数据包含在异常对象的value中,yield from 直接抓取 StopItration ,将异常对象的 value 赋值给 storages[key]

asyncio

  • 步骤:创建消息循环(解决异步IO,有中转:相当于信箱,消息queue)-> 导入协程-> 关闭

    import threading
    import asyncio
    
    
    @asyncio.coroutine
    def hello():
        print('Hello world! (%s)' % threading.currentThread())
        print('Starting......(%s)' % threading.currentThread())
        yield from asyncio.sleep(3)
        print('Done......(%s)' % threading.currentThread())
        print('Hello again! (%s)' % threading.currentThread())
    
    
    loop = asyncio.get_event_loop()
    tasks = [hello(), hello()]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
    
    

async & await

  • 更简洁,不用装饰器

    import threading
    import asyncio
    
    
    async def hello():
        print('Hello world! (%s)' % threading.currentThread())
        print('Starting......(%s)' % threading.currentThread())
        await asyncio.sleep(3)
        print('Done......(%s)' % threading.currentThread())
        print('Hello again! (%s)' % threading.currentThread())
    
    
    loop = asyncio.get_event_loop()
    tasks = [hello(), hello()]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
    
    

aiohttp

  • 介绍:

    • 用asyncio和coroutine配合——http是io操作
  • 例:

    import asyncio
    from aiohttp import web
    
    
    async def index(request):
        await asyncio.sleep(0.5)
        return web.Response(body=b'<h1>Index</h1>')
    
    
    async def hello(request):
        await asyncio.sleep(0.5)
        text = '<h1>hello, %s!</h1>' % request.match_info['name']
        return web.Response(body=text.encode('utf-8'))
    
    
    async def init(loop):
        app = web.Application(loop=loop)
        app.router.add_route('GET', '/', index)
        app.router.add_route('GET', '/hello/{name}', hello)
        srv = await loop.create_server(app.make_handler(), '127.0.0.1', 8000)
        print('Server started at http://127.0.0.1:8000...')
        return srv
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(init(loop))
    loop.run_forever()
    

    注:查+理解

concurrent.futures

  • 类似线程池

  • 用multiprocessing实现真正并行计算——运行多个解释器

  • concurrent.furtures.Executor

    • ThreadPoolExecutor
    • ProcessPoolExecutor
  • 例子:

    from concurrent.futures import ThreadPoolExecutor
    import time
    
    
    def return_future(msg):
        time.sleep(3)
        return msg
    
    
    # 创建一个线程池
    pool = ThreadPoolExecutor(max_workers=2)
    
    # 往线程池加入2个task
    f1 = pool.submit(return_future, 'hello')
    f2 = pool.submit(return_future, 'world')
    
    print(f1.done())
    time.sleep(3)
    print(f2.done())
    
    print(f1.result())
    print(f2.result())
    
  • map(fn, *iterables, timeout=None):

    map和submit用一个就行

    import time
    import re
    import os
    import datetime
    from concurrent import futures
    
    data = ['1', '2']
    
    
    def wait_on(argument):
        print(argument)
        time.sleep(2)
        return "ok"
    
    
    ex = futures.ThreadPoolExecutor(max_workers=2)
    for i in ex.map(wait_on, data):
        print(i)
    
    
  • Future

    • future实例由Executor.submit创建
    from concurrent.futures import ThreadPoolExecutor as Pool
    from concurrent.futures import as_completed
    import requests
    
    URLS = ['http://qq.com', 'http://sina.com', 'http://www.baidu.com', ]
    
    
    def task(url, timeout=10):
        return requests.get(url, timeout=timeout)
    
    
    with Pool(max_workers=3) as executor:
        future_tasks = [executor.submit(task, url) for url in URLS]
    
        for f in future_tasks:
            if f.running():
                print('%s is running' % str(f))
    
        for f in as_completed(future_tasks):
            try:
                ret = f.done()
                if ret:
                    f_ret = f.result()
                    print('%s, done, result: %s, %s' % (str(f), f_ret.url, len(f_ret.content)))
            except Exception as e:
                f.cancel()
                print(str(e))
    
醉 生 梦 死
原文地址:https://www.cnblogs.com/TuerLueur/p/9595731.html