Generate a PyTorch DDP example that trains and validates ResNet18, with DDP launched via mpirun
Below is a PyTorch DDP example that trains and validates ResNet18, with DDP launched via mpirun:
```python
import os
import argparse

import torch
import torch.nn as nn
import torch.distributed as dist
import torchvision.datasets as datasets
import torchvision.models as models
import torchvision.transforms as transforms
from torch.nn.parallel import DistributedDataParallel as DDP


def train(rank, world_size):
    # torchvision's ResNet18 with a 10-way classifier head for CIFAR-10
    model = models.resnet18(num_classes=10)
    criterion = nn.CrossEntropyLoss()

    # Pin this process to its GPU and move the model and loss there
    torch.cuda.set_device(rank)
    model.cuda(rank)
    criterion.cuda(rank)

    # Wrap the model in DDP so gradients are synchronized across processes
    model = DDP(model, device_ids=[rank])

    # Training set: each rank sees a disjoint shard via DistributedSampler
    train_dataset = datasets.CIFAR10(
        root='./data',
        train=True,
        transform=transforms.ToTensor(),
        download=True
    )
    train_sampler = torch.utils.data.distributed.DistributedSampler(
        train_dataset,
        num_replicas=world_size,
        rank=rank
    )
    train_loader = torch.utils.data.DataLoader(
        dataset=train_dataset,
        batch_size=256,
        shuffle=False,  # the sampler already shuffles
        num_workers=0,
        pin_memory=True,
        sampler=train_sampler
    )

    # Test set: shard it too, but without shuffling
    test_dataset = datasets.CIFAR10(
        root='./data',
        train=False,
        transform=transforms.ToTensor(),
        download=True
    )
    test_sampler = torch.utils.data.distributed.DistributedSampler(
        test_dataset,
        num_replicas=world_size,
        rank=rank,
        shuffle=False
    )
    test_loader = torch.utils.data.DataLoader(
        dataset=test_dataset,
        batch_size=256,
        shuffle=False,
        num_workers=0,
        pin_memory=True,
        sampler=test_sampler
    )

    # Optimizer and learning-rate schedule
    optimizer = torch.optim.SGD(model.parameters(), lr=0.1)
    scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, [50, 100, 150], gamma=0.1)

    for epoch in range(200):
        # Training: re-seed the sampler so each epoch gets a fresh shuffle
        model.train()
        train_sampler.set_epoch(epoch)
        for images, labels in train_loader:
            images = images.cuda(rank, non_blocking=True)
            labels = labels.cuda(rank, non_blocking=True)
            outputs = model(images)
            loss = criterion(outputs, labels)
            # Backpropagate and take an SGD step
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        # Validation on this rank's shard of the test set
        model.eval()
        test_loss = 0.0
        test_correct = 0
        with torch.no_grad():
            for images, labels in test_loader:
                images = images.cuda(rank, non_blocking=True)
                labels = labels.cuda(rank, non_blocking=True)
                outputs = model(images)
                loss = criterion(outputs, labels)
                _, preds = torch.max(outputs, 1)
                test_loss += loss.item()
                test_correct += preds.eq(labels.view_as(preds)).sum().item()

        # Note: these metrics cover only this rank's shard of the test set
        print(f"Rank {rank}, epoch {epoch}: "
              f"Train Loss = {loss.item():.4f}, "
              f"Test Loss = {test_loss / len(test_loader):.4f}, "
              f"Test Accuracy = {test_correct / len(test_sampler):.4f}")

        scheduler.step()


if __name__ == '__main__':
    # mpirun launches one copy of this script per process. Open MPI exports
    # OMPI_COMM_WORLD_LOCAL_RANK for each of them, so read it from the
    # environment rather than passing it on the command line (the shell would
    # expand the variable once, before mpirun forks the workers).
    parser = argparse.ArgumentParser()
    parser.add_argument('--local_rank', type=int, default=None)
    args = parser.parse_args()
    local_rank = args.local_rank
    if local_rank is None:
        local_rank = int(os.environ.get('OMPI_COMM_WORLD_LOCAL_RANK', 0))

    # Initialize the process group exactly once. With the MPI backend the
    # rank and world size come from the MPI runtime, so no init_method or
    # rendezvous environment variables are required.
    dist.init_process_group(backend='mpi')
    world_size = dist.get_world_size()
    train(local_rank, world_size)
```
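Each rank above reports metrics only for its own shard of the test set. If you want a single, globally aggregated number, you can sum the counters across ranks with an all-reduce before printing. A minimal sketch that would drop in at the end of the validation loop (variable names follow the script above; the totals are approximate when the sampler pads the last shard):

```python
# Sum the per-rank validation counters across all processes (SUM all-reduce),
# then normalize by the global totals instead of the per-rank ones.
stats = torch.tensor([test_loss, float(test_correct)],
                     device=torch.device('cuda', rank))
dist.all_reduce(stats, op=dist.ReduceOp.SUM)
global_test_loss = stats[0].item() / (len(test_loader) * world_size)
global_test_acc = stats[1].item() / len(test_dataset)
if rank == 0:
    print(f"Epoch {epoch}: Test Loss = {global_test_loss:.4f}, "
          f"Test Accuracy = {global_test_acc:.4f}")
```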
Launch DDP training across 2 processes with the following command (the script reads `OMPI_COMM_WORLD_LOCAL_RANK` itself, so no extra arguments are needed):
```
mpirun -n 2 python train.py
```
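The same script also works across machines as long as MPI can reach every node. A sketch of a two-node launch, assuming two hosts named `node1` and `node2` with two GPUs each (the hostnames are placeholders for your own cluster):

```
mpirun -n 4 -H node1:2,node2:2 --bind-to none python train.py
```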
Make sure the environment variables `OMP_PROC_BIND` and `OMPI_MCA_btl_vader_single_copy_mechanism=none` are set appropriately. Note that the `mpi` backend is only available when PyTorch has been built from source against an MPI installation, and exchanging CUDA tensors through it additionally requires a CUDA-aware MPI build.
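You can check whether your PyTorch build exposes the MPI backend before launching:

```python
import torch.distributed as dist

# Prints True only if this PyTorch build was compiled with MPI support
print(dist.is_mpi_available())
```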