Python の apply_async で非同期にプロセスを実行する

Python の multiprocessing.Pool には、非同期にタスクを実行する apply_async() メソッドがあります。タスクの投入と結果の取得を分離でき、より柔軟な並列処理が可能になります。

apply_async の基本

apply_async() はタスクを投入し、すぐに AsyncResult オブジェクトを返します。

from multiprocessing import Pool
import time

def slow_task(x):
    time.sleep(2)
    return x ** 2

if __name__ == "__main__":
    with Pool(4) as pool:
        result = pool.apply_async(slow_task, (5,))
        print("Task submitted")
        
        # 他の処理ができる
        print("Doing other work...")
        
        # 結果を取得(ブロックする)
        print(f"Result: {result.get()}")

apply_async() は即座に戻るので、結果を待つ間に他の処理を実行できます。

apply と apply_async の違い

apply(func, args)

同期的に実行し、結果を直接返す。結果が得られるまでブロック。

apply_async(func, args)

非同期に実行し、AsyncResult を返す。すぐに制御が戻る。

apply() は 1 つのタスクを同期的に実行するため、並列処理には向きません。

AsyncResult のメソッド

apply_async() が返す AsyncResult には、結果を取得するためのメソッドがあります。

get(timeout)結果を取得(タイムアウト指定可)
ready()タスクが完了したか確認
successful()例外なく完了したか確認
wait(timeout)完了を待機

複数のタスクを非同期に実行する

複数のタスクを投入し、それぞれの結果を取得できます。

from multiprocessing import Pool
import time

def process(n):
    time.sleep(1)
    return n * 2

if __name__ == "__main__":
    with Pool(4) as pool:
        results = [pool.apply_async(process, (i,)) for i in range(10)]
        
        # すべての結果を取得
        output = [r.get() for r in results]
        print(output)  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

タスクは並列に実行されるため、全体の処理時間は大幅に短縮されます。

タイムアウト付きで結果を取得する

get(timeout) でタイムアウトを指定できます。

from multiprocessing import Pool, TimeoutError
import time

def slow_task():
    time.sleep(10)
    return "Done"

if __name__ == "__main__":
    with Pool() as pool:
        result = pool.apply_async(slow_task)
        
        try:
            value = result.get(timeout=3)
        except TimeoutError:
            print("Task timed out")

タイムアウトすると multiprocessing.TimeoutError が発生します。

callback と error_callback

タスク完了時に呼ばれるコールバック関数を指定できます。

from multiprocessing import Pool

def compute(x):
    return x ** 2

def on_success(result):
    print(f"Success: {result}")

def on_error(error):
    print(f"Error: {error}")

if __name__ == "__main__":
    with Pool() as pool:
        result = pool.apply_async(
            compute,
            (5,),
            callback=on_success,
            error_callback=on_error
        )
        result.wait()

callback は結果を引数に受け取り、error_callback は例外オブジェクトを受け取ります。

ready と successful で状態を確認する

ブロックせずにタスクの状態を確認できます。

from multiprocessing import Pool
import time

def task():
    time.sleep(2)
    return "Done"

if __name__ == "__main__":
    with Pool() as pool:
        result = pool.apply_async(task)
        
        while not result.ready():
            print("Still working...")
            time.sleep(0.5)
        
        if result.successful():
            print(f"Result: {result.get()}")
        else:
            print("Task failed")

ready() は完了したかどうか、successful() は例外なく完了したかどうかを返します。

map_async との比較

map_async()map() の非同期版です。

from multiprocessing import Pool

def square(x):
    return x ** 2

if __name__ == "__main__":
    with Pool(4) as pool:
        result = pool.map_async(square, range(10))
        
        # 他の処理
        print("Processing...")
        
        # すべての結果を取得
        print(result.get())

apply_async は 1 つのタスク

map_async は複数のタスクを一括投入

どちらも AsyncResult を返す

get() で結果を取得

大量のタスクを一括投入する場合は map_async() のほうが効率的です。