def train(model: torch.nn.Module, optimizer: torch.optim.Optimizer, data_loader: DataLoader, device: str): """训练函数""" model.train() loss_func = torch.nn.BCELoss(reduction="none") total_loss = 0 total_num = 0 for texts, labels, mask in tqdm(data_loader, desc="Train"): texts = texts.to(device) labels = labels.float().to(device) mask = mask.float().to(device) logits = model(texts, mask) loss = loss_func(logits, labels) loss = (loss * mask).sum() / mask.sum() optimizer.zero_grad() loss.backward() optimizer.step() total_loss += loss.item() * mask.sum().item() total_num += mask.sum().item() return total_loss / total_num
时间: 2023-07-14 10:12:17 浏览: 36
这是一个 PyTorch 的训练函数,用于在给定数据集上训练一个模型。该函数接受四个参数:
- `model`:待训练的模型。
- `optimizer`:优化器,用于更新模型参数。
- `data_loader`:数据加载器,用于将数据分批次加载到模型中。
- `device`:设备,用于指定训练模型所在的设备(例如 CPU 或 GPU)。
在函数中,我们首先将模型设置为训练模式,然后定义了一个二分类交叉熵损失函数。接下来,我们循环遍历数据加载器中的每个批次,将输入数据和标签移动到指定设备上,并将数据输入到模型中得到模型输出。然后计算损失,执行反向传播,更新模型参数。最后,我们统计总损失并返回平均损失。
相关问题
def train_and_evaluate(model: torch.nn.Module, optimizer: torch.optim.Optimizer, train_loader: DataLoader, valid_loader: DataLoader, num_epochs: int, device: str): """训练和评估函数""" best_valid_loss = float("inf") for epoch in range(num_epochs): train_loss = train(model, optimizer, train_loader, device) valid_loss = evaluate(model, valid_loader, device) print(f"Epoch [{epoch + 1}/{num_epochs}], Train Loss: {train_loss:.4f}, Valid Loss: {valid_loss:.4f}") if valid_loss < best_valid_loss: best_valid_loss = valid_loss torch.save(model.state_dict(), "best_model.pt") model.load_state_dict(torch.load("best_model.pt")) test_loss = evaluate(model, test_loader, device) print(f"Test Loss: {test_loss:.4f}")
这段代码是一个训练和评估模型的函数。它包含了以下参数:
- `model`: 要训练的PyTorch模型。
- `optimizer`: 优化器,用于更新模型参数。
- `train_loader`: 训练数据的DataLoader。
- `valid_loader`: 验证数据的DataLoader。
- `num_epochs`: 训练的总轮数。
- `device`: 训练使用的设备(GPU或CPU)。
在训练过程中,函数会迭代每个epoch,并在每个epoch完成后打印训练和验证损失。如果当前的验证损失比之前最佳的验证损失更小,就会保存当前模型的状态。最后,它会使用保存的最佳模型状态来计算测试集上的损失。
这段代码中加一个test loss功能 class LSTM(nn.Module): def __init__(self, input_size, hidden_size, num_layers, output_size, batch_size, device): super().__init__() self.device = device self.input_size = input_size self.hidden_size = hidden_size self.num_layers = num_layers self.output_size = output_size self.num_directions = 1 # 单向LSTM self.batch_size = batch_size self.lstm = nn.LSTM(self.input_size, self.hidden_size, self.num_layers, batch_first=True) self.linear = nn.Linear(65536, self.output_size) def forward(self, input_seq): h_0 = torch.randn(self.num_directions * self.num_layers, self.batch_size, self.hidden_size).to(self.device) c_0 = torch.randn(self.num_directions * self.num_layers, self.batch_size, self.hidden_size).to(self.device) output, _ = self.lstm(input_seq, (h_0, c_0)) pred = self.linear(output.contiguous().view(self.batch_size, -1)) return pred if __name__ == '__main__': # 加载已保存的模型参数 saved_model_path = '/content/drive/MyDrive/危急值/model/dangerous.pth' device = 'cuda:0' lstm_model = LSTM(input_size=1, hidden_size=64, num_layers=1, output_size=3, batch_size=256, device='cuda:0').to(device) state_dict = torch.load(saved_model_path) lstm_model.load_state_dict(state_dict) dataset = ECGDataset(X_train_df.to_numpy()) dataloader = DataLoader(dataset, batch_size=256, shuffle=True, num_workers=0, drop_last=True) loss_fn = nn.CrossEntropyLoss() optimizer = optim.SGD(lstm_model.parameters(), lr=1e-4) for epoch in range(200000): print(f'epoch:{epoch}') lstm_model.train() epoch_bar = tqdm(dataloader) for x, y in epoch_bar: optimizer.zero_grad() x_out = lstm_model(x.to(device).type(torch.cuda.FloatTensor)) loss = loss_fn(x_out, y.long().to(device)) loss.backward() epoch_bar.set_description(f'loss:{loss.item():.4f}') optimizer.step() if epoch % 100 == 0 or epoch == epoch - 1: torch.save(lstm_model.state_dict(), "/content/drive/MyDrive/危急值/model/dangerous.pth") print("权重成功保存一次")
以下是加入 Test Loss 功能后的代码:
```python
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
class LSTM(nn.Module):
def __init__(self, input_size, hidden_size, num_layers, output_size, batch_size, device):
super().__init__()
self.device = device
self.input_size = input_size
self.hidden_size = hidden_size
self.num_layers = num_layers
self.output_size = output_size
self.num_directions = 1 # 单向LSTM
self.batch_size = batch_size
self.lstm = nn.LSTM(self.input_size, self.hidden_size, self.num_layers, batch_first=True)
self.linear = nn.Linear(65536, self.output_size)
def forward(self, input_seq):
h_0 = torch.randn(self.num_directions * self.num_layers, self.batch_size, self.hidden_size).to(self.device)
c_0 = torch.randn(self.num_directions * self.num_layers, self.batch_size, self.hidden_size).to(self.device)
output, _ = self.lstm(input_seq, (h_0, c_0))
pred = self.linear(output.contiguous().view(self.batch_size, -1))
return pred
if __name__ == '__main__':
# 加载已保存的模型参数
saved_model_path = '/content/drive/MyDrive/危急值/model/dangerous.pth'
device = 'cuda:0'
lstm_model = LSTM(input_size=1, hidden_size=64, num_layers=1, output_size=3, batch_size=256, device=device).to(device)
state_dict = torch.load(saved_model_path)
lstm_model.load_state_dict(state_dict)
# 加载测试数据集
test_dataset = ECGDataset(X_test_df.to_numpy())
test_loader = DataLoader(test_dataset, batch_size=256, shuffle=False, num_workers=0, drop_last=False)
# 定义损失函数和优化器
loss_fn = nn.CrossEntropyLoss()
optimizer = optim.SGD(lstm_model.parameters(), lr=1e-4)
# 开始训练
for epoch in range(200000):
print(f'epoch:{epoch}')
lstm_model.train()
epoch_bar = tqdm(dataloader)
for x, y in epoch_bar:
optimizer.zero_grad()
x_out = lstm_model(x.to(device).type(torch.cuda.FloatTensor))
loss = loss_fn(x_out, y.long().to(device))
loss.backward()
epoch_bar.set_description(f'loss:{loss.item():.4f}')
optimizer.step()
# 计算测试集上的平均损失
test_loss = 0.0
lstm_model.eval()
with torch.no_grad():
for x, y in test_loader:
x_out = lstm_model(x.to(device).type(torch.cuda.FloatTensor))
loss = loss_fn(x_out, y.long().to(device))
test_loss += loss.item() * x.size(0)
test_loss /= len(test_dataset)
print(f'Test Loss: {test_loss:.4f}')
if epoch % 100 == 0 or epoch == epoch - 1:
torch.save(lstm_model.state_dict(), "/content/drive/MyDrive/危急值/model/dangerous.pth")
print("权重成功保存一次")
```
在这个代码中,我们首先加载了测试数据集 `test_dataset` 和测试数据加载器 `test_loader`。在每个 epoch 完成后,我们计算测试集上的平均损失值。我们使用 `lstm_model.eval()` 将模型设为评估模式,并利用 `torch.no_grad()` 避免计算梯度,以加速计算。最后,我们输出测试集上的平均损失值。