编写SpringBoot程序,实现多消费者从Kafka中同一Topic主题中批量读取数据,然后将数据分批写入到mysql数据库的功能。要求:1.可以从数据库连接池中获取数据库连接,避免数据库连接频繁创建销毁带来的开销影响数据插入效率;2.实现启动三个消费者同时消费同一Topic。
时间: 2024-02-11 10:04:17 浏览: 248
IDEA搭建一个SpringBoot项目——十分详细(web+mysql)(csdn)————程序.pdf
好的,下面是一个简单的实现方式:
首先,在pom.xml中添加以下依赖:
```xml
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.6.3</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
<version>3.4.5</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.22</version>
</dependency>
```
然后,在application.yml中添加以下配置:
```yaml
spring:
kafka:
bootstrap-servers: localhost:9092 #kafka服务器地址
consumer:
group-id: my-group #消费者组ID
auto-offset-reset: earliest #从最早的消息开始消费
max-poll-records: 1000 #每次拉取的最大消息数
listener:
concurrency: 3 #设置3个消费者实例
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/test?useSSL=false&serverTimezone=GMT%2B8&characterEncoding=utf8&allowPublicKeyRetrieval=true
username: root
password: 123456
hikari:
maximum-pool-size: 5 #连接池最大连接数
```
然后,编写Kafka消息监听器:
```java
@Component
public class KafkaListener {
private final Logger logger = LoggerFactory.getLogger(KafkaListener.class);
private final ExecutorService executorService = Executors.newFixedThreadPool(3); //线程池
@Autowired
private DataSource dataSource;
@KafkaListener(topics = "test-topic")
public void onMessage(List<ConsumerRecord<String, String>> records) {
logger.info("Received {} messages", records.size());
for (List<ConsumerRecord<String, String>> partitionRecords : partition(records)) { //将消息按照分区分批处理
executorService.submit(() -> {
try (Connection connection = dataSource.getConnection()) { //从连接池中获取连接
connection.setAutoCommit(false);
try (PreparedStatement statement = connection.prepareStatement("INSERT INTO test_table(id, name) VALUES (?, ?)")) {
for (ConsumerRecord<String, String> record : partitionRecords) {
statement.setString(1, record.key());
statement.setString(2, record.value());
statement.addBatch();
}
statement.executeBatch();
connection.commit();
}
} catch (SQLException e) {
logger.error("Error writing messages to database", e);
}
});
}
}
private List<List<ConsumerRecord<String, String>>> partition(List<ConsumerRecord<String, String>> records) {
Map<TopicPartition, List<ConsumerRecord<String, String>>> partitions = records.stream().collect(Collectors.groupingBy(record -> new TopicPartition(record.topic(), record.partition())));
return new ArrayList<>(partitions.values());
}
}
```
最后,编写启动类:
```java
@SpringBootApplication
public class KafkaConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaConsumerApplication.class, args);
}
@Bean
public DataSource dataSource() {
HikariConfig config = new HikariConfig();
config.setJdbcUrl("jdbc:mysql://localhost:3306/test?useSSL=false&serverTimezone=GMT%2B8&characterEncoding=utf8&allowPublicKeyRetrieval=true");
config.setUsername("root");
config.setPassword("123456");
config.setMaximumPoolSize(5);
return new HikariDataSource(config);
}
}
```
这样,就完成了一个多消费者从Kafka中批量读取数据,然后将数据分批写入到mysql数据库的功能,且可以从数据库连接池中获取数据库连接,避免数据库连接频繁创建销毁带来的开销影响数据插入效率,并且实现了启动三个消费者同时消费同一Topic。
阅读全文