SpringBoot 集成kafka的例子
时间: 2023-07-11 10:47:31 浏览: 70
好的,下面是一个基于Spring Boot的Kafka应用程序的示例代码。这个示例应用程序包含了一个生产者和一个消费者,用于发送和接收Kafka消息。
首先,我们需要添加以下依赖项到我们的pom.xml文件中:
```
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
```
接下来,我们需要配置Kafka相关的bean。在这个示例中,我们使用了KafkaAutoConfiguration和KafkaProperties来配置KafkaTemplate和其他Kafka相关的bean。我们还定义了一个名为kafkaListenerContainerFactory的KafkaListenerContainerFactory bean,用于配置消息监听器容器。
```
@Configuration
@EnableKafka
public class KafkaConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> producerConfigProps = new HashMap<>();
producerConfigProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
producerConfigProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerConfigProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(producerConfigProps);
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> consumerConfigProps = new HashMap<>();
consumerConfigProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
consumerConfigProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
consumerConfigProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerConfigProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(consumerConfigProps);
}
}
```
接下来,我们可以定义一个生产者类,用于发送Kafka消息。在这个示例中,我们使用了一个Rest API来触发生产者发送消息的操作。
```
@RestController
public class KafkaProducerController {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@PostMapping("/kafka/send")
public void sendMessage(@RequestParam String message) {
kafkaTemplate.send("test-topic", message);
}
}
```
最后,我们可以定义一个消费者类,用于接收Kafka消息。在这个示例中,我们使用了@KafkaListener注解来声明一个消息监听器方法,用于接收Kafka消息。
```
@Service
public class KafkaConsumer {
@KafkaListener(topics = "test-topic", groupId = "test-group")
public void listen(String message) {
System.out.println("Received message: " + message);
}
}
```
这就是一个基于Spring Boot的Kafka应用程序的示例代码。我们可以运行这个示例应用程序,并使用Rest API来触发生产者发送消息的操作,然后在控制台上查看消费者接收到的消息。