いろは2990632 views
教育149005 views
高校国語786021 views
英語608783 views
Computer365661 views
MathPython492451 views
高校倫理1434270 views
高校化学2914654 views
数学講師2858583 views
小学社会308820 views
Help
Tools

English

Flask でバックグラウンドタスクを処理する(Celery 連携)

Flask で時間のかかる処理(メール送信、画像処理、外部 API 呼び出しなど)をリクエスト内で実行すると、レスポンスが遅くなる。Celery を使えば、これらの処理をバックグラウンドで非同期実行できる。

Celery のセットアップ

pip install celery redis
# app/celery_app.py
from celery import Celery

def make_celery(app):
    celery = Celery(
        app.import_name,
        backend=app.config['CELERY_RESULT_BACKEND'],
        broker=app.config['CELERY_BROKER_URL']
    )
    celery.conf.update(app.config)
    
    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)
    
    celery.Task = ContextTask
    return celery

Flask アプリとの統合

# app/__init__.py
from flask import Flask
from app.celery_app import make_celery

def create_app(config_name='default'):
    app = Flask(__name__)
    app.config.from_object(f'config.{config_name}')
    return app

app = create_app()
celery = make_celery(app)
# config.py
class Config:
    CELERY_BROKER_URL = 'redis://localhost:6379/0'
    CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'

タスクの定義

# app/tasks.py
from app import celery
from app.services.email_service import send_email_sync

@celery.task
def send_email_task(to, subject, body):
    send_email_sync(to, subject, body)

@celery.task(bind=True, max_retries=3)
def process_image_task(self, image_id):
    try:
        # 画像処理
        process_image(image_id)
    except Exception as e:
        # リトライ
        self.retry(exc=e, countdown=60)

ビューからタスクを呼び出す

from app.tasks import send_email_task

@app.route('/register', methods=['POST'])
def register():
    user = create_user(request.form)
    # 非同期でメール送信
    send_email_task.delay(
        to=user.email,
        subject='Welcome!',
        body='Thank you for registering.'
    )
    return 'Registration complete'

.delay() を呼ぶと、タスクがキューに追加され、すぐにレスポンスを返せる。

Celery ワーカーの起動

celery -A app.celery worker --loglevel=info

タスクの状態確認

from app.tasks import process_image_task

@app.route('/process/<int:image_id>')
def process(image_id):
    task = process_image_task.delay(image_id)
    return {'task_id': task.id}

@app.route('/status/<task_id>')
def task_status(task_id):
    task = process_image_task.AsyncResult(task_id)
    return {
        'state': task.state,
        'result': task.result if task.ready() else None
    }

定期実行(Celery Beat)

# config.py
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    'cleanup-every-hour': {
        'task': 'app.tasks.cleanup_task',
        'schedule': crontab(minute=0),  # 毎時0分
    },
}
celery -A app.celery beat --loglevel=info

本番環境での注意点

Redis/RabbitMQメッセージブローカーの可用性を確保
ワーカー数CPU コア数に応じて調整
監視Flower などでタスクを監視

Celery を導入することで、Flask アプリケーションのレスポンス性能を維持しつつ、重い処理をバックグラウンドで実行できる。