springmvc项目集成kaflka代码示例
时间: 2023-07-12 21:14:40 浏览: 156
好的,下面是一个简单的 Spring MVC 项目集成 Kafka 的示例代码:
1. 添加 Maven 依赖
在 pom.xml 文件中添加以下 Maven 依赖:
```xml
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.7.1</version>
</dependency>
```
2. 配置 Kafka 生产者
在 Spring MVC 的配置文件中添加以下 Kafka 生产者的配置:
```xml
<bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
<constructor-arg>
<props>
<prop key="bootstrap.servers">localhost:9092</prop>
<prop key="key.serializer">org.apache.kafka.common.serialization.StringSerializer</prop>
<prop key="value.serializer">org.apache.kafka.common.serialization.StringSerializer</prop>
</props>
</constructor-arg>
</bean>
<bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
<constructor-arg ref="producerFactory" />
</bean>
```
其中,`bootstrap.servers` 属性指定 Kafka 服务器地址和端口,`key.serializer` 和 `value.serializer` 属性指定消息的序列化方式。
3. 编写 Kafka 生产者代码
在 Spring MVC 的控制器中注入 KafkaTemplate 对象,并编写发送消息的代码:
```java
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@RequestMapping("/sendMessage")
@ResponseBody
public String sendMessage(String message) {
kafkaTemplate.send("test_topic", message);
return "Message sent successfully";
}
```
其中,`test_topic` 是 Kafka 中的主题名称,`message` 是要发送的消息内容。
4. 配置 Kafka 消费者
在 Spring MVC 的配置文件中添加以下 Kafka 消费者的配置:
```xml
<bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<constructor-arg>
<props>
<prop key="bootstrap.servers">localhost:9092</prop>
<prop key="key.deserializer">org.apache.kafka.common.serialization.StringDeserializer</prop>
<prop key="value.deserializer">org.apache.kafka.common.serialization.StringDeserializer</prop>
<prop key="group.id">test_group</prop>
<prop key="auto.offset.reset">earliest</prop>
</props>
</constructor-arg>
</bean>
<bean id="kafkaListenerContainerFactory" class="org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory">
<property name="consumerFactory" ref="consumerFactory" />
<property name="autoStartup" value="true" />
</bean>
```
其中,`bootstrap.servers` 属性指定 Kafka 服务器地址和端口,`key.deserializer` 和 `value.deserializer` 属性指定消息的反序列化方式,`group.id` 属性指定消费者组的名称,`auto.offset.reset` 属性指定消费者从哪个偏移量开始消费消息。
5. 编写 Kafka 消费者代码
在 Spring MVC 的控制器中定义 Kafka 消费者的监听方法,并使用 `@KafkaListener` 注解指定监听的主题名称:
```java
@KafkaListener(topics = "test_topic")
public void receiveMessage(String message) {
System.out.println("Received message: " + message);
}
```
当 Kafka 中有新的消息到达时,`receiveMessage` 方法会自动被调用,并打印出接收到的消息内容。
以上就是一个简单的 Spring MVC 项目集成 Kafka 的示例代码,希望对你有帮助!
阅读全文