diff --git a/python/paddle/distributed/fleet/meta_parallel/pipeline_parallel.py b/python/paddle/distributed/fleet/meta_parallel/pipeline_parallel.py
index 431bc6d7bc389cc79af7418ea4fb536cea25e369..90960973972777d28e8b6977bf78d4418544ba52 100755
--- a/python/paddle/distributed/fleet/meta_parallel/pipeline_parallel.py
+++ b/python/paddle/distributed/fleet/meta_parallel/pipeline_parallel.py
@@ -77,26 +77,15 @@ class PipelineParallel(MetaParallelBase):
         logger.info("start broadcast dp parameters")
         broadcast_dp_parameters(self._layers, self._hcg)
 
-    def train_batch(self, data, optimizer, lr_scheduler=None, scaler=None):
-        assert isinstance(optimizer, HybridParallelOptimizer), (
-            'optimizer should be HybridParallelOptimizer subclass.')
-
-        assert fluid.framework._dygraph_tracer()._has_grad, (
-            'Please enable the generation of gradients.')
-
-        if self.is_first_stage or self.is_last_stage:
-            assert data is not None, (
-                "For the first and the last stage, the data must be set.")
-        else:
-            data = None
+    def forward_backward_pipeline(self, data, scaler=None):
+        # use the 1f1b scheduling strategy.
+        # this strategy is inspired by:
+        # https://github.com/NVIDIA/Megatron-LM/blob/main/megatron/schedules.py
 
-        self.optimizer = optimizer
-        self.lr_scheduler = lr_scheduler
         self.scaler = scaler
-        self.data = data
-        self._compute_loss = True
-        self._layers.train()
+
+        # store data for train
+        self.data = data
 
         # store total loss of entire batch
         self.total_loss = None
@@ -104,10 +93,6 @@ class PipelineParallel(MetaParallelBase):
         # store data id for micro_batch
         self.micro_batch_id = 0
 
-        # Next, use the 1f1b scheduling strategy.
-        # this strategy is inspired by:
-        # https://github.com/NVIDIA/Megatron-LM/blob/main/megatron/schedules.py
-
         startup_steps = (self.num_stages - self.stage_id - 1)
         startup_steps = min(startup_steps, self.accumulate_steps)
         steady_steps = self.accumulate_steps - startup_steps
@@ -161,11 +146,35 @@ class PipelineParallel(MetaParallelBase):
 
         self._layers.allreduce_shared_weight_gradients()
 
-        self.train_loss = self._broadcast_final_loss()
+        train_loss = self._broadcast_final_loss()
+
+        return train_loss
+
+    def train_batch(self, data, optimizer, lr_scheduler=None, scaler=None):
+        assert isinstance(optimizer, HybridParallelOptimizer), (
+            'optimizer should be HybridParallelOptimizer subclass.')
+
+        assert fluid.framework._dygraph_tracer()._has_grad, (
+            'Please enable the generation of gradients.')
+
+        if self.is_first_stage or self.is_last_stage:
+            assert data is not None, (
+                "For the first and the last stage, the data must be set.")
+        else:
+            data = None
+
+        self.optimizer = optimizer
+        self.lr_scheduler = lr_scheduler
+
+        self._layers.train()
+
+        # 1f1b for pipeline
+        train_loss = self.forward_backward_pipeline(data, scaler)
 
         # optimizer
         self._optimizer_step()
-        return self.train_loss
+
+        return train_loss
 
     def eval_batch(self, data, compute_loss=False):
         self._layers.eval()
diff --git a/python/paddle/fluid/dygraph/parallel.py b/python/paddle/fluid/dygraph/parallel.py
index e4525a8d17992a1e2a1c3250734f9a8181518a49..7dd8d38aa70efb48d0b37efa14dacf8e683b08f4 100644
--- a/python/paddle/fluid/dygraph/parallel.py
+++ b/python/paddle/fluid/dygraph/parallel.py
@@ -354,9 +354,15 @@ def sync_params_buffers(model,
         if not isinstance(param, core.VarBase):
             raise TypeError("The data type of '%s' must be Varbase" %
                             param.name)
+
         # is_distributed param not need to sync when in mp mode
-        if is_model_parallel and isinstance(param, ParamBase):
-            if param.is_distributed:
+        if isinstance(param, ParamBase):
+            if is_model_parallel and param.is_distributed:
+                continue
+
+            # NOTE(shenliang03): Support situations that do not require synchronization parameters,
+            # such as moe's expert parameters
+            if getattr(param, "no_sync", False):
                 continue
 
         model_vars.append(param.detach())
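
Note on the schedule (not part of the patch): forward_backward_pipeline keeps the 1F1B strategy, and the startup_steps/steady_steps arithmetic above decides how many warmup forward passes each stage runs before the steady one-forward-one-backward phase. The sketch below only illustrates that split; one_f_one_b_split is a hypothetical helper, and num_stages, stage_id and accumulate_steps are assumed to mean the same as the attributes used in the diff.

# Hedged sketch: per-stage step split for the 1F1B schedule.
def one_f_one_b_split(num_stages, stage_id, accumulate_steps):
    # later pipeline stages need fewer warmup (forward-only) steps
    startup_steps = min(num_stages - stage_id - 1, accumulate_steps)
    steady_steps = accumulate_steps - startup_steps
    return {
        "warmup_forwards": startup_steps,     # forward-only micro-batches
        "steady_1f1b": steady_steps,          # alternating forward/backward
        "cooldown_backwards": startup_steps,  # remaining backward passes
    }

# Example: 4 stages, 8 micro-batches.
print(one_f_one_b_split(num_stages=4, stage_id=0, accumulate_steps=8))  # first stage: 3 warmup forwards
print(one_f_one_b_split(num_stages=4, stage_id=3, accumulate_steps=8))  # last stage: pure 1F1B, no warmup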
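
Note on the no_sync flag (not part of the patch): the new getattr(param, "no_sync", False) check makes the skip purely attribute-based, so a caller such as an MoE implementation can tag expert parameters before data-parallel initialization. A minimal usage sketch follows, assuming a plain paddle.nn.Linear as a stand-in for an expert sub-layer; the real MoE wiring is not shown.

# Hedged usage sketch: tag parameters so sync_params_buffers skips them.
import paddle
from paddle.fluid.dygraph.parallel import sync_params_buffers  # patched above

expert = paddle.nn.Linear(16, 16)   # stand-in for an MoE expert sub-layer
for p in expert.parameters():
    p.no_sync = True                # read by getattr(param, "no_sync", False)

# In a distributed run these parameters would now be left out of the broadcast:
# sync_params_buffers(expert, comm_group=None, src_rank=0, is_model_parallel=False)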