请用pytorch实现transformer并训练
时间: 2023-10-09 13:12:47 浏览: 92
好的,以下是使用PyTorch实现Transformer并进行训练的代码:
首先,我们需要导入必要的库和设置一些超参数:
```python
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import random
import math
import time
# Hyperparameters
BATCH_SIZE = 64
NUM_EPOCHS = 10
LR = 0.0005  # Adam learning rate
D_MODEL = 512  # model / embedding dimension
N_HEADS = 8  # number of attention heads (must divide D_MODEL)
N_ENCODER_LAYERS = 6
N_DECODER_LAYERS = 6
D_FF = 2048  # position-wise feed-forward hidden dimension
DROPOUT = 0.1
```
接下来,我们需要定义Transformer中常用的几个模块,包括Multi-head Attention、Position-wise Feedforward等:
```python
class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product attention (Vaswani et al., 2017).

    BUG FIXES vs. the original:
    1. The original split heads with ``Q.view(batch * n_heads, -1, d_k)``,
       which slices the *sequence* dimension across heads instead of the
       feature dimension, mixing tokens between heads. The correct split is
       ``view(batch, seq, n_heads, d_k).transpose(1, 2)``.
    2. The original expanded the mask to 4-D (``unsqueeze(1).repeat(...)``)
       but applied it to 3-D ``bmm`` scores — a shape mismatch at runtime.
       Attention is now computed in 4-D and the mask broadcasts over heads.
    """

    def __init__(self, d_model, n_heads, dropout):
        super(MultiHeadAttention, self).__init__()
        assert d_model % n_heads == 0, "d_model must be divisible by n_heads"
        self.d_model = d_model
        self.n_heads = n_heads
        self.d_k = d_model // n_heads  # per-head feature dimension
        self.query_linear = nn.Linear(d_model, d_model)
        self.key_linear = nn.Linear(d_model, d_model)
        self.value_linear = nn.Linear(d_model, d_model)
        self.output_linear = nn.Linear(d_model, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, query, key, value, mask=None):
        """Attend over ``value`` using ``query``/``key``.

        query/key/value: (batch, seq, d_model); mask (optional): broadcastable
        to (batch, n_heads, q_len, k_len), with 0 marking disallowed positions.
        Returns (batch, q_len, d_model).
        """
        batch_size = query.size(0)
        # Project, then split the feature dim into heads: (B, H, S, d_k).
        Q = self.query_linear(query).view(batch_size, -1, self.n_heads, self.d_k).transpose(1, 2)
        K = self.key_linear(key).view(batch_size, -1, self.n_heads, self.d_k).transpose(1, 2)
        V = self.value_linear(value).view(batch_size, -1, self.n_heads, self.d_k).transpose(1, 2)
        # Scaled dot-product scores: (B, H, q_len, k_len).
        attn_scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)
        if mask is not None:
            if mask.dim() == 3:
                # (B, q_len, k_len) -> (B, 1, q_len, k_len): broadcast over heads.
                mask = mask.unsqueeze(1)
            attn_scores = attn_scores.masked_fill(mask == 0, -1e9)
        attn_weights = self.dropout(F.softmax(attn_scores, dim=-1))
        # Weighted sum, then merge heads back to (B, q_len, d_model).
        output = torch.matmul(attn_weights, V)
        output = output.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model)
        return self.output_linear(output)
class PositionwiseFeedforward(nn.Module):
    """Position-wise feed-forward network: ``linear2(dropout(relu(linear1(x))))``.

    Applied independently at every sequence position; expands d_model -> d_ff
    and projects back to d_model.
    """

    def __init__(self, d_model, d_ff, dropout):
        super(PositionwiseFeedforward, self).__init__()
        self.linear1 = nn.Linear(d_model, d_ff)
        self.linear2 = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        hidden = self.dropout(F.relu(self.linear1(x)))
        return self.linear2(hidden)
```
接下来,我们需要定义Transformer的编码器和解码器:
```python
class _EncoderLayer(nn.Module):
    """One pre-norm encoder layer: self-attention + FFN, each with a residual."""

    def __init__(self, d_model, n_heads, d_ff, dropout):
        super(_EncoderLayer, self).__init__()
        self.norm1 = nn.LayerNorm(d_model)
        self.self_attn = MultiHeadAttention(d_model, n_heads, dropout)
        self.norm2 = nn.LayerNorm(d_model)
        self.ff = PositionwiseFeedforward(d_model, d_ff, dropout)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask):
        # Pre-norm residual blocks: x + sublayer(norm(x)).
        y = self.norm1(x)
        x = x + self.dropout(self.self_attn(y, y, y, mask))
        x = x + self.dropout(self.ff(self.norm2(x)))
        return x


class Encoder(nn.Module):
    """Stack of ``n_layers`` Transformer encoder layers.

    BUG FIX vs. the original: layers were built as ``nn.Sequential(LayerNorm,
    MultiHeadAttention, ...)`` and called as ``layer(x, x, x, mask)`` —
    ``nn.Sequential`` only chains a single argument, so that call raised a
    TypeError, and the design also lacked residual connections. Each layer is
    now an explicit module with residuals.
    """

    def __init__(self, d_model, n_heads, d_ff, dropout, n_layers):
        super(Encoder, self).__init__()
        self.layers = nn.ModuleList(
            [_EncoderLayer(d_model, n_heads, d_ff, dropout) for _ in range(n_layers)]
        )

    def forward(self, x, mask):
        """Encode ``x`` (batch, src_len, d_model) under ``mask``; same shape out."""
        for layer in self.layers:
            x = layer(x, mask)
        return x
class _DecoderLayer(nn.Module):
    """One pre-norm decoder layer: masked self-attn, cross-attn on memory, FFN."""

    def __init__(self, d_model, n_heads, d_ff, dropout):
        super(_DecoderLayer, self).__init__()
        self.norm1 = nn.LayerNorm(d_model)
        self.self_attn = MultiHeadAttention(d_model, n_heads, dropout)
        self.norm2 = nn.LayerNorm(d_model)
        self.cross_attn = MultiHeadAttention(d_model, n_heads, dropout)
        self.norm3 = nn.LayerNorm(d_model)
        self.ff = PositionwiseFeedforward(d_model, d_ff, dropout)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, memory, src_mask, tgt_mask):
        # Masked self-attention over the target sequence.
        y = self.norm1(x)
        x = x + self.dropout(self.self_attn(y, y, y, tgt_mask))
        # Cross-attention: queries from the decoder, keys/values from memory,
        # masked by the *source* padding mask.
        y = self.norm2(x)
        x = x + self.dropout(self.cross_attn(y, memory, memory, src_mask))
        x = x + self.dropout(self.ff(self.norm3(x)))
        return x


class Decoder(nn.Module):
    """Stack of ``n_layers`` Transformer decoder layers.

    BUG FIXES vs. the original: (1) layers were ``nn.Sequential`` chains
    called with four arguments, which ``nn.Sequential`` cannot forward;
    (2) ``src_mask`` was accepted but never used, and the single attention
    call attended only to memory — there was no decoder self-attention and
    no residual connections. Each layer now performs masked self-attention,
    source-masked cross-attention, and an FFN, all with residuals.
    """

    def __init__(self, d_model, n_heads, d_ff, dropout, n_layers):
        super(Decoder, self).__init__()
        self.layers = nn.ModuleList(
            [_DecoderLayer(d_model, n_heads, d_ff, dropout) for _ in range(n_layers)]
        )

    def forward(self, x, memory, src_mask, tgt_mask):
        """Decode ``x`` (batch, tgt_len, d_model) against encoder ``memory``."""
        for layer in self.layers:
            x = layer(x, memory, src_mask, tgt_mask)
        return x
```
最后,我们需要定义完整的Transformer模型:
```python
class _PositionalEncoding(nn.Module):
    """Sinusoidal positional encoding (Vaswani et al., 2017) added to embeddings."""

    def __init__(self, d_model, dropout, max_len=5000):
        super(_PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(dropout)
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        # Buffer (not a parameter): moves with .to(device), excluded from training.
        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        return self.dropout(x + self.pe[:, :x.size(1)])


class Transformer(nn.Module):
    """Encoder-decoder Transformer for sequence-to-sequence tasks.

    BUG FIX vs. the original: there was no positional encoding at all, so the
    model was blind to token order (attention is permutation-equivariant).
    Sinusoidal positional encodings and the paper's sqrt(d_model) embedding
    scaling are now applied to both source and target embeddings. The public
    constructor and ``forward`` signatures are unchanged.
    """

    def __init__(self, src_vocab_size, tgt_vocab_size, d_model, n_heads, d_ff, dropout, n_encoder_layers, n_decoder_layers):
        super(Transformer, self).__init__()
        self.src_embedding = nn.Embedding(src_vocab_size, d_model)
        self.tgt_embedding = nn.Embedding(tgt_vocab_size, d_model)
        self.pos_encoding = _PositionalEncoding(d_model, dropout)
        self.scale = math.sqrt(d_model)  # embedding scale factor from the paper
        self.encoder = Encoder(d_model, n_heads, d_ff, dropout, n_encoder_layers)
        self.decoder = Decoder(d_model, n_heads, d_ff, dropout, n_decoder_layers)
        self.output_linear = nn.Linear(d_model, tgt_vocab_size)

    def forward(self, src, tgt, src_mask, tgt_mask):
        """Return unnormalized logits of shape (batch, tgt_len, tgt_vocab_size)."""
        src_emb = self.pos_encoding(self.src_embedding(src) * self.scale)
        tgt_emb = self.pos_encoding(self.tgt_embedding(tgt) * self.scale)
        memory = self.encoder(src_emb, src_mask)
        output = self.decoder(tgt_emb, memory, src_mask, tgt_mask)
        return self.output_linear(output)
```
接下来,我们需要定义数据集和数据加载器:
```python
class TranslationDataset(Dataset):
    """Paired (source, target) sequence dataset for translation training.

    ``src_data`` and ``tgt_data`` are parallel sequences of equal length;
    item ``i`` is the pair ``(src_data[i], tgt_data[i])``.
    """

    def __init__(self, src_data, tgt_data):
        self.src_data = src_data
        self.tgt_data = tgt_data

    def __len__(self):
        return len(self.src_data)

    def __getitem__(self, i):
        return self.src_data[i], self.tgt_data[i]
def collate_fn(batch):
    """Collate (src, tgt) tensor pairs into padded batches with padding masks.

    Pads both sides with 0 (assumed pad index) to the batch maximum length and
    returns ``(src_batch, tgt_batch, src_mask, tgt_mask)``, where each mask has
    shape (batch, 1, seq_len) and is True at non-pad positions.
    """
    sources, targets = zip(*batch)
    src_batch = nn.utils.rnn.pad_sequence(sources, batch_first=True)
    tgt_batch = nn.utils.rnn.pad_sequence(targets, batch_first=True)
    # unsqueeze(-2) inserts the broadcast dim expected by attention masking.
    src_mask = src_batch.ne(0).unsqueeze(-2)
    tgt_mask = tgt_batch.ne(0).unsqueeze(-2)
    return src_batch, tgt_batch, src_mask, tgt_mask
# NOTE(review): train_src_data / train_tgt_data are never defined in this
# snippet — presumably lists of 1-D LongTensors of token ids (pad id 0),
# since collate_fn pads with 0 and builds masks from != 0. Verify upstream.
train_dataset = TranslationDataset(train_src_data, train_tgt_data)
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_fn)
```
接下来,我们需要定义损失函数和优化器:
```python
# Cross-entropy over target-vocabulary logits; index 0 (padding) is excluded
# from the loss so padded positions do not contribute gradients.
criterion = nn.CrossEntropyLoss(ignore_index=0)
# NOTE(review): `model` is used here and below but never constructed in this
# snippet — a Transformer instance (moved to `device`) must be created first.
optimizer = optim.Adam(model.parameters(), lr=LR)
```
最后,我们可以开始训练模型了:
```python
# Training loop (teacher forcing): feed target tokens [0..T-2], predict [1..T-1].
# NOTE(review): `model`, `device`, `optimizer`, `criterion` and `train_loader`
# must already be defined — `model` and `device` are not created in this snippet.
for epoch in range(NUM_EPOCHS):
    total_loss = 0
    start_time = time.time()
    for i, (src_batch, tgt_batch, src_mask, tgt_mask) in enumerate(train_loader):
        src_batch = src_batch.to(device)
        tgt_batch = tgt_batch.to(device)
        src_mask = src_mask.to(device)
        # Decoder input drops the last token; targets drop the first.
        tgt_in = tgt_batch[:, :-1]
        # BUG FIX: the original passed `tgt_mask[:, :-1]`, which slices the
        # singleton broadcast dimension (size 1) down to size 0, producing an
        # empty mask. Rebuild the decoder mask here from the decoder input:
        # padding mask AND lower-triangular causal mask (no peeking ahead).
        tgt_len = tgt_in.size(1)
        causal = torch.tril(torch.ones(tgt_len, tgt_len, dtype=torch.bool, device=tgt_in.device))
        tgt_mask = tgt_in.ne(0).unsqueeze(-2) & causal
        optimizer.zero_grad()
        output = model(src_batch, tgt_in, src_mask, tgt_mask)
        # Flatten (batch, seq, vocab) -> (batch*seq, vocab) for cross-entropy.
        output = output.contiguous().view(-1, output.size(-1))
        targets = tgt_batch[:, 1:].contiguous().view(-1)
        loss = criterion(output, targets)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
        if (i + 1) % 100 == 0:
            print('Epoch [%d/%d], Step [%d/%d], Loss: %.4f' % (epoch+1, NUM_EPOCHS, i+1, len(train_loader), total_loss / (i+1)))
    end_time = time.time()
    print('Epoch [%d/%d], Loss: %.4f, Time: %.2fs' % (epoch+1, NUM_EPOCHS, total_loss / len(train_loader), end_time - start_time))
```
在训练完成后,我们就可以使用训练好的模型进行推理了,具体方法可以参考这篇文章:[使用Transformer进行翻译](https://www.cnblogs.com/pinard/p/11131803.html)。
阅读全文