Python の multiprocessing.Pool には、非同期にタスクを実行する apply_async() メソッドがあります。タスクの投入と結果の取得を分離でき、より柔軟な並列処理が可能になります。
apply_async の基本
apply_async() はタスクを投入し、すぐに AsyncResult オブジェクトを返します。
from multiprocessing import Pool import time def slow_task(x): time.sleep(2) return x ** 2 if __name__ == "__main__": with Pool(4) as pool: result = pool.apply_async(slow_task, (5,)) print("Task submitted") # 他の処理ができる print("Doing other work...") # 結果を取得(ブロックする) print(f"Result: {result.get()}")
apply_async() は即座に戻るので、結果を待つ間に他の処理を実行できます。
apply と apply_async の違い
同期的に実行し、結果を直接返す。結果が得られるまでブロック。
非同期に実行し、AsyncResult を返す。すぐに制御が戻る。
apply() は 1 つのタスクを同期的に実行するため、並列処理には向きません。
AsyncResult のメソッド
apply_async() が返す AsyncResult には、結果を取得するためのメソッドがあります。
| get(timeout) | 結果を取得(タイムアウト指定可) |
| ready() | タスクが完了したか確認 |
| successful() | 例外なく完了したか確認 |
| wait(timeout) | 完了を待機 |
複数のタスクを非同期に実行する
複数のタスクを投入し、それぞれの結果を取得できます。
from multiprocessing import Pool import time def process(n): time.sleep(1) return n * 2 if __name__ == "__main__": with Pool(4) as pool: results = [pool.apply_async(process, (i,)) for i in range(10)] # すべての結果を取得 output = [r.get() for r in results] print(output) # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
タスクは並列に実行されるため、全体の処理時間は大幅に短縮されます。
タイムアウト付きで結果を取得する
get(timeout) でタイムアウトを指定できます。
from multiprocessing import Pool, TimeoutError import time def slow_task(): time.sleep(10) return "Done" if __name__ == "__main__": with Pool() as pool: result = pool.apply_async(slow_task) try: value = result.get(timeout=3) except TimeoutError: print("Task timed out")
タイムアウトすると multiprocessing.TimeoutError が発生します。
callback と error_callback
タスク完了時に呼ばれるコールバック関数を指定できます。
from multiprocessing import Pool def compute(x): return x ** 2 def on_success(result): print(f"Success: {result}") def on_error(error): print(f"Error: {error}") if __name__ == "__main__": with Pool() as pool: result = pool.apply_async( compute, (5,), callback=on_success, error_callback=on_error ) result.wait()
callback は結果を引数に受け取り、error_callback は例外オブジェクトを受け取ります。
ready と successful で状態を確認する
ブロックせずにタスクの状態を確認できます。
from multiprocessing import Pool import time def task(): time.sleep(2) return "Done" if __name__ == "__main__": with Pool() as pool: result = pool.apply_async(task) while not result.ready(): print("Still working...") time.sleep(0.5) if result.successful(): print(f"Result: {result.get()}") else: print("Task failed")
ready() は完了したかどうか、successful() は例外なく完了したかどうかを返します。
map_async との比較
map_async() は map() の非同期版です。
from multiprocessing import Pool def square(x): return x ** 2 if __name__ == "__main__": with Pool(4) as pool: result = pool.map_async(square, range(10)) # 他の処理 print("Processing...") # すべての結果を取得 print(result.get())
apply_async は 1 つのタスク
map_async は複数のタスクを一括投入
どちらも AsyncResult を返す
get() で結果を取得
大量のタスクを一括投入する場合は map_async() のほうが効率的です。