Implementing a Transformer with NumPy
Sure. Below is a NumPy-based implementation of a Transformer model, covering both the encoder and the decoder:
```python
import numpy as np
class MultiHeadAttention:
    def __init__(self, d_model, num_heads):
        self.num_heads = num_heads
        self.d_model = d_model
        self.depth = d_model // self.num_heads
        # Projection matrices for queries, keys, values and the final output
        self.Wq = np.random.randn(d_model, d_model)
        self.Wk = np.random.randn(d_model, d_model)
        self.Wv = np.random.randn(d_model, d_model)
        self.Wo = np.random.randn(d_model, d_model)

    def softmax(self, x):
        exp_x = np.exp(x - np.max(x, axis=-1, keepdims=True))
        return exp_x / np.sum(exp_x, axis=-1, keepdims=True)

    def split_heads(self, x):
        # (batch, seq_len, d_model) -> (batch, num_heads, seq_len, depth)
        batch_size = x.shape[0]
        return np.transpose(x.reshape(batch_size, -1, self.num_heads, self.depth), (0, 2, 1, 3))

    def merge_heads(self, x):
        # (batch, num_heads, seq_len, depth) -> (batch, seq_len, d_model)
        batch_size = x.shape[0]
        return np.reshape(np.transpose(x, (0, 2, 1, 3)), (batch_size, -1, self.d_model))

    def forward(self, x, kv=None):
        # Self-attention by default; pass kv (e.g. the encoder output) for cross-attention
        if kv is None:
            kv = x
        Q = np.dot(x, self.Wq)
        K = np.dot(kv, self.Wk)
        V = np.dot(kv, self.Wv)
        Q_split = self.split_heads(Q)
        K_split = self.split_heads(K)
        V_split = self.split_heads(V)
        # Scaled dot-product attention
        attention_weights = self.softmax(np.matmul(Q_split, K_split.transpose((0, 1, 3, 2))) / np.sqrt(self.depth))
        context = np.matmul(attention_weights, V_split)
        merged_context = self.merge_heads(context)
        output = np.dot(merged_context, self.Wo)
        return output

class FeedForward:
    def __init__(self, d_model, d_ff):
        self.W1 = np.random.randn(d_model, d_ff)
        self.b1 = np.random.randn(d_ff)
        self.W2 = np.random.randn(d_ff, d_model)
        self.b2 = np.random.randn(d_model)

    def forward(self, x):
        hidden = np.maximum(0, np.dot(x, self.W1) + self.b1)
        output = np.dot(hidden, self.W2) + self.b2
        return output

class EncoderLayer:
    def __init__(self, d_model, num_heads, d_ff):
        self.multihead_attention = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = FeedForward(d_model, d_ff)
        self.layer_norm1 = LayerNormalization()
        self.layer_norm2 = LayerNormalization()

    def forward(self, x):
        # Self-attention sub-layer with residual connection and layer norm
        attention_output = self.multihead_attention.forward(x)
        attention_output = self.layer_norm1.forward(x + attention_output)
        # Feed-forward sub-layer with residual connection and layer norm
        feed_forward_output = self.feed_forward.forward(attention_output)
        output = self.layer_norm2.forward(attention_output + feed_forward_output)
        return output

class DecoderLayer:
    def __init__(self, d_model, num_heads, d_ff):
        self.masked_multihead_attention = MultiHeadAttention(d_model, num_heads)
        self.multihead_attention = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = FeedForward(d_model, d_ff)
        self.layer_norm1 = LayerNormalization()
        self.layer_norm2 = LayerNormalization()
        self.layer_norm3 = LayerNormalization()

    def forward(self, x, encoder_output):
        # Self-attention over the decoder input (the look-ahead mask is omitted here)
        masked_attention_output = self.masked_multihead_attention.forward(x)
        masked_attention_output = self.layer_norm1.forward(x + masked_attention_output)
        # Cross-attention: queries from the decoder, keys/values from the encoder output
        attention_output = self.multihead_attention.forward(masked_attention_output, encoder_output)
        attention_output = self.layer_norm2.forward(masked_attention_output + attention_output)
        # Feed-forward sub-layer with residual connection and layer norm
        feed_forward_output = self.feed_forward.forward(attention_output)
        output = self.layer_norm3.forward(attention_output + feed_forward_output)
        return output

class LayerNormalization:
    def __init__(self, epsilon=1e-6):
        # Scale and shift parameters, initialized to the identity transform
        self.gamma = np.ones(1)
        self.beta = np.zeros(1)
        self.epsilon = epsilon

    def forward(self, x):
        # Normalize over the last (feature) dimension
        mean = np.mean(x, axis=-1, keepdims=True)
        std = np.sqrt(np.var(x, axis=-1, keepdims=True) + self.epsilon)
        norm_x = (x - mean) / std
        output = self.gamma * norm_x + self.beta
        return output

class Encoder:
    def __init__(self, num_layers, d_model, num_heads, d_ff):
        self.layers = [EncoderLayer(d_model, num_heads, d_ff) for _ in range(num_layers)]

    def forward(self, x):
        for layer in self.layers:
            x = layer.forward(x)
        return x

class Decoder:
    def __init__(self, num_layers, d_model, num_heads, d_ff):
        self.layers = [DecoderLayer(d_model, num_heads, d_ff) for _ in range(num_layers)]

    def forward(self, x, encoder_output):
        for layer in self.layers:
            x = layer.forward(x, encoder_output)
        return x

class Linear:
    def __init__(self, d_in, d_out):
        # Final linear projection used as the output head
        self.W = np.random.randn(d_in, d_out)
        self.b = np.random.randn(d_out)

    def forward(self, x):
        return np.dot(x, self.W) + self.b

class Transformer:
    def __init__(self, num_layers, d_model, num_heads, d_ff):
        self.encoder = Encoder(num_layers, d_model, num_heads, d_ff)
        self.decoder = Decoder(num_layers, d_model, num_heads, d_ff)
        self.linear = Linear(d_model, 1)

    def forward(self, encoder_input, decoder_input):
        encoder_output = self.encoder.forward(encoder_input)
        decoder_output = self.decoder.forward(decoder_input, encoder_output)
        output = self.linear.forward(decoder_output)
        return output
```
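For a quick check that the pieces fit together, here is a minimal usage sketch. The hyperparameters, batch size, and sequence lengths below are arbitrary illustrative values, and the random arrays stand in for already-embedded token sequences:
```python
import numpy as np

# Illustrative hyperparameters (chosen only for this demo)
d_model, num_heads, d_ff, num_layers = 64, 8, 256, 2
model = Transformer(num_layers, d_model, num_heads, d_ff)

# Dummy "embedded" inputs of shape (batch, seq_len, d_model)
batch_size, src_len, tgt_len = 4, 10, 7
encoder_input = np.random.randn(batch_size, src_len, d_model)
decoder_input = np.random.randn(batch_size, tgt_len, d_model)

output = model.forward(encoder_input, decoder_input)
print(output.shape)  # (4, 7, 1), since the output head is Linear(d_model, 1)
```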
Note that this implementation omits some details, such as positional encoding and masking, which can be added as needed. It also uses randomly initialized weights and biases throughout; in a real application these would have to be trained for the specific task.
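As a rough sketch of the omitted pieces, sinusoidal positional encodings and a causal (look-ahead) mask could be built as below. Note that the mask would still have to be wired into MultiHeadAttention.forward (for example by adding a large negative value to the masked attention scores before the softmax), which the code above does not do:
```python
import numpy as np

def positional_encoding(seq_len, d_model):
    # Sinusoidal positional encoding: sin on even dimensions, cos on odd ones
    positions = np.arange(seq_len)[:, np.newaxis]        # (seq_len, 1)
    dims = np.arange(d_model)[np.newaxis, :]             # (1, d_model)
    angle_rates = 1.0 / np.power(10000, (2 * (dims // 2)) / d_model)
    angles = positions * angle_rates                      # (seq_len, d_model)
    pe = np.zeros((seq_len, d_model))
    pe[:, 0::2] = np.sin(angles[:, 0::2])
    pe[:, 1::2] = np.cos(angles[:, 1::2])
    return pe  # added to the embedded inputs: x + pe

def causal_mask(seq_len):
    # Lower-triangular matrix: position i may only attend to positions <= i
    return np.tril(np.ones((seq_len, seq_len)))
```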