Python でスレッドの例外を処理する

スレッド内で発生した例外は、デフォルトではメインスレッドに伝播しません。適切に例外を処理しないと、エラーが見逃されてしまいます。

スレッドの例外はメインに伝播しない

スレッド内で例外が発生しても、メインスレッドの try-except ではキャッチできません。

import threading

def buggy_task():
    raise ValueError("エラーが発生!")

thread = threading.Thread(target=buggy_task)
thread.start()
thread.join()

print("メインスレッドは正常に終了")
# ValueError の情報はコンソールに表示されるが、メインでは捕捉できない

スレッド内で try-except を使う

最も基本的な方法は、スレッド内で例外を処理することです。

import threading

def safe_task():
    try:
        raise ValueError("エラーが発生!")
    except Exception as e:
        print(f"スレッド内でキャッチ: {e}")

thread = threading.Thread(target=safe_task)
thread.start()
thread.join()

例外を呼び出し側に渡す

結果や例外を格納する変数を用意して、後で確認する方法です。

import threading

class TaskRunner:
    def __init__(self, func, *args, **kwargs):
        self.func = func
        self.args = args
        self.kwargs = kwargs
        self.result = None
        self.exception = None
    
    def run(self):
        try:
            self.result = self.func(*self.args, **self.kwargs)
        except Exception as e:
            self.exception = e

def divide(a, b):
    return a / b

runner = TaskRunner(divide, 10, 0)
thread = threading.Thread(target=runner.run)
thread.start()
thread.join()

if runner.exception:
    print(f"例外が発生: {runner.exception}")
else:
    print(f"結果: {runner.result}")

ThreadPoolExecutor を使う(推奨)

ThreadPoolExecutor は例外処理が組み込まれており、Future.result() で例外を再送出できます。

from concurrent.futures import ThreadPoolExecutor

def risky_task(n):
    if n == 0:
        raise ValueError("ゼロは許可されていません")
    return 100 / n

with ThreadPoolExecutor() as executor:
    futures = [executor.submit(risky_task, i) for i in range(3)]
    
    for future in futures:
        try:
            result = future.result()
            print(f"結果: {result}")
        except ValueError as e:
            print(f"エラー: {e}")

exception() メソッド

Future.exception() を使うと、例外を再送出せずに取得できます。

from concurrent.futures import ThreadPoolExecutor

def task():
    raise RuntimeError("問題発生")

with ThreadPoolExecutor() as executor:
    future = executor.submit(task)
    
    exception = future.exception()  # 例外を取得(再送出しない)
    if exception:
        print(f"例外が発生: {type(exception).__name__}: {exception}")

sys.excepthook をカスタマイズ

スレッドの未処理例外をグローバルにフックすることもできます。

import threading
import sys

def thread_exception_handler(args):
    print(f"スレッド例外をキャッチ:")
    print(f"  スレッド: {args.thread.name}")
    print(f"  例外: {args.exc_type.__name__}: {args.exc_value}")

threading.excepthook = thread_exception_handler

def buggy_task():
    raise ValueError("テストエラー")

thread = threading.Thread(target=buggy_task, name="BuggyThread")
thread.start()
thread.join()

Python 3.8 以降で threading.excepthook が使えます。

実用例:エラーを収集する

複数スレッドで発生したエラーを収集する例です。

import threading
from queue import Queue

error_queue = Queue()

def worker(n):
    try:
        if n % 2 == 0:
            raise ValueError(f"偶数エラー: {n}")
        print(f"Worker {n}: 成功")
    except Exception as e:
        error_queue.put((n, e))

threads = [threading.Thread(target=worker, args=(i,)) for i in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# エラーを確認
while not error_queue.empty():
    n, error = error_queue.get()
    print(f"Worker {n} でエラー: {error}")

まとめ

推奨

ThreadPoolExecutor を使う。Future.result() で例外を処理。

代替策

スレッド内で try-except。結果/例外を格納するラッパーを使う。threading.excepthook でグローバルに処理。

スレッドの例外処理を適切に行うことで、エラーの見逃しを防げます。