Python协程学习基本!!!

Python协程应该是我最后重点攻克难点,最近写一个twitter的爬虫,希望也能用上相关知道:

具体参考的链接:

非常适合小白的 Asyncio 教程:

https://mp.weixin.qq.com/s/BN4l_ek87_bKNe0SYSRFBg

Python中协程异步IO(asyncio)详解:

https://zhuanlan.zhihu.com/p/59621713

Python黑魔法 --- 异步IO( asyncio) 协程:

https://www.jianshu.com/p/b5e347b3a17c

此链接指定了asyncio.wait, asyncio.gather,asyncio.as_completed的一系列用法

https://pymotw.com/3/asyncio/control.html

本笔记仅供个人参考,有需要可以去看原文。

协程的定义,需要使用 async def 语句。

In [1]: import asyncio                                                                                                                                         

In [2]: async def some_work(x):...                                                                                                                             

In [3]: print(asyncio.iscoroutinefunction(some_work))                                                                                                          
True

  通过asyncio.iscoroutinefunction的方法可以来检查一个函数是否为协程函数, 协程函数前面要加上async

写一个标准的协程函数

async def do_some_work(x):
    print("Waiting " + str(x))
    await asyncio.sleep(x)

  asyncio.sleep 也是一个协程,所以 await asyncio.sleep(x) 就是等待另一个协程。可参见 asyncio.sleep 的文档:

sleep(delay, result=None, *, loop=None)
Coroutine that completes after a given time (in seconds).

  

2 运行协程

调用协程函数,协程并不会开始运行,只是返回一个协程对象,可以通过 asyncio.iscoroutine 来验证:

In [2]: async def some_work(x):...                                                                                                                             

In [3]: print(asyncio.iscoroutinefunction(some_work))                                                                                                          
True

In [4]: s = some_work(3)                                                                                                                                       

In [5]: asyncio.iscoroutine(s)                                                                                                                                 
Out[5]: True

  要执行协程对象,需要拿到当前loop,然后运行run_until_complete方法

In [7]: loop.run_until_complete?                                                                                                                               
Signature: loop.run_until_complete(future)
Docstring:
Run until the Future is done.

If the argument is a coroutine, it is wrapped in a Task.

WARNING: It would be disastrous to call run_until_complete()
with the same coroutine twice -- it would wrap it in two
different Tasks and that can't be good.

Return the Future's result, or raise its exception.
File:      ~/opt/anaconda3/lib/python3.8/asyncio/base_events.py
Type:      method

  这个方法只接受参数是一个 future,但是我们这里传给它的却是协程对象,是因为它在内部做了检查,通过 ensure_future 函数把协程对象包装(wrap)成了 future。

In [10]: async def do_some_work(x): 
    ...:     print('Waiting', x) 
    ...:     await asyncio.sleep(x) 
    ...:                                                                                                                                                       

In [12]: loop = asyncio.get_event_loop()                                                                                                                       

In [13]: loop.run_until_complete(do_some_work(3))                                                                                                              
Waiting 3

In [14]: loop.run_until_complete(asyncio.ensure_future(do_some_work(3)))                                                                                       
Waiting 3

  

插入一个疑问,为什么明明有了协程(coroutine),也能跑起来,但要有future/task这个玩意。

这里有具体的回答:

https://stackoverflow.com/questions/34753401/difference-between-coroutine-and-future-task-in-python-3-5

Direct Answer: You don't need ensure_future if you don't need the results. They are good if you need the results or retrieve exceptions occurred.

import asyncio


async def slow_operation():
    await asyncio.sleep(1)
    return 'Future is done!'


def got_result(future):
    print(future.result())

    # We have result, so let's stop
    loop.stop()


loop = asyncio.get_event_loop()
task = loop.create_task(slow_operation())
task.add_done_callback(got_result)

# We run forever
loop.run_forever()

  书中的代码,当你包装成一个task以后,就可以添加回调函数,回调函数的第一个参数默认就是task本身,函数内部能够调用到loop对象,真实神奇。

stackoverflow真是非常不错的一个网站,第二个回答里面又介绍了更有意思的。

import asyncio
import time

# coroutine function
async def p(word):
    print(f'{time.time()} - {word}')


async def main():
    loop = asyncio.get_event_loop()
    coro = p('await')  # coroutine
    task2 = loop.create_task(p('create_task'))  # <- runs in next iteration
    await coro  # <-- run directly
    await task2

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

  运行结果

