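NVIDIA Megatron is a PyTorch-based distributed training framework for training very large Transformer language models. It reproduces GPT-3 by combining data parallelism, tensor parallelism, and pipeline parallelism, and the mechanisms behind it are worth a close look.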
This series has about 5 articles, working through the paper and the source code together. This article looks at how Megatron sets up parallelism.
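Other articles in this series:
[Source Code Analysis] Model-Parallel Distributed Training with Megatron (1) --- Paper & Basics
[Source Code Analysis] Model-Parallel Distributed Training with Megatron (2) --- Overall Architecture
[Source Code Analysis] Model-Parallel Distributed Training with Megatron (3) --- Model-Parallel Implementation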
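In the previous article we analyzed the principles and code of model parallelism. For a given model, several questions still need to be answered:
Let's analyze them carefully.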
The initialize_model_parallel method sets up model parallelism, so that is what we analyze next.
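Because _initialize_distributed (covered in the previous article) calls torch.distributed.init_process_group to initialize the distributed environment, every process has its own global rank and local rank, as well as its own copy of the module-level global variables.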
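As a small illustration of the two numbers, the sketch below derives a node index and a local rank from a global rank. It assumes, as in the 16-GPU example used throughout this article, that every node has the same number of GPUs and that global ranks are assigned contiguously node by node. `split_global_rank` is a hypothetical helper, not a Megatron or PyTorch function; in practice launchers such as torchrun pass the local rank in the `LOCAL_RANK` environment variable.

```python
def split_global_rank(global_rank: int, gpus_per_node: int = 8) -> tuple:
    """Hypothetical helper: map a global rank to (node_index, local_rank).

    Assumes contiguous placement, e.g. ranks 0..7 on node 0 and
    ranks 8..15 on node 1 when gpus_per_node == 8.
    """
    if global_rank < 0 or gpus_per_node <= 0:
        raise ValueError("global_rank must be >= 0 and gpus_per_node > 0")
    # divmod gives (quotient, remainder) = (node index, position within the node)
    return divmod(global_rank, gpus_per_node)


if __name__ == "__main__":
    for r in (0, 7, 8, 13):
        node, local = split_global_rank(r)
        print(f"global rank {r:2d} -> node {node}, local rank {local}")
```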
The main variables are listed below (read them together with the comments in initialize_model_parallel for concrete examples):
# Intra-layer model parallel group that the current rank belongs to.
_TENSOR_MODEL_PARALLEL_GROUP = None
# Inter-layer model parallel group that the current rank belongs to.
_PIPELINE_MODEL_PARALLEL_GROUP = None
# Model parallel group (both intra- and pipeline) that the current rank belongs to.
_MODEL_PARALLEL_GROUP = None
# Embedding group.
_EMBEDDING_GROUP = None
# Data parallel group that the current rank belongs to.
_DATA_PARALLEL_GROUP = None

_VIRTUAL_PIPELINE_MODEL_PARALLEL_RANK = None
_VIRTUAL_PIPELINE_MODEL_PARALLEL_WORLD_SIZE = None
_PIPELINE_MODEL_PARALLEL_SPLIT_RANK = None

# These values enable us to change the mpu sizes on the fly.
_MPU_TENSOR_MODEL_PARALLEL_WORLD_SIZE = None
_MPU_PIPELINE_MODEL_PARALLEL_WORLD_SIZE = None
_MPU_TENSOR_MODEL_PARALLEL_RANK = None
_MPU_PIPELINE_MODEL_PARALLEL_RANK = None

# A list of ranks that have a copy of the embedding.
_EMBEDDING_GLOBAL_RANKS = None

# A list of global ranks for each pipeline group to ease calculation of the source
# rank when broadcasting from the first or last pipeline stage.
_PIPELINE_GLOBAL_RANKS = None

We first excerpt the code of initialize_model_parallel. Its job is to partition the ranks into groups and then initialize the global variables related to those process groups.
def initialize_model_parallel(tensor_model_parallel_size_=1,
                              pipeline_model_parallel_size_=1,
                              virtual_pipeline_model_parallel_size_=None,
                              pipeline_model_parallel_split_rank_=None):
    """
    Initialize model data parallel groups.

    Arguments:
        tensor_model_parallel_size: number of GPUs used for tensor model parallelism.
        pipeline_model_parallel_size: number of GPUs used for pipeline model parallelism.
        virtual_pipeline_model_parallel_size: number of virtual stages (interleaved
                                              pipeline).
        pipeline_model_parallel_split_rank: for models with both encoder and decoder,
                                            rank in pipeline with split point.

    Let's say we have a total of 16 GPUs denoted by g0 ... g15 and we
    use 2 GPUs to parallelize the model tensor, and 4 GPUs to parallelize
    the model pipeline. The present function will
    create 8 tensor model-parallel groups, 4 pipeline model-parallel groups
    and 8 data-parallel groups as:
        8 data_parallel groups:
            [g0, g2], [g1, g3], [g4, g6], [g5, g7], [g8, g10], [g9, g11], [g12, g14], [g13, g15]
        8 tensor model-parallel groups:
            [g0, g1], [g2, g3], [g4, g5], [g6, g7], [g8, g9], [g10, g11], [g12, g13], [g14, g15]
        4 pipeline model-parallel groups:
            [g0, g4, g8, g12], [g1, g5, g9, g13], [g2, g6, g10, g14], [g3, g7, g11, g15]
    Note that for efficiency, the caller should make sure adjacent ranks
    are on the same DGX box. For example if we are using 2 DGX-1 boxes
    with a total of 16 GPUs, rank 0 to 7 belong to the first box and
    ranks 8 to 15 belong to the second box.
    """
    if torch.distributed.get_rank() == 0:
        print('> initializing tensor model parallel with size {}'.format(
            tensor_model_parallel_size_))
        print('> initializing pipeline model parallel with size {}'.format(
            pipeline_model_parallel_size_))
    # Get world size and rank. Ensure some consistencies.
    world_size = torch.distributed.get_world_size()
    tensor_model_parallel_size = min(tensor_model_parallel_size_, world_size)
    pipeline_model_parallel_size = min(pipeline_model_parallel_size_, world_size)
    ensure_divisibility(world_size,
                        tensor_model_parallel_size * pipeline_model_parallel_size)
    data_parallel_size = world_size // (tensor_model_parallel_size *
                                        pipeline_model_parallel_size)

    num_tensor_model_parallel_groups = world_size // tensor_model_parallel_size
    num_pipeline_model_parallel_groups = world_size // pipeline_model_parallel_size
    num_data_parallel_groups = world_size // data_parallel_size

    if virtual_pipeline_model_parallel_size_ is not None:
        global _VIRTUAL_PIPELINE_MODEL_PARALLEL_RANK
        global _VIRTUAL_PIPELINE_MODEL_PARALLEL_WORLD_SIZE
        _VIRTUAL_PIPELINE_MODEL_PARALLEL_RANK = 0
        _VIRTUAL_PIPELINE_MODEL_PARALLEL_WORLD_SIZE = virtual_pipeline_model_parallel_size_

    if pipeline_model_parallel_split_rank_ is not None:
        global _PIPELINE_MODEL_PARALLEL_SPLIT_RANK
        _PIPELINE_MODEL_PARALLEL_SPLIT_RANK = pipeline_model_parallel_split_rank_

    rank = torch.distributed.get_rank()

    # Build the data-parallel groups.
    global _DATA_PARALLEL_GROUP
    all_data_parallel_group_ranks = []
    for i in range(pipeline_model_parallel_size):
        start_rank = i * num_pipeline_model_parallel_groups
        end_rank = (i + 1) * num_pipeline_model_parallel_groups
        for j in range(tensor_model_parallel_size):
            ranks = range(start_rank + j, end_rank,
                          tensor_model_parallel_size)
            all_data_parallel_group_ranks.append(list(ranks))
            group = torch.distributed.new_group(ranks)
            if rank in ranks:
                _DATA_PARALLEL_GROUP = group

    # Build the model-parallel groups.
    global _MODEL_PARALLEL_GROUP
    for i in range(data_parallel_size):
        ranks = [data_parallel_group_ranks[i]
                 for data_parallel_group_ranks in all_data_parallel_group_ranks]
        group = torch.distributed.new_group(ranks)
        if rank in ranks:
            _MODEL_PARALLEL_GROUP = group

    # Build the tensor model-parallel groups.
    global _TENSOR_MODEL_PARALLEL_GROUP
    for i in range(num_tensor_model_parallel_groups):
        ranks = range(i * tensor_model_parallel_size,
                      (i + 1) * tensor_model_parallel_size)
        group = torch.distributed.new_group(ranks)
        if rank in ranks:
            _TENSOR_MODEL_PARALLEL_GROUP = group

    # Build the pipeline model-parallel groups and embedding groups
    # (first and last rank in each pipeline model-parallel group).
    global _PIPELINE_MODEL_PARALLEL_GROUP
    global _PIPELINE_GLOBAL_RANKS
    global _EMBEDDING_GROUP
    global _EMBEDDING_GLOBAL_RANKS
    for i in range(num_pipeline_model_parallel_groups):
        ranks = range(i, world_size,
                      num_pipeline_model_parallel_groups)
        group = torch.distributed.new_group(ranks)
        if rank in ranks:
            _PIPELINE_MODEL_PARALLEL_GROUP = group
            _PIPELINE_GLOBAL_RANKS = ranks
        # Setup embedding group (to exchange gradients between
        # first and last stages).
        if len(ranks) > 1:
            embedding_ranks = [ranks[0], ranks[-1]]
            if pipeline_model_parallel_split_rank_ is not None and \
                    pipeline_model_parallel_split_rank_ not in embedding_ranks:
                embedding_ranks = [ranks[0],
                                   ranks[pipeline_model_parallel_split_rank_],
                                   ranks[-1]]
        else:
            embedding_ranks = ranks
        group = torch.distributed.new_group(embedding_ranks)
        if rank in embedding_ranks:
            _EMBEDDING_GROUP = group
        if rank in ranks:
            _EMBEDDING_GLOBAL_RANKS = embedding_ranks

We will use the contents of the docstring to learn how the model is partitioned and how the different parallel modes are combined.
The docstring of initialize_model_parallel deserves close study. It reads:

    Let's say we have a total of 16 GPUs denoted by g0 ... g15 and we
    use 2 GPUs to parallelize the model tensor, and 4 GPUs to parallelize
    the model pipeline. The present function will
    create 8 tensor model-parallel groups, 4 pipeline model-parallel groups
    and 8 data-parallel groups as:
        8 data_parallel groups:
            [g0, g2], [g1, g3], [g4, g6], [g5, g7], [g8, g10], [g9, g11], [g12, g14], [g13, g15]
        8 tensor model-parallel groups:
            [g0, g1], [g2, g3], [g4, g5], [g6, g7], [g8, g9], [g10, g11], [g12, g13], [g14, g15]
        4 pipeline model-parallel groups:
            [g0, g4, g8, g12], [g1, g5, g9, g13], [g2, g6, g10, g14], [g3, g7, g11, g15]
    Note that for efficiency, the caller should make sure adjacent ranks
    are on the same DGX box. For example if we are using 2 DGX-1 boxes
    with a total of 16 GPUs, rank 0 to 7 belong to the first box and
    ranks 8 to 15 belong to the second box.

From the comment we can extract the following:
Assume there are 16 GPUs on two nodes: ranks 0 ~ 7 belong to the first node and ranks 8 ~ 15 belong to the second node.
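"create 8 tensor model-parallel groups, 4 pipeline model-parallel groups" tells us that one complete model is partitioned as follows:
Because the tensor-model-parallel group size is 2, the 16 GPUs are divided into 8 groups: [g0, g1], [g2, g3], [g4, g5], [g6, g7], [g8, g9], [g10, g11], [g12, g13], [g14, g15].
Because the pipeline-model-parallel group size is 4, the 16 GPUs are divided into 4 groups: [g0, g4, g8, g12], [g1, g5, g9, g13], [g2, g6, g10, g14], [g3, g7, g11, g15].
Because the data-parallel group size is 16 / (2 × 4) = 2, the 16 GPUs are divided into 8 groups: [g0, g2], [g1, g3], [g4, g6], [g5, g7], [g8, g10], [g9, g11], [g12, g14], [g13, g15].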
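All of these process groups are created with torch.distributed.new_group, so every process in a group knows which other processes belong to the same group, train together with it, and how to communicate with them.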
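The same range arithmetic can be wrapped in a plain-Python function to check the pattern for other sizes without starting any processes. `build_groups` and `check_invariants` are hypothetical helpers written for this article: they only reproduce the rank lists that the loops in initialize_model_parallel hand to torch.distributed.new_group, and they leave out the embedding groups. In the real function every process executes every new_group call, including for groups it does not belong to, because PyTorch requires all processes of the default group to enter new_group; each process then keeps only the group that contains its own rank.

```python
def build_groups(world_size: int, t: int, p: int) -> dict:
    """Reproduce the rank lists initialize_model_parallel passes to new_group.

    t: tensor-model-parallel size, p: pipeline-model-parallel size.
    """
    if world_size % (t * p) != 0:
        raise ValueError("world_size must be divisible by t * p")
    d = world_size // (t * p)           # data-parallel size
    num_pp_groups = world_size // p     # = t * d, also the number of ranks per stage

    data = []
    for i in range(p):                  # one pipeline stage at a time
        start, end = i * num_pp_groups, (i + 1) * num_pp_groups
        for j in range(t):              # stride t inside the stage
            data.append(list(range(start + j, end, t)))
    # Model-parallel group i = the i-th member of every data-parallel group.
    model = [[g[i] for g in data] for i in range(d)]
    tensor = [list(range(i * t, (i + 1) * t)) for i in range(world_size // t)]
    pipeline = [list(range(i, world_size, num_pp_groups)) for i in range(num_pp_groups)]
    return {"data": data, "model": model, "tensor": tensor, "pipeline": pipeline}


def check_invariants(groups: dict, world_size: int) -> None:
    """Each family must partition the ranks; a TP and a DP group share at most one rank."""
    for name, family in groups.items():
        flat = sorted(r for g in family for r in g)
        assert flat == list(range(world_size)), f"{name} groups do not partition the ranks"
    for tg in groups["tensor"]:
        for dg in groups["data"]:
            assert len(set(tg) & set(dg)) <= 1


if __name__ == "__main__":
    g = build_groups(16, t=2, p=4)
    check_invariants(g, 16)
    for name, family in g.items():
        print(f"{len(family)} {name} groups: {family}")
```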
The original model looks like this:

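After partitioning, the model looks like the figure below and is split into 8 pieces in total. The first layer is split into A and B, so A and B are tensor-model-parallel with each other. Likewise C and D, the two halves of the next layer, are tensor-model-parallel, and so on for the remaining layers.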

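Our goal is to use the code to see how each of the groups in the comment is generated.
Next, let's look at the concrete partitioning strategy, i.e. the GPU assignment strategy. Partitioning has to weigh several factors; first, the communication characteristics of model parallelism.
Next, a comparison of the various parallelism mechanisms.
Finally, the conclusion.
Now let's run an experiment.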
# Plain Python: reproducing the grouping needs no GPUs and no torch.distributed.
world_size = 16
tensor_model_parallel_size = 2  # 2 GPUs to parallelize the model tensor
pipeline_model_parallel_size = 4  # 4 GPUs to parallelize the model pipeline
data_parallel_size = world_size // (tensor_model_parallel_size *
                                    pipeline_model_parallel_size)  # 2
num_tensor_model_parallel_groups = world_size // tensor_model_parallel_size  # 8
num_pipeline_model_parallel_groups = world_size // pipeline_model_parallel_size  # 4
num_data_parallel_groups = world_size // data_parallel_size  # 8

# Build the data-parallel groups.
print("------ Build the data-parallel groups -----")
all_data_parallel_group_ranks = []
for i in range(pipeline_model_parallel_size):
    start_rank = i * num_pipeline_model_parallel_groups
    end_rank = (i + 1) * num_pipeline_model_parallel_groups
    for j in range(tensor_model_parallel_size):
        ranks = range(start_rank + j, end_rank,
                      tensor_model_parallel_size)
        all_data_parallel_group_ranks.append(list(ranks))
print(all_data_parallel_group_ranks)

# Build the model-parallel groups.
print("------ Build the model-parallel groups -----")
for i in range(data_parallel_size):
    ranks = [data_parallel_group_ranks[i]
             for data_parallel_group_ranks in all_data_parallel_group_ranks]
    print(list(ranks))

# Build the tensor model-parallel groups.
print("------ Build the tensor model-parallel groups -----")
for i in range(num_tensor_model_parallel_groups):
    ranks = range(i * tensor_model_parallel_size,
                  (i + 1) * tensor_model_parallel_size)
    print(list(ranks))

# Build the pipeline model-parallel groups and embedding groups
# (first and last rank in each pipeline model-parallel group).
print("------ Build the pipeline model-parallel groups -----")
for i in range(num_pipeline_model_parallel_groups):
    ranks = range(i, world_size,
                  num_pipeline_model_parallel_groups)
    print(list(ranks))

The output is below. Note that these numbers are GPU indices: [0, 2] means [g0, g2]:
------ Build the data-parallel groups -----
[[0, 2], [1, 3], [4, 6], [5, 7], [8, 10], [9, 11], [12, 14], [13, 15]]
------ Build the model-parallel groups -----
[0, 1, 4, 5, 8, 9, 12, 13]
[2, 3, 6, 7, 10, 11, 14, 15]
------ Build the tensor model-parallel groups -----
[0, 1]
[2, 3]
[4, 5]
[6, 7]
[8, 9]
[10, 11]
[12, 13]
[14, 15]
------ Build the pipeline model-parallel groups -----
[0, 4, 8, 12]
[1, 5, 9, 13]
[2, 6, 10, 14]
[3, 7, 11, 15]

Comparing with the comment, the printed results match it:

    Let's say we have a total of 16 GPUs denoted by g0 ... g15 and we
    use 2 GPUs to parallelize the model tensor, and 4 GPUs to parallelize
    the model pipeline. The present function will
    create 8 tensor model-parallel groups, 4 pipeline model-parallel groups
    and 8 data-parallel groups as:
        8 data_parallel groups:
            [g0, g2], [g1, g3], [g4, g6], [g5, g7], [g8, g10], [g9, g11], [g12, g14], [g13, g15]
        8 tensor model-parallel groups:
            [g0, g1], [g2, g3], [g4, g5], [g6, g7], [g8, g9], [g10, g11], [g12, g13], [g14, g15]
        4 pipeline model-parallel groups:
            [g0, g4, g8, g12], [g1, g5, g9, g13], [g2, g6, g10, g14], [g3, g7, g11, g15]

We analyze each of these in detail next.
The comment also says:

    Note that for efficiency, the caller should make sure adjacent ranks
    are on the same DGX box. For example if we are using 2 DGX-1 boxes
    with a total of 16 GPUs, rank 0 to 7 belong to the first box and
    ranks 8 to 15 belong to the second box.

That is, the caller must make sure adjacent ranks are on the same node. Our example has two nodes: the first owns GPUs 0 ~ 7 (ranks 0 ~ 7) and the second owns GPUs 8 ~ 15 (ranks 8 ~ 15).
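The effect of this placement rule can be checked directly. The sketch below assumes 8 GPUs per node and uses the rank lists from the comment; it counts how many groups of each kind span more than one node. `nodes_spanned` and `cross_node_count` are hypothetical helpers.

```python
GPUS_PER_NODE = 8  # assumption: two 8-GPU nodes, ranks 0..7 on node 0, 8..15 on node 1

# Rank lists from the initialize_model_parallel comment (t = 2, p = 4, 16 GPUs).
TENSOR_GROUPS = [[i, i + 1] for i in range(0, 16, 2)]
DATA_GROUPS = [[0, 2], [1, 3], [4, 6], [5, 7], [8, 10], [9, 11], [12, 14], [13, 15]]
PIPELINE_GROUPS = [list(range(i, 16, 4)) for i in range(4)]


def nodes_spanned(group):
    """Sorted list of node indices that the ranks of a group live on."""
    return sorted({rank // GPUS_PER_NODE for rank in group})


def cross_node_count(groups):
    """Number of groups whose members sit on more than one node."""
    return sum(1 for g in groups if len(nodes_spanned(g)) > 1)


if __name__ == "__main__":
    for name, groups in [("tensor", TENSOR_GROUPS), ("data", DATA_GROUPS),
                         ("pipeline", PIPELINE_GROUPS)]:
        print(f"{name:8s}: {cross_node_count(groups)} of {len(groups)} groups cross nodes")
```

With this layout every tensor-parallel group, which communicates inside every layer, stays on one node, while all 4 pipeline groups cross the node boundary; in this particular configuration the data-parallel groups also happen to stay node-local.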
The layout is shown below. Each row holds 4 GPUs: the pipeline has 4 stages (4 GPUs to parallelize the model pipeline), so each stage gets 16 / 4 = 4 GPUs.

Here are some of the notations from the paper, worth reviewing again:
(p, t, d): Parallelization dimensions.
p for the pipeline-model-parallel size,
t for the tensor-model-parallel size, and d for the data-parallel size.
n: Number of GPUs. We require p · t · d = n.
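With this notation, the grouping code implies that a global rank can be read as a number in a mixed radix: pipeline stage as the most significant digit, then the data-parallel index, then the tensor-parallel index. This decomposition is our reading of the range arithmetic in initialize_model_parallel, not a function Megatron exposes; `rank_to_coords` and `coords_to_rank` are hypothetical names.

```python
def rank_to_coords(rank: int, t: int, d: int) -> tuple:
    """Split a global rank into (pipeline_rank, data_parallel_rank, tensor_parallel_rank).

    Layout inferred from initialize_model_parallel:
        rank = pipeline_rank * (t * d) + data_parallel_rank * t + tensor_parallel_rank
    """
    pipeline_rank, rest = divmod(rank, t * d)    # each stage holds t * d = n / p ranks
    data_parallel_rank, tensor_parallel_rank = divmod(rest, t)
    return pipeline_rank, data_parallel_rank, tensor_parallel_rank


def coords_to_rank(pipeline_rank: int, data_parallel_rank: int,
                   tensor_parallel_rank: int, t: int, d: int) -> int:
    """Inverse of rank_to_coords."""
    return pipeline_rank * t * d + data_parallel_rank * t + tensor_parallel_rank


if __name__ == "__main__":
    p, t, d = 4, 2, 2
    n = p * t * d                                # p * t * d = n = 16
    for rank in range(n):
        coords = rank_to_coords(rank, t, d)
        assert coords_to_rank(*coords, t=t, d=d) == rank
        print(rank, coords)
```

For rank 5 this gives (1, 0, 1): rank 5 is the second member of pipeline group [1, 5, 9, 13], the first member of data-parallel group [5, 7], and the second member of tensor-parallel group [4, 5], matching the lists in the comment.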
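Based on the comment, we can derive the current grouping and some global information.
Next, combining this with the code, let's see how many process groups are needed and which variables hold them.
Specifically: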
world_size = 16
tensor_model_parallel_size = 2  # 2 GPUs to parallelize the model tensor
pipeline_model_parallel_size = 4  # 4 GPUs to parallelize the model pipeline
data_parallel_size = world_size // (tensor_model_parallel_size *
                                    pipeline_model_parallel_size)  # 2
num_tensor_model_parallel_groups = world_size // tensor_model_parallel_size  # 8
num_pipeline_model_parallel_groups = world_size // pipeline_model_parallel_size  # 4
num_data_parallel_groups = world_size // data_parallel_size  # 8

This section analyzes how the GPUs on the nodes are assigned to tensor-model-parallel groups.
For the example in the comment, 16 / 2 = 8, so there are 8 process groups with two ranks each: [g0, g1], [g2, g3], [g4, g5], [g6, g7], [g8, g9], [g10, g11], [g12, g13], [g14, g15]. From this we get the following:
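[g0, g1] means one layer is split into two halves, executed by g0 and g1 respectively. [g2, g3] likewise splits a layer into two halves executed by g2 and g3; because g2 and g3 sit in the same pipeline stage as g0 and g1, this is the same layer, held by the other data-parallel model replica.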
The ranks of each tensor-model-parallel group are always adjacent, e.g. [g0, g1], [g2, g3].
Note that ranks 0 ~ 7 do not represent one model; they are the GPUs on the same node. This is easy to confuse.
Let's look at the code again:
    # Build the tensor model-parallel groups.
    global _TENSOR_MODEL_PARALLEL_GROUP
    for i in range(num_tensor_model_parallel_groups):  # 8
        ranks = range(i * tensor_model_parallel_size,
                      (i + 1) * tensor_model_parallel_size)
        group = torch.distributed.new_group(ranks)  # called 8 times, creating 8 groups
        if rank in ranks:
            # If this rank is in the list (e.g. rank 1 is in [0, 1]),
            # this rank belongs to new_group([0, 1]).
            _TENSOR_MODEL_PARALLEL_GROUP = group

In our experiment this step produces:
------ Build the tensor model-parallel groups -----
[0, 1]
[2, 3]
[4, 5]
[6, 7]
[8, 9]
[10, 11]
[12, 13]
[14, 15]

In our figure this corresponds to the following; each tensor model group is marked by a small dashed rectangle, 8 in total:

_TENSOR_MODEL_PARALLEL_GROUP = group records the process group of this rank. For rank 2, for example, _TENSOR_MODEL_PARALLEL_GROUP is group([g2, g3]).
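Because tensor-parallel groups are consecutive blocks of t ranks, a rank's group can also be computed directly instead of by looping over every group. A sketch; `tensor_group_of` and `tensor_group_by_loop` are hypothetical helpers used only to cross-check the loop above.

```python
def tensor_group_of(rank: int, t: int) -> list:
    """Ranks in the same tensor-model-parallel group as `rank` (a block of t ranks)."""
    start = (rank // t) * t
    return list(range(start, start + t))


def tensor_group_by_loop(rank: int, t: int, world_size: int) -> list:
    """Same answer, obtained the way initialize_model_parallel does it."""
    for i in range(world_size // t):
        ranks = range(i * t, (i + 1) * t)
        if rank in ranks:
            return list(ranks)
    raise ValueError("rank out of range")


if __name__ == "__main__":
    print(tensor_group_of(2, t=2))   # rank 2 -> [2, 3]
```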
Next, let's see how it is used.
get_tensor_model_parallel_group returns the tensor model group of the calling rank.
def get_tensor_model_parallel_group():
    """Get the tensor model parallel group the caller rank belongs to."""
    return _TENSOR_MODEL_PARALLEL_GROUP

megatron/mpu/mappings.py uses the tensor model group:
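def _reduce(input_):
    """All-reduce the input tensor across model parallel group."""

    # Bypass the function if we are using only 1 GPU.
    if get_tensor_model_parallel_world_size() == 1:
        return input_

    # All-reduce.
    torch.distributed.all_reduce(input_, group=get_tensor_model_parallel_group())

    return input_

That is, _TENSOR_MODEL_PARALLEL_GROUP is used for collective communication inside the group. This is not limited to the backward pass: in mappings.py _reduce is used both in the forward pass (summing the partial outputs of the tensor-parallel shards) and in the backward pass (summing input gradients), in each case as an all-reduce within the group.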
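To see why an all-reduce with SUM is the right collective, consider a layer whose weight matrix A is split by rows across the t = 2 ranks of a tensor-parallel group, with the input X split by columns to match (the pattern of a row-parallel linear layer). Each rank can only compute a partial product, and the full output is their sum. The sketch simulates this in plain Python; `simulated_all_reduce` is a hypothetical stand-in for torch.distributed.all_reduce with the default SUM op, not a real API.

```python
def matmul(a, b):
    """Plain-Python matrix product of a (m x k) and b (k x n)."""
    return [[sum(a[i][k] * b[k][j] for k in range(len(b)))
             for j in range(len(b[0]))]
            for i in range(len(a))]


def simulated_all_reduce(partials):
    """Stand-in for all_reduce(op=SUM): every rank ends up with the elementwise sum."""
    rows, cols = len(partials[0]), len(partials[0][0])
    total = [[sum(p[i][j] for p in partials) for j in range(cols)] for i in range(rows)]
    return [[row[:] for row in total] for _ in partials]   # one copy per rank


X = [[1, 2, 3, 4]]                        # 1 x 4 input
A = [[1, 0], [0, 1], [1, 1], [2, -1]]     # 4 x 2 weight

# Rank 0 owns columns 0-1 of X and rows 0-1 of A; rank 1 owns the rest.
X_shards = [[row[0:2] for row in X], [row[2:4] for row in X]]
A_shards = [A[0:2], A[2:4]]
partials = [matmul(x, a) for x, a in zip(X_shards, A_shards)]   # [[1, 2]] and [[11, -1]]

reduced = simulated_all_reduce(partials)
print(reduced[0], matmul(X, A))   # both are [[12, 1]]
```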
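This section analyzes how the GPUs on the nodes are assigned to pipeline-model-parallel groups.
From the comment, pipeline grouping divides the 16 GPUs into 4 groups of 4 GPUs each: [g0, g4, g8, g12], [g1, g5, g9, g13], [g2, g6, g10, g14], [g3, g7, g11, g15]. From this we get the following: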
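The four GPUs in each group form one model pipeline, so pipeline_model_parallel_size = 4, which is p in the notation. In other words, the pipeline depth is 4 and the 4 GPUs of a group run in series: for example, [g0, g4, g8, g12] are serial.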
Each pipeline stage contains 16 / 4 = 4 GPUs: the first stage is ranks 0 ~ 3, the second is ranks 4 ~ 7, and so on.
A pipeline group takes every (n // p)-th rank, e.g. [0, 4, 8, 12].
Stage i (counting from 0) covers the half-open rank range [i · n//p, (i+1) · n//p), so the stage containing rank 2 consists of ranks [0, 1, 2, 3].
_PIPELINE_MODEL_PARALLEL_GROUP holds the pipeline process group of this rank.
_PIPELINE_GLOBAL_RANKS holds the ranks of that process group.
If this process is rank 2, its pipeline process group ranks are [g2, g6, g10, g14].
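The stage ranges and pipeline groups described above reduce to two short formulas. A sketch with hypothetical helper names, assuming the contiguous layout used in this article (n = 16, p = 4):

```python
def stage_ranks(stage: int, world_size: int, p: int) -> list:
    """Ranks of pipeline stage `stage` (counting from 0): [stage*n/p, (stage+1)*n/p)."""
    per_stage = world_size // p
    return list(range(stage * per_stage, (stage + 1) * per_stage))


def pipeline_group_of(rank: int, world_size: int, p: int) -> list:
    """Pipeline group of `rank`: every (n // p)-th rank, starting at rank % (n // p)."""
    stride = world_size // p
    return list(range(rank % stride, world_size, stride))


def stage_of(rank: int, world_size: int, p: int) -> int:
    """Stage index of `rank`, which is also its position inside its pipeline group."""
    return rank // (world_size // p)


if __name__ == "__main__":
    n, p = 16, 4
    print([stage_ranks(s, n, p) for s in range(p)])
    print(pipeline_group_of(2, n, p), "stage", stage_of(2, n, p))
```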
The code is as follows:
    # Build the pipeline model-parallel groups and embedding groups
    # (first and last rank in each pipeline model-parallel group).
    global _PIPELINE_MODEL_PARALLEL_GROUP
    global _PIPELINE_GLOBAL_RANKS
    global _EMBEDDING_GROUP
    for i in range(num_pipeline_model_parallel_groups):  # 4
        ranks = range(i, world_size,  # take every (n // p)-th rank
                      num_pipeline_model_parallel_groups)
        group = torch.distributed.new_group(ranks)
        if rank in ranks:
            _PIPELINE_MODEL_PARALLEL_GROUP = group
            _PIPELINE_GLOBAL_RANKS = ranks
        # Setup embedding group (to exchange gradients between
        # first and last stages).
        if len(ranks) > 1:
            embedding_ranks = [ranks[0], ranks[-1]]
        else:
            embedding_ranks = ranks
        group = torch.distributed.new_group(embedding_ranks)
        if rank in embedding_ranks:
            _EMBEDDING_GROUP = group

Extending the earlier figure, we now add 4 dashed top-to-bottom arrows, one for each of the 4 serial pipelines. The horizontal rows are Stage 0 ~ Stage 3.

Next, let's see how it is used.
get_pipeline_model_parallel_group returns the pipeline model group of the calling rank.
def get_pipeline_model_parallel_group():
    """Get the pipeline model parallel group the caller rank belongs to."""
    return _PIPELINE_MODEL_PARALLEL_GROUP

It is used in megatron/p2p_communication.py, where _communicate uses the pipeline group information for communication. Most of the code is omitted here.
def _communicate(tensor_send_next, tensor_send_prev, recv_prev, recv_next,
                 use_ring_exchange=False, tensor_shape=None,
                 override_scatter_gather_tensors_in_pipeline=False,
                 dtype_=None):
    """Communicate tensors between stages. Used as helper method in other
    communication methods that are used in megatron/schedules.py.
    """
    # ... (omitted: among other things, this part prepares the receive
    # buffers tensor_recv_prev / tensor_recv_next)

    # Send tensors in both the forward and backward directions as appropriate.
    if use_ring_exchange:
        # Communicate within the group returned by get_pipeline_model_parallel_group
        torch.distributed.ring_exchange(tensor_send_prev=tensor_send_prev,
                                        tensor_recv_prev=tensor_recv_prev,
                                        tensor_send_next=tensor_send_next,
                                        tensor_recv_next=tensor_recv_next,
                                        group=mpu.get_pipeline_model_parallel_group())
    else:
        ops = []
        if tensor_send_prev is not None:
            send_prev_op = torch.distributed.P2POp(
                torch.distributed.isend, tensor_send_prev,
                mpu.get_pipeline_model_parallel_prev_rank())  # previous rank in the pipeline
            ops.append(send_prev_op)
        if tensor_recv_prev is not None:
            recv_prev_op = torch.distributed.P2POp(
                torch.distributed.irecv, tensor_recv_prev,
                mpu.get_pipeline_model_parallel_prev_rank())
            ops.append(recv_prev_op)
        if tensor_send_next is not None:
            send_next_op = torch.distributed.P2POp(
                torch.distributed.isend, tensor_send_next,
                mpu.get_pipeline_model_parallel_next_rank())  # next rank in the pipeline
            ops.append(send_next_op)
        if tensor_recv_next is not None:
            recv_next_op = torch.distributed.P2POp(
                torch.distributed.irecv, tensor_recv_next,
                mpu.get_pipeline_model_parallel_next_rank())
            ops.append(recv_next_op)
        # ... (rest omitted)

How are the upstream and downstream ranks of the pipeline obtained? Through get_pipeline_model_parallel_next_rank and get_pipeline_model_parallel_prev_rank. _PIPELINE_GLOBAL_RANKS holds the ranks of the process group; if this process is rank 2, the pipeline process group ranks are [g2, g6, g10, g14].
def get_pipeline_model_parallel_next_rank():
    rank_in_pipeline = get_pipeline_model_parallel_rank()
    world_size = get_pipeline_model_parallel_world_size()
    return _PIPELINE_GLOBAL_RANKS[(rank_in_pipeline + 1) % world_size]


def get_pipeline_model_parallel_prev_rank():
    rank_in_pipeline = get_pipeline_model_parallel_rank()
    world_size = get_pipeline_model_parallel_world_size()
    return _PIPELINE_GLOBAL_RANKS[(rank_in_pipeline - 1) % world_size]

get_pipeline_model_parallel_world_size returns the world size of the pipeline process group.
def get_pipeline_model_parallel_world_size():
    """Return world size for the pipeline model parallel group."""
    global _MPU_PIPELINE_MODEL_PARALLEL_WORLD_SIZE
    if _MPU_PIPELINE_MODEL_PARALLEL_WORLD_SIZE is not None:
        return _MPU_PIPELINE_MODEL_PARALLEL_WORLD_SIZE
    return torch.distributed.get_world_size(group=get_pipeline_model_parallel_group())

Next, let's look at data parallelism.
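Before turning to data parallelism, the neighbor arithmetic of get_pipeline_model_parallel_next_rank / get_pipeline_model_parallel_prev_rank can be checked on its own. In the sketch, `global_ranks` plays the role of _PIPELINE_GLOBAL_RANKS and `rank_in_pipeline` the role of get_pipeline_model_parallel_rank(); the function names are hypothetical. Because of the modulo, the last stage's "next" wraps around to the first stage and vice versa; Python's % returns a non-negative result for a positive divisor, so (0 - 1) % 4 == 3.

```python
def next_rank(global_ranks, rank_in_pipeline):
    """Global rank of the next pipeline stage (wraps around at the end)."""
    return global_ranks[(rank_in_pipeline + 1) % len(global_ranks)]


def prev_rank(global_ranks, rank_in_pipeline):
    """Global rank of the previous pipeline stage (wraps around at the start)."""
    return global_ranks[(rank_in_pipeline - 1) % len(global_ranks)]


if __name__ == "__main__":
    pipeline = [2, 6, 10, 14]   # pipeline group of rank 2 in the 16-GPU example
    for i, r in enumerate(pipeline):
        print(f"rank {r}: prev {prev_rank(pipeline, i)}, next {next_rank(pipeline, i)}")
```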
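For the example in the comment, 16 / 2 = 8, so there are 8 process groups with two ranks each: [g0, g2], [g1, g3], [g4, g6], [g5, g7], [g8, g10], [g9, g11], [g12, g14], [g13, g15]. From this we get the following:
Let's look at how the code determines which groups exist and what each one contains.
ranks = range(start_rank + j, end_rank, tensor_model_parallel_size) means that, among the n // p GPUs of a stage, every t-th GPU becomes a member of the same data-parallel group, so each data-parallel group has size n // p // t = d. The code: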
    # Build the data-parallel groups.
    global _DATA_PARALLEL_GROUP
    assert _DATA_PARALLEL_GROUP is None, \
        'data parallel group is already initialized'
    all_data_parallel_group_ranks = []
    for i in range(pipeline_model_parallel_size):  # iterate over the pipeline stages
        start_rank = i * num_pipeline_model_parallel_groups  # first rank of this stage
        end_rank = (i + 1) * num_pipeline_model_parallel_groups  # end (exclusive) of this stage
        for j in range(tensor_model_parallel_size):  # iterate over the tensor-parallel size
            ranks = range(start_rank + j, end_rank,  # every t-th rank joins this data-parallel group
                          tensor_model_parallel_size)
            all_data_parallel_group_ranks.append(list(ranks))
            group = torch.distributed.new_group(ranks)
            if rank in ranks:
                _DATA_PARALLEL_GROUP = group

The printed output below matches the comment:
------ Build the data-parallel groups -----
[[0, 2], [1, 3], [4, 6], [5, 7], [8, 10], [9, 11], [12, 14], [13, 15]]

The figure is extended accordingly below: each newly added double arrow corresponds to one DDP group (two ranks), e.g. [0, 2] forms one DDP group.

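The stride rule can also be turned into a direct lookup: a rank's data-parallel group starts at the first rank of its stage plus its tensor-parallel offset, and steps by t within that stage. A sketch under the same layout assumptions as above; `data_group_of` is a hypothetical helper.

```python
def data_group_of(rank: int, world_size: int, t: int, p: int) -> list:
    """Data-parallel group of `rank`: same stage, same tensor-parallel offset, stride t."""
    per_stage = world_size // p                  # n // p ranks per pipeline stage
    stage_start = (rank // per_stage) * per_stage
    offset = rank % t                            # position inside the tensor-parallel group
    return list(range(stage_start + offset, stage_start + per_stage, t))


if __name__ == "__main__":
    n, t, p = 16, 2, 4
    groups = sorted({tuple(data_group_of(r, n, t, p)) for r in range(n)})
    print(len(groups), "data-parallel groups:", [list(g) for g in groups])
    print("size d =", n // p // t)
```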
Next, let's see how it is used.
get_data_parallel_group returns the _DATA_PARALLEL_GROUP of this rank.
def get_data_parallel_group():
    """Get the data parallel group the caller rank belongs to."""
    return _DATA_PARALLEL_GROUP

allreduce_gradients performs an all-reduce over this data-parallel group.
    def allreduce_gradients(self):
        """Reduce gradients across data parallel ranks."""
        # If we have buffers, simply reduce the data in the buffer.
        if self._grad_buffers is not None:
            for _, buffer_ in self._grad_buffers.items():
                buffer_.data /= mpu.get_data_parallel_world_size()  # data-parallel world size
                torch.distributed.all_reduce(
                    buffer_.data, group=mpu.get_data_parallel_group())  # data-parallel group
        else:
            # Otherwise, bucketize and all-reduce
            buckets = {}
            # Pack the buckets.
            for param in self.module.parameters():
                if param.requires_grad and param.grad is not None:
                    tp = param.data.type()
                    if tp not in buckets:
                        buckets[tp] = []
                    buckets[tp].append(param)
                    param.main_grad = param.grad

            # For each bucket, all-reduce and copy all-reduced grads.
            for tp in buckets:
                bucket = buckets[tp]
                grads = [param.grad.data for param in bucket]
                coalesced = _flatten_dense_tensors(grads)
                coalesced /= mpu.get_data_parallel_world_size()
                torch.distributed.all_reduce(
                    coalesced, group=mpu.get_data_parallel_group())
                for buf, synced in zip(grads, _unflatten_dense_tensors(
                        coalesced, grads)):
                    buf.copy_(synced)

In the earlier experiment we obtained the model-parallel groups [0, 1, 4, 5, 8, 9, 12, 13] and [2, 3, 6, 7, 10, 11, 14, 15]. The code that generates them:
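    # Build the model-parallel groups.
    global _MODEL_PARALLEL_GROUP
    for i in range(data_parallel_size):
        ranks = [data_parallel_group_ranks[i]
                 for data_parallel_group_ranks in all_data_parallel_group_ranks]
        group = torch.distributed.new_group(ranks)
        if rank in ranks:
            _MODEL_PARALLEL_GROUP = group

Each model-parallel group takes the i-th member of every data-parallel group, so it contains all the ranks that together hold one complete replica of the model. _MODEL_PARALLEL_GROUP holds the model group of this rank, and get_model_parallel_group returns it: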
def get_model_parallel_group():
    """Get the model parallel group the caller rank belongs to."""
    return _MODEL_PARALLEL_GROUP

It is used in gradient clipping, where the gradient-norm computation must span all ranks of one model replica.
def clip_grad_norm_fp32(parameters, max_norm, norm_type=2):
    """Clips gradient norm of an iterable of parameters whose gradients
       are in fp32.

    This is adapted from torch.nn.utils.clip_grad.clip_grad_norm_ and
    added functionality to handle model parallel parameters. Note that
    the gradients are modified in place.

    Arguments:
        parameters (Iterable[Tensor] or Tensor): an iterable of Tensors or a
            single Tensor that will have gradients normalized
        max_norm (float or int): max norm of the gradients
        norm_type (float or int): type of the used p-norm. Can be ``'inf'`` for
            infinity norm.

    Returns:
        Total norm of the parameters (viewed as a single vector).
    """

    if isinstance(parameters, torch.Tensor):
        parameters = [parameters]

    # Filter parameters based on:
    #   - grad should not be none
    #   - parameter should not be shared
    #   - should not be a replica due to tensor model parallelism
    grads = []
    grads_for_norm = []
    for param in parameters:
        grad_not_none = param.grad is not None
        is_not_shared = param_is_not_shared(param)
        is_not_tp_duplicate = param_is_not_tensor_parallel_duplicate(param)
        if grad_not_none:
            # Only detach when a gradient exists (param.grad may be None).
            grad = param.grad.detach()
            # Make sure the grads are in fp32
            grads.append(grad)
        if grad_not_none and is_not_shared and is_not_tp_duplicate:
            grads_for_norm.append(grad)

    # Norm parameters.
    max_norm = float(max_norm)
    norm_type = float(norm_type)
    total_norm = 0.0

    # Calculate norm.
    if norm_type == inf:
        total_norm = max(grad.abs().max() for grad in grads_for_norm)
        total_norm_cuda = torch.cuda.FloatTensor([float(total_norm)])
        # Take max across all model-parallel GPUs.
        torch.distributed.all_reduce(total_norm_cuda,
                                     op=torch.distributed.ReduceOp.MAX,
                                     group=mpu.get_model_parallel_group())  # model-parallel group
        total_norm = total_norm_cuda[0].item()

    else:
        if norm_type == 2.0:
            dummy_overflow_buf = torch.cuda.IntTensor([0])
            # Use apex's multi-tensor applier for efficiency reasons.
            # Multi-tensor applier takes a function and a list of list
            # and performs the operation on that list all in one kernel.
            grad_norm, _ = multi_tensor_applier(
                amp_C.multi_tensor_l2norm,
                dummy_overflow_buf,
                [grads_for_norm],
                False  # no per-parameter norm
            )
            # Since we will be summing across data parallel groups,
            # we need the pow(norm-type).
            total_norm = grad_norm ** norm_type

        else:
            for grad in grads_for_norm:
                grad_norm = torch.norm(grad, norm_type)
                total_norm += grad_norm ** norm_type

        # Sum across all model-parallel GPUs.
        torch.distributed.all_reduce(total_norm,
                                     op=torch.distributed.ReduceOp.SUM,
                                     group=mpu.get_model_parallel_group())  # model-parallel group
        total_norm = total_norm.item() ** (1.0 / norm_type)

    # Scale.
    clip_coeff = max_norm / (total_norm + 1.0e-6)
    if clip_coeff < 1.0:
        dummy_overflow_buf = torch.cuda.IntTensor([0])
        multi_tensor_applier(amp_C.multi_tensor_scale,
                             dummy_overflow_buf,
                             [grads, grads],
                             clip_coeff)

    return total_norm

The earlier figure is shown below. You can see the two groups: on the left, all the ranks of Model 0; on the right, the ranks of Model 1.

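The reason clip_grad_norm_fp32 needs the model-parallel group is that each rank only holds the gradients of its own shard, while the norm must be taken over the whole model. For norm_type 2, summing the per-rank squared norms (the all-reduce with ReduceOp.SUM) and then taking the square root gives exactly the norm of the full gradient vector. The plain-Python sketch below shows this arithmetic; it deliberately skips the filtering of shared and tensor-parallel-duplicate parameters that the real function performs, and `clip_across_shards` is a hypothetical name.

```python
import math


def clip_across_shards(shard_grads, max_norm, eps=1.0e-6):
    """shard_grads[r] is the flat list of gradients held by model-parallel rank r."""
    # Each rank: local sum of squares, i.e. its own norm ** 2.
    local_sq = [sum(g * g for g in grads) for grads in shard_grads]
    # all_reduce(SUM) over the model-parallel group, then the square root.
    total_norm = math.sqrt(sum(local_sq))
    # Same scaling rule as clip_grad_norm_fp32.
    clip_coeff = max_norm / (total_norm + eps)
    if clip_coeff < 1.0:
        shard_grads = [[g * clip_coeff for g in grads] for grads in shard_grads]
    return total_norm, shard_grads


if __name__ == "__main__":
    total, clipped = clip_across_shards([[3.0], [4.0]], max_norm=1.0)
    print(total, clipped)   # total norm 5.0; each shard scaled by about 0.2
```

A rank looking only at its own shard would see a norm of 3.0 or 4.0 instead of 5.0 and would clip too little, which is why the reduction over the model-parallel group is required.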
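One question remains: how are the model pieces placed on their GPUs, i.e. how does this correspond to the earlier figure in which the model was split into A, B, ..., H? In fact, the model is not split up centrally and then copied to the corresponding ranks or GPUs; instead, each rank (GPU) works out which layers it is responsible for and builds only those.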
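The layer-offset arithmetic used by ParallelTransformer can be tried in isolation. The sketch assumes that, before the offset is computed, each stage already gets num_layers // pipeline_size layers (that part of __init__ is omitted in the excerpt), and it returns 0-based layer indices, whereas build_layer receives i + 1 + offset, i.e. 1-based layer numbers. `layers_for_rank` is a hypothetical helper; the expected assignments come from the comments in the ParallelTransformer code.

```python
def layers_for_rank(num_layers, pp_size, pp_rank, vp_size=None, vp_rank=0):
    """0-based indices of the transformer layers built by one model chunk."""
    assert num_layers % pp_size == 0
    per_stage = num_layers // pp_size            # layers per pipeline stage
    if vp_size is None:
        # Each stage gets a contiguous set of layers.
        count = per_stage
        offset = pp_rank * per_stage
    else:
        assert per_stage % vp_size == 0
        count = per_stage // vp_size             # layers per model chunk
        offset = vp_rank * (num_layers // vp_size) + pp_rank * count
    return list(range(offset, offset + count))


if __name__ == "__main__":
    # 8 layers, 2 stages, 4 model chunks per stage (example from the source comment)
    for stage in range(2):
        print(f"Stage {stage}:", [layers_for_rank(8, 2, stage, 4, v) for v in range(4)])
```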
The ParallelTransformer initialization code is as follows:
class ParallelTransformer(MegatronModule):
    """Transformer class."""

    def __init__(self, init_method, output_layer_init_method,
                 layer_type=LayerType.encoder,
                 self_attn_mask_type=AttnMaskType.padding,
                 pre_process=True, post_process=True):
        super(ParallelTransformer, self).__init__()
        args = get_args()

        # ... (code omitted)

        # Transformer layers.
        def build_layer(layer_number):
            return ParallelTransformerLayer(
                init_method,
                output_layer_init_method,
                layer_number,
                layer_type=layer_type,
                self_attn_mask_type=self_attn_mask_type)

        # The offset below is how each rank works out which layers of the model it should build.
        if args.virtual_pipeline_model_parallel_size is not None:
            # Number of layers in each model chunk is the number of layers in the stage,
            # divided by the number of model chunks in a stage.
            self.num_layers = self.num_layers // args.virtual_pipeline_model_parallel_size
            # With 8 layers, 2 stages, and 4 model chunks, we want an assignment of
            # layers to stages like (each list is a model chunk):
            # Stage 0: [0]  [2]  [4]  [6]
            # Stage 1: [1]  [3]  [5]  [7]
            # With 8 layers, 2 stages, and 2 virtual stages, we want an assignment of
            # layers to stages like (each list is a model chunk):
            # Stage 0: [0, 1]  [4, 5]
            # Stage 1: [2, 3]  [6, 7]
            offset = mpu.get_virtual_pipeline_model_parallel_rank() * (
                args.num_layers // args.virtual_pipeline_model_parallel_size) + \
                (mpu.get_pipeline_model_parallel_rank() * self.num_layers)
        else:
            # Each stage gets a contiguous set of layers.
            offset = mpu.get_pipeline_model_parallel_rank() * self.num_layers

        self.layers = torch.nn.ModuleList(
            [build_layer(i + 1 + offset) for i in range(self.num_layers)])

        if self.post_process:
            # Final layer norm before output.
            self.final_layernorm = LayerNorm(
                args.hidden_size,
                eps=args.layernorm_epsilon)

So the final result is shown below. Sub-modules with the same name have identical parameters and are data-parallel with each other; e.g. the two A's are data-parallel. Layers in one column run serially in the pipeline, e.g. A --> C --> E --> G. Each row of 4 GPUs is one pipeline stage, and within a row, counting from 0, each pair of horizontally adjacent GPUs is tensor-model-parallel.
