以下是一个kafka消费者代码,请逐行解释一下:“”
时间: 2023-11-22 17:54:48 浏览: 82
```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
public class ConsumerExample {
private final static String TOPIC = "my-topic";
private final static String BOOTSTRAP_SERVERS ="localhost:9092";
private final static String GROUP_ID = "test-group";
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(TOPIC));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
}
```
1. `import java.util.Properties;` - 导入Properties类,用于设置Kafka消费者所需的配置属性。
2. `import org.apache.kafka.clients.consumer.*;` - 导入Kafka消费者相关的类。
3. `private final static String TOPIC = "my-topic";` - 定义消费者所要消费的Kafka主题名称。
4. `private final static String BOOTSTRAP_SERVERS ="localhost:9092";` - 定义Kafka集群的地址,即Kafka服务器的IP地址及其对应的端口号。
5. `private final static String GROUP_ID = "test-group";` - 定义消费者所属的消费组ID。
6. `Properties props = new Properties();` - 创建一个Properties对象,用于存放消费者所需的Kafka配置属性。
7. `props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);` - 设置Kafka集群的地址。
8. `props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);` - 设置消费者所属的消费组ID。
9. `props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());` - 设置键的反序列化器。
10. `props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());` - 设置值的反序列化器。
11. `KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);` - 创建一个Kafka消费者实例,并传入Kafka的配置属性。
12. `consumer.subscribe(Collections.singletonList(TOPIC));` - 订阅所要消费的Kafka主题。
13. `while (true) { ... }` - 进入消费消息的循环。
14. `ConsumerRecords<String, String> records = consumer.poll(100);` - 从Kafka服务器中拉取一批消息,最多等待100ms。
15. `for (ConsumerRecord<String, String> record : records) { ... }` - 遍历获取到的消息。
16. `System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());` - 输出消息的偏移量、键和值。
阅读全文