Python の Lock でスレッドを同期する

Lock(ロック)は、複数のスレッドが同じリソースに同時にアクセスすることを防ぐための同期機構です。排他制御を行うことで、データの競合を防ぎます。

なぜ Lock が必要か

複数のスレッドが同じ変数を同時に変更すると、予期しない結果になることがあります。

import threading

counter = 0

def increment():
    global counter
    for _ in range(100000):
        counter += 1

threads = [threading.Thread(target=increment) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"期待値: 500000")
print(f"実際の値: {counter}")  # 500000より小さいことがある

counter += 1 は実際には「読み込み → 加算 → 書き込み」の3ステップで、途中で他のスレッドが割り込む可能性があります。

Lock の基本的な使い方

Lock を使うと、一度に1つのスレッドだけがクリティカルセクションを実行できます。

import threading

counter = 0
lock = threading.Lock()

def increment():
    global counter
    for _ in range(100000):
        lock.acquire()  # ロックを取得
        try:
            counter += 1
        finally:
            lock.release()  # ロックを解放

threads = [threading.Thread(target=increment) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"結果: {counter}")  # 必ず 500000 になる

with 文で Lock を使う

Lock はコンテキストマネージャをサポートしているため、with 文で簡潔に書けます。

import threading

counter = 0
lock = threading.Lock()

def increment():
    global counter
    for _ in range(100000):
        with lock:  # 自動で acquire/release
            counter += 1

with 文を使えば、例外が発生してもロックが確実に解放されます。

Lock の動作

acquire() でロックを取得

クリティカルセクションを実行

release() でロックを解放

待機中のスレッドがロックを取得

ロックが取得されている間、他のスレッドは acquire() でブロックされます。

ノンブロッキングでの取得

acquire()blocking=False を指定すると、ロックが取得できない場合にすぐに False を返します。

import threading
import time

lock = threading.Lock()

def worker(name):
    if lock.acquire(blocking=False):
        try:
            print(f"{name}: ロック取得成功")
            time.sleep(2)
        finally:
            lock.release()
    else:
        print(f"{name}: ロック取得失敗")

t1 = threading.Thread(target=worker, args=("A",))
t2 = threading.Thread(target=worker, args=("B",))

t1.start()
time.sleep(0.1)
t2.start()

t1.join()
t2.join()

タイムアウト付きの取得

timeout を指定すると、指定秒数だけ待機します。

import threading
import time

lock = threading.Lock()

def worker(name):
    if lock.acquire(timeout=1):
        try:
            print(f"{name}: ロック取得")
            time.sleep(3)
        finally:
            lock.release()
    else:
        print(f"{name}: タイムアウト")

t1 = threading.Thread(target=worker, args=("A",))
t2 = threading.Thread(target=worker, args=("B",))

t1.start()
time.sleep(0.1)
t2.start()

t1.join()
t2.join()

実用例:スレッドセーフなカウンター

import threading

class Counter:
    def __init__(self):
        self.value = 0
        self._lock = threading.Lock()
    
    def increment(self):
        with self._lock:
            self.value += 1
    
    def decrement(self):
        with self._lock:
            self.value -= 1
    
    def get(self):
        with self._lock:
            return self.value

counter = Counter()

ロックを適切に使うことで、マルチスレッド環境でも安全にデータを共有できます。