kafka消费不到数据 public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "192.168.56.101:9092"); // Kafka地址 // 每个消费者分配独立的组号 props.put("group.id", "consumer01"); // 如果value合法,则自动提交偏移量 props.put("enable.auto.commit", "true"); // 设置多久一次更新被消费消息的偏移量 props.put("auto.commit.interval.ms", "1000"); // 设置会话响应的时间,超过这个时间kafka可以选择放弃消费或者消费下一条消息 props.put("session.timeout.ms", "30000"); // 指定CURRENT-OFFSET的数值, 自动重置offset, 当读取的 offset 不在当前 kafka topic 范围内,将会从头开始读取 props.put("auto.offset.reset", "earliest"); // latest earliest props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); String topic = "mrt"; //mrt-java consumer.subscribe(Collections.singletonList(topic)); ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
时间: 2024-02-03 09:01:57 浏览: 89
这段代码中只有一次调用了 `consumer.poll()` 方法获取消息,而没有在循环中不断地调用该方法,因此只能消费到一次消息。你需要在一个循环中不断地调用 `consumer.poll()` 方法,以便持续地获取新消息。你可以参考以下代码示例:
```
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
```
该代码会持续地从 Kafka 中获取消息,并输出每条消息的偏移量、键和值。你可以根据实际需求,对消息做出相应的处理。
相关问题
kafka消费当前数据
要消费 Kafka 中的当前数据,你需要使用 Kafka Consumer API。首先,你需要创建一个 Kafka Consumer 实例并配置它所连接的 Kafka 集群的地址。然后,你需要订阅一个或多个 Kafka 主题,以便从这些主题中接收数据。最后,你需要在启动 Consumer 实例后循环调用 poll() 方法来获取数据记录。
以下是一个使用 Java Kafka Consumer API 消费当前数据的示例代码:
```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
public class KafkaConsumerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("test-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
records.forEach(record -> {
System.out.printf("Received message: key=%s, value=%s, partition=%d, offset=%d\n",
record.key(), record.value(), record.partition(), record.offset());
});
}
}
}
```
在此示例中,我们创建了一个 Kafka Consumer 实例,并配置它所连接的 Kafka 集群的地址和消费者组 ID。然后,我们订阅了一个名为 "test-topic" 的 Kafka 主题。最后,我们在一个无限循环中调用 poll() 方法来获取 Kafka 中的当前数据,并将每个数据记录的关键字、值、分区和偏移量打印到控制台上。
用new String(Files.readAllBytes(Paths.get("mas.txt")));为什么推不进kafka
使用`new String(Files.readAllBytes(Paths.get("mas.txt")))`读取文件内容并传递给Kafka producer发送消息是可行的。但是,你需要确保在发送之前,Kafka producer已经正确配置和初始化。
下面是一个示例代码,展示了如何使用`new String(Files.readAllBytes(Paths.get("mas.txt")))`将文件内容发送到Kafka:
```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;
public class KafkaFileProducer {
public static void main(String[] args) {
// Kafka producer配置
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 创建Kafka producer实例
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
try {
// 从文件中读取字符串
String message = new String(Files.readAllBytes(Paths.get("mas.txt")));
// 发送消息
ProducerRecord<String, String> record = new ProducerRecord<>("topic_name", message);
producer.send(record);
System.out.println("Message sent successfully");
} catch (IOException e) {
e.printStackTrace();
} finally {
producer.close();
}
}
}
```
上述代码使用了Java的`Files.readAllBytes()`方法来读取文件内容,并将其作为字符串传递给Kafka producer发送消息。`bootstrap.servers`属性指定了Kafka集群的地址,`key.serializer`和`value.serializer`属性指定了消息的键和值的序列化方式。
请确保在使用该代码之前,你已经提供了正确的Kafka集群地址,并且已经添加了相关依赖,如Kafka的客户端库。
阅读全文