celery,flask
时间: 2025-01-09 16:47:25 浏览: 19
如何在Flask中使用Celery进行异步任务处理
配置Celery实例
为了使 Celery 能够与 Flask 协同工作,需创建一个配置良好的 Celery 实例。通常会单独建立 tasks.py
文件用于定义 Celery 对象及其任务函数[^3]。
from celery import Celery
celery_app = Celery("ihome")
celery_app.conf.broker_url = 'redis://127.0.0.1:6379/1'
这段代码初始化了一个名为 "ihome"
的 Celery 应用,并指定了 Redis 作为消息代理服务器的位置。
定义异步任务
接着,在同一文件内可以定义具体的异步任务逻辑:
@celery_app.task
def send_sms(mobile, datas, temp_id):
print("进入任务")
ccp = CCP()
result = ccp.sendTemplateSMS(mobile, datas, temp_id)
if result == 0:
print("发送成功")
else:
print("发送失败")
此部分展示了如何通过装饰器 @celery_app.task
将普通函数转变为可被远程调用的任务函数。
创建路由接口以触发任务
为了让前端能够请求并启动这些后台作业,可以在 Flask 主应用中设置相应的 API 接口来接收参数并向 Celery 提交新任务:
@app.route('/send-sms', methods=['POST'])
def trigger_send_sms():
mobile = request.json.get('mobile')
datas = request.json.get('datas')
temp_id = request.json.get('temp_id')
task_result = send_sms.delay(mobile, datas, temp_id)
return jsonify({"status": "success", "task_id": str(task_result)})
上述代码片段实现了当接收到 POST 请求时,解析 JSON 数据中的手机号码、模板 ID 和数据列表,并将其传递给 send_sms()
函数执行;同时返回该次操作对应的任务唯一标识符以便后续跟踪状态变化[^1]。
查询任务进度或结果
对于已经提交的任务,可以通过其唯一的ID号获取当前的状态信息或者最终的结果:
@app.route('/task/<string:id>', methods=['GET'])
def get_task_status(id):
async_result = AsyncResult(id=id, app=celery_app)
response_data = {
'state': async_result.state,
'result': async_result.result
}
return jsonify(response_data), 200
这里利用了 Celery 自带的 AsyncResult
类来进行查询操作,它允许应用程序实时监控任何已知任务的生命历程以及可能产生的输出值。
启动项目组件
最后一步是要确保所有必要的服务都处于活动状态才能正常运作整个体系结构。这包括但不限于安装依赖包、开启 Redis 服务端进程以及分别运行 Flask web server 和 Celery workers 进程[^4]:
- 窗口1 (App): 执行命令
python app.py
- 窗口2 (Tests): 如果有测试脚本的话,则在此处运行它们,比如
python test_send_email.py
- 窗口3 (Worker): 使用指令
celery -A tasks worker --pool=solo --loglevel=info
来激活工作者节点监听来自客户端的新任务分配
相关推荐


















