Python のマルチプロセスでは、子プロセスで発生した例外は親プロセスには直接伝わりません。例外を適切に処理するには、いくつかの方法を使い分ける必要があります。
子プロセスの例外は親に伝わらない
子プロセスで例外が発生しても、親プロセスは何も知りません。
from multiprocessing import Process def buggy_task(): raise ValueError("Something went wrong") if __name__ == "__main__": p = Process(target=buggy_task) p.start() p.join() print(f"Exit code: {p.exitcode}") # 1(異常終了)
例外が発生するとプロセスは終了し、exitcode が 1 になります。しかし、例外の内容は親プロセスには伝わりません。
Queue を使って例外を伝播する
Queue を使って、例外情報を親プロセスに送ることができます。
from multiprocessing import Process, Queue import traceback def worker(q): try: # 何かの処理 raise ValueError("Error in worker") except Exception as e: q.put(("error", e, traceback.format_exc())) else: q.put(("success", None, None)) if __name__ == "__main__": q = Queue() p = Process(target=worker, args=(q,)) p.start() p.join() status, error, tb = q.get() if status == "error": print(f"Error: {error}") print(tb)
traceback.format_exc() でスタックトレースも取得できます。
ProcessPoolExecutor での例外処理
ProcessPoolExecutor を使う場合、Future.result() を呼ぶと例外が再送出されます。
from concurrent.futures import ProcessPoolExecutor def divide(x, y): return x / y if __name__ == "__main__": with ProcessPoolExecutor() as executor: future = executor.submit(divide, 10, 0) try: result = future.result() except ZeroDivisionError as e: print(f"Caught: {e}")
子プロセスで例外発生
Future に例外が記録される
result() を呼ぶと例外が再送出
親プロセスで捕捉できる
Pool での例外処理
Pool.apply_async() の場合も、get() で例外が再送出されます。
from multiprocessing import Pool def risky_task(x): if x == 0: raise ValueError("x cannot be zero") return 10 / x if __name__ == "__main__": with Pool(2) as pool: result = pool.apply_async(risky_task, (0,)) try: value = result.get() except ValueError as e: print(f"Caught: {e}")
pool.map() の場合は、いずれかのタスクで例外が発生すると全体の処理が中断されます。
error_callback を使う
apply_async() には、例外発生時に呼ばれるコールバックを指定できます。
from multiprocessing import Pool def task(x): if x < 0: raise ValueError("Negative value") return x ** 2 def on_error(e): print(f"Error callback: {e}") if __name__ == "__main__": with Pool(2) as pool: result = pool.apply_async(task, (-1,), error_callback=on_error) pool.close() pool.join()
error_callback は例外オブジェクトを受け取ります。
複数タスクの例外を収集する
複数のタスクを実行する場合、すべての例外を収集したいことがあります。
from concurrent.futures import ProcessPoolExecutor, as_completed def process(n): if n % 3 == 0: raise ValueError(f"Bad value: {n}") return n * 2 if __name__ == "__main__": errors = [] results = [] with ProcessPoolExecutor() as executor: futures = {executor.submit(process, i): i for i in range(10)} for future in as_completed(futures): n = futures[future] try: results.append(future.result()) except Exception as e: errors.append((n, e)) print(f"Results: {results}") print(f"Errors: {errors}")
as_completed() を使うと、完了したタスクから順に結果や例外を取得できます。エラーが発生しても他のタスクは継続して処理されます。