Python でプロセスの例外を処理する

Python のマルチプロセスでは、子プロセスで発生した例外は親プロセスには直接伝わりません。例外を適切に処理するには、いくつかの方法を使い分ける必要があります。

子プロセスの例外は親に伝わらない

子プロセスで例外が発生しても、親プロセスは何も知りません。

from multiprocessing import Process

def buggy_task():
    raise ValueError("Something went wrong")

if __name__ == "__main__":
    p = Process(target=buggy_task)
    p.start()
    p.join()
    print(f"Exit code: {p.exitcode}")  # 1(異常終了)

例外が発生するとプロセスは終了し、exitcode が 1 になります。しかし、例外の内容は親プロセスには伝わりません。

Queue を使って例外を伝播する

Queue を使って、例外情報を親プロセスに送ることができます。

from multiprocessing import Process, Queue
import traceback

def worker(q):
    try:
        # 何かの処理
        raise ValueError("Error in worker")
    except Exception as e:
        q.put(("error", e, traceback.format_exc()))
    else:
        q.put(("success", None, None))

if __name__ == "__main__":
    q = Queue()
    p = Process(target=worker, args=(q,))
    p.start()
    p.join()
    
    status, error, tb = q.get()
    if status == "error":
        print(f"Error: {error}")
        print(tb)

traceback.format_exc() でスタックトレースも取得できます。

ProcessPoolExecutor での例外処理

ProcessPoolExecutor を使う場合、Future.result() を呼ぶと例外が再送出されます。

from concurrent.futures import ProcessPoolExecutor

def divide(x, y):
    return x / y

if __name__ == "__main__":
    with ProcessPoolExecutor() as executor:
        future = executor.submit(divide, 10, 0)
        
        try:
            result = future.result()
        except ZeroDivisionError as e:
            print(f"Caught: {e}")

子プロセスで例外発生

Future に例外が記録される

result() を呼ぶと例外が再送出

親プロセスで捕捉できる

Pool での例外処理

Pool.apply_async() の場合も、get() で例外が再送出されます。

from multiprocessing import Pool

def risky_task(x):
    if x == 0:
        raise ValueError("x cannot be zero")
    return 10 / x

if __name__ == "__main__":
    with Pool(2) as pool:
        result = pool.apply_async(risky_task, (0,))
        
        try:
            value = result.get()
        except ValueError as e:
            print(f"Caught: {e}")

pool.map() の場合は、いずれかのタスクで例外が発生すると全体の処理が中断されます。

error_callback を使う

apply_async() には、例外発生時に呼ばれるコールバックを指定できます。

from multiprocessing import Pool

def task(x):
    if x < 0:
        raise ValueError("Negative value")
    return x ** 2

def on_error(e):
    print(f"Error callback: {e}")

if __name__ == "__main__":
    with Pool(2) as pool:
        result = pool.apply_async(task, (-1,), error_callback=on_error)
        pool.close()
        pool.join()

error_callback は例外オブジェクトを受け取ります。

複数タスクの例外を収集する

複数のタスクを実行する場合、すべての例外を収集したいことがあります。

from concurrent.futures import ProcessPoolExecutor, as_completed

def process(n):
    if n % 3 == 0:
        raise ValueError(f"Bad value: {n}")
    return n * 2

if __name__ == "__main__":
    errors = []
    results = []
    
    with ProcessPoolExecutor() as executor:
        futures = {executor.submit(process, i): i for i in range(10)}
        
        for future in as_completed(futures):
            n = futures[future]
            try:
                results.append(future.result())
            except Exception as e:
                errors.append((n, e))
    
    print(f"Results: {results}")
    print(f"Errors: {errors}")

as_completed() を使うと、完了したタスクから順に結果や例外を取得できます。エラーが発生しても他のタスクは継続して処理されます。