Python の ProcessPoolExecutor でプロセスプールを使う

concurrent.futures モジュールの ProcessPoolExecutor は、プロセスプールを使った並列処理を簡潔に書けるクラスです。multiprocessing.Pool と同様の機能を、より高レベルな API で提供します。

基本的な使い方

ProcessPoolExecutorwith 文と組み合わせて使います。

from concurrent.futures import ProcessPoolExecutor

def square(x):
    return x ** 2

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=4) as executor:
        results = executor.map(square, [1, 2, 3, 4, 5])
    print(list(results))  # [1, 4, 9, 16, 25]

max_workers でワーカープロセスの数を指定します。省略すると CPU コア数が使われます。

Pool との比較

ProcessPoolExecutormultiprocessing.Pool は似た機能を持ちますが、API に違いがあります。

Pool

multiprocessing モジュール。map、imap、apply_async など多くのメソッドを持つ。

ProcessPoolExecutor

concurrent.futures モジュール。ThreadPoolExecutor と同じ API で統一されている。

スレッドとプロセスを切り替えやすいという点で、ProcessPoolExecutor は使い勝手が良いです。

submit メソッド

submit() は単一のタスクを投入し、Future オブジェクトを返します。

from concurrent.futures import ProcessPoolExecutor

def compute(x, y):
    return x + y

if __name__ == "__main__":
    with ProcessPoolExecutor() as executor:
        future = executor.submit(compute, 10, 20)
        print(future.result())  # 30

Future.result() で実行結果を取得できます。例外が発生した場合は、result() を呼んだ時点で再送出されます。

複数の Future を管理する

submit() で複数のタスクを投入し、それぞれの Future を管理できます。

from concurrent.futures import ProcessPoolExecutor, as_completed

def process(n):
    return n ** 2

if __name__ == "__main__":
    with ProcessPoolExecutor() as executor:
        futures = [executor.submit(process, i) for i in range(5)]
        
        for future in as_completed(futures):
            print(future.result())

as_completed() は完了した順に Future を返すイテレータです。処理時間がばらつく場合に便利です。

map メソッド

map() は複数の引数に対して関数を並列実行し、結果をイテレータで返します。

from concurrent.futures import ProcessPoolExecutor

def double(x):
    return x * 2

if __name__ == "__main__":
    with ProcessPoolExecutor() as executor:
        results = executor.map(double, range(10))
        for r in results:
            print(r)

結果は入力と同じ順序で返されます。

timeout を指定する

map()result() にはタイムアウトを指定できます。

from concurrent.futures import ProcessPoolExecutor, TimeoutError
import time

def slow_task(x):
    time.sleep(5)
    return x

if __name__ == "__main__":
    with ProcessPoolExecutor() as executor:
        future = executor.submit(slow_task, 1)
        try:
            result = future.result(timeout=2)
        except TimeoutError:
            print("タイムアウトしました")

タイムアウトした場合でも、タスク自体はバックグラウンドで実行され続けます。

shutdown メソッド

with 文を使わない場合は、shutdown() を呼んでリソースを解放します。

from concurrent.futures import ProcessPoolExecutor

def task(x):
    return x * 2

if __name__ == "__main__":
    executor = ProcessPoolExecutor()
    future = executor.submit(task, 10)
    print(future.result())
    executor.shutdown(wait=True)

wait=True(デフォルト)だと、すべてのタスクが完了するまで待機します。wait=False だとすぐに戻りますが、実行中のタスクはバックグラウンドで継続します。