Python でプロセス間のデッドロックを防ぐ
Python のマルチプロセスでも、マルチスレッドと同様にデッドロックが発生する可能性があります。複数のプロセスがロックを取り合ったり、リソースを待ち合ったりすると、プログラムが永久に停止してしまいます。
デッドロックとは
デッドロックは、複数のプロセスが互いにリソースの解放を待ち合い、どれも進行できなくなる状態です。
from multiprocessing import Process, Lock
import time
def task1(lock_a, lock_b):
with lock_a:
print("Task1: Got lock_a")
time.sleep(0.5)
with lock_b: # lock_b を待つ
print("Task1: Got lock_b")
def task2(lock_a, lock_b):
with lock_b:
print("Task2: Got lock_b")
time.sleep(0.5)
with lock_a: # lock_a を待つ
print("Task2: Got lock_a")
if __name__ == "__main__":
lock_a = Lock()
lock_b = Lock()
p1 = Process(target=task1, args=(lock_a, lock_b))
p2 = Process(target=task2, args=(lock_a, lock_b))
p1.start()
p2.start()
p1.join()
p2.join()
# デッドロックで永久に終わらない
task1 は lock_a を取得して lock_b を待ち、task2 は lock_b を取得して lock_a を待ちます。どちらも相手のロックを待ち続けるため、デッドロックが発生します。
対策 1: ロックの順序を統一する
すべてのプロセスが同じ順序でロックを取得するようにします。
from multiprocessing import Process, Lock
import time
def task1(lock_a, lock_b):
with lock_a:
print("Task1: Got lock_a")
time.sleep(0.5)
with lock_b:
print("Task1: Got lock_b")
def task2(lock_a, lock_b):
with lock_a: # lock_a を先に取得
print("Task2: Got lock_a")
time.sleep(0.5)
with lock_b:
print("Task2: Got lock_b")
if __name__ == "__main__":
lock_a = Lock()
lock_b = Lock()
p1 = Process(target=task1, args=(lock_a, lock_b))
p2 = Process(target=task2, args=(lock_a, lock_b))
p1.start()
p2.start()
p1.join()
p2.join()
両方のタスクが lock_a → lock_b の順で取得するため、デッドロックは発生しません。
対策 2: タイムアウト付きでロックを取得する
acquire(timeout=秒数) を使って、一定時間でロック取得を諦めます。
from multiprocessing import Process, Lock
import time
def safe_task(lock_a, lock_b, name):
while True:
if lock_a.acquire(timeout=1):
try:
if lock_b.acquire(timeout=1):
try:
print(f"{name}: Got both locks")
return
finally:
lock_b.release()
finally:
lock_a.release()
print(f"{name}: Retrying...")
time.sleep(0.1)
if __name__ == "__main__":
lock_a = Lock()
lock_b = Lock()
p1 = Process(target=safe_task, args=(lock_a, lock_b, "Task1"))
p2 = Process(target=safe_task, args=(lock_a, lock_b, "Task2"))
p1.start()
p2.start()
p1.join()
p2.join()
ロックが取得できなければリトライすることで、デッドロックを回避できます。
対策 3: ロックをまとめて取得する
複数のロックを必要とする処理では、すべてのロックを一度に取得する方法があります。
from multiprocessing import Process, Lock
import time
def acquire_all(locks, timeout=5):
"""すべてのロックを取得する。失敗したらすべて解放"""
acquired = []
deadline = time.time() + timeout
for lock in locks:
remaining = deadline - time.time()
if remaining <= 0 or not lock.acquire(timeout=remaining):
# 取得済みのロックをすべて解放
for l in acquired:
l.release()
return False
acquired.append(lock)
return True
def release_all(locks):
for lock in locks:
lock.release()
def task(locks, name):
if acquire_all(locks):
try:
print(f"{name}: Got all locks")
time.sleep(1)
finally:
release_all(locks)
else:
print(f"{name}: Failed to acquire locks")
if __name__ == "__main__":
locks = [Lock() for _ in range(3)]
p1 = Process(target=task, args=(locks, "Task1"))
p2 = Process(target=task, args=(locks, "Task2"))
p1.start()
p2.start()
p1.join()
p2.join()
Queue によるデッドロック
ロック以外にも、Queue の使い方によってはデッドロックが発生します。
from multiprocessing import Process, Queue
def producer(q):
for i in range(1000000):
q.put(i) # キューが満杯になるとブロック
if __name__ == "__main__":
q = Queue()
p = Process(target=producer, args=(q,))
p.start()
p.join() # producer が q.put() でブロック → join も永久に待つ
子プロセスがキューに大量のデータを入れる
キューのバッファが満杯になる
子プロセスが put() でブロック
親プロセスが join() でブロック(デッドロック)
join() の前にキューを空にするか、join(timeout) を使ってください。
デッドロック検出の難しさ
デッドロックは発生条件が複雑で、テストで再現しにくいことがあります。設計段階でデッドロックを防ぐことが重要です。