shan

Python concurrent.futures 模块实现并发

2020-05-15

原文标题Effortless Concurrency with Python’s concurrent.futures

用 Python 编写并发的代码是十分棘手的。你需要担心这些棘手的问题,例如手头上的任务是 I/O 还是 CPU 相关的,或者是否为实现并发性付出额外的努力甚至给你带来所需的提升。此外,全局解释器锁GIL的存在进一步限制了编写真正的并发代码。你可在不出错的前提下,简化相关并发的代码:

在Python中,如果手头的任务是 I/O 相关的,则可以使用标准库的threading 模块,或者如果任务是CPU相关的,则可以使用 multiprocessing 模块。threadingmultiprocessing 的这些 API 让你在编写并发代码时更加灵活以及容易控制,但代价是必须编写相对详细低级的代码,从而在核心逻辑之上增加了额外的复杂性。有的时候,当目标任务很复杂时,通常不可能在增加并发的同时避免复杂性。但是,可以使许多更简单的任务实现并发,从而不会增加太多额外的开销。

Python 标准库还包含一个名为concurrent.futures的模块。在 Python 3.2 中添加了此模块,用于为开发人员提供启动异步任务的高级接口。它是threadingmultiprocessing 模块之上的通用抽象层,用于提供使用线程池或进程池时运行任务的接口。当你想同时运行一段代码并且不需要threadingmultiprocessing API 公开的附加模块化功能时,这是一个完美的工具。

Anatomy of concurrent.futures

下面是官方文档中的内容:

concurrent.futures 模块提供异步执行回调高层接口。

这意味着你可以通过公共高级接口使用线程或进程异步运行子例程。基本上,该模块提供了一个名为 Executor 的抽象类。你无法直接实例化它,而是需要使用它提供的两个子类之一来运行任务。

1
2
3
4
5
6
7
8
9
10
11
Executor (抽象基类)

├── ThreadPoolExecutor

│ │Executor 类的一个具体子类
│ │用于通过底层的线程管理与 I/O 相关的任务

├── ProcessPoolExecutor

│ │Executor 类的一个具体子类
│ │用于通过底层的进程管理与 CPU 相关的任务

在内部,这两个类与 pools 交互并管理 worker。 future 用于管理 worker 计算的结果。要使用一组 workers ,应用程序需要创建适当的 executor 类的实例,然后提交它们使其运行。当每个任务启动时,将返回一个 Future 实例。当需要任务的结果时,应用程序可以使用 Future 对象进行阻塞,直到结果可以获得为止。这个模块提供了各种 API,以方便等待任务的完成,因此不需要直接管理Future对象。

Executor Objects

由于 ThreadPoolExecutorProcessPoolExecutor 具有相同的API接口,因此在这种情况下,我们主要讨论它们提供的两种方法。下面的描述来自官方文档。

submit(fn, args, *kwargs)

调度可调用对象 fn 以 fn(args, *kwargs) 来运行,并返回表示可调用对象执行的 Future 对象。

1
2
3
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(pow, 323, 1235)
print(future.result())

map(func, *iterables, timeout=None, chunksize=1)

map(func, *iterables)类似,除了:

  • 可迭代对象是立即收集的,而不是延迟收集的;

  • func 是异步执行的,并且可以同时进行对 func 的多次调用。

    如果调用了__next __(),并且在最初调用 Executor.map() 的超时后结果不可用,则返回的迭代器引发了一个 concurrent.futures.TimeoutError。超时可以是 intfloat 。如果未指定超时或者是 None,则等待时间没有限制。如果 func 调用引发异常,则从迭代器中检索其值时将引发该异常。

    当使用ProcessPoolExecutor时,此方法将可迭代项分为许多块,并作为单独的任务提交给 pool。这些块的大小可以通过将 chunksize 设置为正整数来指定。对于非常长的可迭代对象,与默认大小为1相比,将 chunksize 设置为较大的值可以显著的提高性能。对于ThreadPoolExecutorchunksize是无效的。

