python开发者的AsyncIO

Threads, loops, coroutines and futures

线程是一种常用工具,大多数开发人员以前都听过并使用过。然而,asyncio使用完全不同的结构:事件循环协程和future

  • 一个事件循环基本上是管理和分配的不同任务的执行。它注册它们并处理它们之间的控制流分配。
  • 协同程序是与Python生成器类似的特殊函数,等待它们将控制流释放回事件循环。A协程需要调度到事件循环运行,once scheduled coroutines are wrapped in Taskwhich is a type of Future
  • future是表示可能或可能未执行的任务结果的对象。这个结果可能是个exception。

很简单吧?让我们继续!

 

同步和异步执行

Concurrency中并不是并行将任务分解为并发子任务只允许并行,这是创建它的这些子任务的调度。

Asyncio正是如此,您可以构建代码,以便将子任务定义为协同程序,并允许您根据需要安排它们,包括同时。协同程序包含屈服点,我们定义可能的点,如果其他任务处于挂起状态,则可能发生上下文切换,但如果没有其他任务挂起则不会。

asyncio中的上下文切换表示事件循环,产生从一个协程到下一个协同程序的控制流。我们来看一个非常基本的例子:

 1 import asyncio
 2 
 3 
 4 async def foo():
 5     print('Running in foo')
 6     await asyncio.sleep(0)
 7     print('Explicit context switch to foo again')
 8 
 9 
10 async def bar():
11     print('Explicit context to bar')
12     await asyncio.sleep(0)
13     print('Implicit context switch back to bar')
14 
15 
16 async def main():
17     tasks = [foo(), bar()]
18     await asyncio.gather(*tasks)
19 
20 
21 asyncio.get_event_loop().run_until_complete(main())


Explicit context to bar Running in foo Implicit context switch back to bar Explicit context switch to foo again
  • 首先,我们声明了几个简单的协同程序,这些协同程序假装使用asyncio中sleep函数进行非阻塞工作
  • 然后我们创建一个入口点协同程序,我们使用gather 合并先前的协程,以等待它们完成gather的内容比这更多,但我们暂时忽略它。
  • 最后我们使用asyncio.run_until_complete安排我们的入口点协程,它将负责创建一个事件循环并安排我们的入口点协同程序。 

通过在另一个协程上使用await,我们声明协程可以将控制权交给事件循环,在本例中为sleep协程将yield,事件循环将上下文切换到计划执行的下一个任务:bar类似地,bar协程使用await sleep,它允许事件循环在之前产生的点处将控制权传递给foo,就像普通的Python生成器一样。 

现在让我们模拟两个阻塞任务,gr1gr2,说它们是对外部服务的两个请求。当执行第三个任务时,可以异步执行任务,如下例所示:

 1 import time
 2 import asyncio
 3 
 4 start = time.time()
 5 
 6 
 7 def tic():
 8     return 'at %1.1f seconds' % (time.time() - start)
 9 
10 
11 async def gr1():
12     # Busy waits for a second, but we don't want to stick around...
13     print('gr1 started work: {}'.format(tic()))
14     await asyncio.sleep(2)
15     print('gr1 ended work: {}'.format(tic()))
16 
17 
18 async def gr2():
19     # Busy waits for a second, but we don't want to stick around...
20     print('gr2 started work: {}'.format(tic()))
21     await asyncio.sleep(2)
22     print('gr2 Ended work: {}'.format(tic()))
23 
24 
25 async def gr3():
26     print("Let's do some stuff while the coroutines are blocked, {}".format(tic()))
27     await asyncio.sleep(1)
28     print("Done!")
29 
30 
31 async def main():
32     tasks = [gr1(), gr2(), gr3()]
33     await asyncio.gather(*tasks)
34 
35 
36 asyncio.get_event_loop().run_until_complete(main())
37 
38 gr2 started work: at 0.0 seconds
39 Let's do some stuff while the coroutines are blocked, at 0.0 seconds
40 gr1 started work: at 0.0 seconds
41 Done!
42 gr2 Ended work: at 2.0 seconds
43 gr1 ended work: at 2.0 seconds
44 
45 Process finished with exit code 0

注意事件循环如何管理和调度执行,允许我们的单线程代码同时运行。当两个阻塞任务被阻止时,第三个阻塞任务可以控制流程

执行顺序

在同步世界中,我们习惯于线性思考。如果我们要完成一系列占用不同时间的任务,它们将按照它们被调用的顺序执行。

