Code template for integrating Kafka with Spring Boot
Date: 2023-09-16 19:14:02
The following is a simple code template for integrating Kafka with Spring Boot:
1. Add the Kafka dependency to pom.xml:
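```xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <!-- Define spring.kafka.version under properties in this POM,
         or drop the version element if the Spring Boot parent POM manages it. -->
    <version>${spring.kafka.version}</version>
</dependency>
```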
2. Configure the Kafka connection and the producer/consumer properties:
```yaml
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: my-group
      auto-offset-reset: earliest
      enable-auto-commit: false
    producer:
      acks: all
      retries: 0
      batch-size: 16384
      buffer-memory: 33554432
```
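To make the configuration above easier to read, here is a minimal sketch that mirrors the same values under the property names the plain Kafka client uses (dots instead of Spring Boot's dashes) and prints the two size settings in readable units. The class name `KafkaClientSettings` and its methods are hypothetical and exist only for illustration; Spring Boot builds the real client configuration for you from the YAML.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: mirrors the YAML values using the raw Kafka client property names.
class KafkaClientSettings {

    // Producer settings as the Kafka client names them.
    static Map<String, Object> producerProps() {
        Map<String, Object> props = new LinkedHashMap<>();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");             // wait for all in-sync replicas to acknowledge
        props.put("retries", 0);              // no automatic resend on failure
        props.put("batch.size", 16384);       // bytes per partition batch
        props.put("buffer.memory", 33554432L); // bytes available for buffering unsent records
        return props;
    }

    // Consumer settings as the Kafka client names them.
    static Map<String, Object> consumerProps() {
        Map<String, Object> props = new LinkedHashMap<>();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-group");
        props.put("auto.offset.reset", "earliest"); // start from the oldest record when no offset is stored
        props.put("enable.auto.commit", false);     // the client does not commit offsets on a timer
        return props;
    }

    public static void main(String[] args) {
        Map<String, Object> producer = producerProps();
        int batchKiB = (Integer) producer.get("batch.size") / 1024;
        long bufferMiB = (Long) producer.get("buffer.memory") / (1024 * 1024);
        System.out.println("batch.size    = " + batchKiB + " KiB");
        System.out.println("buffer.memory = " + bufferMiB + " MiB");
    }
}
```

The two numeric values are byte counts: 16384 bytes is 16 KiB and 33554432 bytes is 32 MiB.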
3. Create a Kafka producer:
```java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    // Spring Boot auto-configures the KafkaTemplate and injects it here.
    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // Sends the message asynchronously; the returned future is ignored.
    public void send(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }
}
```
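The producer above discards the future returned by `kafkaTemplate.send`, so a failed send goes unnoticed. The sketch below shows one way to report the outcome using only the JDK. It assumes Spring Kafka 3.x, where `send` returns a `CompletableFuture` (2.x returns a `ListenableFuture` instead). `fakeSend` is a hypothetical stand-in for the real call so the example runs without a broker, and the metadata string it returns is made up.

```java
import java.util.concurrent.CompletableFuture;

// Illustrative only: shows the callback pattern, not the real KafkaTemplate API.
class SendResultHandling {

    // Hypothetical stand-in for kafkaTemplate.send(topic, message).
    // It fails for a blank topic and otherwise returns made-up record metadata.
    static CompletableFuture<String> fakeSend(String topic, String message) {
        if (topic == null || topic.isBlank()) {
            return CompletableFuture.failedFuture(
                    new IllegalArgumentException("topic must not be blank"));
        }
        return CompletableFuture.completedFuture(topic + "-0@0");
    }

    // Turns the asynchronous outcome into a report instead of dropping it.
    static String sendAndReport(String topic, String message) {
        return fakeSend(topic, message)
                .handle((metadata, error) -> error == null
                        ? "sent to " + metadata
                        : "send failed: " + error.getMessage())
                .join(); // blocks only to keep the demo simple
    }

    public static void main(String[] args) {
        System.out.println(sendAndReport("my-topic", "Hello, Kafka!"));
        System.out.println(sendAndReport("", "Hello, Kafka!"));
    }
}
```

In a real service you would usually attach the callback with `whenComplete` and log the error rather than block with `join()`.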
4. Create a Kafka consumer:
```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumer {

    // Invoked for each record on "my-topic"; the group comes from spring.kafka.consumer.group-id.
    @KafkaListener(topics = "my-topic")
    public void listen(String message) {
        System.out.println("Received message: " + message);
    }
}
```
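With `enable-auto-commit: false`, offsets are committed by Spring's listener container after the listener has run rather than by the Kafka client on a timer, so a crash between processing and commit can cause a record to be delivered again. The sketch below shows one simple way a listener body could tolerate such duplicates. It is an assumption-laden illustration: the `messageId` parameter is hypothetical (the listener above receives only the payload), and the in-memory set is lost on restart, so a real application would need a durable store.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative only: duplicate-tolerant handling keyed by a hypothetical message ID.
class IdempotentHandler {

    // IDs of messages already processed; in-memory only, so it resets on restart.
    private final Set<String> seenIds = ConcurrentHashMap.newKeySet();

    // Returns true if the message was processed, false if it was a duplicate.
    boolean handle(String messageId, String payload) {
        if (!seenIds.add(messageId)) {
            return false; // already handled, skip the side effects
        }
        System.out.println("Received message: " + payload);
        return true;
    }

    public static void main(String[] args) {
        IdempotentHandler handler = new IdempotentHandler();
        System.out.println(handler.handle("42", "Hello, Kafka!")); // first delivery
        System.out.println(handler.handle("42", "Hello, Kafka!")); // redelivery of the same ID
    }
}
```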
5. Use the producer and consumer in the application:
```java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;

@SpringBootApplication
public class MyApp {

    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(MyApp.class, args);

        // Publish one test message through the producer bean.
        KafkaProducer producer = context.getBean(KafkaProducer.class);
        producer.send("my-topic", "Hello, Kafka!");

        // No lookup is needed for KafkaConsumer: its @KafkaListener method is
        // registered at startup and receives messages from the "my-topic" topic.
    }
}
```
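This template can help you integrate Kafka into a Spring Boot application quickly. Note that it is only a simple example; real applications may need more complex configuration and logic.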