十二、协程

一、从生成器到协程

1.简单的生成器

Python2.3中,加入了新的关键字yield

在PEP255中,引入了yield表达式

规定yield语句只能在函数中使用。包含yield语句的函数被称为生成器函数。

当执行到yield语句是,函数的状态会被冻结(挂起所有状态,如:局部变量、指令指针、堆栈状态等),以便下载调用next时恢复状态继续执行 

2.通过生成器实现协程

协程的底层架构是在 PEP 342 中定义,在 Python2.5 实现的。

实现思想:使用yield挂起生成器,使用send 方法激活生成器。这样生成器就具备实现协程的功能。

执行 generator.send(None) 完全等同于调用生成器的 next 方法。使用其它参数调用 send 也有同样的效果,不同的是,当前生成器表达式产生的值会不一样。

参考资料:PEP 342 Coroutines via Enhanced Generators 

3.协程的演变

Python2 中生成器中 return 语句提供值,会抛出 SyntaxError 异常。

Python3 中生成器可以 return 一个值,

Python3.3 增加了 yield from 语法,使调用嵌套生成器变得简单

Python3.5 加入了关键字 async 和 await ,将生成器和协程彻底分开。

参考资料:

  • PEP 380 Syntax for Delegating to a Subgenerator
  • PEP 525 Asynchronous Generators

二、函数是如何执行的

Python 解释器是基于栈的,其中有三种栈:调用栈 (frame stack)、数据栈 (data  stack)、块栈 (block statck)。

对于Python编译器来说,PyCodeObject对象是其真正的编译结果。

在代码运行时,字节码会存储在 PyCodeObject 对象中。PyCodeObject 保存的是编译后的静态信息,在运行的时候会结合上下文形成一个完整的运行态环境。

数的 __code__ 变量其实就是函数的编译后的PyCodeObject对象 

"""
解释器:解释字节码(若干指令)
1.生成字节码
2.生成指令,解释器去执行指令
""" import dis import inspect def add(a, b): result = a + b return result def main(x, y): i = 1 print("main is running...") py_obj = inspect.currentframe() # 获得当前PyObject print(py_obj) print(py_obj.f_code is main.__code__) # True print(py_obj.f_locals) print(py_obj.f_globals) result = add(x, y) return result def f(): pass if __name__ == '__main__': print(main(10, 20)) print(dis.dis(main)) print(main.__code__.co_code) print(list(main.__code__.co_code)) print(dis.opname[100]) # LOAD_CONST 100,1 LOAD_CONST 1 print(dis.opname[125]) # STORE_FAST print(type(main.__code__)) # __code__ 是 PyCodeObject对象, 保存的是上下文信息 # 局部变量 常量 函数名等 print(main.__code__.co_varnames) # 变量 print(main.__code__.co_consts) # 常量 print(main.__code__.co_names) # 名字 f.__code__ = main.__code__ print(f(100, 200)) ''' main is running... <frame at 0x0000022EC73A2440, file 'D:/Program Files (x86)/PyCharm/workspace/test/test1.py', line 18, code main> True {'x': 10, 'y': 20, 'i': 1, 'py_obj': <frame at 0x0000022EC73A2440, file 'D:/Program Files (x86)/PyCharm/workspace/test/test1.py', line 20, code main>} {'__name__': '__main__', '__doc__': ' 解释器 解释 字节码(若干指令) ', '__package__': None, '__loader__': <_frozen_importlib_external.SourceFileLoader object at 0x0000022EC723F8E0>, '__spec__': None, '__annotations__': {}, '__builtins__': <module 'builtins' (built-in)>, '__file__': 'D:/Program Files (x86)/PyCharm/workspace/test/test1.py', '__cached__': None, 'dis': <module 'dis' from 'D:\Program Files (x86)\Python38\lib\dis.py'>, 'inspect': <module 'inspect' from 'D:\Program Files (x86)\Python38\lib\inspect.py'>, 'add': <function add at 0x0000022EC7387430>, 'main': <function main at 0x0000022EC7457F70>, 'f': <function f at 0x0000022EC7600550>} 30 15 0 LOAD_CONST 1 (1) 2 STORE_FAST 2 (i) 16 4 LOAD_GLOBAL 0 (print) 6 LOAD_CONST 2 ('main is running...') 8 CALL_FUNCTION 1 10 POP_TOP 17 12 LOAD_GLOBAL 1 (inspect) 14 LOAD_METHOD 2 (currentframe) 16 CALL_METHOD 0 18 STORE_FAST 3 (py_obj) 18 20 LOAD_GLOBAL 0 (print) 22 LOAD_FAST 3 (py_obj) 24 CALL_FUNCTION 1 26 POP_TOP 19 28 LOAD_GLOBAL 0 (print) 30 LOAD_FAST 3 (py_obj) 32 LOAD_ATTR 3 (f_code) 34 LOAD_GLOBAL 4 (main) 36 LOAD_ATTR 5 (__code__) 38 COMPARE_OP 8 (is) 40 CALL_FUNCTION 1 42 POP_TOP 20 44 LOAD_GLOBAL 0 (print) 46 LOAD_FAST 3 (py_obj) 48 LOAD_ATTR 6 (f_locals) 50 CALL_FUNCTION 1 52 POP_TOP 21 54 LOAD_GLOBAL 0 (print) 56 LOAD_FAST 3 (py_obj) 58 LOAD_ATTR 7 (f_globals) 60 CALL_FUNCTION 1 62 POP_TOP 22 64 LOAD_GLOBAL 8 (add) 66 LOAD_FAST 0 (x) 68 LOAD_FAST 1 (y) 70 CALL_FUNCTION 2 72 STORE_FAST 4 (result) 23 74 LOAD_FAST 4 (result) 76 RETURN_VALUE None b'dx01}x02tx00dx02x83x01x01x00tx01xa0x02xa1x00}x03tx00|x03x83x01x01x00tx00|x03jx03tx04jx05kx08x83x01x01x00tx00|x03jx06x83x01x01x00tx00|x03jx07x83x01x01x00tx08|x00|x01x83x02}x04|x04Sx00' [100, 1, 125, 2, 116, 0, 100, 2, 131, 1, 1, 0, 116, 1, 160, 2, 161, 0, 125, 3, 116, 0, 124, 3, 131, 1, 1, 0, 116, 0, 124, 3, 106, 3, 116, 4, 106, 5, 107, 8, 131, 1, 1, 0, 116, 0, 124, 3, 106, 6, 131, 1, 1, 0, 116, 0, 124, 3, 106, 7, 131, 1, 1, 0, 116, 8, 124, 0, 124, 1, 131, 2, 125, 4, 124, 4, 83, 0] LOAD_CONST STORE_FAST <class 'code'> ('x', 'y', 'i', 'py_obj', 'result') (None, 1, 'main is running...') ('print', 'inspect', 'currentframe', 'f_code', 'main', '__code__', 'f_locals', 'f_globals', 'add') main is running... <frame at 0x0000022EC758B040, file 'D:/Program Files (x86)/PyCharm/workspace/test/test1.py', line 18, code main> True {'x': 100, 'y': 200, 'i': 1, 'py_obj': <frame at 0x0000022EC758B040, file 'D:/Program Files (x86)/PyCharm/workspace/test/test1.py', line 20, code main>} {'__name__': '__main__', '__doc__': ' 解释器 解释 字节码(若干指令) ', '__package__': None, '__loader__': <_frozen_importlib_external.SourceFileLoader object at 0x0000022EC723F8E0>, '__spec__': None, '__annotations__': {}, '__builtins__': <module 'builtins' (built-in)>, '__file__': 'D:/Program Files (x86)/PyCharm/workspace/test/test1.py', '__cached__': None, 'dis': <module 'dis' from 'D:\Program Files (x86)\Python38\lib\dis.py'>, 'inspect': <module 'inspect' from 'D:\Program Files (x86)\Python38\lib\inspect.py'>, 'add': <function add at 0x0000022EC7387430>, 'main': <function main at 0x0000022EC7457F70>, 'f': <function f at 0x0000022EC7600550>} 300 '''

