java 一个从kafka获取数据并入库的方法如何写成一个多线程方法
时间: 2024-09-29 15:03:47 浏览: 42
在Java中,为了从Kafka消费者角度实现数据的并发处理和入库,你可以编写一个多线程方法来消费消息,并将每个接收到的消息异步地插入数据库。这里是一个简单的示例:
```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import javax.sql.DataSource; // 假设已导入JDBC依赖
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Component
public class KafkaConsumer {
private final ExecutorService executorService;
private final DataSource dataSource;
private final String topicName;
public KafkaConsumer(DataSource dataSource, String topicName) {
this.dataSource = dataSource;
this.topicName = topicName;
this.executorService = Executors.newFixedThreadPool(5); // 创建固定大小的线程池
}
@KafkaListener(topics = {topicName})
public void consumeMessage(ConsumerRecord<String, String> record) {
executorService.submit(() -> processMessage(record)); // 异步提交任务到线程池
}
private void processMessage(ConsumerRecord<String, String> record) {
try (Connection connection = dataSource.getConnection()) {
String messageData = record.value();
// 这里假设我们有一个存储数据的SQL模板
String sql = "INSERT INTO your_table (column1, column2) VALUES (?, ?)";
PreparedStatement preparedStatement = connection.prepareStatement(sql);
preparedStatement.setString(1, record.key());
preparedStatement.setString(2, messageData);
preparedStatement.executeUpdate(); // 插入数据
// 打印日志或者跟踪操作
System.out.println("Inserted record: key - " + record.key() + ", data - " + messageData);
} catch (Exception e) {
// 数据库异常处理
System.err.println("Error inserting message: " + e.getMessage());
}
}
// 关闭线程池以释放资源
@PreDestroy
public void shutdownExecutor() {
executorService.shutdown();
}
}
```
在这个例子中,`@KafkaListener`注解用于监听指定的Kafka主题。每接收到一条消息,就将其提交给`executorService`中的一个新线程去处理。`processMessage`方法负责实际的数据处理,包括数据库连接、SQL插入等操作。
阅读全文