怎样设计celery任务在调用 revoke时记录任务状态
时间: 2023-03-02 12:14:44 浏览: 244
你可以通过在任务函数中手动记录任务状态并在 revoke 时进行更新。可以使用数据库(如 MySQL、PostgreSQL 或 MongoDB)存储任务状态信息,然后在 revoke 时对其进行更新。下面是一个示例代码:
```
from celery import Celery
import MySQLdb
app = Celery('tasks', broker='pyamqp://guest@localhost//')
@app.task
def add(x, y):
# 连接数据库并记录任务状态
conn = MySQLdb.connect(user='root', passwd='root', host='localhost', db='test')
cursor = conn.cursor()
cursor.execute("UPDATE tasks SET status='revoked' WHERE task_id=%s", task.request.id)
conn.commit()
conn.close()
return x + y
```
在上面的代码中,任务状态被记录在一个名为 "tasks" 的 MySQL 数据表中,该表具有一个 "status" 字段,用于记录任务的状态。在 revoke 时,将更新该字段以表示任务已被撤销。
相关问题
django celery 异步任务中 调用 User model
在Django Celery异步任务中,如果你需要访问`User`模型,你需要首先确保已将Celery配置与你的Django项目关联,并通过Django ORM进行操作。以下是一个简单的步骤:
1. **安装依赖**:
首先,确保已经安装了`celery`和`django-celery`包。如果还没有,可以运行`pip install celery[redis]`(假设你使用Redis作为消息队列)。
2. **在settings.py中配置**:
在Django项目的`settings.py`文件中,添加`CELERY_BROKER_URL`和`CELERY_RESULT_BACKEND`等Celery设置,并将`djcelery.Celery`应用到你的`INSTALLED_APPS`列表中。
3. **创建任务**:
使用`@app.task`装饰器定义一个异步任务函数,例如:
```python
from django.core.mail import send_mail
from .tasks import app
@app.task
def send_email(user_id):
user = User.objects.get(id=user_id) # 查询用户实例
subject = 'Your custom email subject'
message = f"Hello {user.username}, this is a message sent asynchronously."
send_mail(subject, message, None, [user.email], fail_silently=True)
```
4. **调度任务**:
调用这个任务可以通过直接调用它,或者使用`apply_async`方法,比如:
```python
send_email.apply_async(args=[user.id], queue='high') # 根据需求选择合适的队列
```
5. **处理异常**:
异步任务可能会遇到各种错误,记得处理可能的异常并记录日志。
JAVA 通过rabbitMQ调用celery 任务的代码示例
以下是通过 RabbitMQ 调用 Celery 任务的 Java 代码示例:
```java
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class RabbitMQClient {
private final static String QUEUE_NAME = "celery";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, true, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
// 调用 Celery 任务
CeleryClient.runTask(message);
}
}
}
```
在这个示例中,我们使用 RabbitMQ 的 Java 客户端库创建了一个消费者,等待来自名为 "celery" 的队列的消息。当收到消息时,我们将其传递给 Celery 客户端的 runTask() 方法进行处理。
以下是 Celery 客户端的示例代码:
```java
import io.github.cdimascio.dotenv.Dotenv;
import io.github.cdimascio.dotenv.DotenvBuilder;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class CeleryClient {
private static final Logger logger = LoggerFactory.getLogger(CeleryClient.class);
private static final String TASK_QUEUE_NAME = "celery";
private static final String APP_NAME = "myapp";
private static final String DEFAULT_EXCHANGE = "celery";
private static final String DEFAULT_ROUTING_KEY = "celery";
private static final String DEFAULT_BACKEND = "rpc://";
private static final String DEFAULT_RESULT_EXPIRES = "3600";
private static final String DEFAULT_TASK_TRACK_STARTED = "true";
private static final String DEFAULT_TASK_TIME_LIMIT = "600";
private static final String DEFAULT_TASK_SOFT_TIME_LIMIT = "600";
private static final Dotenv dotenv = new DotenvBuilder().ignoreIfMissing().load();
private static final String CELERY_BROKER_URL = dotenv.get("CELERY_BROKER_URL");
private static final String CELERY_RESULT_BACKEND = dotenv.get("CELERY_RESULT_BACKEND");
public static void runTask(String message) {
try {
JSONObject json = new JSONObject(message);
String taskName = json.getString("task");
JSONObject args = json.getJSONObject("args");
ConnectionFactory factory = new ConnectionFactory();
factory.setUri(CELERY_BROKER_URL);
try (Connection connection = factory.newConnection()) {
try (Channel channel = connection.createChannel()) {
String taskId = UUID.randomUUID().toString();
Map<String, Object> headers = new HashMap<>();
headers.put("task", taskName);
headers.put("id", taskId);
headers.put("app", APP_NAME);
headers.put("lang", "java");
headers.put("taskset", "celery");
headers.put("shadow", null);
headers.put("eta", null);
headers.put("expires", DEFAULT_RESULT_EXPIRES);
headers.put("group", null);
headers.put("retries", null);
headers.put("timelimit", new int[]{Integer.parseInt(DEFAULT_TASK_TIME_LIMIT), Integer.parseInt(DEFAULT_TASK_SOFT_TIME_LIMIT)});
headers.put("callbacks", null);
headers.put("errbacks", null);
headers.put("reply_to", null);
headers.put("correlation_id", null);
headers.put("delivery_info", null);
headers.put("kwargsrepr", "{}");
headers.put("utc", true);
channel.basicPublish(DEFAULT_EXCHANGE, DEFAULT_ROUTING_KEY, null, args.toString().getBytes("UTF-8"), headers);
logger.info("Task sent: " + taskName + " - " + taskId);
}
}
} catch (Exception e) {
logger.error("Error while sending task to Celery", e);
}
}
}
```
在这个示例中,我们使用 Celery Java 客户端库创建了一个方法,接收 RabbitMQ 消息并将其作为 Celery 任务发送。我们使用 dotenv 库从环境变量中读取 Celery 的 URL 和结果后端 URL,并使用这些值创建 Celery 连接。
注意,这里的 runTask() 方法只是将消息发送到 Celery,而实际任务的执行是在 Celery 服务器上完成的。因此,需要在 Celery 服务器上配置任务并启动 Celery 工作人员。
阅读全文