Python で子プロセスの終了を待つ
Python のマルチプロセスで親プロセスが子プロセスの終了を待つには、join() メソッドを使います。複数のプロセスを効率的に待機する方法や、タイムアウトの設定について解説します。
join の基本
join() は、対象のプロセスが終了するまで呼び出し元をブロックします。
from multiprocessing import Process
import time
def worker():
time.sleep(2)
print("Worker finished")
if __name__ == "__main__":
p = Process(target=worker)
p.start()
print("Waiting for worker...")
p.join() # worker が終了するまで待機
print("Main finished")
join() を呼ばないと、メインプロセスが先に終了してしまう可能性があります。
複数プロセスの終了を待つ
複数のプロセスを待機する場合は、順番に join() を呼びます。
from multiprocessing import Process
import time
def worker(name, duration):
time.sleep(duration)
print(f"{name} finished")
if __name__ == "__main__":
processes = [
Process(target=worker, args=("A", 3)),
Process(target=worker, args=("B", 1)),
Process(target=worker, args=("C", 2)),
]
for p in processes:
p.start()
for p in processes:
p.join()
print("All workers finished")
すべてのプロセスを start() してから join() を呼ぶことで、並列に実行されます。
タイムアウト付きの待機
join(timeout) でタイムアウトを指定できます。
from multiprocessing import Process
import time
def slow_worker():
time.sleep(10)
print("Slow worker finished")
if __name__ == "__main__":
p = Process(target=slow_worker)
p.start()
p.join(timeout=3) # 最大 3 秒待機
if p.is_alive():
print("Worker is still running")
p.terminate()
p.join() # terminate 後も join が必要
else:
print("Worker finished in time")
タイムアウト後に is_alive() でプロセスの状態を確認し、必要に応じて terminate() で強制終了できます。
exitcode で終了状態を確認する
join() 後に exitcode でプロセスの終了状態を確認できます。
| None | プロセスがまだ終了していない |
| 0 | 正常終了 |
| 正の数 | 例外で終了(例外のないエラー) |
| 負の数 | シグナルで終了(-N は シグナル N) |
from multiprocessing import Process
import sys
def success_task():
pass
def error_task():
sys.exit(1)
def exception_task():
raise ValueError("error")
if __name__ == "__main__":
p1 = Process(target=success_task)
p2 = Process(target=error_task)
p3 = Process(target=exception_task)
for p in [p1, p2, p3]:
p.start()
p.join()
print(f"exitcode: {p.exitcode}")
完了したプロセスから順に処理する
特定のプロセスの完了を待たずに、完了したものから順に処理したい場合があります。
from multiprocessing import Process
import time
def worker(name, duration):
time.sleep(duration)
return name
if __name__ == "__main__":
processes = [
Process(target=worker, args=("A", 3)),
Process(target=worker, args=("B", 1)),
Process(target=worker, args=("C", 2)),
]
for p in processes:
p.start()
# 完了したプロセスから順に処理
while processes:
for p in processes[:]:
p.join(timeout=0.1)
if not p.is_alive():
print(f"Process {p.name} finished")
processes.remove(p)
ポーリングで確認する方法ですが、ProcessPoolExecutor の as_completed() を使うほうがスマートです。
ProcessPoolExecutor で待機する
concurrent.futures を使うと、より柔軟な待機ができます。
from concurrent.futures import ProcessPoolExecutor, wait, FIRST_COMPLETED
import time
def task(n):
time.sleep(n)
return n
if __name__ == "__main__":
with ProcessPoolExecutor() as executor:
futures = [executor.submit(task, i) for i in [3, 1, 2]]
# 最初の 1 つが完了するまで待機
done, not_done = wait(futures, return_when=FIRST_COMPLETED)
for f in done:
print(f"Completed: {f.result()}")
wait() の return_when には FIRST_COMPLETED、FIRST_EXCEPTION、ALL_COMPLETED を指定できます。