DistributedDataParallel(DDP) - PyTorch多进程并行计算
PyTorch的DistributedDataParallel(DDP)可以实现多进程的并行计算,相较于传统的单进程多线程的DataParallel,DDP支持多节点的分布式计算。即使在单机多卡的场景下,DDP通常性能也更好,因为它不仅规避了Python多线程的全局解释器锁争用(GIL contention)造成的性能开销,而且还不需要在多GPU训练中频繁复制同步模型、分发输入数据和收集模型输出。
DistributedDataParallel(DDP) - PyTorch多进程并行计算
背景
Python GIL
GIL(Global Interpreter Lock)指的是全局解释器锁,由CPython解释器引入。因为CPython解释器的内存管理是线程不安全的,所以为了避免多线程同时执行Python字节码造成线程安全问题,就加了这么一个全局的互斥锁。可也正是因为这个全局互斥锁,导致Python的多线程实际上同时只有一个线程在运行,显然无法充分利用多处理器的性能。
参考官方解释:
Python has one peculiarity that makes concurrent programming harder. It’s called the GIL, short for Global Interpreter Lock. The GIL makes sure there is, at any time, only one thread running. Because only one thread can run at a time, it’s impossible to use multiple processors with threads. But don’t worry, there’s a way around this.
The GIL was invented because CPython’s memory management is not thread-safe. With only one thread running at a time, CPython can rest assured there will never be race conditions.
DistributedDataParallel较DataParallel的优势
DistributedDataParallel(DDP)相较于DataParallel(DP)有诸多优势,包括功能上的优势和性能上的优势:
功能上:
- DDP的原理是多进程,因此DDP支持多机多卡的分布式计算,而DP是但经常多线程,因此最高只支持单机多卡;
- DDP支持模型并行(model parallel),可以把一个模型拆成几个阶段来跑,而DP还不支持。
性能上:
- 正是因为DDP基于多进程(通常推荐1个GPU匹配一个工作进程),所以不像DP那样基于单进程多线程的并行性能受到GIL争用开销的阻碍。
- 在单机多卡的情况下,DP需要在训练中频繁在多卡之间复制模型以完成同步,需要分发(scatter)输入和收集(gather)输出,而DDP采用的All-Reduce算法采取聚合通信(collective communication)的方式收集梯度,其性能更好。
总的来讲,功能上的优势其实也是为了更好利用设备性能。
原理
原理可参阅:
另有一篇2020年的论文:
Li S, Zhao Y, Varma R, et al. PyTorch distributed: experiences on accelerating data parallel training[J]. Proceedings of the VLDB Endowment, 2020, 13(12): 3005-3018.
总的架构可以参考:
- Distributed System
- Node 0
- Process0 [Global Rank=0, Local Rank=0] -> GPU 0-0
- Process1 [Global Rank=1, Local Rank=1] -> GPU 0-1
- Process2 [Global Rank=2, Local Rank=2] -> GPU 0-2
- Process3 [Global Rank=3, Local Rank=3] -> GPU 0-3
- Node 1
- Process4 [Global Rank=4, Local Rank=0] -> GPU 1-0
- Process5 [Global Rank=5, Local Rank=1] -> GPU 1-1
- Process6 [Global Rank=6, Local Rank=2] -> GPU 1-2
- Process7 [Global Rank=7, Local Rank=3] -> GPU 1-3
- Node 0
在这样的架构中,有如下术语和数值:
- N=2 Nodes
- G=4 GPUs per node
- W=8 Application processes across all nodes (aka. World Size)
- L=4 Application processes on each nodes (aka. Local Size)
使用
相较于DataParallel只需要简单地套到原模型上,DistributedDataParallel因为其原理是基于多进程的,因此写起来会稍微显得复杂一点点。
1 | """ |
spawn创建多进程
1 | def launch(args): |
launch
使用multiprocessing.spawn
来快速创建nprocs
个新进程,每个进程都执行worker
函数,并传入args
作为函数参数。
需要注意的是,spawn
默认会为函数传入一个i
,且i
在[0, nprocs)
之间。即,worker
函数收到的参数列表是(i, args, )
。
worker多进程并行
每一个worker进程做的工作分以下几个阶段:
- 初始化进程组:并行启动的多进程相互之间得形成一个进程组,即,要知道在哪会合(rendezvous)。DDP的机制会把以rank=0进程上的模型为准,自动保证其他进程上的模型与之保持一致性。
- DDP包装模型:创建模型,用DistributedDataParallel包装,移动到该进程对应的GPU设备上。
- 准备数据:为DDP建立
DistributedSampler
,以便DataLoader将数据加载给每个GPU上训练的模型。 - 进行训练:每个进程根据DataLoader分担的batch_size来并行处理训练数据。
- 销毁进程组:进程执行完成后,销毁启动的进程组。
init_process_group
参阅torch.distributed的官方文档:
初始化函数原型:
1 | torch.distributed.init_process_group(backend, init_method=None, timeout=datetime.timedelta(0, 1800), world_size=-1, rank=-1, store=None, group_name='') |
其中,进程组后端是负责提供进程组聚合通信(collective communication)的库。PyTorch支持Gloo, MPI和NCCL三种,推荐的做法是,
- 在分布式GPU训练时使用NCCL;
- 在分布式CPU训练时使用Gloo。
参考资料:
Gloo is a collective communications library. It comes with a number of collective algorithms useful for machine learning applications. These include a barrier, broadcast, and allreduce.
The NVIDIA Collective Communication Library (NCCL) implements multi-GPU and multi-node communication primitives optimized for NVIDIA GPUs and Networking. NCCL provides routines such as all-gather, all-reduce, broadcast, reduce, reduce-scatter as well as point-to-point send and receive that are optimized to achieve high bandwidth and low latency over PCIe and NVLink high-speed interconnects within a node and over NVIDIA Mellanox Network across nodes.
初始化可以选择通过init_method
填写通信地址和端口,也可以通过store
来传入一个进程间共同访问的键值对容器。
1 | # initialize with TCP in this example |
例子中演示了基于init_method
的通信方式,具体采用TCP连接的方式来初始化,也可通过环境变量的方式(见注释掉的代码)。另外,还可以使用共享的文件系统来实现初始化,可参阅torch.distributed
的官方文档。我觉得TCP连接足够简单且兼容性好,这里就以TCP的方式为主了。
DistributedDataParallel
参阅torch.nn.parallel.DistributedDataParallel的官方文档:
首先,需要注意的是,在建立DDP之前,在N个GPU的机器上,spawn出N个进程的时候,需要确保每个进程负责其对应的那一个GPU,不要互相打架。
1 | # ensuring that each process exclusively works on a single GPU |
用torch.nn.parallel.DistributedDataParallel
类包装原模型,并将该进程的模型映射到对应的GPU设备上。
DistributedSampler
参阅torch.utils.data.distributed.DistributedSampler的官方文档:
其实就是在多进程的情况下,每个进程训练数据集的一个子集,不应互相重复,通过DistributedSampler来实现分布式的采样原数据集中的一个子集:
1 | # dataset |
需要注意的是,多epoch场景下,需要在每个epoch开始前用sampler.set_epoch(epoch)
设置当前的epoch,以免每次epoch训练的数据顺序都是相同的。