pytorch实现机器翻译
时间: 2023-12-13 21:34:12 浏览: 228
以下是使用PyTorch实现机器翻译的步骤:
1.读取和预处理数据
2.定义编码器和解码器模型
3.定义注意力机制
4.定义损失函数和优化器
5.训练模型
6.使用模型进行翻译
具体实现细节可以参考以下代码:
```python
import os
import zipfile
import collections
import time
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils import data
from torch import optim
from d2l.data.base import Vocab
import d2l
# 读取和预处理数据
def read_data_nmt():
data_dir = '/home/kesci/input/fraeng6506/fra-eng'
with zipfile.ZipFile(os.path.join(data_dir, 'fra-eng.zip'), 'r') as f:
raw_text = f.read('fra.txt').decode("utf-8")
return raw_text
raw_text = read_data_nmt()
print(raw_text[:100])
def preprocess_nmt(text):
text = text.replace('\u202f', ' ').replace('\xa0', ' ')
no_space = lambda char, prev_char: (
True if char in (',', '!', '.') and prev_char != ' ' else False)
out = [' '+char if i > 0 and no_space(char, text[i-1]) else char
for i, char in enumerate(text.lower())]
return ''.join(out)
text = preprocess_nmt(raw_text)
print(text[:100])
def tokenize_nmt(text, num_examples=None):
source, target = [], []
for i, line in enumerate(text.split('\n')):
if num_examples and i > num_examples:
break
parts = line.split('\t')
if len(parts) == 2:
source.append(parts[0].split(' '))
target.append(parts[1].split(' '))
return source, target
source, target = tokenize_nmt(text)
print(source[:3], target[:3])
# 建立词典
def build_vocab_nmt(tokens):
tokens = [token for line in tokens for token in line]
return Vocab(tokens, min_freq=3, use_special_tokens=True)
src_vocab = build_vocab_nmt(source)
print(list(src_vocab.token_to_idx.items())[:10])
tgt_vocab = build_vocab_nmt(target)
print(list(tgt_vocab.token_to_idx.items())[:10])
# 将文本转换为数字序列
def encode_nmt(src_tokens, tgt_tokens, src_vocab, tgt_vocab):
src_encoded = [[src_vocab[token] for token in line] for line in src_tokens]
tgt_encoded = [[tgt_vocab[token] for token in line] for line in tgt_tokens]
return src_encoded, tgt_encoded
src_encoded, tgt_encoded = encode_nmt(source, target, src_vocab, tgt_vocab)
print(src_encoded[:3], tgt_encoded[:3])
# 定义编码器和解码器模型
class Encoder(nn.Module):
def __init__(self, vocab_size, embed_size, num_hiddens, num_layers,
drop_prob=0):
super(Encoder, self).__init__()
self.embedding = nn.Embedding(vocab_size, embed_size)
self.rnn = nn.LSTM(embed_size, num_hiddens, num_layers,
dropout=drop_prob, bidirectional=True)
def forward(self, inputs, state=None):
# inputs shape: (batch_size, seq_len)
# outputs shape: (seq_len, batch_size, 2*num_hiddens)
embeddings = self.embedding(inputs)
outputs, state = self.rnn(embeddings.permute([1, 0, 2]), state)
return outputs.permute([1, 0, 2]), state
class Decoder(nn.Module):
def __init__(self, vocab_size, embed_size, num_hiddens, num_layers,
attention_size, drop_prob=0):
super(Decoder, self).__init__()
self.embedding = nn.Embedding(vocab_size, embed_size)
self.attention = Attention(num_hiddens, attention_size, drop_prob)
self.rnn = nn.LSTM(num_hiddens + embed_size, num_hiddens, num_layers,
dropout=drop_prob)
self.out = nn.Linear(num_hiddens, vocab_size)
def forward(self, cur_input, state, enc_outputs):
# cur_input shape: (batch_size,)
# state: the hidden state of the last time step
# outputs shape: (batch_size, vocab_size)
embeddings = self.embedding(cur_input).unsqueeze(0)
context = self.attention(state[0][-1], enc_outputs)
rnn_input = torch.cat([embeddings, context.unsqueeze(0)], dim=2)
outputs, state = self.rnn(rnn_input, state)
outputs = self.out(outputs).squeeze(0)
return outputs, state
class Attention(nn.Module):
def __init__(self, enc_num_hiddens, dec_num_hiddens, attention_size,
drop_prob=0):
super(Attention, self).__init__()
self.enc_attention = nn.Linear(enc_num_hiddens, attention_size,
bias=False)
self.dec_attention = nn.Linear(dec_num_hiddens, attention_size,
bias=False)
self.combined_attention = nn.Linear(attention_size, 1, bias=True)
self.dropout = nn.Dropout(drop_prob)
def forward(self, dec_state, enc_outputs):
# dec_state shape: (batch_size, dec_num_hiddens)
# enc_outputs shape: (batch_size, seq_len, enc_num_hiddens)
dec_attention = self.dec_attention(dec_state).unsqueeze(1)
enc_attention = self.enc_attention(enc_outputs)
combined_attention = self.combined_attention(torch.tanh(
enc_attention + dec_attention))
attention_weights = F.softmax(combined_attention.squeeze(2), dim=1)
return torch.bmm(attention_weights.unsqueeze(1), enc_outputs).squeeze(1)
# 定义损失函数和优化器
def sequence_mask(X, valid_len, value=0):
maxlen = X.size(1)
mask = torch.arange(maxlen)[None, :] < valid_len[:, None]
X[~mask] = value
return X
class MaskedSoftmaxCELoss(nn.CrossEntropyLoss):
def forward(self, pred, target, valid_len):
weights = torch.ones_like(target)
weights = sequence_mask(weights, valid_len).float()
self.reduction = 'none'
output = super(MaskedSoftmaxCELoss, self).forward(pred.transpose(1, 2),
target)
return (output * weights).mean(dim=1)
def train_epoch_ch8(net, data_iter, lr, optimizer, device, use_random_iter):
loss_sum, n = 0.0, 0
for batch in data_iter:
optimizer.zero_grad()
X, X_vlen, Y, Y_vlen = [x.to(device) for x in batch]
bos = torch.tensor([tgt_vocab['<bos>']] * Y.shape[0],
device=device).reshape(-1, 1)
dec_input = torch.cat([bos, Y[:, :-1]], 1) # Teacher forcing
Y_hat, _ = net(X, dec_input, X_vlen)
loss = MaskedSoftmaxCELoss()(Y_hat, Y, Y_vlen)
loss.sum().backward()
d2l.grad_clipping(net, 1)
num_tokens = Y_vlen.sum()
optimizer.step()
loss_sum += loss.sum().item()
n += num_tokens.item()
return loss_sum / n
def train_ch8(net, train_iter, lr, num_epochs, device, use_random_iter=False):
def init_weights(m):
if type(m) == nn.Linear:
nn.init.xavier_uniform_(m.weight)
if type(m) == nn.LSTM:
for param in m._flat_weights_names:
if "weight" in param:
nn.init.xavier_uniform_(m._parameters[param])
net.apply(init_weights)
net.to(device)
optimizer = torch.optim.Adam(net.parameters(), lr=lr)
loss = MaskedSoftmaxCELoss()
animator = d2l.Animator(xlabel='epoch', ylabel='loss',
xlim=[1, num_epochs])
for epoch in range(num_epochs):
timer = d2l.Timer()
loss_avg = train_epoch_ch8(net, train_iter, lr, optimizer, device,
use_random_iter)
animator.add(epoch+1, loss_avg)
print(f'epoch {epoch + 1}, loss {loss_avg:.3f}, '
f'time {timer.stop():.1f} sec')
return net
# 训练模型
embed_size, num_hiddens, num_layers = 64, 128, 2
attention_size, drop_prob, lr, batch_size, num_epochs = 10, 0.5, 0.01, 64, 300
train_iter = d2l.load_data_nmt(batch_size, num_examples=1000)
encoder = Encoder(len(src_vocab), embed_size, num_hiddens, num_layers,
drop_prob)
decoder = Decoder(len(tgt_vocab), embed_size, num_hiddens, num_layers,
attention_size, drop_prob)
net = d2l.EncoderDecoder(encoder, decoder)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
net = train_ch8(net, train_iter, lr, num_epochs, device)
# 使用模型进行翻译
def predict_ch8(net, src_sentence, src_vocab, tgt_vocab, num_steps,
device):
src_tokens = src_vocab[src_sentence.lower().split(' ')]
enc_valid_len = torch.tensor([len(src_tokens)], device=device)
src_tokens = d2l.truncate_pad(src_tokens, num_steps, src_vocab['<pad>'])
enc_X = torch.tensor(src_tokens, dtype=torch.long, device=device)
enc_outputs, enc_state = net.encoder(enc_X.unsqueeze(0),
enc_valid_len)
dec_state = enc_state
dec_X = torch.tensor([tgt_vocab['<bos>']], dtype=torch.long,
device=device).reshape(1, 1)
output_seq = []
for _ in range(num_steps):
Y, dec_state = net.decoder(dec_X, dec_state, enc_outputs)
dec_X = Y.argmax(dim=1).reshape(1, 1)
pred = dec_X.squeeze(dim=0).type(torch.int32).item()
if pred == tgt_vocab['<eos>']:
break
output_seq.append(pred)
return ' '.join(tgt_vocab.to_tokens(output_seq))
src_sentence = 'They are watching.'
print(predict_ch8(net, src_sentence, src_vocab, tgt_vocab, num_steps=10,
device=device))
--相关问题--:
阅读全文