Python の with 文でロックを管理する

マルチスレッドプログラミングでは、複数のスレッドが同じリソースにアクセスする際にデータの整合性を保つ必要があります。threading.Lock はコンテキストマネージャとして使えるため、with 文で安全にロック管理ができます。

Lock の基本

Lock はコンテキストマネージャプロトコルを実装しているため、with 文で使えます。

from threading import Lock

lock = Lock()

with lock:
    # ここはクリティカルセクション
    # 同時に1つのスレッドだけが実行できる
    print("排他的な処理")

with ブロックに入るとロックを取得し、ブロックを抜けると自動的にロックを解放します。

with を使わない場合との比較

with 文あり

自動でロック解放。例外発生時も安全。コードが簡潔。

with 文なし

手動で release() が必要。例外時に解放忘れのリスク。コードが冗長。

from threading import Lock

lock = Lock()

# with を使わない場合(危険)
lock.acquire()
try:
    print("処理中")
finally:
    lock.release()

# with を使う場合(安全)
with lock:
    print("処理中")

実用例:スレッドセーフなカウンター

複数のスレッドから安全にアクセスできるカウンターです。

from threading import Thread, Lock
import time

class Counter:
    def __init__(self):
        self.value = 0
        self.lock = Lock()
    
    def increment(self):
        with self.lock:
            current = self.value
            time.sleep(0.001)  # 競合を発生させるための遅延
            self.value = current + 1

counter = Counter()

def worker():
    for _ in range(100):
        counter.increment()

threads = [Thread(target=worker) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"カウンター: {counter.value}")  # 1000(ロックがないと不正確になる)

RLock(再入可能ロック)

同じスレッドから複数回ロックを取得できる RLockwith 文で使えます。

from threading import RLock

lock = RLock()

def outer():
    with lock:
        print("outer")
        inner()

def inner():
    with lock:  # 同じスレッドなので再取得可能
        print("inner")

outer()

通常の Lock では、同じスレッドが2回ロックを取得しようとするとデッドロックになります。

Semaphore(セマフォ)

同時にアクセスできるスレッド数を制限する Semaphore も同様に使えます。

from threading import Thread, Semaphore
import time

# 同時に3つまでアクセス可能
semaphore = Semaphore(3)

def worker(n):
    with semaphore:
        print(f"Worker {n} 開始")
        time.sleep(1)
        print(f"Worker {n} 終了")

threads = [Thread(target=worker, args=(i,)) for i in range(6)]
for t in threads:
    t.start()
for t in threads:
    t.join()

Condition(条件変数)

スレッド間で条件を待機・通知する Conditionwith 文で使えます。

from threading import Thread, Condition

condition = Condition()
data = []

def consumer():
    with condition:
        while not data:
            condition.wait()  # データが来るまで待機
        print(f"消費: {data.pop()}")

def producer():
    with condition:
        data.append("アイテム")
        condition.notify()  # 待機中のスレッドに通知

Thread(target=consumer).start()
import time; time.sleep(0.1)
Thread(target=producer).start()

threading モジュールの同期プリミティブは、すべてコンテキストマネージャとして使えます。with 文を使うことで、ロックの解放忘れを防ぎ、安全な並行処理が書けます。