Python の Pool でプロセスプールを使う

Python の multiprocessing.Pool は、プロセスプールを管理するクラスです。あらかじめ複数のワーカープロセスを起動しておき、タスクを効率的に分散処理できます。

Pool の基本的な使い方

Pool を使うと、プロセスの作成と管理が自動化されます。

from multiprocessing import Pool

def square(x):
    return x ** 2

if __name__ == "__main__":
    with Pool(4) as pool:  # 4つのワーカープロセス
        results = pool.map(square, [1, 2, 3, 4, 5])
    print(results)  # [1, 4, 9, 16, 25]

with 文を使うことで、処理終了後にプロセスが自動的にクリーンアップされます。

Pool のコンストラクタ

Pool には以下の引数を指定できます。

processesワーカープロセスの数(省略時は CPU コア数)
initializer各ワーカーの初期化関数
initargsinitializer に渡す引数
maxtasksperchildワーカーが処理するタスクの上限

processes を省略すると、os.cpu_count() の値が使われます。

map メソッド

map() はイテラブルの各要素に関数を適用し、結果をリストで返します。

from multiprocessing import Pool

def double(x):
    return x * 2

if __name__ == "__main__":
    with Pool() as pool:
        results = pool.map(double, range(10))
    print(results)  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

組み込みの map() と似ていますが、処理が並列に実行されます。

chunksize を指定する

大量のデータを処理する場合、chunksize を指定すると効率が上がります。

from multiprocessing import Pool

def process(x):
    return x ** 2

if __name__ == "__main__":
    data = range(10000)
    with Pool(4) as pool:
        results = pool.map(process, data, chunksize=100)

chunksize は、一度にワーカーに送るタスクの数です。デフォルトでは自動計算されますが、明示的に指定することでオーバーヘッドを減らせます。

imap メソッド

imap()map() の遅延評価版で、イテレータを返します。

from multiprocessing import Pool

def slow_square(x):
    return x ** 2

if __name__ == "__main__":
    with Pool(4) as pool:
        for result in pool.imap(slow_square, range(10)):
            print(result)

結果を順番に取得できるため、メモリ効率が良くなります。順序を気にしない場合は imap_unordered() を使うとさらに高速です。

map()

全結果をリストで返す。メモリを多く使う。

imap()

イテレータを返す。メモリ効率が良い。

Pool を閉じる

with 文を使わない場合は、明示的に close()join() を呼びます。

from multiprocessing import Pool

def task(x):
    return x * 2

if __name__ == "__main__":
    pool = Pool(4)
    results = pool.map(task, range(10))
    pool.close()  # 新しいタスクの受付を停止
    pool.join()   # すべてのワーカーの終了を待つ
    print(results)

close() を呼ぶと新しいタスクは受け付けなくなり、join() ですべてのタスクが完了するのを待ちます。terminate() を使うと、実行中のタスクを中断して即座に終了します。