三、生成器是如何执行的

YIELD_VALUE 指令,挂起当前函数
 

调用生成器函数时,生成器不会立即执行,而是返回一个生成器对象。生成器对象有 gi_code 和 gi_frame 两个常用的属性,gi_code 和 code 一样存放了字节码,gi_frame 中存储了运行时的数据。

使用 send 方法也可以使生成器继续执行,和 next 不同的是,send 方法可以向生成器传入值

yield 之后和之前的代码执行是分离的,函数在遇到 yield 会交出控制权,send方法会重新获取控制权。

import dis
import inspect

def f():
    i = 1
    return i

def gen():

    print("gen is running...")
    i = yield 1  # YIELD_VALUE 指令
    print('aaaaa')
    print('xxxxx')
    yield 2
    print("aaaaa")
    yield 3
    return None

def gen_test():
    """
    g = gen()  :  创建生成器对象
    g.gi_code : 存的gen()函数的字节码,保存的是上下文信息, 和 gen.__code__是一样的
    g.gi_frame: 存储的是运行时的数据
    g.gi_frame.f_lasti : 上一次执行的指令的位置
    遇到yield就挂起
    遇到next就激活
    """
    print(dis.dis(f))
    print(dis.dis(gen))
    g = gen()
    print(g.gi_code is gen.__code__)  # True
    print(g.gi_frame.f_lasti)  # 记录执行到第几行, 默认是-1 (-1代表还没有执行)
    # 查看生成器状态
    print(inspect.getgeneratorstate(g))  # GEN_CREATED  创建完毕
    next(g)
    next(g)
    print(g.gi_frame.f_lasti)
    next(g)
    print(g.gi_frame.f_lasti)
    print(inspect.getgeneratorstate(g))  # GEN_SUSPENDED  挂起
    for i in g:
        pass

    print(inspect.getgeneratorstate(g))  # GEN_CLOSED  生成器结束


if __name__ == '__main__':
    gen_test()

