DistributedDataParallel (DDP) - Multi-Process Parallel Computing in PyTorch

PyTorch's DistributedDataParallel (DDP) enables multi-process parallel computing. Compared with the traditional single-process, multi-threaded DataParallel, DDP supports distributed computation across multiple nodes. Even on a single machine with multiple GPUs, DDP usually performs better, because it not only avoids the overhead of Python's global interpreter lock contention (GIL contention), but also does not need to repeatedly replicate and synchronize the model, scatter the inputs, and gather the outputs during multi-GPU training.

Background

Python GIL

The GIL (Global Interpreter Lock) is a global interpreter lock introduced by the CPython interpreter. Because CPython's memory management is not thread-safe, a global mutex was added to prevent multiple threads from executing Python bytecode at the same time and causing thread-safety problems. But it is precisely this global mutex that makes Python multithreading run only one thread at a time in practice, so it cannot make full use of multiple processors.

See the official explanation:

The Python GIL

Python has one peculiarity that makes concurrent programming harder. It’s called the GIL, short for Global Interpreter Lock. The GIL makes sure there is, at any time, only one thread running. Because only one thread can run at a time, it’s impossible to use multiple processors with threads. But don’t worry, there’s a way around this.

The GIL was invented because CPython’s memory management is not thread-safe. With only one thread running at a time, CPython can rest assured there will never be race conditions.
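
As a quick check of this (a minimal sketch, not from the original post; the timings will vary by machine), a CPU-bound countdown run in four threads finishes in roughly the same time as a serial run, while four processes can actually use four cores:

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def count_down(n):
    # pure CPU-bound work, never releases the GIL
    while n > 0:
        n -= 1

def run_with(executor_cls):
    tic = time.time()
    with executor_cls(max_workers=4) as pool:
        list(pool.map(count_down, [10_000_000] * 4))
    return time.time() - tic

if __name__ == "__main__":
    # threads share a single GIL; processes each get their own interpreter
    print(f"threads:   {run_with(ThreadPoolExecutor):.2f}s")
    print(f"processes: {run_with(ProcessPoolExecutor):.2f}s")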

Advantages of DistributedDataParallel over DataParallel

DistributedDataParallel (DDP) has a number of advantages over DataParallel (DP), both functional and performance-related:

Functionality:

  1. DDP is built on multiple processes, so it supports distributed training across multiple machines and multiple GPUs, whereas DP is a single process with multiple threads and therefore supports at most multiple GPUs on a single machine;
  2. DDP supports model parallelism, i.e. splitting one model into several stages that run on different devices, which DP does not yet support.

Performance:

  1. Precisely because DDP is multi-process (the usual recommendation is one worker process per GPU), its parallelism is not held back by GIL contention the way DP's single-process, multi-threaded parallelism is.
  2. On a single machine with multiple GPUs, DP has to repeatedly replicate the model across the GPUs during training to keep it in sync, and to scatter the inputs and gather the outputs, while DDP collects gradients through an all-reduce collective communication algorithm (a sketch follows below) and therefore performs better.

Overall, the functional advantages ultimately also serve the goal of making better use of the hardware.
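
To make the collective-communication point concrete, here is a minimal sketch (assuming an already initialized process group; DDP itself does this internally, bucketed and overlapped with backward) of averaging gradients with all-reduce:

import torch.distributed as dist

def average_gradients(model):
    # sum each gradient across all processes, then divide by the world size
    world_size = dist.get_world_size()
    for param in model.parameters():
        if param.grad is not None:
            dist.all_reduce(param.grad, op=dist.ReduceOp.SUM)
            param.grad /= world_size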

How It Works

For how it works, see:

Distributed Data Parallel - PyTorch master Documentation

There is also a paper from 2020:

Li, S., Zhao, Y., Varma, R., et al. PyTorch Distributed: Experiences on Accelerating Data Parallel Training. Proceedings of the VLDB Endowment, 2020, 13(12): 3005-3018.

PDF on vldb.org

The overall architecture can be pictured as follows:

  • Distributed System
    • Node 0
      • Process0 [Global Rank=0, Local Rank=0] -> GPU 0-0
      • Process1 [Global Rank=1, Local Rank=1] -> GPU 0-1
      • Process2 [Global Rank=2, Local Rank=2] -> GPU 0-2
      • Process3 [Global Rank=3, Local Rank=3] -> GPU 0-3
    • Node 1
      • Process4 [Global Rank=4, Local Rank=0] -> GPU 1-0
      • Process5 [Global Rank=5, Local Rank=1] -> GPU 1-1
      • Process6 [Global Rank=6, Local Rank=2] -> GPU 1-2
      • Process7 [Global Rank=7, Local Rank=3] -> GPU 1-3

In an architecture like this, the terminology and values are as follows (see the small sketch after the list):

  • N=2 Nodes
  • G=4 GPUs per node
  • W=8 Application processes across all nodes (aka. World Size)
  • L=4 Application processes on each node (aka. Local Size)
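
The relationship between these numbers can be written out directly (a small sketch; global_rank here is a hypothetical helper, not part of the example script below):

def global_rank(node_rank, gpus_per_node, local_rank):
    # with global ranks assigned node by node, as in the diagram above
    return node_rank * gpus_per_node + local_rank

# Process6 in the diagram: node 1, local rank 2 -> global rank 6
assert global_rank(node_rank=1, gpus_per_node=4, local_rank=2) == 6
# World size W = N nodes * G processes per node = 2 * 4 = 8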

Usage

Whereas DataParallel only needs to be wrapped around the original model, DistributedDataParallel is built on multiple processes, so the code is slightly more involved.

"""
Distributed Data Parallel (DDP) example

Author: HearyShen
Date: 2020.12.17
"""
import time
import os
import random
from argparse import ArgumentParser

import torch
import torch.utils.data as data
import torch.nn as nn
import torch.cuda as cuda
import torch.distributed as dist
import torch.multiprocessing as mp
import torch.optim as optim
import torchvision.models as models

DIST_DEFAULT_BACKEND = 'nccl'
DIST_DEFAULT_ADDR = 'localhost'
DIST_DEFAULT_PORT = '12344'
DIST_DEFAULT_INIT_METHOD = f'tcp://{DIST_DEFAULT_ADDR}:{DIST_DEFAULT_PORT}'
DIST_DEFAULT_WORLD_SIZE = cuda.device_count()

DEFAULT_BATCH_SIZE = 64
DEFAULT_NUM_WORKERS_PER_GPU = 8


class TinyNet(nn.Module):
def __init__(self):
super().__init__()
self.mlp = nn.Linear(3, 2)

def forward(self, x):
out = self.mlp(x)
return out


class TinyDataset(data.dataset.Dataset):
def __getitem__(self, index):
x = torch.rand([3, 224, 224])
y = random.randint(0, 999)
return x, y

def __len__(self):
return 10000


def worker(rank, args):
model = models.resnet50(pretrained=True)
if args.distributed:
print(
f"[{os.getpid()}] Initializing {rank}/{DIST_DEFAULT_WORLD_SIZE} at {DIST_DEFAULT_INIT_METHOD}"
)

# initialize with TCP in this example
dist.init_process_group(backend=DIST_DEFAULT_BACKEND,
init_method=DIST_DEFAULT_INIT_METHOD,
world_size=DIST_DEFAULT_WORLD_SIZE,
rank=rank)

# # Another way to initialize with environment variables
# os.environ["MASTER_PORT"] = DIST_DEFAULT_PORT
# os.environ["MASTER_ADDR"] = DIST_DEFAULT_ADDR
# os.environ["WORLD_SIZE"] = str(DIST_DEFAULT_WORLD_SIZE)
# os.environ["RANK"] = str(rank)
# dist.init_process_group(backend=DIST_DEFAULT_BACKEND)

print(
f"[{os.getpid()}] Computing {rank}/{DIST_DEFAULT_WORLD_SIZE} at {DIST_DEFAULT_INIT_METHOD}"
)
# ensuring that each process exclusively works on a single GPU
torch.cuda.set_device(rank)
model.cuda(rank)
# When using a single GPU per process and per
# DistributedDataParallel, we need to divide the batch size
# ourselves based on the total number of GPUs we have
model = nn.parallel.DistributedDataParallel(model, device_ids=[rank])
else:
model = nn.DataParallel(model).cuda()

loss_func = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.001)

# dataset
dataset = TinyDataset()
dist_sampler = data.distributed.DistributedSampler(
dataset) if args.distributed else None
dataloader = data.dataloader.DataLoader(
dataset,
batch_size=DEFAULT_BATCH_SIZE // DIST_DEFAULT_WORLD_SIZE if args.distributed else DEFAULT_BATCH_SIZE,
shuffle=(dist_sampler is None),
num_workers=DEFAULT_NUM_WORKERS_PER_GPU if args.distributed else DEFAULT_NUM_WORKERS_PER_GPU * DIST_DEFAULT_WORLD_SIZE,
sampler=dist_sampler)

# train
model = model.train()
for epoch in range(2):
if args.distributed:
dist_sampler.set_epoch(epoch)
for i, (x, label) in enumerate(dataloader):
y = model(x)
loss = loss_func(y, label.to(y.device))

optimizer.zero_grad()
loss.backward()
optimizer.step()

if args.distributed:
print(
f"[{os.getpid()}] Epoch-{epoch} ended {rank}/{DIST_DEFAULT_WORLD_SIZE} at {DIST_DEFAULT_INIT_METHOD} on {y.device}"
)
else:
print(f"[{os.getpid()}] Epoch-{epoch} ended on {y.device}")

if args.distributed:
print(
f"[{os.getpid()}] Finishing {rank}/{DIST_DEFAULT_WORLD_SIZE} at {DIST_DEFAULT_INIT_METHOD} on {y.device}"
)
dist.destroy_process_group()


def launch(args):
tic = time.time()
if args.distributed:
mp.spawn(worker,
args=(args, ),
nprocs=DIST_DEFAULT_WORLD_SIZE,
join=True)
else:
worker(None, args)
toc = time.time()
print(f"Finished in {toc-tic:.2f}s")


if __name__ == "__main__":
parser = ArgumentParser()
parser.add_argument("-d", "--distributed", action="store_true")
args = parser.parse_args()

launch(args)

Creating processes with spawn

def launch(args):
    tic = time.time()
    if args.distributed:
        mp.spawn(worker,
                 args=(args, ),
                 nprocs=DIST_DEFAULT_WORLD_SIZE,
                 join=True)
    else:
        worker(None, args)
    toc = time.time()
    print(f"Finished in {toc-tic:.2f}s")

launch uses multiprocessing.spawn to quickly create nprocs new processes; each of them runs the worker function, with args passed in as its arguments.

Note that spawn automatically prepends an index i to the arguments, with i in [0, nprocs). In other words, the argument list that worker actually receives is (i, args, ).
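
A stripped-down illustration of that calling convention (a sketch independent of the training script above):

import torch.multiprocessing as mp

def worker(i, msg):
    # i is injected by mp.spawn and ranges over [0, nprocs)
    print(f"process index {i}: {msg}")

if __name__ == "__main__":
    # effectively calls worker(0, "hello"), worker(1, "hello"), ...
    mp.spawn(worker, args=("hello",), nprocs=4, join=True)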

Multi-process parallelism in worker

Each worker process goes through the following stages (a condensed skeleton follows the list):

  1. Initialize the process group: the processes launched in parallel must form a process group, i.e. they need to know where to rendezvous. The DDP machinery takes the model on the rank=0 process as the reference and automatically keeps the models on the other processes consistent with it.
  2. Wrap the model with DDP: build the model, wrap it with DistributedDataParallel, and move it onto the GPU assigned to this process.
  3. Prepare the data: create a DistributedSampler for DDP so that the DataLoader feeds data to the model training on each GPU.
  4. Train: each process works through its share of the training data in parallel, with the batch_size split up by the DataLoader.
  5. Destroy the process group: once the processes have finished, tear down the process group that was created.
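
Condensed into a skeleton, and reusing the imports, constants and TinyDataset from the full example above, the five stages look roughly like this (a sketch, not a drop-in replacement for the script):

def worker_skeleton(rank, args):
    # 1. initialize the process group (rendezvous)
    dist.init_process_group(backend=DIST_DEFAULT_BACKEND,
                            init_method=DIST_DEFAULT_INIT_METHOD,
                            world_size=DIST_DEFAULT_WORLD_SIZE,
                            rank=rank)

    # 2. build the model on this process's GPU and wrap it with DDP
    torch.cuda.set_device(rank)
    model = nn.parallel.DistributedDataParallel(
        models.resnet50(pretrained=True).cuda(rank), device_ids=[rank])

    # 3. prepare data: a DistributedSampler gives each process its own shard
    dataset = TinyDataset()
    sampler = data.distributed.DistributedSampler(dataset)
    dataloader = data.DataLoader(dataset, batch_size=16, sampler=sampler)

    # 4. train; gradients are all-reduced automatically during backward()
    loss_func = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=0.001)
    for epoch in range(2):
        sampler.set_epoch(epoch)
        for x, label in dataloader:
            loss = loss_func(model(x), label.cuda(rank))
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

    # 5. tear down the process group
    dist.destroy_process_group()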

init_process_group

See the official documentation of torch.distributed:

Distributed communication package - torch.distributed

The signature of the initialization function:

torch.distributed.init_process_group(backend, init_method=None, timeout=datetime.timedelta(0, 1800), world_size=-1, rank=-1, store=None, group_name='')

Here, the process group backend is the library that provides the group's collective communication. PyTorch supports three backends, Gloo, MPI and NCCL, and the recommendation (which can also be made at runtime, as sketched after this list) is:

  • Use NCCL for distributed GPU training;
  • Use Gloo for distributed CPU training.
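
This choice can also be made at runtime (a small sketch; the fallback logic is my own, not something the example script does):

import torch
import torch.distributed as dist

# NCCL for GPU training, Gloo as the CPU fallback
if torch.cuda.is_available() and dist.is_nccl_available():
    backend = 'nccl'
else:
    backend = 'gloo'
print(f"using backend: {backend}")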

References:

Gloo

Gloo is a collective communications library. It comes with a number of collective algorithms useful for machine learning applications. These include a barrier, broadcast, and allreduce.

NVIDIA NCCL

The NVIDIA Collective Communication Library (NCCL) implements multi-GPU and multi-node communication primitives optimized for NVIDIA GPUs and Networking. NCCL provides routines such as all-gather, all-reduce, broadcast, reduce, reduce-scatter as well as point-to-point send and receive that are optimized to achieve high bandwidth and low latency over PCIe and NVLink high-speed interconnects within a node and over NVIDIA Mellanox Network across nodes.

For initialization, you can either provide the communication address and port via init_method, or pass in a key-value store shared by all processes via store.

# initialize with TCP in this example
dist.init_process_group(backend=DIST_DEFAULT_BACKEND,
                        init_method=DIST_DEFAULT_INIT_METHOD,
                        world_size=DIST_DEFAULT_WORLD_SIZE,
                        rank=rank)

# # Another way to initialize with environment variables
# os.environ["MASTER_PORT"] = DIST_DEFAULT_PORT
# os.environ["MASTER_ADDR"] = DIST_DEFAULT_ADDR
# os.environ["WORLD_SIZE"] = str(DIST_DEFAULT_WORLD_SIZE)
# os.environ["RANK"] = str(rank)
# dist.init_process_group(backend=DIST_DEFAULT_BACKEND)

The example demonstrates init_method-based initialization, specifically over a TCP connection; it can also be done through environment variables (see the commented-out code). In addition, a shared file system can be used for initialization; see the official torch.distributed documentation. I find the TCP approach simple enough and widely compatible, so it is the one used throughout this post.
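
For reference, the store-based variant looks roughly like this (a minimal sketch using torch.distributed.TCPStore; the port is a placeholder, and rank comes from the surrounding worker):

from datetime import timedelta

import torch.distributed as dist

# rank 0 hosts the key-value store, the other ranks connect to it
store = dist.TCPStore(DIST_DEFAULT_ADDR, 12355, DIST_DEFAULT_WORLD_SIZE,
                      is_master=(rank == 0), timeout=timedelta(seconds=30))
dist.init_process_group(backend=DIST_DEFAULT_BACKEND,
                        store=store,
                        world_size=DIST_DEFAULT_WORLD_SIZE,
                        rank=rank)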

DistributedDataParallel

See the official documentation of torch.nn.parallel.DistributedDataParallel:

torch.nn.parallel.DistributedDataParallel

First, note that when N processes are spawned on a machine with N GPUs, each process must be pinned to its own GPU before DDP is constructed, so that the processes do not step on each other.

# ensuring that each process exclusively works on a single GPU
torch.cuda.set_device(rank)
model.cuda(rank)
# When using a single GPU per process and per
# DistributedDataParallel, we need to divide the batch size
# ourselves based on the total number of GPUs we have
model = nn.parallel.DistributedDataParallel(model, device_ids=[rank])

The torch.nn.parallel.DistributedDataParallel class wraps the original model and maps this process's replica onto its assigned GPU device.
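
In this single-node example the global rank doubles as the GPU index. On multiple nodes the device has to be picked by the local rank instead (a sketch, assuming global ranks are assigned node by node as in the architecture diagram above; local_rank is not a variable in the example script):

# pin each process to the GPU matching its local rank, not its global rank
local_rank = rank % torch.cuda.device_count()
torch.cuda.set_device(local_rank)
model.cuda(local_rank)
model = nn.parallel.DistributedDataParallel(model, device_ids=[local_rank])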

DistributedSampler

See the official documentation of torch.utils.data.distributed.DistributedSampler:

torch.utils.data.distributed.DistributedSampler

The idea is simply that with multiple processes, each process should train on its own, non-overlapping subset of the dataset; DistributedSampler implements this distributed sampling of a subset of the original dataset:

# dataset
dataset = TinyDataset()
dist_sampler = data.distributed.DistributedSampler(
    dataset) if args.distributed else None
dataloader = data.dataloader.DataLoader(
    dataset,
    batch_size=DEFAULT_BATCH_SIZE // DIST_DEFAULT_WORLD_SIZE if args.distributed else DEFAULT_BATCH_SIZE,
    shuffle=(dist_sampler is None),
    num_workers=DEFAULT_NUM_WORKERS_PER_GPU if args.distributed else DEFAULT_NUM_WORKERS_PER_GPU * DIST_DEFAULT_WORLD_SIZE,
    sampler=dist_sampler)

Note that when training for multiple epochs, you must call sampler.set_epoch(epoch) before each epoch starts; otherwise the data would be presented in the same order in every epoch.
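
The effect of the sampler is easy to check (a small sketch, assuming an initialized process group; dataset here is the TinyDataset from the snippet above): each rank is handed roughly len(dataset) / world_size indices, and set_epoch reseeds the shuffle so the ordering differs between epochs.

import math

import torch.distributed as dist
import torch.utils.data as data

sampler = data.distributed.DistributedSampler(dataset, shuffle=True)
# with the default settings, each rank gets ceil(len(dataset) / world_size) indices
assert len(sampler) == math.ceil(len(dataset) / dist.get_world_size())

for epoch in range(2):
    sampler.set_epoch(epoch)              # reseed the shuffle with the epoch
    epoch_indices = list(iter(sampler))   # this rank's share of the indices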