From e92f770c425d7ebf0056b98a59756a321d8ddb88 Mon Sep 17 00:00:00 2001
From: lilong12
Date: Fri, 21 Aug 2020 08:04:35 +0800
Subject: [PATCH] Add collective ops (reduce) (#26340)

---
 .../operators/collective/c_reduce_max_op.cc   |  39 +++++
 .../collective/c_reduce_max_op.cu.cc          |  25 +++
 .../operators/collective/c_reduce_min_op.cc   |  39 +++++
 .../collective/c_reduce_min_op.cu.cc          |  25 +++
 .../fluid/operators/collective/c_reduce_op.h  | 151 ++++++++++++++++++
 .../operators/collective/c_reduce_prod_op.cc  |  39 +++++
 .../collective/c_reduce_prod_op.cu.cc         |  25 +++
 .../operators/collective/c_reduce_sum_op.cc   |  39 +++++
 .../collective/c_reduce_sum_op.cu.cc          |  25 +++
 .../operators/collective/c_scatter_op.cc      |  92 +++++++++++
 .../operators/collective/c_scatter_op.cu.cc   | 101 ++++++++++++
 .../fluid/operators/collective/c_scatter_op.h |  39 +++++
 .../fluid/tests/unittests/CMakeLists.txt      |   2 +
 .../tests/unittests/collective_reduce_op.py   |  70 ++++++++
 .../collective_reduce_op_calc_stream.py       |  73 +++++++++
 .../tests/unittests/collective_scatter_op.py  |  71 ++++++++
 .../tests/unittests/test_collective_base.py   |   9 ++
 .../tests/unittests/test_collective_reduce.py |  34 ++++
 .../unittests/test_collective_scatter.py      |  31 ++++
 19 files changed, 929 insertions(+)
 create mode 100644 paddle/fluid/operators/collective/c_reduce_max_op.cc
 create mode 100644 paddle/fluid/operators/collective/c_reduce_max_op.cu.cc
 create mode 100644 paddle/fluid/operators/collective/c_reduce_min_op.cc
 create mode 100644 paddle/fluid/operators/collective/c_reduce_min_op.cu.cc
 create mode 100644 paddle/fluid/operators/collective/c_reduce_op.h
 create mode 100644 paddle/fluid/operators/collective/c_reduce_prod_op.cc
 create mode 100644 paddle/fluid/operators/collective/c_reduce_prod_op.cu.cc
 create mode 100644 paddle/fluid/operators/collective/c_reduce_sum_op.cc
 create mode 100644 paddle/fluid/operators/collective/c_reduce_sum_op.cu.cc
 create mode 100644 paddle/fluid/operators/collective/c_scatter_op.cc
 create mode 100644 paddle/fluid/operators/collective/c_scatter_op.cu.cc
 create mode 100644 paddle/fluid/operators/collective/c_scatter_op.h
 create mode 100644 python/paddle/fluid/tests/unittests/collective_reduce_op.py
 create mode 100644 python/paddle/fluid/tests/unittests/collective_reduce_op_calc_stream.py
 create mode 100644 python/paddle/fluid/tests/unittests/collective_scatter_op.py
 create mode 100644 python/paddle/fluid/tests/unittests/test_collective_reduce.py
 create mode 100644 python/paddle/fluid/tests/unittests/test_collective_scatter.py

diff --git a/paddle/fluid/operators/collective/c_reduce_max_op.cc b/paddle/fluid/operators/collective/c_reduce_max_op.cc
new file mode 100644
index 00000000000..42535187768
--- /dev/null
+++ b/paddle/fluid/operators/collective/c_reduce_max_op.cc
@@ -0,0 +1,39 @@
+/* Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License. */
+
+#include "paddle/fluid/operators/collective/c_reduce_op.h"
+
+namespace paddle {
+namespace operators {
+
+class CReduceMaxOpMaker : public CReduceOpMaker {
+ protected:
+  std::string GetName() const override { return "Max"; }
+};
+
+}  // namespace operators
+}  // namespace paddle
+
+namespace ops = paddle::operators;
+namespace plat = paddle::platform;
+
+REGISTER_OP_WITHOUT_GRADIENT(c_reduce_max, ops::CReduceOp,
+                             ops::CReduceMaxOpMaker);
+
+REGISTER_OP_CPU_KERNEL(c_reduce_max,
+                       ops::CReduceOpCPUKernel<ops::kRedMax, float>,
+                       ops::CReduceOpCPUKernel<ops::kRedMax, double>,
+                       ops::CReduceOpCPUKernel<ops::kRedMax, int>,
+                       ops::CReduceOpCPUKernel<ops::kRedMax, int64_t>,
+                       ops::CReduceOpCPUKernel<ops::kRedMax, plat::float16>);
diff --git a/paddle/fluid/operators/collective/c_reduce_max_op.cu.cc b/paddle/fluid/operators/collective/c_reduce_max_op.cu.cc
new file mode 100644
index 00000000000..7e260346b4b
--- /dev/null
+++ b/paddle/fluid/operators/collective/c_reduce_max_op.cu.cc
@@ -0,0 +1,25 @@
+/* Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License. */
+
+#include "paddle/fluid/operators/collective/c_reduce_op.h"
+
+namespace ops = paddle::operators;
+namespace plat = paddle::platform;
+
+REGISTER_OP_CUDA_KERNEL(c_reduce_max,
+                        ops::CReduceOpCUDAKernel<ops::kRedMax, float>,
+                        ops::CReduceOpCUDAKernel<ops::kRedMax, double>,
+                        ops::CReduceOpCUDAKernel<ops::kRedMax, int>,
+                        ops::CReduceOpCUDAKernel<ops::kRedMax, int64_t>,
+                        ops::CReduceOpCUDAKernel<ops::kRedMax, plat::float16>)
diff --git a/paddle/fluid/operators/collective/c_reduce_min_op.cc b/paddle/fluid/operators/collective/c_reduce_min_op.cc
new file mode 100644
index 00000000000..8e849641e63
--- /dev/null
+++ b/paddle/fluid/operators/collective/c_reduce_min_op.cc
@@ -0,0 +1,39 @@
+/* Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License. */
+
+#include "paddle/fluid/operators/collective/c_reduce_op.h"
+
+namespace paddle {
+namespace operators {
+
+class CReduceMinOpMaker : public CReduceOpMaker {
+ protected:
+  std::string GetName() const override { return "Min"; }
+};
+
+}  // namespace operators
+}  // namespace paddle
+
+namespace ops = paddle::operators;
+namespace plat = paddle::platform;
+
+REGISTER_OP_WITHOUT_GRADIENT(c_reduce_min, ops::CReduceOp,
+                             ops::CReduceMinOpMaker);
+
+REGISTER_OP_CPU_KERNEL(c_reduce_min,
+                       ops::CReduceOpCPUKernel<ops::kRedMin, float>,
+                       ops::CReduceOpCPUKernel<ops::kRedMin, double>,
+                       ops::CReduceOpCPUKernel<ops::kRedMin, int>,
+                       ops::CReduceOpCPUKernel<ops::kRedMin, int64_t>,
+                       ops::CReduceOpCPUKernel<ops::kRedMin, plat::float16>);
diff --git a/paddle/fluid/operators/collective/c_reduce_min_op.cu.cc b/paddle/fluid/operators/collective/c_reduce_min_op.cu.cc
new file mode 100644
index 00000000000..77a75ed0b7a
--- /dev/null
+++ b/paddle/fluid/operators/collective/c_reduce_min_op.cu.cc
@@ -0,0 +1,25 @@
+/* Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License. */
+
+#include "paddle/fluid/operators/collective/c_reduce_op.h"
+
+namespace ops = paddle::operators;
+namespace plat = paddle::platform;
+
+REGISTER_OP_CUDA_KERNEL(c_reduce_min,
+                        ops::CReduceOpCUDAKernel<ops::kRedMin, float>,
+                        ops::CReduceOpCUDAKernel<ops::kRedMin, double>,
+                        ops::CReduceOpCUDAKernel<ops::kRedMin, int>,
+                        ops::CReduceOpCUDAKernel<ops::kRedMin, int64_t>,
+                        ops::CReduceOpCUDAKernel<ops::kRedMin, plat::float16>)
diff --git a/paddle/fluid/operators/collective/c_reduce_op.h b/paddle/fluid/operators/collective/c_reduce_op.h
new file mode 100644
index 00000000000..7474a6a7c27
--- /dev/null
+++ b/paddle/fluid/operators/collective/c_reduce_op.h
@@ -0,0 +1,151 @@
+/* Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License. */
+
+#pragma once
+
+#include <algorithm>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "paddle/fluid/framework/data_type.h"
+#include "paddle/fluid/framework/ddim.h"
+#include "paddle/fluid/framework/lod_tensor.h"
+#include "paddle/fluid/framework/op_registry.h"
+
+#if defined(PADDLE_WITH_NCCL)
+#include "paddle/fluid/platform/collective_helper.h"
+#include "paddle/fluid/platform/nccl_helper.h"
+#endif
+
+namespace paddle {
+namespace operators {
+
+enum ReduceType { kRedSum, kRedMax, kRedMin, kRedProd };
+
+class CReduceOp : public framework::OperatorWithKernel {
+ public:
+  using framework::OperatorWithKernel::OperatorWithKernel;
+
+  void InferShape(framework::InferShapeContext* ctx) const override {
+    ctx->SetOutputDim("Out", ctx->GetInputDim("X"));
+  }
+
+ protected:
+  framework::OpKernelType GetExpectedKernelType(
+      const framework::ExecutionContext& ctx) const override {
+    return framework::OpKernelType(
+        OperatorWithKernel::IndicateVarDataType(ctx, "X"), ctx.GetPlace());
+  }
+};
+
+template <ReduceType red_type, typename T>
+class CReduceOpCPUKernel : public framework::OpKernel<T> {
+ public:
+  void Compute(const framework::ExecutionContext& ctx) const override {
+    PADDLE_ENFORCE_EQ(
+        true, false,
+        platform::errors::Unavailable(
+            "CReduceOpCPUKernel is not implemented yet."));
+  }
+};
+
+template <ReduceType red_type, typename T>
+class CReduceOpCUDAKernel : public framework::OpKernel<T> {
+ public:
+  void Compute(const framework::ExecutionContext& ctx) const override {
+#if defined(PADDLE_WITH_NCCL)
+    auto in = ctx.Input<framework::Tensor>("X");
+    auto out = ctx.Output<framework::Tensor>("Out");
+
+    auto place = ctx.GetPlace();
+    ncclDataType_t dtype = platform::ToNCCLDataType(in->type());
+    int64_t numel = in->numel();
+    const void* sendbuff = in->data<void>();
+    out->Resize(in->dims());
+    void* recvbuff = out->mutable_data<T>(place);
+
+    int rid = ctx.Attr<int>("ring_id");
+    int root = ctx.Attr<int>("root_id");
+    auto comm = platform::NCCLCommContext::Instance().Get(rid, place);
+
+    cudaStream_t stream = nullptr;
+    if (ctx.Attr<bool>("use_calc_stream")) {
+      auto dev_ctx = platform::DeviceContextPool::Instance().Get(place);
+      stream = static_cast<platform::CUDADeviceContext*>(dev_ctx)->stream();
+    } else {
+      stream = comm->stream();
+    }
+
+    ncclRedOp_t nccl_red_type = ncclSum;
+    switch (red_type) {
+      case kRedSum:
+        nccl_red_type = ncclSum;
+        break;
+
+      case kRedMax:
+        nccl_red_type = ncclMax;
+        break;
+
+      case kRedMin:
+        nccl_red_type = ncclMin;
+        break;
+
+      case kRedProd:
+        nccl_red_type = ncclProd;
+        break;
+
+      default:
+        PADDLE_ENFORCE_EQ(true, false, platform::errors::InvalidArgument(
+                                           "red_type must be one of kRedSum, "
+                                           "kRedMax, kRedMin, kRedProd."));
+    }
+
+    PADDLE_ENFORCE_CUDA_SUCCESS(platform::dynload::ncclReduce(
+        sendbuff, recvbuff, numel, dtype, nccl_red_type, root, comm->comm(),
+        stream));
+#else
+    PADDLE_ENFORCE_EQ(true, false,
+                      platform::errors::Unavailable(
+                          "PaddlePaddle should be compiled with GPU support."));
+#endif
+  }
+};
+
+class CReduceOpMaker : public framework::OpProtoAndCheckerMaker {
+ public:
+  void Make() {
+    AddInput("X", "(Tensor) tensor to be reduced.");
+    AddOutput("Out", "(Tensor) the reduced result.");
+    AddAttr<int>("ring_id", "(int default 0) communication ring id.")
+        .SetDefault(0);
+    AddAttr<int>("root_id", "(int default 0) root id.").SetDefault(0);
+    AddAttr<bool>("use_calc_stream",
+                  "(bool default false) run CUDA operations on the "
+                  "calculation stream instead of the communication stream.")
+        .SetDefault(false);
+    AddComment(string::Sprintf(R"DOC(
+CReduce %s Operator
+
+Calls collective Reduce with reduce type %s. If the input and the output
+are the same variable, the reduce is performed in place.
+)DOC",
+                               GetName(), GetName()));
+  }
+
+ protected:
+  virtual std::string GetName() const = 0;
+};
+
+}  // namespace operators
+}  // namespace paddle
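For illustration, not part of the patch: the CUDA kernel above maps the op
onto ncclReduce, so with two ranks and root_id = 1 only the root's output
buffer holds a defined result afterwards (NCCL leaves non-root receive
buffers unspecified). A minimal numpy sketch of that semantics, assuming two
ranks and the sum reduce type:

    import numpy as np

    # Stand-ins for the "X" tensors held by the two ranks.
    rank0_in = np.array([1., 2., 3.])
    rank1_in = np.array([4., 5., 6.])

    # ncclReduce(ncclSum, root=1): the elementwise sum lands only in the
    # root rank's "Out"; outputs on other ranks are left unspecified.
    root_out = rank0_in + rank1_in  # rank 1 sees [5., 7., 9.]
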
+)DOC", + GetName(), GetName())); + } + + protected: + virtual std::string GetName() const = 0; +}; + +} // namespace operators +} // namespace paddle diff --git a/paddle/fluid/operators/collective/c_reduce_prod_op.cc b/paddle/fluid/operators/collective/c_reduce_prod_op.cc new file mode 100644 index 00000000000..64935df856e --- /dev/null +++ b/paddle/fluid/operators/collective/c_reduce_prod_op.cc @@ -0,0 +1,39 @@ +/* Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#include "paddle/fluid/operators/collective/c_reduce_op.h" + +namespace paddle { +namespace operators { + +class CReduceProdOpMaker : public CReduceOpMaker { + protected: + std::string GetName() const override { return "Prod"; } +}; + +} // namespace operators +} // namespace paddle + +namespace ops = paddle::operators; +namespace plat = paddle::platform; + +REGISTER_OP_WITHOUT_GRADIENT(c_reduce_prod, ops::CReduceOp, + ops::CReduceProdOpMaker); + +REGISTER_OP_CPU_KERNEL(c_reduce_prod, + ops::CReduceOpCPUKernel, + ops::CReduceOpCPUKernel, + ops::CReduceOpCPUKernel, + ops::CReduceOpCPUKernel, + ops::CReduceOpCPUKernel) diff --git a/paddle/fluid/operators/collective/c_reduce_prod_op.cu.cc b/paddle/fluid/operators/collective/c_reduce_prod_op.cu.cc new file mode 100644 index 00000000000..07e431f7bc8 --- /dev/null +++ b/paddle/fluid/operators/collective/c_reduce_prod_op.cu.cc @@ -0,0 +1,25 @@ +/* Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#include "paddle/fluid/operators/collective/c_reduce_op.h" + +namespace ops = paddle::operators; +namespace plat = paddle::platform; + +REGISTER_OP_CUDA_KERNEL(c_reduce_prod, + ops::CReduceOpCUDAKernel, + ops::CReduceOpCUDAKernel, + ops::CReduceOpCUDAKernel, + ops::CReduceOpCUDAKernel, + ops::CReduceOpCUDAKernel) diff --git a/paddle/fluid/operators/collective/c_reduce_sum_op.cc b/paddle/fluid/operators/collective/c_reduce_sum_op.cc new file mode 100644 index 00000000000..3e20cee7e18 --- /dev/null +++ b/paddle/fluid/operators/collective/c_reduce_sum_op.cc @@ -0,0 +1,39 @@ +/* Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
diff --git a/paddle/fluid/operators/collective/c_reduce_sum_op.cc b/paddle/fluid/operators/collective/c_reduce_sum_op.cc
new file mode 100644
index 00000000000..3e20cee7e18
--- /dev/null
+++ b/paddle/fluid/operators/collective/c_reduce_sum_op.cc
@@ -0,0 +1,39 @@
+/* Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License. */
+
+#include "paddle/fluid/operators/collective/c_reduce_op.h"
+
+namespace paddle {
+namespace operators {
+
+class CReduceSumOpMaker : public CReduceOpMaker {
+ protected:
+  std::string GetName() const override { return "Sum"; }
+};
+
+}  // namespace operators
+}  // namespace paddle
+
+namespace ops = paddle::operators;
+namespace plat = paddle::platform;
+
+REGISTER_OP_WITHOUT_GRADIENT(c_reduce_sum, ops::CReduceOp,
+                             ops::CReduceSumOpMaker);
+
+REGISTER_OP_CPU_KERNEL(c_reduce_sum,
+                       ops::CReduceOpCPUKernel<ops::kRedSum, float>,
+                       ops::CReduceOpCPUKernel<ops::kRedSum, double>,
+                       ops::CReduceOpCPUKernel<ops::kRedSum, int>,
+                       ops::CReduceOpCPUKernel<ops::kRedSum, int64_t>,
+                       ops::CReduceOpCPUKernel<ops::kRedSum, plat::float16>)
diff --git a/paddle/fluid/operators/collective/c_reduce_sum_op.cu.cc b/paddle/fluid/operators/collective/c_reduce_sum_op.cu.cc
new file mode 100644
index 00000000000..d9826422c16
--- /dev/null
+++ b/paddle/fluid/operators/collective/c_reduce_sum_op.cu.cc
@@ -0,0 +1,25 @@
+/* Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License. */
+
+#include "paddle/fluid/operators/collective/c_reduce_op.h"
+
+namespace ops = paddle::operators;
+namespace plat = paddle::platform;
+
+REGISTER_OP_CUDA_KERNEL(c_reduce_sum,
+                        ops::CReduceOpCUDAKernel<ops::kRedSum, float>,
+                        ops::CReduceOpCUDAKernel<ops::kRedSum, double>,
+                        ops::CReduceOpCUDAKernel<ops::kRedSum, int>,
+                        ops::CReduceOpCUDAKernel<ops::kRedSum, int64_t>,
+                        ops::CReduceOpCUDAKernel<ops::kRedSum, plat::float16>)
diff --git a/paddle/fluid/operators/collective/c_scatter_op.cc b/paddle/fluid/operators/collective/c_scatter_op.cc
new file mode 100644
index 00000000000..908708e6e32
--- /dev/null
+++ b/paddle/fluid/operators/collective/c_scatter_op.cc
@@ -0,0 +1,92 @@
+/* Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License. */
+
+#include "paddle/fluid/operators/collective/c_scatter_op.h"
+
+namespace paddle {
+namespace operators {
+
+class CScatterOp : public framework::OperatorWithKernel {
+ public:
+  using framework::OperatorWithKernel::OperatorWithKernel;
+
+  void InferShape(framework::InferShapeContext* ctx) const override {
+    OP_INOUT_CHECK(ctx->HasInput("X"), "Input", "X", "CScatter");
+    OP_INOUT_CHECK(ctx->HasOutput("Out"), "Output", "Out", "CScatter");
+    int root_id = ctx->Attrs().Get<int>("root");
+    int ring_id = ctx->Attrs().Get<int>("ring_id");
+    int nranks = ctx->Attrs().Get<int>("nranks");
+    PADDLE_ENFORCE_GE(nranks, 2,
+                      platform::errors::InvalidArgument(
+                          "The number of ranks (%d) must be greater than 1 "
+                          "to use collective op (c_scatter op).",
+                          nranks));
+    PADDLE_ENFORCE_GE(
+        root_id, 0,
+        platform::errors::InvalidArgument(
+            "The root_id (%d) for c_scatter_op must be non-negative.",
+            root_id));
+    PADDLE_ENFORCE_GE(
+        ring_id, 0,
+        platform::errors::InvalidArgument(
+            "The ring_id (%d) for c_scatter_op must be non-negative.",
+            ring_id));
+    framework::DDim dim = ctx->GetInputDim("X");
+    dim[0] = dim[0] / nranks;
+    if (dim[0] < 0) dim[0] = -1;
+    ctx->SetOutputDim("Out", dim);
+  }
+
+ protected:
+  framework::OpKernelType GetExpectedKernelType(
+      const framework::ExecutionContext& ctx) const override {
+    return framework::OpKernelType(
+        OperatorWithKernel::IndicateVarDataType(ctx, "X"), ctx.GetPlace());
+  }
+};
+
+class CScatterOpMaker : public framework::OpProtoAndCheckerMaker {
+ public:
+  void Make() {
+    AddInput("X", "(Tensor) tensor on the root rank to be scattered.");
+    AddOutput("Out", "(Tensor) the result of the scatter.");
+    AddAttr<int>("ring_id", "(int default 0) nccl communication ring id.")
+        .SetDefault(0);
+    AddAttr<int>("root", "(int default 0) root id for scattering.")
+        .SetDefault(0);
+    AddAttr<int>("nranks", "(int default 0) number of ranks.").SetDefault(0);
+    AddAttr<bool>("use_calc_stream",
+                  "(bool default false) run CUDA operations on the "
+                  "calculation stream instead of the communication stream.")
+        .SetDefault(false);
+    AddComment(R"DOC(
+CScatter Operator
+Scatter the tensor on the root rank to all participants.
+)DOC");
+  }
+};
+
+}  // namespace operators
+}  // namespace paddle
+
+namespace ops = paddle::operators;
+namespace plat = paddle::platform;
+
+REGISTER_OP_WITHOUT_GRADIENT(c_scatter, ops::CScatterOp, ops::CScatterOpMaker);
+
+REGISTER_OP_CPU_KERNEL(c_scatter, ops::CScatterOpCPUKernel<float>,
+                       ops::CScatterOpCPUKernel<double>,
+                       ops::CScatterOpCPUKernel<int>,
+                       ops::CScatterOpCPUKernel<int64_t>,
+                       ops::CScatterOpCPUKernel<plat::float16>);
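For illustration, not part of the patch: InferShape above splits the first
dimension of X evenly across ranks, so a [10, 1000] input scattered over
nranks = 2 yields a [5, 1000] output, and an unknown first dimension is
propagated as -1. A small Python sketch of the same arithmetic, with
hypothetical values:

    # Shape arithmetic performed by CScatterOp::InferShape.
    nranks = 2
    x_shape = [10, 1000]
    out_shape = [x_shape[0] // nranks] + x_shape[1:]  # -> [5, 1000]
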
diff --git a/paddle/fluid/operators/collective/c_scatter_op.cu.cc b/paddle/fluid/operators/collective/c_scatter_op.cu.cc
new file mode 100644
index 00000000000..c5cd32ef07a
--- /dev/null
+++ b/paddle/fluid/operators/collective/c_scatter_op.cu.cc
@@ -0,0 +1,101 @@
+/* Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License. */
+
+#include "paddle/fluid/operators/collective/c_scatter_op.h"
+
+#if defined(PADDLE_WITH_NCCL)
+#include "paddle/fluid/platform/collective_helper.h"
+#include "paddle/fluid/platform/nccl_helper.h"
+#endif
+
+namespace paddle {
+namespace operators {
+
+template <typename T>
+class CScatterOpCUDAKernel : public framework::OpKernel<T> {
+ public:
+  void Compute(const framework::ExecutionContext& ctx) const override {
+#if defined(PADDLE_WITH_NCCL)
+    auto x = ctx.Input<framework::LoDTensor>("X");
+    auto out = ctx.Output<framework::LoDTensor>("Out");
+    int numel = x->numel();
+    ncclDataType_t dtype = platform::ToNCCLDataType(x->type());
+
+    int nranks = ctx.Attr<int>("nranks");
+    int root_id = ctx.Attr<int>("root");
+    int ring_id = ctx.Attr<int>("ring_id");
+    auto place = ctx.GetPlace();
+    auto comm = platform::NCCLCommContext::Instance().Get(ring_id, place);
+    PADDLE_ENFORCE_EQ(nranks, comm->nranks(),
+                      platform::errors::InvalidArgument(
+                          "The number of ranks (%d) you set must "
+                          "be equal to comm->nranks (%d).",
+                          nranks, comm->nranks()));
+    PADDLE_ENFORCE_GE(
+        root_id, 0,
+        platform::errors::InvalidArgument(
+            "The root_id (%d) for c_scatter_op must be non-negative.",
+            root_id));
+    PADDLE_ENFORCE_GE(
+        ring_id, 0,
+        platform::errors::InvalidArgument(
+            "The ring_id (%d) for c_scatter_op must be non-negative.",
+            ring_id));
+
+    cudaStream_t stream = nullptr;
+    if (ctx.Attr<bool>("use_calc_stream")) {
+      auto dev_ctx = platform::DeviceContextPool::Instance().Get(place);
+      stream = static_cast<platform::CUDADeviceContext*>(dev_ctx)->stream();
+    } else {
+      stream = comm->stream();
+    }
+
+    framework::DDim x_dims = x->dims();
+    framework::DDim out_dims(x_dims);
+    framework::Tensor temp;
+    auto in_data_ptr = x->data<T>();
+    PADDLE_ENFORCE_CUDA_SUCCESS(platform::dynload::ncclBroadcast(
+        reinterpret_cast<const void*>(in_data_ptr),
+        temp.mutable_data<T>(out_dims, place), numel, dtype, root_id,
+        comm->comm(), stream));
+    VLOG(3) << "rank " << comm->rank() << " invoke Scatter.";
+
+    out_dims[0] = out_dims[0] / nranks;
+    auto start_index = out_dims[0] * comm->rank();
+    auto end_index = start_index + out_dims[0];
+    temp = temp.Slice(start_index, end_index);
+    temp.Resize(out_dims);
+    out->mutable_data<T>(out_dims, place);
+    framework::TensorCopySync(*static_cast<const framework::Tensor*>(&temp),
+                              place, static_cast<framework::Tensor*>(out));
+    out->Resize(out_dims);
+#else
+    PADDLE_ENFORCE_EQ(true, false,
+                      platform::errors::Unavailable(
+                          "PaddlePaddle should be compiled with GPU support."));
+#endif
+  }
+};
+
+}  // namespace operators
+}  // namespace paddle
+
+namespace ops = paddle::operators;
+namespace plat = paddle::platform;
+
+REGISTER_OP_CUDA_KERNEL(c_scatter, ops::CScatterOpCUDAKernel<float>,
+                        ops::CScatterOpCUDAKernel<double>,
+                        ops::CScatterOpCUDAKernel<int>,
+                        ops::CScatterOpCUDAKernel<int64_t>,
+                        ops::CScatterOpCUDAKernel<plat::float16>);
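For illustration, not part of the patch: the CUDA kernel realizes scatter
without a dedicated NCCL scatter primitive. It broadcasts the root's full
tensor to every rank with ncclBroadcast into a temporary, then each rank
keeps only its own block of rows. A numpy sketch of that broadcast-then-slice
strategy, assuming the first dimension divides evenly by nranks:

    import numpy as np

    def scatter_via_broadcast(full_tensor, rank, nranks):
        # Step 1 (ncclBroadcast): every rank receives the root's full tensor.
        temp = np.array(full_tensor)
        # Step 2 (Slice + TensorCopySync): keep only this rank's row block.
        rows = temp.shape[0] // nranks
        return temp[rank * rows:(rank + 1) * rows]
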
diff --git a/paddle/fluid/operators/collective/c_scatter_op.h b/paddle/fluid/operators/collective/c_scatter_op.h
new file mode 100644
index 00000000000..6aba3dc5858
--- /dev/null
+++ b/paddle/fluid/operators/collective/c_scatter_op.h
@@ -0,0 +1,39 @@
+/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License. */
+
+#pragma once
+
+#include <algorithm>
+#include <utility>
+#include <vector>
+
+#include "paddle/fluid/framework/data_type.h"
+#include "paddle/fluid/framework/lod_tensor.h"
+#include "paddle/fluid/framework/op_registry.h"
+
+namespace paddle {
+namespace operators {
+
+template <typename T>
+class CScatterOpCPUKernel : public framework::OpKernel<T> {
+ public:
+  void Compute(const framework::ExecutionContext& ctx) const override {
+    PADDLE_ENFORCE_EQ(true, false,
+                      platform::errors::Unavailable(
+                          "The CPU kernel of CScatterOp is not implemented."));
+  }
+};
+
+}  // namespace operators
+}  // namespace paddle
diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt
index 6e51ecac85b..c565f8da4fa 100644
--- a/python/paddle/fluid/tests/unittests/CMakeLists.txt
+++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt
@@ -55,6 +55,8 @@ if(NOT WITH_GPU OR WIN32)
     LIST(REMOVE_ITEM TEST_OPS test_allgather)
     LIST(REMOVE_ITEM TEST_OPS test_allreduce)
     LIST(REMOVE_ITEM TEST_OPS test_broadcast)
+    LIST(REMOVE_ITEM TEST_OPS test_collective_reduce)
+    LIST(REMOVE_ITEM TEST_OPS test_collective_scatter)
     LIST(REMOVE_ITEM TEST_OPS test_reducescatter)
     LIST(REMOVE_ITEM TEST_OPS test_reducescatter_api)
 endif()
diff --git a/python/paddle/fluid/tests/unittests/collective_reduce_op.py b/python/paddle/fluid/tests/unittests/collective_reduce_op.py
new file mode 100644
index 00000000000..da61284344b
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/collective_reduce_op.py
@@ -0,0 +1,70 @@
+# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function
+
+import numpy as np
+import argparse
+import os
+import sys
+import signal
+import time
+import socket
+from contextlib import closing
+from six import string_types
+import math
+import paddle
+import paddle.fluid as fluid
+import paddle.fluid.profiler as profiler
+import paddle.fluid.unique_name as nameGen
+from paddle.fluid import core
+import unittest
+from multiprocessing import Process
+import paddle.fluid.layers as layers
+from functools import reduce
+from test_collective_base import TestCollectiveRunnerBase, runtime_main
+
+
+class TestCollectiveReduce(TestCollectiveRunnerBase):
+    def __init__(self):
+        self.global_ring_id = 0
+
+    def get_model(self, main_prog, startup_program):
+        ring_id = 0
+        rootid = 1
+        with fluid.program_guard(main_prog, startup_program):
+            tindata = layers.data(
+                name="tindata", shape=[10, 1000], dtype='float32')
+            toutdata = main_prog.current_block().create_var(
+                name="outofreduce",
+                dtype='float32',
+                type=core.VarDesc.VarType.LOD_TENSOR,
+                persistable=False,
+                stop_gradient=False)
+            main_prog.global_block().append_op(
+                type="c_reduce_sum",
+                inputs={'X': tindata},
+                attrs={'ring_id': ring_id,
+                       'root_id': rootid},
+                outputs={'Out': toutdata})
+            main_prog.global_block().append_op(
+                type="c_sync_comm_stream",
+                inputs={'X': toutdata},
+                outputs={'Out': toutdata},
+                attrs={'ring_id': ring_id})
+        return toutdata
+
+
+if __name__ == "__main__":
+    runtime_main(TestCollectiveReduce, "reduce", 0)
diff --git a/python/paddle/fluid/tests/unittests/collective_reduce_op_calc_stream.py b/python/paddle/fluid/tests/unittests/collective_reduce_op_calc_stream.py
new file mode 100644
index 00000000000..7e690428623
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/collective_reduce_op_calc_stream.py
@@ -0,0 +1,73 @@
+# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function
+
+import numpy as np
+import argparse
+import os
+import sys
+import signal
+import time
+import socket
+from contextlib import closing
+from six import string_types
+import math
+import paddle
+import paddle.fluid as fluid
+import paddle.fluid.profiler as profiler
+import paddle.fluid.unique_name as nameGen
+from paddle.fluid import core
+import unittest
+from multiprocessing import Process
+import paddle.fluid.layers as layers
+from functools import reduce
+from test_collective_base import TestCollectiveRunnerBase, runtime_main
+
+
+class TestCollectiveReduce(TestCollectiveRunnerBase):
+    def __init__(self):
+        self.global_ring_id = 0
+
+    def get_model(self, main_prog, startup_program):
+        ring_id = 0
+        rootid = 1
+        with fluid.program_guard(main_prog, startup_program):
+            tindata = layers.data(
+                name="tindata", shape=[10, 1000], dtype='float32')
+            toutdata = main_prog.current_block().create_var(
+                name="outofreduce",
+                dtype='float32',
+                type=core.VarDesc.VarType.LOD_TENSOR,
+                persistable=False,
+                stop_gradient=False)
+            main_prog.global_block().append_op(
+                type="c_reduce_sum",
+                inputs={'X': tindata},
+                attrs={
+                    'ring_id': ring_id,
+                    'use_calc_stream': True,
+                    'root_id': rootid
+                },
+                outputs={'Out': toutdata})
+            main_prog.global_block().append_op(
+                type="c_sync_comm_stream",
+                inputs={'X': toutdata},
+                outputs={'Out': toutdata},
+                attrs={'ring_id': ring_id})
+        return toutdata
+
+
+if __name__ == "__main__":
+    runtime_main(TestCollectiveReduce, "reduce", 0)
diff --git a/python/paddle/fluid/tests/unittests/collective_scatter_op.py b/python/paddle/fluid/tests/unittests/collective_scatter_op.py
new file mode 100644
index 00000000000..efe5e17bcce
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/collective_scatter_op.py
@@ -0,0 +1,71 @@
+# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function
+
+import numpy as np
+import argparse
+import os
+import sys
+import signal
+import time
+import socket
+from contextlib import closing
+from six import string_types
+import math
+import paddle
+import paddle.fluid as fluid
+import paddle.fluid.profiler as profiler
+import paddle.fluid.unique_name as nameGen
+from paddle.fluid import core
+import unittest
+from multiprocessing import Process
+import paddle.fluid.layers as layers
+from functools import reduce
+from test_collective_base import TestCollectiveRunnerBase, runtime_main
+
+
+class TestCollectiveScatter(TestCollectiveRunnerBase):
+    def __init__(self):
+        self.global_ring_id = 0
+
+    def get_model(self, main_prog, startup_program):
+        ring_id = 0
+        rootid = 1
+        with fluid.program_guard(main_prog, startup_program):
+            tindata = layers.data(
+                name="tindata", shape=[10, 1000], dtype='float32')
+            toutdata = main_prog.current_block().create_var(
+                name="outofscatter",
+                dtype='float32',
+                type=core.VarDesc.VarType.LOD_TENSOR,
+                persistable=False,
+                stop_gradient=False)
+            main_prog.global_block().append_op(
+                type="c_scatter",
+                inputs={'X': tindata},
+                attrs={'ring_id': ring_id,
+                       'root': rootid,
+                       'nranks': 2},
+                outputs={'Out': toutdata})
+            main_prog.global_block().append_op(
+                type="c_sync_comm_stream",
+                inputs={'X': toutdata},
+                outputs={'Out': toutdata},
+                attrs={'ring_id': ring_id})
+        return toutdata
+
+
+if __name__ == "__main__":
+    runtime_main(TestCollectiveScatter, "scatter", 0)
diff --git a/python/paddle/fluid/tests/unittests/test_collective_base.py b/python/paddle/fluid/tests/unittests/test_collective_base.py
index 3f3a5642abc..512b2967e02 100644
--- a/python/paddle/fluid/tests/unittests/test_collective_base.py
+++ b/python/paddle/fluid/tests/unittests/test_collective_base.py
@@ -241,6 +241,15 @@ class TestDistBase(unittest.TestCase):
             need_result = input2
             self.assertTrue(np.allclose(tr0_out, need_result))
             self.assertTrue(np.allclose(tr1_out, need_result))
+        elif col_type == "reduce":
+            need_result = input1 + input2
+            self.assertTrue(np.allclose(tr1_out, need_result))
+        elif col_type == "scatter":
+            need_result = input2
+            need_result1 = need_result[0:need_result.shape[0] // 2]
+            need_result2 = need_result[need_result.shape[0] // 2:]
+            self.assertTrue(np.allclose(tr0_out, need_result1))
+            self.assertTrue(np.allclose(tr1_out, need_result2))
         elif col_type == "allreduce":
             need_result = input1 + input2
             self.assertTrue(
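For illustration, not part of the patch: the new checks assume two trainers
whose inputs are input1 (rank 0) and input2 (rank 1). For "reduce" only
tr1_out is compared against input1 + input2, because the test scripts set
root_id = 1; for "scatter" the root is likewise rank 1, so input2 is the
source and each rank receives one half of its rows. A numpy sketch of the
scatter expectation, with hypothetical inputs:

    import numpy as np

    input2 = np.random.random((10, 1000))
    half = input2.shape[0] // 2
    tr0_expected, tr1_expected = input2[:half], input2[half:]
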
+
+from __future__ import print_function
+import unittest
+import numpy as np
+
+from test_collective_base import TestDistBase
+
+
+class TestCReduceOp(TestDistBase):
+    def _setup_config(self):
+        pass
+
+    def test_reduce(self):
+        self.check_with_place("collective_reduce_op.py", "reduce")
+
+    def test_reduce_calc_stream(self):
+        self.check_with_place("collective_reduce_op_calc_stream.py", "reduce")
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/python/paddle/fluid/tests/unittests/test_collective_scatter.py b/python/paddle/fluid/tests/unittests/test_collective_scatter.py
new file mode 100644
index 00000000000..7fe3ce73359
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/test_collective_scatter.py
@@ -0,0 +1,31 @@
+# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function
+import unittest
+import numpy as np
+
+from test_collective_base import TestDistBase
+
+
+class TestCScatterOp(TestDistBase):
+    def _setup_config(self):
+        pass
+
+    def test_scatter(self):
+        self.check_with_place("collective_scatter_op.py", "scatter")
+
+
+if __name__ == '__main__':
+    unittest.main()
--
GitLab