diff --git a/paddle/fluid/imperative/CMakeLists.txt b/paddle/fluid/imperative/CMakeLists.txt index 8f196636af4894deee2044586fb7903e2780ba5a..9121610d29eaa0f64ef3041caa6a9a2c89a1b038 100644 --- a/paddle/fluid/imperative/CMakeLists.txt +++ b/paddle/fluid/imperative/CMakeLists.txt @@ -26,11 +26,15 @@ if(NOT WIN32) cc_library(bkcl_context SRCS bkcl_context.cc DEPS collective_helper device_context tensor var_type_traits) cc_library(reducer SRCS reducer.cc DEPS layer) endif() + if(WITH_ASCEND_CL) + cc_library(hccl_context SRCS hccl_context.cc DEPS collective_helper device_context tensor var_type_traits) + cc_library(reducer SRCS reducer.cc DEPS layer) + endif() cc_library(data_loader SRCS data_loader.cc DEPS enforce) endif(NOT WIN32) if(WITH_GLOO) cc_library(imperative_gloo_context SRCS gloo_context.cc DEPS collective_helper device_context tensor var_type_traits) - if ( WIN32 OR (NOT (WITH_NCCL OR WITH_RCCL OR WITH_XPU_BKCL) )) + if ( WIN32 OR (NOT (WITH_NCCL OR WITH_RCCL OR WITH_XPU_BKCL OR WITH_ASCEND_CL) )) cc_library(reducer SRCS reducer.cc DEPS layer) endif() endif() diff --git a/paddle/fluid/imperative/hccl_context.cc b/paddle/fluid/imperative/hccl_context.cc new file mode 100644 index 0000000000000000000000000000000000000000..5d2183743b32b2bbccd842413be6dcb0381452de --- /dev/null +++ b/paddle/fluid/imperative/hccl_context.cc @@ -0,0 +1,218 @@ +// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "paddle/fluid/imperative/hccl_context.h" + +#include "paddle/fluid/framework/scope.h" +#include "paddle/fluid/framework/variable.h" + +#include "paddle/fluid/platform/device_context.h" +#include "paddle/fluid/platform/gen_comm_id_helper.h" +#include "paddle/fluid/platform/place.h" + +#include "paddle/fluid/platform/collective_helper.h" +#include "paddle/fluid/platform/hccl_helper.h" + +namespace paddle { +namespace framework { +class Variable; +} // namespace framework +} // namespace paddle + +namespace paddle { +namespace imperative { + +static void AllReduce(const framework::Tensor &src, framework::Tensor *dst, + const aclrtStream stream, + const platform::HCCLComm *comm) { + const auto &place = src.place(); + PADDLE_ENFORCE_EQ( + platform::is_npu_place(place), true, + platform::errors::Unimplemented( + "Imperative mode does not support multi-CPU training yet.")); + + void *src_ptr = const_cast(src.data()); + dst->Resize(src.dims()); + void *dst_ptr = dst->mutable_data(src.place(), src.type()); + HcclDataType hccl_dtype = platform::ToHCCLDataType(src.type()); + + PADDLE_ENFORCE_NPU_SUCCESS(platform::dynload::HcclAllReduce( + src_ptr, dst_ptr, src.numel(), hccl_dtype, HCCL_REDUCE_SUM, comm->comm(), + reinterpret_cast(stream))); +} + +void HCCLParallelContext::BcastHCCLId( + std::vector &hccl_ids, // NOLINT + int root, int server_fd) { + if (strategy_.local_rank_ == root) { + std::vector other_trainers; + for (auto &ep : strategy_.trainer_endpoints_) { + if (ep != strategy_.current_endpoint_) { + other_trainers.push_back(ep); + } + } + platform::SendBroadCastCommID(other_trainers, &hccl_ids); + } else { + platform::RecvBroadCastCommID(server_fd, strategy_.current_endpoint_, + &hccl_ids); + } +} + +void HCCLParallelContext::Init() { + int server_fd = -1; + + std::vector hccl_ids; + hccl_ids.resize(strategy_.nrings_); + + if (strategy_.local_rank_ == 0) { + // generate the unique hcclid on the root worker + for (size_t i = 0; i < hccl_ids.size(); ++i) { + platform::dynload::HcclGetRootInfo(&hccl_ids[i]); + } + } else { + server_fd = platform::SocketServer::GetInstance(strategy_.current_endpoint_) + .socket(); + } + BcastHCCLId(hccl_ids, 0, server_fd); + + int npu_id = BOOST_GET_CONST(platform::NPUPlace, place_).device; + for (int ring_id = 0; ring_id < strategy_.nrings_; ring_id++) { + VLOG(0) << "init hccl context nranks: " << strategy_.nranks_ + << " local rank: " << strategy_.local_rank_ << " npu id: " << npu_id + << " ring id: " << ring_id; + // it will assign hccl_comm in NPUDeviceContext within ring_id + platform::HCCLCommContext::Instance().CreateHCCLComm( + &hccl_ids[ring_id], strategy_.nranks_, strategy_.local_rank_, npu_id, + ring_id); + + compute_events_.emplace_back(platform::NpuEventResourcePool::Instance().New( + BOOST_GET_CONST(platform::NPUPlace, place_).device)); + comm_events_.emplace_back(platform::NpuEventResourcePool::Instance().New( + BOOST_GET_CONST(platform::NPUPlace, place_).device)); + } +} + +void HCCLParallelContext::InitWithRingID(int ring_id) { + int server_fd = -1; + std::vector hccl_ids; + hccl_ids.resize(1); + + if (strategy_.local_rank_ == 0) { + // generate the unique hcclid on the root worker + platform::dynload::HcclGetRootInfo(&hccl_ids[0]); + } else { + server_fd = platform::SocketServer::GetInstance(strategy_.current_endpoint_) + .socket(); + } + BcastHCCLId(hccl_ids, 0, server_fd); + + int npu_id = BOOST_GET_CONST(platform::NPUPlace, place_).device; + VLOG(0) << "init hccl context nranks: " << strategy_.nranks_ + << " local rank: " << strategy_.local_rank_ << " npu id: " << npu_id + << " ring id: " << ring_id; + // it will assign hccl_comm in NPUDeviceContext within ring_id + platform::HCCLCommContext::Instance().CreateHCCLComm( + &hccl_ids[0], strategy_.nranks_, strategy_.local_rank_, npu_id, ring_id); + + compute_events_.emplace_back(platform::NpuEventResourcePool::Instance().New( + BOOST_GET_CONST(platform::NPUPlace, place_).device)); + comm_events_.emplace_back(platform::NpuEventResourcePool::Instance().New( + BOOST_GET_CONST(platform::NPUPlace, place_).device)); +} + +void HCCLParallelContext::AllReduceByStream(const framework::Variable &src, + framework::Variable *dst, + int ring_id, bool use_calc_stream) { + PADDLE_ENFORCE_EQ( + platform::is_npu_place(place_), true, + platform::errors::Unimplemented( + "Dynamic graph mode does not support multi-CPU training yet.")); + auto *dev_ctx = static_cast( + platform::DeviceContextPool::Instance().Get(place_)); + platform::HCCLComm *comm = + platform::HCCLCommContext::Instance().Get(ring_id, place_); + aclrtStream stream = use_calc_stream ? dev_ctx->stream() : comm->stream(); + + if (src.IsType()) { + if (!dst->IsType()) { + dst->Clear(); + } + AllReduce(src.Get(), + dst->GetMutable(), stream, comm); + } else { + PADDLE_THROW(platform::errors::InvalidArgument( + "XPU unsupported variable type %s for imperative allreduce, only " + "LoDTensor are supported.", + platform::demangle(framework::ToTypeName(src.Type())))); + } +} + +paddle::platform::DeviceContext *HCCLParallelContext::GetDeviceContext( + int ring_id) { + return static_cast( + platform::HCCLCommContext::Instance() + .Get(ring_id, place_) + ->dev_context()); +} + +void HCCLParallelContext::WaitCompute(int ring_id) { + PADDLE_ENFORCE_GE(ring_id, 0, platform::errors::OutOfRange( + "ring id must >= 0, but got %d", ring_id)); + PADDLE_ENFORCE_LT(ring_id, compute_events_.size(), + platform::errors::OutOfRange( + "ring id must < compute events size," + "but got ring id = %d, compute events size = %d", + ring_id, compute_events_.size())); + + auto compute_stream = static_cast( + platform::DeviceContextPool::Instance().Get(place_)) + ->stream(); + auto comm_stream = + platform::HCCLCommContext::Instance().Get(ring_id, place_)->stream(); + auto event = compute_events_[ring_id].get(); + + // compute_stream-->event-->comm_stream + PADDLE_ENFORCE_NPU_SUCCESS(aclrtRecordEvent(event, compute_stream)); + PADDLE_ENFORCE_NPU_SUCCESS(aclrtStreamWaitEvent(comm_stream, event)); +} + +void HCCLParallelContext::WaitComm(int ring_id) { + PADDLE_ENFORCE_GE(ring_id, 0, platform::errors::OutOfRange( + "ring id must >= 0, but got %d", ring_id)); + PADDLE_ENFORCE_LT(ring_id, comm_events_.size(), + platform::errors::OutOfRange( + "ring id must < comm events size," + "but got ring id = %d, comm events size = %d", + ring_id, comm_events_.size())); + + auto compute_stream = static_cast( + platform::DeviceContextPool::Instance().Get(place_)) + ->stream(); + auto comm_stream = + platform::HCCLCommContext::Instance().Get(ring_id, place_)->stream(); + auto event = comm_events_[ring_id].get(); + + // comm_stream-->event-->compute_stream + PADDLE_ENFORCE_NPU_SUCCESS(aclrtRecordEvent(event, comm_stream)); + PADDLE_ENFORCE_NPU_SUCCESS(aclrtStreamWaitEvent(compute_stream, event)); +} + +void HCCLParallelContext::SynchronizeCompute() { + auto *compute_dev_ctx = static_cast( + platform::DeviceContextPool::Instance().Get(place_)); + compute_dev_ctx->Wait(); +} + +} // namespace imperative +} // namespace paddle diff --git a/paddle/fluid/imperative/hccl_context.h b/paddle/fluid/imperative/hccl_context.h new file mode 100644 index 0000000000000000000000000000000000000000..9ee82d496bade5f504288bc2aab68955b56c6f3b --- /dev/null +++ b/paddle/fluid/imperative/hccl_context.h @@ -0,0 +1,71 @@ +// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#pragma once + +#ifdef PADDLE_WITH_ASCEND_CL +#include +#include +#include + +#include "paddle/fluid/imperative/parallel_context.h" +#include "paddle/fluid/platform/dynload/hccl.h" +#include "paddle/fluid/platform/npu_resource_pool.h" + +namespace paddle { +namespace framework { +class Variable; +} // namespace framework +} // namespace paddle + +namespace paddle { +namespace imperative { + +class HCCLParallelContext : public ParallelContext { + public: + explicit HCCLParallelContext(const ParallelStrategy& strategy, + const platform::Place& place) + : ParallelContext(strategy, place) {} + + ~HCCLParallelContext() override = default; + + void BcastHCCLId(std::vector& hccl_ids, int root, // NOLINT + int server_fd); + + void Init() override; + + void InitWithRingID(int ring_id) override; + + void AllReduceByStream(const framework::Variable& src, + framework::Variable* dst, int ring_id, + bool use_calc_stream) override; + + paddle::platform::DeviceContext* GetDeviceContext(int ring_id) override; + + void WaitCompute(int ring_id) override; + + void WaitComm(int ring_id) override; + + void SynchronizeCompute() override; + + private: + // used for comm wait compute, compute_stream-->event-->comm_stream[ring_id] + std::vector> compute_events_; + + // used for compute wait comm, comm_stream[ring_id]-->event-->compute_stream + std::vector> comm_events_; +}; + +} // namespace imperative +} // namespace paddle +#endif diff --git a/paddle/fluid/imperative/reducer.cc b/paddle/fluid/imperative/reducer.cc index 4ea3da949db574e5b41e89f6d632797592182cb8..2f023f644fd060d6f72002f58fe3b17f9fc469a1 100644 --- a/paddle/fluid/imperative/reducer.cc +++ b/paddle/fluid/imperative/reducer.cc @@ -228,6 +228,16 @@ void Group::ConcatTensors(const platform::DeviceContext &context) { PADDLE_THROW(platform::errors::PermissionDenied( "Paddle can't concat xpu grads since it's not compiled with BKCL," "Please recompile or reinstall Paddle with BKCL support.")); +#endif + } else if (platform::is_npu_place(place)) { +#ifdef PADDLE_WITH_ASCEND_CL + ConcatTensorsWithType( + static_cast(context), + dense_tensors_, &dense_contents_, dtype_); +#else + PADDLE_THROW(platform::errors::PermissionDenied( + "Paddle can't concat npu grads since it's not compiled with HCCL," + "Please recompile or reinstall Paddle with HCCL support.")); #endif } else if (platform::is_cpu_place(place)) { ConcatTensorsWithType( @@ -260,6 +270,16 @@ void Group::SplitTensors(const platform::DeviceContext &context) { PADDLE_THROW(platform::errors::PermissionDenied( "Paddle can't split xpu grad since it's not compiled with BKCL," "Please recompile or reinstall Paddle with BKCL support.")); +#endif + } else if (platform::is_npu_place(place)) { +#ifdef PADDLE_WITH_ASCEND_CL + SplitTensorsWithType( + static_cast(context), + &dense_contents_, &dense_tensors_, dtype_); +#else + PADDLE_THROW(platform::errors::PermissionDenied( + "Paddle can't split npu grad since it's not compiled with HCCL," + "Please recompile or reinstall Paddle with HCCL support.")); #endif } else if (platform::is_cpu_place(place)) { SplitTensorsWithType( diff --git a/paddle/fluid/operators/math/CMakeLists.txt b/paddle/fluid/operators/math/CMakeLists.txt index 6177ec749ac031a0749f3d5c3b090015c48d0bf7..c6005bebe18554f9928cd36b1f75a0f2553a89e6 100644 --- a/paddle/fluid/operators/math/CMakeLists.txt +++ b/paddle/fluid/operators/math/CMakeLists.txt @@ -44,7 +44,11 @@ if (WITH_ASCEND_CL) endif() # please add new math_library in alphabetical order +if (WITH_ASCEND_CL) +math_library(concat_and_split DEPS npu_op_runner) +else() math_library(concat_and_split) +endif() math_library(context_project DEPS im2col math_function) math_library(cross_entropy) math_library(cos_sim_functor) diff --git a/paddle/fluid/operators/math/concat_and_split.cc b/paddle/fluid/operators/math/concat_and_split.cc index 83b4e89fe046f4c30528c521577406d12d0b07ec..14aaaf5d721074ab63ca389e1ec9a45645a6081c 100644 --- a/paddle/fluid/operators/math/concat_and_split.cc +++ b/paddle/fluid/operators/math/concat_and_split.cc @@ -13,6 +13,9 @@ See the License for the specific language governing permissions and limitations under the License. */ #include "paddle/fluid/operators/math/concat_and_split.h" +#ifdef PADDLE_WITH_ASCEND_CL +#include "paddle/fluid/operators/npu_op_runner.h" +#endif namespace paddle { namespace framework { @@ -215,6 +218,79 @@ class SplitFunctor { }; #endif +#ifdef PADDLE_WITH_ASCEND_CL +template +class ConcatFunctor { + public: + void operator()(const platform::NPUDeviceContext& context, + const std::vector& input, int axis, + framework::Tensor* output) { + int dev_id = + BOOST_GET_CONST(platform::NPUPlace, context.GetPlace()).GetDeviceId(); + platform::NPUDeviceGuard guard(dev_id); + + std::vector names; + for (size_t i = 0; i < input.size(); ++i) { + names.push_back("x" + std::to_string(i)); + } + NpuOpRunner runner{ + "ConcatD", + {input}, + {*output}, + {{"concat_dim", axis}, {"N", static_cast(input.size())}}}; + runner.AddInputNames(names); + runner.Run(context.stream()); + } +}; + +template +class SplitFunctor { + public: + void operator()(const platform::NPUDeviceContext& context, + const framework::Tensor& input, + const std::vector& ref_inputs, + const int axis, std::vector* outputs) { + if (input.numel() == 0) { + return; + } + + size_t num = outputs->size(); + + int input_rows = 1; + auto dim_0 = ref_inputs[0]->dims(); + for (int i = 0; i < axis; ++i) { + input_rows *= dim_0[i]; + } + + int input_cols = 0; + + std::vector output_cols(outputs->size()); + for (size_t i = 0; i < num; ++i) { + int t_cols = ref_inputs[i]->numel() / input_rows; + input_cols += t_cols; + output_cols[i] = t_cols; + } + auto npu_place = BOOST_GET_CONST(platform::NPUPlace, context.GetPlace()); + + // computation + for (int k = 0; k < input_rows; ++k) { + const T* src_ptr = input.data() + k * input_cols; + int col_idx = 0; + for (size_t j = 0; j < num; ++j) { + int col_len = output_cols[j]; + auto* out_tensor = outputs->at(j); + if (out_tensor != nullptr) { + T* dst_ptr = out_tensor->data() + k * col_len; + memory::Copy(npu_place, dst_ptr, npu_place, src_ptr + col_idx, + sizeof(T) * col_len, context.stream()); + } + col_idx += col_len; + } + } + } +}; +#endif + #define DEFINE_FUNCTOR(type) \ template class ConcatFunctor; \ template class SplitFunctor; @@ -229,6 +305,14 @@ FOR_ALL_TYPES(DEFINE_FUNCTOR); DEFINE_XPU_FUNCTOR(float) #endif +#ifdef PADDLE_WITH_ASCEND_CL +#define DEFINE_NPU_FUNCTOR(type) \ + template class ConcatFunctor; \ + template class SplitFunctor; + +FOR_ALL_TYPES(DEFINE_NPU_FUNCTOR) +#endif + } // namespace math } // namespace operators } // namespace paddle diff --git a/paddle/fluid/operators/strided_memcpy.h b/paddle/fluid/operators/strided_memcpy.h index eb15fe016d911221dbd12d9243a8b675f7632451..159450aa178d11967183de3b0e1ffa6459e51d3a 100644 --- a/paddle/fluid/operators/strided_memcpy.h +++ b/paddle/fluid/operators/strided_memcpy.h @@ -104,6 +104,11 @@ inline void StridedNumelCopyWithAxis(const platform::DeviceContext& ctx, reinterpret_cast(ctx); memory::Copy(gpu_place, dst + i * dst_after, gpu_place, src + i * src_after, sizeof(T) * size, cuda_ctx.stream()); +#elif defined(PADDLE_WITH_ASCEND_CL) + auto& npu_place = BOOST_GET_CONST(platform::NPUPlace, place); + auto& npu_ctx = reinterpret_cast(ctx); + memory::Copy(npu_place, dst + i * dst_after, npu_place, + src + i * src_after, sizeof(T) * size, npu_ctx.stream()); #else PADDLE_THROW(platform::errors::PreconditionNotMet( "Paddle is not compiled with GPU.")); diff --git a/paddle/fluid/pybind/CMakeLists.txt b/paddle/fluid/pybind/CMakeLists.txt index 47027d69f82e00b741a67a6406ac01c0b268cf27..588caed5a452ef456eff682be123d02296ba5736 100644 --- a/paddle/fluid/pybind/CMakeLists.txt +++ b/paddle/fluid/pybind/CMakeLists.txt @@ -33,6 +33,9 @@ if(NOT WIN32) if (WITH_NCCL OR WITH_RCCL) set(PYBIND_DEPS ${PYBIND_DEPS} nccl_context) endif() + if (WITH_ASCEND_CL) + set(PYBIND_DEPS ${PYBIND_DEPS} hccl_context) + endif() endif(NOT WIN32) if(WITH_PYTHON) @@ -117,6 +120,10 @@ if(WITH_PYTHON) list(APPEND OP_FUNCTION_GENERETOR_DEPS bkcl_context) endif(WITH_XPU_BKCL) + if(WITH_ASCEND_CL) + list(APPEND OP_FUNCTION_GENERETOR_DEPS hccl_context) + endif(WITH_ASCEND_CL) + add_executable(op_function_generator op_function_generator.cc) target_link_libraries(op_function_generator ${OP_FUNCTION_GENERETOR_DEPS}) diff --git a/paddle/fluid/pybind/imperative.cc b/paddle/fluid/pybind/imperative.cc index 1423a89f5df7519fbf3f9b327cf96de70a854610..997e78e7c072bb68b0f2e84161c6d3bbcdd3a099 100644 --- a/paddle/fluid/pybind/imperative.cc +++ b/paddle/fluid/pybind/imperative.cc @@ -36,6 +36,7 @@ limitations under the License. */ #include "paddle/fluid/imperative/bkcl_context.h" #include "paddle/fluid/imperative/data_loader.h" #include "paddle/fluid/imperative/gloo_context.h" +#include "paddle/fluid/imperative/hccl_context.h" #include "paddle/fluid/imperative/hooks.h" #include "paddle/fluid/imperative/layer.h" #include "paddle/fluid/imperative/nccl_context.h" @@ -2333,6 +2334,18 @@ void BindImperative(py::module *m_ptr) { py::arg("ring_id")); #endif +#if defined(PADDLE_WITH_ASCEND_CL) + py::class_>( + m, "HCCLParallelContext") + .def(py::init()) + .def("init", [](imperative::HCCLParallelContext &self) { self.Init(); }) + .def("init_with_ring_id", + &imperative::HCCLParallelContext::InitWithRingID, + py::arg("ring_id")); +#endif + m.def("pylayer_apply", [](const platform::CPUPlace &place, const py::object &cls, const py::args args, const py::kwargs kwargs) { diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc index bf943b7327e6d815bc90fd9dbf54a0657eb633f1..ec3a4ba78c7cca81d470ae3026d7146edf9958b3 100644 --- a/paddle/fluid/pybind/pybind.cc +++ b/paddle/fluid/pybind/pybind.cc @@ -119,6 +119,7 @@ limitations under the License. */ #endif #ifdef PADDLE_WITH_ASCEND_CL +#include "paddle/fluid/platform/collective_helper.h" #include "paddle/fluid/platform/npu_info.h" #include "paddle/fluid/platform/npu_profiler.h" #endif @@ -2453,6 +2454,8 @@ All parameter, weight, gradient are variables in Paddle. #ifdef PADDLE_WITH_ASCEND_CL m.def("get_npu_device_count", platform::GetNPUDeviceCount); m.def("npu_finalize", []() { + platform::HCCLCommContext::Instance().ReleaseHCCLComms(); + auto &pool = platform::DeviceContextPool::Instance(); auto devices = platform::GetSelectedNPUDevices(); for (size_t i = 0; i < devices.size(); ++i) { diff --git a/python/paddle/distributed/collective.py b/python/paddle/distributed/collective.py index 2a251460e47bf6e0a012f7d03e5b891e48371db8..6f12e902ff14e50286cc32fa738a87ae25ee07e9 100644 --- a/python/paddle/distributed/collective.py +++ b/python/paddle/distributed/collective.py @@ -264,6 +264,10 @@ def new_group(ranks=None, backend=None): place = core.CUDAPlace(genv.device_id) core.NCCLParallelContext(strategy, place).init_with_ring_id(ring_id) + elif core.is_compiled_with_npu(): + place = core.NPUPlace(genv.device_id) + core.HCCLParallelContext(strategy, + place).init_with_ring_id(ring_id) else: assert False, ("no cuda device found") else: diff --git a/python/paddle/distributed/parallel.py b/python/paddle/distributed/parallel.py index 34c74ad30679e4f3e343e7381935f3a31cd1edac..7ea479f0fbb14dd1a6e7d16f44e6d5ce6066edf6 100644 --- a/python/paddle/distributed/parallel.py +++ b/python/paddle/distributed/parallel.py @@ -58,8 +58,10 @@ def _start_kv_server(port, http_server_d, size): def _is_cpuonly(backend): check_backend(backend) - if backend in ['auto', 'nccl', 'bkcl'] and (core.is_compiled_with_cuda() or - core.is_compiled_with_xpu()): + if backend in ['auto', 'nccl', 'bkcl', 'hccl'] and ( + core.is_compiled_with_cuda() or core.is_compiled_with_xpu() or + core.is_compiled_with_npu()): + # passes 'auto' and can use cuda or xpu, use the default logics. so return False return False else: @@ -142,7 +144,7 @@ def init_parallel_env(): is_cpu_only = _is_cpuonly(backend) # 1. gpu xpu check, must be gpu or xpu, if not (is_cpu_only or core.is_compiled_with_cuda() or - core.is_compiled_with_xpu()): + core.is_compiled_with_xpu() or core.is_compiled_with_npu()): raise NotImplementedError( "If you want to use CPU-only version, please use 'gloo' as backend") @@ -204,6 +206,8 @@ def init_parallel_env(): place = core.CUDAPlace(parallel_env.device_id) elif core.is_compiled_with_xpu(): place = core.XPUPlace(parallel_env.device_id) + elif core.is_compiled_with_npu(): + place = core.NPUPlace(parallel_env.device_id) _set_expected_place(place) # init nccl or bkcl context @@ -216,6 +220,9 @@ def init_parallel_env(): elif core.is_compiled_with_xpu(): parallel_helper._set_parallel_ctx( core.BKCLParallelContext(strategy, place)) + elif core.is_compiled_with_npu(): + parallel_helper._set_parallel_ctx( + core.HCCLParallelContext(strategy, place)) other_endpoints = strategy.trainer_endpoints[:] other_endpoints.remove(strategy.current_endpoint) diff --git a/python/paddle/fluid/dygraph/nn.py b/python/paddle/fluid/dygraph/nn.py index 3703da08dea46ec57641a762584b6a36d7957d9b..574a56f3bcecb9e34a0b0fd08701e25fc62ae87c 100644 --- a/python/paddle/fluid/dygraph/nn.py +++ b/python/paddle/fluid/dygraph/nn.py @@ -207,9 +207,9 @@ class Conv2D(layers.Layer): if core.is_compiled_with_npu(): if (self._num_channels == self._groups and self._num_channels == self._num_filters): - l_type = 'depthwise_conv2d' + self._l_type = 'depthwise_conv2d' else: - l_type = 'conv2d' + self._l_type = 'conv2d' self._num_channels = num_channels if self._groups is None: diff --git a/python/paddle/fluid/dygraph/parallel.py b/python/paddle/fluid/dygraph/parallel.py index 7dd8d38aa70efb48d0b37efa14dacf8e683b08f4..123f8a5876299b97b65efdb42f1d6cef3fec9ae1 100644 --- a/python/paddle/fluid/dygraph/parallel.py +++ b/python/paddle/fluid/dygraph/parallel.py @@ -62,9 +62,12 @@ def prepare_context(strategy=None): elif isinstance(place, core.XPUPlace): parallel_helper._set_parallel_ctx( core.BKCLParallelContext(strategy, place)) + elif isinstance(place, core.NPUPlace): + parallel_helper._set_parallel_ctx( + core.HCCLParallelContext(strategy, place)) else: # TODO(Yancey1989): add Gloo Parallel Context to support CPU parallel computation - assert ("Only support CUDAPlace or XPUPlace for now.") + assert ("Only support CUDAPlace or XPUPlace or NPUPlace for now.") parallel_helper._init_parallel_ctx() return strategy @@ -122,6 +125,9 @@ class ParallelEnv(object): elif core.is_compiled_with_xpu(): selected_xpus = os.getenv("FLAGS_selected_xpus", "0").split(",") self._device_id = int(selected_xpus[0]) + elif core.is_compiled_with_npu(): + selected_npus = os.getenv("FLAGS_selected_npus", "0").split(",") + self._device_id = int(selected_npus[0]) self._trainer_endpoints = os.getenv("PADDLE_TRAINER_ENDPOINTS", "").split(",") diff --git a/python/paddle/fluid/tests/unittests/npu/test_parallel_dygraph_mnist_npu.py b/python/paddle/fluid/tests/unittests/npu/test_parallel_dygraph_mnist_npu.py new file mode 100644 index 0000000000000000000000000000000000000000..1d09bd93e9b2214ce303c0c8af206acbb35ea3ee --- /dev/null +++ b/python/paddle/fluid/tests/unittests/npu/test_parallel_dygraph_mnist_npu.py @@ -0,0 +1,90 @@ +# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function + +import os +import sys +import unittest +sys.path.append("..") + +from test_dist_base import TestDistBase +import paddle.fluid as fluid + +flag_name = os.path.splitext(__file__)[0] +rank_table_file = b"""{ + "status": "completed", + "version": "1.0", + "server_count": "1", + "server_list": [ + { + "server_id": "127.0.0.1", + "device": [ + { + "device_id": "0", + "device_ip": "192.1.184.23", + "rank_id": "0" + }, + { + "device_id": "1", + "device_ip": "192.2.21.93", + "rank_id": "1" + } + ] + } + ] +}""" + +need_envs = { + "ASCEND_AICPU_PATH": + os.getenv("ASCEND_AICPU_PATH", "/usr/local/Ascend/nnae/latest"), + "ASCEND_OPP_PATH": + os.getenv("ASCEND_OPP_PATH", "/usr/local/Ascend/nnae/latest/opp"), + "HCCL_CONNECT_TIMEOUT": "7200", + "HCCL_WHITELIST_DISABLE": "1", + "HCCL_SECURITY_MODE": "1", + "RANK_TABLE_FILE": "rank_table_file.json", +} + + +class TestParallelDygraphMnistNPU(TestDistBase): + def _setup_config(self): + self._sync_mode = False + self._hccl_mode = True + self._dygraph = True + self._enforce_place = "NPU" + + def test_mnist(self): + with open("rank_table_file.json", "wb") as f: + f.write(rank_table_file) + if fluid.core.is_compiled_with_npu(): + self.check_with_place( + os.path.abspath('../parallel_dygraph_mnist.py'), + delta=1e-3, + check_error_log=True, + need_envs=need_envs, + log_name=flag_name) + + +class TestFleetDygraphMnistNPU(TestParallelDygraphMnistNPU): + def _setup_config(self): + self._sync_mode = False + self._hccl_mode = True + self._dygraph = True + self._enforce_place = "NPU" + self._use_fleet_api = True + + +if __name__ == "__main__": + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_dist_base.py b/python/paddle/fluid/tests/unittests/test_dist_base.py index 0b8a80f0c837a4f66fdc678b635fb296a582138c..b1a5a5a95400a509af53854e0640279499875c98 100755 --- a/python/paddle/fluid/tests/unittests/test_dist_base.py +++ b/python/paddle/fluid/tests/unittests/test_dist_base.py @@ -551,6 +551,9 @@ class TestParallelDyGraphRunnerBase(object): elif fluid.core.is_compiled_with_xpu(): device_id = int(os.getenv("FLAGS_selected_xpus", "0")) place = fluid.XPUPlace(device_id) + elif fluid.core.is_compiled_with_npu(): + device_id = int(os.getenv("FLAGS_selected_npus", "0")) + place = fluid.NPUPlace(device_id) else: assert ("Only support CUDAPlace or XPUPlace or CPU(Gloo) for now.") @@ -564,7 +567,7 @@ class TestParallelDyGraphRunnerBase(object): nranks = len(args.endpoints.split(",")) if args.endpoints else 1 #if args.update_method == "nccl2": - if args.update_method == "nccl2" or args.update_method == "bkcl": + if args.update_method == "nccl2" or args.update_method == "bkcl" or args.update_method == "hccl": strategy = dygraph.parallel.ParallelStrategy() strategy.nranks = nranks strategy.local_rank = args.trainer_id @@ -671,12 +674,12 @@ class TestParallelDyGraphRunnerBase(object): strategy.find_unused_parameters = True # 3. init parallel env - if args.update_method == "nccl2" or "bkcl": + if args.update_method == "nccl2" or "bkcl" or "hccl": fleet.init(is_collective=True, strategy=strategy) # 4. train model model, train_reader, opt = self.get_model() - if args.update_method == "nccl2" or "bkcl": + if args.update_method == "nccl2" or "bkcl" or "hccl": opt = fleet.distributed_optimizer(opt) model = fleet.distributed_model(model) @@ -706,7 +709,8 @@ def runtime_main(test_class): type=str, default="local", choices=[ - "pserver", "nccl2", "bkcl", "local", "nccl2_reduce_layer", "gloo" + "pserver", "nccl2", "bkcl", "local", "nccl2_reduce_layer", "gloo", + "hccl" ]) parser.add_argument('--trainer_id', type=int, required=False, default=0) parser.add_argument('--trainers', type=int, required=False, default=1) @@ -728,6 +732,7 @@ def runtime_main(test_class): parser.add_argument('--use_cpu', action='store_true') parser.add_argument('--use_xpu', action='store_true') parser.add_argument('--use_dgc', action='store_true') + parser.add_argument('--use_npu', action='store_true') parser.add_argument('--accumulate_gradient', action='store_true') parser.add_argument('--find_unused_parameters', action='store_true') parser.add_argument('--use_reduce', action='store_true') @@ -784,13 +789,21 @@ class TestDistBase(unittest.TestCase): self.__use_cuda = False self.__use_xpu = False self._use_dgc = False + self.__use_npu = False elif self._enforce_place == "GPU": self.__use_cuda = True self.__use_xpu = False + self.__use_npu = False elif self._enforce_place == "XPU": self.__use_cuda = False self.__use_xpu = True self._use_dgc = False + self.__use_npu = False + elif self._enforce_place == "NPU": + self.__use_cuda = False + self.__use_xpu = False + self._use_dgc = False + self.__use_npu = True else: if fluid.core.is_compiled_with_cuda(): self.__use_cuda = True @@ -815,6 +828,7 @@ class TestDistBase(unittest.TestCase): self._nccl2_mode = False self._bkcl_mode = False self._gloo_mode = False # now, support gloo backend + self._hccl_mode = False self._pipeline_mode = False self._mp_mode = False self._diff_batch = False @@ -953,6 +967,13 @@ class TestDistBase(unittest.TestCase): "PADDLE_TRAINERS_NUM": "1", "PADDLE_TRAINER_ID": "0" } + elif self.__use_npu: + cmd += " --use_npu" + env_local = { + "FLAGS_selected_npus": devices, + "PADDLE_TRAINERS_NUM": "1", + "PADDLE_TRAINER_ID": "0" + } else: env_local = {'CPU_NUM': '1'} @@ -1199,6 +1220,16 @@ class TestDistBase(unittest.TestCase): "PADDLE_CURRENT_ENDPOINT": ep, "GLOG_v": "2", }) + elif self.__use_npu: + tr_cmd += " --use_npu" + env.update({ + "FLAGS_selected_npus": "{}".format(trainer_id), + "PADDLE_TRAINERS_NUM": "{}".format(trainer_num), + "PADDLE_TRAINER_ID": "{}".format(trainer_id), + "PADDLE_TRAINER_ENDPOINTS": self._ps_endpoints, + "PADDLE_CURRENT_ENDPOINT": ep, + "GLOG_v": "2", + }) else: env.update({'CPU_NUM': '1'}) @@ -1471,6 +1502,13 @@ class TestDistBase(unittest.TestCase): update_method='gloo', check_error_log=check_error_log, log_name=log_name) + elif self._hccl_mode: + tr0_losses, tr1_losses = self._run_cluster_nccl2( + model_file, + required_envs, + update_method='hccl', + check_error_log=check_error_log, + log_name=log_name) elif self._pipeline_mode: tr0_losses, tr1_losses = self._run_pipeline(