Python でプロデューサー・コンシューマーパターンを実装する
プロデューサー・コンシューマーパターンは、データを生成するスレッド(プロデューサー)と、それを消費するスレッド(コンシューマー)が協調して動作する設計パターンです。マルチスレッドプログラミングで最もよく使われるパターンの1つです。
パターンの概要
プロデューサーがデータを生成
キューにデータを追加
コンシューマーがキューから取り出し
データを処理
プロデューサーとコンシューマーは独立して動作し、キューを介して通信します。
基本的な実装
queue.Queue を使った実装です。
from queue import Queue
import threading
import time
import random
def producer(queue, name):
for i in range(5):
item = f"{name}-item-{i}"
time.sleep(random.uniform(0.1, 0.5)) # 生成に時間がかかる
queue.put(item)
print(f"Producer {name}: {item} を追加")
print(f"Producer {name}: 完了")
def consumer(queue, name):
while True:
item = queue.get()
if item is None: # 終了シグナル
break
time.sleep(random.uniform(0.1, 0.3)) # 処理に時間がかかる
print(f"Consumer {name}: {item} を処理")
queue.task_done()
print(f"Consumer {name}: 終了")
queue = Queue()
# プロデューサーとコンシューマーを作成
producers = [threading.Thread(target=producer, args=(queue, f"P{i}")) for i in range(2)]
consumers = [threading.Thread(target=consumer, args=(queue, f"C{i}")) for i in range(3)]
# 開始
for t in producers + consumers:
t.start()
# プロデューサーの終了を待つ
for t in producers:
t.join()
# すべてのアイテムの処理を待つ
queue.join()
# コンシューマーに終了シグナルを送る
for _ in consumers:
queue.put(None)
# コンシューマーの終了を待つ
for t in consumers:
t.join()
print("すべて完了")キューのサイズ制限
Queue にサイズ制限を設けると、プロデューサーが速すぎる場合にブロックされます。
from queue import Queue
import threading
import time
queue = Queue(maxsize=3) # 最大3個まで
def producer():
for i in range(10):
item = f"item-{i}"
queue.put(item) # キューが満杯ならブロック
print(f"Producer: {item} を追加 (size={queue.qsize()})")
def consumer():
for _ in range(10):
time.sleep(0.5) # 処理が遅い
item = queue.get()
print(f"Consumer: {item} を処理")
queue.task_done()
threading.Thread(target=producer).start()
threading.Thread(target=consumer).start()複数プロデューサー・複数コンシューマー
大規模なシステムでは、複数のプロデューサーとコンシューマーを使います。
from queue import Queue
import threading
import time
import random
class TaskSystem:
def __init__(self, num_producers, num_consumers):
self.queue = Queue(maxsize=10)
self.producers = []
self.consumers = []
self.running = True
for i in range(num_producers):
t = threading.Thread(target=self._producer, args=(i,))
self.producers.append(t)
for i in range(num_consumers):
t = threading.Thread(target=self._consumer, args=(i,))
self.consumers.append(t)
def _producer(self, pid):
while self.running:
item = f"P{pid}-{random.randint(0, 100)}"
self.queue.put(item)
print(f"Producer {pid}: {item}")
time.sleep(random.uniform(0.5, 1))
def _consumer(self, cid):
while self.running or not self.queue.empty():
try:
item = self.queue.get(timeout=0.5)
print(f"Consumer {cid}: {item} を処理")
self.queue.task_done()
except:
pass
def start(self):
for t in self.producers + self.consumers:
t.start()
def stop(self):
self.running = False
for t in self.producers + self.consumers:
t.join()
system = TaskSystem(num_producers=2, num_consumers=3)
system.start()
time.sleep(3)
system.stop()
print("システム停止")ThreadPoolExecutor を使った実装
ThreadPoolExecutor を使うとよりシンプルに書けます。
from concurrent.futures import ThreadPoolExecutor
from queue import Queue
import time
def process_item(item):
time.sleep(0.1)
return f"{item} 処理完了"
# アイテムを生成
items = [f"item-{i}" for i in range(10)]
with ThreadPoolExecutor(max_workers=4) as executor:
results = list(executor.map(process_item, items))
for result in results:
print(result)優先度付きキュー
PriorityQueue を使うと、優先度の高いタスクを先に処理できます。
from queue import PriorityQueue
import threading
pq = PriorityQueue()
def consumer():
while True:
priority, item = pq.get()
if item is None:
break
print(f"処理: {item} (優先度: {priority})")
pq.task_done()
threading.Thread(target=consumer, daemon=True).start()
# 優先度の低い順に処理される
pq.put((3, "低優先度タスク"))
pq.put((1, "高優先度タスク"))
pq.put((2, "中優先度タスク"))
pq.join()
pq.put((0, None)) # 終了シグナルプロデューサー・コンシューマーパターンは、非同期処理やパイプライン処理の基本となる重要なパターンです。












