Python のプロセス間で Lock を使う
Python の multiprocessing モジュールには、プロセス間で排他制御を行うための Lock クラスがあります。複数のプロセスが共有リソースにアクセスする際の競合状態を防ぐために使います。
なぜロックが必要か
複数のプロセスが同じデータを同時に更新すると、競合状態(race condition)が発生します。
from multiprocessing import Process, Value
def increment(counter):
for _ in range(10000):
counter.value += 1
if __name__ == "__main__":
counter = Value('i', 0)
p1 = Process(target=increment, args=(counter,))
p2 = Process(target=increment, args=(counter,))
p1.start()
p2.start()
p1.join()
p2.join()
print(counter.value) # 20000 にならないことが多い
counter.value += 1 は内部的に「読み取り→加算→書き込み」の 3 ステップで実行されます。この間に別のプロセスが割り込むと、更新が失われます。
Lock の基本的な使い方
Lock を使うと、クリティカルセクション(同時に 1 つのプロセスだけが実行すべき部分)を保護できます。
from multiprocessing import Process, Value, Lock
def increment(counter, lock):
for _ in range(10000):
with lock:
counter.value += 1
if __name__ == "__main__":
counter = Value('i', 0)
lock = Lock()
p1 = Process(target=increment, args=(counter, lock))
p2 = Process(target=increment, args=(counter, lock))
p1.start()
p2.start()
p1.join()
p2.join()
print(counter.value) # 必ず 20000 になる
with lock: でロックを取得し、ブロックを抜けると自動的に解放されます。
acquire と release
with 文を使わない場合は、acquire() と release() を明示的に呼びます。
from multiprocessing import Lock
lock = Lock()
lock.acquire() # ロックを取得
try:
# クリティカルセクション
pass
finally:
lock.release() # ロックを解放
例外が発生してもロックが解放されるように、try-finally で囲むのが安全です。
ノンブロッキングな取得
acquire(block=False) を使うと、ロックが取得できない場合にブロックせずにすぐ戻ります。
from multiprocessing import Process, Lock
import time
def worker(lock, name):
if lock.acquire(block=False):
try:
print(f"{name} acquired the lock")
time.sleep(2)
finally:
lock.release()
else:
print(f"{name} couldn't acquire the lock")
if __name__ == "__main__":
lock = Lock()
p1 = Process(target=worker, args=(lock, "Process-1"))
p2 = Process(target=worker, args=(lock, "Process-2"))
p1.start()
time.sleep(0.1) # p1 がロックを取得する時間を確保
p2.start()
p1.join()
p2.join()
戻り値が True ならロック取得成功、False なら失敗です。
タイムアウト付きの取得
acquire(timeout=秒数) で、指定時間だけ待機できます。
from multiprocessing import Process, Lock
import time
def worker(lock, name):
acquired = lock.acquire(timeout=1)
if acquired:
try:
print(f"{name} acquired the lock")
time.sleep(3)
finally:
lock.release()
else:
print(f"{name} timed out")
if __name__ == "__main__":
lock = Lock()
p1 = Process(target=worker, args=(lock, "Process-1"))
p2 = Process(target=worker, args=(lock, "Process-2"))
p1.start()
time.sleep(0.1)
p2.start()
p1.join()
p2.join()
タイムアウト内にロックが取得できなければ False が返されます。
RLock(再入可能ロック)
RLock は同じプロセスが複数回取得できるロックです。
Lock
同じプロセスでも 2 回 acquire するとデッドロック。
RLock
同じプロセスなら何度でも acquire できる。release も同じ回数必要。
from multiprocessing import RLock
lock = RLock()
lock.acquire()
lock.acquire() # Lock だとここでデッドロック、RLock は OK
lock.release()
lock.release()
再帰的な関数でロックを使う場合に便利です。