Generic Workflows for Running Tasks Concurrently

我们大部分脚本都包含下面这样的变量:

1
2
for task in get_tasks():
perform(task)

这里,get_tasks 返回了一个可迭代对象,它包含了目标任务或者参数,在该任务或参数上需要应用特定的函数。任务通常会阻塞可调用对象,并且按照顺序执行,且一次只能运行一个任务。因为其顺序执行流程,所以这个逻辑是易于推理的。当任务数量少或单个任务的执行时间要求和复杂性较低时,这没什么问题。但是,当任务数量巨大或单个任务很耗时的时候,就可能很快失去控制。

一般的经验法则是,当任务主要是 I/O 相关时使用 ThreadPoolExecutor,例如:向多个 url 发送 http 请求,将大量文件保存到磁盘等。但当主要执行的任务是 CPU 相关时,应使用 ProcessPoolExecutor。它进行大量密集的计算,对大量图像应用预处理方法,一次处理多个文本文件等。

Running Tasks with Executor.submit

当你有大量的任务,你可以一次性排列它们然后等待这些任务的完成,之后你就可以收集这些任务的结果。

1
2
3
4
5
6
7
8
import concurrent.futures


with concurrent.futures.Executor() as executor:
futures = {executor.submit(perform, task) for task in get_tasks()}

for fut in concurrent.futures.as_completed(futures):
print(f"The outcome is {fut.result()}")

在这里,需要首先创建一个 Executor,该 Executor 在单独的进程或线程中管理所有正在运行的任务。使用 with 语句会创建一个上下文管理器,以确保在完成后通过隐式调用 executor.shutdown() 方法来清理所有杂散线程或进程。

在实际代码中,你需要根据可调用对象的性质,将 Executor 替换为 ThreadPoolExecutorProcessPoolExecutor 。然后,在集合推导式中开始所有任务。executor.submit() 方法管理每个任务。它会创建一个 Future 对象,该对象代表要完成的任务。排列好所有任务后,会调用方法concurrent.futures_as_completed(),该方法会在完成任务时产生 furtures 对象。 executor.result() 方法提供 perform(task) 的返回值,或者在失败的情况下引发异常。

executor.submit() 方法异步安排任务,并且不包含任何与原始任务有关的上下文。因此,如果你想用原始任务映射结果,则需要自己跟踪这些任务。

1
2
3
4
5
6
7
8
9
import concurrent.futures


with concurrent.futures.Executor() as executor:
futures = {executor.submit(perform, task): task for task in get_tasks()}

for fut in concurrent.futures.as_completed(futures):
original_task = futures[fut]
print(f"The result of {original_task} is {fut.result()}")

注意变量 futures,其中原始任务使用字典映射到其相应的 futures 。

Running Tasks with Executor.map

可以按照预定的顺序收集结果的另一种方法是使用execuror.map() 方法。

1
2
3
4
5
6
import concurrent.futures


with concurrent.futures.Executor() as executor:
for arg, res in zip(get_tasks(), executor.map(perform, get_tasks())):
print(f"The result of {arg} is {res}")

注意map函数如何一次获取整个可迭代对象。它会立即而不是按照预定的顺序懒惰地返回结果。如果在操作过程中发生任何未处理的异常,也会立即引发该异常,并且执行不会继续进行。

在Python 3.5+ 中,executor.map()接收一个可选参数:chunksize。使用ProcessPoolExecutor时,在很长的可迭代中,与默认大小 1 相比,使用较大的 chunksize 值可以显著提高性能。使用ThreadPoolExecutor时,chunksize 无效。

A Few Real World Examples

在继续理解接下来的例子之前,让我们写一个小的decorator,它将有助于测量和比较并发和顺序代码之间的执行时间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import time
from functools import wraps


