Python の Pipe でプロセス間通信する

Python の multiprocessing.Pipe は、2 つのプロセス間で双方向の通信を行うためのシンプルな仕組みです。Queue と比べて軽量で、1 対 1 の通信に適しています。

Pipe の基本

Pipe() は 2 つの接続オブジェクト(コネクション)を返します。それぞれの端点で send()recv() を使ってデータをやり取りします。

from multiprocessing import Process, Pipe

def child(conn):
    conn.send("Hello from child")
    conn.close()

if __name__ == "__main__":
    parent_conn, child_conn = Pipe()
    p = Process(target=child, args=(child_conn,))
    p.start()
    print(parent_conn.recv())  # Hello from child
    p.join()

Pipe() が返す 2 つのオブジェクトは、パイプの両端を表します。

双方向通信

デフォルトでは、両方の端点から送受信ができます。

from multiprocessing import Process, Pipe

def worker(conn):
    msg = conn.recv()  # 親からのメッセージを受信
    conn.send(f"Received: {msg}")  # 親に返信
    conn.close()

if __name__ == "__main__":
    parent_conn, child_conn = Pipe()
    p = Process(target=worker, args=(child_conn,))
    p.start()
    
    parent_conn.send("Hello")
    reply = parent_conn.recv()
    print(reply)  # Received: Hello
    
    p.join()

親が send() でメッセージ送信

子が recv() でメッセージ受信

子が send() で返信

親が recv() で返信を受信

片方向パイプ

Pipe(duplex=False) で片方向パイプを作成できます。

from multiprocessing import Process, Pipe

def sender(conn):
    conn.send("One-way message")
    conn.close()

if __name__ == "__main__":
    recv_conn, send_conn = Pipe(duplex=False)
    p = Process(target=sender, args=(send_conn,))
    p.start()
    print(recv_conn.recv())  # One-way message
    p.join()

duplex=False の場合、最初の戻り値(recv_conn)は受信専用、2 番目(send_conn)は送信専用になります。

Connection の主なメソッド

send(obj)オブジェクトを送信
recv()オブジェクトを受信
poll(timeout)データがあるか確認
close()接続を閉じる
fileno()ファイルディスクリプタを取得

poll でデータの有無を確認する

poll() を使うと、ブロックせずにデータの有無を確認できます。

from multiprocessing import Process, Pipe
import time

def slow_sender(conn):
    time.sleep(2)
    conn.send("Delayed message")
    conn.close()

if __name__ == "__main__":
    parent_conn, child_conn = Pipe()
    p = Process(target=slow_sender, args=(child_conn,))
    p.start()
    
    while not parent_conn.poll(timeout=0.5):
        print("Waiting...")
    
    print(parent_conn.recv())
    p.join()

poll() は指定した秒数だけ待機し、データがあれば True、なければ False を返します。

Queue と Pipe の使い分け

Queue

複数のプロセスが同時にアクセスする場合に適している。スレッドセーフ。

Pipe

1 対 1 の通信に適している。軽量で高速。

複数のワーカープロセスとの通信には Queue を、親子間の単純な通信には Pipe を使うのが一般的です。

注意点

Pipe は同じ端点から同時に複数のプロセスが読み書きすると、データが破損する可能性があります。

# 危険な例(複数プロセスが同じ端点を共有)
# conn を複数のプロセスに渡して同時に recv() すると問題が起きる

1 つの端点は 1 つのプロセスだけが使うようにしてください。複数プロセスとの通信には Queue を使いましょう。