多机多卡的DDP pytorch
PyTorch 多机多卡分布式数据并行(DDP)实现
1. 基本概念
DistributedDataParallel (DDP)
是 PyTorch 中的一个高效工具,用于支持跨多个 GPU 和节点的模型训练。它通过在每个进程中复制模型实例,并利用 torch.distributed
包中的通信原语来同步梯度和其他状态变量[^2]。
为了实现多机多卡的 DDP 训练,需要解决以下几个关键问题:
- 初始化分布式环境:设置每台机器上的设备编号以及进程组。
- 划分数据集:确保不同进程处理不同的子集以减少冗余计算。
- 模型封装:将模型包装成
DistributedDataParallel
类型以便启用并行化功能。
2. 初始化分布式环境
在启动程序之前,需配置好分布式运行所需的参数,例如主机地址 (MASTER_ADDR
)、端口 (MASTER_PORT
)、当前 rank 及 world size 等。这些可以通过命令行传递或者硬编码完成。
以下是典型的初始化代码片段:
import os
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
def setup_distributed(rank, world_size):
""" 设置分布式环境 """
os.environ['MASTER_ADDR'] = 'localhost' # 主节点IP地址
os.environ['MASTER_PORT'] = '12355' # 进程间通讯使用的端口号
# 使用 NCCL 后端进行GPU间的通信
dist.init_process_group(backend='nccl', init_method='env://', rank=rank, world_size=world_size)
def cleanup():
""" 清理分布式资源 """
dist.destroy_process_group()
上述函数中调用了 dist.init_process_group()
方法指定后端类型为 'nccl'
,这是 NVIDIA 提供的一种高性能 CUDA 集体通信库,适用于 GPU 场景下的快速消息交换操作。
3. 数据加载器调整
为了让各 worker 能够独立访问互不重叠的数据部分,可以借助于 DistributedSampler
来重新定义采样逻辑。该类会自动根据总样本数分配给定数量的任务单元至各个子线程之中执行。
下面展示了一个简单的例子说明如何构建适配器版本的数据读取管道:
from torch.utils.data import DataLoader, Dataset, DistributedSampler
class ToyDataset(Dataset):
def __init__(self):
self.data = list(range(100))
def __len__(self):
return len(self.data)
def __getitem__(self, idx):
return self.data[idx]
def prepare_data_loader(dataset, batch_size, rank, world_size):
sampler = DistributedSampler(
dataset,
num_replicas=world_size,
rank=rank,
shuffle=True
)
loader = DataLoader(
dataset,
batch_size=batch_size,
shuffle=False,
sampler=sampler
)
return loader
这里需要注意的是,在实际部署过程中可能还需要考虑其他因素比如 pin_memory 或者 drop_last 参数的选择等问题[^1]。
4. 构建与封装模型
一旦完成了前期准备工作之后就可以着手搭建神经网络结构并将其实例传入到 DDP 容器当中去了。具体做法如下所示:
import torchvision.models as models
def create_model(device):
model = models.resnet18(pretrained=False).to(device)
ddp_model = DDP(model, device_ids=[device])
return ddp_model
在这里我们选择了 ResNet-18 作为基础架构并通过 .to(device)
将其迁移到目标硬件之上;接着再用 DDP(...)
对象包裹起来形成最终可用的形式。
5. 整合全流程
最后一步就是把前面提到过的几个组件串联起来构成完整的解决方案框架了。整个流程大致遵循这样的顺序展开——先设定全局变量值 -> 接着分别针对每一个单独的工作单元重复执行相同的操作序列直至结束为止。
完整示例脚本如下所列:
if __name__ == "__main__":
local_rank = int(os.getenv('LOCAL_RANK'))
world_size = int(os.getenv('WORLD_SIZE'))
setup_distributed(local_rank, world_size)
try:
device = f"cuda:{local_rank}"
train_dataset = ToyDataset()
train_loader = prepare_data_loader(train_dataset, batch_size=16, rank=local_rank, world_size=world_size)
model = create_model(device=device)
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
for epoch in range(10): # Example epochs count.
train_loader.sampler.set_epoch(epoch) # Ensure different shuffling per epoch.
for data in train_loader:
outputs = model(data.to(device))
loss = ... # Define your own criterion function here
optimizer.zero_grad()
loss.backward()
optimizer.step()
finally:
cleanup()
此段代码展示了从初始化直到清理阶段的整体控制流设计思路^。
相关推荐


















