Python のマルチプロセスでも、マルチスレッドと同様にデッドロックが発生する可能性があります。複数のプロセスがロックを取り合ったり、リソースを待ち合ったりすると、プログラムが永久に停止してしまいます。
デッドロックとは
デッドロックは、複数のプロセスが互いにリソースの解放を待ち合い、どれも進行できなくなる状態です。
from multiprocessing import Process, Lock import time def task1(lock_a, lock_b): with lock_a: print("Task1: Got lock_a") time.sleep(0.5) with lock_b: # lock_b を待つ print("Task1: Got lock_b") def task2(lock_a, lock_b): with lock_b: print("Task2: Got lock_b") time.sleep(0.5) with lock_a: # lock_a を待つ print("Task2: Got lock_a") if __name__ == "__main__": lock_a = Lock() lock_b = Lock() p1 = Process(target=task1, args=(lock_a, lock_b)) p2 = Process(target=task2, args=(lock_a, lock_b)) p1.start() p2.start() p1.join() p2.join() # デッドロックで永久に終わらない
task1 は lock_a を取得して lock_b を待ち、task2 は lock_b を取得して lock_a を待ちます。どちらも相手のロックを待ち続けるため、デッドロックが発生します。
対策 1: ロックの順序を統一する
すべてのプロセスが同じ順序でロックを取得するようにします。
from multiprocessing import Process, Lock import time def task1(lock_a, lock_b): with lock_a: print("Task1: Got lock_a") time.sleep(0.5) with lock_b: print("Task1: Got lock_b") def task2(lock_a, lock_b): with lock_a: # lock_a を先に取得 print("Task2: Got lock_a") time.sleep(0.5) with lock_b: print("Task2: Got lock_b") if __name__ == "__main__": lock_a = Lock() lock_b = Lock() p1 = Process(target=task1, args=(lock_a, lock_b)) p2 = Process(target=task2, args=(lock_a, lock_b)) p1.start() p2.start() p1.join() p2.join()
両方のタスクが lock_a → lock_b の順で取得するため、デッドロックは発生しません。
対策 2: タイムアウト付きでロックを取得する
acquire(timeout=秒数) を使って、一定時間でロック取得を諦めます。
from multiprocessing import Process, Lock import time def safe_task(lock_a, lock_b, name): while True: if lock_a.acquire(timeout=1): try: if lock_b.acquire(timeout=1): try: print(f"{name}: Got both locks") return finally: lock_b.release() finally: lock_a.release() print(f"{name}: Retrying...") time.sleep(0.1) if __name__ == "__main__": lock_a = Lock() lock_b = Lock() p1 = Process(target=safe_task, args=(lock_a, lock_b, "Task1")) p2 = Process(target=safe_task, args=(lock_a, lock_b, "Task2")) p1.start() p2.start() p1.join() p2.join()
ロックが取得できなければリトライすることで、デッドロックを回避できます。
対策 3: ロックをまとめて取得する
複数のロックを必要とする処理では、すべてのロックを一度に取得する方法があります。
from multiprocessing import Process, Lock import time def acquire_all(locks, timeout=5): """すべてのロックを取得する。失敗したらすべて解放""" acquired = [] deadline = time.time() + timeout for lock in locks: remaining = deadline - time.time() if remaining <= 0 or not lock.acquire(timeout=remaining): # 取得済みのロックをすべて解放 for l in acquired: l.release() return False acquired.append(lock) return True def release_all(locks): for lock in locks: lock.release() def task(locks, name): if acquire_all(locks): try: print(f"{name}: Got all locks") time.sleep(1) finally: release_all(locks) else: print(f"{name}: Failed to acquire locks") if __name__ == "__main__": locks = [Lock() for _ in range(3)] p1 = Process(target=task, args=(locks, "Task1")) p2 = Process(target=task, args=(locks, "Task2")) p1.start() p2.start() p1.join() p2.join()
Queue によるデッドロック
ロック以外にも、Queue の使い方によってはデッドロックが発生します。
from multiprocessing import Process, Queue def producer(q): for i in range(1000000): q.put(i) # キューが満杯になるとブロック if __name__ == "__main__": q = Queue() p = Process(target=producer, args=(q,)) p.start() p.join() # producer が q.put() でブロック → join も永久に待つ
子プロセスがキューに大量のデータを入れる
キューのバッファが満杯になる
子プロセスが put() でブロック
親プロセスが join() でブロック(デッドロック)
join() の前にキューを空にするか、join(timeout) を使ってください。
デッドロック検出の難しさ
デッドロックは発生条件が複雑で、テストで再現しにくいことがあります。設計段階でデッドロックを防ぐことが重要です。
ロックの順序を文書化して統一する
必要最小限のロックだけを使う
可能なら Queue を使って明示的な同期を避ける
タイムアウトを設定して無限待機を防ぐ