能给我写一段配置springboot整合kafka的代码么?
时间: 2023-05-25 18:07:12 浏览: 123
当然可以,请参考下方示例代码。
首先,你需要在你的`pom.xml`文件中添加Kafka的依赖:
```xml
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.7.3</version>
</dependency>
```
然后,在你的Spring Boot应用程序中添加Kafka配置类,如下所示:
```java
@Configuration
@EnableKafka
public class KafkaConfig {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(configProps);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
```
在代码中,我们配置了一个`ProducerFactory`,用于创建发送者实例,以及一个`ConsumerFactory`,用于创建消费者实例。我们还创建了一个Kafka监听器容器工厂,它使用上面定义的`ConsumerFactory`来创建一个消息监听容器。此外,我们为KafkaTemplate添加了一个生产者工厂,使其能够使用。
接下来,在Kafka生产者类中,我们将使用我们定义的`KafkaTemplate`发送消息,如下所示:
```java
@Service
public class KafkaProducer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String message, String topicName) {
kafkaTemplate.send(topicName, message);
}
}
```
最后,我们在Kafka消费者类中编写实际的消息消费逻辑,如下所示:
```java
@Component
public class KafkaConsumer {
@KafkaListener(topics = "test-topic", groupId = "group_id")
public void consume(String message) {
System.out.println("Received message: " + message);
}
}
```
在上述示例中,我们使用注释`@KafkaListener`指定要监听的主题和消费者组ID,并在处理接收到的消息的方法中打印该消息。
以上就是整合Spring Boot与Kafka的示例代码,帮助你快速入门。如果需要更详细的解释,可以参考官方文档:https://spring.io/projects/spring-kafka。
阅读全文