但是,在使用并发时,我们需要知道任务的完成顺序与计划顺序不同。

 1 import random
 2 from time import sleep
 3 import asyncio
 4 
 5 
 6 def task(pid):
 7     """Synchronous non-deterministic task."""
 8     sleep(random.randint(0, 2) * 0.001)
 9     print('Task %s done' % pid)
10 
11 
12 async def task_coro(pid):
13     """Coroutine non-deterministic task"""
14     await asyncio.sleep(random.randint(0, 2) * 0.001)
15     print('Task %s done' % pid)
16 
17 
18 def synchronous():
19     for i in range(1, 10):
20         task(i)
21 
22 
23 async def asynchronous():
24     tasks = [task_coro(i) for i in range(1, 10)]
25     await asyncio.gather(*tasks)
26 
27 
28 print('Synchronous:')
29 synchronous()
30 
31 print('Asynchronous:')
32 
33 asyncio.get_event_loop().run_until_complete(asynchronous())
34 
35 
36 Synchronous:
37 Task 1 done
38 Task 2 done
39 Task 3 done
40 Task 4 done
41 Task 5 done
42 Task 6 done
43 Task 7 done
44 Task 8 done
45 Task 9 done
46 Asynchronous:
47 Task 5 done
48 Task 2 done
49 Task 3 done
50 Task 4 done
51 Task 1 done
52 Task 8 done
53 Task 9 done
54 Task 6 done
55 Task 7 done

当然,您的输出会有所不同,因为每个任务都会随机休眠一段时间,但请注意结果顺序是如何完全不同的,即使我们使用范围以相同的顺序构建任务数组重要的是要理解asyncio不会神奇地使事情变得非阻塞。在编写asyncio时,标准库中独立存在,其余模块仅提供阻塞功能。您可以使用concurrent.futures模块将阻塞任务包装在线程或进程中,并返回Asyncio可以使用的Future 

 

这可能是使用asyncio时的主要缺点,但是有很多库用于不同的任务和服务

当然,一个非常常见的阻塞任务是从HTTP服务获取数据。我正在使用优秀的aiohttp库来处理从Github的公共事件API中检索数据的非阻塞HTTP请求,并简单地采用Date响应头。

请不要关注aiohttp_get下面协程的细节它们使用异步上下文管理器语法,这超出了本文的范围,但是使用aiohttp执行异步HTTP请求是必要的方式只是假装是一个外部协程,并专注于它如何在下面使用。

 1 import time
 2 import urllib.request
 3 import asyncio
 4 import aiohttp
 5 
 6 URL = 'https://api.github.com/events'
 7 MAX_CLIENTS = 3
 8 
 9 
10 def fetch_sync(pid):
11     print('Fetch sync process {} started'.format(pid))
12     start = time.time()
13     response = urllib.request.urlopen(URL)
14     datetime = response.getheader('Date')
15 
16     print('Process {}: {}, took: {:.2f} seconds'.format(
17         pid, datetime, time.time() - start))
18 
19     return datetime
20 
21 
22 async def aiohttp_get(url):
23     """Nothing to see here, carry on ..."""
24     async with aiohttp.ClientSession() as session:
25         async with session.get(url) as response:
26             return response
27 
28 
29 async def fetch_async(pid):
30     print('Fetch async process {} started'.format(pid))
31     start = time.time()
32     response = await aiohttp_get(URL)
33     datetime = response.headers.get('Date')
34 
35     print('Process {}: {}, took: {:.2f} seconds'.format(
36         pid, datetime, time.time() - start))
37 
38     response.close()
39     return datetime
40 
41 
42 def synchronous():
43     start = time.time()
44     for i in range(1, MAX_CLIENTS + 1):
45         fetch_sync(i)
46     print("Process took: {:.2f} seconds".format(time.time() - start))
47 
48 
49 async def asynchronous():
50     start = time.time()
51     tasks = [asyncio.ensure_future(
52         fetch_async(i)) for i in range(1, MAX_CLIENTS + 1)]
53     await asyncio.wait(tasks)
54     print("Process took: {:.2f} seconds".format(time.time() - start))
55 
56 
57 print('Synchronous:')
58 synchronous()
59 
60 print('Asynchronous:')
61 
62 asyncio.get_event_loop().run_until_complete(asynchronous())
63 
64 Synchronous:
65 Fetch sync process 1 started
66 Process 1: Fri, 07 Dec 2018 07:02:06 GMT, took: 1.50 seconds
67 Fetch sync process 2 started
68 Process 2: Fri, 07 Dec 2018 07:02:07 GMT, took: 1.09 seconds
69 Fetch sync process 3 started
70 Process 3: Fri, 07 Dec 2018 07:02:08 GMT, took: 1.18 seconds
71 Process took: 3.76 seconds
72 Asynchronous:
73 Fetch async process 1 started
74 Fetch async process 2 started
75 Fetch async process 3 started
76 Process 3: Fri, 07 Dec 2018 07:02:10 GMT, took: 1.22 seconds
77 Process 1: Fri, 07 Dec 2018 07:02:10 GMT, took: 1.22 seconds
78 Process 2: Fri, 07 Dec 2018 07:02:10 GMT, took: 1.28 seconds
79 Process took: 1.28 seconds
80 
81 Process finished with exit code 0

