Implementing an LSTM on the NSL-KDD dataset with PyTorch
1. Download the NSL-KDD dataset and unpack it; it contains four files: KDDTrain+.txt, KDDTest+.txt, KDDTrain+_20Percent.txt and KDDTest-21.txt.
2. Install PyTorch, NumPy, pandas and scikit-learn.
3. Load and preprocess the data: one-hot encode the categorical features (protocol_type, service, flag), standardize the numeric features, and map the attack labels to a binary normal/attack target.
4. Split the training data into a training set and a validation set.
5. Define the LSTM model, consisting of an LSTM layer, a fully connected layer and a dropout layer.
6. Define the loss function and the optimizer.
7. Train the model, recording the accuracy and loss at each epoch.
8. Evaluate the model on the validation set.
9. Measure the model's accuracy on the test set.
Here is a code example:
```python
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.model_selection import train_test_split

# Load the data sets. Each NSL-KDD record has 41 features, an attack label and a
# difficulty score; columns 1-3 (protocol_type, service, flag) are categorical strings,
# so the files cannot be parsed with np.loadtxt's default float conversion.
train_df = pd.read_csv('KDDTrain+.txt', header=None)
test_df = pd.read_csv('KDDTest+.txt', header=None)

def split_features_labels(df):
    # Column 41 holds the attack name; map 'normal' to 0 and every attack type to 1
    labels = (df.iloc[:, 41] != 'normal').astype(np.float32).values
    features = df.iloc[:, :41]  # drop the label and difficulty columns
    return features, labels

train_features, train_labels = split_features_labels(train_df)
test_features, test_labels = split_features_labels(test_df)

# One-hot encode the categorical columns and standardize the numeric ones.
# Fit the transformer on the training set only and reuse it for the test set,
# so both sets get the same feature dimensions and no test-set leakage occurs.
numeric_cols = [c for c in range(41) if c not in (1, 2, 3)]
ct = ColumnTransformer([
    ('onehot', OneHotEncoder(handle_unknown='ignore'), [1, 2, 3]),
    ('scale', StandardScaler(), numeric_cols),
])
train_data = ct.fit_transform(train_features)
test_data = ct.transform(test_features)
if hasattr(train_data, 'toarray'):  # the encoder may return a sparse matrix
    train_data = train_data.toarray()
    test_data = test_data.toarray()
n_features = train_data.shape[1]  # 122 for KDDTrain+: 84 one-hot + 38 numeric columns

# Split the training data into training and validation sets
train_x, valid_x, train_y, valid_y = train_test_split(
    train_data, train_labels, test_size=0.2, random_state=42)

# Convert to tensors; the LSTM expects (batch, seq_len, features),
# so each record is treated as a sequence of length 1
train_x = torch.tensor(train_x, dtype=torch.float32).reshape(-1, 1, n_features)
valid_x = torch.tensor(valid_x, dtype=torch.float32).reshape(-1, 1, n_features)
test_x = torch.tensor(test_data, dtype=torch.float32).reshape(-1, 1, n_features)
train_y = torch.tensor(train_y, dtype=torch.float32)
valid_y = torch.tensor(valid_y, dtype=torch.float32)
test_y = torch.tensor(test_labels, dtype=torch.float32)

# Define the LSTM model: an LSTM layer, a dropout layer and a fully connected output layer
class LSTM(nn.Module):
    def __init__(self, input_size):
        super(LSTM, self).__init__()
        self.lstm = nn.LSTM(input_size=input_size, hidden_size=64,
                            num_layers=1, batch_first=True)
        self.fc = nn.Linear(64, 1)
        self.dropout = nn.Dropout(p=0.2)

    def forward(self, x):
        x, _ = self.lstm(x)
        x = x[:, -1, :]  # output of the last (and only) time step
        x = self.dropout(x)
        x = self.fc(x)
        return torch.sigmoid(x)

model = LSTM(n_features)

# Define the loss function and the optimizer
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

def accuracy(output, target):
    # Fraction of thresholded predictions that match the labels
    return ((output.squeeze() > 0.5).float() == target).float().mean().item()

# Train the model (full-batch gradient descent, one update per epoch)
epochs = 50
train_loss_list, valid_loss_list, train_acc_list, valid_acc_list = [], [], [], []
for epoch in range(epochs):
    # Training step
    model.train()
    optimizer.zero_grad()
    output = model(train_x)
    loss = criterion(output.squeeze(), train_y)
    loss.backward()
    optimizer.step()
    train_loss = loss.item()
    train_acc = accuracy(output, train_y)
    # Validation step
    model.eval()
    with torch.no_grad():
        output = model(valid_x)
        valid_loss = criterion(output.squeeze(), valid_y).item()
        valid_acc = accuracy(output, valid_y)
    train_loss_list.append(train_loss)
    valid_loss_list.append(valid_loss)
    train_acc_list.append(train_acc)
    valid_acc_list.append(valid_acc)
    print(f"Epoch {epoch + 1}/{epochs}, Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}, "
          f"Valid Loss: {valid_loss:.4f}, Valid Acc: {valid_acc:.4f}")

# Evaluate the model on the test set
model.eval()
with torch.no_grad():
    output = model(test_x)
    test_acc = accuracy(output, test_y)
print(f"Test Acc: {test_acc:.4f}")
```
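The loop above updates the weights once per epoch on the entire training tensor, which keeps the example short but is memory-hungry on the full NSL-KDD training split. Below is a minimal mini-batch sketch, assuming the `train_x`/`train_y` tensors, `model`, `criterion` and `optimizer` defined above; the batch size of 256 is an arbitrary choice, not something fixed by the dataset.
```python
from torch.utils.data import TensorDataset, DataLoader

# Yield shuffled mini-batches from the prepared training tensors
train_loader = DataLoader(TensorDataset(train_x, train_y),
                          batch_size=256, shuffle=True)

for epoch in range(epochs):
    model.train()
    epoch_loss = 0.0
    for batch_x, batch_y in train_loader:
        optimizer.zero_grad()
        output = model(batch_x)  # (batch, 1) sigmoid outputs
        loss = criterion(output.squeeze(1), batch_y)
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item() * batch_x.size(0)
    print(f"Epoch {epoch + 1}/{epochs}, "
          f"Train Loss: {epoch_loss / len(train_loader.dataset):.4f}")
```
The validation and test evaluation code stays the same as above; only the training pass changes.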