以CWRU数据集编写基于python的mmd损失训练DCNN代码
时间: 2024-05-14 19:15:33 浏览: 176
机器学习CNN的代码
好的,下面是基于Python的MMD损失训练DCNN代码,使用CWRU数据集:
```python
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import scipy.io as sio
import argparse
# ---- Command-line configuration ----
# All hyper-parameters are exposed as CLI flags; defaults reproduce the
# settings shown in the usage example below.
parser = argparse.ArgumentParser(description='MMD损失训练DCNN')
_arg_specs = [
    ('--batch_size', int, 64, '输入批次大小'),
    ('--lr', float, 1e-3, '学习率'),
    ('--num_epochs', int, 100, '训练轮次'),
    ('--num_classes', int, 10, '分类数'),
    ('--num_channels', int, 2, '通道数'),
    ('--num_filters', int, 32, '卷积核数'),
    ('--num_blocks', int, 3, '残差块数'),
    ('--num_features', int, 128, '特征数'),
    ('--mmd_sigma', float, 1, 'MMD损失函数中的sigma'),
]
for _flag, _type, _default, _help in _arg_specs:
    parser.add_argument(_flag, type=_type, default=_default, help=_help)
args = parser.parse_args()
# ---- Data loading ----
# Expects CWRU.mat to contain pre-split train/test arrays.  Features are
# converted to float32 tensors and labels to int64 in a single step.
# NOTE(review): scipy.io.loadmat typically returns labels as (N, 1)
# arrays rather than (N,) -- TODO confirm the .mat layout.
data = sio.loadmat('CWRU.mat')
X_train = torch.from_numpy(data['X_train']).float()
y_train = torch.from_numpy(data['y_train']).long()
X_test = torch.from_numpy(data['X_test']).float()
y_test = torch.from_numpy(data['y_test']).long()
# 创建模型
# 创建模型
class ResNet(nn.Module):
    """1-D convolutional classifier for vibration signals.

    NOTE(review): despite the name, there are no residual/skip
    connections -- every stage is a plain Conv1d/BatchNorm/ReLU stack.
    """

    def __init__(self, num_channels, num_filters, num_blocks, num_classes, num_features):
        super(ResNet, self).__init__()
        # Stem: stride-2 conv halves the sequence length.
        self.conv1 = nn.Conv1d(num_channels, num_filters, kernel_size=5, stride=2, padding=2)
        self.bn1 = nn.BatchNorm1d(num_filters)
        self.relu = nn.ReLU(inplace=True)
        # Four stages; each begins with a stride-2 downsampling conv and
        # (except the first) doubles the channel width.
        widths = (num_filters, num_filters * 2, num_filters * 4, num_filters * 8)
        self.layer1 = self._conv_stage(num_filters, widths[0], num_blocks - 1)
        self.layer2 = self._conv_stage(widths[0], widths[1], num_blocks)
        self.layer3 = self._conv_stage(widths[1], widths[2], num_blocks)
        self.layer4 = self._conv_stage(widths[2], widths[3], num_blocks)
        # Global average pool, then a two-layer classification head.
        self.avgpool = nn.AdaptiveAvgPool1d(1)
        self.fc1 = nn.Linear(widths[3], num_features)
        self.fc2 = nn.Linear(num_features, num_classes)

    def _conv_stage(self, in_channels, out_channels, num_blocks):
        """One stride-2 downsampling conv followed by num_blocks stride-1 convs."""
        stage = [
            nn.Conv1d(in_channels, out_channels, kernel_size=3, stride=2, padding=1),
            nn.BatchNorm1d(out_channels),
            nn.ReLU(inplace=True),
        ]
        for _ in range(num_blocks):
            stage += [
                nn.Conv1d(out_channels, out_channels, kernel_size=3, stride=1, padding=1),
                nn.BatchNorm1d(out_channels),
                nn.ReLU(inplace=True),
            ]
        return nn.Sequential(*stage)

    def forward(self, x):
        """Map (batch, num_channels, length) to (batch, num_classes) logits."""
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.layer4(self.layer3(self.layer2(self.layer1(out))))
        out = self.avgpool(out).flatten(1)
        return self.fc2(self.relu(self.fc1(out)))
# ---- Model, loss function and optimizer ----
model = ResNet(
    args.num_channels,
    args.num_filters,
    args.num_blocks,
    args.num_classes,
    args.num_features,
)
# Cross-entropy over the class logits; Adam with the CLI learning rate.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=args.lr)
# ---- Training loop ----
# Fixes relative to the original:
#  * The batch was round-tripped through NumPy and then moved with
#    .cuda() while the model stayed on the CPU -- a device-mismatch
#    crash (and a crash on machines without a GPU).  The kernel term is
#    now computed with torch ops on the same device as the batch.
#  * np.max-based normalisation could divide by zero; clamped instead.
#  * A class absent from a batch produced NaN via mean() of an empty
#    tensor; such classes are now skipped.
# NOTE(review): this "MMD" term depends only on the raw inputs, not on
# any model parameter, so it cannot affect the gradients; a real
# domain-adaptation MMD should be computed on learned features of
# source/target batches -- TODO confirm the intended formulation.
# NOTE(review): `labels == cls` assumes labels is a 1-D LongTensor;
# loadmat often yields (N, 1) arrays -- verify/squeeze upstream.
for epoch in range(args.num_epochs):
    running_loss = 0.0
    for start in range(0, len(X_train), args.batch_size):
        # Current mini-batch of signals and labels.
        inputs = X_train[start:start + args.batch_size]
        labels = y_train[start:start + args.batch_size]

        optimizer.zero_grad()

        # Forward pass and classification loss.
        outputs = model(inputs)
        loss = criterion(outputs, labels)

        # Per-sample max-normalised, flattened view of the batch used by
        # the Gaussian-kernel term below.  clamp avoids division by zero
        # for an all-zero (or non-positive) sample.
        flat = inputs.reshape(inputs.size(0), -1)
        flat = flat / flat.max(dim=1, keepdim=True).values.clamp(min=1e-12)

        mmd_loss = inputs.new_zeros(())
        for cls in range(args.num_classes):
            cls_rows = flat[labels == cls]
            if cls_rows.size(0) == 0:
                continue  # class not present in this batch: skip, avoid NaN
            centered = cls_rows - cls_rows.mean(dim=0, keepdim=True)
            mmd_loss = mmd_loss + torch.mean(
                torch.exp(-centered.pow(2).sum(dim=1) / (2 * args.mmd_sigma ** 2))
            )
        loss = loss + mmd_loss

        # Backward pass and parameter update.
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    # Report the accumulated loss for the epoch.
    print('Epoch [%d/%d], Loss: %.4f' % (epoch + 1, args.num_epochs, running_loss))
# ---- Evaluation ----
# Fix: the original never called model.eval(), so BatchNorm kept using
# per-batch statistics at test time, making accuracy depend on the test
# batch size.  Switch to running statistics before evaluating.
model.eval()
with torch.no_grad():
    correct = 0
    total = 0
    for start in range(0, len(X_test), args.batch_size):
        inputs = X_test[start:start + args.batch_size]
        labels = y_test[start:start + args.batch_size]
        outputs = model(inputs)
        # Predicted class = argmax over the logits.
        predicted = outputs.argmax(dim=1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
print('测试集准确率: %d %%' % (100 * correct / total))
```
将上面的代码保存为 `train.py`,然后在命令行中运行以下命令:
```bash
python train.py --batch_size 64 --lr 1e-3 --num_epochs 100 --num_classes 10 --num_channels 2 --num_filters 32 --num_blocks 3 --num_features 128 --mmd_sigma 1
```
其中,`--batch_size` 表示每个批次的大小,`--lr` 表示学习率,`--num_epochs` 表示训练轮数,`--num_classes` 表示分类数,`--num_channels` 表示通道数,`--num_filters` 表示卷积核数,`--num_blocks` 表示残差块数,`--num_features` 表示特征数,`--mmd_sigma` 表示MMD损失函数中的sigma。
阅读全文