Python のデッドロックを防ぐ

デッドロックは、複数のスレッドが互いにロックの解放を待ち合って、永遠に処理が進まなくなる状態です。マルチスレッドプログラミングで最も厄介なバグの1つです。

デッドロックとは

2つのスレッドが、互いが持っているロックを待ち合うと、どちらも進めなくなります。

import threading
import time

lock_a = threading.Lock()
lock_b = threading.Lock()

def thread1():
    with lock_a:
        print("Thread1: lock_a を取得")
        time.sleep(0.1)  # タイミングをずらす
        print("Thread1: lock_b を待機")
        with lock_b:  # lock_b を待つ
            print("Thread1: 両方取得")

def thread2():
    with lock_b:
        print("Thread2: lock_b を取得")
        time.sleep(0.1)
        print("Thread2: lock_a を待機")
        with lock_a:  # lock_a を待つ(デッドロック!)
            print("Thread2: 両方取得")

t1 = threading.Thread(target=thread1)
t2 = threading.Thread(target=thread2)
t1.start()
t2.start()
# このプログラムは永遠に終わらない

デッドロックの4条件

デッドロックは以下の4条件がすべて揃うと発生します。

相互排他

リソースは一度に1つのスレッドしか使えない。

保持と待機

スレッドがリソースを保持しながら、他のリソースを待つ。

横取り不可

スレッドが持つリソースを強制的に奪えない。

循環待機

スレッドが循環的に互いのリソースを待つ。

対策1:ロックの順序を統一する

すべてのスレッドが同じ順序でロックを取得すれば、循環待機を防げます。

import threading
import time

lock_a = threading.Lock()
lock_b = threading.Lock()

def thread1():
    with lock_a:  # 常に lock_a を先に取得
        print("Thread1: lock_a を取得")
        time.sleep(0.1)
        with lock_b:
            print("Thread1: lock_b を取得")

def thread2():
    with lock_a:  # 同じ順序で lock_a を先に取得
        print("Thread2: lock_a を取得")
        time.sleep(0.1)
        with lock_b:
            print("Thread2: lock_b を取得")

t1 = threading.Thread(target=thread1)
t2 = threading.Thread(target=thread2)
t1.start()
t2.start()
t1.join()
t2.join()
print("正常終了")

対策2:タイムアウトを使う

ロック取得にタイムアウトを設定し、取得できなければ諦めて再試行します。

import threading
import time

lock_a = threading.Lock()
lock_b = threading.Lock()

def acquire_locks():
    while True:
        acquired_a = lock_a.acquire(timeout=0.1)
        if acquired_a:
            acquired_b = lock_b.acquire(timeout=0.1)
            if acquired_b:
                return True  # 両方取得成功
            lock_a.release()  # lock_a を解放して再試行
        time.sleep(0.01)  # 少し待って再試行

def worker(name):
    if acquire_locks():
        try:
            print(f"{name}: 両方のロックを取得")
            time.sleep(0.1)
        finally:
            lock_b.release()
            lock_a.release()

threads = [threading.Thread(target=worker, args=(f"Thread-{i}",)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

対策3:RLock を使う(同じスレッドの再入)

同じスレッドからの再入によるデッドロックは RLock で防げます。

import threading

# Lock だとデッドロック
lock = threading.Lock()

def outer():
    with lock:
        inner()  # 同じスレッドでもデッドロック

def inner():
    with lock:  # ここでデッドロック
        print("inner")

# RLock なら問題ない
rlock = threading.RLock()

def outer_safe():
    with rlock:
        inner_safe()

def inner_safe():
    with rlock:  # 同じスレッドなら取得可能
        print("inner_safe")

outer_safe()  # 正常動作

対策4:ロックを最小限にする

ロックの範囲と期間を最小限にすることで、デッドロックのリスクを減らせます。

import threading

lock = threading.Lock()
data = []

# 悪い例:ロック範囲が広い
def bad_append(item):
    with lock:
        processed = heavy_processing(item)  # ロック中に重い処理
        data.append(processed)

# 良い例:ロック範囲を最小限に
def good_append(item):
    processed = heavy_processing(item)  # ロック外で処理
    with lock:
        data.append(processed)  # データアクセスだけロック

対策5:デッドロック検出

デバッグ時にデッドロックを検出するには、スレッドダンプを確認します。

import threading
import sys
import traceback

def dump_threads():
    print("\n=== Thread Dump ===")
    for thread_id, frame in sys._current_frames().items():
        print(f"\nThread {thread_id}:")
        traceback.print_stack(frame)

# シグナルでダンプを取る設定も可能
import signal

def handler(signum, frame):
    dump_threads()

signal.signal(signal.SIGUSR1, handler)

まとめ

対策説明
ロック順序の統一すべてのスレッドで同じ順序でロックを取得
タイムアウトロック取得に時間制限を設けて再試行
RLock同じスレッドからの再入を許可
ロック範囲の最小化ロックを保持する時間と範囲を減らす

デッドロックは発見が難しいバグです。設計段階から予防策を講じることが重要です。