Python で複数プロセスから同じファイルに安全に書き込む

複数のプロセスが同じファイルに同時に書き込むと、データが混在したり破損したりする。安全に書き込むにはファイルロックや排他制御が必要だ。

問題の例

# ❌ 複数プロセスが同時に実行すると壊れる
def log_message(message):
    with open('app.log', 'a') as f:
        f.write(f'{message}\n')

複数プロセスが同時にこの関数を呼ぶと、メッセージが混在したり、一部が上書きされたりする可能性がある。

ファイルロックを使う

filelock ライブラリで排他制御を行う。

pip install filelock
from filelock import FileLock

def log_message_safe(message):
    lock = FileLock('app.log.lock')
    
    with lock:
        with open('app.log', 'a') as f:
            f.write(f'{message}\n')

ロックを取得したプロセスだけが書き込みでき、他のプロセスは待機する。

logging モジュールを使う

Python 標準の logging モジュールはマルチプロセス対応のハンドラを持っている。

import logging
from logging.handlers import RotatingFileHandler

# プロセスごとに別のログファイルに書く方法
import os

handler = RotatingFileHandler(
    f'app_{os.getpid()}.log',
    maxBytes=10*1024*1024,
    backupCount=5
)

logger = logging.getLogger()
logger.addHandler(handler)
logger.setLevel(logging.INFO)

logger.info('メッセージ')

QueueHandler を使う(推奨)

マルチプロセスで単一のログファイルに書く場合は、QueueHandlerQueueListener を使う。

import logging
from logging.handlers import QueueHandler, QueueListener
from multiprocessing import Queue

def setup_logging():
    log_queue = Queue()
    
    # ファイルハンドラ(リスナー側)
    file_handler = logging.FileHandler('app.log')
    file_handler.setFormatter(logging.Formatter(
        '%(asctime)s - %(processName)s - %(message)s'
    ))
    
    # リスナーを開始
    listener = QueueListener(log_queue, file_handler)
    listener.start()
    
    # 各プロセスはキューに書き込む
    queue_handler = QueueHandler(log_queue)
    logger = logging.getLogger()
    logger.addHandler(queue_handler)
    logger.setLevel(logging.INFO)
    
    return listener

listener = setup_logging()
# プログラム終了時に listener.stop() を呼ぶ

fcntl を使う(Unix 系)

Unix 系 OS では fcntl で低レベルなファイルロックができる。

import fcntl

def append_with_lock(filepath, data):
    with open(filepath, 'a') as f:
        fcntl.flock(f.fileno(), fcntl.LOCK_EX)
        try:
            f.write(data)
        finally:
            fcntl.flock(f.fileno(), fcntl.LOCK_UN)

プロセスごとにファイルを分ける

ロックを避けるシンプルな方法は、プロセスごとに別のファイルに書き、後でマージすることだ。

import os
from multiprocessing import Pool

def worker(task):
    pid = os.getpid()
    with open(f'result_{pid}.txt', 'a') as f:
        result = process(task)
        f.write(f'{result}\n')
    return result

# 各ワーカーが別ファイルに書く
with Pool(4) as pool:
    pool.map(worker, tasks)

# 後でマージ
import glob

with open('final_result.txt', 'w') as out:
    for filepath in glob.glob('result_*.txt'):
        with open(filepath) as f:
            out.write(f.read())

アトミック書き込み + ユニークファイル名

各書き込みを別ファイルにし、ディレクトリで管理する方法もある。

import uuid
import os

def atomic_append(directory, data):
    # ユニークなファイル名を生成
    filename = f'{uuid.uuid4()}.txt'
    filepath = os.path.join(directory, filename)
    
    # アトミック書き込み
    with open(filepath, 'w') as f:
        f.write(data)

この方法ならロック不要で、高い並列性を実現できる。

データベースを使う

複雑な並行書き込みが必要な場合は、SQLite などのデータベースを使う方が安全だ。SQLite は適切なロック機構を内蔵している。

import sqlite3

def log_to_db(message):
    conn = sqlite3.connect('app.db')
    cursor = conn.cursor()
    cursor.execute(
        'INSERT INTO logs (message, timestamp) VALUES (?, datetime("now"))',
        (message,)
    )
    conn.commit()
    conn.close()