Python でスレッドの例外を処理する
スレッド内で発生した例外は、デフォルトではメインスレッドに伝播しません。適切に例外を処理しないと、エラーが見逃されてしまいます。
スレッドの例外はメインに伝播しない
スレッド内で例外が発生しても、メインスレッドの try-except ではキャッチできません。
import threading
def buggy_task():
raise ValueError("エラーが発生!")
thread = threading.Thread(target=buggy_task)
thread.start()
thread.join()
print("メインスレッドは正常に終了")
# ValueError の情報はコンソールに表示されるが、メインでは捕捉できないスレッド内で try-except を使う
最も基本的な方法は、スレッド内で例外を処理することです。
import threading
def safe_task():
try:
raise ValueError("エラーが発生!")
except Exception as e:
print(f"スレッド内でキャッチ: {e}")
thread = threading.Thread(target=safe_task)
thread.start()
thread.join()例外を呼び出し側に渡す
結果や例外を格納する変数を用意して、後で確認する方法です。
import threading
class TaskRunner:
def __init__(self, func, *args, **kwargs):
self.func = func
self.args = args
self.kwargs = kwargs
self.result = None
self.exception = None
def run(self):
try:
self.result = self.func(*self.args, **self.kwargs)
except Exception as e:
self.exception = e
def divide(a, b):
return a / b
runner = TaskRunner(divide, 10, 0)
thread = threading.Thread(target=runner.run)
thread.start()
thread.join()
if runner.exception:
print(f"例外が発生: {runner.exception}")
else:
print(f"結果: {runner.result}")ThreadPoolExecutor を使う(推奨)
ThreadPoolExecutor は例外処理が組み込まれており、Future.result() で例外を再送出できます。
from concurrent.futures import ThreadPoolExecutor
def risky_task(n):
if n == 0:
raise ValueError("ゼロは許可されていません")
return 100 / n
with ThreadPoolExecutor() as executor:
futures = [executor.submit(risky_task, i) for i in range(3)]
for future in futures:
try:
result = future.result()
print(f"結果: {result}")
except ValueError as e:
print(f"エラー: {e}")exception() メソッド
Future.exception() を使うと、例外を再送出せずに取得できます。
from concurrent.futures import ThreadPoolExecutor
def task():
raise RuntimeError("問題発生")
with ThreadPoolExecutor() as executor:
future = executor.submit(task)
exception = future.exception() # 例外を取得(再送出しない)
if exception:
print(f"例外が発生: {type(exception).__name__}: {exception}")sys.excepthook をカスタマイズ
スレッドの未処理例外をグローバルにフックすることもできます。
import threading
import sys
def thread_exception_handler(args):
print(f"スレッド例外をキャッチ:")
print(f" スレッド: {args.thread.name}")
print(f" 例外: {args.exc_type.__name__}: {args.exc_value}")
threading.excepthook = thread_exception_handler
def buggy_task():
raise ValueError("テストエラー")
thread = threading.Thread(target=buggy_task, name="BuggyThread")
thread.start()
thread.join()Python 3.8 以降で threading.excepthook が使えます。
実用例:エラーを収集する
複数スレッドで発生したエラーを収集する例です。
import threading
from queue import Queue
error_queue = Queue()
def worker(n):
try:
if n % 2 == 0:
raise ValueError(f"偶数エラー: {n}")
print(f"Worker {n}: 成功")
except Exception as e:
error_queue.put((n, e))
threads = [threading.Thread(target=worker, args=(i,)) for i in range(5)]
for t in threads:
t.start()
for t in threads:
t.join()
# エラーを確認
while not error_queue.empty():
n, error = error_queue.get()
print(f"Worker {n} でエラー: {error}")まとめ
推奨
ThreadPoolExecutor を使う。Future.result() で例外を処理。
代替策
スレッド内で try-except。結果/例外を格納するラッパーを使う。threading.excepthook でグローバルに処理。
スレッドの例外処理を適切に行うことで、エラーの見逃しを防げます。



