diff --git a/paddle/fluid/framework/distributed_strategy.proto b/paddle/fluid/framework/distributed_strategy.proto
index 0435199ecaa48c8a24d869a176cea5c6d52647f3..4082bf2ec5585a403ee319411ee7847437f881ae 100755
--- a/paddle/fluid/framework/distributed_strategy.proto
+++ b/paddle/fluid/framework/distributed_strategy.proto
@@ -61,6 +61,7 @@ message MpConfig {
 message PpConfig {
   optional bool dp_comm_overlap = 1 [ default = false ];
   optional bool delay_scale_loss = 2 [ default = false ];
+  optional bool enable_timer = 3 [ default = false ];
 }

 message HybridConfig {
diff --git a/python/paddle/distributed/fleet/meta_parallel/pipeline_parallel.py b/python/paddle/distributed/fleet/meta_parallel/pipeline_parallel.py
index 2a7dd4d0bb71038fd64c346c22a91840af7863e5..5ff01d35ec5ffb005d1f998468590fafffb223a8 100755
--- a/python/paddle/distributed/fleet/meta_parallel/pipeline_parallel.py
+++ b/python/paddle/distributed/fleet/meta_parallel/pipeline_parallel.py
@@ -15,6 +15,7 @@ import paddle
 from paddle import framework

 from ..meta_optimizers.dygraph_optimizer import HybridParallelOptimizer
+from ..utils import timer_helper as timer
 from ..utils.hybrid_parallel_util import (
     broadcast_dp_parameters,
     broadcast_mp_parameters,
@@ -73,13 +74,24 @@ class PipelineParallel(MetaParallelBase):
         self._dp_comm_overlap = self._strategy.hybrid_configs[
             "pp_configs"
         ].dp_comm_overlap
+        self._enable_timer = self._strategy.hybrid_configs[
+            "pp_configs"
+        ].enable_timer
         self._dp_comm_buffers = []

         if self._dp_comm_overlap:
             assert self.use_data_parallel and self.num_stages > 1

+        if self._enable_timer:
+            if not timer.is_timer_initialized():
+                timer.set_timers()
+            self.timers = timer.get_timers()
+
         p2p.initialize_p2p_groups(
-            hcg, self._using_cache, self._enable_partial_send_recv
+            hcg,
+            self._using_cache,
+            self._enable_partial_send_recv,
+            self._enable_timer,
         )

         self.global_rank = self._hcg.get_global_rank()
@@ -153,6 +165,12 @@ class PipelineParallel(MetaParallelBase):
             for param in parameters:
                 param._register_backward_hook(self.bw_hook_func(buffer, param))

+    def timer_printer(self):
+        if not self._enable_timer:
+            return
+        all_flag_names = self.timers.timers.keys()
+        self.timers.log(all_flag_names)
+
     def forward_backward_pipeline(self, data, scaler=None):
         # use the 1f1b scheduling strategy.
         # this strategy is inspired by:
@@ -236,9 +254,17 @@ class PipelineParallel(MetaParallelBase):
             for buffer in self._dp_comm_buffers:
                 buffer.scale_and_split_grads()

+        if self._enable_timer:
+            self.timers("allreduce_shared_weight_gradients").start()
         self._layers.allreduce_shared_weight_gradients()
+        if self._enable_timer:
+            self.timers("allreduce_shared_weight_gradients").stop()
+            self.timers("broadcast_final_loss").start()
         with paddle.amp.auto_cast(enable=False):
             train_loss = self._broadcast_final_loss()
+        if self._enable_timer:
+            self.timers("broadcast_final_loss").stop()
+        self.timer_printer()

         return train_loss

@@ -334,6 +360,8 @@ class PipelineParallel(MetaParallelBase):
         return self.train_loss

     def _forward_step(self, input_tensor, chunk_id=None):
+        if self._enable_timer:
+            self.timers("forward_step").start()
         if self.is_pipeline_first_stage():
             input_tensor = self._load_micro_batch(self.micro_batch_id)

@@ -365,9 +393,13 @@ class PipelineParallel(MetaParallelBase):
             # Only increase micro batch id at virtual first/last pp stage.
             # The micro batch id is used to load data, therefore, only increase it when load data.
             self.micro_batch_id += 1
+        if self._enable_timer:
+            self.timers("forward_step").stop()
         return output_tensor

     def _backward_step(self, input_tensor, output_tensor, output_tensor_grad):
+        if self._enable_timer:
+            self.timers("backward_step").start()
         with paddle.amp.auto_cast(enable=False):
             if self.is_pipeline_last_stage():
                 assert output_tensor_grad is None
@@ -397,6 +429,8 @@ class PipelineParallel(MetaParallelBase):
                 )
             else:
                 input_tensor_grad = input_tensor.grad
+            if self._enable_timer:
+                self.timers("backward_step").stop()
             return input_tensor_grad

     def _check_data_vaild(self, data):
@@ -807,16 +841,25 @@ class PipelineParallelWithInterleave(PipelineParallel):
             for buffer in self._dp_comm_buffers:
                 buffer.scale_and_split_grads()

+        if self._enable_timer:
+            self.timers("allreduce_shared_weight_gradients").start()
         self._layers.allreduce_shared_weight_gradients()
+        if self._enable_timer:
+            self.timers("allreduce_shared_weight_gradients").stop()

         if compute_loss:
             # return loss if compute loss
+            if self._enable_timer:
+                self.timers("broadcast_final_loss").start()
             with paddle.amp.auto_cast(enable=False):
                 train_loss = self._broadcast_final_loss()
+            if self._enable_timer:
+                self.timers("broadcast_final_loss").stop()
         else:
             # else just return all intermediate output tensor for all micro steps
             train_loss = self.output_tensors

+        self.timer_printer()
         return train_loss

     def train_batch(self, data, optimizer, lr_scheduler=None, scaler=None):
diff --git a/python/paddle/distributed/fleet/meta_parallel/pp_utils/p2p_communication.py b/python/paddle/distributed/fleet/meta_parallel/pp_utils/p2p_communication.py
index fa4b937ba56b7240ee5b0b3f060022d5ae3b9c85..1da86c83640b5305e001825e391d56582b52f9f4 100644
--- a/python/paddle/distributed/fleet/meta_parallel/pp_utils/p2p_communication.py
+++ b/python/paddle/distributed/fleet/meta_parallel/pp_utils/p2p_communication.py
@@ -19,12 +19,14 @@ import numpy as np
 import paddle
 from paddle import framework

+from ...utils import timer_helper as timer
 from ...utils.log_util import logger
 from .utils import number_2_dtype, paddle_2_number

 _hcg = None
 _use_cache = False
 _enable_partial_send_recv = True
+_timers = None

 _xpu_comm_group_started = False

@@ -50,11 +52,15 @@ def _xpu_comm_group_end():
     _xpu_comm_group_started = False


-def initialize_p2p_groups(hcg, use_cache=True, enable_partial_send_recv=True):
-    global _hcg, _use_cache, _enable_partial_send_recv
+def initialize_p2p_groups(
+    hcg, use_cache=True, enable_partial_send_recv=True, enable_timer=False
+):
+    global _hcg, _use_cache, _enable_partial_send_recv, _timers
     _hcg = hcg
     _use_cache = use_cache
     _enable_partial_send_recv = enable_partial_send_recv
+    if enable_timer:
+        _timers = timer.get_timers()
     (
         send_next_group,
         send_prev_group,
@@ -683,6 +689,9 @@ def _p2p_helper(


 def recv_forward(pp_first_stage, sync_recv=True):
+    global _timers
+    if _timers is not None:
+        _timers("recv_forward").start()
     if pp_first_stage:
         input_tensor = None
     else:
@@ -697,10 +706,15 @@ def recv_forward(pp_first_stage, sync_recv=True):
             recv_next=False,
             sync_recv=sync_recv,
         )
+    if _timers is not None:
+        _timers("recv_forward").stop()
     return input_tensor


 def recv_backward(pp_last_stage, sync_recv=True):
+    global _timers
+    if _timers is not None:
+        _timers("recv_backward").start()
     if pp_last_stage:
         output_tensor_grad = None
     else:
@@ -711,10 +725,15 @@ def recv_backward(pp_last_stage, sync_recv=True):
             recv_next=True,
             sync_recv=sync_recv,
         )
+    if _timers is not None:
+        _timers("recv_backward").stop()
     return output_tensor_grad


 def send_forward(output_tensor, pp_last_stage):
+    global _timers
+    if _timers is not None:
+        _timers("send_forward").start()
     if not pp_last_stage:
         if not _send_recv_meta.has_send_meta:
             _send_recv_meta.set_send_message(output_tensor)
@@ -727,9 +746,14 @@ def send_forward(output_tensor, pp_last_stage):
             recv_prev=False,
             recv_next=False,
         )
+    if _timers is not None:
+        _timers("send_forward").stop()


 def send_backward(input_tensor_grad, pp_first_stage):
+    global _timers
+    if _timers is not None:
+        _timers("send_backward").start()
     if not pp_first_stage:
         _p2p_helper(
             tensor_send_next=None,
@@ -737,9 +761,14 @@ def send_backward(input_tensor_grad, pp_first_stage):
             recv_prev=False,
             recv_next=False,
         )
+    if _timers is not None:
+        _timers("send_backward").stop()


 def send_forward_recv_backward(output_tensor, pp_last_stage):
+    global _timers
+    if _timers is not None:
+        _timers("send_forward_recv_backward").start()
     if pp_last_stage:
         output_tensor_grad = None
     else:
@@ -749,10 +778,15 @@ def send_forward_recv_backward(output_tensor, pp_last_stage):
             recv_prev=False,
             recv_next=True,
         )
+    if _timers is not None:
+        _timers("send_forward_recv_backward").stop()
     return output_tensor_grad


 def send_backward_recv_forward(input_tensor_grad, pp_first_stage):
+    global _timers
+    if _timers is not None:
+        _timers("send_backward_recv_forward").start()
     if pp_first_stage:
         input_tensor = None
     else:
@@ -762,6 +796,8 @@ def send_backward_recv_forward(input_tensor_grad, pp_first_stage):
             recv_prev=True,
             recv_next=False,
         )
+    if _timers is not None:
+        _timers("send_backward_recv_forward").stop()
     return input_tensor


@@ -769,6 +805,9 @@ def send_forward_backward_recv_forward_backward(
     output_tensor, input_tensor_grad, recv_prev, recv_next
 ):
     # always have to send dytpe info to downstream
+    global _timers
+    if _timers is not None:
+        _timers("send_forward_backward_recv_forward_backward").start()
     if not _send_recv_meta.has_send_meta:
         _send_recv_meta.set_send_message(output_tensor)
         _send_recv_meta.send_meta(output_tensor, _hcg.send_next_group)
@@ -783,11 +822,16 @@ def send_forward_backward_recv_forward_backward(
         recv_next=recv_next,
         sync_recv=False,
     )
+    if _timers is not None:
+        _timers("send_forward_backward_recv_forward_backward").stop()
     return input_tensor, output_tensor_grad


 def send_forward_recv_forward(output_tensor, recv_prev):
     # always have to send dytpe info to downstream
+    global _timers
+    if _timers is not None:
+        _timers("send_forward_recv_forward").start()
     if not _send_recv_meta.has_send_meta:
         _send_recv_meta.set_send_message(output_tensor)
         _send_recv_meta.send_meta(output_tensor, _hcg.send_next_group)
@@ -803,11 +847,15 @@ def send_forward_recv_forward(output_tensor, recv_prev):
         recv_next=False,
         sync_recv=False,
     )
-
+    if _timers is not None:
+        _timers("send_forward_recv_forward").stop()
     return input_tensor


 def send_backward_recv_backward(input_tensor_grad, recv_next):
+    global _timers
+    if _timers is not None:
+        _timers("send_backward_recv_backward").start()
     _, output_tensor_grad = _p2p_helper(
         tensor_send_next=None,
         tensor_send_prev=input_tensor_grad,
@@ -815,4 +863,6 @@ def send_backward_recv_backward(input_tensor_grad, recv_next):
         recv_next=recv_next,
         sync_recv=False,
     )
+    if _timers is not None:
+        _timers("send_backward_recv_backward").stop()
     return output_tensor_grad
diff --git a/python/paddle/distributed/fleet/utils/timer_helper.py b/python/paddle/distributed/fleet/utils/timer_helper.py
new file mode 100644
index 0000000000000000000000000000000000000000..1c0e737f005263dbe1f49be69a242e644c492c67
--- /dev/null
+++ b/python/paddle/distributed/fleet/utils/timer_helper.py
@@ -0,0 +1,113 @@
+# Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import time
+
+import paddle
+
+_GLOBAL_TIMERS = None
+
+
+def is_timer_initialized():
+    return _GLOBAL_TIMERS is not None
+
+
+def _ensure_var_is_not_initialized(var, name):
+    """Make sure the input variable is not None."""
+    assert var is None, f"{name} has been already initialized."
+
+
+def _ensure_var_is_initialized(var, name):
+    """Make sure the input variable is not None."""
+    assert var is not None, f"{name} is not initialized."
+
+
+def get_timers():
+    _ensure_var_is_initialized(_GLOBAL_TIMERS, "timers")
+    return _GLOBAL_TIMERS
+
+
+def set_timers():
+    """Initialize timers."""
+    global _GLOBAL_TIMERS
+    _ensure_var_is_not_initialized(_GLOBAL_TIMERS, "timers")
+    _GLOBAL_TIMERS = Timers()
+
+
+class _Timer:
+    """Timer."""
+
+    def __init__(self, name):
+        self.name = name
+        self.elapsed_ = 0.0
+        self.started_ = False
+        self.start_time = time.time()
+
+    def start(self):
+        """Start the timer."""
+        assert not self.started_, "timer has already started"
+        paddle.device.cuda.synchronize()
+        self.start_time = time.time()
+        self.started_ = True
+
+    def stop(self):
+        """Stop the timers."""
+        assert self.started_, "timer is not started."
+        paddle.device.cuda.synchronize()
+        self.elapsed_ += time.time() - self.start_time
+        self.started_ = False
+
+    def reset(self):
+        """Reset timer."""
+        self.elapsed_ = 0.0
+        self.started_ = False
+
+    def elapsed(self, reset=True):
+        """Calculate the elapsed time."""
+        started_ = self.started_
+        # If the timing in progress, end it first.
+        if self.started_:
+            self.stop()
+        # Get the elapsed time.
+        elapsed_ = self.elapsed_
+        # Reset the elapsed time
+        if reset:
+            self.reset()
+        # If timing was in progress, set it back.
+        if started_:
+            self.start()
+        return elapsed_
+
+
+class Timers:
+    """Group of timers."""
+
+    def __init__(self):
+        self.timers = {}
+
+    def __call__(self, name):
+        if name not in self.timers:
+            self.timers[name] = _Timer(name)
+        return self.timers[name]
+
+    def log(self, names, normalizer=1.0, reset=True):
+        """Log a group of timers."""
+        assert normalizer > 0.0
+        string = "time (ms)"
+        for name in names:
+            elapsed_time = (
+                self.timers[name].elapsed(reset=reset) * 1000.0 / normalizer
+            )
+            string += f" | {name}: {elapsed_time:.2f}"
+        print(string, flush=True)
diff --git a/python/paddle/fluid/tests/unittests/collective/fleet/hybrid_parallel_pp_transformer_with_virtual_stage.py b/python/paddle/fluid/tests/unittests/collective/fleet/hybrid_parallel_pp_transformer_with_virtual_stage.py
index 9f43fc4c9ef8eb46a167998cbf53ea4ea4e2b554..3d7991727ee83599e94d5fef25f09a104953b10a 100644
--- a/python/paddle/fluid/tests/unittests/collective/fleet/hybrid_parallel_pp_transformer_with_virtual_stage.py
+++ b/python/paddle/fluid/tests/unittests/collective/fleet/hybrid_parallel_pp_transformer_with_virtual_stage.py
@@ -147,6 +147,9 @@ class TestDistPPTraning(unittest.TestCase):
             "dp_degree": self.data_parallel_size,
             "mp_degree": self.model_parallel_size,
             "pp_degree": self.pipeline_parallel_size,
+            "pp_configs": {
+                "enable_timer": True,
+            },
         }
         strategy.pipeline_configs = {
             "accumulate_steps": batch_size // micro_batch_size,
diff --git a/python/paddle/fluid/tests/unittests/hybrid_parallel_pp_alexnet.py b/python/paddle/fluid/tests/unittests/hybrid_parallel_pp_alexnet.py
index 1c3eac9cec402c5e59bedca9284cf6f93c2ea9f4..052abbeb594ce8e27f9fef07521d9d085f1c1e42 100644
--- a/python/paddle/fluid/tests/unittests/hybrid_parallel_pp_alexnet.py
+++ b/python/paddle/fluid/tests/unittests/hybrid_parallel_pp_alexnet.py
@@ -145,6 +145,7 @@ class TestDistPPDelayScaleLoss(TestDistPPTraning):
             "pp_degree": self.pipeline_parallel_size,
             "pp_configs": {
                 "delay_scale_loss": True,
+                "enable_timer": True,
             },
         }
         strategy.pipeline_configs = {
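
Usage note (not part of the patch): below is a minimal sketch of how the new `enable_timer` switch and the `timer_helper` module fit together, based on the test changes and the helper added above. The degree values and the sleep workload are illustrative only, and the timer assumes a CUDA build of Paddle because `_Timer.start()`/`stop()` call `paddle.device.cuda.synchronize()`.

```python
# Sketch only: exercising the new pipeline-parallel timers.
import time

from paddle.distributed import fleet
from paddle.distributed.fleet.utils import timer_helper as timer

# 1) Turn the timers on through DistributedStrategy, as the updated
#    unit tests do (degree values here are illustrative).
strategy = fleet.DistributedStrategy()
strategy.hybrid_configs = {
    "dp_degree": 1,
    "mp_degree": 1,
    "pp_degree": 2,
    "pp_configs": {"enable_timer": True},  # new switch added by this patch
}

# 2) Or drive the helper directly: timers are created lazily by name and
#    collected in the global Timers() singleton.
if not timer.is_timer_initialized():
    timer.set_timers()
timers = timer.get_timers()

timers("forward_step").start()
time.sleep(0.01)  # stand-in for real work
timers("forward_step").stop()

timers.log(timers.timers.keys())  # prints e.g. "time (ms) | forward_step: 10.12"
```

With `enable_timer` set, `PipelineParallel.timer_printer()` emits the same kind of log line once per `forward_backward_pipeline` call, covering the forward/backward steps, the p2p send/recv helpers, the shared-weight gradient all-reduce, and the final loss broadcast.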