shan

Python中的异步IO

2020-08-28

原文链接:Async IO in Python: A Complete Walkthrough

Setting Up Your Environment

You’ll need Python 3.7 or above to follow this article in its entirety, as well as the aiohttp and aiofiles packages:

1
2
3
$ python3.7 -m venv ./py37async
$ source ./py37async/bin/activate # Windows: .\py37async\Scripts\activate.bat
$ pip install --upgrade pip aiohttp aiofiles # Optional: aiodns

The 10,000-Foot View of Async IO

相比于久经考验的多进程和线程来说,Async IO 更少为人所知。这个小节会告诉你 async IO 是什么以及它所适用的环境。

Where Does Async IO Fit In?

并发和并行是非常难以涉及的主题。这篇文章聚焦于异步 IO(Async IO)以及它在 Python 中的实现,花费一点时间去比较异步 IO 和一些类似的概念是值得的,以了解异步 IO 如何适应大的,有时令人头晕的问题。

并行(Parallelism):可以同时执行多项操作。多进程(Multiprocessing)是实现并行的一种手段,它将任务分散到各个计算机的中央处理单元(CPUs)。多进程非常适合 CPU 相关的任务:例如 for 循环和数学相关的计算。

并发(Concurrency):是一个比并行更加宽泛的概念。这表明多个任务具有以重叠方式运行的能力。线程(Threading)是一种并发执行模型,其中多个线程轮流执行任务。一个进程可以包含多个线程。由于GIL,Python 与线程之间的关系非常复杂,但这超出了本文的范围。

重要的是要知道线程化对于 IO 相关的任务来说更好。CPU 密集型任务的特征是计算机内核从头到尾都在不断地努力工作,而 IO 相关的工作主要由大量等待输入/输出才能完成。

综上所述,并发包括多进程(理想的是 CPU 相关的任务)和线程(适用于IO 相关的任务)。多进程是并行的一种形式,并行是并发的特定类型(子集)。
Python通过它的 multiprocessingthreadingconcurrent.futures 等标准库对此提供了长期的的支持。

在过去的几年中,CPython 中更加全面地构建了单独的设计:异步 IO,它通过标准库的 asyncio 包以及新的 async 和 await 语言关键字而启用。需要明确的是,异步IO不是一个新发明的概念,它已经存在或正在其他语言和运行时环境中构建,例如GoC#Scala

Python文档将 asyncio 这个包标记为编写并发代码的库。但是,异步 IO 既不是线程化的,也不是多进程的,它不是建立在这两个之上的。

事实上,异步 IO 是一个单线程单进程的设计:它使用的是合作的多任务(cooperative multitasking)。换句话说,虽然异步 IO 使用单进程单线程,但是它给人一种并发执行的感觉。协程(异步 IO 的核心特征)可以调度并发,但他们不是内部的并发。

再次声明,异步 IO 是并发编程的一种方式,但是它不是并行的,与多进程相比,它与线程更加紧密地结合在一起,但两者却是截然不同的。

那么对于某些东西是异步的这种说法是什么意思?这并不是一个严格意义上的定义,但是对于我们这里的目的来说,我认为有两点属性:

  • 异步例程可以在等待其最终结果的同时”暂停“并让其他例程同时运行。
  • 通过上述机制,异步代码有助于并发执行。换句话说,异步代码给出了并发的外观。

这里有张图把上述的概念放在了一起。白色的代表概念,绿色的代表实现的方式:

接下来我不在进行并发编程模型之间的比较。本教程重点介绍作为异步IO的子组件,如何使用它以及随之而来的API。要全面了解线程,多进程和异步IO,请在此处暂停并查看 Jim Anderson 的 overview of concurrency in Python

Async IO Explained

异步 IO 一开始看可能是反直觉的和自相矛盾的。使用单核单线程去促进并发?我从来都不擅长制作示例,因此我会解释一个2017 年 PyCon 上 Miguel Grinberg 的演讲,它十分完美的解释了这些:

国际象棋大师 Judit Polgár 举办一个象棋展览,在这个展览上她扮演多个玩家角色,她有两种方式去组织这个展览:同步和异步。

假设:

  • 24 个对手
  • Judit 在 5 秒钟内移动棋子
  • 对手花费 55 秒去移动棋子
  • 游戏平均30对动作(总共60个动作)

同步版本:Judit 每次只能进行一个游戏,从来不进行两个及以上的游戏;每个游戏花费 (55 + 5) * 30 == 1800 seconds,或者30分钟。整个展示花费24*30==720minutes,或者说 12 个小时。

异步版本:Judit 从一个桌子到另一个桌子,然后从中移动其中的一个棋子。她在等待对手进行下一步移动的时候离开桌子。在24个游戏上的每步移动花费Judit 24 * 5 == 120 seconds,或者说2分钟。整个展览现在需要花费120*30==3600s,或者说1个小时。

Judit Polgár 只有一个,她有两只手并且每次只能移动一步。但是异步的进行游戏会将时间从12个小时降低为1个小时。因此,协作式多任务处理是一种有趣的说法,即程序的事件循环(稍后会详细介绍)与多个任务进行通信,以使每个任务在最佳时间轮流运行。

异步 IO 需要较长的等待时间,否则功能将会被阻塞,并允许其他功能在停机期间运行。

Async IO Is Not Easy

我听过下面一句话:尽可能的使用异步 IO,必要时使用线程。事实是,构建持久的多线程代码可能很困难并且容易出错。异步 IO 可以避免线程设计可能会遇到的某些潜在的速度起伏。

但是这并不能说明在Python中使用异步 IO就十分容易。请注意:当你冒险进入更加底层的时候,异步编程也可能变的十分困难!Python 的异步模型是基于诸如回调,事件,传输,协议和 futures 之类的概念构建的。其 API 不断变化的事实使其变得不那么容易。

幸运的是,asyncio 已经发展到其大部分功能不再是临时的状态,而其文档已得到了巨大的改进,与此相关的一些优质资源也开始出现。

The asyncio Package and async/await

现在,你已经对异步IO作为设计有了一定的了解,让我们来探讨 Python 的实现。Python的 asyncio 库(在Python 3.4中引入)及其两个关键字asyncawait具有不同的用途,但可以一起帮助你声明,构建,执行和管理异步代码。

The async/await Syntax and Native Coroutines

警告:请注意你在 Internet 上阅读的内容。 Python 的异步 IO API 已从 Python 3.4 迅速发展到 Python 3.7 。某些旧的模块不再被使用,现在介绍一些最初被禁止的但现在允许的一些功能。就我所知,本教程也将很快过时。

