diff --git a/paddle/fluid/framework/device_worker.h b/paddle/fluid/framework/device_worker.h index e1a1c1fab5ef0a32e385bf0eae4da7fb8a1c97a1..895e459a37dd7df632445db8ae728799343e5160 100644 --- a/paddle/fluid/framework/device_worker.h +++ b/paddle/fluid/framework/device_worker.h @@ -522,7 +522,8 @@ class HeterCpuWorker : public HogwildWorker { }; #endif -#if (defined PADDLE_WITH_NCCL || defined PADDLE_WITH_RCCL) && \ +#if (defined PADDLE_WITH_NCCL || defined PADDLE_WITH_RCCL || \ + defined PADDLE_WITH_XPU_BKCL) && \ (defined PADDLE_WITH_PSLIB) class PSGPUWorker : public HogwildWorker { public: @@ -537,8 +538,10 @@ class PSGPUWorker : public HogwildWorker { new (&program_) ProgramDesc(main_program); } void ProduceTasks() override; +#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP) virtual void SetStream(const gpuStream_t stream) { copy_stream_ = stream; } virtual void SetEvent(const gpuEvent_t event) { event_ = event; } +#endif void ResetStat(); protected: @@ -588,8 +591,10 @@ class PSGPUWorker : public HogwildWorker { std::unordered_map> feasign_set_; paddle::framework::Channel> pull_queue_; paddle::framework::Channel> push_queue_; +#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP) gpuEvent_t event_; gpuStream_t copy_stream_; +#endif int batch_cnt_{0}; std::atomic done_cnt_{0}; diff --git a/paddle/fluid/framework/device_worker_factory.cc b/paddle/fluid/framework/device_worker_factory.cc index 9c418b2f786ca288ff7945b7c99fdd2858a21e52..e6635a2f941cd11381670156050567f47c56665c 100644 --- a/paddle/fluid/framework/device_worker_factory.cc +++ b/paddle/fluid/framework/device_worker_factory.cc @@ -75,7 +75,8 @@ REGISTER_DEVICE_WORKER_CLASS(HeterSectionWorker); REGISTER_DEVICE_WORKER_CLASS(HeterCpuWorker); #endif -#if (defined PADDLE_WITH_NCCL || defined PADDLE_WITH_RCCL) && \ +#if (defined PADDLE_WITH_NCCL || defined PADDLE_WITH_RCCL || \ + defined PADDLE_WITH_XPU_BKCL) && \ (defined PADDLE_WITH_PSLIB) REGISTER_DEVICE_WORKER_CLASS(PSGPUWorker); #endif diff --git a/paddle/fluid/framework/ps_gpu_trainer.cc b/paddle/fluid/framework/ps_gpu_trainer.cc index e4004c2fbf3b56731b460b74f5a6ed0eaaedd25b..9b12870a2bb9b01dcad6d65847562c08caf58853 100644 --- a/paddle/fluid/framework/ps_gpu_trainer.cc +++ b/paddle/fluid/framework/ps_gpu_trainer.cc @@ -23,7 +23,8 @@ limitations under the License. */ #include "paddle/fluid/framework/device_worker_factory.h" #include "paddle/fluid/framework/fleet/ps_gpu_wrapper.h" #include "paddle/fluid/framework/trainer.h" -#if (defined PADDLE_WITH_NCCL || defined PADDLE_WITH_RCCL) && \ +#if (defined PADDLE_WITH_NCCL || defined PADDLE_WITH_RCCL || \ + defined PADDLE_WITH_XPU_BKCL) && \ (defined PADDLE_WITH_PSLIB) #ifdef PADDLE_WITH_CUDA #include "paddle/fluid/platform/cuda_device_guard.h" diff --git a/paddle/fluid/framework/ps_gpu_worker.cc b/paddle/fluid/framework/ps_gpu_worker.cc index 323b2c4803afd584cabd0ae56bd3c8d993006db1..e46bdf6d879e0ef21bff12d61c181f748cc93c86 100644 --- a/paddle/fluid/framework/ps_gpu_worker.cc +++ b/paddle/fluid/framework/ps_gpu_worker.cc @@ -18,7 +18,8 @@ limitations under the License. */ #include "paddle/fluid/platform/lodtensor_printer.h" #include "paddle/fluid/string/string_helper.h" -#if (defined PADDLE_WITH_NCCL || defined PADDLE_WITH_RCCL) && \ +#if (defined PADDLE_WITH_NCCL || defined PADDLE_WITH_RCCL || \ + defined PADDLE_WITH_XPU_BKCL) && \ (defined PADDLE_WITH_PSLIB) #ifdef PADDLE_WITH_CUDA #include "paddle/fluid/platform/cuda_device_guard.h" @@ -131,6 +132,11 @@ void PSGPUWorker::TrainFiles() { device_reader_->Start(); int cur_batch; int batch_cnt = 0; +#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) + platform::SetDeviceId(thread_id_); +#elif defined(PADDLE_WITH_XPU_BKCL) + platform::SetXPUDeviceId(thread_id_); +#endif while ((cur_batch = device_reader_->Next()) > 0) { total_ins_num += cur_batch; for (auto& op : ops_) { @@ -227,6 +233,11 @@ void PSGPUWorker::TrainFilesWithProfiler() { int total_ins_num = 0; int cur_batch; timeline.Start(); +#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) + platform::SetDeviceId(thread_id_); +#elif defined(PADDLE_WITH_XPU_BKCL) + platform::SetXPUDeviceId(thread_id_); +#endif while ((cur_batch = device_reader_->Next()) > 0) { total_ins_num += cur_batch; timeline.Pause(); diff --git a/paddle/fluid/framework/trainer.h b/paddle/fluid/framework/trainer.h index 8a11775702e57887015f831fcd4e3a3f91bd9d56..2496d4d040e2e9889089d8436fc43cab10ffcb5f 100644 --- a/paddle/fluid/framework/trainer.h +++ b/paddle/fluid/framework/trainer.h @@ -248,7 +248,8 @@ class HeterXpuTrainer : public TrainerBase { #endif -#if (defined PADDLE_WITH_NCCL || defined PADDLE_WITH_RCCL) && \ +#if (defined PADDLE_WITH_NCCL || defined PADDLE_WITH_RCCL || \ + defined PADDLE_WITH_XPU_BKCL) && \ (defined PADDLE_WITH_PSLIB) class PSGPUTrainer : public TrainerBase { public: diff --git a/paddle/fluid/framework/trainer_factory.cc b/paddle/fluid/framework/trainer_factory.cc index f189d0213da881399f5ef63f14cd7e16a86d8799..1f1122d32f5c3d07414dd9240ee590fa07cb84c8 100644 --- a/paddle/fluid/framework/trainer_factory.cc +++ b/paddle/fluid/framework/trainer_factory.cc @@ -76,7 +76,8 @@ REGISTER_TRAINER_CLASS(HeterPipelineTrainer); (defined PADDLE_WITH_PSLIB) && (!defined(PADDLE_WITH_HETERPS)) REGISTER_TRAINER_CLASS(HeterXpuTrainer); #endif -#if (defined PADDLE_WITH_NCCL || defined PADDLE_WITH_RCCL) && \ +#if (defined PADDLE_WITH_NCCL || defined PADDLE_WITH_RCCL || \ + defined PADDLE_WITH_XPU_BKCL) && \ (defined PADDLE_WITH_PSLIB) REGISTER_TRAINER_CLASS(PSGPUTrainer); #endif diff --git a/paddle/fluid/operators/collective/c_comm_init_op.cc b/paddle/fluid/operators/collective/c_comm_init_op.cc index 39acb50d4e8709d9aa59e3d2b528297d50e1ea60..82d3b1b1dbfea06c842753e27fd150b346b00717 100644 --- a/paddle/fluid/operators/collective/c_comm_init_op.cc +++ b/paddle/fluid/operators/collective/c_comm_init_op.cc @@ -83,7 +83,6 @@ class CCommInitOp : public framework::OperatorBase { UniqueId* comm_id = var->GetMutable(); int nranks = Attr("nranks"); - int rank_id = Attr("rank"); int rid = Attr("ring_id"); #if defined(PADDLE_WITH_XPU_BKCL) @@ -98,8 +97,18 @@ class CCommInitOp : public framework::OperatorBase { if (Attr("device_id") >= 0) { device_id = Attr("device_id"); } + +#if defined(PADDLE_WITH_XPU_BKCL) && defined(PADDLE_WITH_HETERPS) && \ + defined(PADDLE_WITH_PSLIB) + // XPUPS rank_id only equals 0, so replace rank_id with device_id + CommContext::Instance().CreateComm(comm_id, nranks, device_id, device_id, + rid); +#else + int rank_id = Attr("rank"); CommContext::Instance().CreateComm(comm_id, nranks, rank_id, device_id, rid); +#endif + #endif } }; diff --git a/paddle/fluid/operators/collective/c_sync_calc_stream_op.cc b/paddle/fluid/operators/collective/c_sync_calc_stream_op.cc index 42584948e065156a6f4daef934203ee812bb5c10..088366dbc8f6995cc90bf8c4d8334a7dcdb6a8a6 100644 --- a/paddle/fluid/operators/collective/c_sync_calc_stream_op.cc +++ b/paddle/fluid/operators/collective/c_sync_calc_stream_op.cc @@ -76,7 +76,15 @@ class CSyncCalcStreamKernel : public framework::OpKernel { auto dev_ctx = static_cast( platform::DeviceContextPool::Instance().Get(place)); platform::MLUStreamSync(dev_ctx->stream()); +#elif defined(PADDLE_WITH_XPU_BKCL) + auto place = ctx.GetPlace(); + PADDLE_ENFORCE_EQ(platform::is_xpu_place(place), true, + platform::errors::PreconditionNotMet( + "Sync stream op can run on xpu place only for now.")); + auto dev_ctx = static_cast( + platform::DeviceContextPool::Instance().Get(place)); + dev_ctx->Wait(); #else PADDLE_THROW(platform::errors::PreconditionNotMet( "PaddlePaddle should compile with GPU.")); @@ -97,3 +105,5 @@ REGISTER_OP_CUDA_KERNEL(c_sync_calc_stream, ops::CSyncCalcStreamKernel); REGISTER_OP_NPU_KERNEL(c_sync_calc_stream, ops::CSyncCalcStreamKernel); REGISTER_OP_MLU_KERNEL(c_sync_calc_stream, ops::CSyncCalcStreamKernel); + +REGISTER_OP_XPU_KERNEL(c_sync_calc_stream, ops::CSyncCalcStreamKernel); diff --git a/paddle/fluid/operators/collective/c_sync_comm_stream_op.cc b/paddle/fluid/operators/collective/c_sync_comm_stream_op.cc index 37ce4ef7ee21d883a4ce940a01dca80930592920..5a9a00aa8e4d2f291b240ce025c2d52706c05b5c 100644 --- a/paddle/fluid/operators/collective/c_sync_comm_stream_op.cc +++ b/paddle/fluid/operators/collective/c_sync_comm_stream_op.cc @@ -20,7 +20,6 @@ limitations under the License. */ #endif #if defined(PADDLE_WITH_ASCEND_CL) -#include "paddle/fluid/platform/collective_helper.h" #include "paddle/fluid/platform/device/npu/hccl_helper.h" #endif @@ -28,6 +27,10 @@ limitations under the License. */ #include "paddle/fluid/platform/device/mlu/cncl_helper.h" #endif +#if defined(PADDLE_WITH_XPU_BKCL) || defined(PADDLE_WITH_ASCEND_CL) +#include "paddle/fluid/platform/collective_helper.h" +#endif + namespace paddle { namespace operators { @@ -94,7 +97,16 @@ class CSyncCommStreamKernel : public framework::OpKernel { auto stream = platform::CNCLCommContext::Instance().Get(ring_id, place)->stream(); platform::MLUStreamSync(stream); - +#elif defined(PADDLE_WITH_XPU_BKCL) + auto place = ctx.GetPlace(); + PADDLE_ENFORCE_EQ(platform::is_xpu_place(place), true, + platform::errors::PreconditionNotMet( + "Sync stream op can run on xpu place only for now.")); + int ring_id = ctx.Attr("ring_id"); + auto comm_dev_ctx = platform::BKCLCommContext::Instance() + .Get(ring_id, place) + ->dev_context(); + comm_dev_ctx->Wait(); #else PADDLE_THROW(platform::errors::PreconditionNotMet( "PaddlePaddle should compile with GPU.")); @@ -115,3 +127,5 @@ REGISTER_OP_CUDA_KERNEL(c_sync_comm_stream, ops::CSyncCommStreamKernel); REGISTER_OP_NPU_KERNEL(c_sync_comm_stream, ops::CSyncCommStreamKernel); REGISTER_OP_MLU_KERNEL(c_sync_comm_stream, ops::CSyncCommStreamKernel); + +REGISTER_OP_XPU_KERNEL(c_sync_comm_stream, ops::CSyncCommStreamKernel); diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py index 8d803c0d5bd7d9d369170db3260f13ca63e8a2d7..40ff41fe89f47f2b240015a7908bcdf72b26f027 100644 --- a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py +++ b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py @@ -1139,10 +1139,11 @@ class DownpourOptimizer(DistributedOptimizer): from paddle.fluid.transpiler.collective import MultiThread # check start program if program_mode not in [ - "all_reduce", "fuse_all_reduce", "all_gather" + "all_reduce", "fuse_all_reduce", "all_gather", + "all_reduce_xpu" ]: raise ValueError("You should set program_mode in [ all_reduce, \ - fuse_all_reduce, all_gather ]") + fuse_all_reduce, all_gather, all_reduce_xpu ]") env = self.get_dist_env() if not isinstance(losses, list): startup_programs = [startup_programs] diff --git a/python/paddle/fluid/transpiler/collective.py b/python/paddle/fluid/transpiler/collective.py index ea88a89e68224cef7173e57ecb50ac1e77e68ab0..95ab446e1de6d4763024be6021913f330513043f 100644 --- a/python/paddle/fluid/transpiler/collective.py +++ b/python/paddle/fluid/transpiler/collective.py @@ -42,6 +42,7 @@ class Collective(object): self.nrings = nrings self.endpoints = None self.current_endpoint = None + self.other_endpoints = None self.nranks = None self.rank = None self.startup_program = None @@ -79,6 +80,12 @@ class Collective(object): self.endpoints = endpoints self.current_endpoint = current_endpoint + if current_endpoint: + nranks = len(endpoints) + other_endpoints = endpoints[:] + other_endpoints.remove(current_endpoint) + self.other_endpoints = other_endpoints + self.wait_port = wait_port self.startup_program._origin_program = self.startup_program.clone() @@ -462,9 +469,41 @@ class MultiThread(GradAllReduce): self.rank, ring_id, self.wait_port, True) else: - print("begin to _transpile_startup_program for single-node") - block = self.startup_program.global_block() - block.append_op(type='c_comm_init_all', attrs={'ring_id': 0}) + if "xpu" in self.trans_mode: + print( + "begin to _transpile_startup_program for single-node in XPU") + block = self.startup_program.global_block() + comm_id_var = block.create_var( + name=unique_name.generate('comm_id'), + persistable=True, + type=core.VarDesc.VarType.RAW) + block.append_op( + type='c_gen_bkcl_id', + inputs={}, + outputs={'Out': comm_id_var}, + attrs={ + 'rank': self.rank, + 'endpoint': self.current_endpoint, + 'other_endpoints': self.other_endpoints, + 'ring_id': 0, + self.op_role_key: OpRole.Forward + }) + block.append_op( + type='c_comm_init', + inputs={'X': comm_id_var}, + outputs={}, + attrs={ + 'nranks': + len(os.getenv("FLAGS_selected_gpus").split(",")), + 'rank': self.rank, + 'ring_id': 0, + self.op_role_key: OpRole.Forward + }) + + else: + print("begin to _transpile_startup_program for single-node") + block = self.startup_program.global_block() + block.append_op(type='c_comm_init_all', attrs={'ring_id': 0}) def _transpile_main_program(self): self._insert_scale_loss_grad_ops()