Python の Condition で待機と通知を行う
Condition(条件変数)は、スレッド間で「待機」と「通知」を行うための同期機構です。あるスレッドが条件が満たされるまで待機し、別のスレッドがその条件を満たしたことを通知できます。
Condition の基本
Condition は wait() で待機し、notify() または notify_all() で通知します。
import threading
import time
condition = threading.Condition()
data_ready = False
def consumer():
with condition:
print("Consumer: データを待機中...")
while not data_ready:
condition.wait() # 通知が来るまで待機
print("Consumer: データを受信")
def producer():
global data_ready
time.sleep(2) # データ準備に時間がかかる
with condition:
print("Producer: データ準備完了")
data_ready = True
condition.notify() # 待機中のスレッドに通知
t1 = threading.Thread(target=consumer)
t2 = threading.Thread(target=producer)
t1.start()
t2.start()
t1.join()
t2.join()wait() と notify() の動作
Consumer: wait() でロックを解放して待機
Producer: ロックを取得して処理
Producer: notify() で通知してロックを解放
Consumer: ロックを再取得して処理を継続
wait() を呼ぶと、自動的にロックが解放され、他のスレッドがロックを取得できるようになります。
なぜ while ループが必要か
wait() は偽の起床(spurious wakeup)が発生する可能性があるため、条件を while でチェックする必要があります。
# 悪い例
with condition:
if not data_ready: # if だと偽の起床で問題になる
condition.wait()
process(data)
# 良い例
with condition:
while not data_ready: # while で再チェック
condition.wait()
process(data)notify() と notify_all()
notify()
待機中のスレッドを1つだけ起こす。
notify_all()
待機中のすべてのスレッドを起こす。
import threading
import time
condition = threading.Condition()
ready = False
def waiter(name):
with condition:
print(f"{name}: 待機開始")
while not ready:
condition.wait()
print(f"{name}: 起床")
def broadcaster():
global ready
time.sleep(1)
with condition:
ready = True
condition.notify_all() # 全員を起こす
threads = [threading.Thread(target=waiter, args=(f"Waiter-{i}",)) for i in range(3)]
threads.append(threading.Thread(target=broadcaster))
for t in threads:
t.start()
for t in threads:
t.join()wait() のタイムアウト
wait() にタイムアウトを指定できます。
import threading
condition = threading.Condition()
def waiter():
with condition:
print("待機開始")
result = condition.wait(timeout=2) # 最大2秒待つ
if result:
print("通知を受信")
else:
print("タイムアウト")
thread = threading.Thread(target=waiter)
thread.start()
thread.join()実用例:Producer-Consumer パターン
複数のプロデューサーとコンシューマーが協調して動作する例です。
import threading
import time
import random
condition = threading.Condition()
queue = []
MAX_SIZE = 5
def producer(name):
for i in range(3):
time.sleep(random.uniform(0.1, 0.5))
with condition:
while len(queue) >= MAX_SIZE:
print(f"{name}: キューが満杯、待機")
condition.wait()
item = f"{name}-{i}"
queue.append(item)
print(f"{name}: {item} を追加 (キュー: {len(queue)})")
condition.notify()
def consumer(name):
for _ in range(3):
with condition:
while not queue:
print(f"{name}: キューが空、待機")
condition.wait()
item = queue.pop(0)
print(f"{name}: {item} を消費 (キュー: {len(queue)})")
condition.notify()
time.sleep(random.uniform(0.1, 0.3))
threads = [
threading.Thread(target=producer, args=("P1",)),
threading.Thread(target=producer, args=("P2",)),
threading.Thread(target=consumer, args=("C1",)),
threading.Thread(target=consumer, args=("C2",)),
]
for t in threads:
t.start()
for t in threads:
t.join()Condition は、スレッド間で複雑な協調動作が必要な場合に非常に有用です。