1539486251.7055213 - await
1539486251.7055705 - create_task
async def main():
    loop = asyncio.get_event_loop()
    coro = p('await')
    task2 = loop.create_task(p('create_task'))  # scheduled to next iteration
    await asyncio.sleep(1)  # loop got control, and runs task2
    await coro  # run coro
    await task2

 

如果在中间添加一个await asyncio.sleep,那task2会先运行

1539486378.5244057 - create_task
1539486379.5252144 - await  # note the delay

因为中间的await asyncip.sleep()会让出控制权,task2已经在loop上面注册了,所以task2先运行了。

Under the hood

loop.create_task actually calls asyncio.tasks.Task(), which will call loop.call_soon. And loop.call_soon will put the task in loop._ready. During each iteration of the loop, it checks for every callbacks in loop._ready and runs it.

asyncio.waitasyncio.ensure_future and asyncio.gather actually call loop.create_task directly or indirectly.

Also note in the docs:

Callbacks are called in the order in which they are registered. Each callback will be called exactly once.

上面的说明也介绍了,很多方法都是创建了task并注册到loop上面。

继续往下写

回调函数,前面的stackoverflow以前有了回调函数的代码,这里面我就不写了。

回调函数可以取到loop以及task本身,loop函数里面就可以读到,task需要默认的第一个参数。

多个协程

asyncio.gather可以在里面添加多个future或者coroutines

Signature: asyncio.gather(*coros_or_futures, loop=None, return_exceptions=False)
Docstring:
Return a future aggregating results from the given coroutines/futures.

Coroutines will be wrapped in a future and scheduled in the event
loop. They will not necessarily be scheduled in the same order as
passed in.

All futures must share the same event loop.  If all the tasks are
done successfully, the returned future's result is the list of
results (in the order of the original sequence, not necessarily
the order of results arrival).  If *return_exceptions* is True,
exceptions in the tasks are treated the same as successful
results, and gathered in the result list; otherwise, the first
raised exception will be immediately propagated to the returned
future.

Cancellation: if the outer Future is cancelled, all children (that
have not completed yet) are also cancelled.  If any child is
cancelled, this is treated as if it raised CancelledError --
the outer Future is *not* cancelled in this case.  (This is to
prevent the cancellation of one child to cause other children to
be cancelled.)

  接受的是不定长的参数。

gather 起聚合的作用,把多个 futures 包装成单个 future,因为 loop.run_until_complete 只接受单个 future。

5. run_until_complete和run_forever

import asyncio

async def do_some_work(x):
    print('Waiting ' + str(x))
    await asyncio.sleep(x)
    print('Done')

loop = asyncio.get_event_loop()

coro = do_some_work(3)
loop.run_until_complete(coro)

  run_until_complete当loop里面的task跑完了以后,就停止了。

import asyncio

async def do_some_work(x):
    print('Waiting ' + str(x))
    await asyncio.sleep(x)
    print('Done')

loop = asyncio.get_event_loop()

coro = do_some_work(3)
asyncio.ensure_future(coro)

loop.run_forever()   # 一直阻塞在这里

  

这个会一直阻塞。

async def do_some_work(loop, x):
    print('Waiting ' + str(x))
    await asyncio.sleep(x)
    print('Done')
    loop.stop()

  可以在协程函数里面传入loop,停止loop的执行,其实我发现不传参也可以使用。

import asyncio

async def do_some_work(x):
    print('Waiting ' + str(x))
    await asyncio.sleep(x)
    print('Done')

# 回调函数必须传入future参数。就好比实例方法的第一个参数是self一样。
def done_callback(future):
    loop.stop()

loop = asyncio.get_event_loop()

futus = asyncio.gather(do_some_work(1), do_some_work(3))
futus.add_done_callback(done_callback)

loop.run_forever()

  这里我差点又卡住了,在回调函数里面,future参数是必须要填写的,也就是第一个参数。loop可以通过functools.partial或者lambda的方式传参进去,但回调函数也可以直接读取到运行的loop。

自定义了一个定时任务

import asyncio

async def timer(x, cb):
    futu = asyncio.ensure_future(asyncio.sleep(x))
    futu.add_done_callback(cb)
    await futu

t = timer(3, lambda futu: print('Done'))

loop = asyncio.get_event_loop()
loop.run_until_complete(t)

  通过将asyncio.sleep()协程函数包装成future然后添加回调函数的方式,定时启动脚本。

以上是我https://mp.weixin.qq.com/s/BN4l_ek87_bKNe0SYSRFBg这个链接的笔记,确实讲的比较少,也比较基础。

还是stackoverflow让我学到了不少。

