NVIDIA Megatron is a PyTorch-based distributed training framework for training very large Transformer language models. It reproduces GPT-3 by combining data parallelism, tensor parallelism, and pipeline parallelism, and the machinery behind it is well worth a close look.
This series consists of roughly 6-7 articles that study Megatron through its paper and source code. This article looks at how Megatron implements model parallelism.
The earlier articles in this series are:
[源码解析] 模型并行分布式训练Megatron (1) --- 论文 & 基础
[源码解析] 模型并行分布式训练Megatron (2) --- 整体架构
In the paper article we saw that, as models grow, their size far exceeds the memory of a single processor, which gave rise to memory-management techniques such as activation checkpointing. Model parallelism takes a different route: it overcomes the single-processor memory limit by sharding the model in various ways, so that the model weights and their associated optimizer state can be spread across multiple devices.
ParallelTransformerLayer is the parallel implementation of a Transformer layer, so we start our analysis there.
The __init__ method of ParallelTransformerLayer builds the following members:
```python
class ParallelTransformerLayer(MegatronModule):
    """A single transformer layer.

    Transformer layer takes input with size [b, s, h] and returns an
    output of the same size.
    """

    def __init__(self, init_method, output_layer_init_method,
                 layer_number, layer_type=LayerType.encoder,
                 self_attn_mask_type=AttnMaskType.padding):
        args = get_args()

        super(ParallelTransformerLayer, self).__init__()
        self.layer_number = layer_number
        self.layer_type = layer_type

        self.apply_residual_connection_post_layernorm \
            = args.apply_residual_connection_post_layernorm

        self.bf16 = args.bf16
        self.fp32_residual_connection = args.fp32_residual_connection

        # Layernorm on the input data.
        self.input_layernorm = LayerNorm(  # a LayerNorm that handles the layer input
            args.hidden_size,
            eps=args.layernorm_epsilon,
            no_persist_layer_norm=args.no_persist_layer_norm)

        # Self attention.
        self.self_attention = ParallelAttention(  # build the parallel attention
            init_method,
            output_layer_init_method,
            layer_number,
            attention_type=AttnType.self_attn,
            attn_mask_type=self_attn_mask_type)
        self.hidden_dropout = args.hidden_dropout
        self.bias_dropout_fusion = args.bias_dropout_fusion

        # Layernorm on the attention output
        self.post_attention_layernorm = LayerNorm(  # a LayerNorm that handles the attention output
            args.hidden_size,
            eps=args.layernorm_epsilon,
            no_persist_layer_norm=args.no_persist_layer_norm)

        if self.layer_type == LayerType.decoder:  # if this layer is a decoder layer
            self.inter_attention = ParallelAttention(  # build a cross-attention ParallelAttention
                init_method,
                output_layer_init_method,
                layer_number,
                attention_type=AttnType.cross_attn)
            # Layernorm on the attention output.
            self.post_inter_attention_layernorm = LayerNorm(
                args.hidden_size,
                eps=args.layernorm_epsilon,
                no_persist_layer_norm=args.no_persist_layer_norm)

        # MLP
        self.mlp = ParallelMLP(init_method,  # build the parallel MLP
                               output_layer_init_method)
```

Pictorially, this corresponds to:

Its forward method simply invokes these member modules in turn:
```python
def forward(self, hidden_states, attention_mask,
            encoder_output=None, enc_dec_attn_mask=None,
            inference_params=None):
    # hidden_states: [b, s, h]

    # Layer norm at the beginning of the transformer layer.
    layernorm_output = self.input_layernorm(hidden_states)  # normalize the input
    # Self attention.
    attention_output, attention_bias = self.self_attention(
        layernorm_output,
        attention_mask,
        inference_params=inference_params)

    # Residual connection.
    if self.apply_residual_connection_post_layernorm:
        residual = layernorm_output  # use the post-LayerNorm result as X
    else:
        residual = hidden_states  # use the original input X

    # jit scripting for a nn.module (with dropout) is not
    # triggering the fusion kernel. For now, we use two
    # different nn.functional routines to account for varying
    # dropout semantics during training and inference phases.
    if self.bias_dropout_fusion:  # dropout
        if self.training:
            bias_dropout_add_func = bias_dropout_add_fused_train
        else:
            bias_dropout_add_func = bias_dropout_add_fused_inference
    else:
        bias_dropout_add_func = get_bias_dropout_add(self.training)

    # re-enable torch grad to enable fused optimization.
    with torch.enable_grad():
        layernorm_input = bias_dropout_add_func(  # dropout
            attention_output,
            attention_bias.expand_as(residual),
            residual,
            self.hidden_dropout)

    # Layer norm post the self attention.
    layernorm_output = self.post_attention_layernorm(layernorm_input)  # normalize the attention output

    if self.layer_type == LayerType.decoder:
        attention_output, attention_bias = \
            self.inter_attention(layernorm_output,
                                 enc_dec_attn_mask,
                                 encoder_output=encoder_output)
        # residual connection
        if self.apply_residual_connection_post_layernorm:
            residual = layernorm_output
        else:
            residual = layernorm_input

        # re-enable torch grad to enable fused optimization.
        with torch.enable_grad():
            layernorm_input = bias_dropout_add_func(
                attention_output,
                attention_bias.expand_as(residual),
                residual,
                self.hidden_dropout)

        # Layer norm post the decoder attention
        layernorm_output = self.post_inter_attention_layernorm(layernorm_input)

    # MLP.
    mlp_output, mlp_bias = self.mlp(layernorm_output)  # MLP

    # Second residual connection.
    if self.apply_residual_connection_post_layernorm:  # residual connection
        residual = layernorm_output
    else:
        residual = layernorm_input

    # re-enable torch grad to enable fused optimization.
    with torch.enable_grad():
        output = bias_dropout_add_func(  # dropout
            mlp_output,
            mlp_bias.expand_as(residual),
            residual,
            self.hidden_dropout)

    return output
```

ParallelTransformerLayer thus contains Attention and MLP. For reasons of space we mainly analyze the MLP here; for Attention we only briefly touch on its row-splitting mechanism, since the goal is to understand model parallelism rather than the Transformer itself.

Megatron's parallel MLP contains two linear layers: the first projects from hidden size to 4 x hidden size, and the second projects 4 x hidden size back to hidden size. The MLP logic is as follows:

Figure: MLP with model parallelism. f and g denote the operations related to communication and splitting; they are conjugate. The forward pass of f is an identity operator and its backward pass is an all-reduce, while the forward pass of g is an all-reduce and its backward pass is an identity operator. Here f comes from ColumnParallelLinear and g from RowParallelLinear; that is, the MLP is ColumnParallelLinear and RowParallelLinear combined.
The central question is therefore: how do we split these two linear layers across the GPUs? As discussed in the previous article, Megatron adopts the second option from the paper:

The other option is to split A along its columns, \(A = [A_1, A_2]\). This partitioning allows the GeLU nonlinearity to be independently applied to the output of each partitioned GEMM:

\[ [Y_1, Y_2] = [\mathrm{GeLU}(XA_1), \mathrm{GeLU}(XA_2)] \]

This option is better because it removes a synchronization point: the outputs of the two GeLUs can simply be concatenated. Hence we partition the first GEMM in this column-parallel fashion and split the second GEMM along its rows so that it consumes the GeLU output directly, without any intermediate communication (for example, no all-reduce is needed there), as shown in the figure.
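To make this concrete, here is a minimal single-process sketch (plain PyTorch with made-up toy shapes, not Megatron code) that checks the column-split identity numerically:

```python
import torch
import torch.nn.functional as F

torch.manual_seed(0)
b, s, h = 2, 4, 8          # toy sizes: batch, sequence, hidden
X = torch.randn(b * s, h)
A = torch.randn(h, 4 * h)  # full weight of the first GEMM

# Split A along its columns: A = [A_1, A_2]
A1, A2 = A.chunk(2, dim=1)

# GeLU can be applied to each partition independently ...
Y_split = torch.cat([F.gelu(X @ A1), F.gelu(X @ A2)], dim=1)
# ... and the result matches applying GeLU to the unsplit GEMM.
Y_full = F.gelu(X @ A)
assert torch.allclose(Y_split, Y_full, atol=1e-6)
```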
Let us look more closely at why this scheme is chosen.

Following the straightforward logic, the MLP forward pass would be split into two stages, corresponding to the two rows in the figure below:

But each such split incurs two extra communications (one in the forward pass and one in the backward pass; only the forward pass is shown below). Since the input Y of the second row is really just the parallel pair \(XA_1, XA_2\), we can defer the communication or drop it altogether to reduce traffic: the all_gather at the end of the first row and the split at the start of the second row cancel each other. Mathematically this is just transitivity and associativity (the sum of the local partial results equals the global result). This yields the second scheme from the paper; a sketch after the figure below checks it numerically.

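Under the same assumptions (toy shapes, plain PyTorch, not Megatron code), the following sketch checks the fused scheme end to end: the column-parallel first GEMM feeds the row-parallel second GEMM directly, and the only communication left is the final sum that g's all-reduce would perform:

```python
import torch
import torch.nn.functional as F

torch.manual_seed(0)
h = 8
X = torch.randn(6, h)
A = torch.randn(h, 4 * h)  # first GEMM:  h  -> 4h
B = torch.randn(4 * h, h)  # second GEMM: 4h -> h

# Serial reference: Z = GeLU(X A) B
Z_ref = F.gelu(X @ A) @ B

# Parallel version on two simulated "GPUs": A split by columns, B by rows.
A1, A2 = A.chunk(2, dim=1)
B1, B2 = B.chunk(2, dim=0)
Y1, Y2 = F.gelu(X @ A1), F.gelu(X @ A2)  # local GeLU, no sync point
Z = Y1 @ B1 + Y2 @ B2                    # the "+" is what g's all-reduce computes
assert torch.allclose(Z, Z_ref, atol=1e-5)
```

As we will see below, this is exactly the configuration ParallelMLP uses: gather_output=False on the first layer and input_is_parallel=True on the second.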
Let us now see how this corresponds to the code.
First, the naming conventions used in the rest of this article (they come from the header comment of megatron/model/transformer.py):

- h: hidden size
- n: number of attention heads
- p: number of model parallel partitions
- np: n/p
- hp: h/p
- hn: h/n
- b: batch size
- s: sequence length
- l: number of layers
ParallelMLP is defined in megatron/model/transformer.py; it consists of a ColumnParallelLinear followed by a RowParallelLinear. Note that the dropout is applied in the forward pass of ParallelTransformerLayer above, not inside the MLP itself.
So the MLP looks roughly like the figure below, where A and B are the weight matrices of the two layers:

This corresponds to the following figure from the paper:

The code is as follows:
```python
class ParallelMLP(MegatronModule):
    """MLP.

    MLP will take the input with h hidden state, project it to 4*h
    hidden dimension, perform nonlinear transformation, and project the
    state back into h hidden dimension.
    """

    def __init__(self, init_method, output_layer_init_method):
        super(ParallelMLP, self).__init__()
        args = get_args()

        # Project to 4h.
        self.dense_h_to_4h = mpu.ColumnParallelLinear(  # column split
            args.hidden_size,
            args.ffn_hidden_size,
            gather_output=False,  # False here: the second scheme is used
            init_method=init_method,
            skip_bias_add=True)

        self.bias_gelu_fusion = args.bias_gelu_fusion

        # gelu
        self.activation_func = F.gelu
        if args.openai_gelu:
            self.activation_func = openai_gelu
        elif args.onnx_safe:
            self.activation_func = erf_gelu

        # Project back to h.
        self.dense_4h_to_h = mpu.RowParallelLinear(  # row split
            args.ffn_hidden_size,
            args.hidden_size,
            input_is_parallel=True,
            init_method=output_layer_init_method,
            skip_bias_add=True)
```

Here ColumnParallelLinear performs the h-to-4h projection and RowParallelLinear performs the 4h-to-h projection.
```python
def forward(self, hidden_states):

    # [s, b, 4hp]
    intermediate_parallel, bias_parallel = self.dense_h_to_4h(hidden_states)  # column split

    if self.bias_gelu_fusion:
        intermediate_parallel = \
            bias_gelu_impl(intermediate_parallel, bias_parallel)
    else:
        intermediate_parallel = \
            self.activation_func(intermediate_parallel + bias_parallel)

    # [s, b, h]
    output, output_bias = self.dense_4h_to_h(intermediate_parallel)  # row split
    return output, output_bias
```

Next we look at ColumnParallelLinear and RowParallelLinear in turn. ColumnParallelLinear can be used standalone or as the first half of ParallelMLP; likewise, RowParallelLinear can be used standalone or as the second half of ParallelMLP.
ColumnParallelLinear splits the weight by columns (a "vertical cut"). Note that it is the weight matrix that is split along its columns:

\[ Y = XA = X[A_1, A_2] = [XA_1, XA_2] \]

The concrete split is as follows:

Because of how Python works, the useful part of the definition below is the docstring. From it we can see that for \(Y = XA + b\), A is parallelized as \(A = [A_1, ..., A_p]\).
```python
class ColumnParallelLinear(torch.nn.Module):
    """Linear layer with column parallelism.

    The linear layer is defined as Y = XA + b. A is parallelized along
    its second dimension as A = [A_1, ..., A_p].

    Arguments:
        input_size: first dimension of matrix A.
        output_size: second dimension of matrix A.
        bias: If true, add bias
        gather_output: If true, call all-gather on output and make Y available
                       to all GPUs, otherwise, every GPU will have its output
                       which is Y_i = XA_i
        init_method: method to initialize weights. Note that bias is always
                     set to zero.
        stride: For the strided linear layers.
        keep_master_weight_for_test: This was added for testing and should be
                                     set to False. It returns the master weights
                                     used for initialization.
        skip_bias_add: This was added to enable performance optimizations where
                       bias can be fused with other elementwise operations. We
                       skip adding bias but instead return it.
    """
```

The initialization code mainly uses the partitioning information to initialize the weights.
```python
def __init__(self, input_size, output_size, bias=True, gather_output=True,
             init_method=init.xavier_normal_, stride=1,
             keep_master_weight_for_test=False,
             skip_bias_add=False):
    super(ColumnParallelLinear, self).__init__()

    # Keep input parameters
    self.input_size = input_size
    self.output_size = output_size
    self.gather_output = gather_output
    # Divide the weight matrix along the last dimension.
    world_size = get_tensor_model_parallel_world_size()  # world size of this tensor parallel group
    self.output_size_per_partition = divide(output_size, world_size)  # output size owned by this sub-model
    self.skip_bias_add = skip_bias_add

    # Parameters.
    # Note: torch.nn.functional.linear performs XA^T + b and as a result
    # we allocate the transpose.
    # Initialize weight.
    args = get_args()
    if args.use_cpu_initialization:
        # allocate the weight with the partitioned size
        self.weight = Parameter(torch.empty(self.output_size_per_partition,
                                            self.input_size,
                                            dtype=args.params_dtype))
        self.master_weight = _initialize_affine_weight_cpu(  # initialize the weight
            self.weight, self.output_size, self.input_size,
            self.output_size_per_partition, 0, init_method,
            stride=stride, return_master_weight=keep_master_weight_for_test)
    else:
        # allocate the weight with the partitioned size
        self.weight = Parameter(torch.empty(
            self.output_size_per_partition, self.input_size,
            device=torch.cuda.current_device(), dtype=args.params_dtype))
        _initialize_affine_weight_gpu(self.weight, init_method,  # initialize the weight
                                      partition_dim=0, stride=stride)

    if bias:
        if args.use_cpu_initialization:
            # allocate the bias with the partitioned size
            self.bias = Parameter(torch.empty(
                self.output_size_per_partition, dtype=args.params_dtype))
        else:
            # allocate the bias with the partitioned size
            self.bias = Parameter(torch.empty(
                self.output_size_per_partition,
                device=torch.cuda.current_device(),
                dtype=args.params_dtype))
        set_tensor_model_parallel_attributes(self.bias, True, 0, stride)
        # Always initialize bias to zero.
        with torch.no_grad():
            self.bias.zero_()
    else:
        self.register_parameter('bias', None)
    self.async_tensor_model_parallel_allreduce = (
            not args.no_async_tensor_model_parallel_allreduce and
            world_size > 1)
```

Note the line self.output_size_per_partition = divide(output_size, world_size): this split of the size yields how much of the weight each sub-model should own.
```python
def ensure_divisibility(numerator, denominator):
    """Ensure that numerator is divisible by the denominator."""
    assert numerator % denominator == 0, '{} is not divisible by {}'.format(
        numerator, denominator)


def divide(numerator, denominator):
    """Ensure that numerator is divisible by the denominator and return
    the division value."""
    ensure_divisibility(numerator, denominator)
    return numerator // denominator
```
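For example, with a hypothetical ffn_hidden_size of 4096 and a tensor-parallel world size of 8, each rank owns 512 of the output columns, while a non-divisible size fails fast:

```python
assert divide(4096, 8) == 512  # each of the 8 ranks owns 512 columns
# divide(4096, 3) would raise AssertionError: 4096 is not divisible by 3
```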
The following code implements the weight initialization:

```python
def _initialize_affine_weight_gpu(weight, init_method,
                                  partition_dim, stride=1):
    """Initialize affine weight for model parallel on GPU."""
    set_tensor_model_parallel_attributes(tensor=weight,
                                         is_parallel=True,
                                         dim=partition_dim,
                                         stride=stride)
    with get_cuda_rng_tracker().fork():
        init_method(weight)


def _initialize_affine_weight_cpu(weight, output_size, input_size,
                                  per_partition_size, partition_dim,
                                  init_method, stride=1,
                                  return_master_weight=False):
    """Initialize affine weight for model parallel.

    Build the master weight on all processes and scatter
    the relevant chunk."""
    set_tensor_model_parallel_attributes(tensor=weight,
                                         is_parallel=True,
                                         dim=partition_dim,
                                         stride=stride)

    # Initialize master weight
    master_weight = torch.empty(output_size, input_size,
                                dtype=torch.float,
                                requires_grad=False)
    init_method(master_weight)
    args = get_args()
    master_weight = master_weight.to(dtype=args.params_dtype)

    # Split and copy
    per_partition_per_stride_size = divide(per_partition_size, stride)
    weight_list = torch.split(master_weight, per_partition_per_stride_size,
                              dim=partition_dim)
    rank = get_tensor_model_parallel_rank()
    world_size = get_tensor_model_parallel_world_size()
    my_weight_list = weight_list[rank::world_size]  # the chunks owned by this rank

    with torch.no_grad():
        torch.cat(my_weight_list, dim=partition_dim, out=weight)
    if return_master_weight:
        return master_weight
    return None
```

To aid the analysis we introduce the figure below (from Reference 1), which shows the forward and backward passes of the ColumnParallelLinear class. The f and g operations here are abstractions over the code: f can be understood as the handling of the input, and g as the step that assembles the final output. They correspond to the passage the paper states in bold:
Figure 3. Blocks of Transformer with Model Parallelism. f and g are conjugate. f is an identity operator in the forward pass and all reduce in the backward pass while g is an all reduce in the forward pass and identity in the backward pass.

The image comes from GTC 2020: Megatron-LM: Training Multi-Billion Parameter Language Models Using Model Parallelism.
Let us walk through the logic of this figure step by step.

The overall semantics are \(Y = XA + b\). In the forward pass:

- f copies the complete input X to every GPU in the tensor parallel group (forward identity).
- GPU i computes \(Y_i = XA_i\) with its own column slice \(A_i\) (plus its slice of the bias).
- If gather_output is set, g all-gathers the partial outputs into \(Y = [Y_1, Y_2]\).

In operator terms: f is the copy/identity operator applied to the input, the per-GPU GEMMs run independently, and g is the all-gather that assembles the output. These points are marked with red boxes in the figure: the input X first passes through f, and the output Y is the result assembled by g.

Now the backward pass. In the figure it runs from top to bottom: the gradient first passes through g and is finally handled by f. The logic is:

- The backward of g is a split/scatter: each GPU keeps only the slice \(\partial L/\partial Y_i\) of the output gradient that matches its partition.
- The backward of f is an all-reduce: each GPU computes a gradient for X from its own slice, and since X was copied to every GPU, the true input gradient is the sum of these contributions, \(\partial L/\partial X = \partial L/\partial X|_1 + \partial L/\partial X|_2\) (see the sketch after the figure).

The backward operators are marked with blue rounded rectangles in the figure.

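We can check this with autograd in a single process (a sketch with toy shapes, not Megatron code): giving each simulated "GPU" its own copy of X and summing the resulting input gradients reproduces the gradient of the unsplit layer, which is exactly what f's backward all-reduce computes:

```python
import torch

torch.manual_seed(0)
X = torch.randn(4, 8)
A = torch.randn(8, 6)
A1, A2 = A.chunk(2, dim=1)  # column split of the weight

# Reference: the unsplit layer.
X_ref = X.clone().requires_grad_(True)
(X_ref @ A).sum().backward()

# Column-parallel: f copies X to each "GPU"; each computes Y_i = X A_i.
X1 = X.clone().requires_grad_(True)  # the copy on GPU 0
X2 = X.clone().requires_grad_(True)  # the copy on GPU 1
((X1 @ A1).sum() + (X2 @ A2).sum()).backward()

# f's backward all-reduce: the full input gradient is the sum over the copies.
assert torch.allclose(X1.grad + X2.grad, X_ref.grad, atol=1e-6)
```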
Let us now connect this to the code.

The forward method of ColumnParallelLinear implements the forward operations of f and g, and at the same time sets up their backward operations:
If gather_output is True, the forward pass all-gathers the \(Y_i\); since the backward pass then has to scatter the complete gradient back onto the corresponding GPUs, the matching split operation must also be set up. In the MLP this flag is set to False, so each GPU outputs its own 4h/p partition, which is passed directly to the next linear layer.

```python
def forward(self, input_):
    # If bias is to be skipped, pass None so it is not handled below
    bias = self.bias if not self.skip_bias_add else None

    # What follows is mainly the f operation in the figure
    if self.async_tensor_model_parallel_allreduce:
        # set up the asynchronous all-reduce used in the backward pass
        input_shape = input_.shape
        input_ = input_.view(input_shape[0] * input_shape[1], input_shape[2])
        # Matrix multiply with asynchronous all-reduce execution
        output_parallel = ColumnParallelLinearWithAsyncAllreduce.apply(
            input_, self.weight, bias)
        output_parallel = output_parallel.view(
            input_shape[0], input_shape[1], output_parallel.shape[1])
    else:
        # Set up backprop all-reduce: this is the backward of f in the figure
        input_parallel = copy_to_tensor_model_parallel_region(input_)
        # Matrix multiply.
        output_parallel = F.linear(input_parallel, self.weight, bias)

    # What follows is the g operation in the figure
    if self.gather_output:  # do we need to gather?
        # All-gather across the partitions: the forward of g in the figure
        output = gather_from_tensor_model_parallel_region(output_parallel)
    else:
        output = output_parallel
    # if bias add was skipped, return the bias so the caller can fuse it
    output_bias = self.bias if self.skip_bias_add else None
    return output, output_bias
```

The f operation performs the initial handling of the input; specifically:

Here we mainly analyze copy_to_tensor_model_parallel_region: its forward is a copy, and it sets up the backward all-reduce.
```python
def copy_to_tensor_model_parallel_region(input_):
    return _CopyToModelParallelRegion.apply(input_)
```

We still need to look at _CopyToModelParallelRegion. Its forward simply passes the input through to the output: the forward copy is an identity.
```python
class _CopyToModelParallelRegion(torch.autograd.Function):
    """Pass the input to the model parallel region."""

    @staticmethod
    def symbolic(graph, input_):
        return input_

    @staticmethod
    def forward(ctx, input_):
        return input_  # simply pass the input through: the forward identity

    @staticmethod
    def backward(ctx, grad_output):
        return _reduce(grad_output)  # merge the gradients from all GPUs with an all-reduce
```

The corresponding backward pass uses an all-reduce: during backpropagation the gradient pieces live on multiple GPUs and are merged by the all-reduce.
```python
def _reduce(input_):
    """All-reduce the input tensor across model parallel group."""

    # Bypass the function if we are using only 1 GPU.
    if get_tensor_model_parallel_world_size() == 1:
        return input_

    # All-reduce.
    torch.distributed.all_reduce(input_, group=get_tensor_model_parallel_group())

    return input_
```

ColumnParallelLinearWithAsyncAllreduce also folds the matrix multiplication from the synchronous path into the autograd function:
```python
class ColumnParallelLinearWithAsyncAllreduce(torch.autograd.Function):
    """
    Column-parallel linear layer execution with asynchronous all-reduce
    execution in backprop.
    """
    @staticmethod
    def forward(ctx, input, weight, bias):
        ctx.save_for_backward(input, weight)
        ctx.use_bias = bias is not None
        output = torch.matmul(input, weight.t())  # the synchronous path's multiply lives here too
        if bias is not None:
            output = output + bias
        return output

    @staticmethod
    def backward(ctx, grad_output):
        input, weight = ctx.saved_tensors
        use_bias = ctx.use_bias
        grad_input = grad_output.matmul(weight)
        # Asynchronous all-reduce
        handle = torch.distributed.all_reduce(  # the backward all-reduce
            grad_input, group=get_tensor_model_parallel_group(), async_op=True)
        # Delay the start of weight gradient computation shortly (3us) to have
        # all-reduce scheduled first and have GPU resources allocated
        _ = torch.empty(1, device=grad_output.device) + 1
        grad_weight = grad_output.t().matmul(input)
        grad_bias = grad_output.sum(dim=0) if use_bias else None
        handle.wait()
        return grad_input, grad_weight, grad_bias
```

The following corresponds to the g operation in the figure. g produces the final output Y; its logic is:

```python
def gather_from_tensor_model_parallel_region(input_):
    return _GatherFromModelParallelRegion.apply(input_)
```

The underlying autograd Function is:
```python
class _GatherFromModelParallelRegion(torch.autograd.Function):
    """Gather the input from model parallel region and concatenate."""

    @staticmethod
    def symbolic(graph, input_):
        return _gather(input_)

    @staticmethod
    def forward(ctx, input_):
        return _gather(input_)

    @staticmethod
    def backward(ctx, grad_output):
        return _split(grad_output)
```

Next we look at some of the primitive functions used above.
_gather gathers the tensors and concatenates them along the last dimension.
```python
def _gather(input_):
    """Gather tensors and concatenate along the last dimension."""

    world_size = get_tensor_model_parallel_world_size()
    # Bypass the function if we are using only 1 GPU.
    if world_size == 1:
        return input_

    # Size and dimension.
    last_dim = input_.dim() - 1
    rank = get_tensor_model_parallel_rank()  # this worker's rank within the tensor parallel group

    tensor_list = [torch.empty_like(input_) for _ in range(world_size)]
    tensor_list[rank] = input_
    # all-gather within the tensor parallel process group
    torch.distributed.all_gather(tensor_list, input_,
                                 group=get_tensor_model_parallel_group())

    # Note: torch.cat already creates a contiguous tensor.
    output = torch.cat(tensor_list, dim=last_dim).contiguous()

    return output
```

_split performs the tensor splitting:
```python
def split_tensor_along_last_dim(tensor, num_partitions,
                                contiguous_split_chunks=False):
    """Split a tensor along its last dimension.
    Arguments:
        tensor: input tensor.
        num_partitions: number of partitions to split the tensor
        contiguous_split_chunks: If True, make each chunk contiguous
                                 in memory.
    """
    # Get the size and dimension.
    last_dim = tensor.dim() - 1
    last_dim_size = divide(tensor.size()[last_dim], num_partitions)  # size of each chunk
    # Split.
    tensor_list = torch.split(tensor, last_dim_size, dim=last_dim)  # split the tensor
    # Note: torch.split does not create contiguous tensors by default.
    if contiguous_split_chunks:
        return tuple(chunk.contiguous() for chunk in tensor_list)

    return tensor_list


def _split(input_):
    """Split the tensor along its last dimension and keep the
    corresponding slice."""

    world_size = get_tensor_model_parallel_world_size()  # world size of the tensor parallel group
    # Bypass the function if we are using only 1 GPU.
    if world_size == 1:
        return input_

    # Split along last dimension.
    input_list = split_tensor_along_last_dim(input_, world_size)

    # Note: torch.split does not create contiguous tensors by default.
    rank = get_tensor_model_parallel_rank()  # this process's rank
    output = input_list[rank].contiguous()  # keep the slice that belongs to this rank

    return output
```

So _split keeps only the slice that belongs to the local rank.
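A tiny illustration of what each rank keeps, assuming a last dimension of 6 and a world size of 3:

```python
import torch

t = torch.arange(12).reshape(2, 6)       # toy tensor, last dimension 6
chunks = torch.split(t, 6 // 3, dim=-1)  # what split_tensor_along_last_dim computes
# rank 0 keeps chunks[0]: [[0, 1], [ 6,  7]]
# rank 1 keeps chunks[1]: [[2, 3], [ 8,  9]]
# rank 2 keeps chunks[2]: [[4, 5], [10, 11]]
```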
The helper get_tensor_model_parallel_rank used above returns the rank of this process within the tensor parallel group:

```python
def get_tensor_model_parallel_rank():
    """Return my rank for the tensor model parallel group."""
    global _MPU_TENSOR_MODEL_PARALLEL_RANK
    if _MPU_TENSOR_MODEL_PARALLEL_RANK is not None:
        return _MPU_TENSOR_MODEL_PARALLEL_RANK
    return torch.distributed.get_rank(group=get_tensor_model_parallel_group())
```

RowParallelLinear splits the weight by rows (a "horizontal cut"); note that it is the weight A that is split along its rows. With \(Y = XA\), where X is the input, A the weight, and Y the output, a row split partitions the first dimension of A; accordingly X is split along its last dimension, so that the last dimension of each \(X_i\) equals the first dimension of the corresponding \(A_i\).
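A quick numeric check of the row split (toy shapes, plain PyTorch, not Megatron code): splitting X along its last dimension and A along its first, the sum of the partial products equals the full product, and that sum is exactly what g's forward all-reduce performs:

```python
import torch

torch.manual_seed(0)
X = torch.randn(5, 8)
A = torch.randn(8, 4)

X1, X2 = X.chunk(2, dim=1)  # X = [X_1, X_2], split along the last dimension
A1, A2 = A.chunk(2, dim=0)  # A = [A_1; A_2], split along the first dimension

# The last dimension of X_i equals the first dimension of A_i (both 4 here).
Y = X1 @ A1 + X2 @ A2       # per-GPU partial products, summed by the all-reduce
assert torch.allclose(Y, X @ A, atol=1e-6)
```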
The concrete split is as follows:

In the definition, again only the docstring matters; it shows how the split is made.
```python
class RowParallelLinear(torch.nn.Module):
    """Linear layer with row parallelism.

    The linear layer is defined as Y = XA + b. A is parallelized along
    its first dimension and X along its second dimension as:
               -   -
              | A_1 |
              | .   |
          A = | .   |        X = [X_1, ..., X_p]
              | .   |
              | A_p |
               -   -
    Arguments:
        input_size: first dimension of matrix A.
        output_size: second dimension of matrix A.
        bias: If true, add bias. Note that bias is not parallelized.
        input_is_parallel: If true, we assume that the input is already
                           split across the GPUs and we do not split
                           again.
        init_method: method to initialize weights. Note that bias is always
                     set to zero.
        stride: For the strided linear layers.
        keep_master_weight_for_test: This was added for testing and should be
                                     set to False. It returns the master weights
                                     used for initialization.
        skip_bias_add: This was added to enable performance optimization where
                       bias can be fused with other elementwise operations. We
                       skip adding bias but instead return it.
    """
```

As with the column split, the initialization mainly computes the size of each weight partition and then splits the weight accordingly.
```python
def __init__(self, input_size, output_size, bias=True,
             input_is_parallel=False,
             init_method=init.xavier_normal_, stride=1,
             keep_master_weight_for_test=False,
             skip_bias_add=False):
    super(RowParallelLinear, self).__init__()

    # Keep input parameters
    self.input_size = input_size
    self.output_size = output_size
    self.input_is_parallel = input_is_parallel
    # Divide the weight matrix along the last dimension.
    world_size = get_tensor_model_parallel_world_size()
    self.input_size_per_partition = divide(input_size, world_size)  # size of each weight partition
    self.skip_bias_add = skip_bias_add

    # Parameters.
    # Note: torch.nn.functional.linear performs XA^T + b and as a result
    # we allocate the transpose.
    # Initialize weight.
    args = get_args()
    if args.use_cpu_initialization:
        self.weight = Parameter(torch.empty(self.output_size,
                                            self.input_size_per_partition,
                                            dtype=args.params_dtype))  # partitioned weight
        self.master_weight = _initialize_affine_weight_cpu(
            self.weight, self.output_size, self.input_size,
            self.input_size_per_partition, 1, init_method,
            stride=stride, return_master_weight=keep_master_weight_for_test)
    else:
        self.weight = Parameter(torch.empty(
            self.output_size, self.input_size_per_partition,
            device=torch.cuda.current_device(),
            dtype=args.params_dtype))  # partitioned weight
        _initialize_affine_weight_gpu(self.weight, init_method,
                                      partition_dim=1, stride=stride)
    if bias:
        if args.use_cpu_initialization:
            self.bias = Parameter(torch.empty(self.output_size,
                                              dtype=args.params_dtype))
        else:
            self.bias = Parameter(torch.empty(
                self.output_size, device=torch.cuda.current_device(),
                dtype=args.params_dtype))
        # Always initialize bias to zero.
        with torch.no_grad():
            self.bias.zero_()
    else:
        self.register_parameter('bias', None)
```

To aid the analysis we again use the figure below (from Reference 1), which shows the forward and backward passes of the RowParallelLinear class. As before, f and g are abstractions over the code: f handles the input, and g assembles the final output.

Let us walk through the logic of this figure.

The overall semantics are again \(Y = XA + b\). In the forward pass:

- f splits the input X along its last dimension, so that GPU i receives \(X_i\) (when input_is_parallel is set, the input arrives already split and f does nothing).
- GPU i computes the partial product \(Y_i = X_iA_i\) with its own row slice \(A_i\).
- g all-reduces the partial products into \(Y = Y_1 + Y_2\), after which the bias b is added.

In operator terms: f is the scatter/split operator, the per-GPU GEMMs run independently, and g is the all-reduce that sums the partial results. These points are marked with red boxes in the figure: the input X first passes through f, and the output Y is the result assembled by g.

Now the backward pass. In the figure it again runs from top to bottom: the gradient first passes through g and is finally handled by f. The logic is:

- The backward of g is an identity: after the forward all-reduce every GPU holds the complete output, so the output gradient is passed through to each GPU unchanged.
- The backward of f is an all-gather: each GPU ends up with the gradient of its own slice \(X_i\), and the slices are concatenated to recover the gradient of the full X.

The backward operators are marked with blue rounded rectangles in the figure.

Let us now see how the code implements this.

The forward method of RowParallelLinear implements the forward operations of f and g, and at the same time sets up their backward operations:
```python
def forward(self, input_):
    # Here the input tensor is already split across the GPUs, and the output
    # tensor is the all-reduced whole.

    # Set up backprop all-reduce.
    if self.input_is_parallel:  # is the input already split?
        # The Transformer's MLP arrives here: the input is already split,
        # so it is consumed directly, without a scatter
        input_parallel = input_
    else:
        # A standalone row-parallel linear layer arrives here: split in the
        # forward pass, concatenate (all-gather) in the backward pass
        input_parallel = scatter_to_tensor_model_parallel_region(input_)
    # Matrix multiply: e.g. X_i times A_i
    output_parallel = F.linear(input_parallel, self.weight)
    # All-reduce across all the partitions: the forward all-reduce leaves the
    # complete, up-to-date result on every GPU, and the backward identity
    # operation is set up at the same time.
    output_ = reduce_from_tensor_model_parallel_region(output_parallel)
    if not self.skip_bias_add:
        # add the bias
        output = output_ + self.bias if self.bias is not None else output_
        output_bias = None
    else:
        output = output_
        output_bias = self.bias
    return output, output_bias
```

scatter_to_tensor_model_parallel_region corresponds to the f operation; its role is:

The code is:
```python
def scatter_to_tensor_model_parallel_region(input_):
    return _ScatterToModelParallelRegion.apply(input_)
```

_ScatterToModelParallelRegion does the actual work; the _split and _gather operations it uses were introduced above.
```python
class _ScatterToModelParallelRegion(torch.autograd.Function):
    """Split the input and keep only the chunk corresponding to the rank."""

    @staticmethod
    def symbolic(graph, input_):
        return _split(input_)

    @staticmethod
    def forward(ctx, input_):
        return _split(input_)

    @staticmethod
    def backward(ctx, grad_output):
        return _gather(grad_output)
```

reduce_from_tensor_model_parallel_region corresponds to the g operation; its role is:
- forward: an all-reduce that produces the final output;
- backward: an identity that passes the gradient straight through.

The code is:
```python
def reduce_from_tensor_model_parallel_region(input_):
    return _ReduceFromModelParallelRegion.apply(input_)
```

The underlying class is:
```python
class _ReduceFromModelParallelRegion(torch.autograd.Function):
    """All-reduce the input from the model parallel region."""

    @staticmethod
    def symbolic(graph, input_):
        return _reduce(input_)

    @staticmethod
    def forward(ctx, input_):
        return _reduce(input_)  # introduced earlier

    @staticmethod
    def backward(ctx, grad_output):
        return grad_output  # identity: the gradient is passed straight through to each GPU
```

Next we look at the embedding. To balance memory usage, the embedding table is also sharded, along the vocabulary dimension, and the partitions are placed on multiple GPUs, so that each card holds part of the embedding table.
```python
class VocabParallelEmbedding(torch.nn.Module):
    """Embedding parallelized in the vocabulary dimension.

    This is mainly adapted from torch.nn.Embedding and all the default
    values are kept.
    Arguments:
        num_embeddings: vocabulary size.
        embedding_dim: size of hidden state.
        init_method: method to initialize weights.
    """

    def __init__(self, num_embeddings, embedding_dim,
                 init_method=init.xavier_normal_):
        super(VocabParallelEmbedding, self).__init__()
        # Keep the input dimensions.
        self.num_embeddings = num_embeddings
        self.embedding_dim = embedding_dim
        # Set the defaults for compatibility.
        self.padding_idx = None
        self.max_norm = None
        self.norm_type = 2.
        self.scale_grad_by_freq = False
        self.sparse = False
        self._weight = None
        self.tensor_model_parallel_size = get_tensor_model_parallel_world_size()
        # Divide the weight matrix along the vocabulary dimension:
        # compute the start and end of this rank's partition.
        self.vocab_start_index, self.vocab_end_index = \
            VocabUtility.vocab_range_from_global_vocab_size(
                self.num_embeddings, get_tensor_model_parallel_rank(),
                self.tensor_model_parallel_size)
        # Number of embeddings held by this partition.
        self.num_embeddings_per_partition = self.vocab_end_index - \
            self.vocab_start_index

        # Allocate weights and initialize.
        args = get_args()
        if args.use_cpu_initialization:
            self.weight = Parameter(torch.empty(
                self.num_embeddings_per_partition, self.embedding_dim,
                dtype=args.params_dtype))
            _initialize_affine_weight_cpu(  # partition the weight
                self.weight, self.num_embeddings, self.embedding_dim,
                self.num_embeddings_per_partition, 0, init_method)
        else:
            self.weight = Parameter(torch.empty(
                self.num_embeddings_per_partition, self.embedding_dim,
                device=torch.cuda.current_device(), dtype=args.params_dtype))
            _initialize_affine_weight_gpu(self.weight, init_method,  # partition the weight
                                          partition_dim=0, stride=1)
```

Because each GPU only holds a slice of the full embedding table, some input token ids will not be found in a given worker's partition. The final embedding output therefore needs an all-reduce, which reassembles the complete embedding. The sketch below illustrates this pattern; the actual forward implementation follows it.
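Here is a minimal single-process sketch of that pattern (toy vocabulary size and dimensions, not Megatron code), simulating the ranks with a loop and using an in-place sum in place of the all-reduce:

```python
import torch
import torch.nn.functional as F

torch.manual_seed(0)
vocab, dim, p = 8, 4, 2               # assumed toy sizes: vocab, hidden dim, tensor-parallel size
full_table = torch.randn(vocab, dim)
tokens = torch.tensor([0, 3, 5, 7])

output = torch.zeros(len(tokens), dim)
for rank in range(p):                 # simulate the p ranks one after another
    start, end = rank * vocab // p, (rank + 1) * vocab // p
    part = full_table[start:end]                # this rank's slice of the table
    mask = (tokens < start) | (tokens >= end)   # ids outside this rank's range
    masked = tokens - start
    masked[mask] = 0                            # masked ids look up a dummy row
    local = F.embedding(masked, part)
    local[mask, :] = 0.0                        # zero out rows this rank does not own
    output += local                             # plays the role of the all-reduce

assert torch.allclose(output, F.embedding(tokens, full_table))
```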
The actual forward code:

```python
def forward(self, input_):
    if self.tensor_model_parallel_size > 1:
        # Build the mask.
        # input_mask marks tokens whose ids fall outside this worker's
        # embedding partition; those positions are set to 0 below.
        input_mask = (input_ < self.vocab_start_index) | \
                     (input_ >= self.vocab_end_index)
        # Mask the input.
        masked_input = input_.clone() - self.vocab_start_index
        masked_input[input_mask] = 0
    else:
        masked_input = input_
    # Get the embeddings.
    output_parallel = F.embedding(masked_input, self.weight,
                                  self.padding_idx, self.max_norm,
                                  self.norm_type, self.scale_grad_by_freq,
                                  self.sparse)
    # Mask the output embedding.
    if self.tensor_model_parallel_size > 1:
        output_parallel[input_mask, :] = 0.0
    # Reduce across all the model parallel GPUs.
    output = reduce_from_tensor_model_parallel_region(output_parallel)
    return output
```

Let us now summarize the parallel implementation of the MLP, shown in the figure below:

The paper speaks of conjugate functions:
f and g are conjugate. f is an identity operator in the forward pass and all reduce in the backward pass while g is an all reduce in the forward pass and identity in the backward pass.
We used them in the code above; collected together, they form conjugate pairs:

- copy_to_tensor_model_parallel_region: forward copy (identity), backward all-reduce;
- reduce_from_tensor_model_parallel_region: forward all-reduce, backward copy (identity).

These two are precisely the f and g operations of the MLP, and they are conjugate. Similarly, gather_from_tensor_model_parallel_region (forward all-gather, backward scatter/split) and scatter_to_tensor_model_parallel_region (forward scatter/split, backward all-gather) are conjugate.

The code of these functions is:
```python
def copy_to_tensor_model_parallel_region(input_):
    return _CopyToModelParallelRegion.apply(input_)


def reduce_from_tensor_model_parallel_region(input_):
    return _ReduceFromModelParallelRegion.apply(input_)


def scatter_to_tensor_model_parallel_region(input_):
    return _ScatterToModelParallelRegion.apply(input_)


def gather_from_tensor_model_parallel_region(input_):
    return _GatherFromModelParallelRegion.apply(input_)
```

This completes our analysis of the model parallel implementation. In the next article we will look at how the various parallel configurations are set up in the source code.
https://developer.nvidia.com/gtc/2020/slides/s21496-megatron-lm-training-multi-billion-parameter-language-models-using-model-parallelism.pdf
[细读经典]Megatron论文和代码详细分析(2)
[细读经典]Megatron论文和代码详细分析(1)
Megatron-LM源码阅读(一)
Megatron-LM源码阅读(二)
megatron学习总结
GTC 2020: Megatron-LM: Training Multi-Billion Parameter Language Models Using Model Parallelism
大规模训练之 transformer 中的张量模型并行