Python の multiprocessing モジュールには、プロセス間で排他制御を行うための Lock クラスがあります。複数のプロセスが共有リソースにアクセスする際の競合状態を防ぐために使います。
なぜロックが必要か
複数のプロセスが同じデータを同時に更新すると、競合状態(race condition)が発生します。
from multiprocessing import Process, Value def increment(counter): for _ in range(10000): counter.value += 1 if __name__ == "__main__": counter = Value('i', 0) p1 = Process(target=increment, args=(counter,)) p2 = Process(target=increment, args=(counter,)) p1.start() p2.start() p1.join() p2.join() print(counter.value) # 20000 にならないことが多い
counter.value += 1 は内部的に「読み取り→加算→書き込み」の 3 ステップで実行されます。この間に別のプロセスが割り込むと、更新が失われます。
Lock の基本的な使い方
Lock を使うと、クリティカルセクション(同時に 1 つのプロセスだけが実行すべき部分)を保護できます。
from multiprocessing import Process, Value, Lock def increment(counter, lock): for _ in range(10000): with lock: counter.value += 1 if __name__ == "__main__": counter = Value('i', 0) lock = Lock() p1 = Process(target=increment, args=(counter, lock)) p2 = Process(target=increment, args=(counter, lock)) p1.start() p2.start() p1.join() p2.join() print(counter.value) # 必ず 20000 になる
with lock: でロックを取得し、ブロックを抜けると自動的に解放されます。
acquire と release
with 文を使わない場合は、acquire() と release() を明示的に呼びます。
from multiprocessing import Lock lock = Lock() lock.acquire() # ロックを取得 try: # クリティカルセクション pass finally: lock.release() # ロックを解放
例外が発生してもロックが解放されるように、try-finally で囲むのが安全です。
ノンブロッキングな取得
acquire(block=False) を使うと、ロックが取得できない場合にブロックせずにすぐ戻ります。
from multiprocessing import Process, Lock import time def worker(lock, name): if lock.acquire(block=False): try: print(f"{name} acquired the lock") time.sleep(2) finally: lock.release() else: print(f"{name} couldn't acquire the lock") if __name__ == "__main__": lock = Lock() p1 = Process(target=worker, args=(lock, "Process-1")) p2 = Process(target=worker, args=(lock, "Process-2")) p1.start() time.sleep(0.1) # p1 がロックを取得する時間を確保 p2.start() p1.join() p2.join()
戻り値が True ならロック取得成功、False なら失敗です。
タイムアウト付きの取得
acquire(timeout=秒数) で、指定時間だけ待機できます。
from multiprocessing import Process, Lock import time def worker(lock, name): acquired = lock.acquire(timeout=1) if acquired: try: print(f"{name} acquired the lock") time.sleep(3) finally: lock.release() else: print(f"{name} timed out") if __name__ == "__main__": lock = Lock() p1 = Process(target=worker, args=(lock, "Process-1")) p2 = Process(target=worker, args=(lock, "Process-2")) p1.start() time.sleep(0.1) p2.start() p1.join() p2.join()
タイムアウト内にロックが取得できなければ False が返されます。
RLock(再入可能ロック)
RLock は同じプロセスが複数回取得できるロックです。
Lock
同じプロセスでも 2 回 acquire するとデッドロック。
RLock
同じプロセスなら何度でも acquire できる。release も同じ回数必要。
from multiprocessing import RLock lock = RLock() lock.acquire() lock.acquire() # Lock だとここでデッドロック、RLock は OK lock.release() lock.release()
再帰的な関数でロックを使う場合に便利です。