携程的那点事

携程的那点事

当前python使用的携程模块

  1. greenlet和基于greenlet开发的gevent模块(手动切换阻塞)
  2. yield关键字,只是能模拟出携程的运行过程
  3. asyncio (自动切换阻塞) python3.4版本后加入
  4. async & await关键字 python3.5版本后加入

3和4的区别在于asyncio是用语法糖模式,而async是直接在函数前加async,可以看下他们的语法上的差别并不大

asyncio模块的方法解释

  • asyncio的原理就是通过把N个任务放到一个死循环中,那么放入前我们需要先获得一个循环器的对象。

然后在循环器中,N个任务组成的任务列表,任务列表返回可执行任务和已经完成任务,

可执行任务丢到执行列表,准备执行,已完成任务从已完成任务列表中删除。

最后任务列表为空的时候,那么循环结束。

# 先获取一个事件循环器的对象
loop=asyncio.get_event_loop()
# 将任务放到任务列表中
loop.run_until_complete(asyncio.wait(task))
  • async def 函数名 叫协程函数,而协程函数的返回值叫协程对象.

    执行协程对象并不会运行协程函数内部代码,必须要交给事件循环器来执行

async def func():  #协程函数
	print("start")
ret = func() #到这一步并不会立即执行协程对象

# 必须交给循环器来执行
loop=asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(ret))

# python3.7对循环器又进行了封装,只需要调用run方法即可
asyncio.run(ret)
  • await + 可等待对象(协程对象,Future,Task对象) <io等待>, 一个协程函数里可以有多个await

    协程函数体中,遇到await,函数是等待的,不会切换运行内部的其他函数.这种运行方式和普通的函数从上而下执行顺序没有区别

import asyncio

async def func(aa):
    print("%s>>start"%aa)
    await asyncio.sleep(2)
    print("%s>>end"%aa)
    return "func结束了,返回值是%s"%aa


async def main():
    print("main执行")
    ret1 = await func("ret1")
    print("ret1返回值是%s"%ret1)

    ret2 = await func("ret2")
    print("ret2返回值是%s" % ret2)

obj=asyncio.get_event_loop()
obj.run_until_complete(main())

"""
main执行
ret1>>start
ret1>>end
ret1返回值是func结束了,返回值是ret1
ret2>>start
ret2>>end
ret2返回值是func结束了,返回值是ret2
"""
  • Task对象 在事件循环器中添加多个任务的,因为是协程模式调度执行,和并发感觉比较像

    Tasks用于并发调度协程,通过asyncio.create_task(协程对象)的方式创建Task对象,这样可以让协程加入事件循环器中被调度执行,除了通过asyncio.create_task(协程对象)的方式创建Task对象,还可以用低层级的loop.create_task()或者ensure_future()函数

    asyncio.create_task(协程对象) 在Python 3.7才有,之前版本推荐使用ensure_future()函数

    import asyncio
    
    async def func(aa):
        print("%s>>start"%aa)
        await asyncio.sleep(2)
        print("%s>>end"%aa)
        return "func结束了,返回值是%s"%aa
    
    async def main():
        print("main执行")
        task1 = obj.create_task(func("func1"))
        print("1")
        task2 = obj.create_task(func("func2"))
        print("2")
        task3 = asyncio.ensure_future(func("func3"))
        print("3")
        # python 3.7以上版本适用
        # task4 = asyncio.create_task(func("fun4"))
        ret1 = await task1
        ret2 = await task2
        ret3 = await task3
        print(ret1)
        print(ret2)
        print(ret3)
    obj=asyncio.get_event_loop()
    obj.run_until_complete(main())
    # python 3.7以上版本适用
    # asyncio.run(main())
    
    """
    main执行
    func1>>start
    func2>>start
    func3>>start
    func1>>end
    func2>>end
    func3>>end
    func结束了,返回值是func1
    func结束了,返回值是func2
    func结束了,返回值是func3
    """
    从输出打印的内容看task会把函数添加任务后执行,添加后会继续往下执行,await是阻塞等待进程返回值
    
    

    上述例子只是了解使用语法.一般我们会这么去遍历使用

    下面看下红色框框就是优化后的代码.done和pending是await asyncio.wait(takslist)的返回值,运行结束的会放到done变量,而未结束的会放到pending 中去,done和pending都是一个集合对象.

    await asyncio.wait(takslist,timeout =2),他们还有个timeout参数,可以设置等待时间,如果等待时间到强行结束,默认设置为None

