kafka依赖版本是3.4.0,使用java写出消费组和主题的消费情况
时间: 2024-02-16 12:03:19 浏览: 31
好的,我来回答你的问题。首先,你需要导入 Kafka 的 Java 客户端库依赖,版本与你使用的 Kafka 版本一致。在 Maven 项目中,你可以添加以下依赖:
```xml
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.4.0</version>
</dependency>
```
接着,你可以使用以下代码创建一个消费组,并消费指定主题的消息:
```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class ConsumerExample {
public static void main(String[] args) {
String topicName = "test-topic";
String groupId = "test-group";
String bootstrapServers = "localhost:9092";
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singleton(topicName));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
records.forEach(record -> {
System.out.printf("Received message: %s%n", record.value());
});
}
}
}
```
上面的代码创建了一个消费者实例,订阅了名为 `test-topic` 的主题,并不断地从 Kafka 中拉取消息并打印出来。你可以根据需要修改其中的配置参数和消息处理逻辑。