java 消费kafka代码
时间: 2023-08-20 12:10:24 浏览: 119
以下是一个简单的Java代码示例,用于消费Kafka消息:
首先,你需要添加Kafka客户端的依赖。你可以在你的项目的pom.xml文件中添加以下依赖:
```xml
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
```
接下来,你可以使用以下代码来消费Kafka消息:
```java
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerExample {
private static final String TOPIC_NAME = "your_topic_name";
private static final String BOOTSTRAP_SERVERS = "your_bootstrap_servers";
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singleton(TOPIC_NAME));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
records.forEach(record -> {
System.out.println("Received message: " + record.value());
});
}
}
}
```
请替换`TOPIC_NAME`和`BOOTSTRAP_SERVERS`为你实际的Kafka主题和启动服务器地址。
这个示例创建了一个Kafka消费者,订阅了指定的主题,并持续地从Kafka集群中拉取消息。收到的消息将在控制台上打印出来。
记得在使用完毕后,调用`consumer.close()`来关闭消费者。
阅读全文