Python の ProcessPoolExecutor でプロセスプールを使う
concurrent.futures モジュールの ProcessPoolExecutor は、プロセスプールを使った並列処理を簡潔に書けるクラスです。multiprocessing.Pool と同様の機能を、より高レベルな API で提供します。
基本的な使い方
ProcessPoolExecutor は with 文と組み合わせて使います。
from concurrent.futures import ProcessPoolExecutor
def square(x):
return x ** 2
if __name__ == "__main__":
with ProcessPoolExecutor(max_workers=4) as executor:
results = executor.map(square, [1, 2, 3, 4, 5])
print(list(results)) # [1, 4, 9, 16, 25]
max_workers でワーカープロセスの数を指定します。省略すると CPU コア数が使われます。
Pool との比較
ProcessPoolExecutor と multiprocessing.Pool は似た機能を持ちますが、API に違いがあります。
multiprocessing モジュール。map、imap、apply_async など多くのメソッドを持つ。
concurrent.futures モジュール。ThreadPoolExecutor と同じ API で統一されている。
スレッドとプロセスを切り替えやすいという点で、ProcessPoolExecutor は使い勝手が良いです。
submit メソッド
submit() は単一のタスクを投入し、Future オブジェクトを返します。
from concurrent.futures import ProcessPoolExecutor
def compute(x, y):
return x + y
if __name__ == "__main__":
with ProcessPoolExecutor() as executor:
future = executor.submit(compute, 10, 20)
print(future.result()) # 30
Future.result() で実行結果を取得できます。例外が発生した場合は、result() を呼んだ時点で再送出されます。
複数の Future を管理する
submit() で複数のタスクを投入し、それぞれの Future を管理できます。
from concurrent.futures import ProcessPoolExecutor, as_completed
def process(n):
return n ** 2
if __name__ == "__main__":
with ProcessPoolExecutor() as executor:
futures = [executor.submit(process, i) for i in range(5)]
for future in as_completed(futures):
print(future.result())
as_completed() は完了した順に Future を返すイテレータです。処理時間がばらつく場合に便利です。
map メソッド
map() は複数の引数に対して関数を並列実行し、結果をイテレータで返します。
from concurrent.futures import ProcessPoolExecutor
def double(x):
return x * 2
if __name__ == "__main__":
with ProcessPoolExecutor() as executor:
results = executor.map(double, range(10))
for r in results:
print(r)
結果は入力と同じ順序で返されます。
timeout を指定する
map() や result() にはタイムアウトを指定できます。
from concurrent.futures import ProcessPoolExecutor, TimeoutError
import time
def slow_task(x):
time.sleep(5)
return x
if __name__ == "__main__":
with ProcessPoolExecutor() as executor:
future = executor.submit(slow_task, 1)
try:
result = future.result(timeout=2)
except TimeoutError:
print("タイムアウトしました")
タイムアウトした場合でも、タスク自体はバックグラウンドで実行され続けます。
shutdown メソッド
with 文を使わない場合は、shutdown() を呼んでリソースを解放します。
from concurrent.futures import ProcessPoolExecutor
def task(x):
return x * 2
if __name__ == "__main__":
executor = ProcessPoolExecutor()
future = executor.submit(task, 10)
print(future.result())
executor.shutdown(wait=True)
wait=True(デフォルト)だと、すべてのタスクが完了するまで待機します。wait=False だとすぐに戻りますが、実行中のタスクはバックグラウンドで継続します。