'''
  6           0 LOAD_CONST               1 (1)
              2 STORE_FAST               0 (i)

  7           4 LOAD_FAST                0 (i)
              6 RETURN_VALUE
None
 12           0 LOAD_GLOBAL              0 (print)
              2 LOAD_CONST               1 ('gen is running...')
              4 CALL_FUNCTION            1
              6 POP_TOP

 13           8 LOAD_CONST               2 (1)
             10 YIELD_VALUE
             12 STORE_FAST               0 (i)

 14          14 LOAD_GLOBAL              0 (print)
             16 LOAD_CONST               3 ('aaaaa')
             18 CALL_FUNCTION            1
             20 POP_TOP

 15          22 LOAD_GLOBAL              0 (print)
             24 LOAD_CONST               4 ('xxxxx')
             26 CALL_FUNCTION            1
             28 POP_TOP

 16          30 LOAD_CONST               5 (2)
             32 YIELD_VALUE
             34 POP_TOP

 17          36 LOAD_GLOBAL              0 (print)
             38 LOAD_CONST               3 ('aaaaa')
             40 CALL_FUNCTION            1
             42 POP_TOP

 18          44 LOAD_CONST               6 (3)
             46 YIELD_VALUE
             48 POP_TOP

 19          50 LOAD_CONST               0 (None)
             52 RETURN_VALUE
None
True
-1
GEN_CREATED
gen is running...
aaaaa
xxxxx
32
aaaaa
46
GEN_SUSPENDED
GEN_CLOSED
'''

四、使用生成器实现多任务 

"""
虽然实现了并发,但是不能检测IO阻塞
"""

import time


def calc_time(func):
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        print(f'执行时间是:{end_time - start_time}')
        return result

    return wrapper


def work1():
    for i in range(5):
        print(f'听音乐....{i}')
        time.sleep(1)
        yield


def work2():
    for i in range(5):
        time.sleep(1)
        print(f'打游戏....{i}')
        yield

@calc_time
def main():
    g1 = work1()
    g2 = work2()

    while True:
        try:
            next(g1)
            next(g2)
        except StopIteration:
            break

if __name__=='__main__':
    main()

'''
听音乐....0
打游戏....0
听音乐....1
打游戏....1
听音乐....2
打游戏....2
听音乐....3
打游戏....3
听音乐....4
打游戏....4
执行时间是:10.009641408920288
'''

什么时候报异常:

五、gevent实现异步

 gevent 是应用非常广泛的异步网络库,底层使用 greenlet 协程

"""
gevent 默认遇到gevent.sleep会自动切换
遇到普通的io阻塞,不会切换任务,如果要实现自动切换,需要加  monkey.patch_all()
"""


import time

from gevent import monkey

monkey.patch_all()
import gevent


def calc_time(func):
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        print(f'执行时间是:{end_time - start_time}')
        return result

    return wrapper


def work1():
    for i in range(5):
        print(f'听音乐....{i}')
        # time.sleep(1)  # 不会切换任务,执行work1完毕后在执行work2
        gevent.sleep(60*5)


def work2():
    for i in range(5):
        time.sleep(1)  # gevent能自动检测io阻塞,遇到阻塞自动切换任务
        print(f'打游戏....{i}')


@calc_time
def main():
    g1 = gevent.spawn(work1)  # 协程1
    g2 = gevent.spawn(work2)  # 协程2
    g1.join()
    g2.join()


if __name__ == '__main__':
    main()

'''
听音乐....0
打游戏....0
打游戏....1
打游戏....2
打游戏....3
打游戏....4
听音乐....1
听音乐....2
听音乐....3
听音乐....4
执行时间是:1500.1551814079285
'''

 六、asycio实现协程 

"""
async 函数: 标识函数是一个异步任务
await 耗时的操作 : 挂起耗时的操作,自动切换到别的任务上"""


import time
import asyncio

def calc_time(func):
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        print(f'执行时间是:{end_time - start_time}')
        return result

    return wrapper

async def work1():
    for i in range(5):
        await asyncio.sleep(2)
        print(f'听音乐....{i}')

async def work2():
    for i in range(5):
        await asyncio.sleep(1)
        print(f'打游戏....{i}')


async def main():
    task1 = asyncio.create_task(work1())
    task2 = asyncio.create_task(work2())
    await task1
    await task2


if __name__ == '__main__':
    asyncio.run(main())

'''
打游戏....0
听音乐....0
打游戏....1
打游戏....2
听音乐....1
打游戏....3
打游戏....4
听音乐....2
听音乐....3
听音乐....4

'''

七、协程的核心思想

为了实现异步io
若干个协程任务,当某个任务遇到阻塞时,自动切换到非阻塞的任务上。
阻塞: io阻塞。 如: input("请输入一个数:") , 磁盘io(写数据是需要时间的),网络io(网络请求数据是需要时间的)
time.sleep(2)
用户态 : 切换不需要CPU调度
核心态: 线程切换,进程切换,核心态 , 切换需要CPU调度
 
 

原文地址:https://www.cnblogs.com/zhangjx2457/p/14151439.html