Transformer做时间序列预测的代码是什么
时间: 2024-02-18 18:39:03 浏览: 73
基于TCN-Transformer模型的时间序列预测(Python完整源码)
5星 · 资源好评率100%
下面是使用Transformer模型进行时间序列预测的Python代码示例:
首先,我们需要导入必要的库和模块:
```python
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt
```
然后,我们定义一个Transformer模型类:
```python
class TransformerModel(nn.Module):
def __init__(self, input_dim, output_dim, d_model, nhead, num_layers, dropout=0.5):
super(TransformerModel, self).__init__()
self.model_type = 'Transformer'
self.src_mask = None
self.pos_encoder = PositionalEncoding(d_model, dropout)
self.transformer_encoder = nn.TransformerEncoder(nn.TransformerEncoderLayer(d_model, nhead, dropout), num_layers)
self.encoder = nn.Linear(input_dim, d_model)
self.decoder = nn.Linear(d_model, output_dim)
self.init_weights()
def init_weights(self):
initrange = 0.1
self.encoder.bias.data.zero_()
self.encoder.weight.data.uniform_(-initrange, initrange)
self.decoder.bias.data.zero_()
self.decoder.weight.data.uniform_(-initrange, initrange)
def forward(self, src):
if self.src_mask is None or self.src_mask.size(0) != len(src):
device = src.device
mask = self._generate_square_subsequent_mask(len(src)).to(device)
self.src_mask = mask
src = self.encoder(src) * np.sqrt(self.d_model)
src = self.pos_encoder(src)
output = self.transformer_encoder(src, self.src_mask)
output = self.decoder(output)
return output
def _generate_square_subsequent_mask(self, sz):
mask = (torch.triu(torch.ones(sz, sz)) == 1).transpose(0, 1)
mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
return mask
```
这个模型类实现了一个基本的Transformer模型,其中包括一个编码器和一个解码器。编码器将输入的时间序列数据转换为Transformer模型可以处理的形式,解码器将Transformer模型的输出映射回原始的输出空间。这个模型类还包括一个位置编码器(PositionalEncoding)和一个用于生成掩码的函数(_generate_square_subsequent_mask)。
接下来,我们定义一个帮助函数,用于生成模型的输入和输出序列:
```python
def create_sequences(data, seq_length):
xs = []
ys = []
for i in range(len(data)-seq_length-1):
x = data[i:(i+seq_length)]
y = data[i+seq_length]
xs.append(x)
ys.append(y)
return np.array(xs), np.array(ys)
```
这个函数将原始的时间序列数据转换为一个输入序列和一个输出序列,以供模型训练使用。
接下来,我们读取数据并进行预处理:
```python
# 读取数据
data = np.sin(np.arange(200)*0.1)
# 标准化数据
data = (data - np.mean(data)) / np.std(data)
# 划分训练集和测试集
train_data = data[:150]
test_data = data[150:]
# 创建输入序列和输出序列
seq_length = 10
X_train, y_train = create_sequences(train_data, seq_length)
X_test, y_test = create_sequences(test_data, seq_length)
# 将数据转换为PyTorch张量
X_train = torch.from_numpy(X_train).float()
y_train = torch.from_numpy(y_train).float()
X_test = torch.from_numpy(X_test).float()
y_test = torch.from_numpy(y_test).float()
```
然后,我们创建模型并进行训练:
```python
# 定义超参数
input_dim = 1
output_dim = 1
d_model = 64
nhead = 4
num_layers = 6
lr = 0.001
batch_size = 16
num_epochs = 100
# 创建模型并定义优化器和损失函数
model = TransformerModel(input_dim, output_dim, d_model, nhead, num_layers).cuda()
optimizer = optim.Adam(model.parameters(), lr=lr)
criterion = nn.MSELoss()
# 训练模型
train_losses = []
test_losses = []
for epoch in range(num_epochs):
model.train()
train_loss = 0
for i in range(0, len(X_train)-batch_size, batch_size):
optimizer.zero_grad()
output = model(X_train[i:i+batch_size].unsqueeze(2).cuda())
loss = criterion(output.squeeze(), y_train[i:i+batch_size].cuda())
loss.backward()
optimizer.step()
train_loss += loss.item()
train_loss /= (len(X_train) // batch_size)
train_losses.append(train_loss)
model.eval()
test_loss = 0
with torch.no_grad():
for i in range(0, len(X_test)-batch_size, batch_size):
output = model(X_test[i:i+batch_size].unsqueeze(2).cuda())
loss = criterion(output.squeeze(), y_test[i:i+batch_size].cuda())
test_loss += loss.item()
test_loss /= (len(X_test) // batch_size)
test_losses.append(test_loss)
print('Epoch [{}/{}], Train Loss: {:.4f}, Test Loss: {:.4f}'.format(epoch+1, num_epochs, train_loss, test_loss))
# 绘制训练过程中的损失曲线
plt.plot(train_losses, label='Train Loss')
plt.plot(test_losses, label='Test Loss')
plt.legend()
plt.show()
```
最后,我们可以使用训练好的模型进行预测:
```python
# 使用训练好的模型进行预测
model.eval()
with torch.no_grad():
pred_train = model(X_train.unsqueeze(2).cuda()).squeeze().cpu().numpy()
pred_test = model(X_test.unsqueeze(2).cuda()).squeeze().cpu().numpy()
# 绘制训练集和测试集的真实值和预测值
plt.plot(np.arange(len(train_data)), train_data, label='True Train')
plt.plot(np.arange(seq_length, len(pred_train)+seq_length), pred_train, label='Pred Train')
plt.plot(np.arange(len(train_data), len(train_data)+len(test_data)), test_data, label='True Test')
plt.plot(np.arange(len(train_data)+seq_length, len(train_data)+len(test_data)+seq_length), pred_test, label='Pred Test')
plt.legend()
plt.show()
```
这个示例代码演示了如何使用Transformer模型进行时间序列预测。
阅读全文