下面例三,看下去掉上面的main协程函数怎么运行,asyncio.wait里加的

import asyncio

async def func(aa):
    print("%s>>start"%aa)
    await asyncio.sleep(2)
    print("%s>>end"%aa)
    return "func结束了,返回值是%s"%aa


takslist = [
        func("func1"),
        func("func2"),
        func("func3"),
    ]

# 用下面这种就不行,因为这么写会把task立即加到循环器中,而此时obj还未产生循环器的实例对象
#tasklist=[
#     obj.create_task(func("func1")),
#     obj.create_task(func("func2")),
# ]

obj=asyncio.get_event_loop()
done,pending = obj.run_until_complete(asyncio.wait(takslist))
print(done)


但是把tasklist放到obj下面就可以运行了,但是这也破坏了代码的结构和调用方式
#obj=asyncio.get_event_loop()

#takslist=[
#    obj.create_task(func("func1")),
#    obj.create_task(func("func2")),
#]
#done,pending = obj.run_until_complete(asyncio.wait(takslist))
  • Future对象 功能也是用来等待异步处理的结果的

    task继承了Future,Task对象内部await的结果的处理是基于Future对象来的.

    但是Future偏向底层,而且我们一般也不会手动去创建,都是通过调用task对象进行处理的

    这里主要提下另一个多(进)线程模块下也有个future对象

    asyncio.Future和concurrent.futures.Future

    concurrent.futures.Future 是使用进程池和线程池使用到的模块,作用是进程池和线程池异步操作时使用的对象

    一般这2个模块不会有交集,但是如果调用的第三方产品 假设Mysql不支持异步访问调用,那么用这个Future对象把他们的处理方式模拟成异步形态进行处理

    下面是如何用协程函数调用普通函数,2个要点,

    1. 普通函数外面要包一层协程函数,
    2. 用循环器.run_in_executor()来跳开阻塞,用多线程运行,第一个参数是用来放进程池(线程池的)
    import asyncio
    
    def faa(idx):
        print("第%s个foo开始运行" % idx)
        time.sleep(2)
        return idx
    
    async def faa_faster(idx):
        obj = asyncio.get_event_loop()
        #在创建迭代器时候就用run_in_executor调用多(进)线程
        ret = obj.run_in_executor(None,faa,idx)
        # print("第%s个foo开始运行" % idx)
        a = await ret
        print(a)
    
    
    task = [faa_faster(i) for i in range(1000)]
    obj = asyncio.get_event_loop()
    obj.run_until_complete(asyncio.wait(task))
    

    示例2.多线程执行普通函数,并以协程模式运行

    import asyncio
    from concurrent.futures.thread import ThreadPoolExecutor
    
    def faa(idx):
        print("第%s个foo开始运行" % idx)
        time.sleep(2)
        return idx
    
    
    async def faa_faster(pool,idx):
        obj = asyncio.get_event_loop()
        #第一个参数默认是None,如果传(进)线程池进去,就以多(进)线程形式运行普通函数
        ret = obj.run_in_executor(pool,faa,idx)
        # print("第%s个foo开始运行" % idx)
        a = await ret
        print(a)
    
    pool = ThreadPoolExecutor(max_workers=5)
    task = [faa_faster(pool,i) for i in range(1000)]
    obj = asyncio.get_event_loop()
    obj.run_until_complete(asyncio.wait(task))
    
原文地址:https://www.cnblogs.com/Young-shi/p/15442990.html