写一个seq2seq加入点积attention机制进行时间序列预测代码用pytorch
时间: 2024-05-09 12:15:26 浏览: 169
好的,以下是一个简单的基于PyTorch实现的Seq2Seq模型,带有点积attention机制,用于时间序列预测:
```python
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
class Encoder(nn.Module):
    """LSTM encoder for a real-valued time series.

    Args:
        input_dim: Number of features observed at each time step.
        emb_dim: Size of the per-step projection fed to the LSTM.
        hid_dim: LSTM hidden size.
        n_layers: Number of stacked LSTM layers.
        dropout: Dropout probability (applied to the projection and
            between LSTM layers).
    """

    def __init__(self, input_dim, emb_dim, hid_dim, n_layers, dropout):
        super().__init__()
        # A time series holds continuous values, so each step is projected
        # with a linear layer. nn.Embedding is a lookup table that only
        # accepts integer indices and cannot consume float inputs.
        self.embedding = nn.Linear(input_dim, emb_dim)
        self.rnn = nn.LSTM(emb_dim, hid_dim, n_layers, dropout=dropout)
        self.dropout = nn.Dropout(dropout)

    def forward(self, src):
        """Encode a sequence-first batch.

        Args:
            src: Tensor of shape [src_len, batch, input_dim]. A 2-D tensor
                [src_len, batch] is treated as a single-feature series.

        Returns:
            A pair ``(outputs, (hidden, cell))`` where ``outputs`` is
            [src_len, batch, hid_dim] (top-layer state at every step, needed
            by the attention) and ``hidden`` / ``cell`` are
            [n_layers, batch, hid_dim].
        """
        if src.dim() == 2:
            src = src.unsqueeze(-1)
        src = src.to(self.embedding.weight.dtype)
        embedded = self.dropout(self.embedding(src))
        outputs, (hidden, cell) = self.rnn(embedded)
        # The per-step outputs must be returned as well: Seq2Seq unpacks
        # `encoder_outputs, (hidden, cell)` and attention is computed over them.
        return outputs, (hidden, cell)
class Attention(nn.Module):
    """Dot-product attention over the encoder outputs.

    The score of source step ``s`` is the inner product between the decoder
    query (top-layer hidden state) and the encoder output at step ``s``.
    The module has no learnable parameters.

    Args:
        hid_dim: Hidden size shared by the encoder outputs and the decoder
            state. Stored for reference only.
    """

    def __init__(self, hid_dim):
        super().__init__()
        self.hid_dim = hid_dim

    def forward(self, hidden, encoder_outputs):
        """Compute attention weights.

        Args:
            hidden: Decoder state, either [n_layers, batch, hid_dim] (the
                top layer is used as the query) or [batch, hid_dim].
            encoder_outputs: Tensor of shape [src_len, batch, hid_dim].

        Returns:
            Tensor of shape [batch, src_len]; each row sums to 1.
        """
        # Only the top LSTM layer is used as the query; repeating a
        # multi-layer state along the time axis would not line up with
        # encoder_outputs.
        query = hidden[-1] if hidden.dim() == 3 else hidden
        keys = encoder_outputs.permute(1, 0, 2)  # [batch, src_len, hid_dim]
        scores = torch.bmm(keys, query.unsqueeze(2)).squeeze(2)  # [batch, src_len]
        # Normalise over the source positions. The result is batch-first so
        # the decoder can feed it straight into torch.bmm.
        return torch.softmax(scores, dim=1)
class Decoder(nn.Module):
    """Single-step LSTM decoder with attention for real-valued outputs.

    Args:
        output_dim: Number of values predicted per time step.
        emb_dim: Size of the projection of the previous value.
        hid_dim: LSTM hidden size; must equal the encoder's hidden size.
        n_layers: Number of stacked LSTM layers.
        dropout: Dropout probability.
        attention: Callable ``attention(hidden, encoder_outputs)`` returning
            weights of shape [batch, src_len].
    """

    def __init__(self, output_dim, emb_dim, hid_dim, n_layers, dropout, attention):
        super().__init__()
        self.output_dim = output_dim
        self.attention = attention
        # The previous value is continuous, so it is projected linearly
        # rather than looked up in an nn.Embedding table.
        self.embedding = nn.Linear(output_dim, emb_dim)
        # The attention context is a weighted sum of encoder outputs and is
        # therefore hid_dim wide (the encoder is unidirectional), not 2 * hid_dim.
        self.rnn = nn.LSTM(hid_dim + emb_dim, hid_dim, n_layers, dropout=dropout)
        self.out = nn.Linear(hid_dim + hid_dim + emb_dim, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, input, hidden, cell, encoder_outputs):
        """Predict the next value of the series.

        Args:
            input: Previous value, shape [batch, output_dim] or [batch].
            hidden: LSTM hidden state, [n_layers, batch, hid_dim].
            cell: LSTM cell state, [n_layers, batch, hid_dim].
            encoder_outputs: Encoder outputs, [src_len, batch, hid_dim].

        Returns:
            ``(prediction, hidden, cell)`` with ``prediction`` of shape
            [batch, output_dim] and the updated LSTM states.
        """
        if input.dim() == 1:
            input = input.unsqueeze(-1)
        step = input.to(self.embedding.weight.dtype).unsqueeze(0)  # [1, batch, output_dim]
        embedded = self.dropout(self.embedding(step))  # [1, batch, emb_dim]
        a = self.attention(hidden, encoder_outputs)  # [batch, src_len]
        a = a.unsqueeze(1)  # [batch, 1, src_len]
        # Context vector: attention-weighted sum of the encoder outputs.
        weighted = torch.bmm(a, encoder_outputs.permute(1, 0, 2)).permute(1, 0, 2)
        rnn_input = torch.cat((embedded, weighted), dim=2)
        output, (hidden, cell) = self.rnn(rnn_input, (hidden, cell))
        # Use the LSTM output ([1, batch, hid_dim]) here; `hidden` holds one
        # slice per layer and cannot be concatenated with single-step tensors
        # when n_layers > 1.
        output = torch.cat((embedded, output, weighted), dim=2)
        prediction = self.out(output.squeeze(0))
        return prediction, hidden, cell
class Seq2Seq(nn.Module):
    """Encoder-decoder wrapper that rolls the decoder out step by step.

    Args:
        encoder: Callable returning ``(encoder_outputs, (hidden, cell))``.
        decoder: Callable ``decoder(input, hidden, cell, encoder_outputs)``
            returning ``(prediction, hidden, cell)`` and exposing an
            ``output_dim`` attribute.
    """

    def __init__(self, encoder, decoder):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        """Predict ``trg[1:]`` from ``src`` and the first target step.

        Args:
            src: Source sequence, sequence-first.
            trg: Target sequence, [trg_len, batch] or
                [trg_len, batch, output_dim]. ``trg[0]`` seeds the decoder.
            teacher_forcing_ratio: Probability of feeding the ground-truth
                value (instead of the model's own prediction) as the next
                decoder input.

        Returns:
            Tensor of shape [trg_len, batch, output_dim]. Row 0 is left at
            zero because no prediction is made for the seed step.
        """
        max_len = trg.shape[0]
        batch_size = trg.shape[1]
        output_dim = self.decoder.output_dim
        # Allocate on the same device as the targets instead of relying on a
        # module-level `device` global.
        outputs = torch.zeros(max_len, batch_size, output_dim, device=trg.device)
        encoder_outputs, (hidden, cell) = self.encoder(src)
        input = trg[0]
        for t in range(1, max_len):
            output, hidden, cell = self.decoder(input, hidden, cell, encoder_outputs)
            outputs[t] = output
            teacher_force = np.random.random() < teacher_forcing_ratio
            # This is regression: feed back the predicted value itself.
            # argmax is a classification idiom and would always yield index 0
            # when output_dim == 1.
            input = trg[t] if teacher_force else output.reshape(trg[t].shape)
        return outputs
```
接下来是数据预处理、模型训练、测试以及结果可视化的代码:
```python
# Plotting dependency: the script uses `plt` below but never imported it.
import matplotlib.pyplot as plt

# Hyperparameters
INPUT_DIM = 1
OUTPUT_DIM = 1
ENC_EMB_DIM = 32
DEC_EMB_DIM = 32
HID_DIM = 64
N_LAYERS = 2
ENC_DROPOUT = 0.5
DEC_DROPOUT = 0.5
N_EPOCHS = 50
BATCH_SIZE = 64
LEARNING_RATE = 0.001
TEACHER_FORCING_RATIO = 0.5
# Window length, used only when data.csv holds one long 1-D series.
SEQ_LEN = 20

# `device` is referenced by every `.to(device)` call below, so it has to be
# defined before the model is built.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')


def make_sequences(array):
    """Return a float tensor of shape [n_samples, seq_len, 1].

    A 2-D array is taken as one sequence per row. A 1-D array is cut into
    overlapping windows of SEQ_LEN steps (it must hold at least SEQ_LEN values).
    """
    if array.ndim == 1:
        windows = [array[i:i + SEQ_LEN] for i in range(len(array) - SEQ_LEN + 1)]
        array = np.stack(windows)
    # The feature axis goes last: [n_samples, seq_len] -> [n_samples, seq_len, 1].
    return torch.as_tensor(array, dtype=torch.float32).unsqueeze(-1)


# Load the data and split it chronologically (before windowing, so that
# training and test windows never overlap).
data = np.loadtxt('data.csv', delimiter=',')
split = int(len(data) * 0.8)
# Convert to tensors
train_data = make_sequences(data[:split])
test_data = make_sequences(data[split:])
# Build the model
encoder = Encoder(INPUT_DIM, ENC_EMB_DIM, HID_DIM, N_LAYERS, ENC_DROPOUT)
attention = Attention(HID_DIM)
decoder = Decoder(OUTPUT_DIM, DEC_EMB_DIM, HID_DIM, N_LAYERS, DEC_DROPOUT, attention)
model = Seq2Seq(encoder, decoder).to(device)
# Optimizer and loss function
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
criterion = nn.MSELoss()
# Train the model
for epoch in range(N_EPOCHS):
    train_loss = 0
    model.train()
    # Step through every sample, including the final (possibly smaller) batch.
    for i in range(0, len(train_data), BATCH_SIZE):
        batch = train_data[i:i + BATCH_SIZE].to(device)  # [batch, seq_len, 1]
        # The target is the input shifted by one step; the model is
        # sequence-first, hence the permute to [len, batch, 1].
        src = batch[:, :-1].permute(1, 0, 2)
        trg = batch[:, 1:].permute(1, 0, 2)
        optimizer.zero_grad()
        output = model(src, trg, TEACHER_FORCING_RATIO)
        # Both tensors are [len - 1, batch, 1]; identical shapes keep MSELoss
        # from silently broadcasting [N, 1] against [N].
        loss = criterion(output[1:], trg[1:])
        loss.backward()
        optimizer.step()
        # Weight by the real batch size so the epoch average is exact.
        train_loss += loss.item() * batch.shape[0]
    train_loss /= len(train_data)
    print(f'Epoch: {epoch+1:02} | Train Loss: {train_loss:.3f}')
# Evaluate the model
model.eval()
with torch.no_grad():
    batch = test_data.to(device)
    src = batch[:, :-1].permute(1, 0, 2)
    trg = batch[:, 1:].permute(1, 0, 2)
    # Teacher forcing ratio 0: the decoder only sees its own predictions.
    output = model(src, trg, 0)
    test_loss = criterion(output[1:], trg[1:]).item()
print(f'Test Loss: {test_loss:.3f}')
# Plot predictions against the ground truth. Step 0 is skipped because the
# model never predicts it; samples are laid out one after another.
output = output[1:].permute(1, 0, 2).cpu().numpy()
trg = trg[1:].permute(1, 0, 2).cpu().numpy()
plt.plot(output.flatten(), label='Predicted')
plt.plot(trg.flatten(), label='Ground Truth')
plt.legend(loc='best')
plt.show()
```
注意:在这个例子中,我们使用的是一个简单的单变量时间序列数据集,即每个时间步只包含一个数值(因此 INPUT_DIM 和 OUTPUT_DIM 都设为 1)。在实践中,你可能需要使用更复杂的数据集(例如多变量序列),并相应地调整模型的输入和输出维度。
阅读全文