我有1000个信号数据,来自十个类别(每个类别100个),每个数据有512个特征点,存储为形状为(1000, 512)的tensor张量。现在我想将其输入一个深度LSTM网络来训练分类模型,用于对这些类别进行分类,请使用PyTorch实现。
时间: 2024-05-15 19:16:09 浏览: 173
首先,需要导入必要的库:
```python
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
```
接下来,定义数据集类,继承自`Dataset`类,用于加载信号数据:
```python
class SignalDataset(Dataset):
    """Map-style dataset that pairs each signal with its class label.

    Args:
        data: Indexable, sized collection of samples (for example a tensor
            whose first dimension enumerates the samples).
        targets: Indexable, sized collection of labels, one per sample.

    Raises:
        ValueError: If ``data`` and ``targets`` differ in length.
    """

    def __init__(self, data, targets):
        # Fail fast: a length mismatch would otherwise only surface later as
        # an IndexError (or as silently unused labels) while iterating.
        if len(data) != len(targets):
            raise ValueError(
                'data and targets must have the same length, got {} and {}'.format(
                    len(data), len(targets)
                )
            )
        self.data = data
        self.targets = targets

    def __len__(self):
        """Return the number of samples."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return the ``(sample, target)`` pair stored at position ``idx``."""
        return self.data[idx], self.targets[idx]
```
然后,定义LSTM模型类,继承自`nn.Module`类,用于分类:
```python
class LSTMClassifier(nn.Module):
    """Stacked LSTM followed by a linear layer that emits class logits.

    Args:
        input_size: Number of features the LSTM consumes per time step.
        hidden_size: Size of the hidden state of every LSTM layer.
        num_layers: Number of stacked LSTM layers.
        num_classes: Number of classes, i.e. the size of the logit vector.
    """

    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super(LSTMClassifier, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        """Compute class logits for a batch of signals.

        Args:
            x: Either a ``(batch, seq_len, input_size)`` tensor, or a flat
                ``(batch, num_points)`` tensor whose ``num_points`` is a
                multiple of ``input_size``. A flat batch is split into
                ``num_points // input_size`` consecutive time steps.

        Returns:
            Tensor of shape ``(batch, num_classes)`` holding unnormalised
            class scores.

        Raises:
            ValueError: If a flat batch cannot be split evenly into steps of
                ``input_size`` features.
        """
        if x.dim() == 2:
            # nn.LSTM would interpret a 2-D tensor as ONE unbatched sequence,
            # so a flat (batch, num_points) batch must be made 3-D first.
            if x.size(1) % self.input_size != 0:
                raise ValueError(
                    'cannot split {} points into time steps of {} features'.format(
                        x.size(1), self.input_size
                    )
                )
            x = x.reshape(x.size(0), -1, self.input_size)
        # Omitting (h0, c0) makes nn.LSTM start from zero states created with
        # the right device and dtype.
        out, _ = self.lstm(x)
        # Classify from the top layer's output at the last time step.
        return self.fc(out[:, -1, :])
```
接下来,定义训练函数:
```python
def train(model, train_loader, criterion, optimizer, device):
    """Train ``model`` for one pass over ``train_loader``.

    Args:
        model: Network to optimise; it is switched to training mode.
        train_loader: Iterable yielding ``(data, targets)`` batches.
        criterion: Loss callable invoked as ``criterion(outputs, targets)``.
        optimizer: Optimizer that updates the parameters of ``model``.
        device: Device every batch is moved to before the forward pass.

    Returns:
        float: Loss averaged over all samples seen in this pass, or ``0.0``
        when the loader yields no batches. The weighting assumes
        ``criterion`` returns a per-batch mean (the default reduction of
        ``nn.CrossEntropyLoss``).
    """
    model.train()
    running_loss = 0.0
    num_samples = 0
    for batch_idx, (data, targets) in enumerate(train_loader):
        data = data.to(device)
        targets = targets.to(device)
        # Forward pass
        outputs = model(data)
        loss = criterion(outputs, targets)
        # Backward pass and parameter update
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        # Weight by batch size so a smaller final batch does not skew the
        # epoch average.
        batch_size = targets.size(0)
        running_loss += loss.item() * batch_size
        num_samples += batch_size
        if batch_idx % 100 == 0:
            print('Batch {} Loss: {:.4f}'.format(batch_idx, loss.item()))
    if num_samples == 0:
        return 0.0
    return running_loss / num_samples
```
然后,定义测试函数:
```python
def test(model, test_loader, criterion, device):
    """Evaluate ``model`` on ``test_loader`` and report loss and accuracy.

    Args:
        model: Network to evaluate; it is switched to evaluation mode.
        test_loader: Iterable yielding ``(data, targets)`` batches, where
            ``targets`` holds class indices.
        criterion: Loss callable invoked as ``criterion(outputs, targets)``.
        device: Device every batch is moved to before the forward pass.

    Returns:
        tuple[float, float]: ``(mean loss per sample, accuracy in [0, 1])``.
        Both are ``0.0`` when the loader yields no samples. The loss
        weighting assumes ``criterion`` returns a per-batch mean (the
        default reduction of ``nn.CrossEntropyLoss``).
    """
    model.eval()
    total_loss = 0.0
    correct = 0
    num_samples = 0
    with torch.no_grad():
        for data, targets in test_loader:
            data = data.to(device)
            targets = targets.to(device)
            # Forward pass
            outputs = model(data)
            batch_size = targets.size(0)
            # The criterion yields a batch mean; scale it back to a batch sum
            # so that dividing by the sample count gives a true per-sample
            # average instead of a value shrunk by the batch size.
            total_loss += criterion(outputs, targets).item() * batch_size
            # The predicted class is the index of the largest logit.
            predicted = outputs.argmax(dim=1)
            correct += (predicted == targets).sum().item()
            num_samples += batch_size
    if num_samples == 0:
        # Nothing was evaluated; avoid dividing by zero.
        test_loss, accuracy = 0.0, 0.0
    else:
        test_loss = total_loss / num_samples
        accuracy = correct / num_samples
    print('Test Loss: {:.4f}, Accuracy: {:.2f}%'.format(test_loss, 100 * accuracy))
    return test_loss, accuracy
# The name ``test`` matches pytest's default collection pattern; opt out so
# that importing this function into a test module does not collect it.
test.__test__ = False
```
接下来,加载数据集并进行预处理:
```python
# Load the signals and their class labels.
data = torch.as_tensor(torch.load('data.pt'))
targets = torch.as_tensor(torch.load('targets.pt'))
# Shuffle before splitting. If the file stores the samples class by class
# (e.g. 100 per class for ten classes), a contiguous 800/200 cut would train
# on the first eight classes only and test on two classes never seen in
# training. A fixed seed keeps the split reproducible across runs.
split_generator = torch.Generator().manual_seed(0)
permutation = torch.randperm(len(data), generator=split_generator)
num_train = len(data) * 4 // 5  # 80% train / 20% test
train_indices = permutation[:num_train]
test_indices = permutation[num_train:]
train_data = data[train_indices]
train_targets = targets[train_indices]
test_data = data[test_indices]
test_targets = targets[test_indices]
# Create datasets and dataloaders
train_dataset = SignalDataset(train_data, train_targets)
test_dataset = SignalDataset(test_data, test_targets)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
```
最后,训练并测试模型:
```python
# Model dimensions
input_size, hidden_size = 512, 256
num_layers, num_classes = 2, 10
# Optimisation settings
learning_rate, num_epochs = 0.001, 10
# Build the classifier, then move its parameters to the selected device
model = LSTMClassifier(input_size, hidden_size, num_layers, num_classes)
model = model.to(device)
# Cross-entropy loss optimised with Adam
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
criterion = nn.CrossEntropyLoss()
# One training pass followed by one evaluation pass per epoch
for epoch in range(num_epochs):
    print('Epoch {}/{}'.format(epoch+1, num_epochs))
    train(model, train_loader, criterion, optimizer, device)
    test(model, test_loader, criterion, device)
```
完整代码如下:
```python
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
class SignalDataset(Dataset):
    """Map-style dataset that pairs each signal with its class label.

    Args:
        data: Indexable, sized collection of samples (for example a tensor
            whose first dimension enumerates the samples).
        targets: Indexable, sized collection of labels, one per sample.

    Raises:
        ValueError: If ``data`` and ``targets`` differ in length.
    """

    def __init__(self, data, targets):
        # Fail fast: a length mismatch would otherwise only surface later as
        # an IndexError (or as silently unused labels) while iterating.
        if len(data) != len(targets):
            raise ValueError(
                'data and targets must have the same length, got {} and {}'.format(
                    len(data), len(targets)
                )
            )
        self.data = data
        self.targets = targets

    def __len__(self):
        """Return the number of samples."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return the ``(sample, target)`` pair stored at position ``idx``."""
        return self.data[idx], self.targets[idx]
class LSTMClassifier(nn.Module):
    """Stacked LSTM followed by a linear layer that emits class logits.

    Args:
        input_size: Number of features the LSTM consumes per time step.
        hidden_size: Size of the hidden state of every LSTM layer.
        num_layers: Number of stacked LSTM layers.
        num_classes: Number of classes, i.e. the size of the logit vector.
    """

    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super(LSTMClassifier, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        """Compute class logits for a batch of signals.

        Args:
            x: Either a ``(batch, seq_len, input_size)`` tensor, or a flat
                ``(batch, num_points)`` tensor whose ``num_points`` is a
                multiple of ``input_size``. A flat batch is split into
                ``num_points // input_size`` consecutive time steps.

        Returns:
            Tensor of shape ``(batch, num_classes)`` holding unnormalised
            class scores.

        Raises:
            ValueError: If a flat batch cannot be split evenly into steps of
                ``input_size`` features.
        """
        if x.dim() == 2:
            # nn.LSTM would interpret a 2-D tensor as ONE unbatched sequence,
            # so a flat (batch, num_points) batch must be made 3-D first.
            if x.size(1) % self.input_size != 0:
                raise ValueError(
                    'cannot split {} points into time steps of {} features'.format(
                        x.size(1), self.input_size
                    )
                )
            x = x.reshape(x.size(0), -1, self.input_size)
        # Omitting (h0, c0) makes nn.LSTM start from zero states created with
        # the right device and dtype.
        out, _ = self.lstm(x)
        # Classify from the top layer's output at the last time step.
        return self.fc(out[:, -1, :])
def train(model, train_loader, criterion, optimizer, device):
    """Train ``model`` for one pass over ``train_loader``.

    Args:
        model: Network to optimise; it is switched to training mode.
        train_loader: Iterable yielding ``(data, targets)`` batches.
        criterion: Loss callable invoked as ``criterion(outputs, targets)``.
        optimizer: Optimizer that updates the parameters of ``model``.
        device: Device every batch is moved to before the forward pass.

    Returns:
        float: Loss averaged over all samples seen in this pass, or ``0.0``
        when the loader yields no batches. The weighting assumes
        ``criterion`` returns a per-batch mean (the default reduction of
        ``nn.CrossEntropyLoss``).
    """
    model.train()
    running_loss = 0.0
    num_samples = 0
    for batch_idx, (data, targets) in enumerate(train_loader):
        data = data.to(device)
        targets = targets.to(device)
        # Forward pass
        outputs = model(data)
        loss = criterion(outputs, targets)
        # Backward pass and parameter update
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        # Weight by batch size so a smaller final batch does not skew the
        # epoch average.
        batch_size = targets.size(0)
        running_loss += loss.item() * batch_size
        num_samples += batch_size
        if batch_idx % 100 == 0:
            print('Batch {} Loss: {:.4f}'.format(batch_idx, loss.item()))
    if num_samples == 0:
        return 0.0
    return running_loss / num_samples
def test(model, test_loader, criterion, device):
    """Evaluate ``model`` on ``test_loader`` and report loss and accuracy.

    Args:
        model: Network to evaluate; it is switched to evaluation mode.
        test_loader: Iterable yielding ``(data, targets)`` batches, where
            ``targets`` holds class indices.
        criterion: Loss callable invoked as ``criterion(outputs, targets)``.
        device: Device every batch is moved to before the forward pass.

    Returns:
        tuple[float, float]: ``(mean loss per sample, accuracy in [0, 1])``.
        Both are ``0.0`` when the loader yields no samples. The loss
        weighting assumes ``criterion`` returns a per-batch mean (the
        default reduction of ``nn.CrossEntropyLoss``).
    """
    model.eval()
    total_loss = 0.0
    correct = 0
    num_samples = 0
    with torch.no_grad():
        for data, targets in test_loader:
            data = data.to(device)
            targets = targets.to(device)
            # Forward pass
            outputs = model(data)
            batch_size = targets.size(0)
            # The criterion yields a batch mean; scale it back to a batch sum
            # so that dividing by the sample count gives a true per-sample
            # average instead of a value shrunk by the batch size.
            total_loss += criterion(outputs, targets).item() * batch_size
            # The predicted class is the index of the largest logit.
            predicted = outputs.argmax(dim=1)
            correct += (predicted == targets).sum().item()
            num_samples += batch_size
    if num_samples == 0:
        # Nothing was evaluated; avoid dividing by zero.
        test_loss, accuracy = 0.0, 0.0
    else:
        test_loss = total_loss / num_samples
        accuracy = correct / num_samples
    print('Test Loss: {:.4f}, Accuracy: {:.2f}%'.format(test_loss, 100 * accuracy))
    return test_loss, accuracy
# The name ``test`` matches pytest's default collection pattern; opt out so
# that importing this function into a test module does not collect it.
test.__test__ = False
# Load the signals and their class labels.
data = torch.as_tensor(torch.load('data.pt'))
targets = torch.as_tensor(torch.load('targets.pt'))
# Shuffle before splitting. If the file stores the samples class by class
# (e.g. 100 per class for ten classes), a contiguous 800/200 cut would train
# on the first eight classes only and test on two classes never seen in
# training. A fixed seed keeps the split reproducible across runs.
split_generator = torch.Generator().manual_seed(0)
permutation = torch.randperm(len(data), generator=split_generator)
num_train = len(data) * 4 // 5  # 80% train / 20% test
train_indices = permutation[:num_train]
test_indices = permutation[num_train:]
train_data = data[train_indices]
train_targets = targets[train_indices]
test_data = data[test_indices]
test_targets = targets[test_indices]
# Create datasets and dataloaders
train_dataset = SignalDataset(train_data, train_targets)
test_dataset = SignalDataset(test_data, test_targets)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Model dimensions
input_size, hidden_size = 512, 256
num_layers, num_classes = 2, 10
# Optimisation settings
learning_rate, num_epochs = 0.001, 10
# Build the classifier, then move its parameters to the selected device
model = LSTMClassifier(input_size, hidden_size, num_layers, num_classes)
model = model.to(device)
# Cross-entropy loss optimised with Adam
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
criterion = nn.CrossEntropyLoss()
# One training pass followed by one evaluation pass per epoch
for epoch in range(num_epochs):
    print('Epoch {}/{}'.format(epoch+1, num_epochs))
    train(model, train_loader, criterion, optimizer, device)
    test(model, test_loader, criterion, device)
```
阅读全文