Flask で時間のかかる処理(メール送信、画像処理、外部 API 呼び出しなど)をリクエスト内で実行すると、レスポンスが遅くなる。Celery を使えば、これらの処理をバックグラウンドで非同期実行できる。
Celery のセットアップ
pip install celery redis
# app/celery_app.py
from celery import Celery
def make_celery(app):
celery = Celery(
app.import_name,
backend=app.config['CELERY_RESULT_BACKEND'],
broker=app.config['CELERY_BROKER_URL']
)
celery.conf.update(app.config)
class ContextTask(celery.Task):
def __call__(self, *args, **kwargs):
with app.app_context():
return self.run(*args, **kwargs)
celery.Task = ContextTask
return celery
Flask アプリとの統合
# app/__init__.py
from flask import Flask
from app.celery_app import make_celery
def create_app(config_name='default'):
app = Flask(__name__)
app.config.from_object(f'config.{config_name}')
return app
app = create_app()
celery = make_celery(app)
# config.py
class Config:
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
タスクの定義
# app/tasks.py
from app import celery
from app.services.email_service import send_email_sync
@celery.task
def send_email_task(to, subject, body):
send_email_sync(to, subject, body)
@celery.task(bind=True, max_retries=3)
def process_image_task(self, image_id):
try:
# 画像処理
process_image(image_id)
except Exception as e:
# リトライ
self.retry(exc=e, countdown=60)
ビューからタスクを呼び出す
from app.tasks import send_email_task
@app.route('/register', methods=['POST'])
def register():
user = create_user(request.form)
# 非同期でメール送信
send_email_task.delay(
to=user.email,
subject='Welcome!',
body='Thank you for registering.'
)
return 'Registration complete'
.delay() を呼ぶと、タスクがキューに追加され、すぐにレスポンスを返せる。
Celery ワーカーの起動
celery -A app.celery worker --loglevel=info
タスクの状態確認
from app.tasks import process_image_task
@app.route('/process/<int:image_id>')
def process(image_id):
task = process_image_task.delay(image_id)
return {'task_id': task.id}
@app.route('/status/<task_id>')
def task_status(task_id):
task = process_image_task.AsyncResult(task_id)
return {
'state': task.state,
'result': task.result if task.ready() else None
}
定期実行(Celery Beat)
# config.py
from celery.schedules import crontab
CELERY_BEAT_SCHEDULE = {
'cleanup-every-hour': {
'task': 'app.tasks.cleanup_task',
'schedule': crontab(minute=0), # 毎時0分
},
}
celery -A app.celery beat --loglevel=info
本番環境での注意点
| Redis/RabbitMQ | メッセージブローカーの可用性を確保 |
| ワーカー数 | CPU コア数に応じて調整 |
| 監視 | Flower などでタスクを監視 |
Celery を導入することで、Flask アプリケーションのレスポンス性能を維持しつつ、重い処理をバックグラウンドで実行できる。