异步 IO 的核心是协程。协程是 Python 生成器函数的专用版本。让我们从基准的定义开始,然后逐步构建:协程是一种函数,可以在达到 return 之前暂停其执行,并且可以在一段时间内间接将控制权传递给另一个协程。

之后,你将深入研究如何将传统的生成器准确地用于协程。目前,了解协程工作方式的最简单方法是开始编写一些协程的代码。

让我们采用沉浸式的方法并编写一些异步 IO 代码。这个简短的程序是异步IO的Hello World,但是在解释其核心功能方面还有很长的路要走:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#!/usr/bin/env python3
# countasync.py

import asyncio

async def count():
print("One")
await asyncio.sleep(1)
print("Two")

async def main():
await asyncio.gather(count(), count(), count())

if __name__ == "__main__":
import time
s = time.perf_counter()
asyncio.run(main())
elapsed = time.perf_counter() - s
print(f"{__file__} executed in {elapsed:0.2f} seconds.")

当你执行这个文件的时候,记录一下它和你仅仅使用 deftime.sleep() 定义的函数有什么不同:

1
2
3
4
5
6
7
8
$ python3 countasync.py
One
One
One
Two
Two
Two
countasync.py executed in 1.01 seconds.

这个输出的顺序就是异步IO的核心。与每个对 count() 的调用进行通信的都是一个单一的事件循环或协调器。当每个任务到达 await asyncio.sleep(1) 时,该函数会告诉事件循环:“我要去sleeping 1 秒,继续让其他有意义的事情同时进行。”,然后将程序的控制权交给事件循环。

下面是同步的版本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#!/usr/bin/env python3
# countsync.py

import time

def count():
print("One")
time.sleep(1)
print("Two")

def main():
for _ in range(3):
count()

if __name__ == "__main__":
s = time.perf_counter()
main()
elapsed = time.perf_counter() - s
print(f"{__file__} executed in {elapsed:0.2f} seconds.")

When executed, there is a slight but critical change in order and execution time:

1
2
3
4
5
6
7
8
$ python3 countsync.py
One
Two
One
Two
One
Two
countsync.py executed in 3.01 seconds.

使用 time.sleep()asyncio.sleep()可能非常平淡无奇,但是它们通常代表任何的耗时程序等待时间的过程。也就是说,time.sleep() 可以表示任何耗时的阻塞函数调用,而 asyncio.sleep() 则用于代表非阻塞的调用(但也需要一些时间才能完成)。

正如你在下面的章节所看见的那样,await 诸如 asyncio.sleep() 之类的事情的好处是,当前函数可以暂时将控制权让给另一个更容易立即执行某项功能的函数。相比之下,time.sleep() 或者其他阻塞的调用与异步 Python 代码不兼容,因为它会在睡眠时间内停止所有内容。

The Rules of Async IO

在这点上,一个对于 async, await 来说更加正式的定义,并且由它们创建的协程函数是有序的。(At this point, a more formal definition of async, await, and the coroutine functions that they create are in order. )这部分内容比较繁琐,但是对于掌握 async /await 是有帮助的,因此,如果需要执行以下操作,请返回至此处:

  • 语法 async def 引入了原生协程(native coroutine)或一个异步生成器(asynchronous generator)async withasync for 这两个表达式也是有效的,稍后你就会看到它们。
  • 关键字 await 将函数的控制权交还给事件循环。它暂停了当前协程的执行。如果Python在g()的范围内遇到await f()表达式,这就是await告诉事件循环,“暂停执行g()直到我等待的是f()返回的结果。同时,让其他东西运行。”

在代码中,第二个要点大致如下:

1
2
3
4
async def g():
# Pause here and come back to g() when f() is ready
r = await f()
return r

关于何时、如何、能不能使用async /await的有着一套严格的规定。无论你仍在学习语法还是已经接触过使用 async/await,这些都需要了解:

  • 你通过 async def 引入的函数是协程。它可以使用awaitreturnyield ,但是所有这些都是可选的。声明 async def noop(): pass 是有效的:
    • 使用 await或者return创建一个协程。要调用协程函数,必须 await 它以获得结果。
    • async def 块中使用yield是不太常见的(并且仅在 Python 中是最近才合法的)。这会创建一个异步生成器,你可以使用 async for 进行迭代。暂时不要使用异步生成器,而将注意力集中在使用 awaitreturn 的协程函数的语法上。
    • 任何使用async def定义的函数不要使用yield from,它会引发一个syntaxError
  • 就像在 def 函数之外使用 yield 会导致 SyntaxError ,在 async def 协程之外使用 await 也会引发 SyntaxError。你只能在协程体内使用await

这里有一些简短的例子去说明上面的规则:

1
2
3
4
5
6
7
8
9
10
11
12
13
async def f(x):
y = await z(x) # OK - `await` and `return` allowed in coroutines
return y

async def g(x):
yield x # OK - this is an async generator

async def m(x):
yield from gen(x) # No - SyntaxError

def m(x):
y = await z(x) # Still no - SyntaxError (no `async def` here)
return y

最后,当你使用await f()时,要求f()awaitable。好吧,那不是很有帮助,是吗?现在,只知道一个 awaitable 对象是:(1)另一个协程或(2)一个返回是迭代器并且定义了 .__ await __() 方法的对象。如果你正在编写一个程序,则出于大多数目的,你只需要担心第一种情况。

这给我们带来了另一个可能会遇到的技术:将函数标记为协程的一种较旧的方法是用@asyncio.coroutine装饰一个普通的def函数。结果是基于生成器的协程。自从 Python 3.5 中引入了 async/await 语法以来,这种构造已经过时了。

这两个协程在本质上是等效的(两者都是awaitable),但是第一个是基于生成器,而第二个是原生协程

1
2
3
4
5
6
7
8
9
10
import asyncio

@asyncio.coroutine
def py34_coro():
"""Generator-based coroutine, older syntax"""
yield from stuff()

async def py35_coro():
"""Native coroutine, modern syntax"""
await stuff()

如果只是你自己编写任何代码,则最好使用原生协程,以使其显式而不是隐式。在 Python 3.10 中,将删除基于生成器的协程。

在本教程的后半部分,我们将仅出于说明的目的触及基于生成器的协程。引入async/await 的原因是为了使协程成为 Python 的独立特性,可以很容易地将其与普通的生成器函数区分开,从而减少了歧义。

