Connecting to Kafka from Java
Date: 2023-12-16 19:28:38
The following example code shows how to connect to Kafka from Java:
1. First, add the Kafka client dependency to your project's pom.xml file:
```xml
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.2.0</version>
</dependency>
```
2. Next, you can use the following Java code to create a Kafka producer and a Kafka consumer:
```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

// Both classes are package-private so that this listing compiles as a single source file.
// Run each main method separately.

// Kafka producer: sends one record to the topic
class KafkaProducerExample {
    public static void main(String[] args) {
        String topicName = "test-topic";
        String key = "key1";
        String value = "value1";

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources closes the producer; close() waits for pending sends to complete
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key, value);
            // send() is asynchronous: it queues the record and returns immediately
            producer.send(record);
        }
    }
}

// Kafka consumer: polls the topic in a loop and prints every record it receives
class KafkaConsumerExample {
    public static void main(String[] args) {
        String topicName = "test-topic";

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // The loop below never exits normally; the consumer is closed only if an exception escapes
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(topicName));
            while (true) {
                // Wait up to 100 ms for new records
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n",
                            record.offset(), record.key(), record.value());
                }
            }
        }
    }
}
```
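These examples show how to connect to Kafka from Java, create a producer and a consumer, and send and receive messages. You can modify and extend them to suit your own needs.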
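One possible way to adapt the examples is to move the connection settings into small helper methods, so the producer and the consumer share a single broker address. The sketch below uses only `java.util.Properties` and runs without a broker. The class name `KafkaConfigSketch` and the methods `producerProps` and `consumerProps` are hypothetical names chosen for illustration. The `acks` and `auto.offset.reset` settings are assumptions that do not appear in the examples above; both are standard Kafka client configuration keys, but whether these values fit depends on your use case.

```java
import java.util.Properties;

// Hypothetical helper class (not part of the Kafka API): builds client settings in one place.
final class KafkaConfigSketch {

    // Settings for a producer with String keys and values.
    static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Assumption: wait for all in-sync replicas to acknowledge each write.
        props.setProperty("acks", "all");
        return props;
    }

    // Settings for a consumer with String keys and values.
    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Assumption: a group with no committed offsets starts from the oldest record.
        props.setProperty("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        Properties producer = producerProps("localhost:9092");
        Properties consumer = consumerProps("localhost:9092", "test-group");
        // Print the resulting settings so they can be checked before connecting.
        producer.forEach((k, v) -> System.out.println("producer " + k + " = " + v));
        consumer.forEach((k, v) -> System.out.println("consumer " + k + " = " + v));
    }
}
```

The returned `Properties` objects can be passed directly to `new KafkaProducer<>(...)` and `new KafkaConsumer<>(...)` in place of the inline `props` used in the examples.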