每次迭代中,每个进程具有自己的
optimizer
,并独立完成所有的优化步骤,进程内与一般的训练相一致。每个进程对应一个独立的训练过程,只有对梯度等少量数据进行信息交换。在
nn.DataParallel
中, 全程维护一个 optimizer,然后梯度求和,然后在主gpu上进行参数更新,再将更新后的参数广播到其他gpu上。比较而言,前者传输数据量更少,因此速度更快,效率更高在使用多进程时,每个进程有自己计算得到的loss,记录数据时 希望对不同进程上的loss取平均,其它数据也是想要平均。这时需要用到api如下,详细的参见源码
def all_reduce(tensor, op=ReduceOp.SUM, group=group.WORLD, async_op=False):
"""
Reduces the tensor data across all machines in such a way that all get
the final result.
"""
由于使用 DDP 后,模型在每个GPU上都复制了一份,同时被封装了一层。所以保存模型时只需要保存 master 节点的模型,并将平时的
model
变成 model.module
,具体如下:if dist.get_rank()==0:
torch.save(model.module.state_dict(), "{}.ckpt".format(str(epoch)))
在加载模型时,只需要在构造 DDP 模型之前,在master节点上加载:if dist.get_rank() == 0 and ckpt_path is not None:
model.load_state_dict(torch.load(ckpt_path))
补充
- 每个进程包含独立的解释器和GIL
一般使用的Python解释器 CPython:是用C语言实现Python,是目前应用最广泛的解释器。全局锁使Python在多线程效果升表现不佳。全局解释器锁(Global Interpreter Lock)是Python用于同步线程的工具,使得任何时刻仅有一个线程在执行。
由于每个进程拥有独立的解释器和GIL,消除了来自单个Python进程中的多个执行线程,模型副本或GPU的额外解释器开销、线程颠簸,因此可以减少解释器和GIL使用冲突。这对于严重依赖 Python runtime 的 models 而言,比如说包含 RNN 层或大量小组件的 models 而言,这尤为重要。
使用方式----代码编写
import os
import argparse
import torch
import torch.nn as nn
import torch.distributed as dist
def parse():
parser = argparse.ArgumentParser()
parser.add_argument('--local_rank', type=int, default=0)
args = parser.parse_args()
return args
def reduce_tensor(tensor):
rt = tensor.clone()
dist.all_reduce(rt, op=dist.reduce_op.SUM)
rt /= dist.get_world_size()
return rt
def record_loss(loss):
reduced_loss = reduce_tensor(loss.data)
train_epoch_loss += reduced_loss.item()
# 注意在写入TensorBoard的时候只让一个进程写入就够了:
# TensorBoard
if args.local_rank == 0:
writer.add_scalars('Loss/training', {'train_loss': train_epoch_loss,
'val_loss': val_epoch_loss}, epoch + 1)
def main():
"""=============================================================
在启动器启动python脚本后,会通过参数 local_rank 来告诉当前进程使用的是哪个GPU,
用于在每个进程中指定不同的device
================================================================"""
args = parse()
torch.cuda.set_device(args.local_rank)
dist.init_process_group(
'nccl', # 初始化GPU通信方式(NCCL)
init_menthod='env://' # 参数的获取方式(env代表通过环境变量)
)
"""=============================================================
分布式数据读取,具体使用方式, 参考 https://blog.csdn.net/magic_ll/article/details/123294552
================================================================"""
train_dataset = ...
train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=..., sampler=train_sampler)
"""=======分布式模型的调用:包括SynBN========================================"""
model = ...
model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model)
model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.local_rank])
optimizer = optim.SGD(model.parameters())
"""=======训练===================================================="""
for epoch in range(100):
train_sampler.set_epoch(epoch)
for batch_idx, (data, target) in enumerate(train_loader):
images = images.cuda(non_blocking=True)
target = target.cuda(non_blocking=True)
...
output = model(images)
loss = criterion(output, target)
...
optimizer.zero_grad()
loss.backward()
optimizer.step()
record_loss(loss)
代码的启动方式
- 在多进程的启动方面,不用自己手写 multiprocess 进行一系列复杂的CPU/GPU分配任务,PyTorch提供了一个很方便的启动器 torch.distributed.launch 用于启动文件,故运行训练代码的方式如下:
CUDA_VISIBLE_DEVICES=0,1,2,3 python -m torch.distributed.launch --nproc_per_node=4 main.py