Python で子プロセスの終了を待つ

Python のマルチプロセスで親プロセスが子プロセスの終了を待つには、join() メソッドを使います。複数のプロセスを効率的に待機する方法や、タイムアウトの設定について解説します。

join の基本

join() は、対象のプロセスが終了するまで呼び出し元をブロックします。

from multiprocessing import Process
import time

def worker():
    time.sleep(2)
    print("Worker finished")

if __name__ == "__main__":
    p = Process(target=worker)
    p.start()
    print("Waiting for worker...")
    p.join()  # worker が終了するまで待機
    print("Main finished")

join() を呼ばないと、メインプロセスが先に終了してしまう可能性があります。

複数プロセスの終了を待つ

複数のプロセスを待機する場合は、順番に join() を呼びます。

from multiprocessing import Process
import time

def worker(name, duration):
    time.sleep(duration)
    print(f"{name} finished")

if __name__ == "__main__":
    processes = [
        Process(target=worker, args=("A", 3)),
        Process(target=worker, args=("B", 1)),
        Process(target=worker, args=("C", 2)),
    ]
    
    for p in processes:
        p.start()
    
    for p in processes:
        p.join()
    
    print("All workers finished")

すべてのプロセスを start() してから join() を呼ぶことで、並列に実行されます。

タイムアウト付きの待機

join(timeout) でタイムアウトを指定できます。

from multiprocessing import Process
import time

def slow_worker():
    time.sleep(10)
    print("Slow worker finished")

if __name__ == "__main__":
    p = Process(target=slow_worker)
    p.start()
    
    p.join(timeout=3)  # 最大 3 秒待機
    
    if p.is_alive():
        print("Worker is still running")
        p.terminate()
        p.join()  # terminate 後も join が必要
    else:
        print("Worker finished in time")

タイムアウト後に is_alive() でプロセスの状態を確認し、必要に応じて terminate() で強制終了できます。

exitcode で終了状態を確認する

join() 後に exitcode でプロセスの終了状態を確認できます。

Noneプロセスがまだ終了していない
0正常終了
正の数例外で終了(例外のないエラー)
負の数シグナルで終了(-N は シグナル N)
from multiprocessing import Process
import sys

def success_task():
    pass

def error_task():
    sys.exit(1)

def exception_task():
    raise ValueError("error")

if __name__ == "__main__":
    p1 = Process(target=success_task)
    p2 = Process(target=error_task)
    p3 = Process(target=exception_task)
    
    for p in [p1, p2, p3]:
        p.start()
        p.join()
        print(f"exitcode: {p.exitcode}")

完了したプロセスから順に処理する

特定のプロセスの完了を待たずに、完了したものから順に処理したい場合があります。

from multiprocessing import Process
import time

def worker(name, duration):
    time.sleep(duration)
    return name

if __name__ == "__main__":
    processes = [
        Process(target=worker, args=("A", 3)),
        Process(target=worker, args=("B", 1)),
        Process(target=worker, args=("C", 2)),
    ]
    
    for p in processes:
        p.start()
    
    # 完了したプロセスから順に処理
    while processes:
        for p in processes[:]:
            p.join(timeout=0.1)
            if not p.is_alive():
                print(f"Process {p.name} finished")
                processes.remove(p)

ポーリングで確認する方法ですが、ProcessPoolExecutoras_completed() を使うほうがスマートです。

ProcessPoolExecutor で待機する

concurrent.futures を使うと、より柔軟な待機ができます。

from concurrent.futures import ProcessPoolExecutor, wait, FIRST_COMPLETED
import time

def task(n):
    time.sleep(n)
    return n

if __name__ == "__main__":
    with ProcessPoolExecutor() as executor:
        futures = [executor.submit(task, i) for i in [3, 1, 2]]
        
        # 最初の 1 つが完了するまで待機
        done, not_done = wait(futures, return_when=FIRST_COMPLETED)
        
        for f in done:
            print(f"Completed: {f.result()}")

wait()return_when には FIRST_COMPLETEDFIRST_EXCEPTIONALL_COMPLETED を指定できます。