kafka消费者代码spring
时间: 2023-08-28 16:08:08 浏览: 116
spring-kafka源代码
5星 · 资源好评率100%
以下是使用 Spring Kafka 实现的 Kafka 消费者代码示例:
```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class KafkaConsumer {
@KafkaListener(topics = "${kafka.topic.name}")
public void listen(ConsumerRecord<String, String> record) {
System.out.println("Received message: " + record.value());
// TODO: 处理接收到的消息
}
}
```
其中 `@KafkaListener` 注解用于定义监听的 Kafka 主题,`${kafka.topic.name}` 可以从配置文件中读取,也可以直接写死。
你需要在 Spring 的配置文件中定义 Kafka 消费者的相关配置,例如:
```properties
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.bootstrap-servers=localhost:9092
kafka.topic.name=my-topic
```
其中 `group-id` 用于定义消费者组的名称,`auto-offset-reset` 用于控制消费者从哪个偏移量开始消费消息,`key-deserializer` 和 `value-deserializer` 分别是键和值的反序列化器,`bootstrap-servers` 是 Kafka 的服务器地址和端口号。
这样,当有消息发送到 `my-topic` 主题时,`listen()` 方法就会被自动调用,从而实现对消息的消费。
阅读全文