用java代码实现使用线程池分批次插入一千万条数据到数据库,最好事务控制
时间: 2024-05-06 13:19:56 浏览: 172
下面是用Java代码实现使用线程池分批次插入一千万条数据到数据库,并实现事务控制的示例:
```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class BatchInsertDemo {
// 数据库连接信息
private static final String URL = "jdbc:mysql://localhost:3306/test?useSSL=false";
private static final String USERNAME = "root";
private static final String PASSWORD = "root";
// 每批次插入的数据量
private static final int BATCH_SIZE = 1000;
// 线程池大小
private static final int THREAD_POOL_SIZE = 10;
public static void main(String[] args) {
// 生成一千万条数据
List<Data> dataList = generateData(10000000);
// 创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
// 创建数据库连接
try (Connection connection = DriverManager.getConnection(URL, USERNAME, PASSWORD)) {
// 关闭自动提交事务
connection.setAutoCommit(false);
// 创建预处理语句
String sql = "INSERT INTO data (id, name) VALUES (?, ?)";
PreparedStatement preparedStatement = connection.prepareStatement(sql);
// 分批次插入数据
for (int i = 0; i < dataList.size(); i += BATCH_SIZE) {
// 提交一批次数据
executorService.submit(new BatchInsertTask(i, i + BATCH_SIZE, dataList, preparedStatement));
// 等待所有任务完成
executorService.shutdown();
executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
// 提交事务
connection.commit();
}
} catch (SQLException | InterruptedException e) {
e.printStackTrace();
}
}
// 生成指定量的数据
private static List<Data> generateData(int count) {
List<Data> dataList = new ArrayList<>();
for (int i = 1; i <= count; i++) {
dataList.add(new Data(i, "Data " + i));
}
return dataList;
}
// 数据类
private static class Data {
private int id;
private String name;
public Data(int id, String name) {
this.id = id;
this.name = name;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
// 批次插入任务
private static class BatchInsertTask implements Runnable {
private int startIndex;
private int endIndex;
private List<Data> dataList;
private PreparedStatement preparedStatement;
public BatchInsertTask(int startIndex, int endIndex, List<Data> dataList, PreparedStatement preparedStatement) {
this.startIndex = startIndex;
this.endIndex = endIndex;
this.dataList = dataList;
this.preparedStatement = preparedStatement;
}
@Override
public void run() {
try {
// 批次插入数据
for (int i = startIndex; i < endIndex && i < dataList.size(); i++) {
Data data = dataList.get(i);
preparedStatement.setInt(1, data.getId());
preparedStatement.setString(2, data.getName());
preparedStatement.addBatch();
}
preparedStatement.executeBatch();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}
```
上面的示例中,我们首先生成一千万条数据,然后创建一个大小为10的线程池,每次从数据列表中取出1000条数据进行批次插入,并使用线程池中的线程去执行插入任务。每次插入完成后,我们提交事务,等待所有任务完成后再进行下一批次插入。注意在插入数据时,我们需要关闭自动提交事务,并使用预处理语句进行插入,以提高插入性能和安全性。
阅读全文