From 0cd21facc1c75e446b9d1f5e20d314e757c6c68d Mon Sep 17 00:00:00 2001
From: Roc <30228238+sljlp@users.noreply.github.com>
Date: Mon, 19 Jul 2021 14:50:40 +0800
Subject: [PATCH] [NPU hybrid] Partial send /recv/ allgather for npu (#34189)

---
 .../collective/partial_allgather_op_npu.cc   | 94 +++++++++++++++++++
 .../collective/partial_recv_op_npu.cc        | 87 +++++++++++++++++
 .../collective/partial_send_op_npu.cc        | 82 ++++++++++++++++
 python/paddle/fluid/optimizer.py             | 24 +++--
 4 files changed, 278 insertions(+), 9 deletions(-)
 create mode 100644 paddle/fluid/operators/collective/partial_allgather_op_npu.cc
 create mode 100644 paddle/fluid/operators/collective/partial_recv_op_npu.cc
 create mode 100644 paddle/fluid/operators/collective/partial_send_op_npu.cc

diff --git a/paddle/fluid/operators/collective/partial_allgather_op_npu.cc b/paddle/fluid/operators/collective/partial_allgather_op_npu.cc
new file mode 100644
index 00000000000..c984a83ab10
--- /dev/null
+++ b/paddle/fluid/operators/collective/partial_allgather_op_npu.cc
@@ -0,0 +1,94 @@
+/* Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License. */
+
+#include "paddle/fluid/operators/collective/partial_allgather_op.h"
+#include <memory>
+
+#include "paddle/fluid/platform/collective_helper.h"
+#include "paddle/fluid/platform/hccl_helper.h"
+
+namespace paddle {
+namespace operators {
+
+template <typename T>
+class CallPartialGatherOpASCENDKernel : public framework::OpKernel<T> {
+ public:
+  void Compute(const framework::ExecutionContext &ctx) const override {
+#if defined(PADDLE_WITH_ASCEND_CL)
+    auto in = ctx.Input<framework::Tensor>("X");
+    auto out = ctx.Output<framework::Tensor>("Out");
+    int64_t numel = in->numel();
+    HcclDataType dtype = platform::ToHCCLDataType(in->type());
+
+    int rank = ctx.Attr<int>("rank");
+    int ring_id = ctx.Attr<int>("ring_id");
+    std::string group =
+        std::string(HCOM_GROUP_PREFIX) + std::to_string(ring_id);
+    auto place = ctx.GetPlace();
+    auto comm = platform::HCCLCommContext::Instance().Get(ring_id, place);
+    int nranks = comm->nranks();
+
+    PADDLE_ENFORCE_EQ(rank, comm->rank(),
+                      platform::errors::InvalidArgument(
+                          "rank: %s should equal to %s", rank, comm->rank()));
+    PADDLE_ENFORCE_EQ(
+        (numel % nranks), 0,
+        platform::errors::InvalidArgument(
+            "The input numel (%d) must be divisible by nranks(%d)", numel,
+            nranks));
+
+    framework::DDim dims = in->dims();
+    out->mutable_data<T>(dims, place);
+
+    int64_t send_numel = numel / nranks;
+    int offset = send_numel * rank;
+
+    void *send_buff =
+        reinterpret_cast<void *>(const_cast<T *>(in->data<T>()) + offset);
+    void *recv_buff = reinterpret_cast<void *>(out->data<T>());
+
+    aclrtStream stream = nullptr;
+    if (ctx.Attr<bool>("use_calc_stream")) {
+      auto dev_ctx = platform::DeviceContextPool::Instance().Get(place);
+      stream = static_cast<platform::NPUDeviceContext *>(dev_ctx)->stream();
+    } else {
+      stream = comm->stream();
+    }
+
+    VLOG(3) << "begin hccl allgather, parameter is: "
+            << ", group is " << group << ", ring_id is " << ring_id
+            << ", nranks is " << nranks << ", rankid is " << rank;
+
+    PADDLE_ENFORCE_NPU_SUCCESS(platform::dynload::HcclAllGather(
+        send_buff, recv_buff, send_numel, dtype, comm->comm(),
+        reinterpret_cast<void *>(stream)));
+
+#else
+    PADDLE_THROW(platform::errors::PreconditionNotMet(
+        "PaddlePaddle should compile with NPU."));
+#endif
+  }
+};
+
+}  // namespace operators
+}  // namespace paddle
+
+namespace ops = paddle::operators;
+namespace plat = paddle::platform;
+
+REGISTER_OP_NPU_KERNEL(partial_allgather,
+                       ops::CallPartialGatherOpASCENDKernel<int8_t>,
+                       ops::CallPartialGatherOpASCENDKernel<int>,
+                       ops::CallPartialGatherOpASCENDKernel<float>,
+                       ops::CallPartialGatherOpASCENDKernel<plat::float16>);
diff --git a/paddle/fluid/operators/collective/partial_recv_op_npu.cc b/paddle/fluid/operators/collective/partial_recv_op_npu.cc
new file mode 100644
index 00000000000..22ec33512de
--- /dev/null
+++ b/paddle/fluid/operators/collective/partial_recv_op_npu.cc
@@ -0,0 +1,87 @@
+/* Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License. */
+
+#include "paddle/fluid/operators/collective/partial_recv_op.h"
+
+#include "paddle/fluid/platform/collective_helper.h"
+#include "paddle/fluid/platform/hccl_helper.h"
+
+namespace paddle {
+namespace operators {
+
+template <typename T>
+class PartialRecvOpASCENDKernel : public framework::OpKernel<T> {
+ public:
+  void Compute(const framework::ExecutionContext& ctx) const override {
+#if defined(PADDLE_WITH_ASCEND_CL)
+    auto out = ctx.Output<framework::Tensor>("Out");
+    out->mutable_data<T>(out->dims(), ctx.GetPlace());
+    int num = ctx.Attr<int>("num");
+    int id = ctx.Attr<int>("id");
+    int recv_numel = out->numel() / num;
+    int offset = recv_numel * id;
+
+    void* ptr =
+        reinterpret_cast<void*>(const_cast<T*>(out->data<T>()) + offset);
+    int numel = recv_numel;
+    HcclDataType dtype = platform::ToHCCLDataType(out->type());
+
+    int ring_id = ctx.Attr<int>("ring_id");
+
+    auto place = ctx.GetPlace();
+    auto comm =
+        paddle::platform::HCCLCommContext::Instance().Get(ring_id, place);
+
+    aclrtStream stream = nullptr;
+    auto dev_ctx = platform::DeviceContextPool::Instance().Get(place);
+    if (ctx.Attr<bool>("use_calc_stream")) {
+      stream = static_cast<platform::NPUDeviceContext*>(dev_ctx)->stream();
+    } else {
+      stream = comm->stream();
+    }
+
+    int nranks = comm->nranks();
+    int peer = ctx.Attr<int>("peer");
+
+    PADDLE_ENFORCE_EQ(nranks, 2, platform::errors::InvalidArgument(
+                                     "The nranks must be 2, but (%d)", nranks));
+
+    int root = peer;
+
+    VLOG(3) << "begin hccl recv, parameter is: "
+            << "ring_id:" << ring_id << ", nranks:" << nranks
+            << ", peer:" << peer << ", numel:" << numel << ", ptr:" << ptr
+            << ", dtype:" << dtype << ", root:" << root
+            << ", comm: " << comm->comm() << ", stream: " << stream;
+
+    PADDLE_ENFORCE_NPU_SUCCESS(platform::dynload::HcclBroadcast(
+        ptr, numel, dtype, (uint32_t)root, comm->comm(), stream));
+
+#else
+    PADDLE_THROW(platform::errors::PreconditionNotMet(
+        "PaddlePaddle should compile with NPU."));
+#endif
+  }
+};
+
+}  // namespace operators
+}  // namespace paddle
+
+namespace ops = paddle::operators;
+namespace plat = paddle::platform;
+
+REGISTER_OP_NPU_KERNEL(partial_recv, ops::PartialRecvOpASCENDKernel<int>,
+                       ops::PartialRecvOpASCENDKernel<int8_t>,
+                       ops::PartialRecvOpASCENDKernel<float>,
+                       ops::PartialRecvOpASCENDKernel<plat::float16>);
diff --git a/paddle/fluid/operators/collective/partial_send_op_npu.cc b/paddle/fluid/operators/collective/partial_send_op_npu.cc
new file mode 100644
index 00000000000..acd2b83b0f4
--- /dev/null
+++ b/paddle/fluid/operators/collective/partial_send_op_npu.cc
@@ -0,0 +1,82 @@
+/* Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License. */
+
+#include "paddle/fluid/operators/collective/send_v2_op.h"
+
+#include "paddle/fluid/platform/collective_helper.h"
+#include "paddle/fluid/platform/hccl_helper.h"
+
+namespace paddle {
+namespace operators {
+
+template <typename T>
+class PartialSendOpASCENDKernel : public framework::OpKernel<T> {
+ public:
+  void Compute(const framework::ExecutionContext& ctx) const override {
+#if defined(PADDLE_WITH_ASCEND_CL)
+    auto x = ctx.Input<framework::Tensor>("X");
+    int num = ctx.Attr<int>("num");
+    int id = ctx.Attr<int>("id");
+    int send_numel = x->numel() / num;
+    int offset = send_numel * id;
+
+    void* ptr = reinterpret_cast<void*>(const_cast<T*>(x->data<T>()) + offset);
+    int numel = send_numel;
+    HcclDataType dtype = platform::ToHCCLDataType(x->type());
+
+    int ring_id = ctx.Attr<int>("ring_id");
+    auto place = ctx.GetPlace();
+    auto comm =
+        paddle::platform::HCCLCommContext::Instance().Get(ring_id, place);
+
+    aclrtStream stream = nullptr;
+    auto dev_ctx = platform::DeviceContextPool::Instance().Get(place);
+    if (ctx.Attr<bool>("use_calc_stream")) {
+      stream = static_cast<platform::NPUDeviceContext*>(dev_ctx)->stream();
+    } else {
+      stream = comm->stream();
+    }
+
+    int nranks = comm->nranks();
+    int rank = comm->rank();
+
+    PADDLE_ENFORCE_EQ(nranks, 2, platform::errors::InvalidArgument(
+                                     "The nranks must be 2, but (%d)", nranks));
+
+    int root = rank;
+
+    VLOG(3) << "begin hccl send, parameter is: "
+            << "root " << root << ", comm: " << comm->comm()
+            << ", stream: " << stream;
+
+    PADDLE_ENFORCE_NPU_SUCCESS(platform::dynload::HcclBroadcast(
+        ptr, numel, dtype, (uint32_t)root, comm->comm(), stream));
+
+#else
+    PADDLE_THROW(platform::errors::PreconditionNotMet(
+        "PaddlePaddle should compile with NPU."));
+#endif
+  }
+};
+
+}  // namespace operators
+}  // namespace paddle
+
+namespace ops = paddle::operators;
+namespace plat = paddle::platform;
+
+REGISTER_OP_NPU_KERNEL(partial_send, ops::PartialSendOpASCENDKernel<int>,
+                       ops::PartialSendOpASCENDKernel<int8_t>,
+                       ops::PartialSendOpASCENDKernel<float>,
+                       ops::PartialSendOpASCENDKernel<plat::float16>);
diff --git a/python/paddle/fluid/optimizer.py b/python/paddle/fluid/optimizer.py
index 9a551144148..b339d198a72 100755
--- a/python/paddle/fluid/optimizer.py
+++ b/python/paddle/fluid/optimizer.py
@@ -4190,6 +4190,11 @@ class PipelineOptimizer(object):
     """
 
     def __init__(self, optimizer, num_microbatches=1, start_cpu_core_id=0):
+        self._device = 'cpu'
+        if core.is_compiled_with_npu():
+            self._device = "npu"
+        elif core.is_compiled_with_cuda():
+            self._device = "gpu"
         if framework.in_dygraph_mode():
             raise Exception("In dygraph, don't support PipelineOptimizer.")
         if not isinstance(optimizer, Optimizer) and not isinstance(
@@ -4387,7 +4392,7 @@ class PipelineOptimizer(object):
         for op in block.ops:
             device = op.attr(self._op_device_key)
             # Copy ops whose op_device set to "gpu:all" to all sections.
-            if device == "gpu:all":
+            if device == f"{self._device}:all":
                 for device in devices:
                     program = device_program_map[device]
                     op_desc = op.desc
@@ -4539,7 +4544,7 @@ class PipelineOptimizer(object):
             if op.attr(self._op_role_key) == lrsched_role:
                 # For LRSched ops, we should put them on all sub-programs to
                 # make sure each sub-program update the lr correctly
-                op._set_attr(self._op_device_key, "gpu:all")
+                op._set_attr(self._op_device_key, f"{self._device}:all")
             # bugfix in hybrid parallelism
             elif op.type == "sum" and self._is_backward_op(op):
                 # For sum ops that compute the sum of @RENAMED@ vars
@@ -4606,10 +4611,10 @@ class PipelineOptimizer(object):
                     op.type == 'fill_constant' or
                     op.type == 'elementwise_max' or
                     op.type == 'elementwise_div'):
-                device = "gpu:all"
+                device = f"{self._device}:all"
                 op._set_attr(self._op_device_key, device)
             elif op.type == "alloc_float_status":
-                op._set_attr(self._op_device_key, "gpu:all")
+                op._set_attr(self._op_device_key, f"{self._device}:all")
             else:
                 other_known_ops = [
                     'update_loss_scaling',
@@ -4623,7 +4628,7 @@ class PipelineOptimizer(object):
                     "op_device set, they must be one of {}, but it " \
                     "is {}".format(other_known_ops, op.type)
                 assert self._is_optimize_op(op)
-                op._set_attr(self._op_device_key, "gpu:all")
+                op._set_attr(self._op_device_key, f"{self._device}:all")
 
     def _add_op_device_attr(self, block):
         """
@@ -4638,7 +4643,7 @@ class PipelineOptimizer(object):
                 # We use "gpu:all" to represent the op should be put on all
                 # sub-programs, such as lr-related ops. Note that: "gpu:all"
                 # is only used by pipeline as an indicator.
-                op._set_attr(self._op_device_key, "gpu:all")
+                op._set_attr(self._op_device_key, f"{self._device}:all")
                 continue
             # op_device attribute has been set
             if self._get_op_device_attr(op): continue
@@ -4691,7 +4696,7 @@ class PipelineOptimizer(object):
             device = op.attr(self._op_device_key)
             assert device, ("op_device attribute for op "
                             "{} has not been set.".format(op.type))
-            if device == "gpu:all" or device == "npu:all": continue
+            if device == f"{self._device}:all": continue
 
             dev_type = device.split(':')[0]
             stage_id = int(device.split(':')[1])
@@ -4745,7 +4750,7 @@ class PipelineOptimizer(object):
 
         for index, op in enumerate(list(block.ops)):
             cur_device = op.attr(self._op_device_key)
-            if cur_device == "gpu:all": continue
+            if cur_device == f"{self._device}:all": continue
             for var_name in op.input_arg_names:
                 var = block.var(var_name)
                 # skip data var
@@ -4763,7 +4768,8 @@ class PipelineOptimizer(object):
                 prev_device = prev_op.attr(self._op_device_key) \
                     if prev_op else None
 
-                if prev_device is None or prev_device == "gpu:all": continue
+                if prev_device is None or prev_device == f"{self._device}:all":
+                    continue
 
                 if prev_device == cur_device: continue
 
-- 
GitLab
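
For reference, the three NPU kernels in this patch share the same slice arithmetic: partial_allgather splits the input into nranks equal chunks and each rank contributes the chunk starting at rank * (numel / nranks), while partial_send/partial_recv move the id-th of num equal slices. The sketch below is not part of the patch; it only illustrates that arithmetic, and the helper names are hypothetical.

def partial_allgather_slice(numel, nranks, rank):
    # Mirrors the kernel's check that the input numel is divisible by nranks.
    assert numel % nranks == 0, "input numel must be divisible by nranks"
    chunk = numel // nranks
    return rank * chunk, chunk  # (offset, send_numel)

def partial_p2p_slice(numel, num, idx):
    # partial_send / partial_recv move the idx-th of num equal slices.
    chunk = numel // num
    return idx * chunk, chunk  # (offset, slice numel)

# 1024 elements over 4 ranks: rank 2 contributes 256 elements at offset 512.
assert partial_allgather_slice(1024, 4, 2) == (512, 256)
# partial_send with num=2, id=1 sends the second half of the tensor.
assert partial_p2p_slice(1024, 2, 1) == (512, 512)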
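
The optimizer.py hunks replace the hard-coded "gpu:all" op_device marker with f"{self._device}:all", where self._device is chosen once in __init__ from the build flags. A minimal standalone sketch of that selection follows; the real code calls core.is_compiled_with_npu() and core.is_compiled_with_cuda() from paddle.fluid, and the function name here is hypothetical.

def select_device_prefix(compiled_with_npu, compiled_with_cuda):
    # Prefix used to build the "<device>:all" op_device marker.
    if compiled_with_npu:
        return "npu"
    elif compiled_with_cuda:
        return "gpu"
    return "cpu"

print(select_device_prefix(False, True))  # "gpu" -> the marker becomes "gpu:all"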