rabiit mq 消费怎么写
时间: 2024-05-03 07:23:20 浏览: 6
RabbitMQ消费的代码示例可以参考以下Python代码:
```python
import pika
# 连接RabbitMQ服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明队列
channel.queue_declare(queue='hello')
# 定义消息处理函数
def callback(ch, method, properties, body):
print("Received %r" % body)
# 消费消息
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
print('Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
```
在上述代码中,我们首先连接了RabbitMQ服务器,然后声明了一个名为“hello”的队列。接下来,我们定义了一个名为“callback”的函数,用于处理收到的消息。最后,我们通过调用basic_consume()方法开始消费消息。
需要注意的是,这里的auto_ack参数设置为True,表示消费者会自动确认收到的消息。如果设置为False,则需要在处理完消息后手动确认消息。
相关问题
rabiit mq 消费的java怎么写
下面是一个简单的 RabbitMQ 消费者示例,使用 Java 编写:
```
import com.rabbitmq.client.*;
public class RabbitMQConsumer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost"); // 设置 RabbitMQ 服务器地址
// 创建连接
Connection connection = factory.newConnection();
// 创建通道
Channel channel = connection.createChannel();
// 声明要消费的队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 创建消费者
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Received message: " + message);
}
};
// 开始消费消息
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
```
这个消费者示例将连接到本地的 RabbitMQ 服务器,并从名为 "hello" 的队列中读取消息。处理收到的消息的方法是通过覆盖 Consumer 接口中的 handleDelivery() 方法来实现的。最后,调用 basicConsume() 方法开始消费消息。
Java代码mq消费数据写到clickhouse
下面是一个简单的 Java 代码示例,用于将消息队列中的数据消费并写入 ClickHouse 数据库中:
```Java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
public class RocketMQConsumer {
private static final String CLICKHOUSE_JDBC_DRIVER = "ru.yandex.clickhouse.ClickHouseDriver";
private static final String CLICKHOUSE_DB_URL = "jdbc:clickhouse://localhost:8123/default";
public static void main(String[] args) throws Exception {
// 设置 RocketMQ 消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("Topic", "*");
// 注册 RocketMQ 消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
java.util.List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
// 将消息转换成 JSON 对象
String jsonString = new String(msg.getBody());
JSONObject json = JSON.parseObject(jsonString);
// 构造 ClickHouse SQL 语句
String sql = "INSERT INTO mytable (field1, field2, field3) VALUES (?, ?, ?)";
// 获取 ClickHouse 连接
try (Connection conn = DriverManager.getConnection(CLICKHOUSE_DB_URL);
PreparedStatement pstmt = conn.prepareStatement(sql)) {
// 设置 SQL 参数
pstmt.setString(1, json.getString("field1"));
pstmt.setInt(2, json.getIntValue("field2"));
pstmt.setDouble(3, json.getDoubleValue("field3"));
// 执行 SQL 语句
pstmt.executeUpdate();
} catch (SQLException e) {
e.printStackTrace();
}
}
// 返回消费状态
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动 RocketMQ 消费者
consumer.start();
System.out.println("Consumer started");
}
}
```
在这个示例中,我们使用了 RocketMQ 的 DefaultMQPushConsumer 类来消费消息,然后将消息转换成 JSON 对象,并构造 ClickHouse 的 SQL 语句。接下来,我们获取 ClickHouse 的连接,并使用 PreparedStatement 对象设置 SQL 参数并执行 SQL 语句。最后,我们返回消费状态并启动消费者。
需要注意的是,在使用 ClickHouse JDBC 驱动程序时,需要将 ClickHouse JDBC 驱动程序添加到类路径中。可以从 ClickHouse 官网下载 ClickHouse JDBC 驱动程序,并将其添加到项目的 classpath 下。