不要陷入基于生成器的协程,这些协程已经被async/await 所淘汰。它们有自己的一小套规则(例如,await 不能在基于生成器的协程中使用),如果你坚持使用 async /await 语法,则它们在很大程度上是不相关的。

事不宜迟,让我们看更多的例子。

这是异步 IO 如何减少等待时间的一个示例:给定一个协程makerandom(),它不断产生范围为[0, 10]的随机整数,直到其中一个超过阈值为止,你希望多次调用这个协程不需要等待彼此相继完成。你可以在很大程度上遵循上述两个脚本的模式,并稍作更改:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#!/usr/bin/env python3
# rand.py

import asyncio
import random

# ANSI colors
c = (
"\033[0m", # End of color
"\033[36m", # Cyan
"\033[91m", # Red
"\033[35m", # Magenta
)

async def makerandom(idx: int, threshold: int = 6) -> int:
print(c[idx + 1] + f"Initiated makerandom({idx}).")
i = random.randint(0, 10)
while i <= threshold:
print(c[idx + 1] + f"makerandom({idx}) == {i} too low; retrying.")
await asyncio.sleep(idx + 1)
i = random.randint(0, 10)
print(c[idx + 1] + f"---> Finished: makerandom({idx}) == {i}" + c[0])
return i

async def main():
res = await asyncio.gather(*(makerandom(i, 10 - i - 1) for i in range(3)))
return res

if __name__ == "__main__":
random.seed(444)
r1, r2, r3 = asyncio.run(main())
print()
print(f"r1: {r1}, r2: {r2}, r3: {r3}")

彩色输出的内容可以使你对如何执行此脚本有所了解:

rand.py execution

该程序使用一个主要的协程 makerandom(),并在3个不同的输入上同时运行它。大多数程序包含小型模块化协程和一个用于将每个较小的协程链接在一起的包装器函数。然后,通过将中央协程映射到一些可迭代对象或pool中,使用main()来收集任务(futures)。

在此微型示例中,pool 为range(3)。在稍后提供的完整示例中,有一组URL需要同时请求、解析和处理,而 main() 是封装了每个 URL 的整个例程。

虽然 “making random integers”(它其实是CPU相关的)可能不是使用 asyncio 最佳选择,但在示例中却存在asyncio.sleep(),旨在模仿
IO相关的过程,其中涉及不确定的等待时间。例如,asyncio.sleep()调用可能表示在消息应用程序中两个客户端之间发送和接收非随机整数。

Async IO Design Patterns

异步 IO 附带了自己的一组可能的脚本设计,本节将介绍它们。

Chaining Coroutines

协程的一个关键特征是它们可以链接在一起。请记住,一个协程对象是 awaitable,因此另一个协程可以 await 它。这使你可以将程序分解为较小的,可管理的,可回收的协程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
#!/usr/bin/env python3
# chained.py

import asyncio
import random
import time

async def part1(n: int) -> str:
i = random.randint(0, 10)
print(f"part1({n}) sleeping for {i} seconds.")
await asyncio.sleep(i)
result = f"result{n}-1"
print(f"Returning part1({n}) == {result}.")
return result

async def part2(n: int, arg: str) -> str:
i = random.randint(0, 10)
print(f"part2{n, arg} sleeping for {i} seconds.")
await asyncio.sleep(i)
result = f"result{n}-2 derived from {arg}"
print(f"Returning part2{n, arg} == {result}.")
return result

async def chain(n: int) -> None:
start = time.perf_counter()
p1 = await part1(n)
p2 = await part2(n, p1)
end = time.perf_counter() - start
print(f"-->Chained result{n} => {p2} (took {end:0.2f} seconds).")

async def main(*args):
await asyncio.gather(*(chain(n) for n in args))

if __name__ == "__main__":
import sys
random.seed(444)
args = [1, 2, 3] if len(sys.argv) == 1 else map(int, sys.argv[1:])
start = time.perf_counter()
asyncio.run(main(*args))
end = time.perf_counter() - start
print(f"Program finished in {end:0.2f} seconds.")

注意输出,part1() 睡眠一段可变的时间,part2() 在结果可用时开始处理结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
$ python3 chained.py 9 6 3
part1(9) sleeping for 4 seconds.
part1(6) sleeping for 4 seconds.
part1(3) sleeping for 0 seconds.
Returning part1(3) == result3-1.
part2(3, 'result3-1') sleeping for 4 seconds.
Returning part1(9) == result9-1.
part2(9, 'result9-1') sleeping for 7 seconds.
Returning part1(6) == result6-1.
part2(6, 'result6-1') sleeping for 4 seconds.
Returning part2(3, 'result3-1') == result3-2 derived from result3-1.
-->Chained result3 => result3-2 derived from result3-1 (took 4.00 seconds).
Returning part2(6, 'result6-1') == result6-2 derived from result6-1.
-->Chained result6 => result6-2 derived from result6-1 (took 8.01 seconds).
Returning part2(9, 'result9-1') == result9-2 derived from result9-1.
-->Chained result9 => result9-2 derived from result9-1 (took 11.01 seconds).
Program finished in 11.01 seconds.

在这种设置中,main() 的运行时间将等于它收集在一起并调度的任务中最大的运行时间。

Using a Queue

asyncio库中提供了queue classes,它们被设计为类似于queue模块。到目前为止,在我们的示例中,我们实际上并不需要队列结构。在chained.py 中,每个任务(future)都由一组协程组成,这些协程明确地相互 await ,并通过每个链上都有一个输入。

还有一个也可以与异步IO一起使用的结构:许多彼此不相关的生产者将项目添加到队列中。每个生产者可以在交错、随机或未通知的时间内将多个项目添加到队列中。一群消费者在它们出现时将它们从队列中取出,而不必等待任何其他信号。

在这种设计中,没有任何个消费者链接到生产者。消费者不知道生产者的数量,甚至不知道将要添加到队列中的项目的累计数量。

每个生产者或消费者花费可变的时间分别从队列中放入和取出项目。队列用作可以与生产者和消费者进行通信的中间件,而无需他们彼此直接交谈。

注意:由于 queue.Queue() 的线程安全性,队列通常在线程程序中使用,但在异步IO方面,你不必担心线程安全。

队列的一个用例(如此处的情况)是充当生产者和消费者的发送者,而这些生产者和消费者原本不是直接链接或彼此关联的。

该程序的同步版本看起来非常令人沮丧:一组阻塞的生产者将项目串行添加到队列中,一次添加一个生产者。只有在所有生产者都完成之后,队列才能由一个消费者逐项处理。此设计存在大量延迟。项目可能闲置在队列中,而不是立即拿起并处理。

