django celery 定时任务 demo
时间: 2023-07-18 20:30:48 浏览: 219
好的,这里是一个基于 Django 和 Celery 的定时任务示例:
首先,你需要安装 Django 和 Celery:
```sh
pip install django celery
```
然后,在 Django 项目的 settings.py 中添加以下配置:
```python
# celery 配置
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
# Django app 配置
INSTALLED_APPS = [
# ...
'django_celery_beat',
]
# celery beat 配置
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers.DatabaseScheduler'
```
接下来,在 Django 项目的 urls.py 中添加以下配置:
```python
from django.urls import path
from . import views
urlpatterns = [
path('', views.index, name='index'),
path('task/', views.task, name='task'),
]
```
然后,定义一个任务:
```python
# tasks.py
from celery import shared_task
@shared_task
def add(x, y):
return x + y
```
为了让 Celery 能够自动发现任务,需要在 Django 项目的 __init__.py 中添加以下代码:
```python
# __init__.py
from .celery import app as celery_app
__all__ = ('celery_app',)
```
接下来,创建一个 Celery 实例:
```python
# celery.py
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
app = Celery('myproject')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
```
最后,在 Django 项目中创建一个视图函数,用于触发任务:
```python
# views.py
from django.shortcuts import render
from .tasks import add
def index(request):
return render(request, 'index.html')
def task(request):
result = add.delay(1, 2)
return render(request, 'task.html', {'task_id': result.task_id})
```
这个视图函数会触发 add 任务,并返回任务 ID。
最后,在 Django 项目中创建两个模板:
index.html
```html
<!DOCTYPE html>
<html>
<head>
<title>Index</title>
</head>
<body>
<a href="{% url 'task' %}">Start Task</a>
</body>
</html>
```
task.html
```html
<!DOCTYPE html>
<html>
<head>
<title>Task</title>
</head>
<body>
Task ID: {{ task_id }}
</body>
</html>
```
这样就完成了一个基于 Django 和 Celery 的定时任务示例。你可以通过 Celery 的定时任务功能来调度你的任务,并在视图函数中触发它们。
阅读全文