Java代码mq消费数据写到clickhouse
时间: 2023-09-15 13:19:43 浏览: 151
下面是一个简单的 Java 代码示例,用于将消息队列中的数据消费并写入 ClickHouse 数据库中:
```Java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
public class RocketMQConsumer {
private static final String CLICKHOUSE_JDBC_DRIVER = "ru.yandex.clickhouse.ClickHouseDriver";
private static final String CLICKHOUSE_DB_URL = "jdbc:clickhouse://localhost:8123/default";
public static void main(String[] args) throws Exception {
// 设置 RocketMQ 消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("Topic", "*");
// 注册 RocketMQ 消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
java.util.List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
// 将消息转换成 JSON 对象
String jsonString = new String(msg.getBody());
JSONObject json = JSON.parseObject(jsonString);
// 构造 ClickHouse SQL 语句
String sql = "INSERT INTO mytable (field1, field2, field3) VALUES (?, ?, ?)";
// 获取 ClickHouse 连接
try (Connection conn = DriverManager.getConnection(CLICKHOUSE_DB_URL);
PreparedStatement pstmt = conn.prepareStatement(sql)) {
// 设置 SQL 参数
pstmt.setString(1, json.getString("field1"));
pstmt.setInt(2, json.getIntValue("field2"));
pstmt.setDouble(3, json.getDoubleValue("field3"));
// 执行 SQL 语句
pstmt.executeUpdate();
} catch (SQLException e) {
e.printStackTrace();
}
}
// 返回消费状态
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动 RocketMQ 消费者
consumer.start();
System.out.println("Consumer started");
}
}
```
在这个示例中,我们使用了 RocketMQ 的 DefaultMQPushConsumer 类来消费消息,然后将消息转换成 JSON 对象,并构造 ClickHouse 的 SQL 语句。接下来,我们获取 ClickHouse 的连接,并使用 PreparedStatement 对象设置 SQL 参数并执行 SQL 语句。最后,我们返回消费状态并启动消费者。
需要注意的是,在使用 ClickHouse JDBC 驱动程序时,需要将 ClickHouse JDBC 驱动程序添加到类路径中。可以从 ClickHouse 官网下载 ClickHouse JDBC 驱动程序,并将其添加到项目的 classpath 下。
阅读全文