记录 Flask 与 Celery 集成实现异步任务执行的基本流程,适用于延时任务、耗时操作(如邮件发送、批量处理等)场景。
一、基础依赖安装
pip install celery
消息队列示例使用 Redis:
pip install redis
Redis 本地默认监听在 localhost:6379
二、Flask 与 Celery 简单整合
项目结构示意:
myapp/
├── app.py
├── celery_worker.py
├── tasks.py
app.py(Flask 主程序)
from flask import Flask, jsonify
from tasks import long_task
app = Flask(__name__)
@app.route('/start-task')
def start_task():
result = long_task.delay()
return jsonify({'task_id': result.id})
tasks.py(任务定义)
from celery import Celery
celery_app = Celery('myapp', broker='redis://localhost:6379/0')
@celery_app.task
def long_task():
import time
time.sleep(5)
return "任务完成"
celery_worker.py(启动 Celery)
from tasks import celery_app
# 执行:celery -A celery_worker worker --loglevel=info
说明:
broker
设置为 Redis 地址- 启动 Celery 时以 worker 方式运行
delay()
用于异步调用任务
三、返回结果查询(可选)
可以通过 AsyncResult
查询任务状态:
from celery.result import AsyncResult
@app.route('/task-status/<task_id>')
def get_status(task_id):
res = AsyncResult(task_id)
return jsonify({'status': res.status, 'result': res.result})
四、小结
- Flask 可通过 Celery 集成异步任务处理
- Redis 是常用的中间件(也可替换为 RabbitMQ 等)
- 任务使用
.delay()
异步触发,避免阻塞主线程