这里有asyncio.await与asynvio.gather的区别介绍

https://stackoverflow.com/questions/42231161/asyncio-gather-vs-asyncio-wait

相对来说,await入参为tasks的列表,但可以在方法内部设置具体的参数,返回两对不同的列表,分别包含完成任务的tasks与没有完成任务的tasks

以下是来至知乎链接的笔记

https://zhuanlan.zhihu.com/p/59621713,一些重复,或者我觉的已经理解或者不重要的将跳过。

前面讲了一些Python协程相关基本知识,跳过了。这篇知道,我感觉有些地方可能不全,我就补一些我自己的理解。

获取协程返回值

这里,原作者介绍了,可以通过task.resule()或者回调函数或者返回值。

还有一种就是通过

res = loop.run_until_complete(task)

  loop运行后的返回值也可以得到结果。

 后面作者介绍了asyncio.wait的一些简单使用,而且写法也特别不舒服。

import asyncio

async def coroutine_example(name):
    print('正在执行name:', name)
    await asyncio.sleep(1)
    print('执行完毕name:', name)
    return '返回值:' + name

loop = asyncio.get_event_loop()

tasks = [loop.create_task(coroutine_example('Zarten_' + str(i))) for i in range(3)]
wait_coro = asyncio.wait(tasks)
loop.run_until_complete(wait_coro)

for task in tasks:
    print(task.result())

loop.close()

 这是要所有协程返回值的写法,用了asyncio.wait的写法,但中间创建了一堆task,最后又从task取返回值实在太累了。

import asyncio

async def coroutine_example(name):
    print('正在执行name:', name)
    await asyncio.sleep(1)
    print('执行完毕name:', name)
    return '返回值:' + name

loop = asyncio.get_event_loop()

tasks = [coroutine_example('Zarten_' + str(i)) for i in range(3)]
# 汇聚
wait_coro = asyncio.gather(*tasks)
# loop注册跑起来
res = loop.run_until_complete(wait_coro)

for r in res:
    print(r)

loop.close()

  如果就简单的收集结果,前面我刚学的asyncio.gather更加合适。

作者的回调函数更加麻烦了。

后面作者一下跳跃到了动态添加协程,开辟了一个新的线程添加协程,跳跃的速度感觉向去哪里复制了一篇文章过来。

主要介绍了,通过新开一条线程加入一个loop调用一些回调函数,然后在loop里面动态的添加一些future,感觉没啥用,代码我也不抄写了。

逻辑中还是有很多问题的。

import asyncio
from threading import Thread

def start_thread_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

async def thread_example(name):
    print('正在执行name:', name)
    await asyncio.sleep(1)
    return '返回结果:' + name



new_loop = asyncio.new_event_loop()
t = Thread(target= start_thread_loop, args=(new_loop,))
t.start()

future = asyncio.run_coroutine_threadsafe(thread_example('Zarten1'), new_loop)
print(future.result())

asyncio.run_coroutine_threadsafe(thread_example('Zarten2'), new_loop)

print('主线程不会阻塞')

asyncio.run_coroutine_threadsafe(thread_example('Zarten3'), new_loop)

print('继续运行中...')

  这里作者说

从上面2个例子中,当主线程运行完成后,由于子线程还没有退出,故主线程还没退出,等待子线程退出中。若要主线程退出时子线程也退出,可以设置子线程为守护线程 t.setDaemon(True)

我不知道作者是笔误还是不了解守护线程,当主线程退出,子线程全部关闭,里面的loop都关了,你所有子线程的loop还执行个啥呢?

import asyncio
from threading import Thread
from collections import deque
import random
import time

def start_thread_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

def consumer():
    while True:
        if dq:
            msg = dq.pop()
            # 函数内部动态,向指定事件循环提交一个协程。线程安全。
            if msg:
                asyncio.run_coroutine_threadsafe(thread_example('Zarten'+ msg), new_loop)


async def thread_example(name):
    print('正在执行name:', name)
    await asyncio.sleep(2)
    return '返回结果:' + name



dq = deque()
# 注册一个新的loop
new_loop = asyncio.new_event_loop()
# 开启一个线程,让loop跑起来一直跑
loop_thread = Thread(target= start_thread_loop, args=(new_loop,))
loop_thread.setDaemon(True)
loop_thread.start()

# 再开启一个线程,去启动consumer函数
consumer_thread = Thread(target= consumer)
consumer_thread.setDaemon(True)
consumer_thread.start()

