java 连接kafka 测试状态
时间: 2023-07-01 22:17:09 浏览: 106
要测试Java连接Kafka的状态,可以编写一个简单的Java程序来连接到Kafka集群并发送/接收消息。以下是一个简单示例:
```java
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Properties;
public class KafkaTest {
public static void main(String[] args) {
// 设置生产者属性
Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", "your.kafka.host:9092");
producerProps.put("acks", "all");
producerProps.put("retries", 0);
producerProps.put("batch.size", 16384);
producerProps.put("linger.ms", 1);
producerProps.put("buffer.memory", 33554432);
producerProps.put("key.serializer", StringSerializer.class.getName());
producerProps.put("value.serializer", StringSerializer.class.getName());
// 发送一条消息到Kafka
Producer<String, String> producer = new KafkaProducer<>(producerProps);
producer.send(new ProducerRecord<>("test_topic", "test_key", "test_value"));
producer.close();
// 设置消费者属性
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "your.kafka.host:9092");
consumerProps.put("group.id", "test_group");
consumerProps.put("enable.auto.commit", "true");
consumerProps.put("auto.commit.interval.ms", "1000");
consumerProps.put("key.deserializer", StringDeserializer.class.getName());
consumerProps.put("value.deserializer", StringDeserializer.class.getName());
// 从Kafka消费一条消息
Consumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(Collections.singletonList("test_topic"));
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
System.out.println("Received message: " + record.value());
}
consumer.close();
}
}
```
在运行这个程序之前,需要先确保Kafka已经在运行,并且主机名和端口号已经正确地设置。程序会发送一条消息到名为"test_topic"的主题,并从该主题中消费一条消息。如果程序能够正常地发送和接收消息,那么Java连接Kafka的状态就是正常的。
阅读全文