首先,注意时间差异,通过使用异步调用我们同时对服务的所有请求进行。如上所述,每个请求都会产生到下一个控制流的控制流,并在完成时返回。结果是请求和检索所有请求的结果只需要最慢的请求!查看最慢请求的时间如何记录1.28秒,这是处理所有请求所用的总时间。很酷,对吧?

其次,看看代码与同步版本的相似程度!它基本上是一样的!主要区别在于执行GET请求和创建任务并等待它们完成的库实现。

创建并发

到目前为止,我们一直在使用一种方法来创建和检索协同程序的结果,创建一组任务并等待所有任务完成。

但协同程序可以安排以不同的方式运行或检索其结果。想象一下,我们需要在它们到达时立即处理HTTP GET请求的结果,这个过程实际上与前面的例子非常相似

注意填充和每个结果调用的时间,它们是同时执行调用的,结果是无序的,我们会尽快处理它们。

这种情况下的代码只是略有不同,我们将协同程序收集到一个列表中,每个都准备好进行调度和执行。as_completed 函数返回一个迭代,因为他们进来,将产生一个完整的future。现在不要告诉我,这不是很酷。顺便说一句,as_completed最初来自concurrent.futures 模块。

让我们再举一个例子,假设您正在尝试获取您的IP地址。您可以使用类似的服务来检索它,但您不确定它们是否可以在运行时访问。你不想按顺序检查每一个。你会向每个服务发送并发请求并选择第一个响应的服务,对吧?对!

好吧,还有一种方法可以在asyncio中调度任务wait碰巧有一个参数可以做到这一点:return_when但现在我们想从协程中检索结果,因此我们可以使用两组futures, done and pending.

在下一个例子中,我们将使用pre Python 3.7在asyncio中启动的方式来说明一点,请耐心等待:

 1 from collections import namedtuple
 2 import time
 3 import asyncio
 4 from concurrent.futures import FIRST_COMPLETED
 5 import aiohttp
 6 
 7 Service = namedtuple('Service', ('name', 'url', 'ip_attr'))
 8 
 9 SERVICES = (
10     Service('ipify', 'https://api.ipify.org?format=json', 'ip'),
11     Service('ip-api', 'http://ip-api.com/json', 'query')
12 )
13 
14 
15 async def aiohttp_get_json(url):
16     async with aiohttp.ClientSession() as session:
17         async with session.get(url) as response:
18             return await response.json()
19 
20 
21 async def fetch_ip(service):
22     start = time.time()
23     print('Fetching IP from {}'.format(service.name))
24 
25     json_response = await aiohttp_get_json(service.url)
26     ip = json_response[service.ip_attr]
27 
28     return '{} finished with result: {}, took: {:.2f} seconds'.format(
29         service.name, ip, time.time() - start)
30 
31 
32 async def main():
33     futures = [fetch_ip(service) for service in SERVICES]
34     done, pending = await asyncio.wait(
35         futures, return_when=FIRST_COMPLETED)
36 
37     print(done.pop().result())
38 
39 
40 ioloop = asyncio.get_event_loop()
41 ioloop.run_until_complete(main())
42 ioloop.close()
43 
44 Fetching IP from ipify
45 Fetching IP from ip-api
46 ip-api finished with result: 58.135.78.16, took: 0.44 seconds
47 Task was destroyed but it is pending!
48 task: <Task pending coro=<fetch_ip() running at /usr/local/envs/async/201812/04.py:29> wait_for=<Future pending cb=[BaseSelectorEventLoop._sock_connect_done(6)(), <TaskWakeupMethWrapper object at 0x7fbb9cfe8c48>()]>>
49 
50 Process finished with exit code 0

