17.4. concurrent.futures — 启动并行任务

3.2 新版功能.

源码: Lib/concurrent/futures/thread.pyLib/concurrent/futures/process.py


concurrent.futures 模块提供异步执行回调高层接口。

异步执行可以由 ThreadPoolExecutor 使用线程或由 ProcessPoolExecutor 使用单独的进程来实现。 两者都是实现抽像类 Executor 定义的接口。

17.4.1. 执行器对象

class concurrent.futures.Executor

抽象类提供异步执行调用方法。要通过它的子类调用,而不是直接调用。

submit(fn, *args, **kwargs)

调度可调用对象 fn,以 fn(*args **kwargs) 方式执行并返回 Future 对像代表可调用对象的执行。:

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
    print(future.result())
map(func, *iterables, timeout=None, chunksize=1)

类似于 map(func, *iterables) 除去:

  • 应立即收集 iterables 不要延迟再收集;
  • func 是异步执行的且对 func 的调用可以并发执行。

如果 __next__() 已被调用且返回的结果在对 Executor.map() 的原始调用经过 timeout 秒后还不可用,则已返回的迭代器将引发 concurrent.futures.TimeoutErrortimeout 可以为 int 或 float 类型。 如果 timeout 未指定或为 None,则不限制等待时间。

如果 func 调用引发一个异常,当从迭代器中取回它的值时这个异常将被引发。

使用 ProcessPoolExecutor 时,这个方法会将 iterables 分割任务块并作为独立的任务并提交到执行池中。这些块的大概数量可以由 chunksize 指定正整数设置。 对很长的迭代器来说,使用大的 chunksize 值比默认值 1 能显著地提高性能。 chunksizeThreadPoolExecutor 没有效果。

在 3.5 版更改: 加入 chunksize 参数。

shutdown(wait=True)

当待执行的期程完成执行后向执行者发送信号,它就会释放正在使用的任何资源。调用 Executor.submit()Executor.submit() 会在关闭后触发 RuntimeError

如果 waitTrue 则此方法只有在所有待执行的期程完成执行且释放已分配的资源后才会返回。 如果 waitFalse,方法立即返回,所有待执行的期程完成执行后会释放已分配的资源。 不管 wait 的值是什么,整个 Python 程序将等到所有待执行的期程完成执行后才退出。

如果使用 with 语句,你就可以避免显式调用这个方法,它将会停止 Executor (就好像 Executor.shutdown() 调用时 wait 设为 True 一样等待):

import shutil
with ThreadPoolExecutor(max_workers=4) as e:
    e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
    e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
    e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
    e.submit(shutil.copy, 'src4.txt', 'dest4.txt')

17.4.2. ThreadPoolExecutor

ThreadPoolExecutorExecutor 的子类,它使用线程池来异步执行调用。

当回调已关联了一个 Future 然后再等待另一个 Future 的结果时就会发生死锁情况。 例如:

import time
def wait_on_b():
    time.sleep(5)
    print(b.result())  # b will never complete because it is waiting on a.
    return 5

def wait_on_a():
    time.sleep(5)
    print(a.result())  # a will never complete because it is waiting on b.
    return 6


executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)

And:

def wait_on_future():
    f = executor.submit(pow, 5, 2)
    # This will never complete because there is only one worker thread and
    # it is executing this function.
    print(f.result())

executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='')

Executor 的一个子类,使用最多 max_workers 个线程的线程池来异步执行调用。

在 3.5 版更改: 如果 max_workersNone 或没有指定,将默认为机器处理器的个数,假如 ThreadPoolExecutor 则重于 I/O 操作而不是 CPU 运算,那么可以乘以 5,同时工作线程的数量可以比 ProcessPoolExecutor 的数量高。

3.6 新版功能: 添加 thread_name_prefix 参数允许用户控制由线程池创建的 threading.Thread 工作线程名称以方便调试。

17.4.2.1. ProcessPoolExecutor例子

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

17.4.3. ProcessPoolExecutor

ProcessPoolExecutorExecutor 的子类,它使用进程池来实现异步执行调用。 ProcessPoolExecutor 使用 multiprocessing 回避 Global Interpreter Lock 但也意味着只可以处理和返回可序列化的对象。

__main__ 模块必须可以被工作者子进程导入。这意味着 ProcessPoolExecutor 不可以工作在交互式解释器中。

从提交给 ProcessPoolExecutor 的回调中调用 ExecutorFuture 方法会导致死锁。

class concurrent.futures.ProcessPoolExecutor(max_workers=None)

An Executor subclass that executes calls asynchronously using a pool of at most max_workers processes. If max_workers is None or not given, it will default to the number of processors on the machine. If max_workers is lower or equal to 0, then a ValueError will be raised.

在 3.3 版更改: 如果其中一个工作进程被突然终止,BrokenProcessPool 就会马上触发。可预计的行为没有定义,但执行器上的操作或它的期程会被冻结或死锁。