def timeit(method):
@wraps(method)
def wrapper(*args, **kwargs):
start_time = time.time()
result = method(*args, **kwargs)
end_time = time.time()
print(f"{method.__name__} => {(end_time-start_time)*1000} ms")

return result
return wrapper
1
2
3
4
@timeit
def func(n):
def func(n):
return list(range(n))

这会打印出方法的名称以及执行它所花费的时间。

Download & Save Files from URLs with Multi-threading

首先,让我们从大量网址中下载一些 pdf 文件,然后将其保存到磁盘中。大概这是一个 I/O 相关的任务,我们将使用ThreadPoolExecutor类进行操作。但在此之前,让我们先按顺序进行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
from pathlib import Path
import urllib.request


def download_one(url):
"""
Downloads the specified URL and saves it to disk
"""

req = urllib.request.urlopen(url)
fullpath = Path(url)
fname = fullpath.name
ext = fullpath.suffix

if not ext:
raise RuntimeError("URL does not contain an extension")

with open(fname, "wb") as handle:
while True:
chunk = req.read(1024)
if not chunk:
break
handle.write(chunk)

msg = f"Finished downloading {fname}"
return msg


@timeit
def download_all(urls):
return [download_one(url) for url in urls]


if __name__ == "__main__":
urls = (
"http://www.irs.gov/pub/irs-pdf/f1040.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040a.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040ez.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040es.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040sb.pdf",
)

results = download_all(urls)
for result in results:
print(result)
>>> download_all => 22850.6863117218 ms
... Finished downloading f1040.pdf
... Finished downloading f1040a.pdf
... Finished downloading f1040ez.pdf
... Finished downloading f1040es.pdf
... Finished downloading f1040sb.pdf

在上面的代码片段中,我主要定义了两个函数。 download_one 函数从给定的 URL 下载 pdf 文件并将其保存到磁盘。它检查 URL 中的文件是否具有扩展名,如果没有扩展名,则引发 RunTimeError 。如果在文件名中找到扩展名,它将逐块下载文件并将其保存到磁盘。第二个函数 download_all 只是遍历一系列 URL,并对每个 URL应用 download_one 函数。顺序代码大约需要22.8秒才能运行完成。现在,让我们看看相同代码的线程版本的性能如何。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
from pathlib import Path
import urllib.request
from concurrent.futures import ThreadPoolExecutor, as_completed


def download_one(url):
"""
Downloads the specified URL and saves it to disk
"""

req = urllib.request.urlopen(url)
fullpath = Path(url)
fname = fullpath.name
ext = fullpath.suffix

if not ext:
raise RuntimeError("URL does not contain an extension")

with open(fname, "wb") as handle:
while True:
chunk = req.read(1024)
if not chunk:
break
handle.write(chunk)

msg = f"Finished downloading {fname}"
return msg


@timeit
def download_all(urls):
"""
Create a thread pool and download specified urls
"""

with ThreadPoolExecutor(max_workers=13) as executor:
return executor.map(download_one, urls, timeout=60)


if __name__ == "__main__":
urls = (
"http://www.irs.gov/pub/irs-pdf/f1040.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040a.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040ez.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040es.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040sb.pdf",
)

results = download_all(urls)
for result in results:
print(result)
>>> download_all => 5042.651653289795 ms
... Finished downloading f1040.pdf
... Finished downloading f1040a.pdf
... Finished downloading f1040ez.pdf
... Finished downloading f1040es.pdf
... Finished downloading f1040sb.pdf

并发版本的代码只需要顺序代码的 1/4 左右。注意,在该并发版本中,download_one 函数与以前的功能相同,但在 download_all 函数中,ThreadPoolExecutor 上下文管理器包装了 execute.map() 方法。函数download_one 和包含 URL 的可迭代项一起传递到 map 中。 timeout 参数确定线程在放弃管道中的单个任务之前将花费多长时间。 max_workers 表示要部署多少个工作器以生成和管理线程。一般的经验法则是使用 2 * multiprocessing.cpu_count() + 1。我的机器有 6 个物理核心和 12 个线程。所以 13 是我选择的值。

