中学理科1626207 views
英語607877 views
小学算数1194618 views
いろは2986023 views
高校生物549842 views
小学社会308636 views
LaTeX957300 views
Computer365120 views
高校化学2913383 views
小学理科717236 views
Help
Tools

English

Python でプロデューサー・コンシューマーパターンを実装する

プロデューサー・コンシューマーパターンは、データを生成するスレッド(プロデューサー)と、それを消費するスレッド(コンシューマー)が協調して動作する設計パターンです。マルチスレッドプログラミングで最もよく使われるパターンの1つです。

パターンの概要

プロデューサーがデータを生成

キューにデータを追加

コンシューマーがキューから取り出し

データを処理

プロデューサーとコンシューマーは独立して動作し、キューを介して通信します。

基本的な実装

queue.Queue を使った実装です。

from queue import Queue
import threading
import time
import random

def producer(queue, name):
    for i in range(5):
        item = f"{name}-item-{i}"
        time.sleep(random.uniform(0.1, 0.5))  # 生成に時間がかかる
        queue.put(item)
        print(f"Producer {name}: {item} を追加")
    print(f"Producer {name}: 完了")

def consumer(queue, name):
    while True:
        item = queue.get()
        if item is None:  # 終了シグナル
            break
        time.sleep(random.uniform(0.1, 0.3))  # 処理に時間がかかる
        print(f"Consumer {name}: {item} を処理")
        queue.task_done()
    print(f"Consumer {name}: 終了")

queue = Queue()

# プロデューサーとコンシューマーを作成
producers = [threading.Thread(target=producer, args=(queue, f"P{i}")) for i in range(2)]
consumers = [threading.Thread(target=consumer, args=(queue, f"C{i}")) for i in range(3)]

# 開始
for t in producers + consumers:
    t.start()

# プロデューサーの終了を待つ
for t in producers:
    t.join()

# すべてのアイテムの処理を待つ
queue.join()

# コンシューマーに終了シグナルを送る
for _ in consumers:
    queue.put(None)

# コンシューマーの終了を待つ
for t in consumers:
    t.join()

print("すべて完了")

キューのサイズ制限

Queue にサイズ制限を設けると、プロデューサーが速すぎる場合にブロックされます。

from queue import Queue
import threading
import time

queue = Queue(maxsize=3)  # 最大3個まで

def producer():
    for i in range(10):
        item = f"item-{i}"
        queue.put(item)  # キューが満杯ならブロック
        print(f"Producer: {item} を追加 (size={queue.qsize()})")

def consumer():
    for _ in range(10):
        time.sleep(0.5)  # 処理が遅い
        item = queue.get()
        print(f"Consumer: {item} を処理")
        queue.task_done()

threading.Thread(target=producer).start()
threading.Thread(target=consumer).start()

複数プロデューサー・複数コンシューマー

大規模なシステムでは、複数のプロデューサーとコンシューマーを使います。

from queue import Queue
import threading
import time
import random

class TaskSystem:
    def __init__(self, num_producers, num_consumers):
        self.queue = Queue(maxsize=10)
        self.producers = []
        self.consumers = []
        self.running = True
        
        for i in range(num_producers):
            t = threading.Thread(target=self._producer, args=(i,))
            self.producers.append(t)
        
        for i in range(num_consumers):
            t = threading.Thread(target=self._consumer, args=(i,))
            self.consumers.append(t)
    
    def _producer(self, pid):
        while self.running:
            item = f"P{pid}-{random.randint(0, 100)}"
            self.queue.put(item)
            print(f"Producer {pid}: {item}")
            time.sleep(random.uniform(0.5, 1))
    
    def _consumer(self, cid):
        while self.running or not self.queue.empty():
            try:
                item = self.queue.get(timeout=0.5)
                print(f"Consumer {cid}: {item} を処理")
                self.queue.task_done()
            except:
                pass
    
    def start(self):
        for t in self.producers + self.consumers:
            t.start()
    
    def stop(self):
        self.running = False
        for t in self.producers + self.consumers:
            t.join()

system = TaskSystem(num_producers=2, num_consumers=3)
system.start()
time.sleep(3)
system.stop()
print("システム停止")

ThreadPoolExecutor を使った実装

ThreadPoolExecutor を使うとよりシンプルに書けます。

from concurrent.futures import ThreadPoolExecutor
from queue import Queue
import time

def process_item(item):
    time.sleep(0.1)
    return f"{item} 処理完了"

# アイテムを生成
items = [f"item-{i}" for i in range(10)]

with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(process_item, items))

for result in results:
    print(result)

優先度付きキュー

PriorityQueue を使うと、優先度の高いタスクを先に処理できます。

from queue import PriorityQueue
import threading

pq = PriorityQueue()

def consumer():
    while True:
        priority, item = pq.get()
        if item is None:
            break
        print(f"処理: {item} (優先度: {priority})")
        pq.task_done()

threading.Thread(target=consumer, daemon=True).start()

# 優先度の低い順に処理される
pq.put((3, "低優先度タスク"))
pq.put((1, "高優先度タスク"))
pq.put((2, "中優先度タスク"))

pq.join()
pq.put((0, None))  # 終了シグナル

プロデューサー・コンシューマーパターンは、非同期処理やパイプライン処理の基本となる重要なパターンです。