写一段kafka消费端代码
时间: 2023-05-23 10:05:37 浏览: 111
A:
```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class KafkaConsumerDemo {
public static void main(String[] args) {
// Kafka服务地址
String bootstrapServers = "localhost:9092";
// 消费组ID
String groupId = "test-group";
// 消费者配置信息
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
// 创建消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 订阅主题
consumer.subscribe(Arrays.asList("test-topic"));
// 消费数据
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
records.forEach(record -> {
System.out.printf("offset=%d, key=%s, value=%s%n", record.offset(), record.key(), record.value());
});
}
}
}
```
这段代码使用了Kafka的Java客户端API,创建了一个消费者对象,并订阅了一个名为"test-topic"的主题。然后,使用`poll()`方法从Kafka集群获取数据,打印出每条消息的offset、key和value。注意到在这个代码中,我们没有提交offset,因此每次重启消费者,它都会从最新的offset开始消费消息。如果你想要手动提交offset,可以使用`commitSync()`或`commitAsync()`方法。
阅读全文