Python の Queue でプロセス間通信する

Python の multiprocessing.Queue は、プロセス間で安全にデータをやり取りするためのキューです。複数のプロセスが同時にアクセスしても、データの整合性が保たれます。

Queue の基本

Queue はスレッドセーフかつプロセスセーフなキューです。put() でデータを追加し、get() で取り出します。

from multiprocessing import Process, Queue

def worker(q):
    q.put("Hello from child process")

if __name__ == "__main__":
    q = Queue()
    p = Process(target=worker, args=(q,))
    p.start()
    print(q.get())  # Hello from child process
    p.join()

親プロセスでキューを作成し、子プロセスに渡すことで双方向の通信が可能になります。

複数のデータを送受信する

キューには複数のデータを格納できます。

from multiprocessing import Process, Queue

def producer(q):
    for i in range(5):
        q.put(i)
    q.put(None)  # 終了シグナル

def consumer(q):
    while True:
        item = q.get()
        if item is None:
            break
        print(f"Received: {item}")

if __name__ == "__main__":
    q = Queue()
    p1 = Process(target=producer, args=(q,))
    p2 = Process(target=consumer, args=(q,))
    
    p1.start()
    p2.start()
    
    p1.join()
    p2.join()

None を終了シグナルとして使うのは一般的なパターンです。

Queue の主なメソッド

put(item)キューにアイテムを追加
get()キューからアイテムを取得
put_nowait(item)ブロックせずに追加(満杯ならエラー)
get_nowait()ブロックせずに取得(空ならエラー)
empty()キューが空かどうか
full()キューが満杯かどうか
qsize()キューのサイズ(近似値)

empty()qsize() は、マルチプロセス環境では正確な値を返さない場合があるので注意が必要です。

タイムアウト付きの get

get() にはタイムアウトを指定できます。

from multiprocessing import Process, Queue
from queue import Empty

def worker(q):
    pass  # 何も送らない

if __name__ == "__main__":
    q = Queue()
    p = Process(target=worker, args=(q,))
    p.start()
    
    try:
        item = q.get(timeout=2)
    except Empty:
        print("タイムアウト: データを受信できませんでした")
    
    p.join()

タイムアウトすると queue.Empty 例外が発生します。

サイズ制限付きのキュー

Queue のコンストラクタにサイズを指定すると、格納できるアイテム数を制限できます。

from multiprocessing import Queue

q = Queue(maxsize=3)
q.put(1)
q.put(2)
q.put(3)
# q.put(4)  # ブロックされる(空きができるまで待機)

満杯のキューに put() すると、空きができるまでブロックされます。put_nowait() を使うと、queue.Full 例外が発生します。

複数のプロデューサーとコンシューマー

Queue は複数のプロセスから同時にアクセスしても安全です。

from multiprocessing import Process, Queue
import time

def producer(q, name):
    for i in range(3):
        q.put(f"{name}-{i}")
        time.sleep(0.1)

def consumer(q, num_producers):
    finished = 0
    while finished < num_producers:
        item = q.get()
        if item is None:
            finished += 1
        else:
            print(f"Consumed: {item}")

if __name__ == "__main__":
    q = Queue()
    producers = [
        Process(target=producer, args=(q, f"P{i}"))
        for i in range(3)
    ]
    
    for p in producers:
        p.start()
    
    for p in producers:
        p.join()
        q.put(None)  # 各プロデューサーの終了を通知
    
    consumer(q, len(producers))

複数のプロデューサーが同時にデータを送信し、コンシューマーが順次処理するパターンはよく使われます。