Python のマルチプロセスで親プロセスが子プロセスの終了を待つには、join() メソッドを使います。複数のプロセスを効率的に待機する方法や、タイムアウトの設定について解説します。
join の基本
join() は、対象のプロセスが終了するまで呼び出し元をブロックします。
from multiprocessing import Process import time def worker(): time.sleep(2) print("Worker finished") if __name__ == "__main__": p = Process(target=worker) p.start() print("Waiting for worker...") p.join() # worker が終了するまで待機 print("Main finished")
join() を呼ばないと、メインプロセスが先に終了してしまう可能性があります。
複数プロセスの終了を待つ
複数のプロセスを待機する場合は、順番に join() を呼びます。
from multiprocessing import Process import time def worker(name, duration): time.sleep(duration) print(f"{name} finished") if __name__ == "__main__": processes = [ Process(target=worker, args=("A", 3)), Process(target=worker, args=("B", 1)), Process(target=worker, args=("C", 2)), ] for p in processes: p.start() for p in processes: p.join() print("All workers finished")
すべてのプロセスを start() してから join() を呼ぶことで、並列に実行されます。
タイムアウト付きの待機
join(timeout) でタイムアウトを指定できます。
from multiprocessing import Process import time def slow_worker(): time.sleep(10) print("Slow worker finished") if __name__ == "__main__": p = Process(target=slow_worker) p.start() p.join(timeout=3) # 最大 3 秒待機 if p.is_alive(): print("Worker is still running") p.terminate() p.join() # terminate 後も join が必要 else: print("Worker finished in time")
タイムアウト後に is_alive() でプロセスの状態を確認し、必要に応じて terminate() で強制終了できます。
exitcode で終了状態を確認する
join() 後に exitcode でプロセスの終了状態を確認できます。
| None | プロセスがまだ終了していない |
| 0 | 正常終了 |
| 正の数 | 例外で終了(例外のないエラー) |
| 負の数 | シグナルで終了(-N は シグナル N) |
from multiprocessing import Process import sys def success_task(): pass def error_task(): sys.exit(1) def exception_task(): raise ValueError("error") if __name__ == "__main__": p1 = Process(target=success_task) p2 = Process(target=error_task) p3 = Process(target=exception_task) for p in [p1, p2, p3]: p.start() p.join() print(f"exitcode: {p.exitcode}")
完了したプロセスから順に処理する
特定のプロセスの完了を待たずに、完了したものから順に処理したい場合があります。
from multiprocessing import Process import time def worker(name, duration): time.sleep(duration) return name if __name__ == "__main__": processes = [ Process(target=worker, args=("A", 3)), Process(target=worker, args=("B", 1)), Process(target=worker, args=("C", 2)), ] for p in processes: p.start() # 完了したプロセスから順に処理 while processes: for p in processes[:]: p.join(timeout=0.1) if not p.is_alive(): print(f"Process {p.name} finished") processes.remove(p)
ポーリングで確認する方法ですが、ProcessPoolExecutor の as_completed() を使うほうがスマートです。
ProcessPoolExecutor で待機する
concurrent.futures を使うと、より柔軟な待機ができます。
from concurrent.futures import ProcessPoolExecutor, wait, FIRST_COMPLETED import time def task(n): time.sleep(n) return n if __name__ == "__main__": with ProcessPoolExecutor() as executor: futures = [executor.submit(task, i) for i in [3, 1, 2]] # 最初の 1 つが完了するまで待機 done, not_done = wait(futures, return_when=FIRST_COMPLETED) for f in done: print(f"Completed: {f.result()}")
wait() の return_when には FIRST_COMPLETED、FIRST_EXCEPTION、ALL_COMPLETED を指定できます。