kafka 消费者 代码java
时间: 2024-03-28 15:33:33 浏览: 71
Java实现Kafka生产者消费者代码实例
5星 · 资源好评率100%
Kafka是一个分布式流处理平台,它具有高吞吐量、可扩展性和容错性等特点。Kafka的消费者是用于从Kafka集群中读取数据的客户端程序。下面是一个简单的Java代码示例,用于创建一个Kafka消费者:
```java
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerExample {
private static final String TOPIC_NAME = "your_topic_name";
private static final String BOOTSTRAP_SERVERS = "your_bootstrap_servers";
public static void main(String[] args) {
// 配置消费者属性
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "your_consumer_group_id");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 创建消费者实例
Consumer<String, String> consumer = new KafkaConsumer<>(props);
// 订阅主题
consumer.subscribe(Collections.singletonList(TOPIC_NAME));
// 消费消息
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println("Received message: " + record.value());
}
}
}
}
```
上述代码中,我们首先配置了消费者的属性,包括Kafka集群的地址、消费者组ID以及键值的反序列化器。然后创建了一个KafkaConsumer实例,并订阅了指定的主题。最后,在一个无限循环中,我们使用poll()方法从Kafka集群中拉取消息,并对每条消息进行处理。
阅读全文