雑学1472593 views
LaTeX957300 views
中学社会667106 views
中学英語808712 views
高校化学2913383 views
高校国語785655 views
MathPython491378 views
ヒストリア284143 views
高校生物549842 views
数学講師2852771 views
Help
Tools

English

Flask でバックグラウンドタスクを処理する(Celery 連携)

Flask で時間のかかる処理(メール送信、画像処理、外部 API 呼び出しなど)をリクエスト内で実行すると、レスポンスが遅くなる。Celery を使えば、これらの処理をバックグラウンドで非同期実行できる。

Celery のセットアップ

pip install celery redis
# app/celery_app.py
from celery import Celery

def make_celery(app):
    celery = Celery(
        app.import_name,
        backend=app.config['CELERY_RESULT_BACKEND'],
        broker=app.config['CELERY_BROKER_URL']
    )
    celery.conf.update(app.config)
    
    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)
    
    celery.Task = ContextTask
    return celery

Flask アプリとの統合

# app/__init__.py
from flask import Flask
from app.celery_app import make_celery

def create_app(config_name='default'):
    app = Flask(__name__)
    app.config.from_object(f'config.{config_name}')
    return app

app = create_app()
celery = make_celery(app)
# config.py
class Config:
    CELERY_BROKER_URL = 'redis://localhost:6379/0'
    CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'

タスクの定義

# app/tasks.py
from app import celery
from app.services.email_service import send_email_sync

@celery.task
def send_email_task(to, subject, body):
    send_email_sync(to, subject, body)

@celery.task(bind=True, max_retries=3)
def process_image_task(self, image_id):
    try:
        # 画像処理
        process_image(image_id)
    except Exception as e:
        # リトライ
        self.retry(exc=e, countdown=60)

ビューからタスクを呼び出す

from app.tasks import send_email_task

@app.route('/register', methods=['POST'])
def register():
    user = create_user(request.form)
    # 非同期でメール送信
    send_email_task.delay(
        to=user.email,
        subject='Welcome!',
        body='Thank you for registering.'
    )
    return 'Registration complete'

.delay() を呼ぶと、タスクがキューに追加され、すぐにレスポンスを返せる。

Celery ワーカーの起動

celery -A app.celery worker --loglevel=info

タスクの状態確認

from app.tasks import process_image_task

@app.route('/process/<int:image_id>')
def process(image_id):
    task = process_image_task.delay(image_id)
    return {'task_id': task.id}

@app.route('/status/<task_id>')
def task_status(task_id):
    task = process_image_task.AsyncResult(task_id)
    return {
        'state': task.state,
        'result': task.result if task.ready() else None
    }

定期実行(Celery Beat)

# config.py
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    'cleanup-every-hour': {
        'task': 'app.tasks.cleanup_task',
        'schedule': crontab(minute=0),  # 毎時0分
    },
}
celery -A app.celery beat --loglevel=info

本番環境での注意点

Redis/RabbitMQメッセージブローカーの可用性を確保
ワーカー数CPU コア数に応じて調整
監視Flower などでタスクを監視

Celery を導入することで、Flask アプリケーションのレスポンス性能を維持しつつ、重い処理をバックグラウンドで実行できる。