Python のプロセス間で Lock を使う

Python の multiprocessing モジュールには、プロセス間で排他制御を行うための Lock クラスがあります。複数のプロセスが共有リソースにアクセスする際の競合状態を防ぐために使います。

なぜロックが必要か

複数のプロセスが同じデータを同時に更新すると、競合状態(race condition)が発生します。

from multiprocessing import Process, Value

def increment(counter):
    for _ in range(10000):
        counter.value += 1

if __name__ == "__main__":
    counter = Value('i', 0)
    
    p1 = Process(target=increment, args=(counter,))
    p2 = Process(target=increment, args=(counter,))
    
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    
    print(counter.value)  # 20000 にならないことが多い

counter.value += 1 は内部的に「読み取り→加算→書き込み」の 3 ステップで実行されます。この間に別のプロセスが割り込むと、更新が失われます。

Lock の基本的な使い方

Lock を使うと、クリティカルセクション(同時に 1 つのプロセスだけが実行すべき部分)を保護できます。

from multiprocessing import Process, Value, Lock

def increment(counter, lock):
    for _ in range(10000):
        with lock:
            counter.value += 1

if __name__ == "__main__":
    counter = Value('i', 0)
    lock = Lock()
    
    p1 = Process(target=increment, args=(counter, lock))
    p2 = Process(target=increment, args=(counter, lock))
    
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    
    print(counter.value)  # 必ず 20000 になる

with lock: でロックを取得し、ブロックを抜けると自動的に解放されます。

acquire と release

with 文を使わない場合は、acquire()release() を明示的に呼びます。

from multiprocessing import Lock

lock = Lock()

lock.acquire()  # ロックを取得
try:
    # クリティカルセクション
    pass
finally:
    lock.release()  # ロックを解放

例外が発生してもロックが解放されるように、try-finally で囲むのが安全です。

ノンブロッキングな取得

acquire(block=False) を使うと、ロックが取得できない場合にブロックせずにすぐ戻ります。

from multiprocessing import Process, Lock
import time

def worker(lock, name):
    if lock.acquire(block=False):
        try:
            print(f"{name} acquired the lock")
            time.sleep(2)
        finally:
            lock.release()
    else:
        print(f"{name} couldn't acquire the lock")

if __name__ == "__main__":
    lock = Lock()
    
    p1 = Process(target=worker, args=(lock, "Process-1"))
    p2 = Process(target=worker, args=(lock, "Process-2"))
    
    p1.start()
    time.sleep(0.1)  # p1 がロックを取得する時間を確保
    p2.start()
    
    p1.join()
    p2.join()

戻り値が True ならロック取得成功、False なら失敗です。

タイムアウト付きの取得

acquire(timeout=秒数) で、指定時間だけ待機できます。

from multiprocessing import Process, Lock
import time

def worker(lock, name):
    acquired = lock.acquire(timeout=1)
    if acquired:
        try:
            print(f"{name} acquired the lock")
            time.sleep(3)
        finally:
            lock.release()
    else:
        print(f"{name} timed out")

if __name__ == "__main__":
    lock = Lock()
    
    p1 = Process(target=worker, args=(lock, "Process-1"))
    p2 = Process(target=worker, args=(lock, "Process-2"))
    
    p1.start()
    time.sleep(0.1)
    p2.start()
    
    p1.join()
    p2.join()

タイムアウト内にロックが取得できなければ False が返されます。

RLock(再入可能ロック)

RLock は同じプロセスが複数回取得できるロックです。

Lock

同じプロセスでも 2 回 acquire するとデッドロック。

RLock

同じプロセスなら何度でも acquire できる。release も同じ回数必要。

from multiprocessing import RLock

lock = RLock()

lock.acquire()
lock.acquire()  # Lock だとここでデッドロック、RLock は OK
lock.release()
lock.release()

再帰的な関数でロックを使う場合に便利です。