Python でプロセスの例外を処理する
Python のマルチプロセスでは、子プロセスで発生した例外は親プロセスには直接伝わりません。例外を適切に処理するには、いくつかの方法を使い分ける必要があります。
子プロセスの例外は親に伝わらない
子プロセスで例外が発生しても、親プロセスは何も知りません。
from multiprocessing import Process
def buggy_task():
raise ValueError("Something went wrong")
if __name__ == "__main__":
p = Process(target=buggy_task)
p.start()
p.join()
print(f"Exit code: {p.exitcode}") # 1(異常終了)
例外が発生するとプロセスは終了し、exitcode が 1 になります。しかし、例外の内容は親プロセスには伝わりません。
Queue を使って例外を伝播する
Queue を使って、例外情報を親プロセスに送ることができます。
from multiprocessing import Process, Queue
import traceback
def worker(q):
try:
# 何かの処理
raise ValueError("Error in worker")
except Exception as e:
q.put(("error", e, traceback.format_exc()))
else:
q.put(("success", None, None))
if __name__ == "__main__":
q = Queue()
p = Process(target=worker, args=(q,))
p.start()
p.join()
status, error, tb = q.get()
if status == "error":
print(f"Error: {error}")
print(tb)
traceback.format_exc() でスタックトレースも取得できます。
ProcessPoolExecutor での例外処理
ProcessPoolExecutor を使う場合、Future.result() を呼ぶと例外が再送出されます。
from concurrent.futures import ProcessPoolExecutor
def divide(x, y):
return x / y
if __name__ == "__main__":
with ProcessPoolExecutor() as executor:
future = executor.submit(divide, 10, 0)
try:
result = future.result()
except ZeroDivisionError as e:
print(f"Caught: {e}")
子プロセスで例外発生
Future に例外が記録される
result() を呼ぶと例外が再送出
親プロセスで捕捉できる
Pool での例外処理
Pool.apply_async() の場合も、get() で例外が再送出されます。
from multiprocessing import Pool
def risky_task(x):
if x == 0:
raise ValueError("x cannot be zero")
return 10 / x
if __name__ == "__main__":
with Pool(2) as pool:
result = pool.apply_async(risky_task, (0,))
try:
value = result.get()
except ValueError as e:
print(f"Caught: {e}")
pool.map() の場合は、いずれかのタスクで例外が発生すると全体の処理が中断されます。
error_callback を使う
apply_async() には、例外発生時に呼ばれるコールバックを指定できます。
from multiprocessing import Pool
def task(x):
if x < 0:
raise ValueError("Negative value")
return x ** 2
def on_error(e):
print(f"Error callback: {e}")
if __name__ == "__main__":
with Pool(2) as pool:
result = pool.apply_async(task, (-1,), error_callback=on_error)
pool.close()
pool.join()
error_callback は例外オブジェクトを受け取ります。
複数タスクの例外を収集する
複数のタスクを実行する場合、すべての例外を収集したいことがあります。
from concurrent.futures import ProcessPoolExecutor, as_completed
def process(n):
if n % 3 == 0:
raise ValueError(f"Bad value: {n}")
return n * 2
if __name__ == "__main__":
errors = []
results = []
with ProcessPoolExecutor() as executor:
futures = {executor.submit(process, i): i for i in range(10)}
for future in as_completed(futures):
n = futures[future]
try:
results.append(future.result())
except Exception as e:
errors.append((n, e))
print(f"Results: {results}")
print(f"Errors: {errors}")
as_completed() を使うと、完了したタスクから順に結果や例外を取得できます。エラーが発生しても他のタスクは継続して処理されます。