Flask で時間のかかる処理(メール送信、画像処理、外部 API 呼び出しなど)をリクエスト内で実行すると、レスポンスが遅くなる。Celery を使えば、これらの処理をバックグラウンドで非同期実行できる。
Celery のセットアップ
pip install celery redis
# app/celery_app.py from celery import Celery def make_celery(app): celery = Celery( app.import_name, backend=app.config['CELERY_RESULT_BACKEND'], broker=app.config['CELERY_BROKER_URL'] ) celery.conf.update(app.config) class ContextTask(celery.Task): def __call__(self, *args, **kwargs): with app.app_context(): return self.run(*args, **kwargs) celery.Task = ContextTask return celery
Flask アプリとの統合
# app/__init__.py from flask import Flask from app.celery_app import make_celery def create_app(config_name='default'): app = Flask(__name__) app.config.from_object(f'config.{config_name}') return app app = create_app() celery = make_celery(app)
# config.py class Config: CELERY_BROKER_URL = 'redis://localhost:6379/0' CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
タスクの定義
# app/tasks.py from app import celery from app.services.email_service import send_email_sync @celery.task def send_email_task(to, subject, body): send_email_sync(to, subject, body) @celery.task(bind=True, max_retries=3) def process_image_task(self, image_id): try: # 画像処理 process_image(image_id) except Exception as e: # リトライ self.retry(exc=e, countdown=60)
ビューからタスクを呼び出す
from app.tasks import send_email_task @app.route('/register', methods=['POST']) def register(): user = create_user(request.form) # 非同期でメール送信 send_email_task.delay( to=user.email, subject='Welcome!', body='Thank you for registering.' ) return 'Registration complete'
.delay() を呼ぶと、タスクがキューに追加され、すぐにレスポンスを返せる。
Celery ワーカーの起動
celery -A app.celery worker --loglevel=info
タスクの状態確認
from app.tasks import process_image_task @app.route('/process/<int:image_id>') def process(image_id): task = process_image_task.delay(image_id) return {'task_id': task.id} @app.route('/status/<task_id>') def task_status(task_id): task = process_image_task.AsyncResult(task_id) return { 'state': task.state, 'result': task.result if task.ready() else None }
定期実行(Celery Beat)
# config.py from celery.schedules import crontab CELERY_BEAT_SCHEDULE = { 'cleanup-every-hour': { 'task': 'app.tasks.cleanup_task', 'schedule': crontab(minute=0), # 毎時0分 }, }
celery -A app.celery beat --loglevel=info
本番環境での注意点
| Redis/RabbitMQ | メッセージブローカーの可用性を確保 |
| ワーカー数 | CPU コア数に応じて調整 |
| 監視 | Flower などでタスクを監視 |
Celery を導入することで、Flask アプリケーションのレスポンス性能を維持しつつ、重い処理をバックグラウンドで実行できる。