下面是一个异步版本 asyncq.py。此工作流程中具有挑战性的部分是,需要向消费者发出生产已完成的信号。否则,await q.get()将无限期挂起,因为队列已被完全处理,但是消费者不会知道生产已经完成。

下面是完整的脚本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
#!/usr/bin/env python3
# asyncq.py

import asyncio
import itertools as it
import os
import random
import time

async def makeitem(size: int = 5) -> str:
return os.urandom(size).hex()

async def randsleep(a: int = 1, b: int = 5, caller=None) -> None:
i = random.randint(0, 10)
if caller:
print(f"{caller} sleeping for {i} seconds.")
await asyncio.sleep(i)

async def produce(name: int, q: asyncio.Queue) -> None:
n = random.randint(0, 10)
for _ in it.repeat(None, n): # Synchronous loop for each single producer
await randsleep(caller=f"Producer {name}")
i = await makeitem()
t = time.perf_counter()
await q.put((i, t))
print(f"Producer {name} added <{i}> to queue.")

async def consume(name: int, q: asyncio.Queue) -> None:
while True:
await randsleep(caller=f"Consumer {name}")
i, t = await q.get()
now = time.perf_counter()
print(f"Consumer {name} got element <{i}>"
f" in {now-t:0.5f} seconds.")
q.task_done()

async def main(nprod: int, ncon: int):
q = asyncio.Queue()
producers = [asyncio.create_task(produce(n, q)) for n in range(nprod)]
consumers = [asyncio.create_task(consume(n, q)) for n in range(ncon)]
await asyncio.gather(*producers)
await q.join() # Implicitly awaits consumers, too
for c in consumers:
c.cancel()

if __name__ == "__main__":
import argparse
random.seed(444)
parser = argparse.ArgumentParser()
parser.add_argument("-p", "--nprod", type=int, default=5)
parser.add_argument("-c", "--ncon", type=int, default=10)
ns = parser.parse_args()
start = time.perf_counter()
asyncio.run(main(**ns.__dict__))
elapsed = time.perf_counter() - start
print(f"Program completed in {elapsed:0.5f} seconds.")

前几个协程是辅助函数,它们返回一个随机字符串,一个小数秒性能计数器和一个随机整数。生产者将1到5个项目放入队列中。每个项目都是 (i,t) 的元组,其中i是随机字符串,而 t 是生产者尝试将元组放入队列的时间。

消费者将商品取出时,它仅使用商品放入的时间戳来计算商品在队列中的经过时间。

请记住,asyncio.sleep()用于模仿其他更复杂的协程,如果这是常规的阻塞函数,则会消耗时间并阻止所有其他执行。

下面是由两个生产者和五个消费者进行的测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
$ python3 asyncq.py -p 2 -c 5
Producer 0 sleeping for 3 seconds.
Producer 1 sleeping for 3 seconds.
Consumer 0 sleeping for 4 seconds.
Consumer 1 sleeping for 3 seconds.
Consumer 2 sleeping for 3 seconds.
Consumer 3 sleeping for 5 seconds.
Consumer 4 sleeping for 4 seconds.
Producer 0 added <377b1e8f82> to queue.
Producer 0 sleeping for 5 seconds.
Producer 1 added <413b8802f8> to queue.
Consumer 1 got element <377b1e8f82> in 0.00013 seconds.
Consumer 1 sleeping for 3 seconds.
Consumer 2 got element <413b8802f8> in 0.00009 seconds.
Consumer 2 sleeping for 4 seconds.
Producer 0 added <06c055b3ab> to queue.
Producer 0 sleeping for 1 seconds.
Consumer 0 got element <06c055b3ab> in 0.00021 seconds.
Consumer 0 sleeping for 4 seconds.
Producer 0 added <17a8613276> to queue.
Consumer 4 got element <17a8613276> in 0.00022 seconds.
Consumer 4 sleeping for 5 seconds.
Program completed in 9.00954 seconds.

在这种情况下,项目将在几分之一秒内完成处理。造成延迟的原因可能有两个:

  • 标准的,基本无法避免的开销
  • 队列中出现某个项目时所有消费者都处在sleep状态

关于第二个原因,幸运的是,扩展到成百上千的消费者是完全正常的。你使用 python3 asyncq.py -p 5 -c 100 是没有问题。这里的重点是,从理论上来说,你可以在不同的系统上使用不同的用户来控制生产者和消费者的管理,而队列则作为中心进行吞吐。

到目前为止,你已经陷入困境,并看到了三个相关的例子,分别是 asyncio 调用用 asyncawait 定义的协程。如果你不完全了解或只是想更深入地了解现代协程在Python中的使用机理,那么你将从下一节开始。

Async IO’s Roots in Generators

之前,你看到了一个基于生成器的老式协程的示例,该协程已被更明确的原生协程所淘汰。该示例值得稍作调整并用来展示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import asyncio

@asyncio.coroutine
def py34_coro():
"""Generator-based coroutine"""
# No need to build these yourself, but be aware of what they are
s = yield from stuff()
return s

async def py35_coro():
"""Native coroutine, modern syntax"""
s = await stuff()
return s

async def stuff():
return 0x10, 0x20, 0x30

作为实验,如果你单独调用 py34_coro()py35_coro() ,而没有 await ,或者没有任何对 asyncio.run() 或其他 asyncio 函数的调用,会发生什么情况? 孤立地调用协程将返回协程对象:

1
2
>>> py35_coro()
<coroutine object py35_coro at 0x10126dcc8>

这表面上看起来没有那么有趣。单独调用协程的结果是一个 awaitable 协程对象

Python还有其它的什么类似的特征吗?

希望你将生成器视为该问题的答案,因为协程是内部强化的生成器。在这方面,行为是相似的:

1
2
3
4
5
6
7
8
>>> def gen():
... yield 0x10, 0x20, 0x30
...
>>> g = gen()
>>> g # Nothing much happens - need to iterate with `.__next__()`
<generator object gen at 0x1012705e8>
>>> next(g)
(16, 32, 48)

碰巧的是,生成器函数是异步IO的基础(无论你是使用async def声明协程,还是使用旧的@asyncio.coroutine装饰器声明)。从技术上讲,awaityield相比更类似于yield from。(但请记住,yield from x() 仅仅是 for i in x(): yield i 的语法糖。)

