Python の Queue でスレッド間通信する

queue.Queue は、スレッド間でデータを安全にやり取りするためのキューです。内部で同期処理が行われているため、ロックを意識せずにスレッド間通信ができます。

Queue の基本

Queue は FIFO(先入れ先出し)のデータ構造です。put() でデータを入れ、get() で取り出します。

from queue import Queue
import threading
import time

q = Queue()

def producer():
    for i in range(5):
        item = f"item-{i}"
        q.put(item)
        print(f"Producer: {item} を追加")
        time.sleep(0.5)

def consumer():
    while True:
        item = q.get()
        print(f"Consumer: {item} を取得")
        q.task_done()

t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer, daemon=True)

t1.start()
t2.start()
t1.join()
q.join()  # すべてのタスクが処理されるまで待機
print("完了")

Queue の主なメソッド

put(item, block=True, timeout=None)

アイテムをキューに追加。キューが満杯ならブロック。

get(block=True, timeout=None)

アイテムをキューから取り出す。キューが空ならブロック。

task_done()

get() したアイテムの処理完了を通知。

join()

すべてのアイテムが処理されるまで待機。

ブロッキングとノンブロッキング

デフォルトでは put()get() はブロッキングですが、ノンブロッキングにもできます。

from queue import Queue, Empty, Full

q = Queue(maxsize=2)  # 最大2個まで

# ノンブロッキングで追加
q.put("a", block=False)
q.put("b", block=False)

try:
    q.put("c", block=False)  # キューが満杯
except Full:
    print("キューが満杯です")

# ノンブロッキングで取得
print(q.get(block=False))  # a
print(q.get(block=False))  # b

try:
    q.get(block=False)  # キューが空
except Empty:
    print("キューが空です")

タイムアウト

タイムアウトを指定すると、一定時間だけ待機します。

from queue import Queue, Empty

q = Queue()

try:
    item = q.get(timeout=2)  # 最大2秒待つ
except Empty:
    print("タイムアウト: キューが空のまま")

実用例:ワーカープール

複数のワーカースレッドがキューからタスクを取得して処理するパターンです。

from queue import Queue
import threading
import time
import random

def worker(name, task_queue):
    while True:
        task = task_queue.get()
        if task is None:  # 終了シグナル
            break
        print(f"{name}: {task} を処理中")
        time.sleep(random.uniform(0.5, 1.5))
        print(f"{name}: {task} を完了")
        task_queue.task_done()

task_queue = Queue()

# ワーカースレッドを作成
workers = []
for i in range(3):
    t = threading.Thread(target=worker, args=(f"Worker-{i}", task_queue))
    t.start()
    workers.append(t)

# タスクを追加
for i in range(10):
    task_queue.put(f"Task-{i}")

# すべてのタスクの完了を待機
task_queue.join()

# ワーカーを終了
for _ in workers:
    task_queue.put(None)
for t in workers:
    t.join()

print("すべて完了")

他のキュータイプ

クラス順序
QueueFIFO(先入れ先出し)
LifoQueueLIFO(後入れ先出し、スタック)
PriorityQueue優先度順
from queue import Queue, LifoQueue, PriorityQueue

# FIFO
fifo = Queue()
for i in [1, 2, 3]:
    fifo.put(i)
print([fifo.get() for _ in range(3)])  # [1, 2, 3]

# LIFO(スタック)
lifo = LifoQueue()
for i in [1, 2, 3]:
    lifo.put(i)
print([lifo.get() for _ in range(3)])  # [3, 2, 1]

# 優先度(小さい値が先)
pq = PriorityQueue()
pq.put((3, "low"))
pq.put((1, "high"))
pq.put((2, "medium"))
print([pq.get() for _ in range(3)])
# [(1, 'high'), (2, 'medium'), (3, 'low')]

Queue はスレッドセーフなので、マルチスレッド環境でのデータ受け渡しに最適です。