Python code for DCNN-based rolling bearing fault diagnosis (loss = source-domain classification loss + target-domain classification loss + MMD loss)
Date: 2023-08-01 14:09:32
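Sure. Below is a PyTorch example of DCNN-based rolling bearing fault diagnosis, where the total loss is the sum of the source-domain classification loss, the target-domain classification loss, and an MMD loss: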
```python
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
from sklearn.metrics import confusion_matrix
from PIL import Image  # required by BearingDataset.load_data
import numpy as np
import os


# Dataset class: expects data_path/<mode>/<class_name>/<image files>
class BearingDataset(Dataset):
    def __init__(self, data_path, mode):
        self.data_path = data_path
        self.mode = mode
        self.transform = transforms.Compose([
            # Assumption: resize to 48x48 so that three 2x2 poolings give the
            # 6x6 feature map that DCNN.fc1 (128*6*6 inputs) expects
            transforms.Resize((48, 48)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ])
        self.data, self.labels = self.load_data()

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, index):
        data = self.transform(self.data[index])
        label = self.labels[index]
        return data, label

    def load_data(self):
        if self.mode not in ("source", "target"):
            raise ValueError("Invalid mode: {}".format(self.mode))
        data_path = os.path.join(self.data_path, self.mode)
        data = []
        labels = []
        # Sort the class folders so that source and target map class names to the same indices
        for class_idx, class_name in enumerate(sorted(os.listdir(data_path))):
            class_path = os.path.join(data_path, class_name)
            for file_name in sorted(os.listdir(class_path)):
                file_path = os.path.join(class_path, file_name)
                with Image.open(file_path) as img:
                    data.append(img.convert("RGB"))  # keep a PIL image for the transforms
                labels.append(class_idx)
        return data, labels


# DCNN model
class DCNN(nn.Module):
    def __init__(self):
        super(DCNN, self).__init__()
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, stride=1, padding=1)
        self.bn1 = nn.BatchNorm2d(32)
        self.relu1 = nn.ReLU(inplace=True)
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)
        self.bn2 = nn.BatchNorm2d(64)
        self.relu2 = nn.ReLU(inplace=True)
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1)
        self.bn3 = nn.BatchNorm2d(128)
        self.relu3 = nn.ReLU(inplace=True)
        self.pool3 = nn.MaxPool2d(kernel_size=2, stride=2)
        # 128*6*6 corresponds to a 6x6 feature map after the three poolings
        self.fc1 = nn.Linear(128*6*6, 256)
        self.bn4 = nn.BatchNorm1d(256)
        self.relu4 = nn.ReLU(inplace=True)
        self.dropout1 = nn.Dropout(p=0.5)
        self.fc2 = nn.Linear(256, 10)

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu1(x)
        x = self.pool1(x)
        x = self.conv2(x)
        x = self.bn2(x)
        x = self.relu2(x)
        x = self.pool2(x)
        x = self.conv3(x)
        x = self.bn3(x)
        x = self.relu3(x)
        x = self.pool3(x)
        x = x.view(x.size(0), -1)  # flatten to (batch, 128*6*6)
        x = self.fc1(x)
        x = self.bn4(x)
        x = self.relu4(x)
        x = self.dropout1(x)
        x = self.fc2(x)
        return x


# Loss function: source classification + target classification + weighted MMD
def loss_function(source_outputs, target_outputs, source_labels, target_labels, lambd):
    criterion = nn.CrossEntropyLoss()
    source_loss = criterion(source_outputs, source_labels)
    target_loss = criterion(target_outputs, target_labels)  # requires labeled target samples
    mmd_loss = MMD(source_outputs, target_outputs)
    total_loss = source_loss + target_loss + lambd * mmd_loss
    return total_loss


# MMD-style discrepancy loss
def MMD(source_outputs, target_outputs):
    # Note: this matches the batch means (a linear-kernel MMD term) and the
    # uncentered second moments; it is not a Gaussian-kernel MMD estimate.
    source_mean = torch.mean(source_outputs, dim=0)
    target_mean = torch.mean(target_outputs, dim=0)
    source_cov = torch.mm(torch.transpose(source_outputs, 0, 1), source_outputs) / source_outputs.size(0)
    target_cov = torch.mm(torch.transpose(target_outputs, 0, 1), target_outputs) / target_outputs.size(0)
    mmd_loss = torch.norm(source_mean - target_mean, p=2) ** 2 + torch.norm(source_cov - target_cov, p="fro") ** 2
    return mmd_loss
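

# Training function (one epoch)
def train(model, optimizer, source_dataloader, target_dataloader, lambd):
    model.train()
    device = next(model.parameters()).device  # run on whatever device the model lives on
    target_iter = iter(target_dataloader)
    for batch_idx, (source_data, source_labels) in enumerate(source_dataloader):
        # Cycle through the target loader instead of rebuilding an iterator every step
        try:
            target_data, target_labels = next(target_iter)
        except StopIteration:
            target_iter = iter(target_dataloader)
            target_data, target_labels = next(target_iter)
        source_data, source_labels = source_data.to(device), source_labels.to(device)
        # Target labels are needed for the target-domain classification loss
        target_data, target_labels = target_data.to(device), target_labels.to(device)
        optimizer.zero_grad()
        source_outputs = model(source_data)
        target_outputs = model(target_data)
        batch_loss = loss_function(source_outputs, target_outputs, source_labels, target_labels, lambd)
        batch_loss.backward()
        optimizer.step()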


# Test function: returns the average loss and the accuracy in percent
def test(model, dataloader):
    model.eval()
    device = next(model.parameters()).device  # run on whatever device the model lives on
    criterion = nn.CrossEntropyLoss()
    test_loss = 0
    test_correct = 0
    test_total = 0
    with torch.no_grad():
        for data, labels in dataloader:
            data, labels = data.to(device), labels.to(device)
            outputs = model(data)
            loss = criterion(outputs, labels)
            test_loss += loss.item() * labels.size(0)  # weight by batch size
            _, predicted = torch.max(outputs, 1)
            test_total += labels.size(0)
            test_correct += predicted.eq(labels).sum().item()
    test_loss /= test_total
    test_acc = 100. * test_correct / test_total
    return test_loss, test_acc


# Main entry point
if __name__ == "__main__":
    data_path = "bearing_data"
    batch_size = 64
    num_epochs = 100
    lr = 0.001
    lambd = 0.1
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    source_dataset = BearingDataset(data_path, mode="source")
    target_dataset = BearingDataset(data_path, mode="target")
    source_dataloader = DataLoader(source_dataset, batch_size=batch_size, shuffle=True)
    target_dataloader = DataLoader(target_dataset, batch_size=batch_size, shuffle=True)
    model = DCNN().to(device)
    optimizer = optim.Adam(model.parameters(), lr=lr)
    for epoch in range(num_epochs):
        train(model, optimizer, source_dataloader, target_dataloader, lambd)
        # Note: the metrics below are computed on the same loaders used for training
        source_test_loss, source_test_acc = test(model, source_dataloader)
        target_test_loss, target_test_acc = test(model, target_dataloader)
        print(
            "Epoch {}: Source Test Loss: {:.4f}, Source Test Acc: {:.2f}%, "
            "Target Test Loss: {:.4f}, Target Test Acc: {:.2f}%".format(
                epoch + 1, source_test_loss, source_test_acc, target_test_loss, target_test_acc
            )
        )
```
The example above covers data preprocessing, the DCNN model, the loss function, the training function, the test function, and the main routine.
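The `MMD` function in the example compares batch means and second moments. As a point of comparison, here is a minimal sketch of a Gaussian-kernel MMD estimate, which is a common way to compute an MMD loss. The names `pairwise_sq_dists`, `gaussian_kernel_sum`, and `mmd_rbf`, as well as the default `bandwidths`, are assumptions made for illustration and are not part of the original answer; suitable bandwidths depend on the scale of the features.

```python
import torch


def pairwise_sq_dists(x, y):
    """Squared Euclidean distances between rows of x (n, d) and y (m, d)."""
    x_sq = (x ** 2).sum(dim=1, keepdim=True)      # shape (n, 1)
    y_sq = (y ** 2).sum(dim=1, keepdim=True).t()  # shape (1, m)
    # Clamp guards against tiny negative values caused by rounding
    return (x_sq - 2.0 * (x @ y.t()) + y_sq).clamp(min=0.0)


def gaussian_kernel_sum(x, y, bandwidths):
    """Sum of RBF kernels exp(-||a - b||^2 / (2 * s^2)) over the bandwidths s."""
    sq_dists = pairwise_sq_dists(x, y)
    return sum(torch.exp(-sq_dists / (2.0 * s ** 2)) for s in bandwidths)


def mmd_rbf(source_features, target_features, bandwidths=(1.0, 2.0, 4.0, 8.0)):
    """Biased estimate of the squared MMD between two batches of features."""
    k_ss = gaussian_kernel_sum(source_features, source_features, bandwidths).mean()
    k_tt = gaussian_kernel_sum(target_features, target_features, bandwidths).mean()
    k_st = gaussian_kernel_sum(source_features, target_features, bandwidths).mean()
    return k_ss + k_tt - 2.0 * k_st


if __name__ == "__main__":
    torch.manual_seed(0)
    a = torch.randn(32, 10)
    b = torch.randn(32, 10) + 3.0  # shifted distribution
    print("same batch:", mmd_rbf(a, a).item())
    print("shifted batch:", mmd_rbf(a, b).item())
```

To try it in the training loop, one option is to call `mmd_rbf(source_outputs, target_outputs)` in place of `MMD(source_outputs, target_outputs)` inside `loss_function`. The two batches may have different sizes, since each kernel matrix is averaged separately.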