Python のデッドロックを防ぐ
デッドロックは、複数のスレッドが互いにロックの解放を待ち合って、永遠に処理が進まなくなる状態です。マルチスレッドプログラミングで最も厄介なバグの1つです。
デッドロックとは
2つのスレッドが、互いが持っているロックを待ち合うと、どちらも進めなくなります。
import threading
import time
lock_a = threading.Lock()
lock_b = threading.Lock()
def thread1():
with lock_a:
print("Thread1: lock_a を取得")
time.sleep(0.1) # タイミングをずらす
print("Thread1: lock_b を待機")
with lock_b: # lock_b を待つ
print("Thread1: 両方取得")
def thread2():
with lock_b:
print("Thread2: lock_b を取得")
time.sleep(0.1)
print("Thread2: lock_a を待機")
with lock_a: # lock_a を待つ(デッドロック!)
print("Thread2: 両方取得")
t1 = threading.Thread(target=thread1)
t2 = threading.Thread(target=thread2)
t1.start()
t2.start()
# このプログラムは永遠に終わらないデッドロックの4条件
デッドロックは以下の4条件がすべて揃うと発生します。
相互排他
リソースは一度に1つのスレッドしか使えない。
保持と待機
スレッドがリソースを保持しながら、他のリソースを待つ。
横取り不可
スレッドが持つリソースを強制的に奪えない。
循環待機
スレッドが循環的に互いのリソースを待つ。
対策1:ロックの順序を統一する
すべてのスレッドが同じ順序でロックを取得すれば、循環待機を防げます。
import threading
import time
lock_a = threading.Lock()
lock_b = threading.Lock()
def thread1():
with lock_a: # 常に lock_a を先に取得
print("Thread1: lock_a を取得")
time.sleep(0.1)
with lock_b:
print("Thread1: lock_b を取得")
def thread2():
with lock_a: # 同じ順序で lock_a を先に取得
print("Thread2: lock_a を取得")
time.sleep(0.1)
with lock_b:
print("Thread2: lock_b を取得")
t1 = threading.Thread(target=thread1)
t2 = threading.Thread(target=thread2)
t1.start()
t2.start()
t1.join()
t2.join()
print("正常終了")対策2:タイムアウトを使う
ロック取得にタイムアウトを設定し、取得できなければ諦めて再試行します。
import threading
import time
lock_a = threading.Lock()
lock_b = threading.Lock()
def acquire_locks():
while True:
acquired_a = lock_a.acquire(timeout=0.1)
if acquired_a:
acquired_b = lock_b.acquire(timeout=0.1)
if acquired_b:
return True # 両方取得成功
lock_a.release() # lock_a を解放して再試行
time.sleep(0.01) # 少し待って再試行
def worker(name):
if acquire_locks():
try:
print(f"{name}: 両方のロックを取得")
time.sleep(0.1)
finally:
lock_b.release()
lock_a.release()
threads = [threading.Thread(target=worker, args=(f"Thread-{i}",)) for i in range(3)]
for t in threads:
t.start()
for t in threads:
t.join()対策3:RLock を使う(同じスレッドの再入)
同じスレッドからの再入によるデッドロックは RLock で防げます。
import threading
# Lock だとデッドロック
lock = threading.Lock()
def outer():
with lock:
inner() # 同じスレッドでもデッドロック
def inner():
with lock: # ここでデッドロック
print("inner")
# RLock なら問題ない
rlock = threading.RLock()
def outer_safe():
with rlock:
inner_safe()
def inner_safe():
with rlock: # 同じスレッドなら取得可能
print("inner_safe")
outer_safe() # 正常動作対策4:ロックを最小限にする
ロックの範囲と期間を最小限にすることで、デッドロックのリスクを減らせます。
import threading
lock = threading.Lock()
data = []
# 悪い例:ロック範囲が広い
def bad_append(item):
with lock:
processed = heavy_processing(item) # ロック中に重い処理
data.append(processed)
# 良い例:ロック範囲を最小限に
def good_append(item):
processed = heavy_processing(item) # ロック外で処理
with lock:
data.append(processed) # データアクセスだけロック対策5:デッドロック検出
デバッグ時にデッドロックを検出するには、スレッドダンプを確認します。
import threading
import sys
import traceback
def dump_threads():
print("\n=== Thread Dump ===")
for thread_id, frame in sys._current_frames().items():
print(f"\nThread {thread_id}:")
traceback.print_stack(frame)
# シグナルでダンプを取る設定も可能
import signal
def handler(signum, frame):
dump_threads()
signal.signal(signal.SIGUSR1, handler)まとめ
| 対策 | 説明 |
|---|---|
| ロック順序の統一 | すべてのスレッドで同じ順序でロックを取得 |
| タイムアウト | ロック取得に時間制限を設けて再試行 |
| RLock | 同じスレッドからの再入を許可 |
| ロック範囲の最小化 | ロックを保持する時間と範囲を減らす |
デッドロックは発見が難しいバグです。設計段階から予防策を講じることが重要です。



