KafkaListener接收参数
时间: 2023-10-24 12:27:09 浏览: 92
KafkaListener 是 Spring Kafka 提供的注解,用于指示一个方法是 Kafka 消费者。当消费者接收到消息时,会触发被 @KafkaListener 注解的方法执行。在方法中,可以定义不同类型的参数来接收 Kafka 消息。
常用的参数类型包括:
1. 字符串类型(String):直接通过方法参数接收 Kafka 消息的内容。
```java
@KafkaListener(topics = "myTopic")
public void consumeMessage(String message) {
// 处理 Kafka 消息
System.out.println("Received message: " + message);
}
```
2. 字节数组类型(byte[]):通过字节数组接收 Kafka 消息的内容。
```java
@KafkaListener(topics = "myTopic")
public void consumeMessage(byte[] message) {
// 处理 Kafka 消息
System.out.println("Received message: " + new String(message));
}
```
3. 消息对象类型(自定义类):通过定义一个自定义类来接收 Kafka 消息的内容,需确保消息内容可以反序列化为该类的实例。
```java
@KafkaListener(topics = "myTopic")
public void consumeMessage(MyMessage message) {
// 处理 Kafka 消息
System.out.println("Received message: " + message);
}
```
除了消息内容,还可以通过添加其他参数来接收更多相关信息:
- ConsumerRecord:通过 ConsumerRecord 对象获取消息的详细信息,例如偏移量、分区等。
```java
@KafkaListener(topics = "myTopic")
public void consumeMessage(ConsumerRecord<String, String> record) {
// 处理 Kafka 消息
System.out.println("Received message: " + record.value());
System.out.println("Offset: " + record.offset());
System.out.println("Partition: " + record.partition());
}
```
- Acknowledgment:通过 Acknowledgment 对象手动提交消费确认,用于控制消息的提交。
```java
@KafkaListener(topics = "myTopic")
public void consumeMessage(String message, Acknowledgment acknowledgment) {
// 处理 Kafka 消息
System.out.println("Received message: " + message);
// 手动提交消费确认
acknowledgment.acknowledge();
}
```
以上是常见的 KafkaListener 接收参数的示例,根据具体业务需求,可以选择适合的参数类型来接收 Kafka 消息。
阅读全文