consumeMessages()方法中如何从Kafka中拉取消息
时间: 2024-03-17 20:44:56 浏览: 61
在consumeMessages()方法中,你需要使用KafkaConsumer来拉取消息。你可以通过在Spring Boot的配置文件(application.properties或application.yml)中配置Kafka的相关属性,如Kafka的地址、端口、消费者组ID等。
下面是一个示例配置:
```yaml
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: my-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
```
在你的KafkaConsumer类中,你需要使用@Value注解来获取配置文件中的属性值,并使用KafkaConsumer实例来拉取消息。下面是一个示例代码:
```java
@Service
public class KafkaConsumer {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
private KafkaConsumer<String, String> kafkaConsumer;
@PostConstruct
public void init() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
kafkaConsumer = new KafkaConsumer<>(props);
kafkaConsumer.subscribe(Collections.singletonList("test_topic"));
}
@KafkaListener(topics = "test_topic", groupId = "my-group")
public void consume(String message) {
System.out.println("Received message: " + message);
}
@Scheduled(fixedDelay = 5000)
public void consumeMessages() {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
System.out.println("Received message: " + record.value());
}
}
}
```
上述代码中,我们使用@Value注解获取配置文件中的属性值,并在init()方法中创建KafkaConsumer实例。在consumeMessages()方法中,我们使用kafkaConsumer.poll()方法来拉取消息,并使用for循环遍历ConsumerRecords并打印出消息。
希望这能帮助你理解如何从Kafka中拉取消息。
阅读全文