diff --git a/paddle/fluid/operators/collective/recv_v2_op.cu.cc b/paddle/fluid/operators/collective/recv_v2_op.cu.cc
index 96b27a833fba3446534386630eb6a67cd3dfbc5e..7a2a802382f6c9e0aa24c5abf3f3db0c40ec4202 100644
--- a/paddle/fluid/operators/collective/recv_v2_op.cu.cc
+++ b/paddle/fluid/operators/collective/recv_v2_op.cu.cc
@@ -122,4 +122,5 @@ REGISTER_OP_CUDA_KERNEL(recv_v2, ops::RecvOpV2CUDAKernel<float>,
                         ops::RecvOpV2CUDAKernel<double>,
                         ops::RecvOpV2CUDAKernel<int>,
                         ops::RecvOpV2CUDAKernel<int64_t>,
+                        ops::RecvOpV2CUDAKernel<int8_t>,
                         ops::RecvOpV2CUDAKernel<plat::float16>);
diff --git a/paddle/fluid/operators/collective/send_v2_op.cu.cc b/paddle/fluid/operators/collective/send_v2_op.cu.cc
index add352306fa28f4f86d4fa80d7545a057e5da796..57a3fe2e45d7ee95e42b6f1d6ce7fc802d321ce8 100644
--- a/paddle/fluid/operators/collective/send_v2_op.cu.cc
+++ b/paddle/fluid/operators/collective/send_v2_op.cu.cc
@@ -109,4 +109,5 @@ REGISTER_OP_CUDA_KERNEL(send_v2, ops::SendOpV2CUDAKernel<float>,
                         ops::SendOpV2CUDAKernel<double>,
                         ops::SendOpV2CUDAKernel<int>,
                         ops::SendOpV2CUDAKernel<int64_t>,
+                        ops::SendOpV2CUDAKernel<int8_t>,
                         ops::SendOpV2CUDAKernel<plat::float16>);
diff --git a/paddle/fluid/operators/collective/send_v2_op_npu.cc b/paddle/fluid/operators/collective/send_v2_op_npu.cc
index 2d7382f3dfd7027638e94d3b4f998c6b1b185ffb..882630467a012f1c2cef3bf9d9edd33a244fc97f 100644
--- a/paddle/fluid/operators/collective/send_v2_op_npu.cc
+++ b/paddle/fluid/operators/collective/send_v2_op_npu.cc
@@ -41,7 +41,6 @@ class CSendOpASCENDKernel : public framework::OpKernel<T> {
       // Use ProcessGroup
       distributed::ProcessGroup* pg = map->get(ring_id);
       std::vector<phi::DenseTensor> in_tensor;
-      auto x = ctx.Input<framework::LoDTensor>("X");
       in_tensor.push_back(*x);
       auto task = pg->Send(in_tensor, 1);
       return;
diff --git a/paddle/fluid/platform/device/gpu/nccl_helper.h b/paddle/fluid/platform/device/gpu/nccl_helper.h
index 4301ef4bcf126db60784da93a326fb08c108e68f..61ea0fd3cd2939c5256a59b738964152483b6499 100644
--- a/paddle/fluid/platform/device/gpu/nccl_helper.h
+++ b/paddle/fluid/platform/device/gpu/nccl_helper.h
@@ -50,6 +50,8 @@ inline ncclDataType_t ToNCCLDataType(framework::proto::VarType::Type type) {
     return ncclInt64;
   } else if (type == framework::proto::VarType::FP16) {
     return ncclFloat16;
+  } else if (type == framework::proto::VarType::INT8) {
+    return ncclInt8;
   } else {
     PADDLE_THROW(platform::errors::Unimplemented(
         "This datatype in nccl is not supported."));
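The C++ changes above register an int8 kernel for send_v2/recv_v2 and teach ToNCCLDataType to map VarType::INT8 to ncclInt8. The snippet below is a minimal sketch of how that path could be exercised, assuming two GPU ranks launched with paddle.distributed.launch; it only uses the public paddle.distributed send/recv API, and the exact dispatch (op kernel vs. eager process group) depends on the execution mode.

    # Hypothetical two-rank int8 send/recv, e.g. run with:
    #   python -m paddle.distributed.launch --gpus 0,1 demo_int8_sendrecv.py
    import paddle
    import paddle.distributed as dist

    dist.init_parallel_env()
    if dist.get_rank() == 0:
        data = paddle.to_tensor([1, 2, 3], dtype='int8')
        dist.send(data, dst=1)   # int8 is now accepted and maps to ncclInt8
    else:
        data = paddle.to_tensor([0, 0, 0], dtype='int8')
        dist.recv(data, src=0)   # filled in place from rank 0
    print(data.numpy())
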
diff --git a/python/paddle/distributed/collective.py b/python/paddle/distributed/collective.py
index e33a3dba669abffbc730a158f06957b2c972e2a1..a781f314d3f2037e18ea12b5f69561aad05ed41c 100644
--- a/python/paddle/distributed/collective.py
+++ b/python/paddle/distributed/collective.py
@@ -226,9 +226,15 @@ def _new_process_group_impl(backend,
                             world_size,
                             group_name,
                             pg_options,
-                            group_id=0):
+                            group_id=0,
+                            src_rank=None,
+                            dst_rank=None):
     pg = None
     genv = _get_global_env()
+    if backend != 'heter':
+        assert src_rank is None and dst_rank is None, (
+            "src_rank and dst_rank "
+            "can only be set for heter backend.")
     assert backend in _valid_backend_list, "Unsupported backend: %s." % backend
     if backend == "gloo":
         place = core.CPUPlace()
@@ -269,7 +275,9 @@ def _new_process_group_impl(backend,
             gloo_rank=cluster_id,
             gloo_size=len(cluster_size),
             with_switch=True,
-            switch_endpoint=switch_ep)
+            switch_endpoint=switch_ep,
+            src_rank=src_rank,
+            dst_rank=dst_rank)
 
     return pg
 
@@ -322,6 +330,17 @@ def barrier(group=None):
         attrs={'ring_id': ring_id})
 
 
+# _custom_gid provides a way for users to
+# set the group id, which is usually useful
+# to be compatible with the static mode.
+_custom_gid = None
+
+
+def _set_custom_gid(gid):
+    global _custom_gid
+    _custom_gid = gid
+
+
 def new_group(ranks=None, backend=None):
     """
 
@@ -348,9 +367,9 @@ def new_group(ranks=None, backend=None):
     global _group_map
     if in_dygraph_mode():
         global _default_group_name
-        gid = _new_ring_id()
+        gid = _custom_gid if _custom_gid else _new_ring_id()
         group_name = _default_group_name + str(gid)
-        if ranks is None or len(ranks) > 1:
+        if backend != 'heter' and (ranks is None or len(ranks) > 1):
             global_group = _get_default_group()
             global_rank = global_group.rank
             global_ranks = global_group.ranks
@@ -362,8 +381,10 @@ def new_group(ranks=None, backend=None):
                 "equal to that of the default global group.")
         size = len(ranks)
         ranks = sorted(ranks)
-        if size > 1 and global_rank in ranks:
-            rank = ranks.index(global_rank)
+        if backend == 'heter' or (size > 1 and global_rank in ranks):
+            rank = 0 if backend == 'heter' else ranks.index(global_rank)
+            src_rank = ranks[0] if backend == 'heter' else None
+            dst_rank = ranks[1] if backend == 'heter' else None
             pg = _new_process_group_impl(
                 backend,
                 _default_store,
@@ -371,7 +392,9 @@ def new_group(ranks=None, backend=None):
                 size,
                 group_name,
                 pg_options=None,
-                group_id=gid)
+                group_id=gid,
+                src_rank=src_rank,
+                dst_rank=dst_rank)
         else:
             rank = -1
             pg = None
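In collective.py the patch adds an internal _custom_gid hook, so a caller can pin the group id that new_group would otherwise draw from _new_ring_id(), and it threads src_rank/dst_rank (taken from the first two ranks of the group) through to the 'heter' backend only. Below is a rough sketch of the gid hook using the names introduced above plus the public new_group API; the heter path itself needs a heterogeneous process-group environment that is not shown here, and the usual multi-rank launch is assumed.

    # Rough illustration; _set_custom_gid is an internal helper, not public API.
    import paddle.distributed as dist
    from paddle.distributed import collective

    dist.init_parallel_env()
    collective._set_custom_gid(2)          # next new_group() uses gid 2 instead of _new_ring_id()
    group = dist.new_group(ranks=[0, 1])   # group name becomes _default_group_name + "2"
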
diff --git a/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py
index c4d42f90615fc1251ff19126bf992b38cc02d11b..90440ff9d0ea9ed7417d732a32fb114eee03409b 100755
--- a/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py
+++ b/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py
@@ -138,9 +138,16 @@ class ShardingOptimizer(MetaOptimizerBase):
         if pp_degree > 1:
             assert strategy.pipeline is True
 
-        assert global_world_size == mp_degree * sharding_degree * pp_degree * dp_degree, \
-            "global work size [{}], mp_degree [{}], sharding_degree [{}], pp_degree [{}], dp_degree [{}].".format(
-                global_world_size, mp_degree, sharding_degree, pp_degree, dp_degree)
+        if os.getenv("PADDLE_MANUAL_PIPELINE_STAGE", None):
+            assert pp_degree == 2, ("For manually set pipeline, only "
+                                    "pp_degree = 2 is supported.")
+            assert global_world_size == mp_degree * sharding_degree * dp_degree, \
+                "global work size [{}], mp_degree [{}], sharding_degree [{}], dp_degree [{}].".format(
+                    global_world_size, mp_degree, sharding_degree, dp_degree)
+        else:
+            assert global_world_size == mp_degree * sharding_degree * pp_degree * dp_degree, \
+                "global work size [{}], mp_degree [{}], sharding_degree [{}], pp_degree [{}], dp_degree [{}].".format(
+                    global_world_size, mp_degree, sharding_degree, pp_degree, dp_degree)
 
         # FIXME (JZ-LIANG) deprecated hybrid_dp
         if sharding_configs["hybrid_dp"]:
@@ -268,7 +275,11 @@ class ShardingOptimizer(MetaOptimizerBase):
         if self.pp_degree > 1:
             startup_program = startup_program._pipeline_opt['startup_program']
             print("pp_rank:", self.pp_rank)
-            main_program = program_list[self.pp_rank]
+            if os.getenv("PADDLE_MANUAL_PIPELINE_STAGE", None):
+                main_program = program_list[int(
+                    os.getenv("PADDLE_MANUAL_PIPELINE_STAGE"))]
+            else:
+                main_program = program_list[self.pp_rank]
             with open("main_%d" % self.role_maker._worker_index(), 'w') as f:
                 f.writelines(str(main_program))
             main_block = main_program.global_block()
@@ -633,14 +644,15 @@ class ShardingOptimizer(MetaOptimizerBase):
             self.pp_group_endpoints[pair[1]],
         ]
         pp_rank = 0 if self.pp_rank == pair[0] else 1
-        self._collective_helper._init_communicator(
-            self._startup_program,
-            self.current_endpoint,
-            pp_group_endpoints,
-            pp_rank,
-            ring_id,
-            False,
-            sync=False)
+        if os.getenv("PADDLE_MANUAL_PIPELINE_STAGE", None) is None:
+            self._collective_helper._init_communicator(
+                self._startup_program,
+                self.current_endpoint,
+                pp_group_endpoints,
+                pp_rank,
+                ring_id,
+                False,
+                sync=False)
 
     def _init_npu_pipeline_comm(self, startup_block):
         # NOTE(wangxi): some bug with hccl, must set pp_degree be even number
@@ -714,14 +726,15 @@ class ShardingOptimizer(MetaOptimizerBase):
 
     def _init_pipeline_comm(self, startup_block):
         # TODO (JZ-LIANG) to unify pp_rank_ and pp_rank
-        self._collective_helper._init_communicator(
-            self._startup_program,
-            self.current_endpoint,
-            self.pp_group_endpoints,
-            self.pp_rank,
-            self.pp_ring_id,
-            False,
-            sync=False)
+        if os.getenv("PADDLE_MANUAL_PIPELINE_STAGE", None) is None:
+            self._collective_helper._init_communicator(
+                self._startup_program,
+                self.current_endpoint,
+                self.pp_group_endpoints,
+                self.pp_rank,
+                self.pp_ring_id,
+                False,
+                sync=False)
 
         if core.is_compiled_with_npu():
             self._init_npu_pipeline_comm(startup_block)
@@ -1387,17 +1400,27 @@ class ShardingOptimizer(MetaOptimizerBase):
         # NOTE (JZ-LIANG) support outter-pure-dp to scale the throughput in 3D parallelism
         # e.g. mp-sharding-pp-dp
         # sharding-hybrid-dp as one senario of outter-pure-dp
-        assert self.global_word_size == self.mp_degree * self.sharding_degree * self.pp_degree * self.dp_degree, "mp_degree: [{}], sharding_degree: [{}], pp_degree: [{}], dp_degree: [{}]; BUT global nrank: [{}]".format(
-            self.mp_degree, self.sharding_degree, self.pp_degree,
-            self.dp_degree, self.global_word_size)
+        local_pp_degree = self.pp_degree
+        if os.getenv("PADDLE_MANUAL_PIPELINE_STAGE", None):
+            assert self.pp_degree == 2, ("For manually set pipeline, only "
+                                         "pp_degree = 2 is supported.")
+            assert self.global_word_size == self.mp_degree * self.sharding_degree * self.dp_degree, \
+                "global work size [{}], mp_degree [{}], sharding_degree [{}], dp_degree [{}].".format(
+                    self.global_word_size, self.mp_degree, self.sharding_degree, self.dp_degree)
+            local_pp_degree = 1
+        else:
+            assert self.global_word_size == self.mp_degree * self.sharding_degree * self.pp_degree * self.dp_degree, "mp_degree: [{}], sharding_degree: [{}], pp_degree: [{}], dp_degree: [{}]; BUT global nrank: [{}]".format(
+                self.mp_degree, self.sharding_degree, self.pp_degree,
+                self.dp_degree, self.global_word_size)
 
         if self.dp_degree > 1:
             self.dp_ring_id = 2
-            self.dp_rank = self.global_rank // (self.sharding_degree *
-                                                self.mp_degree * self.pp_degree)
+            self.dp_rank = self.global_rank // (
+                self.sharding_degree * self.mp_degree * local_pp_degree)
             dp_first_rank_idx = self.global_rank % (
-                self.sharding_degree * self.mp_degree * self.pp_degree)
-            dp_offset = (self.sharding_degree * self.mp_degree * self.pp_degree)
+                self.sharding_degree * self.mp_degree * local_pp_degree)
+            dp_offset = (self.sharding_degree * self.mp_degree *
+                         local_pp_degree)
             self.dp_group_endpoints = []
             for i in range(self.dp_degree):
                 self.dp_group_endpoints.append(self.global_endpoints[
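The sharding optimizer now special-cases PADDLE_MANUAL_PIPELINE_STAGE: each pipeline stage is expected to run as its own job (only pp_degree = 2 is supported), so pp_degree drops out of the per-job world-size check and out of the data-parallel rank/offset arithmetic (via local_pp_degree = 1), and the pipeline communicators are not initialized here. A small arithmetic sketch of the relaxed assertion, with illustrative degrees:

    # Illustrative degrees only; mirrors the two world-size assertions above.
    mp_degree, sharding_degree, pp_degree, dp_degree = 2, 1, 2, 2

    # One job containing every pipeline stage:
    world_size_single_job = mp_degree * sharding_degree * pp_degree * dp_degree   # 8 ranks

    # With PADDLE_MANUAL_PIPELINE_STAGE, each of the pp_degree stage jobs only
    # holds the non-pipeline ranks:
    world_size_per_stage_job = mp_degree * sharding_degree * dp_degree            # 4 ranks

    assert world_size_per_stage_job * pp_degree == world_size_single_job
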
diff --git a/python/paddle/fluid/optimizer.py b/python/paddle/fluid/optimizer.py
index bb14fb9a86f1596dd648de7c7b4de9c59231557f..49fb5399d8aecb14656f99dc6468a03e3baea65d 100755
--- a/python/paddle/fluid/optimizer.py
+++ b/python/paddle/fluid/optimizer.py
@@ -6005,7 +6005,14 @@ class PipelineOptimizer(object):
         for p in program_list:
             self._create_vars(p.global_block(), main_block)
 
-        self.local_rank %= len(device_list)
+        if os.getenv("PADDLE_MANUAL_PIPELINE_STAGE", None):
+            self.local_rank = int(os.getenv("PADDLE_MANUAL_PIPELINE_STAGE"))
+            assert self.local_rank < len(device_list), (
+                "Manually specified "
+                "pipeline stage must be less than total number of pipeline "
+                "stages.")
+        else:
+            self.local_rank %= len(device_list)
         # Step3.5: optimize forward send sync_comm to overlap send and recv
         self._optimize_forward_send_sync(program_list[self.local_rank])
 
diff --git a/python/paddle/fluid/tests/unittests/test_pipeline.py b/python/paddle/fluid/tests/unittests/test_pipeline.py
index 8f46119d551c6784e467400df8f6da179932532a..04772a2da2871d542588d5bdc434b54348cd78a0 100644
--- a/python/paddle/fluid/tests/unittests/test_pipeline.py
+++ b/python/paddle/fluid/tests/unittests/test_pipeline.py
@@ -63,7 +63,7 @@ class TestPipeline(TestDistBase):
             "pipeline_mnist_one_device.py",
             check_error_log=True,
             log_name=flag_name,
-            need_envs=self.need_envs())
+            need_envs={"PADDLE_MANUAL_PIPELINE_STAGE": "0"})
 
 
 if __name__ == '__main__':
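PipelineOptimizer applies the same override: when PADDLE_MANUAL_PIPELINE_STAGE is set it replaces the local_rank %= len(device_list) stage selection, and the pipeline unit test now sets the variable explicitly. A simplified restatement of that branch follows (values are illustrative, not taken from the test):

    import os

    device_list = ["gpu:0", "gpu:1"]   # one entry per pipeline stage (illustrative)
    local_rank = 3                     # trainer rank within the job (illustrative)

    manual_stage = os.getenv("PADDLE_MANUAL_PIPELINE_STAGE", None)
    if manual_stage:
        stage = int(manual_stage)
        assert stage < len(device_list), "manual stage must be < number of stages"
    else:
        stage = local_rank % len(device_list)
    # `stage` selects which section program this process builds and runs.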