Note: You can also try running the above functions with ProcessPoolExecutor via the same interface and notice that the threaded version performs slightly better than due to the nature of the task.

Running Multiple CPU Bound Subroutines with Multi-processing

以下示例显示了 CPU 相关的哈希函数。主要函数将按顺序多次运行计算密集型哈希算法。然后,另一个函数将再次多次运行主要功能。让我们先按顺序运行该功能。

The following example shows a CPU bound hashing function. The primary function will sequentially run a compute intensive hash algorithm multiple times. Then another function will again run the primary function multiple times. Let’s run the function sequentially first.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import hashlib


def hash_one(n):
"""A somewhat CPU-intensive task."""

for i in range(1, n):
hashlib.pbkdf2_hmac("sha256", b"password", b"salt", i * 10000)

return "done"


@timeit
def hash_all(n):
"""Function that does hashing in serial."""

for i in range(n):
hsh = hash_one(n)

return "done"


if __name__ == "__main__":
hash_all(20)
>>> hash_all => 18317.330598831177 ms

如果分析 hash_onehash_all 函数,可以看到它们实际上正在运行两个计算密集型嵌套 for 循环。上面的代码大约需要 18 秒才能在顺序模式下运行完成。现在,使用 ProcessPoolExecutor 并行运行它。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import hashlib
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def hash_one(n):
"""A somewhat CPU-intensive task."""

for i in range(1, n):
hashlib.pbkdf2_hmac("sha256", b"password", b"salt", i * 10000)

return "done"


@timeit
def hash_all(n):
"""Function that does hashing in serial."""

with ProcessPoolExecutor(max_workers=10) as executor:
for arg, res in zip(range(n), executor.map(hash_one, range(n), chunksize=2)):
pass

return "done"


if __name__ == "__main__":
hash_all(20)
>>> hash_all => 1673.842430114746 ms

如果仔细观察,即使在并发版本中,hash_one函数中的for循环也会按顺序运行。但是,hash_all函数中的另一个for循环是通过多进程执行的。在这里,我使用了10个工作进程,块大小为 2。调整了工作进程数和块大小以实现最佳性能。如你所见,上述 CPU 密集型操作的并发版本比其顺序对应版本快 11 倍。

Avoiding Concurrency Pitfalls

由于current.futures提供了这样一个简单的 API,因此你可能会想将并发应用于手头的每个简单任务。但是,这不是一个好主意。首先,简单性具有一定的约束条件。这样,你可以将并发仅应用于最简单的任务,通常将函数映射到可迭代的对象或同时运行一些子例程。如果手头的任务需要排队,从多个进程中产生多个线程,那么你仍然需要诉诸较低级别的线程和多处理模块。

使用并发的另一个陷阱是使用ThreadPoolExecutor时可能发生的死锁情况。当与Future相关联的可调用对象等待另一个Future的结果时,它们可能永远不会释放对线程的控制并导致死锁。让我们看一下官方文档中的一个稍作修改的示例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import time
from concurrent.futures import ThreadPoolExecutor


def wait_on_b():
time.sleep(5)
print(b.result()) # b will never complete because it is waiting on a.
return 5


def wait_on_a():
time.sleep(5)
print(a.result()) # a will never complete because it is waiting on b.
return 6


with ThreadPoolExecutor(max_workers=2) as executor:
# here, the future from a depends on the future from b
# and vice versa
# so this is never going to be completed
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)

print("Result from wait_on_b", a.result())
print("Result from wait_on_a", b.result())

在上面的示例中,函数 wait_on_b 取决于函数 wait_on_a 的结果(Future 对象的结果),同时,后一个函数的结果取决于前一个函数的结果。因此,上下文管理器中的代码块由于相互依赖而永远不会执行。这会造成死锁情况。让我们解释一下官方文档中的另一种死锁情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
from concurrent.futures import ThreadPoolExecutor


