Implementing Transformer-based text recognition in PyCharm
To implement a Transformer-based text model in PyCharm, you can use a Python deep learning framework such as TensorFlow or PyTorch. The example below uses PyTorch and trains a sequence-to-sequence Transformer on the Multi30k English-German dataset, walking through data preprocessing, model definition, training, and inference:
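A note on dependencies: the snippet uses the legacy torchtext `Field`/`BucketIterator` API (torchtext 0.8 or earlier, or `torchtext.legacy` in 0.9-0.11) together with spaCy tokenizers, and `nn.MultiheadAttention(batch_first=True)` requires PyTorch 1.9 or newer. A minimal environment check, assuming the spaCy English and German models are installed:
```
# Minimal environment check (assumes the spaCy models were downloaded,
# e.g. via `python -m spacy download en_core_web_sm` / `de_core_news_sm`)
import torch, torchtext, spacy
print(torch.__version__, torchtext.__version__, spacy.__version__)
spacy.load('en_core_web_sm')   # raises OSError if the model is missing
spacy.load('de_core_news_sm')
```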
```
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import spacy

# Field/BucketIterator are the legacy torchtext API (torchtext <= 0.8);
# on torchtext 0.9-0.11 import them from torchtext.legacy instead.
from torchtext.datasets import Multi30k
from torchtext.data import Field, BucketIterator
# Data preprocessing: spaCy tokenization, <sos>/<eos> markers, lowercasing.
# batch_first=True yields (batch, seq_len) tensors, matching the model below.
SRC = Field(tokenize="spacy",
            tokenizer_language='en_core_web_sm',
            init_token='<sos>',
            eos_token='<eos>',
            lower=True,
            batch_first=True)

TRG = Field(tokenize="spacy",
            tokenizer_language='de_core_news_sm',
            init_token='<sos>',
            eos_token='<eos>',
            lower=True,
            batch_first=True)

train_data, valid_data, test_data = Multi30k.splits(exts=('.en', '.de'),
                                                    fields=(SRC, TRG))

SRC.build_vocab(train_data, min_freq=2)
TRG.build_vocab(train_data, min_freq=2)
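# Optional sanity check: dataset and vocabulary sizes
print(f"training examples: {len(train_data.examples)}")
print(f"SRC vocab size: {len(SRC.vocab)} | TRG vocab size: {len(TRG.vocab)}")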
# Model definition
class Transformer(nn.Module):
    def __init__(self, input_dim, output_dim, d_model, n_layers, n_heads,
                 pf_dim, dropout, max_length=100):
        super().__init__()
        self.input_embedding = nn.Embedding(input_dim, d_model)
        self.output_embedding = nn.Embedding(output_dim, d_model)
        self.positional_encoding = nn.Embedding(max_length, d_model)
        self.encoder_layers = nn.ModuleList(
            [EncoderLayer(d_model, n_heads, pf_dim, dropout) for _ in range(n_layers)])
        self.decoder_layers = nn.ModuleList(
            [DecoderLayer(d_model, n_heads, pf_dim, dropout) for _ in range(n_layers)])
        self.fc_out = nn.Linear(d_model, output_dim)
        self.dropout = nn.Dropout(dropout)
        # Embedding scale sqrt(d_model); a buffer moves with model.to(device)
        self.register_buffer('scale', torch.sqrt(torch.FloatTensor([d_model])))

    def forward(self, src, trg, src_mask=None, trg_mask=None):
        batch_size = src.shape[0]
        src_len = src.shape[1]
        trg_len = trg.shape[1]
        # Position indices must match each sequence's own length
        src_pos = torch.arange(0, src_len, device=src.device).unsqueeze(0).repeat(batch_size, 1)
        trg_pos = torch.arange(0, trg_len, device=trg.device).unsqueeze(0).repeat(batch_size, 1)
        # Default to a causal mask so decoder positions cannot attend to future tokens
        if trg_mask is None:
            trg_mask = torch.triu(torch.ones(trg_len, trg_len, dtype=torch.bool,
                                             device=trg.device), diagonal=1)
        src = self.dropout(self.input_embedding(src) * self.scale
                           + self.positional_encoding(src_pos))
        trg = self.dropout(self.output_embedding(trg) * self.scale
                           + self.positional_encoding(trg_pos))
        for layer in self.encoder_layers:
            src = layer(src, src_mask)
        for layer in self.decoder_layers:
            trg = layer(trg, src, trg_mask, src_mask)
        output = self.fc_out(trg)
        return output
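# Shape contract: src is (batch, src_len) and trg is (batch, trg_len) token ids;
# the forward pass returns (batch, trg_len, output_dim) unnormalized logits.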
# Encoder layer: self-attention + position-wise feed-forward, each followed
# by dropout, a residual connection, and LayerNorm (post-norm arrangement)
class EncoderLayer(nn.Module):
    def __init__(self, d_model, n_heads, pf_dim, dropout):
        super().__init__()
        self.self_attention_layer_norm = nn.LayerNorm(d_model)
        self.positionwise_feedforward_layer_norm = nn.LayerNorm(d_model)
        # batch_first=True matches the (batch, seq_len, d_model) tensors used here
        self.self_attention = nn.MultiheadAttention(d_model, n_heads, batch_first=True)
        self.positionwise_feedforward = nn.Sequential(
            nn.Linear(d_model, pf_dim),
            nn.ReLU(),
            nn.Linear(pf_dim, d_model)
        )
        self.dropout = nn.Dropout(dropout)

    def forward(self, src, src_mask=None):
        # src_mask, if given, is a key-padding mask of shape (batch, src_len)
        _src, _ = self.self_attention(src, src, src, key_padding_mask=src_mask)
        src = self.self_attention_layer_norm(src + self.dropout(_src))
        _src = self.positionwise_feedforward(src)
        src = self.positionwise_feedforward_layer_norm(src + self.dropout(_src))
        return src
# Decoder layer: masked self-attention, encoder-decoder cross-attention,
# and position-wise feed-forward, each with dropout, residual, and LayerNorm
class DecoderLayer(nn.Module):
    def __init__(self, d_model, n_heads, pf_dim, dropout):
        super().__init__()
        self.self_attention_layer_norm = nn.LayerNorm(d_model)
        self.encoder_attention_layer_norm = nn.LayerNorm(d_model)
        self.positionwise_feedforward_layer_norm = nn.LayerNorm(d_model)
        self.self_attention = nn.MultiheadAttention(d_model, n_heads, batch_first=True)
        self.encoder_attention = nn.MultiheadAttention(d_model, n_heads, batch_first=True)
        self.positionwise_feedforward = nn.Sequential(
            nn.Linear(d_model, pf_dim),
            nn.ReLU(),
            nn.Linear(pf_dim, d_model)
        )
        self.dropout = nn.Dropout(dropout)

    def forward(self, trg, enc_src, trg_mask=None, src_mask=None):
        # Causal self-attention: trg_mask blocks attention to future positions
        _trg, _ = self.self_attention(trg, trg, trg, attn_mask=trg_mask)
        trg = self.self_attention_layer_norm(trg + self.dropout(_trg))
        # Cross-attention over the encoder output; src_mask hides padding keys
        _trg, _ = self.encoder_attention(trg, enc_src, enc_src, key_padding_mask=src_mask)
        trg = self.encoder_attention_layer_norm(trg + self.dropout(_trg))
        _trg = self.positionwise_feedforward(trg)
        trg = self.positionwise_feedforward_layer_norm(trg + self.dropout(_trg))
        return trg
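# Quick shape check (illustrative): random token ids through a small untrained
# model; the output should be (batch, trg_len, output_dim) logits
_m = Transformer(input_dim=100, output_dim=100, d_model=64, n_layers=2,
                 n_heads=4, pf_dim=128, dropout=0.1)
_logits = _m(torch.randint(0, 100, (2, 7)), torch.randint(0, 100, (2, 5)))
assert _logits.shape == (2, 5, 100)
del _m, _logits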
# Train the model
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

BATCH_SIZE = 128
train_iterator, valid_iterator, test_iterator = BucketIterator.splits(
    (train_data, valid_data, test_data),
    batch_size=BATCH_SIZE,
    device=device)
INPUT_DIM = len(SRC.vocab)
OUTPUT_DIM = len(TRG.vocab)
D_MODEL = 256
N_LAYERS = 3
N_HEADS = 8
PF_DIM = 512
DROPOUT = 0.1
model = Transformer(INPUT_DIM, OUTPUT_DIM, D_MODEL, N_LAYERS, N_HEADS, PF_DIM, DROPOUT).to(device)
optimizer = optim.Adam(model.parameters())
TRG_PAD_IDX = TRG.vocab.stoi[TRG.pad_token]
criterion = nn.CrossEntropyLoss(ignore_index = TRG_PAD_IDX)
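# Optional helper: count trainable parameters
def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

print(f'The model has {count_parameters(model):,} trainable parameters')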
def train(model, iterator, optimizer, criterion, clip):
    model.train()
    epoch_loss = 0
    for i, batch in enumerate(iterator):
        src = batch.src
        trg = batch.trg
        optimizer.zero_grad()
        # Teacher forcing: feed trg without its last token,
        # predict trg shifted one position to the left
        output = model(src, trg[:, :-1])
        output_dim = output.shape[-1]
        output = output.contiguous().view(-1, output_dim)
        trg = trg[:, 1:].contiguous().view(-1)
        loss = criterion(output, trg)
        loss.backward()
        # Clip gradients to stabilize training
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()
        epoch_loss += loss.item()
    return epoch_loss / len(iterator)
def evaluate(model, iterator, criterion):
    model.eval()
    epoch_loss = 0
    with torch.no_grad():
        for i, batch in enumerate(iterator):
            src = batch.src
            trg = batch.trg
            output = model(src, trg[:, :-1])
            output_dim = output.shape[-1]
            output = output.contiguous().view(-1, output_dim)
            trg = trg[:, 1:].contiguous().view(-1)
            loss = criterion(output, trg)
            epoch_loss += loss.item()
    return epoch_loss / len(iterator)
N_EPOCHS = 10
CLIP = 1
for epoch in range(N_EPOCHS):
    train_loss = train(model, train_iterator, optimizer, criterion, CLIP)
    valid_loss = evaluate(model, valid_iterator, criterion)
    print(f'Epoch: {epoch+1:02} | Train Loss: {train_loss:.3f} | Val. Loss: {valid_loss:.3f}')
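# Optional: persist the trained weights (hypothetical file name)
torch.save(model.state_dict(), 'transformer-multi30k.pt')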
# Test the model: greedy decoding, re-running the full model on the growing prefix
def translate_sentence(model, sentence, src_field, trg_field, max_len=50):
    model.eval()
    if isinstance(sentence, str):
        nlp = spacy.load('en_core_web_sm')
        tokens = [token.text.lower() for token in nlp(sentence)]
    else:
        tokens = [token.lower() for token in sentence]
    tokens = [src_field.init_token] + tokens + [src_field.eos_token]
    src_indexes = [src_field.vocab.stoi[token] for token in tokens]
    src_tensor = torch.LongTensor(src_indexes).unsqueeze(0).to(device)
    trg_indexes = [trg_field.vocab.stoi[trg_field.init_token]]
    for i in range(max_len):
        trg_tensor = torch.LongTensor(trg_indexes).unsqueeze(0).to(device)
        with torch.no_grad():
            output = model(src_tensor, trg_tensor)
        # Take the most probable next token at the last decoder position
        pred_token = output[:, -1, :].argmax(-1).item()
        trg_indexes.append(pred_token)
        if pred_token == trg_field.vocab.stoi[trg_field.eos_token]:
            break
    trg_tokens = [trg_field.vocab.itos[i] for i in trg_indexes]
    return trg_tokens[1:]
example_idx = 12
src = vars(train_data.examples[example_idx])['src']
trg = vars(train_data.examples[example_idx])['trg']
print(f'src = {src}')
print(f'trg = {trg}')
translation = translate_sentence(model, src, SRC, TRG)
print(f'predicted trg = {translation}')
```
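To go beyond a single spot check, translation quality can be scored with corpus-level BLEU. The sketch below is one way to do it, assuming torchtext's built-in `bleu_score` (from `torchtext.data.metrics`) and the `translate_sentence` helper defined above; `calculate_bleu` is a hypothetical name:
```
from torchtext.data.metrics import bleu_score

def calculate_bleu(data, src_field, trg_field, model, max_len=50):
    trgs, pred_trgs = [], []
    for example in data:
        src = vars(example)['src']
        trg = vars(example)['trg']
        pred_trg = translate_sentence(model, src, src_field, trg_field, max_len)
        # Drop the trailing <eos> before scoring
        if pred_trg and pred_trg[-1] == trg_field.eos_token:
            pred_trg = pred_trg[:-1]
        pred_trgs.append(pred_trg)
        trgs.append([trg])
    return bleu_score(pred_trgs, trgs)

print(f'BLEU = {calculate_bleu(test_data, SRC, TRG, model) * 100:.2f}')
```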