Consuming Kafka data in Java and batch-inserting it into ClickHouse
Posted: 2023-08-26 15:07:04
The following is example Java code that consumes data from Kafka and batch-inserts it into ClickHouse:
```
import java.sql.PreparedStatement;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import ru.yandex.clickhouse.ClickHouseConnection;
import ru.yandex.clickhouse.ClickHouseDataSource;
import ru.yandex.clickhouse.settings.ClickHouseProperties;

public class KafkaClickHouseConsumer {

    private static final String KAFKA_TOPIC = "test";
    private static final String KAFKA_BROKER = "localhost:9092";
    private static final String CLICKHOUSE_URL = "jdbc:clickhouse://localhost:8123/test";
    private static final String CLICKHOUSE_USER = "default";
    private static final String CLICKHOUSE_PASSWORD = "";
    private static final String CLICKHOUSE_TABLE = "test";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", KAFKA_BROKER);
        props.put("group.id", "test-consumer-group");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(KAFKA_TOPIC));

        ClickHouseDataSource dataSource =
                new ClickHouseDataSource(CLICKHOUSE_URL, new ClickHouseProperties());
        try (ClickHouseConnection conn =
                dataSource.getConnection(CLICKHOUSE_USER, CLICKHOUSE_PASSWORD)) {
            // MergeTree tables require an ORDER BY clause
            conn.createStatement().execute(
                    "CREATE TABLE IF NOT EXISTS " + CLICKHOUSE_TABLE
                    + " (id Int32, name String) ENGINE = MergeTree() ORDER BY id");
            // Clear any rows left over from previous runs (optional)
            conn.createStatement().execute("TRUNCATE TABLE " + CLICKHOUSE_TABLE);

            while (true) {
                ConsumerRecords<String, String> records =
                        consumer.poll(Duration.ofMillis(500));
                if (records.isEmpty()) {
                    continue;
                }
                // Collect every record from this poll into one batched INSERT
                try (PreparedStatement statement = conn.prepareStatement(
                        "INSERT INTO " + CLICKHOUSE_TABLE + " (id, name) VALUES (?, ?)")) {
                    for (ConsumerRecord<String, String> record : records) {
                        statement.setInt(1, Integer.parseInt(record.key()));
                        statement.setString(2, record.value());
                        statement.addBatch();
                    }
                    statement.executeBatch();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}
```
The code works as follows:
1. Configure the Kafka consumer properties: broker address, consumer group, auto-commit interval, deserializers, and so on.
2. Create the Kafka consumer and subscribe to the topic.
3. Create the ClickHouse data source and open a connection.
4. Create the ClickHouse table if it does not exist and clear any leftover rows.
5. Poll Kafka in a loop, skipping empty polls.
6. Add every record from a poll to a JDBC prepared statement and insert them all as one batch.
7. Handle exceptions and close the Kafka consumer.
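In step 6 the batch size is simply whatever one poll returns. If you want to cap it, the records can be chunked into fixed-size batches before inserting. A minimal, self-contained sketch of that chunking logic (the `partition` helper and the batch size of 4 are illustrative, not part of the Kafka or ClickHouse APIs):

```java
import java.util.ArrayList;
import java.util.List;

public class BatchPartitionDemo {

    // Split a list of rows into batches of at most batchSize elements each.
    static <T> List<List<T>> partition(List<T> rows, int batchSize) {
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < rows.size(); i += batchSize) {
            batches.add(new ArrayList<>(
                    rows.subList(i, Math.min(i + batchSize, rows.size()))));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> rows = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            rows.add(i);
        }
        List<List<Integer>> batches = partition(rows, 4);
        // 10 rows with batch size 4 -> 3 batches of sizes 4, 4, 2
        System.out.println(batches.size());        // 3
        System.out.println(batches.get(2).size()); // 2
    }
}
```

Each sub-list would then be written with its own `addBatch()`/`executeBatch()` cycle, which keeps individual INSERT statements at a predictable size.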
Note that this example uses the ClickHouse JDBC driver to batch the inserts. You need to add the following dependencies to your project:
```
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.2</version>
</dependency>
<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.2.4</version>
</dependency>
```
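If your build uses Gradle instead of Maven, the equivalent declarations would look like this (versions as an assumption; any recent `kafka-clients` 2.x should work):

```
dependencies {
    implementation 'org.apache.kafka:kafka-clients:2.8.2'
    implementation 'ru.yandex.clickhouse:clickhouse-jdbc:0.2.4'
}
```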
In addition, adjust the constants in the example code to match your environment: the Kafka topic and broker, the ClickHouse URL, user, password, and table name.