手把手教你用pytorch代码实现Transformer模型
时间: 2023-12-13 10:31:32 浏览: 159
Pytorch 实现Transformer的预训练模型 单独py文件
5星 · 资源好评率100%
以下是手把手教你用PyTorch代码实现Transformer模型的步骤:
1.导入必要的库和模块
```python
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
```
2.定义Transformer模型的编码器和解码器
```python
class Encoder(nn.Module):
def __init__(self, input_dim, hid_dim, n_layers, n_heads, pf_dim, dropout, device):
super().__init__()
self.device = device
self.tok_embedding = nn.Embedding(input_dim, hid_dim)
self.pos_embedding = nn.Embedding(1000, hid_dim)
self.layers = nn.ModuleList([EncoderLayer(hid_dim, n_heads, pf_dim, dropout, device) for _ in range(n_layers)])
self.dropout = nn.Dropout(dropout)
self.scale = torch.sqrt(torch.FloatTensor([hid_dim])).to(device)
def forward(self, src, src_mask):
# src = [batch size, src len]
# src_mask = [batch size, 1, 1, src len]
batch_size = src.shape[0]
src_len = src.shape[1]
pos = torch.arange(0, src_len).unsqueeze(0).repeat(batch_size, 1).to(self.device)
src = self.dropout((self.tok_embedding(src) * self.scale) + self.pos_embedding(pos))
for layer in self.layers:
src = layer(src, src_mask)
return src
class EncoderLayer(nn.Module):
def __init__(self, hid_dim, n_heads, pf_dim, dropout, device):
super().__init__()
self.self_attn_layer_norm = nn.LayerNorm(hid_dim)
self.ff_layer_norm = nn.LayerNorm(hid_dim)
self.self_attention = MultiHeadAttentionLayer(hid_dim, n_heads, dropout, device)
self.positionwise_feedforward = PositionwiseFeedforwardLayer(hid_dim, pf_dim, dropout)
self.dropout = nn.Dropout(dropout)
def forward(self, src, src_mask):
# src = [batch size, src len, hid dim]
# src_mask = [batch size, 1, 1, src len]
# self attention
_src, _ = self.self_attention(src, src, src, src_mask)
# dropout, residual connection and layer norm
src = self.self_attn_layer_norm(src + self.dropout(_src))
# positionwise feedforward
_src = self.positionwise_feedforward(src)
# dropout, residual and layer norm
src = self.ff_layer_norm(src + self.dropout(_src))
return src
class MultiHeadAttentionLayer(nn.Module):
def __init__(self, hid_dim, n_heads, dropout, device):
super().__init__()
assert hid_dim % n_heads == 0
self.hid_dim = hid_dim
self.n_heads = n_heads
self.head_dim = hid_dim // n_heads
self.fc_q = nn.Linear(hid_dim, hid_dim)
self.fc_k = nn.Linear(hid_dim, hid_dim)
self.fc_v = nn.Linear(hid_dim, hid_dim)
self.fc_o = nn.Linear(hid_dim, hid_dim)
self.dropout = nn.Dropout(dropout)
self.scale = torch.sqrt(torch.FloatTensor([self.head_dim])).to(device)
def forward(self, query, key, value, mask=None):
batch_size = query.shape[0]
Q = self.fc_q(query)
K = self.fc_k(key)
V = self.fc_v(value)
# Q = [batch size, query len, hid dim]
# K = [batch size, key len, hid dim]
# V = [batch size, value len, hid dim]
Q = Q.view(batch_size, -1, self.n_heads, self.head_dim).permute(0, 2, 1, 3)
K = K.view(batch_size, -1, self.n_heads, self.head_dim).permute(0, 2, 1, 3)
V = V.view(batch_size, -1, self.n_heads, self.head_dim).permute(0, 2, 1, 3)
# Q = [batch size, n heads, query len, head dim]
# K = [batch size, n heads, key len, head dim]
# V = [batch size, n heads, value len, head dim]
energy = torch.matmul(Q, K.permute(0, 1, 3, 2)) / self.scale
# energy = [batch size, n heads, query len, key len]
if mask is not None:
energy = energy.masked_fill(mask == 0, -1e10)
attention = torch.softmax(energy, dim=-1)
# attention = [batch size, n heads, query len, key len]
x = torch.matmul(self.dropout(attention), V)
# x = [batch size, n heads, query len, head dim]
x = x.permute(0, 2, 1, 3).contiguous()
# x = [batch size, query len, n heads, head dim]
x = x.view(batch_size, -1, self.hid_dim)
# x = [batch size, query len, hid dim]
x = self.fc_o(x)
# x = [batch size, query len, hid dim]
return x, attention
class PositionwiseFeedforwardLayer(nn.Module):
def __init__(self, hid_dim, pf_dim, dropout):
super().__init__()
self.fc_1 = nn.Linear(hid_dim, pf_dim)
self.fc_2 = nn.Linear(pf_dim, hid_dim)
self.dropout = nn.Dropout(dropout)
def forward(self, x):
# x = [batch size, seq len, hid dim]
x = self.dropout(torch.relu(self.fc_1(x)))
# x = [batch size, seq len, pf dim]
x = self.fc_2(x)
# x = [batch size, seq len, hid dim]
return x
class Decoder(nn.Module):
def __init__(self, output_dim, hid_dim, n_layers, n_heads, pf_dim, dropout, device):
super().__init__()
self.device = device
self.tok_embedding = nn.Embedding(output_dim, hid_dim)
self.pos_embedding = nn.Embedding(1000, hid_dim)
self.layers = nn.ModuleList([DecoderLayer(hid_dim, n_heads, pf_dim, dropout, device) for _ in range(n_layers)])
self.fc_out = nn.Linear(hid_dim, output_dim)
self.dropout = nn.Dropout(dropout)
self.scale = torch.sqrt(torch.FloatTensor([hid_dim])).to(device)
def forward(self, trg, enc_src, trg_mask, src_mask):
# trg = [batch size, trg len]
# enc_src = [batch size, src len, hid dim]
# trg_mask = [batch size, 1, trg len, trg len]
# src_mask = [batch size, 1, 1, src len]
batch_size = trg.shape[0]
trg_len = trg.shape[1]
pos = torch.arange(0, trg_len).unsqueeze(0).repeat(batch_size, 1).to(self.device)
trg = self.dropout((self.tok_embedding(trg) * self.scale) + self.pos_embedding(pos))
for layer in self.layers:
trg, attention = layer(trg, enc_src, trg_mask, src_mask)
output = self.fc_out(trg)
return output, attention
class DecoderLayer(nn.Module):
def __init__(self, hid_dim, n_heads, pf_dim, dropout, device):
super().__init__()
self.self_attn_layer_norm = nn.LayerNorm(hid_dim)
self.enc_attn_layer_norm = nn.LayerNorm(hid_dim)
self.ff_layer_norm = nn.LayerNorm(hid_dim)
self.self_attention = MultiHeadAttentionLayer(hid_dim, n_heads, dropout, device)
self.encoder_attention = MultiHeadAttentionLayer(hid_dim, n_heads, dropout, device)
self.positionwise_feedforward = PositionwiseFeedforwardLayer(hid_dim, pf_dim, dropout)
self.dropout = nn.Dropout(dropout)
def forward(self, trg, enc_src, trg_mask, src_mask):
# trg = [batch size, trg len, hid dim]
# enc_src = [batch size, src len, hid dim]
# trg_mask = [batch size, 1, trg len, trg len]
# src_mask = [batch size, 1, 1, src len]
# self attention
_trg, _ = self.self_attention(trg, trg, trg, trg_mask)
# dropout, residual connection and layer norm
trg = self.self_attn_layer_norm(trg + self.dropout(_trg))
# encoder attention
_trg, attention = self.encoder_attention(trg, enc_src, enc_src, src_mask)
# dropout, residual connection and layer norm
trg = self.enc_attn_layer_norm(trg + self.dropout(_trg))
# positionwise feedforward
_trg = self.positionwise_feedforward(trg)
# dropout, residual and layer norm
trg = self.ff_layer_norm(trg + self.dropout(_trg))
return trg, attention
```
3.定义完模型后,我们需要定义一些辅助函数,如下所示:
```python
def get_pad_mask(seq, pad_idx):
return (seq != pad_idx).unsqueeze(-2)
def get_subsequent_mask(seq):
sz_b, len_s = seq.size()
subsequent_mask = (1 - torch.triu(torch.ones((1, len_s, len_s), device=seq.device), diagonal=1)).bool()
return subsequent_mask
def get_clones(module, N):
return nn.ModuleList([copy.deepcopy(module) for i in range(N)])
def greedy_decode(model, src, src_mask, max_len, start_symbol):
memory = model.encode(src, src_mask)
ys = torch.ones(1, 1).fill_(start_symbol).type_as(src.data)
for i in range(max_len - 1):
out = model.decode(memory, src_mask, ys, subsequent_mask)
prob = model.generator(out[:, -1])
_, next_word = torch.max(prob, dim=1)
next_word = next_word.item()
ys = torch.cat([ys, torch.ones(1, 1).type_as(src.data).fill_(next_word)], dim=1)
if next_word == 2:
break
return ys
```
4.定义完辅助函数后,我们需要定义完整的Transformer模型,如下所示:
```python
class Transformer(nn.Module):
def __init__(self, src_vocab, trg_vocab, hid_dim, n_layers, n_heads, pf_dim, dropout, device, max_length=100):
super().__init__()
self.device = device
self.src_vocab = src_vocab
self.trg_vocab = trg_vocab
self.hid_dim = hid_dim
self.n_layers = n_layers
self.n_heads = n_heads
self.pf_dim = pf_dim
self.dropout = dropout
self.max_length = max_length
self.encoder = Encoder(src_vocab, hid_dim, n_layers, n_heads, pf_dim, dropout, device)
self.decoder = Decoder(trg_vocab, hid_dim, n_layers, n_heads, pf_dim, dropout, device)
self.src_pad_idx = src_vocab.stoi['<pad>']
self.trg_pad_idx = trg_vocab.stoi['<pad>']
self.device = device
def make_src_mask(self, src):
# src = [batch size, src len]
src_mask = (src != self.src_pad_idx).unsqueeze(1).unsqueeze(2)
# src_mask = [batch size, 1, 1, src len]
return src_mask
def make_trg_mask(self, trg):
# trg = [batch size, trg len]
trg_pad_mask = (trg != self.trg_pad_idx).unsqueeze(1).unsqueeze(2)
# trg_pad_mask = [batch size, 1, 1, trg len]
trg_len = trg.shape[1]
trg_sub_mask = torch.tril(torch.ones((trg_len, trg_len), device=self.device)).bool()
# trg_sub_mask = [trg len, trg len]
trg_mask = trg_pad_mask & trg_sub_mask
# trg_mask = [batch size, 1, trg len, trg len]
return trg_mask
def forward(self, src, trg):
# src = [batch size, src len]
# trg = [batch size, trg len]
src_mask = self.make_src_mask(src)
trg_mask = self.make_trg_mask(trg)
enc_src = self.encoder(src, src_mask)
output, attention = self.decoder(trg, enc_src, trg_mask, src_mask)
return output, attention
def encode(self, src, src_mask):
# src = [batch size, src len]
# src_mask = [batch size, 1, 1, src len]
enc_src = self.encoder(src, src_mask)
return enc_src
def decode(self, memory, src_mask, trg, trg_mask):
# memory = [batch size, src len, hid dim]
# src_mask = [batch size, 1, 1, src len]
# trg = [batch size, trg
阅读全文