与异步IO有关的生成器的一项关键功能是可以有效地随意停止和重启。例如,你可以 break 迭代生成器对象,然后在剩余的值上恢复迭代。当生成器函数达到 yield 时,它会yield该值,但随后会处于空闲状态,直到被告知要生成其后续值。

下面是一个关于这方面的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
>>> from itertools import cycle
>>> def endless():
... """Yields 9, 8, 7, 6, 9, 8, 7, 6, ... forever"""
... yield from cycle((9, 8, 7, 6))

>>> e = endless()
>>> total = 0
>>> for i in e:
... if total < 30:
... print(i, end=" ")
... total += i
... else:
... print()
... # Pause execution. We can resume later.
... break
9 8 7 6 9 8 7 6 9 8 7 6 9 8

>>> # Resume
>>> next(e), next(e), next(e)
(6, 9, 8)

关键字 await 的行为类似,它标出了协程自动挂起并让其他协程工作的断点。在这种情况下,break是指已暂时放弃控制权但尚未完全退出或结束的协程。请记住,yield(以及扩展为 yield fromawait )是生成器执行中的一个断点。

这是函数和生成器之间的根本区别。功能是全有还是全无。一旦启动,它将不会停止,直到它遇到一个return,然后将该值返回给调用者(调用它的函数)。另一方面,生成器每次遇到yield时都会暂停,并且不再前进。它不仅可以将此值推送到调用堆栈,而且还可以通过在其上调用 next() 来保留其局部变量。

生成器的第二个鲜为人知的功能也很重要。你也可以通过其 .send() 方法将值发送到生成器中。这允许生成器(和协程)在不阻塞的情况下相互调用(await)。我不会再赘述此功能了,因为它主要对幕后协程的实现很重要,但是你根本不需要自己直接使用它。

如果你有兴趣探索更多内容,可以从PEP 342开始,在此正式引入了协程。布雷特·坎农(Brett Cannon)的How the Heck Does Async-Await Work in Python也是一个很好的选择阅读,PYMOTW writeup on asyncio一样。最后,还有大卫·比兹利(David Beazley)的Curious Course on Coroutines and Concurrency,它深入探讨了协程的运行机制。

让我们尝试将以上所有文章压缩成几句话:这些协程实际上是通过一种特殊的非常规机制运行的。他们的结果是异常对象的属性,该异常对象在调用他们的 .send() 方法时被抛出。所有的这些还有更多的细节,但这可能无法帮助你在实践中使用这部分。

为了将事情联系在一起,以下是协程作为生成器的主题的一些关键点:

  • 协程是专用生成器,利用生成器方法的独特性。
  • 旧的基于生成器的协程使用yield from来等待协程结果。原生协程中的现代Python语法仅将yield from替换为await,作为等待协程结果的手段。
    await 类似于 yield from,并且通常有助于将其视为 yield from
  • 使用 await 是标记断点的信号。它允许协程暂时中止执行,并允许程序稍后返回。

Other Features: async for and Async Generators + Comprehensions

除了普通的async/await,Python 还使async for可以在异步迭代器上进行迭代。异步迭代器的目的是使它能够在迭代时在每个阶段调用异步代码。

这个概念的自然扩展是异步生成器。回想一下,你可以在本地协程中使用awaitreturnyield。在Python 3.6中(通过PEP 525)可以在协程中使用yield,该引入了异步生成器,目的是允许在同一协程函数体中使用awaityield

1
2
3
4
5
6
7
>>> async def mygen(u: int = 10):
... """Yield powers of 2."""
... i = 0
... while i < u:
... yield 2 ** i
... i += 1
... await asyncio.sleep(0.1)

最后,Python 通过 async for实现了异步表达式。和同步的表达式类似,这是一个巨大的语法糖:

