Counting a Kafka topic's hourly data volume with Java code
To count the hourly data volume of a Kafka topic, you can use the Kafka Java client API together with Java's time classes. The steps are as follows:
1. Create a KafkaConsumer and subscribe to the target topic.
```java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-consumer-group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList("test-topic"));
```
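If you prefer compile-time-checked configuration keys, the same setup can be written with the ConsumerConfig constants and the StringDeserializer class (a sketch; it additionally assumes the imports org.apache.kafka.clients.consumer.ConsumerConfig and org.apache.kafka.common.serialization.StringDeserializer, and setting auto.offset.reset=earliest is an added assumption so the count starts from the oldest retained records rather than only new ones):
```java
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// Start from the earliest retained offset when the group has no committed offset yet
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test-topic"));
```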
2. Create a HashMap to store the record count for each hour.
```java
HashMap<String, Long> countMap = new HashMap<String, Long>();
```
3. Use poll() to fetch messages from the Kafka cluster and convert each record's timestamp into an hour-level string used as the map key.
```java
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
for (ConsumerRecord<String, String> record : records) {
    long timestamp = record.timestamp();
    String hour = getHour(timestamp);
    if (countMap.containsKey(hour)) {
        long count = countMap.get(hour);
        countMap.put(hour, count + 1);
    } else {
        countMap.put(hour, 1L);
    }
}
```
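The if/else bookkeeping can also be written more compactly with Map.merge, which inserts 1 for a new hour key or adds 1 to an existing count (an equivalent sketch):
```java
for (ConsumerRecord<String, String> record : records) {
    // merge() inserts 1L for a new hour, or adds 1 to the existing count
    countMap.merge(getHour(record.timestamp()), 1L, Long::sum);
}
```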
The getHour() method converts a millisecond timestamp into an hour-level string:
```java
private static String getHour(long timestamp) {
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd-HH");
    sdf.setTimeZone(TimeZone.getTimeZone("GMT+8"));
    Date date = new Date(timestamp);
    return sdf.format(date);
}
```
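SimpleDateFormat is not thread-safe; on Java 8 or later an equivalent getHour() can be written with the java.time API (a sketch, assuming imports of java.time.Instant, java.time.ZoneId, and java.time.format.DateTimeFormatter, and using the Asia/Shanghai zone to match GMT+8):
```java
private static final DateTimeFormatter HOUR_FORMAT =
        DateTimeFormatter.ofPattern("yyyy-MM-dd-HH").withZone(ZoneId.of("Asia/Shanghai"));

private static String getHour(long timestamp) {
    // Format the epoch-millisecond timestamp in the configured zone
    return HOUR_FORMAT.format(Instant.ofEpochMilli(timestamp));
}
```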
4. Print the count for each hour.
```java
for (Map.Entry<String, Long> entry : countMap.entrySet()) {
    System.out.println(entry.getKey() + ": " + entry.getValue());
}
```
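HashMap does not keep the hour keys in any particular order. Because the "yyyy-MM-dd-HH" strings sort lexicographically in chronological order, copying the map into a TreeMap before printing yields sorted output (a small sketch, assuming java.util.TreeMap is imported):
```java
for (Map.Entry<String, Long> entry : new TreeMap<>(countMap).entrySet()) {
    System.out.println(entry.getKey() + ": " + entry.getValue());
}
```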
5. Close the KafkaConsumer.
```java
consumer.close();
```
The complete example code is as follows:
```java
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.TimeZone;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class KafkaTopicHourlyDataCount {
    public static void main(String[] args) {
        // Consumer configuration: broker address, consumer group, and string deserializers
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-consumer-group");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("test-topic"));

        // Hour string ("yyyy-MM-dd-HH") -> number of records seen in that hour
        HashMap<String, Long> countMap = new HashMap<String, Long>();

        // Fetch one batch of records (waits up to 10 seconds)
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
        for (ConsumerRecord<String, String> record : records) {
            long timestamp = record.timestamp();
            String hour = getHour(timestamp);
            if (countMap.containsKey(hour)) {
                long count = countMap.get(hour);
                countMap.put(hour, count + 1);
            } else {
                countMap.put(hour, 1L);
            }
        }

        // Print the count for each hour
        for (Map.Entry<String, Long> entry : countMap.entrySet()) {
            System.out.println(entry.getKey() + ": " + entry.getValue());
        }
        consumer.close();
    }

    private static String getHour(long timestamp) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd-HH");
        sdf.setTimeZone(TimeZone.getTimeZone("GMT+8"));
        Date date = new Date(timestamp);
        return sdf.format(date);
    }
}
```
Note: this code performs only a single poll(), so it counts only the batch of records returned by that call. To keep counting over a longer period, wrap the poll in a loop that keeps fetching and accumulating into countMap, as in the sketch below.
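One possible shape for that loop, reusing the same consumer, countMap, and getHour() as above (the one-minute reporting interval is just an illustrative choice, not part of the original code):
```java
try {
    long lastReport = System.currentTimeMillis();
    while (true) {
        // Keep fetching and accumulating counts per hour
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> record : records) {
            countMap.merge(getHour(record.timestamp()), 1L, Long::sum);
        }
        // Periodically print the accumulated counts (here: once a minute)
        if (System.currentTimeMillis() - lastReport >= 60_000) {
            for (Map.Entry<String, Long> entry : countMap.entrySet()) {
                System.out.println(entry.getKey() + ": " + entry.getValue());
            }
            lastReport = System.currentTimeMillis();
        }
    }
} finally {
    consumer.close();
}
```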