Duration.ofMillis
时间: 2024-06-14 11:07:59 浏览: 9
Duration.ofMillis是Java 8中的一个方法,用于创建一个以毫秒为单位的持续时间。它接受一个long类型的参数,表示要创建的持续时间的毫秒数。该方法返回一个Duration对象,该对象表示指定的持续时间。
例如,如果我们想要创建一个持续时间为500毫秒的Duration对象,我们可以使用以下代码:
Duration duration = Duration.ofMillis(500);
这将创建一个持续时间为500毫秒的Duration对象,并将其分配给变量duration。
Duration类还提供了其他方法来处理持续时间,例如可以使用plusMillis()方法增加指定的毫秒数,使用minusMillis()方法减去指定的毫秒数,以及使用toMillis()方法将持续时间转换为毫秒数等。
相关问题
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));这句代码每部分的含义
这段代码使用 Kafka Consumer API 从 Kafka 集群中消费消息,每次调用 `poll` 方法会从 Kafka 中拉取一批消息,返回一个 `ConsumerRecords` 对象,其中包含了一组 `ConsumerRecord` 对象。每个 `ConsumerRecord` 对象表示一条消息,包含了消息的内容和元数据。
具体来说,这段代码的含义如下:
- `consumer`:Kafka Consumer 对象,用于从 Kafka 集群中消费消息。
- `poll(Duration.ofMillis(1000))`:从 Kafka 集群中拉取消息,等待时间为 1000 毫秒。如果在等待时间内没有拉取到任何消息,则返回空的 `ConsumerRecords` 对象。
- `ConsumerRecords<String, String>`:表示一组消息,其中的泛型参数 `<String, String>` 分别表示消息的键和值的类型,这里都是字符串类型。
- `records`:表示从 Kafka 集群中拉取到的消息,是一个 `ConsumerRecords` 对象。
kafka消费不到数据 public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "192.168.56.101:9092"); // Kafka地址 // 每个消费者分配独立的组号 props.put("group.id", "consumer01"); // 如果value合法,则自动提交偏移量 props.put("enable.auto.commit", "true"); // 设置多久一次更新被消费消息的偏移量 props.put("auto.commit.interval.ms", "1000"); // 设置会话响应的时间,超过这个时间kafka可以选择放弃消费或者消费下一条消息 props.put("session.timeout.ms", "30000"); // 指定CURRENT-OFFSET的数值, 自动重置offset, 当读取的 offset 不在当前 kafka topic 范围内,将会从头开始读取 props.put("auto.offset.reset", "earliest"); // latest earliest props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); String topic = "mrt"; //mrt-java consumer.subscribe(Collections.singletonList(topic)); ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
这段代码中只有一次调用了 `consumer.poll()` 方法获取消息,而没有在循环中不断地调用该方法,因此只能消费到一次消息。你需要在一个循环中不断地调用 `consumer.poll()` 方法,以便持续地获取新消息。你可以参考以下代码示例:
```
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
```
该代码会持续地从 Kafka 中获取消息,并输出每条消息的偏移量、键和值。你可以根据实际需求,对消息做出相应的处理。