Unverified commit 8ffcc7c8 authored by ShenLiang, committed by GitHub

[HybridParallel]Rebuild code for pipeline (#36396)

* add no_sync for parameters sync

* add pipeline for moe
Parent 693b1aa1
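The `no_sync` flag added for MoE is consumed in `sync_params_buffers` further down in this diff: any parameter carrying the attribute is skipped during the data-parallel broadcast. Below is a minimal, hypothetical sketch of how expert parameters might be tagged, assuming the attribute can simply be set on the parameter object; the `ExpertLinear` layer and the attribute assignment are illustrative and not part of this commit.

```python
# Hypothetical sketch (not from this commit): tag MoE expert parameters with
# the `no_sync` attribute that sync_params_buffers now checks via
# getattr(param, "no_sync", False), so they are skipped by the dp broadcast.
import paddle

class ExpertLinear(paddle.nn.Linear):        # illustrative expert layer
    def __init__(self, in_features, out_features):
        super().__init__(in_features, out_features)
        for p in self.parameters():
            p.no_sync = True                 # assumed to be settable on the parameter

expert = ExpertLinear(16, 16)
shared = paddle.nn.Linear(16, 16)

# Mirror of the new filter: keep only the parameters that must be broadcast.
to_sync = [p for p in list(expert.parameters()) + list(shared.parameters())
           if not getattr(p, "no_sync", False)]
print(len(to_sync))  # 2 -> only the shared layer's weight and bias remain
```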
@@ -77,26 +77,15 @@ class PipelineParallel(MetaParallelBase):
            logger.info("start broadcast dp parameters")
            broadcast_dp_parameters(self._layers, self._hcg)

    def train_batch(self, data, optimizer, lr_scheduler=None, scaler=None):
        assert isinstance(optimizer, HybridParallelOptimizer), (
            'optimizer should be HybridParallelOptimizer subclass.')
        assert fluid.framework._dygraph_tracer()._has_grad, (
            'Please enable the generation of gradients.')
        if self.is_first_stage or self.is_last_stage:
            assert data is not None, (
                "For the first and the last stage, the data must be set.")
        else:
            data = None

    def forward_backward_pipeline(self, data, scaler=None):
        # use the 1f1b scheduling strategy.
        # this strategy is inspired by:
        # https://github.com/NVIDIA/Megatron-LM/blob/main/megatron/schedules.py
        self.optimizer = optimizer
        self.lr_scheduler = lr_scheduler
        self.scaler = scaler
        self.data = data
        self._compute_loss = True
        self._layers.train()
        # store data for train
        self.data = data
        # store total loss of entire batch
        self.total_loss = None
@@ -104,10 +93,6 @@ class PipelineParallel(MetaParallelBase):
        # store data id for micro_batch
        self.micro_batch_id = 0

        # Next, use the 1f1b scheduling strategy.
        # this strategy is inspired by:
        # https://github.com/NVIDIA/Megatron-LM/blob/main/megatron/schedules.py
        startup_steps = (self.num_stages - self.stage_id - 1)
        startup_steps = min(startup_steps, self.accumulate_steps)
        steady_steps = self.accumulate_steps - startup_steps
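As a quick illustration of the warmup/steady split computed above, here is a standalone sketch; the values `num_stages = 4` and `accumulate_steps = 8` are example inputs, not taken from this commit.

```python
# 1F1B schedule split: each stage runs (num_stages - stage_id - 1) warmup
# forward passes, capped at accumulate_steps, before the steady 1F1B phase.
num_stages = 4
accumulate_steps = 8

for stage_id in range(num_stages):
    startup_steps = min(num_stages - stage_id - 1, accumulate_steps)
    steady_steps = accumulate_steps - startup_steps
    print(f"stage {stage_id}: warmup={startup_steps}, steady={steady_steps}")
# stage 0: warmup=3, steady=5 ... stage 3 (last stage): warmup=0, steady=8
```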
@@ -161,11 +146,35 @@ class PipelineParallel(MetaParallelBase):
        self._layers.allreduce_shared_weight_gradients()
        self.train_loss = self._broadcast_final_loss()
        train_loss = self._broadcast_final_loss()
        return train_loss
    def train_batch(self, data, optimizer, lr_scheduler=None, scaler=None):
        assert isinstance(optimizer, HybridParallelOptimizer), (
            'optimizer should be HybridParallelOptimizer subclass.')
        assert fluid.framework._dygraph_tracer()._has_grad, (
            'Please enable the generation of gradients.')
        if self.is_first_stage or self.is_last_stage:
            assert data is not None, (
                "For the first and the last stage, the data must be set.")
        else:
            data = None

        self.optimizer = optimizer
        self.lr_scheduler = lr_scheduler
        self._layers.train()

        # 1f1b for pipeline
        train_loss = self.forward_backward_pipeline(data, scaler)

        # optimizer
        self._optimizer_step()
        return self.train_loss
        return train_loss
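The refactor above separates the schedule from the parameter update: `forward_backward_pipeline` runs 1F1B and returns the loss, `_optimizer_step` applies the optimizer, and `train_batch` simply chains the two. Here is a toy, framework-free sketch of that split; everything except the two method names is illustrative.

```python
# Toy sketch of the split introduced above: the scheduling loop
# (forward_backward_pipeline) no longer owns the optimizer update,
# which lives behind _optimizer_step and is only invoked by train_batch.
class ToyPipeline:
    def forward_backward_pipeline(self, data, scaler=None):
        # stand-in for the 1F1B forward/backward schedule over micro-batches
        return sum(data) / len(data)

    def _optimizer_step(self):
        # stand-in for optimizer.step() / clear_grad() in the real class
        pass

    def train_batch(self, data, optimizer=None, lr_scheduler=None, scaler=None):
        train_loss = self.forward_backward_pipeline(data, scaler)
        self._optimizer_step()
        return train_loss

print(ToyPipeline().train_batch([1.0, 2.0, 3.0, 4.0]))  # 2.5
```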
    def eval_batch(self, data, compute_loss=False):
        self._layers.eval()
......
@@ -354,9 +354,15 @@ def sync_params_buffers(model,
        if not isinstance(param, core.VarBase):
            raise TypeError("The data type of '%s' must be Varbase" %
                            param.name)

        # is_distributed param not need to sync when in mp mode
        if is_model_parallel and isinstance(param, ParamBase):
            if param.is_distributed:
        if isinstance(param, ParamBase):
            if is_model_parallel and param.is_distributed:
                continue

            # NOTE(shenliang03): Support situations that do not require synchronization parameters,
            # such as moe's expert parameters
            if getattr(param, "no_sync", False):
                continue

        model_vars.append(param.detach())
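The restructured condition above applies two independent skip rules before a parameter is collected for broadcast: distributed parameters are skipped only under model parallelism, while `no_sync` parameters are always skipped. A standalone illustration with mock parameters follows; the `should_sync` helper and the `SimpleNamespace` stand-ins are assumptions for the example, and the real code additionally restricts the checks to `ParamBase` instances.

```python
# Standalone illustration (not Paddle code) of the skip rules above.
from types import SimpleNamespace

def should_sync(param, is_model_parallel):
    if is_model_parallel and getattr(param, "is_distributed", False):
        return False                      # mp-sharded param: kept local, not broadcast
    if getattr(param, "no_sync", False):
        return False                      # e.g. MoE expert parameters
    return True

params = [
    SimpleNamespace(name="shared.w"),
    SimpleNamespace(name="mp.col_w", is_distributed=True),
    SimpleNamespace(name="expert.w", no_sync=True),
]
print([p.name for p in params if should_sync(p, is_model_parallel=True)])
# ['shared.w']
```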
......