17.4.3.1. ProcessPoolExecutor例子

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()

17.4.4. 期程对象

Future 类将可调用对象封装为异步执行。Future 实例由 Executor.submit() 创建。

class concurrent.futures.Future

将可调用对象封装为异步执行。Future 实例由 Executor.submit() 创建,除非测试,不应直接创建。

cancel()

尝试取消调用。如果调用正在执行而且不能被取消那么方法返回 False,否则调用会被取消同时方法返回 True

cancelled()

如果调用成功取消返回 True

running()

如果调用正在执行而且不能被取消那么返回``True``。

done()

如果调用已被取消或正常结束那么返回 True

result(timeout=None)

返回调用返回的值。如果调用还没完成那么这个方法将等待 timeout 秒。如果在 timeout 秒内没有执行完成,concurrent.futures.TimeoutError 将会被触发。timeout 可以是整数或浮点数。如果 timeout 没有指定或为 None,那么等待时间就没有限制。

如果 futrue 在完成前被取消则 CancelledError 将被触发。

如果调用引发了一个异常,这个方法也会引发同样的异常。

exception(timeout=None)

返回由调用引发的异常。如果调用还没完成那么这个方法将等待 timeout 秒。如果在 timeout 秒内没有执行完成,concurrent.futures.TimeoutError 将会被触发。timeout 可以是整数或浮点数。如果 timeout 没有指定或为 None,那么等待时间就没有限制。

如果 futrue 在完成前被取消则 CancelledError 将被触发。

如果调用正常完成那么返回 None

add_done_callback(fn)

附加可调用 fn 到期程。当期程被取消或完成运行时,将会调用 fn,而这个期程将作为它唯一的参数。

加入的可调用对象总被属于添加它们的进程中的线程按加入的顺序调用。如果可调用对象引发一个 Exception 子类,它会被记录下来并被忽略掉。如果可调用对象引发一个 BaseException 子类,这个行为没有定义。

如果期程已经完成或已取消,fn 会被立即调用。

下面这些 Future 方法用于单元测试和 Executor 实现。

set_running_or_notify_cancel()

这个方法只可以在执行关联 Future 工作之前由 Executor 实现调用或由单测试调用。

如果这个方法返回 False 那么 Future 已被取消,即 Future.cancel() 已被调用并返回 True 。等待 Future 完成 (即通过 as_completed()wait()) 的线程将被唤醒。

如果这个方法返回 True 那么 Future 不会被取消并已将它变为正在运行状态,也就是说调用 Future.running() 时将返回 True

这个方法只可以被调用一次并且不能在调用 Future.set_result()Future.set_exception() 之后再调用。

set_result(result)

设置将 Future 关联工作的结果给 result

这个方法只可以由 Executor 实现和单元测试使用。

set_exception(exception)

设置 Future 关联工作的结果给 Exception exception

这个方法只可以由 Executor 实现和单元测试使用。

17.4.5. 模块函数

concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)

等待 fs 指定的 Future 实例(可能由不同的 Executor 实例创建)完成。返回一个已被命名的2元元组集合。第一个集合被命名为 done ,包含等待完成前已完成的期程(正常结束或被取消)。第二个集合被命名为 not_done,包含未完成的期程。

timeout 可以用来控制返回前最大的等待秒数。 timeout 可以为 int 或 float 类型。 如果 timeout 未指定或为 None ,则不限制等待时间。

return_when 指定此函数应在何时返回。它必须为以下常数之一:

常数 描述
FIRST_COMPLETED 函数将在任意可等待对象结束或取消时返回。
FIRST_EXCEPTION 函数将在任意可等待对象因引发异常而结束时返回。当没有引发任何异常时它就相当于 ALL_COMPLETED
ALL_COMPLETED 函数将在所有可等待对象结束或取消时返回。
concurrent.futures.as_completed(fs, timeout=None)

通过 fs 指定的 the Future 实例(可能由不同的 Executor 实例创建)返回一个迭代器,当它们完成时(正常结束或被取消)产生期程。任何被 fs 指定的重复期程都将会返回一次。首先产生调用 as_completed() 前已完成的期程。 当 __next__() 调用以及从原始调用到 as_completed() 的时间超过 timeout 秒后结果还不可用时返回的迭代器就会引发 concurrent.futures.TimeoutErrortimeout 可以为 int 或 float 类型。 如果 timeout 未指定或为 None ,则不限制等待时间。

参见

PEP 3148 – futures - 异步执行指令。
该提案描述了Python标准库中包含的这个特性。

17.4.6. Exception类

exception concurrent.futures.CancelledError

future被取消时会触发。

exception concurrent.futures.TimeoutError

future运算超出给定的超时数值时触发。

exception concurrent.futures.process.BrokenProcessPool

Derived from RuntimeError, this exception class is raised when one of the workers of a ProcessPoolExecutor has terminated in a non-clean fashion (for example, if it was killed from the outside).

3.3 新版功能.