简单的transformer时间序列预测模型代码
时间: 2023-08-01 07:14:31 浏览: 95
基于Transformer模型的时间序列预测python源码(高分项目).zip
5星 · 资源好评率100%
以下是一个简单的Transformer时间序列预测模型的代码实现:
```python
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
class TimeSeriesDataset(Dataset):
    """Sliding-window dataset over a 1-D series.

    Item ``idx`` is the pair ``(window, target)`` where ``window`` is the
    ``seq_len`` values starting at ``idx`` and ``target`` is the value that
    immediately follows the window.
    """

    def __init__(self, data, seq_len):
        self.data = data
        self.seq_len = seq_len

    def __len__(self):
        # One sample per window that still has a following target value.
        return len(self.data) - self.seq_len

    def __getitem__(self, idx):
        window = self.data[idx:idx + self.seq_len]
        target = self.data[idx + self.seq_len]
        return (torch.tensor(window, dtype=torch.float32),
                torch.tensor(target, dtype=torch.float32))
class TransformerModel(nn.Module):
    """Transformer that maps a window of values to a one-step-ahead forecast.

    Fixes over the original:
    - Accepts the ``(batch, seq_len)`` tensors the dataset actually produces
      by adding the missing feature dimension (the original crashed, since
      ``nn.Linear(input_size, d_model)`` needs a trailing feature dim).
    - Returns one prediction per sequence, shape ``(batch,)``, taken from the
      last timestep. The original returned ``(batch, seq_len)``, which
      silently broadcast against ``(batch,)`` targets in ``MSELoss``.

    NOTE(review): ``pos_encoder`` is a learned per-position MLP; it injects
    no actual position information (no sinusoidal / learned positional
    embeddings). Kept for interface compatibility.
    """

    def __init__(self, input_size, output_size, d_model=256, nhead=8, num_layers=3):
        super().__init__()
        self.encoder = nn.Linear(input_size, d_model)
        self.pos_encoder = nn.Sequential(
            nn.Linear(d_model, d_model),
            nn.ReLU(),
            nn.Linear(d_model, d_model)
        )
        self.decoder = nn.Sequential(
            nn.Linear(d_model, output_size)
        )
        self.transformer = nn.Transformer(d_model=d_model, nhead=nhead,
                                          num_encoder_layers=num_layers,
                                          num_decoder_layers=num_layers)

    def forward(self, x):
        # Allow univariate (batch, seq_len) input by adding a feature dim.
        if x.dim() == 2:
            x = x.unsqueeze(-1)
        x = self.encoder(x)
        x = self.pos_encoder(x)
        # nn.Transformer defaults to (seq, batch, d_model) layout.
        x = x.permute(1, 0, 2)
        output = self.transformer(x, x)
        output = output.permute(1, 0, 2)
        # Forecast from the final timestep only -> (batch, output_size).
        output = self.decoder(output[:, -1, :])
        return output.squeeze(-1)  # (batch,)
def train(model, train_loader, optimizer, criterion, device):
    """Run one training epoch and return the mean per-batch loss."""
    model.train()
    total_loss = 0.0
    for inputs, targets in train_loader:
        inputs = inputs.to(device)
        targets = targets.to(device)
        optimizer.zero_grad()
        predictions = model(inputs)
        batch_loss = criterion(predictions, targets)
        batch_loss.backward()
        optimizer.step()
        total_loss += batch_loss.item()
    return total_loss / len(train_loader)
def evaluate(model, val_loader, criterion, device):
    """Return the mean per-batch loss over ``val_loader`` (no grad updates)."""
    model.eval()
    running = 0.0
    with torch.no_grad():
        for batch_x, batch_y in val_loader:
            batch_x, batch_y = batch_x.to(device), batch_y.to(device)
            running += criterion(model(batch_x), batch_y).item()
    return running / len(val_loader)
def predict_future(model, data, seq_len, device, future_len):
    """Autoregressively forecast ``future_len`` steps past the end of ``data``.

    Each prediction is fed back in as input for the next step. Works on a
    copy of ``data`` — the original appended its predictions into the
    caller's list, silently mutating the input series.

    Assumes ``model`` returns a single-element tensor for a batch of one
    (``.item()`` would raise otherwise).
    """
    model.eval()
    history = list(data)  # copy: never mutate the caller's series
    predictions = []
    with torch.no_grad():
        for _ in range(future_len):
            window = torch.tensor(history[-seq_len:], dtype=torch.float32)
            next_val = model(window.unsqueeze(0).to(device)).item()
            predictions.append(next_val)
            history.append(next_val)  # feed the prediction back in
    return predictions
if __name__ == '__main__':
    # Hyperparameters
    seq_len = 10
    input_size = 1
    output_size = 1
    batch_size = 32
    lr = 0.001
    epochs = 50
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # Data: a linear ramp, split 70/10/20. The val/test splits keep
    # seq_len points of left context so each yields at least one
    # (window, target) sample — the original data[70:80] slice produced
    # an EMPTY dataset (10 - seq_len == 0 samples), and evaluate() then
    # divided by len(val_loader) == 0 and crashed.
    data = [i * 0.1 for i in range(100)]
    train_dataset = TimeSeriesDataset(data[:70], seq_len=seq_len)
    val_dataset = TimeSeriesDataset(data[70 - seq_len:80], seq_len=seq_len)
    test_dataset = TimeSeriesDataset(data[80 - seq_len:], seq_len=seq_len)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size)
    test_loader = DataLoader(test_dataset, batch_size=batch_size)

    # Model, loss, optimizer
    model = TransformerModel(input_size, output_size).to(device)
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)

    # Training loop with per-epoch validation
    for epoch in range(epochs):
        train_loss = train(model, train_loader, optimizer, criterion, device)
        val_loss = evaluate(model, val_loader, criterion, device)
        print(f'epoch {epoch+1}, train loss: {train_loss:.4f}, val loss: {val_loss:.4f}')

    # Held-out test loss
    test_loss = evaluate(model, test_loader, criterion, device)
    print(f'test loss: {test_loss:.4f}')

    # Autoregressive forecast beyond the observed series
    future_len = 10
    predictions = predict_future(model, data, seq_len, device, future_len)
    print(f'predictions: {predictions}')
```
这个模型使用了一个简单的Transformer结构,包括一个encoder和一个decoder。encoder先用一个线性层把输入序列映射到高维空间;需要注意的是,代码中名为`pos_encoder`的模块实际上只是一个逐位置的MLP,并没有注入真正的位置信息(没有正弦或可学习的位置编码)。随后该表示被送入`nn.Transformer`,并由最后的线性decoder生成对下一个时间步的预测。在训练过程中,使用了均方误差损失函数和Adam优化器。在测试阶段,使用了evaluate函数来计算模型在测试集上的损失,并使用predict_future函数来生成未来的预测。
阅读全文