while True:
    i = random.randint(1, 10)
    dq.appendleft(str(i))
    time.sleep(2)

   这是作者的代码,我不知道守护线程开着还有何用,主线程写了死循环。但确实通过

asyncio.run_coroutine_threadsafe

的方法向loop动态添加协程还是不错的一个方案。

最后作者通过asyncio.Semaphore(3)的对象,限制并发数量,通过查看官方网站的相关资料,走的也是锁的机制。

import asyncio
import aiohttp


async def get_http(url):
    # 通过这里限制并发数量
    async with semaphore:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as res:
                global count
                count += 1
                print(count, res.status)

if __name__ == '__main__':
    count = 0
    semaphore = asyncio.Semaphore(5)
    loop = asyncio.get_event_loop()
    url = 'https://www.baidu.com/s?ie=utf-8&f=8&rsv_bp=1&ch=&tn=baiduerr&bar=&wd={0}'
    tasks = [get_http(url.format(i)) for i in range(600)]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()

  

下面是我参考https://www.jianshu.com/p/b5e347b3a17c此链接的学习笔记,由于这是第三篇,我就记录一些我觉的比较重要的知识点。

协程对象不能直接运行,在注册事件循环的时候,其实是run_until_complete方法将协程包装成为了一个任务(task)对象。所谓task对象是Future类的子类。保存了协程运行后的状态,用于未来获取协程的结果。
import asyncio
import time

now = lambda : time.time()

async def do_some_work(x):
    print('Waiting: ', x)

start = now()

coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
# 两种创建task的方法
task = asyncio.ensure_future(coroutine)
# task = loop.create_task(coroutine)
print(task)
loop.run_until_complete(task)
print(task)
print('TIME: ', now() - start)

  

绑定回调

绑定回调,在task执行完毕的时候可以获取执行的结果,回调的最后一个参数是future对象,通过该对象可以获取协程返回值。如果回调需要多个参数,可以通过偏函数导入。[回调函数自动可以使用loop]

这个要记住,回调的最后一个参数是future对象。

import time
import asyncio

now = lambda : time.time()

async def do_some_work(x):
    print('Waiting: ', x)
    return 'Done after {}s'.format(x)

def callback(future):
    print('Callback: ', future.result())

start = now()

coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
task.add_done_callback(callback)
loop.run_until_complete(task)

print('TIME: ', now() - start)

  

用偏函数

def callback(t, future):
    print('Callback:', t, future.result())

task.add_done_callback(functools.partial(callback, 2))

  

协程嵌套

使用async可以定义协程,协程用于耗时的io操作,我们也可以封装更多的io操作过程,这样就实现了嵌套的协程,即一个协程中await了另外一个协程,如此连接起来。

通过await可以激活上一个协程,并获得上一个协程的输出。

import asyncio

import time

now = lambda: time.time()

async def do_some_work(x):
    print('Waiting: ', x)

    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)

    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]

    # 协程的嵌套,通过await的关键字来实现
    dones, pendings = await asyncio.wait(tasks)

    for task in dones:
        print('Task ret: ', task.result())

start = now()

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

print('TIME: ', now() - start)

  

此链接指定了asyncio.wait, asyncio.gather,asyncio.as_completed的一系列用法

https://pymotw.com/3/asyncio/control.html

作者用了asyncio.as_completed的方法对多任务模式下进行任务取值

import asyncio

import time

now = lambda: time.time()

async def do_some_work(x):
    print('Waiting: ', x)

    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)



async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)
    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]
    # 当有先完成的先处理
    for task in asyncio.as_completed(tasks):
        result = task.result()
        print('Task ret: {}'.format(result))

start = now()

loop = asyncio.get_event_loop()
done = loop.run_until_complete(main())
print('TIME: ', now() - start)

  

文中后面博主,还是跟上一篇的博主差不多,采取新开一条线程,新开线程注册loop,由于消费主线程添加的任务。

测试的协程方法,用的aiohttp,感觉总体还是差不多类型。

这是最后一篇我参考国内的学习链接:

https://www.dongwm.com/archives

这是第一章节

看了第一章节,讲的比较基础,学到了测试运行速度的time.pref_counter。

还有就是通过

future = loop.run_in_executor(None, c) 

  将一个普通的函数c包装成一个future,可以注册在loop中。

第二章节

协程比我想象中的要累很多,

我后期准备直接用loop.run_in_executor的方法把普通函数包装成一个协程跑一次脚本看看。

或者用aiohttp的模块配合跑一次脚本看看。

现在来看,已经看的我越来越晕了

原文地址:https://www.cnblogs.com/sidianok/p/14213346.html