with concurrent.futures.Executor() as executor:
    futures = {executor.submit(perform, task): task for task in get_tasks()}

    for fut in concurrent.futures.as_completed(futures):
        original_task = futures[fut]
        print(f"The result of {original_task} is {fut.result()}")
Notice the futures variable: a dictionary that maps each original task to its corresponding future.
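As a runnable sketch of this pattern, the version below swaps the generic Executor base class for a concrete ThreadPoolExecutor and fills in hypothetical stand-ins for get_tasks and perform: a handful of integers and a sleep-based task that mimics slow I/O. These stand-ins are illustrative only, not from the original snippet.

import concurrent.futures
import random
import time


def get_tasks():
    # Hypothetical task source: just a few integers.
    return [1, 2, 3, 4, 5]


def perform(task):
    # Hypothetical slow, I/O-bound operation.
    time.sleep(random.uniform(0.1, 0.5))
    return task * 2


with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Map each future back to the task that produced it.
    futures = {executor.submit(perform, task): task for task in get_tasks()}

    for fut in concurrent.futures.as_completed(futures):
        original_task = futures[fut]
        print(f"The result of {original_task} is {fut.result()}")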
Running Tasks with Executor.map
Another way to collect the results, in a predetermined order, is to use the executor.map() method.
import concurrent.futures
with concurrent.futures.Executor() as executor:
    for arg, res in zip(get_tasks(), executor.map(perform, get_tasks())):
        print(f"The result of {arg} is {res}")
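The ordering guarantee is easy to see with a small, hypothetical demo in which earlier tasks deliberately take longer: even though task 1 finishes last, map still yields its result first.

import time
from concurrent.futures import ThreadPoolExecutor


def perform(task):
    # Hypothetical work: earlier tasks sleep longer, so they finish later.
    time.sleep(0.5 / task)
    return task * 2


tasks = [1, 2, 3, 4]

with ThreadPoolExecutor(max_workers=4) as executor:
    # map yields results in the order of the input iterable,
    # regardless of which task finished first.
    for arg, res in zip(tasks, executor.map(perform, tasks)):
        print(f"The result of {arg} is {res}")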
Note: You can also try running the above functions with ProcessPoolExecutor via the same interface; you will notice that the threaded version performs slightly better due to the nature of the task.
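One rough way to check this yourself is to time both executors through the identical map interface. The perform below is a hypothetical sleep-based stand-in (not from the post); for such short, I/O-bound tasks the process pool mostly adds start-up and pickling overhead.

import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def perform(task):
    # Hypothetical I/O-bound stand-in.
    time.sleep(0.2)
    return task * 2


def run_with(executor_cls, tasks):
    # Time the same map-based workflow with whichever executor class is passed in.
    start = time.perf_counter()
    with executor_cls(max_workers=4) as executor:
        results = list(executor.map(perform, tasks))
    elapsed = time.perf_counter() - start
    print(f"{executor_cls.__name__}: {elapsed:.3f}s for {len(results)} tasks")


if __name__ == "__main__":
    tasks = list(range(8))
    run_with(ThreadPoolExecutor, tasks)
    run_with(ProcessPoolExecutor, tasks)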
Running Multiple CPU Bound Subroutines with Multi-processing
The following example shows a CPU-bound hashing function. The primary function will sequentially run a compute-intensive hash algorithm multiple times. Then another function will run the primary function multiple times. Let's run the function sequentially first.
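The hashing code itself did not survive into this excerpt, so here is a minimal sketch of what such a setup might look like. It assumes hashlib.pbkdf2_hmac as the compute-intensive primitive, and hash_one, hash_all, and hash_all_concurrent are hypothetical names rather than the original post's code; the concurrent variant simply replaces the serial loop with ProcessPoolExecutor.map.

import hashlib
import time
from concurrent.futures import ProcessPoolExecutor


def hash_one(n):
    # A single compute-intensive task: run PBKDF2 with growing iteration counts.
    for i in range(1, n):
        hashlib.pbkdf2_hmac("sha256", b"password", b"salt", i * 10_000)
    return "done"


def hash_all(n):
    # Run the primary function multiple times, sequentially.
    for _ in range(n):
        hash_one(n)
    return "done"


def hash_all_concurrent(n):
    # The same work spread across worker processes.
    with ProcessPoolExecutor(max_workers=4) as executor:
        list(executor.map(hash_one, [n] * n))
    return "done"


if __name__ == "__main__":
    start = time.perf_counter()
    hash_all(10)
    print(f"sequential: {time.perf_counter() - start:.2f}s")

    start = time.perf_counter()
    hash_all_concurrent(10)
    print(f"multi-process: {time.perf_counter() - start:.2f}s")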
A word of caution: callables submitted to an executor can deadlock when their futures wait on each other. In the following example, neither future can ever complete:

import time
from concurrent.futures import ThreadPoolExecutor


def wait_on_b():
    time.sleep(5)
    print(b.result())  # b will never complete because it is waiting on a.
    return 5


def wait_on_a():
    time.sleep(5)
    print(a.result())  # a will never complete because it is waiting on b.
    return 6


with ThreadPoolExecutor(max_workers=2) as executor:
    # Here, the future from a depends on the future from b
    # and vice versa, so this is never going to be completed.
    a = executor.submit(wait_on_b)
    b = executor.submit(wait_on_a)

print("Result from wait_on_b", a.result())
print("Result from wait_on_a", b.result())
A similar deadlock occurs when a callable submits work to, and then waits on, the same single-worker executor that is currently running it:

def wait_on_future():
    f = executor.submit(pow, 5, 2)
    # This will never complete because there is only one worker thread and
    # it is executing this function.
    print(f.result())


with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(wait_on_future)
    print(future.result())
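As a quick sanity check that the single worker really is the culprit, the same nested submission completes once a second worker is available to run the inner call. This variation is not from the original post, just an illustration:

from concurrent.futures import ThreadPoolExecutor


def wait_on_future():
    # With a second worker available, the inner future can actually run.
    f = executor.submit(pow, 5, 2)
    return f.result()


with ThreadPoolExecutor(max_workers=2) as executor:
    future = executor.submit(wait_on_future)
    print(future.result())  # prints 25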
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import math
num_list = [num for num in range(19000, 20000)]
def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True
# timeit: a timing decorator assumed to be defined earlier in the blog.
@timeit
def main():
    with ThreadPoolExecutor(max_workers=13) as executor:
        for number, prime in zip(num_list, executor.map(is_prime, num_list)):
            print(f"{number} is prime: {prime}")
if __name__ == "__main__":
    main()

>>> 19088 is prime: False
... 19089 is prime: False
... 19090 is prime: False
...
main => 140.17250061035156 ms
from concurrent.futures import ProcessPoolExecutor
import math
num_list = [num for num in range(19000, 20000)]
def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True
@timeit
def main():
    with ProcessPoolExecutor(max_workers=13) as executor:
        for number, prime in zip(num_list, executor.map(is_prime, num_list)):
            print(f"{number} is prime: {prime}")
if __name__ == "__main__":
    main()

>>> 19088 is prime: False
... 19089 is prime: False
... 19090 is prime: False
...
main => 311.3126754760742 ms
Although, intuitively, checking for primes looks like a CPU-bound operation, it is also important to determine whether the task is computationally heavy enough to justify spawning multiple threads or processes. Otherwise, you may end up with complicated code that performs worse than the simple solution.
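To ground that point, the baseline worth measuring is a plain sequential loop over the same numbers, with no executor at all. The sketch below is hypothetical: it reuses num_list and is_prime from the snippets above and a simple perf_counter timer in place of the post's timeit decorator.

import time


def main_sequential():
    # Assumes num_list and is_prime are defined as in the snippets above.
    start = time.perf_counter()
    for number in num_list:
        print(f"{number} is prime: {is_prime(number)}")
    print(f"main_sequential => {(time.perf_counter() - start) * 1000:.2f} ms")


if __name__ == "__main__":
    main_sequential()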
Remarks
All the code snippets in this blog were written and tested with Python 3.8 on a machine running Ubuntu 18.04.