複数のプロセスが同じファイルに同時に書き込むと、データが混在したり破損したりする。安全に書き込むにはファイルロックや排他制御が必要だ。
問題の例
# ❌ 複数プロセスが同時に実行すると壊れる
def log_message(message):
with open('app.log', 'a') as f:
f.write(f'{message}\n')
複数プロセスが同時にこの関数を呼ぶと、メッセージが混在したり、一部が上書きされたりする可能性がある。
ファイルロックを使う
filelock ライブラリで排他制御を行う。
pip install filelock
from filelock import FileLock
def log_message_safe(message):
lock = FileLock('app.log.lock')
with lock:
with open('app.log', 'a') as f:
f.write(f'{message}\n')
ロックを取得したプロセスだけが書き込みでき、他のプロセスは待機する。
logging モジュールを使う
Python 標準の logging モジュールはマルチプロセス対応のハンドラを持っている。
import logging
from logging.handlers import RotatingFileHandler
# プロセスごとに別のログファイルに書く方法
import os
handler = RotatingFileHandler(
f'app_{os.getpid()}.log',
maxBytes=10*1024*1024,
backupCount=5
)
logger = logging.getLogger()
logger.addHandler(handler)
logger.setLevel(logging.INFO)
logger.info('メッセージ')
QueueHandler を使う(推奨)
マルチプロセスで単一のログファイルに書く場合は、QueueHandler と QueueListener を使う。
import logging
from logging.handlers import QueueHandler, QueueListener
from multiprocessing import Queue
def setup_logging():
log_queue = Queue()
# ファイルハンドラ(リスナー側)
file_handler = logging.FileHandler('app.log')
file_handler.setFormatter(logging.Formatter(
'%(asctime)s - %(processName)s - %(message)s'
))
# リスナーを開始
listener = QueueListener(log_queue, file_handler)
listener.start()
# 各プロセスはキューに書き込む
queue_handler = QueueHandler(log_queue)
logger = logging.getLogger()
logger.addHandler(queue_handler)
logger.setLevel(logging.INFO)
return listener
listener = setup_logging()
# プログラム終了時に listener.stop() を呼ぶ
fcntl を使う(Unix 系)
Unix 系 OS では fcntl で低レベルなファイルロックができる。
import fcntl
def append_with_lock(filepath, data):
with open(filepath, 'a') as f:
fcntl.flock(f.fileno(), fcntl.LOCK_EX)
try:
f.write(data)
finally:
fcntl.flock(f.fileno(), fcntl.LOCK_UN)
プロセスごとにファイルを分ける
ロックを避けるシンプルな方法は、プロセスごとに別のファイルに書き、後でマージすることだ。
import os
from multiprocessing import Pool
def worker(task):
pid = os.getpid()
with open(f'result_{pid}.txt', 'a') as f:
result = process(task)
f.write(f'{result}\n')
return result
# 各ワーカーが別ファイルに書く
with Pool(4) as pool:
pool.map(worker, tasks)
# 後でマージ
import glob
with open('final_result.txt', 'w') as out:
for filepath in glob.glob('result_*.txt'):
with open(filepath) as f:
out.write(f.read())
アトミック書き込み + ユニークファイル名
各書き込みを別ファイルにし、ディレクトリで管理する方法もある。
import uuid
import os
def atomic_append(directory, data):
# ユニークなファイル名を生成
filename = f'{uuid.uuid4()}.txt'
filepath = os.path.join(directory, filename)
# アトミック書き込み
with open(filepath, 'w') as f:
f.write(data)
この方法ならロック不要で、高い並列性を実現できる。
データベースを使う
複雑な並行書き込みが必要な場合は、SQLite などのデータベースを使う方が安全だ。SQLite は適切なロック機構を内蔵している。
import sqlite3
def log_to_db(message):
conn = sqlite3.connect('app.db')
cursor = conn.cursor()
cursor.execute(
'INSERT INTO logs (message, timestamp) VALUES (?, datetime("now"))',
(message,)
)
conn.commit()
conn.close()