自动编码器异常检测代码
时间: 2023-11-12 21:04:45 浏览: 104
以下是一个简单的自动编码器异常检测的代码示例,使用了PyTorch框架:
```python
import torch
import torch.nn as nn
import numpy as np
class Autoencoder(nn.Module):
    """Single-layer linear autoencoder.

    The encoder is one ``nn.Linear`` layer mapping ``input_dim`` features to
    ``hidden_dim`` features, and the decoder is one ``nn.Linear`` layer mapping
    them back to ``input_dim``. No activation function is applied in between.

    Args:
        input_dim: Number of features in each input sample.
        hidden_dim: Number of features in the encoded representation.
    """

    def __init__(self, input_dim: int, hidden_dim: int) -> None:
        super().__init__()
        self.encoder = nn.Linear(input_dim, hidden_dim)
        self.decoder = nn.Linear(hidden_dim, input_dim)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Encode ``x`` and return its reconstruction.

        Args:
            x: Tensor whose last dimension has ``input_dim`` features.

        Returns:
            The decoder output, with the same last dimension as ``x``.
        """
        return self.decoder(self.encoder(x))
def train(model, x_train, x_valid, num_epochs, batch_size):
    """Train ``model`` to reconstruct its own input and print per-epoch losses.

    Every sample is used as both the input and the target. The model is
    optimised with Adam (learning rate 1e-3) against the mean squared error.
    After each epoch the per-sample average training and validation losses
    are printed.

    Args:
        model: ``nn.Module`` mapping a batch of samples to a reconstruction
            of the same shape. Its parameters are updated in place.
        x_train: Training samples; anything ``torch.as_tensor`` accepts,
            with the first axis indexing samples.
        x_valid: Validation samples, in the same layout as ``x_train``.
        num_epochs: Number of full passes over ``x_train``.
        batch_size: Number of samples per mini-batch.

    Returns:
        None.
    """
    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

    # The target equals the input, so each dataset holds a single tensor.
    train_tensor = torch.as_tensor(x_train, dtype=torch.float32)
    valid_tensor = torch.as_tensor(x_valid, dtype=torch.float32)
    train_loader = torch.utils.data.DataLoader(
        torch.utils.data.TensorDataset(train_tensor),
        batch_size=batch_size,
        shuffle=True,
    )
    valid_loader = torch.utils.data.DataLoader(
        torch.utils.data.TensorDataset(valid_tensor),
        batch_size=batch_size,
        shuffle=False,
    )

    for epoch in range(num_epochs):
        train_loss = 0.0
        valid_loss = 0.0

        model.train()
        for (inputs,) in train_loader:
            optimizer.zero_grad()
            loss = criterion(model(inputs), inputs)
            loss.backward()
            optimizer.step()
            # The criterion returns a mean over the batch; weight it by the
            # batch size so a short final batch does not skew the epoch mean.
            train_loss += loss.item() * inputs.size(0)

        model.eval()
        # Validation only measures the loss, so skip building autograd graphs.
        with torch.no_grad():
            for (inputs,) in valid_loader:
                loss = criterion(model(inputs), inputs)
                valid_loss += loss.item() * inputs.size(0)

        train_loss /= len(train_loader.dataset)
        valid_loss /= len(valid_loader.dataset)
        print('Epoch: {} \tTraining Loss: {:.6f} \tValidation Loss: {:.6f}'.format(epoch+1, train_loss, valid_loss))
def anomaly_score(model, x):
    """Return the per-sample reconstruction error of ``x`` under ``model``.

    The score of a sample is the mean squared difference between the sample
    and the model's reconstruction of it, averaged over axis 1.

    The model's train/eval mode is not changed here; callers whose model
    contains mode-dependent layers should call ``model.eval()`` first.

    Args:
        model: Callable mapping a float32 tensor of samples to a
            reconstruction tensor of the same shape.
        x: Array-like of samples with at least two dimensions; the first
            axis indexes samples.

    Returns:
        NumPy array of scores, one per entry along the first axis of ``x``.
    """
    samples = np.asarray(x)
    # Gradients are not needed for scoring, and ``.numpy()`` requires a
    # tensor that is not tracked by autograd.
    with torch.no_grad():
        recon = model(torch.as_tensor(samples, dtype=torch.float32))
    return np.mean(np.power(samples - recon.numpy(), 2), axis=1)
if __name__ == '__main__':
    # Data preparation. ndmin=2 keeps a two-dimensional array even when a
    # file has only one row or one column, so ``shape[1]`` below is always
    # defined.
    x_train = np.loadtxt('train_data.csv', delimiter=',', ndmin=2)
    x_valid = np.loadtxt('valid_data.csv', delimiter=',', ndmin=2)
    x_test = np.loadtxt('test_data.csv', delimiter=',', ndmin=2)

    # Model training.
    input_dim = x_train.shape[1]
    hidden_dim = 10
    model = Autoencoder(input_dim, hidden_dim)
    num_epochs = 100
    batch_size = 32
    train(model, x_train, x_valid, num_epochs, batch_size)

    # Anomaly detection: flag every test sample whose reconstruction error
    # exceeds the threshold.
    # NOTE(review): 0.05 is an example value; a suitable cut-off presumably
    # depends on the scale of the data and should be tuned.
    threshold = 0.05
    scores = anomaly_score(model, x_test)
    anomalies = np.where(scores > threshold)[0]
    print('Anomaly indices:', anomalies)
```
代码中使用了一个简单的单层自动编码器模型（编码器和解码器各为一个线性层，中间没有激活函数），并使用MSE损失和Adam优化器进行训练。训练完成后，对未标记的测试数据集中的每个样本计算重构误差（逐样本的MSE）作为异常分数，分数超过阈值（示例中为0.05）的样本被标识为异常值；该阈值需要根据实际数据进行调整。
阅读全文