1
2
3
4
5
6
7
8
9
10
11
12
>>> async def main():
... # This does *not* introduce concurrent execution
... # It is meant to show syntax only
... g = [i async for i in mygen()]
... f = [j async for j in mygen() if not (j // 3 % 5)]
... return g, f
...
>>> g, f = asyncio.run(main())
>>> g
[1, 2, 4, 8, 16, 32, 64, 128, 256, 512]
>>> f
[1, 2, 16, 32, 256, 512]

这是一个至关重要的区别:异步生成器和表达式都不会使迭代并发。他们所做的只是提供同步对象的外观,但具有使相关循环放弃对事件循环的控制权以便其他协程运行的能力。

换句话说,异步迭代器和异步生成器并未设计为在序列或迭代器上同时映射某些功能。它们只是为了让封闭的协程允许其他任务轮流使用。仅在以下情况下才需要使用 async forasync with 语句:使用普通的 forwithbreak协程中await的性质。这个异步和并发之间的区别就是要把握的关键。

The Event Loop and asyncio.run()

你可以将事件循环想像成 while True 循环,该循环监视协程,获取有关空闲状态的反馈,并四处寻找可以同时执行的事情。当协程正在等待的东西可用时,它能够唤醒一个空闲的协程。

到目前为止,事件循环的整个管理已由一个函数调用隐式处理:

1
asyncio.run(main())  # Python 3.7+

负责获取事件循环,运行任务,直到将其标记为已完成,然后关闭事件循环。

使用get_event_loop(),可以更轻松地管理asyncio事件循环。典型的模式如下所示:

1
2
3
4
5
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
finally:
loop.close()

在较早的示例中,你可能会看到loop.get_event_loop()随处可见,但是除非你有特定的需求需要微调对事件循环管理的控制,否则对于大多数程序而言,asyncio.run()就足够了。

如果确实需要与 Python 程序中的事件循环进行交互,则 loop 是一个老式的 Python 对象,它通过 loop.is_running()loop.is_closed()支持自省。如果你需要获得更精细的控制,就可以操作这个对此昂,例如scheduling a callback,方法是将循环作为参数进行传递。

更关键的是要对事件循环的机制有一些了解。以下是有关事件循环的一些要点。

#1: 协程在绑定到事件循环之前并不会做很多事情。

你之前在有关生成器的说明中已经看到了这一点,但值得重申。如果你有一个等待的主协程,则简单地单独调用它几乎没有效果:

1
2
3
4
5
6
7
8
9
10
>>> import asyncio

>>> async def main():
... print("Hello ...")
... await asyncio.sleep(1)
... print("World!")

>>> routine = main()
>>> routine
<coroutine object main at 0x1027a6150>

记住使用 asyncio.run()通过调度main()协程(future object)在事件循环上执行来实际强制执行:

1
2
3
>>> asyncio.run(routine)
Hello ...
World!

(其他协程可以用await执行。通常将main()包装在asyncio.run()中,然后从中调用与await链接的协程。)

#2: 默认情况下,异步IO事件循环在单个线程和单个CPU内核上运行。通常,在一个CPU内核中运行一个单线程事件循环绰绰有余。还可以跨多个内核运行事件循环。

#3. 事件循环是可插入的。也就是说,如果你确实需要,可以编写自己的事件循环实现,并使它运行相同的任务。这在uvloop软件包中得到了很好的演示,该软件包是Cython中事件循环的实现。

这就是术语“可插入事件循环”的含义:你可以使用事件循环的任何可行实现,而与协程本身的结构无关。asyncio包本身附带两种不同的事件循环实现,默认设置基于selectors模块。(第二种实现仅适用于Windows。)

A Full Program: Asynchronous Requests

到目前为止,你已经学习了很多,现在该是有趣而轻松的部分了。在本部分中,你将使用aiohttp(一个非常快的异步HTTP客户端/服务器框架)构建一个抓取网址的网址收集器areq.py。 这样的工具可用于映射站点集群之间的连接,链接形成一个有向图
注意:你可能想知道为什么Python的requests包与异步IO不兼容。 requests建立在urllib3之上,而urllib3则使用 Python 的httpsocket模块。

默认情况下,套接字操作处于阻塞状态。这意味着Python不会喜欢 await requests.get(url),因为.get()不是 awaitable 的。相比之下,aiohttp中的几乎所有内容都是可以awaitable的协程,例如session.request()response.text()。否则,它是一个很棒的软件包,但是你在异步代码中使用requests没有任何用处。

高级程序结构如下所示:

  1. 从本地文件urls.txt中读取URL序列。
  2. 发送对URL的GET请求并解码结果内容。如果失败,请在此处停止输入URL。
  3. 在响应的HTML中的href标记内搜索URL。
  4. 将结果写入foundurls.txt
  5. 尽可能异步同时执行上述所有操作。(将aiohttp用于请求,将aiofiles用于文件附件。这是两个非常适合异步IO模型的IO主要示例。)

以下是urls.txt的内容。它并不庞大,并且包含流量最高的网站:

1
2
3
4
5
6
7
8
9
$ cat urls.txt
https://regex101.com/
https://docs.python.org/3/this-url-will-404.html
https://www.nytimes.com/guides/
https://www.mediamatters.org/
https://1.1.1.1/
https://www.politico.com/tipsheets/morning-money
https://www.bloomberg.com/markets/economics
https://www.ietf.org/rfc/rfc2616.txt

列表中的第二个URL应该返回404响应,你需要对其进行适当处理。如果你正在运行此程序的扩展版本,则可能需要处理比这更棘手的问题,例如服务器断开连接和无止尽的重定向。

请求本身应使用单个 session 发出,以充分利用会话的内部连接池。

让我们看一下完整的程序。我们将逐步介绍以下内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
#!/usr/bin/env python3
# areq.py

"""Asynchronously get links embedded in multiple pages' HMTL."""

import asyncio
import logging
import re
import sys
from typing import IO
import urllib.error
import urllib.parse

import aiofiles
import aiohttp
from aiohttp import ClientSession

logging.basicConfig(
format="%(asctime)s %(levelname)s:%(name)s: %(message)s",
level=logging.DEBUG,
datefmt="%H:%M:%S",
stream=sys.stderr,
)
logger = logging.getLogger("areq")
logging.getLogger("chardet.charsetprober").disabled = True

HREF_RE = re.compile(r'href="(.*?)"')

async def fetch_html(url: str, session: ClientSession, **kwargs) -> str:
"""GET request wrapper to fetch page HTML.

kwargs are passed to `session.request()`.
"""

resp = await session.request(method="GET", url=url, **kwargs)
resp.raise_for_status()
logger.info("Got response [%s] for URL: %s", resp.status, url)
html = await resp.text()
return html

async def parse(url: str, session: ClientSession, **kwargs) -> set:
"""Find HREFs in the HTML of `url`."""
found = set()
try:
html = await fetch_html(url=url, session=session, **kwargs)
except (
aiohttp.ClientError,
aiohttp.http_exceptions.HttpProcessingError,
) as e:
logger.error(
"aiohttp exception for %s [%s]: %s",
url,
getattr(e, "status", None),
getattr(e, "message", None),
)
return found
except Exception as e:
logger.exception(
"Non-aiohttp exception occured: %s", getattr(e, "__dict__", {})
)
return found
else:
for link in HREF_RE.findall(html):
try:
abslink = urllib.parse.urljoin(url, link)
except (urllib.error.URLError, ValueError):
logger.exception("Error parsing URL: %s", link)
pass
else:
found.add(abslink)
logger.info("Found %d links for %s", len(found), url)
return found

async def write_one(file: IO, url: str, **kwargs) -> None:
"""Write the found HREFs from `url` to `file`."""
res = await parse(url=url, **kwargs)
if not res:
return None
async with aiofiles.open(file, "a") as f:
for p in res:
await f.write(f"{url}\t{p}\n")
logger.info("Wrote results for source URL: %s", url)

async def bulk_crawl_and_write(file: IO, urls: set, **kwargs) -> None:
"""Crawl & write concurrently to `file` for multiple `urls`."""
async with ClientSession() as session:
tasks = []
for url in urls:
tasks.append(
write_one(file=file, url=url, session=session, **kwargs)
)
await asyncio.gather(*tasks)

if __name__ == "__main__":
import pathlib
import sys

assert sys.version_info >= (3, 7), "Script requires Python 3.7+."
here = pathlib.Path(__file__).parent

with open(here.joinpath("urls.txt")) as infile:
urls = set(map(str.strip, infile))

outpath = here.joinpath("foundurls.txt")
with open(outpath, "w") as outfile:
outfile.write("source_url\tparsed_url\n")

asyncio.run(bulk_crawl_and_write(file=outpath, urls=urls))

该脚本比我们最初的玩具程序要长,所以让我们对其进行分解。

常量HREF_RE是一个正则表达式,用于提取我们最终在HTML中寻找的href标签:

1
2
>>> HREF_RE.search('Go to <a href="https://realpython.com/">Real Python</a>')
<re.Match object; span=(15, 45), match='href="https://realpython.com/"'>

协程fetch_html()是GET请求的包装,用于发出请求并解码生成的页面HTML。它发出请求,等待响应,并在非200状态下立即引发:

1
2
resp = await session.request(method="GET", url=url, **kwargs)
resp.raise_for_status()

如果状态正常,fetch_html()返回页面HTML(str)。值得注意的是,此函数没有完成异常处理。逻辑是将该异常传播给调用者,并在那里进行处理:

1
html = await resp.text()

我们await session.request()resp.text()是因为它们是可以 awaitable 的协程。否则请求/响应周期将是应用程序的耗时的部分,但是对于异步IO来说,fetch_html()使事件循环可用于其他易于获取的作业上,例如解析和写入已获取的URL。

协程链中的下一个是parse(),它在fetch_html()中等待给定的URL,然后从该页面的HTML中提取所有href标签,确保每个标签均有效并对其进行格式化作为绝对路径。

诚然,parse() 的第二部分是阻塞的,但是它由快速的正则表达式匹配组成,并确保将发现的链接设置为绝对路径。

在这种特定情况下,此同步代码应该快速而不起眼。但是请记住,给定协程中的任何行都将阻止其他协程,除非该行使用yieldawaitreturn。如果解析是一个比较繁琐的过程,则可能需要考虑使用[loop.run_in_executor()](https://docs.python.org/3/library/asyncio-eventloop。
html#executing-code-in-thread-或-process-pools)。

接下来,协程 write()接收一个文件对象和一个URL,然后等待parse()返回一组解析的URL,通过使用aiofiles包将每个URL及其源URL异步写入文件中。

最后,bulk_crawl_and_write()是脚本协程链的主要入口。它使用单个会话,并为最终从 urls.txt 中读取的每个URL创建一个任务。

以下是一些值得一提的其他要点:

  • 默认的 ClientSession具有一个adapter,最多具有100个打开的连接。要更改此设置,请将asyncio.connector.TCPConnector的实例传递给ClientSession。你还可以基于每个主机指定限制。
  • 你可以为整个会话和各个请求指定最大超时
  • 此脚本还使用了async with,可与异步上下文管理器一起使用。我没有专门讨论这个概念,因为从同步上下文管理器到异步上下文管理器的过渡非常简单。后者必须定义.__aenter__().__aexit__()而不是.__exit__().__enter__()。如你所料,async with只能在用async def声明的协程函数中使用。

如果你想进一步了解,请在GitHub上查看本教程的companion files,并附带注释和文档字符串。

这是所有执行过程的结果,因为areq.py在不到一秒钟的时间内获取,解析并保存了9个网址的结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
$ python3 areq.py
21:33:22 DEBUG:asyncio: Using selector: KqueueSelector
21:33:22 INFO:areq: Got response [200] for URL: https://www.mediamatters.org/
21:33:22 INFO:areq: Found 115 links for https://www.mediamatters.org/
21:33:22 INFO:areq: Got response [200] for URL: https://www.nytimes.com/guides/
21:33:22 INFO:areq: Got response [200] for URL: https://www.politico.com/tipsheets/morning-money
21:33:22 INFO:areq: Got response [200] for URL: https://www.ietf.org/rfc/rfc2616.txt
21:33:22 ERROR:areq: aiohttp exception for https://docs.python.org/3/this-url-will-404.html [404]: Not Found
21:33:22 INFO:areq: Found 120 links for https://www.nytimes.com/guides/
21:33:22 INFO:areq: Found 143 links for https://www.politico.com/tipsheets/morning-money
21:33:22 INFO:areq: Wrote results for source URL: https://www.mediamatters.org/
21:33:22 INFO:areq: Found 0 links for https://www.ietf.org/rfc/rfc2616.txt
21:33:22 INFO:areq: Got response [200] for URL: https://1.1.1.1/s
21:33:22 INFO:areq: Wrote results for source URL: https://www.nytimes.com/guides/
21:33:22 INFO:areq: Wrote results for source URL: https://www.politico.com/tipsheets/morning-money
21:33:22 INFO:areq: Got response [200] for URL: https://www.bloomberg.com/markets/economics
21:33:22 INFO:areq: Found 3 links for https://www.bloomberg.com/markets/economics
21:33:22 INFO:areq: Wrote results for source URL: https://www.bloomberg.com/markets/economics
21:33:23 INFO:areq: Found 36 links for https://1.1.1.1/
21:33:23 INFO:areq: Got response [200] for URL: https://regex101.com/
21:33:23 INFO:areq: Found 23 links for https://regex101.com/
21:33:23 INFO:areq: Wrote results for source URL: https://regex101.com/
21:33:23 INFO:areq: Wrote results for source URL: https://1.1.1.1/

作为健全性检查,你可以检查输出中的行数。以我的为例,是626,但请注意,这可能会有所波动:

1
2
3
4
5
6
7
$ wc -l foundurls.txt
626 foundurls.txt

$ head -n 3 foundurls.txt
source_url parsed_url
https://www.bloomberg.com/markets/economics https://www.bloomberg.com/feedback
https://www.bloomberg.com/markets/economics https://www.bloomberg.com/notices/tos

后续步骤:如果你想要继续提高,请使此网络爬虫递归爬取。你可以使用aio-redis跟踪树中已爬网址的URL以避免两次请求,并使用Python的networkx库连接当中的链接。

发送1000个并发请求到一个小的,毫无戒心的网站是不好的。有一些方法可以限制你在一批中发出的并发请求,例如使用asyncio的sempahore对象或使用模式像这样。如果你不注意此警告,则可能会收到大量的 TimeoutError 异常,最终只会损害你自己的程序。

Async IO in Context

既然你已经看了很多健壮的代码,那么让我们退一步,考虑一下什么时候异步IO是理想的选择,以及如何进行比较以得出结论或选择其他并发模型。

When and Why Is Async IO the Right Choice?

本教程没有关于异步IO,线程与多进程的扩展论述。但是,了解异步IO什么时候会是三个中的最佳候选者很有用。

异步IO与多进程之间的斗争根本不是一场争斗。实际上,它们可以一起使用。如果你在诸如scikit-learn之类的库中有多个相当统一的CPU绑定任务),多进程应该是显而易见的选择。

如果所有函数都使用阻塞调用,则在每个函数前简单地将 async 放在前面是个坏主意。这实际上可能会减慢你的代码的速度。但是如前所述,异步IO和多进程可以在某些地方共存

异步IO和线程之间的竞争更为直接。我在导言中提到线程很难。全文是,即使在线程似乎易于实现的情况下,由于竞争条件和内存使用等原因,它仍可能导致臭名昭著的无法跟踪的错误。

由于线程是具有有限可用性的系统资源,因此线程也往往不如异步IO那样灵活地扩展。在许多计算机上创建数千个线程将失败,因此我不建议你首先尝试。但
创建数千个异步IO任务是完全可行的。

当你有多个IO绑定任务时,异步IO会发挥作用,否则这些任务将通过阻塞IO等待时间来控制,例如:

  • 网络IO,无论你的程序是服务器还是客户端
  • 无服务器设计,例如peer-to-peer,多用户网络,如群组聊天室-读/写操作,
  • 你希望模仿“即发即弃”风格,但不必担心在阅读和写入内容时会受到限制

不使用它的最大原因是await只支持定义一组特定方法的一组特定对象。如果你想对某个DBMS执行异步读取操作,则不仅需要查找该DBMS的Python包,还需要查找支持async /await语法的包。包含同步调用的协程会阻止其他协程和任务运行。

有关可与async/await一起使用的库的简短列表,请参阅列表

Async IO It Is, but Which One?

本教程重点介绍异步IO,async/await语法以及如何使用asyncio进行事件循环管理和指定任务。asyncio当然不是唯一的异步IO库。Nathaniel J.Smith 的观察表明:

[In] a few years, asyncio might find itself relegated to becoming one of those stdlib libraries that savvy developers avoid, like urllib2.

What I’m arguing, in effect, is that asyncio is a victim of its own success: when it was designed, it used the best approach possible; but since then, work inspired by asyncio – like the addition of async/await – has shifted the landscape so that we can do even better, and now asyncio is hamstrung by its earlier commitments. (Source)

为此,虽然具有不同的API和不同的方法,但可以执行asyncio功能的一些有名的替代方法有curiotrio 。我个人认为,如果你要构建大小适中,简单易用的程序,仅使用asyncio就足够了并且易于理解,并且可以避免在Python标准库之外添加其他大型依赖项。

但是,一定要检查一下curiotrio这两个库,你可能会发现它们以一种对用户来说更直观的方式完成了相同的工作。这里介绍的许多与包无关的概念也渗透到其他异步IO软件包中。

Odds and Ends

在接下来的几节中,将会向你介绍asyncioasync/await的其他部分,这些部分到目前为止还没有很好地融入本教程,但是对于构建和理解完整的程序仍然很重要。

Other Top-Level asyncio Functions

除了asyncio.run()之外,你还看到了其他一些包级的功能,例如asyncio.create_task()asyncio.gather()。你可以使用create_task()调度协程对象的执行,然后使用asyncio.run()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
>>> import asyncio

>>> async def coro(seq) -> list:
... """'IO' wait time is proportional to the max element."""
... await asyncio.sleep(max(seq))
... return list(reversed(seq))
...
>>> async def main():
... # This is a bit redundant in the case of one task
... # We could use `await coro([3, 2, 1])` on its own
... t = asyncio.create_task(coro([3, 2, 1])) # Python 3.7+
... await t
... print(f't: type {type(t)}')
... print(f't done: {t.done()}')
...
>>> t = asyncio.run(main())
t: type <class '_asyncio.Task'>
t done: True

这种模式有一个精妙之处:如果你不在main()await t,则可能会在main()本身已完成之前发出提示。因为asyncio.run(main())调用loop.run_until_complete(main()),事件循环仅仅关注(没有 await t 存在)main()是否完成,而不关注在main()中创建的任务是否完成。
如果没有await t,则循环的其他任务将被取消。如果需要获取当前待处理任务的列表,可以使用asyncio.Task.all_tasks()

注意asyncio.create_task()是Python 3.7中引入的。在Python 3.6或更低版本中,使用asyncio.ensure_future()代替create_task()

另外,还有 asyncio.gather()。尽管它没有做任何特别的事情,但是gather()的作用是将一系列协程(futures)巧妙地放入一个单一的 future 中。然后,它返回一个 future 对象,并且,如果你 await asyncio.gather() 并指定多个任务或协程,则你正在等待所有任务或协程完成。这与先前示例中的queue.join()有点相似。gather()的结果将是输入中结果的列表:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
>>> import time
>>> async def main():
... t = asyncio.create_task(coro([3, 2, 1]))
... t2 = asyncio.create_task(coro([10, 5, 0])) # Python 3.7+
... print('Start:', time.strftime('%X'))
... a = await asyncio.gather(t, t2)
... print('End:', time.strftime('%X')) # Should be 10 seconds
... print(f'Both tasks done: {all((t.done(), t2.done()))}')
... return a
...
>>> a = asyncio.run(main())
Start: 16:20:11
End: 16:20:21
Both tasks done: True
>>> a
[[1, 2, 3], [0, 5, 10]]

你可能已经注意到gather()等待传递给它的 Futures 或协程的结果集。或者,你可以遍历asyncio.as_completed()以按完成顺序获取任务完成时的任务。
该函数返回一个迭代器,该迭代器在完成任务时 yield 任务。下面,coro([3,2,1])的结果将在coro([10,5,0])完成之前可用,而gather()则不是这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
>>> async def main():
... t = asyncio.create_task(coro([3, 2, 1]))
... t2 = asyncio.create_task(coro([10, 5, 0]))
... print('Start:', time.strftime('%X'))
... for res in asyncio.as_completed((t, t2)):
... compl = await res
... print(f'res: {compl} completed at {time.strftime("%X")}')
... print('End:', time.strftime('%X'))
... print(f'Both tasks done: {all((t.done(), t2.done()))}')
...
>>> a = asyncio.run(main())
Start: 09:49:07
res: [1, 2, 3] completed at 09:49:10
res: [0, 5, 10] completed at 09:49:17
End: 09:49:17
Both tasks done: True

最后,你可能还会看到asyncio.ensure_future()。你几乎不需要它,因为它是一个较低级的管道API,并在很大程度上被稍后介绍的create_task()所取代。

The Precedence of await

尽管它们的行为有些相似,但是关键字await 的优先级明显高于yield。这意味着,由于绑定更为紧密,在许多情况下,你需要在yield from语句中使用括号,而在类似的 await 语句中则不需要。有关更多信息,请参阅PEP 492中的await表达式示例

Conclusion

现在,你已经可以使用async/await以及由此构建的库。以下是你所介绍内容的回顾:

  • 异步IO作为与语言无关的模型,以及通过让协程彼此间接通信来实现并发的方式
  • Python新的asyncawait关键字的细节,标记和定义协程
  • asyncio,Python包,提供运行和管理协程的API
使用支付宝打赏
使用微信打赏

若你觉得我的文章对你有帮助,欢迎点击上方按钮对我打赏

扫描二维码,分享此文章