Python の multiprocessing.Queue は、プロセス間で安全にデータをやり取りするためのキューです。複数のプロセスが同時にアクセスしても、データの整合性が保たれます。
Queue の基本
Queue はスレッドセーフかつプロセスセーフなキューです。put() でデータを追加し、get() で取り出します。
from multiprocessing import Process, Queue def worker(q): q.put("Hello from child process") if __name__ == "__main__": q = Queue() p = Process(target=worker, args=(q,)) p.start() print(q.get()) # Hello from child process p.join()
親プロセスでキューを作成し、子プロセスに渡すことで双方向の通信が可能になります。
複数のデータを送受信する
キューには複数のデータを格納できます。
from multiprocessing import Process, Queue def producer(q): for i in range(5): q.put(i) q.put(None) # 終了シグナル def consumer(q): while True: item = q.get() if item is None: break print(f"Received: {item}") if __name__ == "__main__": q = Queue() p1 = Process(target=producer, args=(q,)) p2 = Process(target=consumer, args=(q,)) p1.start() p2.start() p1.join() p2.join()
None を終了シグナルとして使うのは一般的なパターンです。
Queue の主なメソッド
| put(item) | キューにアイテムを追加 |
| get() | キューからアイテムを取得 |
| put_nowait(item) | ブロックせずに追加(満杯ならエラー) |
| get_nowait() | ブロックせずに取得(空ならエラー) |
| empty() | キューが空かどうか |
| full() | キューが満杯かどうか |
| qsize() | キューのサイズ(近似値) |
empty() や qsize() は、マルチプロセス環境では正確な値を返さない場合があるので注意が必要です。
タイムアウト付きの get
get() にはタイムアウトを指定できます。
from multiprocessing import Process, Queue from queue import Empty def worker(q): pass # 何も送らない if __name__ == "__main__": q = Queue() p = Process(target=worker, args=(q,)) p.start() try: item = q.get(timeout=2) except Empty: print("タイムアウト: データを受信できませんでした") p.join()
タイムアウトすると queue.Empty 例外が発生します。
サイズ制限付きのキュー
Queue のコンストラクタにサイズを指定すると、格納できるアイテム数を制限できます。
from multiprocessing import Queue q = Queue(maxsize=3) q.put(1) q.put(2) q.put(3) # q.put(4) # ブロックされる(空きができるまで待機)
満杯のキューに put() すると、空きができるまでブロックされます。put_nowait() を使うと、queue.Full 例外が発生します。
複数のプロデューサーとコンシューマー
Queue は複数のプロセスから同時にアクセスしても安全です。
from multiprocessing import Process, Queue import time def producer(q, name): for i in range(3): q.put(f"{name}-{i}") time.sleep(0.1) def consumer(q, num_producers): finished = 0 while finished < num_producers: item = q.get() if item is None: finished += 1 else: print(f"Consumed: {item}") if __name__ == "__main__": q = Queue() producers = [ Process(target=producer, args=(q, f"P{i}")) for i in range(3) ] for p in producers: p.start() for p in producers: p.join() q.put(None) # 各プロデューサーの終了を通知 consumer(q, len(producers))
複数のプロデューサーが同時にデータを送信し、コンシューマーが順次処理するパターンはよく使われます。