Django的celery
配置和使用Celery进行异步任务处理
Django项目的准备工作
为了使Django项目能够利用Celery执行异步任务,需先完成一些基础设置工作。这包括安装必要的软件包以及配置环境变量等。
对于Python依赖项而言,除了django
之外还需要安装celery
库及其消息代理(Broker),比如Redis或RabbitMQ。可以通过pip工具来安装这些组件:
pip install celery redis django-celery-results
Celery应用实例化
在Django项目的根目录下的__init__.py
文件中初始化Celery应用程序对象,并将其与当前的Django设置关联起来[^1]。
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'your_project.settings')
app = Celery('your_project')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
这里需要注意的是替换'your_project'
为实际的项目名称;通过调用config_from_object()
方法加载来自Django配置中的所有以CELERY_
开头的关键字作为Celery配置参数;最后自动发现并注册位于各个应用内的任务模块(tasks.py
)。
设置后台存储
为了让客户端可以追踪任务的状态变化情况,在上述代码基础上还需指定一个合适的结果后端(Result Backend),用于持久化保存已完成工作的返回值和其他元数据信息。常见的选项有Redis、数据库表单(Django ORM/SQLAlchemy)等。
可以在Django settings.py 文件里添加如下配置:
CELERY_RESULT_BACKEND = "redis://localhost"
此行定义了结果后端使用的URL地址指向本地运行的一个Redis服务实例。
创建异步任务
接下来就是在具体的App内创建名为tasks.py
的新文件用来编写业务逻辑函数,该类函数即代表要交给worker进程去并发执行的具体操作。例如在一个电商网站上更新库存数量的操作就可以被设计成这样的形式[^3]:
@app.task(bind=True,default_retry_delay=60,max_retries=3)
def update_stock(self, product_id):
try:
# 更新商品库存...
pass
except Exception as exc:
raise self.retry(exc=exc)
这段代码展示了怎样声明一个新的异步任务——当遇到异常时允许重试三次,每次间隔一分钟的时间窗口等待恢复后再尝试继续执行未成功的部分。
启动Worker和服务
一切准备就绪之后便可通过命令行启动相应的守护程序监听待办事项列表并负责真正意义上的计算密集型作业分发给空闲的工作线程池去做。通常情况下会分别开启两个终端标签页分别输入下面两条指令来激活broker server 和 worker node :
# Terminal Tab 1 (for Redis Server)
$ redis-server
# Terminal Tab 2 (for Celery Worker Node)
$ celery -A your_project worker --loglevel=info
如果想要让某些特定的任务按照固定时间周期重复触发,则可借助于celery beat scheduler
特性实现自动化调度机制。只需编辑celery_beat_schedule.py
文件加入类似以下结构的数据映射关系即可达成目的[^2]:
from datetime import timedelta
from celery.schedules import crontab
CELERY_BEAT_SCHEDULE = {
'add-every-minute': {
'task': 'my_app.tasks.add',
'schedule': crontab(),
'args': (16, 16),
},
}
相关推荐


















