diff --git a/paddle/fluid/operators/collective/c_allreduce_sum_op_xpu.cc b/paddle/fluid/operators/collective/c_allreduce_sum_op.kps similarity index 58% rename from paddle/fluid/operators/collective/c_allreduce_sum_op_xpu.cc rename to paddle/fluid/operators/collective/c_allreduce_sum_op.kps index d23572e6d670b7071163d7298747d5ec06a7df16..3230d2c9ec33134c342266a1dfa2aa4675fc5495 100644 --- a/paddle/fluid/operators/collective/c_allreduce_sum_op_xpu.cc +++ b/paddle/fluid/operators/collective/c_allreduce_sum_op.kps @@ -1,4 +1,4 @@ -/* Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +/* Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -12,10 +12,31 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ +#ifdef PADDLE_WITH_XPU_KP + +// Please do not modify the following code +#if defined(__CUDA_ARCH__) +#undef __CUDA_ARCH__ +#endif + +#if defined(__CUDACC__) +#undef __CUDACC__ +#endif + +#if defined(__CUDA__) +#undef __CUDA__ +#endif + +#if defined(__NVCC__) +#undef __NVCC__ +#endif + #include "paddle/fluid/operators/collective/c_allreduce_op.h" namespace ops = paddle::operators; namespace plat = paddle::platform; -REGISTER_OP_XPU_KERNEL(c_allreduce_sum, - ops::CAllReduceOpXPUKernel) +REGISTER_OP_KERNEL(c_allreduce_sum, KP, plat::XPUPlace, + ops::CAllReduceOpXPUKernel); + +#endif diff --git a/paddle/fluid/operators/collective/c_comm_init_all_op.cc b/paddle/fluid/operators/collective/c_comm_init_all_op.cc index 5820bd318d8bc9d8d3902ec6f0a22c0853147d37..ce2da1f22f1489f702808e3ac2e3bf26742858a6 100644 --- a/paddle/fluid/operators/collective/c_comm_init_all_op.cc +++ b/paddle/fluid/operators/collective/c_comm_init_all_op.cc @@ -17,11 +17,16 @@ limitations under the License. */ #include "paddle/fluid/framework/op_registry.h" #include "paddle/fluid/framework/threadpool.h" -#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) #include "paddle/fluid/platform/collective_helper.h" + +#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) #include "paddle/fluid/platform/device/gpu/nccl_helper.h" #endif +#if defined(PADDLE_WITH_XPU_BKCL) +#include "paddle/fluid/platform/device/xpu/bkcl_helper.h" +#endif + namespace paddle { namespace framework { class InferShapeContext; @@ -48,9 +53,9 @@ class CCommInitAllOp : public framework::OperatorBase { void RunImpl(const framework::Scope& scope, const platform::Place& place) const override { - PADDLE_ENFORCE_EQ(platform::is_gpu_place(place), true, - platform::errors::PreconditionNotMet( - "CCommInitAllOp can run on gpu place only")); +// PADDLE_ENFORCE_EQ(platform::is_gpu_place(place), true, +// platform::errors::PreconditionNotMet( +// "CCommInitAllOp can run on gpu place only")); #if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) std::vector devices = Attr>("devices"); @@ -61,9 +66,52 @@ class CCommInitAllOp : public framework::OperatorBase { int rid = Attr("ring_id"); platform::NCCLCommContext::Instance().CreateAllNCCLComms(devices, rid); + +#elif defined(PADDLE_WITH_XPU_BKCL) + std::vector devices = Attr>("devices"); + int ring_id = Attr("ring_id"); + + if (devices.empty()) { + int count = platform::GetXPUDeviceCount(); + for (int i = 0; i < count; ++i) { + devices.push_back(i); + } + } + + if (devices.size() > 1) { + std::vector place_list_; + for (size_t i = 0; i < devices.size(); ++i) { + auto p = platform::XPUPlace(devices[i]); + place_list_.push_back(p); + } + + // create pthread to bkcl_init_rank on all devices + auto ptr = new platform::BKCLContextMap(place_list_); + ptr->init(); + + for (size_t i = 0; i < devices.size(); ++i) { + platform::BKCLCommContext::Instance().AssignBKCLComm( + ptr->contexts_.at(devices[i]).comm_, devices.size(), devices[i], + devices[i], ring_id); + + VLOG(0) << "bkcl communicator of rank " << devices[i] << " in ring " + << ring_id << " has been created on device " << devices[i]; + + // TODO(WorgenZhang): need release comm_map_ when quit + // std::call_once(once_flag_, []() { + // std::atexit([]() { + // platform::BKCLCommContext::Instance().ReleaseBKCLComms(); }); + // }); + } + + VLOG(0) << "done bkcl_init_rank on all devices"; + } else { + VLOG(0) + << "bkcl_init_rank doesn't support on one device, skip init process"; + } #else PADDLE_THROW(platform::errors::PreconditionNotMet( - "PaddlePaddle should compile with GPU.")); + "PaddlePaddle should compile with GPU or XPU.")); #endif } }; diff --git a/paddle/fluid/operators/collective/c_comm_init_op.cc b/paddle/fluid/operators/collective/c_comm_init_op.cc index 82d3b1b1dbfea06c842753e27fd150b346b00717..490747520d67d5adb52ed46b68c9d19670a9464b 100644 --- a/paddle/fluid/operators/collective/c_comm_init_op.cc +++ b/paddle/fluid/operators/collective/c_comm_init_op.cc @@ -97,18 +97,9 @@ class CCommInitOp : public framework::OperatorBase { if (Attr("device_id") >= 0) { device_id = Attr("device_id"); } - -#if defined(PADDLE_WITH_XPU_BKCL) && defined(PADDLE_WITH_HETERPS) && \ - defined(PADDLE_WITH_PSLIB) - // XPUPS rank_id only equals 0, so replace rank_id with device_id - CommContext::Instance().CreateComm(comm_id, nranks, device_id, device_id, - rid); -#else int rank_id = Attr("rank"); CommContext::Instance().CreateComm(comm_id, nranks, rank_id, device_id, rid); -#endif - #endif } }; diff --git a/paddle/fluid/operators/collective/c_sync_calc_stream_op.cc b/paddle/fluid/operators/collective/c_sync_calc_stream_op.cc index 6ad22ff8b19eb58918d5728fd985551058c15338..bf7434686b97acb77d1487cdd6d50f79cf6011f3 100644 --- a/paddle/fluid/operators/collective/c_sync_calc_stream_op.cc +++ b/paddle/fluid/operators/collective/c_sync_calc_stream_op.cc @@ -23,7 +23,6 @@ class CSyncCalcStreamOpMaker : public framework::OpProtoAndCheckerMaker { AddOutput("Out", "(Tensor) Dependency of the variable need to sync"); AddComment(R"DOC( CSyncCalcStream Operator - Call calculation stream synchronization. )DOC"); } diff --git a/paddle/fluid/operators/collective/c_sync_calc_stream_op.kps b/paddle/fluid/operators/collective/c_sync_calc_stream_op.kps new file mode 100644 index 0000000000000000000000000000000000000000..65126f416c4aadb6089532fb9e4ee493f05b593d --- /dev/null +++ b/paddle/fluid/operators/collective/c_sync_calc_stream_op.kps @@ -0,0 +1,42 @@ +/* Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#ifdef PADDLE_WITH_XPU_KP + +// Please do not modify the following code +#if defined(__CUDA_ARCH__) +#undef __CUDA_ARCH__ +#endif + +#if defined(__CUDACC__) +#undef __CUDACC__ +#endif + +#if defined(__CUDA__) +#undef __CUDA__ +#endif + +#if defined(__NVCC__) +#undef __NVCC__ +#endif + +#include "paddle/fluid/operators/collective/c_sync_calc_stream_op.h" + +namespace ops = paddle::operators; +namespace plat = paddle::platform; + +REGISTER_OP_KERNEL(c_sync_calc_stream, KP, plat::XPUPlace, + ops::CSyncCalcStreamKernel); + +#endif diff --git a/paddle/fluid/operators/collective/c_sync_comm_stream_op.cc b/paddle/fluid/operators/collective/c_sync_comm_stream_op.cc index 5a9a00aa8e4d2f291b240ce025c2d52706c05b5c..a3717459a2dac958211dd4772c248d578566b882 100644 --- a/paddle/fluid/operators/collective/c_sync_comm_stream_op.cc +++ b/paddle/fluid/operators/collective/c_sync_comm_stream_op.cc @@ -11,25 +11,7 @@ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ -#include - -#include "paddle/fluid/framework/op_registry.h" - -#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) -#include "paddle/fluid/platform/device/gpu/nccl_helper.h" -#endif - -#if defined(PADDLE_WITH_ASCEND_CL) -#include "paddle/fluid/platform/device/npu/hccl_helper.h" -#endif - -#if defined(PADDLE_WITH_CNCL) -#include "paddle/fluid/platform/device/mlu/cncl_helper.h" -#endif - -#if defined(PADDLE_WITH_XPU_BKCL) || defined(PADDLE_WITH_ASCEND_CL) -#include "paddle/fluid/platform/collective_helper.h" -#endif +#include "paddle/fluid/operators/collective/c_sync_comm_stream_op.h" namespace paddle { namespace operators { @@ -58,62 +40,11 @@ class CSyncCommStreamOpMaker : public framework::OpProtoAndCheckerMaker { AddAttr("ring_id", "(int default 0) ring id.").SetDefault(0); AddComment(R"DOC( CSyncCommStream Operator - Call communication stream synchronization. )DOC"); } }; -template -class CSyncCommStreamKernel : public framework::OpKernel { - public: - void Compute(const framework::ExecutionContext& ctx) const override { -#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) - auto place = ctx.GetPlace(); - int ring_id = ctx.Attr("ring_id"); - auto stream = - platform::NCCLCommContext::Instance().Get(ring_id, place)->stream(); - - platform::GpuStreamSync(stream); - -#elif defined(PADDLE_WITH_ASCEND_CL) - auto place = ctx.GetPlace(); - PADDLE_ENFORCE_EQ(platform::is_npu_place(place), true, - platform::errors::PreconditionNotMet( - "Sync comm stream op can run on npu place only for " - "now, but we got %s, please check the environment.", - place.DebugString())); - int ring_id = ctx.Attr("ring_id"); - auto stream = - platform::HCCLCommContext::Instance().Get(ring_id, place)->stream(); - platform::NPUStreamSync(stream); - -#elif defined(PADDLE_WITH_CNCL) - auto place = ctx.GetPlace(); - PADDLE_ENFORCE_EQ(platform::is_mlu_place(place), true, - platform::errors::PreconditionNotMet( - "Sync stream op can run on mlu place only for now.")); - int ring_id = ctx.Attr("ring_id"); - auto stream = - platform::CNCLCommContext::Instance().Get(ring_id, place)->stream(); - platform::MLUStreamSync(stream); -#elif defined(PADDLE_WITH_XPU_BKCL) - auto place = ctx.GetPlace(); - PADDLE_ENFORCE_EQ(platform::is_xpu_place(place), true, - platform::errors::PreconditionNotMet( - "Sync stream op can run on xpu place only for now.")); - int ring_id = ctx.Attr("ring_id"); - auto comm_dev_ctx = platform::BKCLCommContext::Instance() - .Get(ring_id, place) - ->dev_context(); - comm_dev_ctx->Wait(); -#else - PADDLE_THROW(platform::errors::PreconditionNotMet( - "PaddlePaddle should compile with GPU.")); -#endif - } -}; - } // namespace operators } // namespace paddle @@ -127,5 +58,3 @@ REGISTER_OP_CUDA_KERNEL(c_sync_comm_stream, ops::CSyncCommStreamKernel); REGISTER_OP_NPU_KERNEL(c_sync_comm_stream, ops::CSyncCommStreamKernel); REGISTER_OP_MLU_KERNEL(c_sync_comm_stream, ops::CSyncCommStreamKernel); - -REGISTER_OP_XPU_KERNEL(c_sync_comm_stream, ops::CSyncCommStreamKernel); diff --git a/paddle/fluid/operators/collective/c_sync_comm_stream_op.h b/paddle/fluid/operators/collective/c_sync_comm_stream_op.h new file mode 100644 index 0000000000000000000000000000000000000000..f9dec9303742cc051121d1943a0df977665eb409 --- /dev/null +++ b/paddle/fluid/operators/collective/c_sync_comm_stream_op.h @@ -0,0 +1,88 @@ +/* Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ +#include + +#include "paddle/fluid/framework/op_registry.h" + +#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) +#include "paddle/fluid/platform/device/gpu/nccl_helper.h" +#endif + +#if defined(PADDLE_WITH_ASCEND_CL) +#include "paddle/fluid/platform/device/npu/hccl_helper.h" +#endif + +#if defined(PADDLE_WITH_CNCL) +#include "paddle/fluid/platform/device/mlu/cncl_helper.h" +#endif + +#if defined(PADDLE_WITH_XPU_BKCL) || defined(PADDLE_WITH_ASCEND_CL) +#include "paddle/fluid/platform/collective_helper.h" +#endif + +namespace paddle { +namespace operators { + +template +class CSyncCommStreamKernel : public framework::OpKernel { + public: + void Compute(const framework::ExecutionContext& ctx) const override { +#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) + auto place = ctx.GetPlace(); + int ring_id = ctx.Attr("ring_id"); + auto stream = + platform::NCCLCommContext::Instance().Get(ring_id, place)->stream(); + + platform::GpuStreamSync(stream); + +#elif defined(PADDLE_WITH_ASCEND_CL) + auto place = ctx.GetPlace(); + PADDLE_ENFORCE_EQ(platform::is_npu_place(place), true, + platform::errors::PreconditionNotMet( + "Sync comm stream op can run on npu place only for " + "now, but we got %s, please check the environment.", + place.DebugString())); + int ring_id = ctx.Attr("ring_id"); + auto stream = + platform::HCCLCommContext::Instance().Get(ring_id, place)->stream(); + platform::NPUStreamSync(stream); + +#elif defined(PADDLE_WITH_CNCL) + auto place = ctx.GetPlace(); + PADDLE_ENFORCE_EQ(platform::is_mlu_place(place), true, + platform::errors::PreconditionNotMet( + "Sync stream op can run on mlu place only for now.")); + int ring_id = ctx.Attr("ring_id"); + auto stream = + platform::CNCLCommContext::Instance().Get(ring_id, place)->stream(); + platform::MLUStreamSync(stream); +#elif defined(PADDLE_WITH_XPU_BKCL) + auto place = ctx.GetPlace(); + PADDLE_ENFORCE_EQ(platform::is_xpu_place(place), true, + platform::errors::PreconditionNotMet( + "Sync stream op can run on xpu place only for now.")); + int ring_id = ctx.Attr("ring_id"); + auto comm_dev_ctx = platform::BKCLCommContext::Instance() + .Get(ring_id, place) + ->dev_context(); + comm_dev_ctx->Wait(); +#else + PADDLE_THROW(platform::errors::PreconditionNotMet( + "PaddlePaddle should compile with GPU.")); +#endif + } +}; + +} // namespace operators +} // namespace paddle diff --git a/paddle/fluid/operators/collective/c_sync_comm_stream_op.kps b/paddle/fluid/operators/collective/c_sync_comm_stream_op.kps new file mode 100644 index 0000000000000000000000000000000000000000..bfac7bf5c5b9266697a2c31c539219c4a458cf10 --- /dev/null +++ b/paddle/fluid/operators/collective/c_sync_comm_stream_op.kps @@ -0,0 +1,42 @@ +/* Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#ifdef PADDLE_WITH_XPU_KP + +// Please do not modify the following code +#if defined(__CUDA_ARCH__) +#undef __CUDA_ARCH__ +#endif + +#if defined(__CUDACC__) +#undef __CUDACC__ +#endif + +#if defined(__CUDA__) +#undef __CUDA__ +#endif + +#if defined(__NVCC__) +#undef __NVCC__ +#endif + +#include "paddle/fluid/operators/collective/c_sync_comm_stream_op.h" + +namespace ops = paddle::operators; +namespace plat = paddle::platform; + +REGISTER_OP_KERNEL(c_sync_comm_stream, KP, plat::XPUPlace, + ops::CSyncCommStreamKernel); + +#endif diff --git a/paddle/fluid/platform/device/xpu/xpu_op_kpfirst_list.h b/paddle/fluid/platform/device/xpu/xpu_op_kpfirst_list.h index 778c18146d64d014d8384ed3c4b35148268b6e56..452f388f03dcf9643fdff89edb27578c732c490c 100644 --- a/paddle/fluid/platform/device/xpu/xpu_op_kpfirst_list.h +++ b/paddle/fluid/platform/device/xpu/xpu_op_kpfirst_list.h @@ -113,6 +113,12 @@ XPUOpMap& get_kp_ops() { XPUKernelSet({pOpKernelType(vartype::FP32, XPUPlace())})}, {"reduce_amax", XPUKernelSet({pOpKernelType(vartype::FP32, XPUPlace())})}, {"reduce_amin", XPUKernelSet({pOpKernelType(vartype::FP32, XPUPlace())})}, + {"c_sync_calc_stream", + XPUKernelSet({pOpKernelType(vartype::FP32, XPUPlace())})}, + {"c_sync_comm_stream", + XPUKernelSet({pOpKernelType(vartype::FP32, XPUPlace())})}, + {"c_allreduce_sum", + XPUKernelSet({pOpKernelType(vartype::FP32, XPUPlace())})}, }; return s_xpu_kp_kernels; diff --git a/python/paddle/fluid/transpiler/collective.py b/python/paddle/fluid/transpiler/collective.py index 95ab446e1de6d4763024be6021913f330513043f..1ddebad286d3661f09665298a5acfdbb3b580315 100644 --- a/python/paddle/fluid/transpiler/collective.py +++ b/python/paddle/fluid/transpiler/collective.py @@ -473,33 +473,14 @@ class MultiThread(GradAllReduce): print( "begin to _transpile_startup_program for single-node in XPU") block = self.startup_program.global_block() - comm_id_var = block.create_var( - name=unique_name.generate('comm_id'), - persistable=True, - type=core.VarDesc.VarType.RAW) block.append_op( - type='c_gen_bkcl_id', - inputs={}, - outputs={'Out': comm_id_var}, + type='c_comm_init_all', attrs={ - 'rank': self.rank, - 'endpoint': self.current_endpoint, - 'other_endpoints': self.other_endpoints, - 'ring_id': 0, - self.op_role_key: OpRole.Forward + 'devices': list( + map(int, + os.getenv("FLAGS_selected_gpus").split(","))), + 'ring_id': 0 }) - block.append_op( - type='c_comm_init', - inputs={'X': comm_id_var}, - outputs={}, - attrs={ - 'nranks': - len(os.getenv("FLAGS_selected_gpus").split(",")), - 'rank': self.rank, - 'ring_id': 0, - self.op_role_key: OpRole.Forward - }) - else: print("begin to _transpile_startup_program for single-node") block = self.startup_program.global_block() @@ -515,6 +496,11 @@ class MultiThread(GradAllReduce): elif self.trans_mode == "fuse_all_reduce": print("begin to transpile in fuse all-reduce mode") self._insert_fuse_allreduce_ops() + elif self.trans_mode == "all_reduce_xpu" and len( + os.getenv("FLAGS_selected_gpus").split(",")) == 1: + print( + "skip transpile in all-reduce-xpu mode when number of devices is only one" + ) else: print("begin to transpile in all-reduce mode") self._insert_allreduce_ops()