写一段 Python 代码:读取一个没有表头的 CSV 文件,第一列为时间编号(以 1、2、3……这样的自然数形式出现),第二列为充电量数值。将该数据集的前 90% 划分为训练集,其余部分作为测试集,利用遗传算法改进的 LSTM 模型预测该时间序列,打印 RMSE 作为评价指标,并分别绘制训练集的真实值与预测值图片以及测试集的真实值与预测值图片;两张图片的横坐标单位为 time/h,纵坐标单位为 kWh。
时间: 2024-02-29 11:51:13 浏览: 71
好的,以下是代码实现:
```python
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from keras.models import Sequential
from keras.layers import Dense, LSTM
from keras.optimizers import Adam
from keras.callbacks import EarlyStopping
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error
# Load the headerless CSV and keep its second column as an (n, 1) array
df = pd.read_csv('data.csv', header=None)
data = df.iloc[:, 1].values.reshape(-1, 1)

# Chronological split: the first 90% of the rows form the training set,
# the remaining rows form the test set
train_size = int(0.9 * len(data))
train_data, test_data = data[:train_size], data[train_size:]

# Scale to [0, 1]; the scaler is fitted on the training rows only and then
# reused unchanged for the test rows
scaler = MinMaxScaler(feature_range=(0, 1)).fit(train_data)
train_data = scaler.transform(train_data)
test_data = scaler.transform(test_data)
# Define the LSTM model
def create_model(input_shape):
    """Build and compile a single-layer LSTM regressor.

    Args:
        input_shape: Shape tuple forwarded to the LSTM layer's
            ``input_shape`` argument.

    Returns:
        A compiled ``Sequential`` model made of ``LSTM(50)`` followed by
        ``Dense(1)``, using mean-squared-error loss and the Adam optimizer
        with a learning rate of 0.001.
    """
    model = Sequential()
    model.add(LSTM(50, input_shape=input_shape))
    model.add(Dense(1))
    # `lr` is the deprecated spelling of this argument; `learning_rate` is
    # the name accepted by current Keras releases.
    optimizer = Adam(learning_rate=0.001)
    model.compile(loss='mean_squared_error', optimizer=optimizer)
    return model
# Genetic-algorithm hyperparameters
pop_size: int = 30      # number of individuals in the population
chromo_size: int = 50   # number of genes per chromosome
pc: float = 0.8         # probability that a pair of individuals is crossed over
pm: float = 0.1         # probability that an individual is mutated
max_iter: int = 50      # number of generations to run
# Fitness function (based on RMSE)
def fitness(y_true, y_pred):
    """Return the GA fitness of a prediction: the reciprocal of its RMSE.

    Args:
        y_true: Ground-truth values, array-like of shape ``(n,)`` or ``(n, k)``.
        y_pred: Predicted values with the same number of samples and outputs.

    Returns:
        ``1 / (rmse + 1e-6)``; larger is better. The small constant keeps the
        result finite when the prediction is perfect.

    Raises:
        ValueError: If the inputs are empty or their shapes do not match.
    """
    y_true = np.asarray(y_true, dtype=float)
    y_pred = np.asarray(y_pred, dtype=float)
    # Treat 1-D input as a single-output column so that (n,) and (n, 1)
    # compare element-wise instead of broadcasting to an (n, n) matrix.
    if y_true.ndim == 1:
        y_true = y_true.reshape(-1, 1)
    if y_pred.ndim == 1:
        y_pred = y_pred.reshape(-1, 1)
    if y_true.shape != y_pred.shape:
        raise ValueError(
            f"y_true and y_pred have mismatched shapes: "
            f"{y_true.shape} vs {y_pred.shape}"
        )
    if y_true.size == 0:
        raise ValueError("fitness requires at least one sample")
    mse = np.mean((y_true - y_pred) ** 2)
    rmse = np.sqrt(mse)
    return 1 / (rmse + 1e-6)
# Genetic algorithm
def genetic_algorithm(model, train_data):
    """Search for the weights of ``model`` with a simple genetic algorithm.

    Every chromosome is a flat vector holding all weights of ``model`` (in
    the order returned by ``model.get_weights()``), so its length is derived
    from the model itself; the module-level ``chromo_size`` is not used
    because a fixed gene count cannot be reshaped into the model's weights.
    An individual is scored by loading it into the model, predicting each
    training value from the value before it, and passing the predictions and
    targets (both mapped back to the original scale) to ``fitness``.

    Uses the module-level ``pop_size``, ``pc``, ``pm``, ``max_iter``,
    ``scaler`` and ``fitness``.

    Args:
        model: Keras model whose weights are optimised. It is modified in
            place and holds the best weights found when the function returns.
        train_data: Scaled training series as an ``(n, 1)`` array, ``n >= 2``.

    Returns:
        ``(best_chromo, fitness_history)``: the best flat weight vector found
        (``None`` if no individual ever scored above zero) and the
        best-so-far fitness recorded after each generation.

    Raises:
        ValueError: If ``train_data`` has fewer than two rows or
            ``pop_size`` is smaller than 2.
    """
    if len(train_data) < 2:
        raise ValueError("train_data needs at least two rows to form an input/target pair")
    if pop_size < 2:
        raise ValueError("pop_size must be at least 2 for tournament selection")

    # Layout of the flat chromosome: one contiguous slice per weight array.
    shapes = [w.shape for w in model.get_weights()]
    sizes = [int(np.prod(shape)) for shape in shapes]
    n_genes = sum(sizes)
    split_points = np.cumsum(sizes)[:-1]

    def decode(chromo):
        # Cut the flat vector back into arrays shaped like the model's weights.
        pieces = np.split(chromo, split_points)
        return [piece.reshape(shape) for piece, shape in zip(pieces, shapes)]

    timesteps = train_data.shape[1]
    # One-step-ahead setup: row t is the input, row t + 1 is the target.
    inputs = train_data[:-1].reshape(-1, timesteps, 1)
    # Targets go back to the original scale so they are comparable with the
    # inverse-transformed predictions.
    targets = scaler.inverse_transform(train_data[timesteps:])

    def evaluate(chromo):
        model.set_weights(decode(chromo))
        y_pred = scaler.inverse_transform(model.predict(inputs, verbose=0))
        return fitness(targets, y_pred)

    pop = np.random.uniform(-1, 1, size=(pop_size, n_genes))
    fitness_history = []
    best_chromo = None
    best_fitness = 0
    for _ in range(max_iter):
        # Evaluate fitness
        fitness_values = []
        for individual in pop:
            score = evaluate(individual)
            fitness_values.append(score)
            if score > best_fitness:
                # Copy: the population rows are overwritten in later generations.
                best_chromo = individual.copy()
                best_fitness = score
        fitness_history.append(best_fitness)

        # Selection: binary tournament; assigning into `selected` copies the
        # winner so later crossover/mutation never touches a shared row.
        selected = np.empty_like(pop)
        for j in range(pop_size):
            a, b = np.random.choice(pop_size, size=2, replace=False)
            winner = a if fitness_values[a] >= fitness_values[b] else b
            selected[j] = pop[winner]

        # Crossover: swap one segment between neighbouring individuals.
        # Stopping at pop_size - 1 leaves the last individual of an
        # odd-sized population unpaired instead of indexing out of range.
        for j in range(0, pop_size - 1, 2):
            if n_genes >= 2 and np.random.rand() < pc:
                lo, hi = np.sort(np.random.choice(n_genes, size=2, replace=False))
                segment = selected[j, lo:hi].copy()
                selected[j, lo:hi] = selected[j + 1, lo:hi]
                selected[j + 1, lo:hi] = segment

        # Mutation: re-draw a single random gene.
        for j in range(pop_size):
            if np.random.rand() < pm:
                selected[j, np.random.randint(n_genes)] = np.random.uniform(-1, 1)

        pop = selected

    # Leave the model holding the best weights rather than the last ones tried.
    if best_chromo is not None:
        model.set_weights(decode(best_chromo))
    return best_chromo, fitness_history
# Train the model: the genetic algorithm searches the LSTM weights
input_shape = (train_data.shape[1], 1)
model = create_model(input_shape)
best_chromo, fitness_history = genetic_algorithm(model, train_data)

# Load the best chromosome into the model. It is interpreted as one flat
# vector containing every weight array in model.get_weights() order.
weight_shapes = [w.shape for w in model.get_weights()]
weight_sizes = [int(np.prod(shape)) for shape in weight_shapes]
best_flat = np.asarray(best_chromo, dtype=float).reshape(-1)
if best_flat.size != sum(weight_sizes):
    raise ValueError(
        f"best_chromo has {best_flat.size} genes but the model has "
        f"{sum(weight_sizes)} weights"
    )
model.set_weights([
    piece.reshape(shape)
    for piece, shape in zip(np.split(best_flat, np.cumsum(weight_sizes)[:-1]), weight_shapes)
])

# One-step-ahead predictions on the training set: row t predicts row t + 1.
# Both predictions and targets are mapped back to the original scale so the
# RMSE is expressed in the data's own units.
train_inputs = train_data[:-1].reshape(-1, input_shape[0], 1)
y_pred_train = scaler.inverse_transform(model.predict(train_inputs, verbose=0))
y_true_train = scaler.inverse_transform(train_data[input_shape[0]:])
rmse_train = np.sqrt(mean_squared_error(y_true_train, y_pred_train))

# Forecast the test set recursively: every prediction is fed back in as the
# next input, so no test value is seen by the model.
y_pred_test = []
last_seq = train_data[-input_shape[0]:]
for _ in range(len(test_data)):
    pred = model.predict(last_seq.reshape(1, input_shape[0], 1), verbose=0)
    y_pred_test.append(pred[0][0])
    last_seq = np.concatenate([last_seq[1:], pred], axis=0)
y_pred_test = scaler.inverse_transform(np.array(y_pred_test).reshape(-1, 1))
y_true_test = scaler.inverse_transform(test_data)
rmse_test = np.sqrt(mean_squared_error(y_true_test, y_pred_test))

# Time axes use the 1-based time index of each plotted row; the first
# training target is row input_shape[0] (0-based), i.e. time input_shape[0] + 1.
n_train = len(train_data)
n_total = n_train + len(test_data)
time_train = np.arange(input_shape[0] + 1, n_train + 1)
time_test = np.arange(n_train + 1, n_total + 1)

# Figure 1: training set, actual vs. predicted
plt.figure(figsize=(12, 6))
plt.plot(time_train, y_true_train, label='Actual Train')
plt.plot(time_train, y_pred_train, label='Predicted Train')
plt.xlabel('Time/h')
plt.ylabel('kwh')
plt.title('LSTM Predict')
plt.legend()

# Figure 2: test set, actual vs. predicted
plt.figure(figsize=(12, 6))
plt.plot(time_test, y_true_test, label='Actual Test')
plt.plot(time_test, y_pred_test, label='Predicted Test')
plt.xlabel('Time/h')
plt.ylabel('kwh')
plt.title('LSTM Predict')
plt.legend()

plt.show()
print('RMSE of train:', rmse_train)
print('RMSE of test:', rmse_test)
```
需要注意的是,由于遗传算法的随机性,每次运行结果可能不一样。
阅读全文