kafka已生产消息查看
时间: 2023-07-28 11:10:17 浏览: 45
你可以通过消费者来查看Kafka中已经生产的消息。首先,启动一个消费者,命令如下:
```bash
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic <topic-name> --from-beginning
```
其中,`<topic-name>`是你要查看的topic名称。执行这个命令后,就可以看到Kafka中已经生产的消息了。
相关问题
作为Kafka生产者如何查看已发送的消息
作为Kafka生产者,你可以通过设置回调函数来获取消息发送的结果。回调函数会在消息发送成功或者失败时被调用。
下面是一个示例代码:
```java
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class ProducerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
String topic = "test";
String key = "key1";
String value = "value1";
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
System.err.println("Failed to send message to Kafka: " + exception.getMessage());
} else {
System.out.printf("Message sent to topic=%s, partition=%d, offset=%d\n",
metadata.topic(), metadata.partition(), metadata.offset());
}
}
});
producer.close();
}
}
```
当消息发送成功时,`onCompletion()`方法会被调用,并输出消息发送的信息,包括topic、partition和offset。如果消息发送失败,则会输出错误信息。通过这种方式,你可以查看已经成功发送的消息。
kafka查看实时消息
要查看Kafka的实时消息,可以使用以下命令:
1. 启动Kafka消费者组:
```
bin/kafka-console-consumer.sh --bootstrap-server <kafka-broker>:<port> --topic <topic-name> --from-beginning
```
这将启动一个Kafka消费者组,它将开始从指定的主题中读取消息。使用`--from-beginning`选项可以从主题的开头开始读取消息。
2. 发送消息到主题:
```
bin/kafka-console-producer.sh --broker-list <kafka-broker>:<port> --topic <topic-name>
```
这将启动一个Kafka生产者,它将等待用户输入消息。一旦输入了消息,它将发送到指定的主题中。
3. 查看实时消息:
在生产者发送消息后,您可以在消费者组中看到实时消息。每当生产者发送消息时,消费者组会接收到该消息并将其打印到控制台上。
注意:要查看实时消息,您需要同时运行消费者组和生产者。