複数のプロセスが同じファイルに同時に書き込むと、データが混在したり破損したりする。安全に書き込むにはファイルロックや排他制御が必要だ。
問題の例
# ❌ 複数プロセスが同時に実行すると壊れる def log_message(message): with open('app.log', 'a') as f: f.write(f'{message}\n')
複数プロセスが同時にこの関数を呼ぶと、メッセージが混在したり、一部が上書きされたりする可能性がある。
ファイルロックを使う
filelock ライブラリで排他制御を行う。
pip install filelock
from filelock import FileLock def log_message_safe(message): lock = FileLock('app.log.lock') with lock: with open('app.log', 'a') as f: f.write(f'{message}\n')
ロックを取得したプロセスだけが書き込みでき、他のプロセスは待機する。
logging モジュールを使う
Python 標準の logging モジュールはマルチプロセス対応のハンドラを持っている。
import logging from logging.handlers import RotatingFileHandler # プロセスごとに別のログファイルに書く方法 import os handler = RotatingFileHandler( f'app_{os.getpid()}.log', maxBytes=10*1024*1024, backupCount=5 ) logger = logging.getLogger() logger.addHandler(handler) logger.setLevel(logging.INFO) logger.info('メッセージ')
QueueHandler を使う(推奨)
マルチプロセスで単一のログファイルに書く場合は、QueueHandler と QueueListener を使う。
import logging from logging.handlers import QueueHandler, QueueListener from multiprocessing import Queue def setup_logging(): log_queue = Queue() # ファイルハンドラ(リスナー側) file_handler = logging.FileHandler('app.log') file_handler.setFormatter(logging.Formatter( '%(asctime)s - %(processName)s - %(message)s' )) # リスナーを開始 listener = QueueListener(log_queue, file_handler) listener.start() # 各プロセスはキューに書き込む queue_handler = QueueHandler(log_queue) logger = logging.getLogger() logger.addHandler(queue_handler) logger.setLevel(logging.INFO) return listener listener = setup_logging() # プログラム終了時に listener.stop() を呼ぶ
fcntl を使う(Unix 系)
Unix 系 OS では fcntl で低レベルなファイルロックができる。
import fcntl def append_with_lock(filepath, data): with open(filepath, 'a') as f: fcntl.flock(f.fileno(), fcntl.LOCK_EX) try: f.write(data) finally: fcntl.flock(f.fileno(), fcntl.LOCK_UN)
プロセスごとにファイルを分ける
ロックを避けるシンプルな方法は、プロセスごとに別のファイルに書き、後でマージすることだ。
import os from multiprocessing import Pool def worker(task): pid = os.getpid() with open(f'result_{pid}.txt', 'a') as f: result = process(task) f.write(f'{result}\n') return result # 各ワーカーが別ファイルに書く with Pool(4) as pool: pool.map(worker, tasks) # 後でマージ import glob with open('final_result.txt', 'w') as out: for filepath in glob.glob('result_*.txt'): with open(filepath) as f: out.write(f.read())
アトミック書き込み + ユニークファイル名
各書き込みを別ファイルにし、ディレクトリで管理する方法もある。
import uuid import os def atomic_append(directory, data): # ユニークなファイル名を生成 filename = f'{uuid.uuid4()}.txt' filepath = os.path.join(directory, filename) # アトミック書き込み with open(filepath, 'w') as f: f.write(data)
この方法ならロック不要で、高い並列性を実現できる。
データベースを使う
複雑な並行書き込みが必要な場合は、SQLite などのデータベースを使う方が安全だ。SQLite は適切なロック機構を内蔵している。
import sqlite3 def log_to_db(message): conn = sqlite3.connect('app.db') cursor = conn.cursor() cursor.execute( 'INSERT INTO logs (message, timestamp) VALUES (?, datetime("now"))', (message,) ) conn.commit() conn.close()