concurrent.futures モジュールの ProcessPoolExecutor は、プロセスプールを使った並列処理を簡潔に書けるクラスです。multiprocessing.Pool と同様の機能を、より高レベルな API で提供します。
基本的な使い方
ProcessPoolExecutor は with 文と組み合わせて使います。
from concurrent.futures import ProcessPoolExecutor def square(x): return x ** 2 if __name__ == "__main__": with ProcessPoolExecutor(max_workers=4) as executor: results = executor.map(square, [1, 2, 3, 4, 5]) print(list(results)) # [1, 4, 9, 16, 25]
max_workers でワーカープロセスの数を指定します。省略すると CPU コア数が使われます。
Pool との比較
ProcessPoolExecutor と multiprocessing.Pool は似た機能を持ちますが、API に違いがあります。
multiprocessing モジュール。map、imap、apply_async など多くのメソッドを持つ。
concurrent.futures モジュール。ThreadPoolExecutor と同じ API で統一されている。
スレッドとプロセスを切り替えやすいという点で、ProcessPoolExecutor は使い勝手が良いです。
submit メソッド
submit() は単一のタスクを投入し、Future オブジェクトを返します。
from concurrent.futures import ProcessPoolExecutor def compute(x, y): return x + y if __name__ == "__main__": with ProcessPoolExecutor() as executor: future = executor.submit(compute, 10, 20) print(future.result()) # 30
Future.result() で実行結果を取得できます。例外が発生した場合は、result() を呼んだ時点で再送出されます。
複数の Future を管理する
submit() で複数のタスクを投入し、それぞれの Future を管理できます。
from concurrent.futures import ProcessPoolExecutor, as_completed def process(n): return n ** 2 if __name__ == "__main__": with ProcessPoolExecutor() as executor: futures = [executor.submit(process, i) for i in range(5)] for future in as_completed(futures): print(future.result())
as_completed() は完了した順に Future を返すイテレータです。処理時間がばらつく場合に便利です。
map メソッド
map() は複数の引数に対して関数を並列実行し、結果をイテレータで返します。
from concurrent.futures import ProcessPoolExecutor def double(x): return x * 2 if __name__ == "__main__": with ProcessPoolExecutor() as executor: results = executor.map(double, range(10)) for r in results: print(r)
結果は入力と同じ順序で返されます。
timeout を指定する
map() や result() にはタイムアウトを指定できます。
from concurrent.futures import ProcessPoolExecutor, TimeoutError import time def slow_task(x): time.sleep(5) return x if __name__ == "__main__": with ProcessPoolExecutor() as executor: future = executor.submit(slow_task, 1) try: result = future.result(timeout=2) except TimeoutError: print("タイムアウトしました")
タイムアウトした場合でも、タスク自体はバックグラウンドで実行され続けます。
shutdown メソッド
with 文を使わない場合は、shutdown() を呼んでリソースを解放します。
from concurrent.futures import ProcessPoolExecutor def task(x): return x * 2 if __name__ == "__main__": executor = ProcessPoolExecutor() future = executor.submit(task, 10) print(future.result()) executor.shutdown(wait=True)
wait=True(デフォルト)だと、すべてのタスクが完了するまで待機します。wait=False だとすぐに戻りますが、実行中のタスクはバックグラウンドで継続します。