LSTM股票预测使用麻雀搜索算法优化参数代码示例
时间: 2023-10-24 08:04:33 浏览: 81
下面是使用麻雀搜索算法优化LSTM股票预测模型参数的示例代码:
```python
import random
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error
# 定义LSTM模型
class LSTM(nn.Module):
    """Stacked LSTM followed by a linear head that predicts from the last time step.

    Inputs are batch-first, i.e. shaped ``(batch, seq_len, input_size)``, which
    is the layout produced by ``prepare_data`` in this file. The output is
    shaped ``(batch, output_size)``.

    Args:
        input_size: Number of features per time step.
        hidden_size: Width of the LSTM hidden state.
        num_layers: Number of stacked LSTM layers.
        output_size: Number of values predicted per sequence.
        dropout: Dropout probability applied between stacked LSTM layers.
    """

    def __init__(self, input_size, hidden_size, num_layers, output_size, dropout):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.dropout = dropout
        # nn.LSTM only applies dropout between stacked layers; with a single
        # layer a non-zero value has no effect and just emits a warning.
        inter_layer_dropout = dropout if num_layers > 1 else 0.0
        self.lstm = nn.LSTM(
            input_size,
            hidden_size,
            num_layers,
            dropout=inter_layer_dropout,
            batch_first=True,
        )
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Return predictions of shape ``(batch, output_size)`` for batch-first ``x``."""
        batch = x.size(0)
        # Explicit zero initial states, created on the input's device and dtype.
        h0 = torch.zeros(
            self.num_layers, batch, self.hidden_size, device=x.device, dtype=x.dtype
        )
        c0 = torch.zeros(
            self.num_layers, batch, self.hidden_size, device=x.device, dtype=x.dtype
        )
        out, _ = self.lstm(x, (h0, c0))
        # Only the hidden state of the final time step feeds the regression head.
        return self.fc(out[:, -1, :])
# 数据预处理
def prepare_data(data, seq_len, scaler=None):
    """Scale a 1-D series and cut it into sliding windows for supervised learning.

    Args:
        data: One-dimensional sequence of values (anything ``np.asarray`` accepts).
        seq_len: Number of consecutive values in each input window.
        scaler: Optional already-fitted scaler exposing ``transform``. When
            omitted, a new ``MinMaxScaler`` is fitted on ``data``. Passing the
            scaler fitted on the training split lets validation/test data share
            the same normalisation.

    Returns:
        Tuple ``(x, y, scaler)`` where ``x`` has shape
        ``(n_windows, seq_len, 1)``, ``y`` has shape ``(n_windows, 1)`` and holds
        the value that immediately follows each window, and ``scaler`` is the
        scaler that was used. ``n_windows`` is ``max(len(data) - seq_len, 0)``.
    """
    column = np.asarray(data).reshape(-1, 1)
    if scaler is None:
        scaler = MinMaxScaler()
        scaled = scaler.fit_transform(column)
    else:
        scaled = scaler.transform(column)
    scaled = np.asarray(scaled).reshape(-1)

    # A series no longer than one window yields zero samples rather than an error.
    n_windows = max(len(scaled) - seq_len, 0)
    windows = [scaled[start:start + seq_len] for start in range(n_windows)]
    x = np.array(windows).reshape(n_windows, seq_len, 1)
    y = np.array(scaled[seq_len:seq_len + n_windows]).reshape(n_windows, 1)
    return x, y, scaler
# 训练模型
def train_model(x_train, y_train, x_val, y_val, params):
    """Train an ``LSTM`` regressor with Adam and MSE loss.

    Args:
        x_train: Training inputs, indexable along the first axis by sample.
        y_train: Training targets aligned with ``x_train``.
        x_val: Validation inputs, evaluated in a single batch each epoch.
        y_val: Validation targets aligned with ``x_val``.
        params: Mapping with keys ``'hidden_size'``, ``'num_layers'``,
            ``'dropout'``, ``'learning_rate'`` and ``'batch_size'``. An optional
            ``'num_epochs'`` key overrides the default of 100 epochs.

    Returns:
        Tuple ``(model, train_loss, val_loss)``. Both loss lists contain one
        entry per epoch; ``train_loss`` holds the sample-weighted mean of the
        mini-batch losses of that epoch.

    Raises:
        ValueError: If ``x_train`` contains no samples.
    """
    if len(x_train) == 0:
        raise ValueError('x_train is empty; at least one training window is required')

    input_size = 1
    output_size = 1
    batch_size = params['batch_size']
    num_epochs = params.get('num_epochs', 100)

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = LSTM(
        input_size,
        params['hidden_size'],
        params['num_layers'],
        output_size,
        params['dropout'],
    ).to(device)
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=params['learning_rate'])

    # Convert once up front instead of rebuilding tensors in every epoch.
    x_train_t = torch.as_tensor(x_train, dtype=torch.float32)
    y_train_t = torch.as_tensor(y_train, dtype=torch.float32)
    x_val_t = torch.as_tensor(x_val, dtype=torch.float32).to(device)
    y_val_t = torch.as_tensor(y_val, dtype=torch.float32).to(device)
    n_train = len(x_train_t)

    train_loss = []
    val_loss = []
    for _ in range(num_epochs):
        model.train()
        epoch_total = 0.0
        # Batches are taken in order (no shuffling), as in the original loop.
        for start in range(0, n_train, batch_size):
            x_batch = x_train_t[start:start + batch_size].to(device)
            y_batch = y_train_t[start:start + batch_size].to(device)
            optimizer.zero_grad()
            loss = criterion(model(x_batch), y_batch)
            loss.backward()
            optimizer.step()
            # Weight by batch size so a short final batch does not skew the mean.
            epoch_total += loss.item() * x_batch.size(0)
        train_loss.append(epoch_total / n_train)

        model.eval()
        with torch.no_grad():
            val_loss.append(criterion(model(x_val_t), y_val_t).item())
    return model, train_loss, val_loss
# 预测并计算适应度
def predict_and_fitness(model, x_test, y_test, scaler):
    """Score ``model`` on held-out data as the reciprocal of RMSE in original units.

    Args:
        model: Trained ``torch.nn.Module`` mapping ``x_test`` to predictions.
        x_test: Scaled model inputs.
        y_test: Scaled targets aligned with ``x_test``.
        scaler: Fitted scaler whose ``inverse_transform`` maps scaled values
            back to original units; it should be the scaler that produced
            ``y_test``.

    Returns:
        ``1 / RMSE`` between the inverse-transformed predictions and targets
        (larger is better), or ``inf`` when the RMSE is exactly zero.
    """
    # Run on whatever device the model already lives on; guessing from CUDA
    # availability fails for a CPU model on a CUDA-capable host.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device('cpu')

    model.eval()
    with torch.no_grad():
        inputs = torch.as_tensor(x_test, dtype=torch.float32).to(device)
        output = model(inputs)

    y_pred = scaler.inverse_transform(output.cpu().numpy())
    # float32 matches the precision the targets had after the tensor round trip.
    y_true = scaler.inverse_transform(np.asarray(y_test, dtype=np.float32))

    rmse = np.sqrt(mean_squared_error(y_true, y_pred))
    if rmse == 0:
        return float('inf')
    return 1 / rmse
# 麻雀搜索算法
def sparrow_search(data, seq_len):
    """Randomised hyper-parameter search for the LSTM price model.

    Candidate settings are drawn at random from small discrete pools; each
    candidate is trained with ``train_model`` and scored with
    ``predict_and_fitness`` on the last 10% of ``data``. The search stops after
    100 candidates or as soon as the best fitness reaches 0.95. Note that this
    is plain random sampling; no sparrow-style position updates are performed.

    Args:
        data: Sliceable one-dimensional price series.
        seq_len: Length of the input window passed to ``prepare_data``.

    Returns:
        Tuple ``(best_model, best_scaler)``: the highest-scoring trained model
        and the scaler fitted on the training split.

    Raises:
        RuntimeError: If no candidate achieved a fitness above zero.
    """
    # Discrete candidate pools, keyed by the names train_model expects.
    search_space = {
        'hidden_size': [16, 32, 64, 128],
        'num_layers': [1, 2, 3],
        'dropout': [0, 0.1, 0.2, 0.3],
        'learning_rate': [0.001, 0.01, 0.1],
        'batch_size': [32, 64, 128],
    }
    max_iter = 100
    stop_fitness = 0.95

    # The chronological 70/20/10 split does not depend on the candidate, so it
    # is built once instead of on every iteration.
    train_size = int(len(data) * 0.7)
    val_size = int(len(data) * 0.2)
    train_data = data[:train_size]
    val_data = data[train_size:train_size + val_size]
    test_data = data[train_size + val_size:]

    x_train, y_train, train_scaler = prepare_data(train_data, seq_len)
    x_val, y_val, _ = prepare_data(val_data, seq_len)
    # prepare_data fits a separate scaler per call; keep the test one so test
    # values are mapped back to prices with the scaler that produced them.
    x_test, y_test, test_scaler = prepare_data(test_data, seq_len)

    candidate = {name: random.choice(pool) for name, pool in search_space.items()}
    best_model = None
    best_params = None
    best_fitness = 0

    for i in range(max_iter):
        model, _, _ = train_model(x_train, y_train, x_val, y_val, candidate)
        fitness = predict_and_fitness(model, x_test, y_test, test_scaler)

        if fitness > best_fitness:
            best_fitness = fitness
            best_model = model
            best_params = dict(candidate)
            print('Iteration %d, best fitness: %.4f' % (i+1, best_fitness))

        if best_fitness >= stop_fitness:
            print('Search stopped, best fitness: %.4f' % best_fitness)
            break

        # Draw the next candidate. When a drawn value differs from the current
        # one, the two trade places in the pool, so the pool keeps its size.
        next_candidate = {}
        for name, pool in search_space.items():
            drawn = random.choice(pool)
            if drawn != candidate[name]:
                pool.append(candidate[name])
                pool.remove(drawn)
            next_candidate[name] = drawn
        candidate = next_candidate

    if best_model is None:
        raise RuntimeError('no hyper-parameter candidate produced a positive fitness')

    print('Best params: %s' % best_params)

    # Show predictions against ground truth on the test split.
    first_param = next(best_model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device('cpu')
    best_model.eval()
    with torch.no_grad():
        x_test_batch = torch.FloatTensor(x_test).to(device)
        output = best_model(x_test_batch)
    y_pred = test_scaler.inverse_transform(output.cpu().numpy())
    y_true = test_scaler.inverse_transform(np.asarray(y_test, dtype=np.float32))
    result = pd.DataFrame({'y_true': y_true.reshape(-1), 'y_pred': y_pred.reshape(-1)})
    print(result)

    best_scaler = train_scaler
    return best_model, best_scaler
```
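其中,`data`为股票价格序列,`seq_len`为输入窗口长度(即用前`seq_len`个价格预测下一个价格)。在`sparrow_search`函数中,先从各超参数的离散候选列表中随机选取一组初始超参数组合,然后在每次迭代中用当前组合训练LSTM模型,并从候选列表中随机采样下一组超参数组合(注意:示例代码实际执行的是随机采样,并未实现麻雀搜索算法中的位置更新规则)。每次训练后,在测试集上计算适应度(适应度定义为反归一化后RMSE的倒数,越大越好),并在适应度提高时更新最优模型。如果最优适应度达到停止阈值(0.95)或达到最大迭代次数(100次),则停止搜索。最后,在测试集上输出预测结果和真实值,并返回最优的LSTM模型和数据归一化器。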
阅读全文