我的数据为excle,有两列,一列是RS,一列是LS,使用时间序列预测,前10个RS和LS预测接下来一个LS,方法使用基于pytorch的BP神经网络 有测试集,训练集,验证集请你给出代码
时间: 2024-03-12 21:43:18 浏览: 109
好的,以下是使用 PyTorch 实现 BP 神经网络进行时间序列预测的代码,包含训练集、验证集和测试集:
```python
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import pandas as pd
from torch.utils.data import Dataset, DataLoader
# 自定义数据集类
class TimeSeriesDataset(Dataset):
def __init__(self, rs, ls, input_size):
self.rs = rs
self.ls = ls
self.input_size = input_size
def __len__(self):
return len(self.rs) - self.input_size
def __getitem__(self, idx):
inputs = np.concatenate([self.rs[idx:idx+self.input_size], self.ls[idx:idx+self.input_size]])
labels = self.ls[idx+self.input_size]
return torch.from_numpy(inputs).float(), torch.tensor(labels).float()
# 读取数据
data = pd.read_excel('data.xlsx')
rs = data['RS'].values
ls = data['LS'].values
# 数据归一化
rs_mean = np.mean(rs)
rs_std = np.std(rs)
rs = (rs - rs_mean) / rs_std
ls_mean = np.mean(ls)
ls_std = np.std(ls)
ls = (ls - ls_mean) / ls_std
# 定义超参数
input_size = 10
hidden_size = 64
output_size = 1
learning_rate = 0.001
num_epochs = 1000
batch_size = 32
# 划分训练集、验证集和测试集
train_size = int(len(rs) * 0.6)
val_size = int(len(rs) * 0.2)
test_size = len(rs) - train_size - val_size
train_rs = rs[:train_size]
train_ls = ls[:train_size]
val_rs = rs[train_size:train_size+val_size]
val_ls = ls[train_size:train_size+val_size]
test_rs = rs[train_size+val_size:]
test_ls = ls[train_size+val_size:]
# 创建数据集和数据加载器
train_dataset = TimeSeriesDataset(train_rs, train_ls, input_size)
val_dataset = TimeSeriesDataset(val_rs, val_ls, input_size)
test_dataset = TimeSeriesDataset(test_rs, test_ls, input_size)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
# 定义模型
class BP(nn.Module):
def __init__(self, input_size, hidden_size, output_size):
super(BP, self).__init__()
self.fc1 = nn.Linear(input_size*2, hidden_size)
self.fc2 = nn.Linear(hidden_size, output_size)
def forward(self, x):
out = torch.relu(self.fc1(x))
out = self.fc2(out)
return out
model = BP(input_size, hidden_size, output_size)
# 定义损失函数和优化器
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
# 训练模型
best_val_loss = float('inf')
for epoch in range(num_epochs):
# 训练模型
model.train()
train_loss = 0
for inputs, labels in train_loader:
optimizer.zero_grad()
inputs = inputs.view(-1, input_size*2)
outputs = model(inputs)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
train_loss += loss.item() * inputs.size(0)
train_loss /= len(train_dataset)
# 验证模型
model.eval()
val_loss = 0
with torch.no_grad():
for inputs, labels in val_loader:
inputs = inputs.view(-1, input_size*2)
outputs = model(inputs)
loss = criterion(outputs, labels)
val_loss += loss.item() * inputs.size(0)
val_loss /= len(val_dataset)
# 记录最好的验证集损失和模型参数
if val_loss < best_val_loss:
best_val_loss = val_loss
best_model_params = model.state_dict()
# 打印训练过程中的损失和验证集损失
if (epoch+1) % 100 == 0:
print('Epoch [{}/{}], Train Loss: {:.4f}, Val Loss: {:.4f}'.format(epoch+1, num_epochs, train_loss, val_loss))
# 加载最好的模型参数并测试模型
model.load_state_dict(best_model_params)
model.eval()
test_loss = 0
with torch.no_grad():
for inputs, labels in test_loader:
inputs = inputs.view(-1, input_size*2)
outputs = model(inputs)
loss = criterion(outputs, labels)
test_loss += loss.item() * inputs.size(0)
test_loss /= len(test_dataset)
test_inputs = torch.from_numpy(test_rs).float().view(-1, input_size*2)
predicted_ls = model(test_inputs).detach().numpy().reshape(-1)
predicted_ls = predicted_ls * ls_std + ls_mean
test_ls = test_ls * ls_std + ls_mean
rmse = np.sqrt(np.mean((predicted_ls - test_ls) ** 2))
print('Test Loss: {:.4f}, RMSE: {:.4f}'.format(test_loss, rmse))
```
代码中首先定义了一个自定义数据集类 `TimeSeriesDataset`,用于将数据转换成 PyTorch 的数据集格式,并分别划分训练集、验证集和测试集。然后定义了模型、损失函数和优化器,并开始训练模型。在每个 epoch 中,分别对训练集和验证集进行迭代,计算损失并更新模型。训练完毕后,加载最好的模型参数,并用测试集进行预测,计算 RMSE。
阅读全文