Python のマルチプロセスのデバッグ方法

Python のマルチプロセスは、複数のプロセスが独立して動作するため、デバッグが難しくなりがちです。ログ出力、例外のトレース、デバッガの活用など、効果的なデバッグ方法を紹介します。

print デバッグの問題

複数のプロセスから print() すると、出力が混ざって読みにくくなります。

from multiprocessing import Process

def worker(name):
    for i in range(3):
        print(f"{name}: {i}")

if __name__ == "__main__":
    processes = [Process(target=worker, args=(f"P{i}",)) for i in range(3)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()

出力が入り乱れて、どのプロセスがどの順番で処理したか分かりにくくなります。

logging モジュールを使う

logging モジュールを使うと、プロセス名やタイムスタンプを含めた整理された出力が得られます。

from multiprocessing import Process, current_process
import logging

def setup_logging():
    logging.basicConfig(
        level=logging.DEBUG,
        format='%(asctime)s %(processName)s %(message)s'
    )

def worker():
    setup_logging()
    logging.debug("Started")
    logging.info("Working...")
    logging.debug("Finished")

if __name__ == "__main__":
    setup_logging()
    
    processes = [Process(target=worker, name=f"Worker-{i}") for i in range(3)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()

%(processName)s でプロセス名が出力に含まれ、どのプロセスのログか識別できます。

マルチプロセス用のログ設定

ファイルに出力する場合、複数プロセスから同時に書き込むと問題が起きることがあります。QueueHandler を使うと安全です。

from multiprocessing import Process, Queue
import logging
from logging.handlers import QueueHandler, QueueListener

def worker(log_queue):
    handler = QueueHandler(log_queue)
    logger = logging.getLogger()
    logger.addHandler(handler)
    logger.setLevel(logging.DEBUG)
    
    logger.info("Worker started")
    logger.debug("Processing...")

if __name__ == "__main__":
    log_queue = Queue()
    
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter('%(processName)s: %(message)s'))
    listener = QueueListener(log_queue, handler)
    listener.start()
    
    p = Process(target=worker, args=(log_queue,))
    p.start()
    p.join()
    
    listener.stop()

すべてのログを 1 つのキューに集約し、リスナーが出力を担当します。

子プロセスの例外をトレースする

子プロセスで例外が発生しても、親プロセスには伝わりません。traceback を明示的に取得します。

from multiprocessing import Process, Queue
import traceback

def worker(result_queue):
    try:
        # 何かの処理
        raise ValueError("Something went wrong")
    except Exception:
        result_queue.put(traceback.format_exc())

if __name__ == "__main__":
    result_queue = Queue()
    p = Process(target=worker, args=(result_queue,))
    p.start()
    p.join()
    
    if not result_queue.empty():
        print("Exception in child process:")
        print(result_queue.get())

ProcessPoolExecutor でスタックトレースを取得

ProcessPoolExecutor では、result() を呼ぶと例外とスタックトレースが再送出されます。

from concurrent.futures import ProcessPoolExecutor
import traceback

def buggy_task(x):
    return 1 / x

if __name__ == "__main__":
    with ProcessPoolExecutor() as executor:
        future = executor.submit(buggy_task, 0)
        
        try:
            result = future.result()
        except Exception:
            print(traceback.format_exc())

デバッガの使用

マルチプロセスでデバッガを使う場合、子プロセスにアタッチする必要があります。

from multiprocessing import Process
import pdb

def worker():
    x = 10
    pdb.set_trace()  # ここでブレーク
    print(x)

if __name__ == "__main__":
    p = Process(target=worker)
    p.start()
    p.join()

ただし、pdb は子プロセスで標準入力が使えない場合があります。IDE のリモートデバッグ機能を使うほうが確実です。

プロセスの状態を監視する

current_process()active_children() でプロセスの状態を確認できます。

from multiprocessing import Process, current_process, active_children
import time

def worker():
    print(f"Worker PID: {current_process().pid}")
    time.sleep(2)

if __name__ == "__main__":
    processes = [Process(target=worker) for _ in range(3)]
    for p in processes:
        p.start()
    
    time.sleep(0.5)
    print(f"Active children: {len(active_children())}")
    
    for p in processes:
        p.join()
current_process()現在のプロセスオブジェクト
active_children()アクティブな子プロセスのリスト
os.getpid()現在のプロセス ID
os.getppid()親プロセス ID

シンプルに保つ

マルチプロセスのデバッグは複雑になりがちです。まずはシングルプロセスで動作確認してから並列化することで、バグの原因を特定しやすくなります。

最初はプロセス数 1 でテストする
ログを詳細に出力する
再現性のあるテストケースを作る
共有リソースを最小限にする