Python の Condition で待機と通知を行う

Condition(条件変数)は、スレッド間で「待機」と「通知」を行うための同期機構です。あるスレッドが条件が満たされるまで待機し、別のスレッドがその条件を満たしたことを通知できます。

Condition の基本

Conditionwait() で待機し、notify() または notify_all() で通知します。

import threading
import time

condition = threading.Condition()
data_ready = False

def consumer():
    with condition:
        print("Consumer: データを待機中...")
        while not data_ready:
            condition.wait()  # 通知が来るまで待機
        print("Consumer: データを受信")

def producer():
    global data_ready
    time.sleep(2)  # データ準備に時間がかかる
    with condition:
        print("Producer: データ準備完了")
        data_ready = True
        condition.notify()  # 待機中のスレッドに通知

t1 = threading.Thread(target=consumer)
t2 = threading.Thread(target=producer)

t1.start()
t2.start()
t1.join()
t2.join()

wait() と notify() の動作

Consumer: wait() でロックを解放して待機

Producer: ロックを取得して処理

Producer: notify() で通知してロックを解放

Consumer: ロックを再取得して処理を継続

wait() を呼ぶと、自動的にロックが解放され、他のスレッドがロックを取得できるようになります。

なぜ while ループが必要か

wait() は偽の起床(spurious wakeup)が発生する可能性があるため、条件を while でチェックする必要があります。

# 悪い例
with condition:
    if not data_ready:  # if だと偽の起床で問題になる
        condition.wait()
    process(data)

# 良い例
with condition:
    while not data_ready:  # while で再チェック
        condition.wait()
    process(data)

notify() と notify_all()

notify()

待機中のスレッドを1つだけ起こす。

notify_all()

待機中のすべてのスレッドを起こす。

import threading
import time

condition = threading.Condition()
ready = False

def waiter(name):
    with condition:
        print(f"{name}: 待機開始")
        while not ready:
            condition.wait()
        print(f"{name}: 起床")

def broadcaster():
    global ready
    time.sleep(1)
    with condition:
        ready = True
        condition.notify_all()  # 全員を起こす

threads = [threading.Thread(target=waiter, args=(f"Waiter-{i}",)) for i in range(3)]
threads.append(threading.Thread(target=broadcaster))

for t in threads:
    t.start()
for t in threads:
    t.join()

wait() のタイムアウト

wait() にタイムアウトを指定できます。

import threading

condition = threading.Condition()

def waiter():
    with condition:
        print("待機開始")
        result = condition.wait(timeout=2)  # 最大2秒待つ
        if result:
            print("通知を受信")
        else:
            print("タイムアウト")

thread = threading.Thread(target=waiter)
thread.start()
thread.join()

実用例:Producer-Consumer パターン

複数のプロデューサーとコンシューマーが協調して動作する例です。

import threading
import time
import random

condition = threading.Condition()
queue = []
MAX_SIZE = 5

def producer(name):
    for i in range(3):
        time.sleep(random.uniform(0.1, 0.5))
        with condition:
            while len(queue) >= MAX_SIZE:
                print(f"{name}: キューが満杯、待機")
                condition.wait()
            item = f"{name}-{i}"
            queue.append(item)
            print(f"{name}: {item} を追加 (キュー: {len(queue)})")
            condition.notify()

def consumer(name):
    for _ in range(3):
        with condition:
            while not queue:
                print(f"{name}: キューが空、待機")
                condition.wait()
            item = queue.pop(0)
            print(f"{name}: {item} を消費 (キュー: {len(queue)})")
            condition.notify()
        time.sleep(random.uniform(0.1, 0.3))

threads = [
    threading.Thread(target=producer, args=("P1",)),
    threading.Thread(target=producer, args=("P2",)),
    threading.Thread(target=consumer, args=("C1",)),
    threading.Thread(target=consumer, args=("C2",)),
]

for t in threads:
    t.start()
for t in threads:
    t.join()

Condition は、スレッド間で複雑な協調動作が必要な場合に非常に有用です。