写一个java消费kafka的程序
时间: 2023-07-01 22:11:10 浏览: 99
好的,以下是一个简单的Java程序,演示如何消费Kafka消息:
```java
import java.util.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.*;
public class KafkaConsumerExample {
public static void main(String[] args) {
String topicName = "my-topic";
String groupName = "my-group";
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", groupName);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topicName));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
} catch (WakeupException e) {
// ignore
} finally {
consumer.close();
}
}
}
```
在这个例子中,我们创建了一个KafkaConsumer对象,指定了Kafka集群的地址和消费者组名。然后我们订阅了一个主题,使用poll()方法来拉取消息,处理拉取到的消息。如果拉取到的消息为空,则会进行阻塞等待,直到有新的消息到达为止。如果在等待过程中,我们需要退出消费者,则可以使用consumer.wakeup()方法来唤醒poll()方法,抛出WakeupException异常,从而退出消费者。最后,我们需要关闭消费者对象,释放资源。
以上是一个简单的Kafka消息消费程序,实际生产环境中需要根据具体需求进行优化和扩展。
阅读全文