def wait_on_future():
f = executor.submit(pow, 5, 2)
# This will never complete because there is only one worker thread and
# it is executing this function.
print(f.result())


with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(wait_on_future)
print(future.result())

当子例程生成嵌套的 Future 对象并在单个线程上运行时,通常会发生上述情况。在函数 wait_on_future中,executor.submit(pow, 5, 2)创建了另一个Future对象。由于我是使用单个线程运行整个过程,因此内部的 future 对象将阻塞该线程,并且上下文管理器中的外部 executor.submit() 方法不能使用任何线程。使用多进程可以避免这种情况,但是总的来说,这本身就是一个糟糕的设计。

在某些情况下,并发代码的性能可能会比顺序代码低。发生这种情况可能有多种原因。

  1. 使用线程来执行与 CPU 相关的任务
  2. 使用多进程来执行与 I/O 相关的任务
  3. 这些任务太琐碎,无法使用线程或多个进程来证明其合理性。

生成和销毁多个线程或进程带来了额外的开销。通常,线程比生成和销毁的进程快得多。但是,使用错误的并发类型实际上会减慢你的代码速度,而不会其提高性能。下面是一个简单的示例,其中 ThreadPoolExecutorProcessPoolExecutor 的性能均比其顺序对应的性能差。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import math

PRIMES = [num for num in range(19000, 20000)]


def is_prime(n):
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False

sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True


@timeit
def main():
for number in PRIMES:
print(f"{number} is prime: {is_prime(number)}")


if __name__ == "__main__":
main()
>>> 19088 is prime: False
... 19089 is prime: False
... 19090 is prime: False
... ...
... main => 67.65174865722656 ms

上面的示例验证列表中的数字是否为质数。我们对1000个数字运行了该函数,以确定它们是否为质数。顺序版本大约需要67毫秒来完成此操作。但是,请看下面的相同代码的线程版本所花费的时间(140毫秒)需要翻倍,才能完成相同的任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import math

num_list = [num for num in range(19000, 20000)]


def is_prime(n):
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False

sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True


@timeit
def main():
with ThreadPoolExecutor(max_workers=13) as executor:
for number, prime in zip(PRIMES, executor.map(is_prime, num_list)):
print(f"{number} is prime: {prime}")


if __name__ == "__main__":
main()
>>> 19088 is prime: False
... 19089 is prime: False
... 19090 is prime: False
... ...
... main => 140.17250061035156 ms

相同代码的多进程版本甚至更慢。这些任务并不能证明要打开这么多进程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
from concurrent.futures import ProcessPoolExecutor
import math

num_list = [num for num in range(19000, 20000)]


def is_prime(n):
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False

sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True


@timeit
def main():
with ProcessPoolExecutor(max_workers=13) as executor:
for number, prime in zip(PRIMES, executor.map(is_prime, num_list)):
print(f"{number} is prime: {prime}")


if __name__ == "__main__":
main()
>>> 19088 is prime: False
... 19089 is prime: False
... 19090 is prime: False
... ...
... main => 311.3126754760742 ms

尽管从直观上看,检查质数的任务似乎应该是 CPU 相关的操作,但确定任务本身的计算量是否足以证明产生多个线程或进程的合理性也很重要。否则,你可能会得到比简单解决方案性能更差的复杂代码。

Remarks

All the pieces of codes in the blog were written and tested with python 3.8 on a machine running Ubuntu 18.04.

References

  1. concurrent.fututures- the official documentation
  2. Easy Concurrency in Python
  3. Adventures in Python with concurrent.futures
使用支付宝打赏
使用微信打赏

若你觉得我的文章对你有帮助,欢迎点击上方按钮对我打赏

扫描二维码,分享此文章