Using django_celery_beat to Add Scheduled Tasks from the Front End
Date: 2023-12-10 17:06:00
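`django_celery_beat` stores periodic tasks in the database, which makes it straightforward to add them from a front-end form.
First, register `django_celery_beat` in `settings.py` and configure the database, the broker, and the beat scheduler: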
```python
INSTALLED_APPS = [
    # ...
    'django_celery_beat',
    # ...
]

DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.sqlite3',
        'NAME': BASE_DIR / 'db.sqlite3',
    }
}

CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers.DatabaseScheduler'
```
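The `CELERY_`-prefixed settings above are only read if the project defines a Celery application that loads them. A minimal sketch of such a module follows, assuming a project package named `myproject` (a hypothetical name, not taken from the original) and a file `myproject/celery.py`:

```python
# myproject/celery.py  ("myproject" is a placeholder for the real project package)
import os

from celery import Celery

# Point Celery at the Django settings module before the app is created.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

app = Celery('myproject')

# Read every setting that starts with CELERY_ (CELERY_BROKER_URL,
# CELERY_RESULT_BACKEND, CELERY_BEAT_SCHEDULER, ...) from settings.py.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Find tasks.py modules in all installed apps.
app.autodiscover_tasks()
```

In the usual layout, `myproject/__init__.py` then imports this `app` so that `@shared_task` functions bind to it when Django starts.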
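Run `python manage.py migrate` to create the `django_celery_beat` tables, then configure the routes for adding and deleting tasks in `urls.py`: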
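```python
from django.urls import path

from . import views

urlpatterns = [
    # The views redirect to 'task_list', so that route must exist as well.
    path('tasks/', views.task_list, name='task_list'),
    path('add_task/', views.add_task, name='add_task'),
    path('delete_task/<int:task_id>/', views.delete_task, name='delete_task'),
]
```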
Then implement the views for adding and deleting tasks in `views.py`:
```python
import json

from django.shortcuts import get_object_or_404, redirect, render
from django_celery_beat.models import CrontabSchedule, PeriodicTask


def add_task(request):
    if request.method == 'POST':
        task_name = request.POST.get('task_name')
        task_url = request.POST.get('task_url')
        task_time = request.POST.get('task_time')  # 'HH:MM' from <input type="time">
        # Args/kwargs arrive as JSON text; empty fields fall back to [] and {}.
        task_args = json.loads(request.POST.get('task_args') or '[]')
        task_kwargs = json.loads(request.POST.get('task_kwargs') or '{}')
        task_enabled = request.POST.get('task_enabled') == 'on'

        # Normalize '09' to '9' so equal times reuse one CrontabSchedule row.
        hour, minute = (str(int(part)) for part in task_time.split(':')[:2])
        crontab, _ = CrontabSchedule.objects.get_or_create(
            minute=minute,
            hour=hour,
            day_of_week='*',
            day_of_month='*',
            month_of_year='*',
        )
        PeriodicTask.objects.create(
            name=task_name,  # must be unique across PeriodicTask rows
            task='tasks.run_task',
            # PeriodicTask stores args and kwargs as JSON strings; they are
            # laid out to match run_task(url, args=None, kwargs=None).
            args=json.dumps([task_url]),
            kwargs=json.dumps({'args': task_args, 'kwargs': task_kwargs}),
            crontab=crontab,
            enabled=task_enabled,
        )
        return redirect('task_list')
    return render(request, 'add_task.html')


def delete_task(request, task_id):
    # Respond with 404 instead of an unhandled DoesNotExist for unknown ids.
    task = get_object_or_404(PeriodicTask, id=task_id)
    task.delete()
    return redirect('task_list')
```
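The `add_task` view turns the `HH:MM` string from the form into crontab fields by splitting on `:`. A small helper along the following lines could validate that input first. This is only a sketch: the function name `time_to_crontab_fields` is hypothetical, and it assumes a daily schedule, as in the view.

```python
def time_to_crontab_fields(task_time: str) -> dict:
    """Convert 'HH:MM' (or 'HH:MM:SS') into keyword arguments for CrontabSchedule."""
    parts = task_time.strip().split(':')
    if len(parts) not in (2, 3):
        raise ValueError(f'expected HH:MM, got {task_time!r}')
    # int() raises ValueError for non-numeric input such as 'ab:cd'.
    hour, minute = int(parts[0]), int(parts[1])
    if not (0 <= hour <= 23 and 0 <= minute <= 59):
        raise ValueError(f'time out of range: {task_time!r}')
    return {
        # Stored without leading zeros so equal times map to the same row.
        'minute': str(minute),
        'hour': str(hour),
        'day_of_week': '*',
        'day_of_month': '*',
        'month_of_year': '*',
    }
```

The result can be passed straight through, for example `CrontabSchedule.objects.get_or_create(**time_to_crontab_fields(task_time))`, with a `ValueError` handler in the view that re-renders the form.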
Add a form to the `add_task.html` template for creating tasks:
```html
<form method="post" action="{% url 'add_task' %}">
  {% csrf_token %}
  <div class="form-group">
    <label for="task_name">Task Name:</label>
    <input type="text" class="form-control" id="task_name" name="task_name" required>
  </div>
  <div class="form-group">
    <label for="task_url">Task URL:</label>
    <input type="text" class="form-control" id="task_url" name="task_url" required>
  </div>
  <div class="form-group">
    <label for="task_time">Task Time:</label>
    <input type="time" class="form-control" id="task_time" name="task_time" required>
  </div>
  <div class="form-group">
    <label for="task_args">Task Args:</label>
    <input type="text" class="form-control" id="task_args" name="task_args">
  </div>
  <div class="form-group">
    <label for="task_kwargs">Task Kwargs:</label>
    <input type="text" class="form-control" id="task_kwargs" name="task_kwargs">
  </div>
  <div class="form-check">
    <input type="checkbox" class="form-check-input" id="task_enabled" name="task_enabled" checked>
    <label class="form-check-label" for="task_enabled">Enabled</label>
  </div>
  <button type="submit" class="btn btn-primary">Submit</button>
</form>
```
Next, create a `task_list.html` template in the `templates` directory to display all scheduled tasks:
```html
{% extends 'base.html' %}
{% block content %}
<h1>Task List</h1>
<table class="table table-striped">
  <thead>
    <tr>
      <th scope="col">#</th>
      <th scope="col">Name</th>
      <th scope="col">Task</th>
      <th scope="col">Args</th>
      <th scope="col">KWArgs</th>
      <th scope="col">Time</th>
      <th scope="col">Enabled</th>
      <th scope="col">Action</th>
    </tr>
  </thead>
  <tbody>
    {% for task in tasks %}
    <tr>
      <th scope="row">{{ task.id }}</th>
      <td>{{ task.name }}</td>
      <td>{{ task.task }}</td>
      <td>{{ task.args }}</td>
      <td>{{ task.kwargs }}</td>
      <td>{{ task.crontab.hour }}:{{ task.crontab.minute }}</td>
      <td>{% if task.enabled %}Yes{% else %}No{% endif %}</td>
      <td>
        <a href="{% url 'delete_task' task.id %}" class="btn btn-danger">Delete</a>
      </td>
    </tr>
    {% endfor %}
  </tbody>
</table>
{% endblock %}
```
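`CrontabSchedule` keeps `hour` and `minute` as strings, and they may hold cron expressions such as `*` or `*/5`, so the Time column only reads as a clock time when both are plain numbers. A hedged sketch of a formatting helper follows. The name `crontab_display_time` is hypothetical, and wiring it into the template (for example through a custom filter) is left out.

```python
def crontab_display_time(hour: str, minute: str) -> str:
    """Render crontab hour/minute fields as a human-readable string."""
    if hour.isdigit() and minute.isdigit():
        # Plain numbers: show a zero-padded clock time such as 09:05.
        return f'{int(hour):02d}:{int(minute):02d}'
    # Cron expressions ('*', '*/5', '1,15') have no single clock time,
    # so show the raw fields instead of a misleading HH:MM value.
    return f'hour={hour} minute={minute}'
```

For a task created at 09:05 this yields `09:05`, while a schedule that runs every five minutes is shown as `hour=* minute=*/5`.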
Implement the actual work of the scheduled task in `tasks.py`:
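```python
from celery import shared_task


# An explicit name keeps the task registered as 'tasks.run_task', the string
# stored in PeriodicTask.task, regardless of which app this module lives in.
@shared_task(name='tasks.run_task')
def run_task(url, args=None, kwargs=None):
    # do something
    pass
```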
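The arguments that reach `run_task` come from the `args` and `kwargs` columns of the `PeriodicTask` row, which hold JSON text (a list and an object respectively). A minimal sketch of a helper that checks the raw form strings before they are saved is shown below. The name `normalize_task_arguments` is an assumption for illustration and is not part of `django_celery_beat`.

```python
import json
from typing import Tuple


def normalize_task_arguments(raw_args: str, raw_kwargs: str) -> Tuple[str, str]:
    """Validate form input and return JSON strings for PeriodicTask.args/kwargs."""
    # Empty or whitespace-only fields mean "no arguments".
    args = json.loads(raw_args) if raw_args and raw_args.strip() else []
    kwargs = json.loads(raw_kwargs) if raw_kwargs and raw_kwargs.strip() else {}
    if not isinstance(args, list):
        raise ValueError('task args must be a JSON list, e.g. [1, "a"]')
    if not isinstance(kwargs, dict):
        raise ValueError('task kwargs must be a JSON object, e.g. {"retries": 2}')
    # Re-serialize so the stored text is always well-formed JSON.
    return json.dumps(args), json.dumps(kwargs)
```

Malformed input raises `json.JSONDecodeError`, which is a subclass of `ValueError`, so a single `except ValueError` in the view covers both failure modes.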
Finally, implement the view that lists all scheduled tasks in `views.py`:
```python
from django.shortcuts import render
from django_celery_beat.models import PeriodicTask


def task_list(request):
    tasks = PeriodicTask.objects.all()
    context = {
        'tasks': tasks
    }
    return render(request, 'task_list.html', context)
```
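With that in place, scheduled tasks can be added from the front end. A Celery worker and the beat process (`celery -A <project> worker` and `celery -A <project> beat`) must be running for the tasks to execute.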