kafka springboot 消费者 序列化配置
时间: 2023-07-21 21:39:44 浏览: 229
springboot 基于spring-kafka动态创建kafka消费者
在 Spring Boot 中使用 Kafka 消费者时,需要对消息进行反序列化,将其转换为 Java 对象。Spring Boot 使用的默认序列化器是 ByteArraySerializer,如果你需要使用其他的序列化器,可以通过配置来实现。以下是使用 String 序列化器的示例:
1. 添加依赖
```
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
```
2. 配置消费者
```
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${kafka.consumer.group-id}")
private String groupId;
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
```
3. 创建消费者
```
@Component
public class KafkaConsumer {
@KafkaListener(topics = "test", groupId = "${kafka.consumer.group-id}")
public void listen(String message) {
System.out.println("Received message: " + message);
}
}
```
在上面的示例中,我们使用了 StringDeserializer 作为键和值的反序列化器,因此在消费者中直接可以使用 String 类型的消息。你可以根据需要选择其他的序列化器,如 JSON 序列化器等。
阅读全文