kafka java 测试连通性 kafka,编写java测试代码
时间: 2023-12-28 07:24:50 浏览: 157
以下是一个使用Java编写的Kafka测试代码,用于测试Kafka的连通性:
```java
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Properties;
public class KafkaTest {
public static void main(String[] args) {
// Kafka 服务器地址和端口
String bootstrapServers = "localhost:9092";
// Kafka 主题
String topic = "test";
// 生产者配置
Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", bootstrapServers);
producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 消费者配置
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", bootstrapServers);
consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("group.id", "test-group");
// 创建生产者和消费者
Producer<String, String> producer = new KafkaProducer<>(producerProps);
Consumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
// 发送消息到Kafka
String message = "Hello Kafka!";
producer.send(new ProducerRecord<>(topic, message));
// 从Kafka消费消息
consumer.subscribe(Collections.singletonList(topic));
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
System.out.println("Received message: " + record.value());
}
// 关闭生产者和消费者
producer.close();
consumer.close();
}
}
```
请注意,你需要在代码中替换`localhost:9092`为你的Kafka服务器地址和端口,并确保已经创建了名为`test`的主题。
阅读全文