等等,那里发生了什么?第一个服务响应得很好,但所有这些警告是什么?

好吧,我们安排了两个任务,但是一旦第一个任务完成关闭循环,第二个任务挂起。Asyncio认为这是一个错误并打印出警告。我们真的应该自己清理一下,让事件循环知道不要打扰待定的futures

Future states

(正如在Future可以进入的状态,而不是Future的状态......你知道我的意思)

这些是:

  • Pending
  • Running
  • Done
  • Cancelled

就如此容易。当一个未来完成时,它的结果方法将返回未来的结果,如果它正在挂起或正在运行它会引发InvalidStateError,如果它被取消它将引发CancelledError,最后如果协同程序引发异常它将再次引发,这是与调用异常相同的行为但是不要相信我的话

如果Future处于该状态,您还可以在Future上调用done cancelledrunnin g来获取布尔值,请注意,done表示结果将返回或引发异常。您可以通过调用cancel方法(说来也奇怪)来具体取消Future ,这正是asyncio.runPython 3.7中引人注目的内容,因此您不必担心它

 1 from collections import namedtuple
 2 import time
 3 import asyncio
 4 import aiohttp
 5 
 6 Service = namedtuple('Service', ('name', 'url', 'ip_attr'))
 7 
 8 SERVICES = (
 9     Service('ipify', 'https://api.ipify.org?format=json', 'ip'),
10     Service('ip-api', 'http://ip-api.com/json', 'query'),
11     Service('borken', 'http://no-way-this-is-going-to-work.com/json', 'ip')
12 )
13 
14 async def aiohttp_get_json(url):
15     async with aiohttp.ClientSession() as session:
16         async with session.get(url) as response:
17             return await response.json()
18 
19 
20 async def fetch_ip(service):
21     start = time.time()
22     print('Fetching IP from {}'.format(service.name))
23 
24     try:
25         json_response = await aiohttp_get_json(service.url)
26     except:
27         return '{} is unresponsive'.format(service.name)
28 
29     ip = json_response[service.ip_attr]
30 
31     return '{} finished with result: {}, took: {:.2f} seconds'.format(
32         service.name, ip, time.time() - start)
33 
34 
35 async def main():
36     futures = [fetch_ip(service) for service in SERVICES]
37     done, _ = await asyncio.wait(futures)
38 
39     for future in done:
40         print(future.result())
41 
42 
43 asyncio.run(main())
44 
45 Fetching IP from ip-api
46 Fetching IP from ipify
47 Fetching IP from borken
48 ipify finished with result: 81.106.46.223, took: 5.35 seconds
49 borken is unresponsive
50 ip-api finished with result: 81.106.46.223, took: 4.91 seconds

同样的,这是Python3.7之前版本的写法

 1 from collections import namedtuple
 2 import time
 3 import asyncio
 4 from concurrent.futures import FIRST_COMPLETED
 5 import aiohttp
 6 
 7 Service = namedtuple('Service', ('name', 'url', 'ip_attr'))
 8 
 9 SERVICES = (
10     Service('ipify', 'https://api.ipify.org?format=json', 'ip'),
11     Service('ip-api', 'http://ip-api.com/json', 'query')
12 )
13 
14 
15 async def aiohttp_get_json(url):
16     async with aiohttp.ClientSession() as session:
17         async with session.get(url) as response:
18             return await response.json()
19 
20 
21 async def fetch_ip(service):
22     start = time.time()
23     print('Fetching IP from {}'.format(service.name))
24 
25     json_response = await aiohttp_get_json(service.url)
26     ip = json_response[service.ip_attr]
27 
28     return '{} finished with result: {}, took: {:.2f} seconds'.format(
29         service.name, ip, time.time() - start)
30 
31 
32 async def main():
33     futures = [fetch_ip(service) for service in SERVICES]
34     done, pending = await asyncio.wait(
35         futures, return_when=FIRST_COMPLETED)
36 
37     print(done.pop().result())
38     for task in pending:
39         task.cancel()
40 
41 loop = asyncio.get_event_loop()
42 loop.run_until_complete(main())
43 loop.close
45 
46 
47 
48 Fetching IP from ipify
49 Fetching IP from ip-api
50 ipify finished with result: 58.135.78.16, took: 1.20 seconds
51 
52 Process finished with exit code 0

