Write a Python program that uses a seq2seq model, takes two corpora (English and Hindi) as input, and translates between them: given a sentence in one language, it returns the corresponding sentence in the other.
Sure. Below is a TensorFlow-based seq2seq implementation that translates English into Hindi. Note that it uses the TensorFlow 1.x API (`tf.placeholder`, `tf.contrib.seq2seq`), so make sure TensorFlow 1.x and the other required dependencies are installed; it will not run unmodified on TensorFlow 2.x.
```python
import numpy as np
import tensorflow as tf
import os

# Hyperparameters
learning_rate = 0.001
batch_size = 64
num_epochs = 10
hidden_size = 256
num_layers = 2
source_vocab_size = 10000  # English vocabulary size
target_vocab_size = 10000  # Hindi vocabulary size

# Load data
def load_data(path):
    with open(path, 'r', encoding='utf-8') as f:
        data = f.read().split('\n')
    return data

source_data = load_data('english_data.txt')
target_data = load_data('hindi_data.txt')
# Build vocabularies
class Vocab:
    def __init__(self):
        self.vocab = {}
        self.rev_vocab = {}

    def build_vocab(self, data, max_size):
        self.vocab['<PAD>'] = 0
        self.vocab['<GO>'] = 1
        self.vocab['<EOS>'] = 2
        self.vocab['<UNK>'] = 3
        for sentence in data:
            for word in sentence.split():
                if word not in self.vocab:
                    self.vocab[word] = len(self.vocab)
        # Truncate to max_size entries (the special tokens have the smallest ids and are kept)
        if len(self.vocab) > max_size:
            self.vocab = {k: v for k, v in sorted(self.vocab.items(), key=lambda x: x[1])[:max_size]}
        self.rev_vocab = {v: k for k, v in self.vocab.items()}

    def word_to_id(self, word):
        return self.vocab.get(word, self.vocab['<UNK>'])

    def id_to_word(self, id):
        # Fall back to <UNK> for ids that were truncated out of the vocabulary
        return self.rev_vocab.get(id, '<UNK>')

source_vocab = Vocab()
source_vocab.build_vocab(source_data, source_vocab_size)
target_vocab = Vocab()
target_vocab.build_vocab(target_data, target_vocab_size)
# Build input data (convert each sentence to a sequence of word ids)
def get_input_data(data, vocab):
    input_data = []
    for sentence in data:
        sentence_ids = [vocab.word_to_id(word) for word in sentence.split()]
        input_data.append(sentence_ids)
    return input_data

source_input_data = get_input_data(source_data, source_vocab)
target_input_data = get_input_data(target_data, target_vocab)

# Add the special <GO> / <EOS> tokens
def add_special_tokens(data, vocab):
    new_data = []
    for sentence in data:
        new_sentence = [vocab.vocab['<GO>']] + sentence + [vocab.vocab['<EOS>']]
        new_data.append(new_sentence)
    return new_data

source_input_data = add_special_tokens(source_input_data, source_vocab)
target_input_data = add_special_tokens(target_input_data, target_vocab)

# Sort the sentence pairs by source length to speed up training
def sort_by_length(input_data, target_data):
    data = list(zip(input_data, target_data))
    data.sort(key=lambda x: len(x[0]))
    input_data, target_data = zip(*data)
    return input_data, target_data

source_input_data, target_input_data = sort_by_length(source_input_data, target_input_data)
# Build the model (TensorFlow 1.x, tf.contrib.seq2seq)
class Seq2Seq:
    def __init__(self, source_vocab_size, target_vocab_size, hidden_size, num_layers):
        self.source_vocab_size = source_vocab_size
        self.target_vocab_size = target_vocab_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.encoder_inputs = tf.placeholder(tf.int32, [None, None], name='encoder_inputs')
        self.decoder_inputs = tf.placeholder(tf.int32, [None, None], name='decoder_inputs')
        self.decoder_targets = tf.placeholder(tf.int32, [None, None], name='decoder_targets')
        self.decoder_lengths = tf.placeholder(tf.int32, [None], name='decoder_lengths')

        with tf.variable_scope('encoder'):
            encoder_embeddings = tf.get_variable('encoder_embeddings', [source_vocab_size, hidden_size])
            encoder_inputs_embedded = tf.nn.embedding_lookup(encoder_embeddings, self.encoder_inputs)
            # Create a separate cell object per layer; reusing one object in MultiRNNCell ties the weights
            encoder_cell = tf.nn.rnn_cell.MultiRNNCell(
                [tf.nn.rnn_cell.BasicLSTMCell(hidden_size) for _ in range(num_layers)])
            encoder_outputs, encoder_state = tf.nn.dynamic_rnn(
                encoder_cell, encoder_inputs_embedded, dtype=tf.float32)

        with tf.variable_scope('decoder'):
            decoder_embeddings = tf.get_variable('decoder_embeddings', [target_vocab_size, hidden_size])
            decoder_inputs_embedded = tf.nn.embedding_lookup(decoder_embeddings, self.decoder_inputs)
            decoder_cell = tf.nn.rnn_cell.MultiRNNCell(
                [tf.nn.rnn_cell.BasicLSTMCell(hidden_size) for _ in range(num_layers)])
            # Shared projection from the decoder output to vocabulary logits
            output_layer = tf.layers.Dense(target_vocab_size, name='output_projection')
            # Teacher forcing during training
            helper = tf.contrib.seq2seq.TrainingHelper(decoder_inputs_embedded, self.decoder_lengths)
            decoder = tf.contrib.seq2seq.BasicDecoder(decoder_cell, helper, encoder_state,
                                                      output_layer=output_layer)
            outputs, _, _ = tf.contrib.seq2seq.dynamic_decode(decoder)
            self.logits = outputs.rnn_output
            # Beam search at inference time (reuses the same cell and projection layer)
            beam_width = 5
            beam_decoder = tf.contrib.seq2seq.BeamSearchDecoder(
                cell=decoder_cell,
                embedding=decoder_embeddings,
                start_tokens=tf.fill([tf.shape(self.encoder_inputs)[0]], target_vocab.vocab['<GO>']),
                end_token=target_vocab.vocab['<EOS>'],
                initial_state=tf.contrib.seq2seq.tile_batch(encoder_state, beam_width),
                beam_width=beam_width,
                output_layer=output_layer,
                length_penalty_weight=0.0
            )
            beam_outputs, _, _ = tf.contrib.seq2seq.dynamic_decode(beam_decoder, maximum_iterations=100)
            # predicted_ids has shape [batch, time, beam_width]; keep the best beam
            self.predictions = beam_outputs.predicted_ids[:, :, 0]

        self.loss = tf.contrib.seq2seq.sequence_loss(
            self.logits,
            self.decoder_targets,
            tf.ones([tf.shape(self.decoder_targets)[0], tf.shape(self.decoder_targets)[1]])
        )
        self.train_op = tf.train.AdamOptimizer(learning_rate).minimize(self.loss)
# Train the model
tf.reset_default_graph()
model = Seq2Seq(source_vocab_size, target_vocab_size, hidden_size, num_layers)
saver = tf.train.Saver()
with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    for epoch in range(num_epochs):
        total_loss = 0
        for i in range(0, len(source_input_data), batch_size):
            batch_source_input_data = source_input_data[i:i+batch_size]
            batch_target_input_data = target_input_data[i:i+batch_size]
            batch_source_input_lengths = [len(sentence) for sentence in batch_source_input_data]
            batch_target_input_lengths = [len(sentence) for sentence in batch_target_input_data]
            # Pad each batch to the length of its longest sentence
            batch_source_input_data = tf.keras.preprocessing.sequence.pad_sequences(
                batch_source_input_data,
                padding='post',
                maxlen=max(batch_source_input_lengths)
            )
            batch_target_input_data = tf.keras.preprocessing.sequence.pad_sequences(
                batch_target_input_data,
                padding='post',
                maxlen=max(batch_target_input_lengths)
            )
            # Decoder targets are the decoder inputs shifted left by one position
            batch_decoder_targets = np.zeros_like(batch_target_input_data)
            batch_decoder_targets[:, :-1] = batch_target_input_data[:, 1:]
            _, loss = sess.run(
                [model.train_op, model.loss],
                feed_dict={
                    model.encoder_inputs: batch_source_input_data,
                    model.decoder_inputs: batch_target_input_data,
                    model.decoder_targets: batch_decoder_targets,
                    model.decoder_lengths: batch_target_input_lengths
                }
            )
            total_loss += loss
        print('Epoch:', epoch+1, 'Loss:', total_loss)
    save_path = saver.save(sess, './model.ckpt')
# Test the model
tf.reset_default_graph()
model = Seq2Seq(source_vocab_size, target_vocab_size, hidden_size, num_layers)
saver = tf.train.Saver()
with tf.Session() as sess:
    saver.restore(sess, './model.ckpt')
    while True:
        input_sentence = input('Enter English Sentence: ')
        input_ids = [source_vocab.word_to_id(word) for word in input_sentence.split()]
        input_ids = [source_vocab.vocab['<GO>']] + input_ids + [source_vocab.vocab['<EOS>']]
        # Run beam search once, then read out the best hypothesis up to <EOS>
        prediction = sess.run(model.predictions,
                              feed_dict={model.encoder_inputs: [input_ids]})[0]
        output_sentence = []
        for idx in prediction:
            if idx == target_vocab.vocab['<EOS>']:
                break
            output_sentence.append(target_vocab.id_to_word(idx))
        print('Hindi Translation:', ' '.join(output_sentence))
```
The English data is stored in `english_data.txt` and the Hindi data in `hindi_data.txt`; put both files in the same directory as the script. Since the code pairs the two files line by line, they should be line-aligned parallel corpora (line *n* of one file is the translation of line *n* of the other).
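If you want a quick sanity check before training, here is a minimal sketch (assuming the file names above) that verifies the two corpora are aligned:

```python
# Sanity check: the two corpora must have the same number of lines,
# because the training code pairs them line by line.
with open('english_data.txt', encoding='utf-8') as f_en, \
     open('hindi_data.txt', encoding='utf-8') as f_hi:
    en_lines = f_en.read().split('\n')
    hi_lines = f_hi.read().split('\n')
assert len(en_lines) == len(hi_lines), 'The corpora are not line-aligned'
print('Sentence pairs:', len(en_lines))
```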
As written, the script translates English into Hindi; to get the reverse direction, train a second model with the two corpora swapped. You can also improve performance further by tuning the hyperparameters and the model architecture.
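For example, the `sequence_loss` above weights every time step equally, including padding. A minimal sketch of one common improvement (assuming the placeholder names used in the `Seq2Seq` class above) is to mask the padded positions:

```python
# Inside Seq2Seq.__init__, replace the all-ones weights with a padding mask
# built from decoder_lengths, so <PAD> positions do not contribute to the loss.
mask = tf.sequence_mask(self.decoder_lengths,
                        maxlen=tf.shape(self.decoder_targets)[1],
                        dtype=tf.float32)
self.loss = tf.contrib.seq2seq.sequence_loss(self.logits, self.decoder_targets, mask)
```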