这种类型的“Task is destroyed but is was pending”错误在使用asyncio时非常常见,现在你知道它背后的原因以及如何避免它

Futures还允许在回到完成状态时附加回调,以防您想要添加额外的逻辑。您甚至可以手动设置Future的结果或exception ,通常用于单元测试。

 

Exception handling

Asyncio的全部内容是使并发代码易于管理和可读,并且在处理异常时变得非常明显。让我们回到一个例子来说明这一点。

想象一下,我们希望确保我们所有的IP服务都返回相同的结果,但我们的一项服务是脱机的而不是解决方案。我们可以简单地使用try ...except 

像这样:

from collections import namedtuple
import time
import asyncio
import aiohttp

Service = namedtuple('Service', ('name', 'url', 'ip_attr'))

SERVICES = (
    Service('ipify', 'https://api.ipify.org?format=json', 'ip'),
    Service('ip-api', 'http://ip-api.com/json', 'query'),
    Service('borken', 'http://no-way-this-is-going-to-work.com/json', 'ip')
)

async def aiohttp_get_json(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.json()


async def fetch_ip(service):
    start = time.time()
    print('Fetching IP from {}'.format(service.name))

    try:
        json_response = await aiohttp_get_json(service.url)
    except:
        return '{} is unresponsive'.format(service.name)

    ip = json_response[service.ip_attr]

    return '{} finished with result: {}, took: {:.2f} seconds'.format(
        service.name, ip, time.time() - start)


async def main():
    futures = [fetch_ip(service) for service in SERVICES]
    done, _ = await asyncio.wait(futures)

    for future in done:
        print(future.result())


asyncio.get_event_loop().run_until_complete(main())


Connected to pydev debugger (build 183.4284.139)
Fetching IP from borken
Fetching IP from ip-api
Fetching IP from ipify
borken is unresponsive
ipify finished with result: 58.135.78.16, took: 34.84 seconds
ip-api finished with result: 58.135.78.16, took: 12.60 seconds

Process finished with exit code 0

我们还可以在处理Futures结果时处理异常,以防发生意外异常:

 1 from collections import namedtuple
 2 import time
 3 import asyncio
 4 import aiohttp
 5 import traceback
 6 
 7 Service = namedtuple('Service', ('name', 'url', 'ip_attr'))
 8 
 9 SERVICES = (
10     Service('ipify', 'https://api.ipify.org?format=json', 'ip'),
11     Service('ip-api', 'http://ip-api.com/json', 'this-is-not-an-attr'),
12     Service('borken', 'http://no-way-this-is-going-to-work.com/json', 'ip')
13 )
14 
15 async def aiohttp_get_json(url):
16     async with aiohttp.ClientSession() as session:
17         async with session.get(url) as response:
18             return await response.json()
19 
20 
21 async def fetch_ip(service):
22     start = time.time()
23     print('Fetching IP from {}'.format(service.name))
24 
25     try:
26         json_response = await aiohttp_get_json(service.url)
27     except:
28         return '{} is unresponsive'.format(service.name)
29 
30     ip = json_response[service.ip_attr]
31 
32     return '{} finished with result: {}, took: {:.2f} seconds'.format(
33         service.name, ip, time.time() - start)
34 
35 
36 async def main():
37     futures = [fetch_ip(service) for service in SERVICES]
38     done, _ = await asyncio.wait(futures)
39 
40     for future in done:
41         try:
42             print(future.result())
43         except:
44             print("Unexpected error: {}".format(traceback.format_exc()))
45 
46 
47 asyncio.get_event_loop().run_until_complete(main())
48 
49 
50 Fetching IP from ip-api
51 Fetching IP from ipify
52 Fetching IP from borken
53 ipify finished with result: 58.135.78.16, took: 1.15 seconds
54 Unexpected error: Traceback (most recent call last):
55   File "/usr/local/envs/async/201812/异常处理在处理Futures时.py", line 46, in main
56     print(future.result())
57   File "/usr/local/envs/async/201812/异常处理在处理Futures时.py", line 34, in fetch_ip
58     ip = json_response[service.ip_attr]
59 KeyError: 'this-is-not-an-attr'
60 
61 borken is unresponsive
62 
63 Process finished with exit code 0

同样地,调度一个任务而不等待它完成被认为是一个bug,调度一个任务而不检索可能引发的异常也会抛出一个警告:

 1 from collections import namedtuple
 2 import time
 3 import asyncio
 4 import aiohttp
 5 
 6 Service = namedtuple('Service', ('name', 'url', 'ip_attr'))
 7 
 8 SERVICES = (
 9     Service('ipify', 'https://api.ipify.org?format=json', 'ip'),
10     Service('ip-api', 'http://ip-api.com/json', 'this-is-not-an-attr'),
11     Service('borken', 'http://no-way-this-is-going-to-work.com/json', 'ip')
12 )
13 
14 async def aiohttp_get_json(url):
15     async with aiohttp.ClientSession() as session:
16         async with session.get(url) as response:
17             return await response.json()
18 
19 
20 async def fetch_ip(service):
21     start = time.time()
22     print('Fetching IP from {}'.format(service.name))
23 
24     try:
25         json_response = await aiohttp_get_json(service.url)
26     except:
27         print('{} is unresponsive'.format(service.name))
28     else:
29         ip = json_response[service.ip_attr]
30 
31         print('{} finished with result: {}, took: {:.2f} seconds'.format(
32             service.name, ip, time.time() - start))
33 
34 
35 async def main():
36     futures = [fetch_ip(service) for service in SERVICES]
37     await asyncio.wait(futures)  # intentionally ignore results
38 
39 
40 asyncio.get_event_loop().run_until_complete(main())
41 
42 
43 Fetching IP from ip-api
44 Fetching IP from ipify
45 Fetching IP from borken
46 borken is unresponsive
47 Task exception was never retrieved
48 ipify finished with result: 58.135.78.16, took: 1.29 seconds
49 future: <Task finished coro=<fetch_ip() done, defined at /usr/local/envs/async/201812/a.py:24> exception=KeyError('this-is-not-an-attr',)>
50 Traceback (most recent call last):
51   File "/usr/local/envs/async/201812/a.py", line 33, in fetch_ip
52     ip = json_response[service.ip_attr]
53 KeyError: 'this-is-not-an-attr'

这看起来非常像我们上一个例子的输出,减去了来自asyncio的tut-tut消息

Timeouts

如果我们不太关心IP?想象一下,它是一个更复杂的响应的一个很好的补充,但我们当然不希望让用户等待它。理想情况下,我们会将非阻塞调用设置为超时,之后我们只发送没有IP属性的复杂响应。

再次等待 只有我们需要的属性:

import time
import random
import asyncio
import aiohttp
import argparse
from collections import namedtuple
from concurrent.futures import FIRST_COMPLETED

Service = namedtuple('Service', ('name', 'url', 'ip_attr'))

SERVICES = (
    Service('ipify', 'https://api.ipify.org?format=json', 'ip'),
    Service('ip-api', 'http://ip-api.com/json', 'query'),
)

DEFAULT_TIMEOUT = 0.01


async def aiohttp_get_json(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.json()


async def fetch_ip(service):
    start = time.time()
    print('Fetching IP from {}'.format(service.name))

    await asyncio.sleep(random.randint(1, 3) * 0.1)
    try:
        json_response = await aiohttp_get_json(service.url)
    except:
        return '{} is unresponsive'.format(service.name)

    ip = json_response[service.ip_attr]

    print('{} finished with result: {}, took: {:.2f} seconds'.format(
        service.name, ip, time.time() - start))
    return ip


async def main(timeout):
    response = {
        "message": "Result from asynchronous.",
        "ip": "not available"
    }

    futures = [fetch_ip(service) for service in SERVICES]
    done, pending = await asyncio.wait(
        futures, timeout=timeout, return_when=FIRST_COMPLETED)

    for future in pending:
        future.cancel()

    for future in done:
        response["ip"] = future.result()

    print(response)


parser = argparse.ArgumentParser()
parser.add_argument(
    '-t', '--timeout',
    help='Timeout to use, defaults to {}'.format(DEFAULT_TIMEOUT),
    default=DEFAULT_TIMEOUT, type=float)
args = parser.parse_args()

print("Using a {} timeout".format(args.timeout))
asyncio.get_event_loop().run_until_complete(main(args.timeout))


Using a 0.01 timeout
Fetching IP from ip-api
Fetching IP from ipify
{'message': 'Result from asynchronous.', 'ip': 'not available'}
原文地址:https://www.cnblogs.com/tcppdu/p/10083991.html