From 4da841e0caeb36b758039b4afa8758dd91d6252c Mon Sep 17 00:00:00 2001
From: ShenLiang <1422485404@qq.com>
Date: Tue, 1 Mar 2022 10:53:16 +0800
Subject: [PATCH] [DP] Construct reducer group (#39987)

* add reducer
---
 .../distributed/collective/CMakeLists.txt     |   1 +
 .../fluid/distributed/collective/reducer.cc   | 131 ++++++++++++++
 paddle/fluid/distributed/collective/reducer.h |  32 ++++
 paddle/fluid/pybind/CMakeLists.txt            |   2 +-
 paddle/fluid/pybind/distributed_py.cc         |  14 ++
 python/paddle/fluid/dygraph/parallel.py       |   8 +-
 .../tests/unittests/test_imperative_group.py  | 168 ++++++++----------
 7 files changed, 265 insertions(+), 91 deletions(-)
 create mode 100644 paddle/fluid/distributed/collective/reducer.cc
 create mode 100644 paddle/fluid/distributed/collective/reducer.h

diff --git a/paddle/fluid/distributed/collective/CMakeLists.txt b/paddle/fluid/distributed/collective/CMakeLists.txt
index 41652f8b6ed..a5b40f8aa07 100644
--- a/paddle/fluid/distributed/collective/CMakeLists.txt
+++ b/paddle/fluid/distributed/collective/CMakeLists.txt
@@ -1,4 +1,5 @@
 cc_library(processgroup SRCS ProcessGroup.cc DEPS phi phi_api eager_api)
+cc_library(eager_reducer SRCS reducer.cc DEPS eager_api processgroup)
 
 if(WITH_NCCL)
   cc_library(processgroup_nccl SRCS ProcessGroupNCCL.cc DEPS place cuda_stream enforce collective_helper device_context phi phi_api eager_api)
diff --git a/paddle/fluid/distributed/collective/reducer.cc b/paddle/fluid/distributed/collective/reducer.cc
new file mode 100644
index 00000000000..59f3ea3b0a7
--- /dev/null
+++ b/paddle/fluid/distributed/collective/reducer.cc
@@ -0,0 +1,131 @@
+// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "paddle/fluid/distributed/collective/reducer.h"
+#include "paddle/phi/common/data_type.h"
+
+namespace paddle {
+namespace distributed {
+
+std::vector<std::vector<size_t>> Eager_AssignGroupBySize(
+    const std::vector<Tensor> tensors,
+    const std::vector<bool> &is_sparse_gradient,
+    const std::vector<size_t> &group_size_limits,
+    const std::vector<int64_t> &tensor_indices) {
+  PADDLE_ENFORCE_EQ(
+      tensors.size(), is_sparse_gradient.size(),
+      platform::errors::PreconditionNotMet(
+          "tensors len must be equal to is_sparse_gradient len, but "
+          "[%lu] != [%lu]",
+          tensors.size(), is_sparse_gradient.size()));
+  auto check_perm = [](const std::vector<int64_t> &x) -> bool {
+    size_t len = x.size();
+    std::vector<size_t> cnt(len, 0);
+    for (size_t i = 0; i < len; ++i) {
+      if (x[i] >= static_cast<int64_t>(len) || x[i] < 0 || cnt[x[i]]) {
+        return false;
+      }
+      cnt[x[i]]++;
+    }
+    return true;
+  };
+
+  PADDLE_ENFORCE_EQ(true, check_perm(tensor_indices),
+                    platform::errors::PreconditionNotMet(
+                        "tensor_indices must be a permutation from 0 to %lu",
+                        tensor_indices.size()));
+  // the return vector
+  std::vector<std::vector<size_t>> res;
+
+  // Key: the var type
+  // Value: should use which index in group_size_limits for group size limit
+  std::map<experimental::DataType, size_t> group_limit_index;
+
+  // Key: the var type
+  // Value: <the var indices in input tensors, total size of the open group>
+  std::map<experimental::DataType, std::pair<std::vector<size_t>, size_t>>
+      next_group;
+
+  for (size_t i = 0; i < tensors.size(); ++i) {
+    const auto &var = tensors[i];
+
+    size_t tensor_real_index = i;
+    if (!tensor_indices.empty()) {
+      tensor_real_index = tensor_indices[i];
+    }
+
+    if (is_sparse_gradient[tensor_real_index]) {
+      // we keep sparse var a single group
+      res.push_back({tensor_real_index});
+      continue;
+    }
+
+    const auto &var_dtype = var.dtype();
+    VLOG(3) << "var[" << var.name() << "] 's type is " << var_dtype;
+    auto &group_info = next_group[var_dtype];
+
+    int64_t var_size = -1;
+
+    if (var.is_dense_tensor()) {
+      var_size =
+          std::dynamic_pointer_cast<phi::DenseTensor>(var.impl())->numel();
+    } else {
+      VLOG(3) << "var " << var.name()
+              << " is not tensor or selected_rows, so skip it";
+      continue;
+    }
+
+    group_info.first.push_back(tensor_real_index);
+    group_info.second += experimental::SizeOf(var_dtype) * var_size;
+    // group_info.second += framework::SizeOfType(var_dtype) * var_size;
+
+    if (group_limit_index.find(var_dtype) == group_limit_index.end()) {
+      // means it is the first var of var_dtype
+      group_limit_index[var_dtype] = 0;
+    }
+    auto &cur_limit_index = group_limit_index[var_dtype];
+    if (group_info.second >= group_size_limits[cur_limit_index]) {
+      // exceed group capacity and create a new group
+      res.emplace_back(std::move(group_info.first));
+      group_info = std::pair<std::vector<size_t>, size_t>();
+      cur_limit_index =
+          (std::min)(cur_limit_index + 1, group_size_limits.size() - 1);
+    }
+  }
+
+  // add the final groups
+  for (auto &e : next_group) {
+    auto &group_info = e.second;
+    if (!group_info.first.empty()) {
+      res.emplace_back(std::move(group_info.first));
+    }
+  }
+
+  for (const auto &group_index : res) {
+    PADDLE_ENFORCE_NE(
+        group_index.empty(), true,
+        platform::errors::PreconditionNotMet(
+            "AssignGroupBySize construct empty group, please check."));
+  }
+  if (tensor_indices.empty()) {
+    std::sort(res.begin(), res.end(),
+              [](const std::vector<size_t> &x, const std::vector<size_t> &y) {
+                return x.front() < y.front();
+              });
+  }
+  return res;
+}
+
+}  // namespace distributed
+}  // namespace paddle
diff --git a/paddle/fluid/distributed/collective/reducer.h b/paddle/fluid/distributed/collective/reducer.h
new file mode 100644
index 00000000000..f8c75385ef8
--- /dev/null
+++ b/paddle/fluid/distributed/collective/reducer.h
@@ -0,0 +1,32 @@
+// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <map>
+#include <vector>
+#include "paddle/fluid/distributed/collective/ProcessGroup.h"
+#include "paddle/fluid/eager/api/utils/tensor_utils.h"
+
+namespace paddle {
+namespace distributed {
+using Tensor = paddle::experimental::Tensor;
+
+std::vector<std::vector<size_t>> Eager_AssignGroupBySize(
+    const std::vector<Tensor>, const std::vector<bool>& is_sparse_gradient,
+    const std::vector<size_t>& group_size_limits,
+    const std::vector<int64_t>& tensor_indices = {});
+
+}  // namespace distributed
+}  // namespace paddle
diff --git a/paddle/fluid/pybind/CMakeLists.txt b/paddle/fluid/pybind/CMakeLists.txt
index 1f06eda8a2e..c61e8212b02 100644
--- a/paddle/fluid/pybind/CMakeLists.txt
+++ b/paddle/fluid/pybind/CMakeLists.txt
@@ -81,7 +81,7 @@ set(PYBIND_SRCS
   cuda_streams_py.cc)
 
 if(NOT ON_INFER)
-  set (PYBIND_DEPS ${PYBIND_DEPS} processgroup)
+  set (PYBIND_DEPS ${PYBIND_DEPS} processgroup eager_reducer)
   if (WITH_NCCL)
     set (PYBIND_DEPS ${PYBIND_DEPS} processgroup_nccl)
   endif()
diff --git a/paddle/fluid/pybind/distributed_py.cc b/paddle/fluid/pybind/distributed_py.cc
index e057fb53cce..7b59188a9f3 100644
--- a/paddle/fluid/pybind/distributed_py.cc
+++ b/paddle/fluid/pybind/distributed_py.cc
@@ -23,6 +23,7 @@ limitations under the License. */
 #include "paddle/fluid/distributed/collective/ProcessGroup.h"
 #include "paddle/fluid/distributed/collective/Types.h"
+#include "paddle/fluid/distributed/collective/reducer.h"
 #include "paddle/fluid/framework/lod_tensor.h"
 #include "paddle/fluid/framework/tensor.h"
 #include "paddle/fluid/imperative/layer.h"
@@ -143,6 +144,19 @@ void BindDistributed(py::module *m) {
           [](distributed::ProcessGroupStrategy &self, int nrings) {
             self.nrings_ = nrings;
           });
+
+  m->def("eager_assign_group_by_size",
+         [](py::handle py_tensors, std::vector<bool> is_sparse_gradient,
+            std::vector<size_t> group_size_limits,
+            std::vector<int64_t> tensor_indices) {
+           auto tensors = CastPyArg2VectorOfTensor(py_tensors.ptr(), 0);
+           return distributed::Eager_AssignGroupBySize(
+               tensors, is_sparse_gradient, group_size_limits, tensor_indices);
+         },
+         py::arg("tensors"), py::arg("is_sparse_gradient"),
+         py::arg("group_size_limits") = std::vector<size_t>{25 * 1024 * 1024},
+         py::arg("tensor_indices") = std::vector<int64_t>{},
+         py::call_guard<py::gil_scoped_release>());
 }
 
 }  // end namespace pybind
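Not part of the patch, but a minimal sketch of how the new binding could be exercised from Python once this change is built; it assumes an eager-mode tensor context, so tensor creation and the call are wrapped in the same _test_eager_guard that the unit test further down relies on.

# Minimal usage sketch for the new eager_assign_group_by_size binding
# (illustrative only, not part of the patch).
import paddle
import paddle.fluid.core as core
from paddle.fluid.framework import _test_eager_guard

with _test_eager_guard():
    tensors = [paddle.rand(shape=[2, 50], dtype="float32") for _ in range(4)]
    # each float32 [2, 50] tensor is 2 * 50 * 4 = 400 bytes, so a 400-byte
    # limit closes one bucket per tensor
    groups = core.eager_assign_group_by_size(tensors, [False] * 4, [400])
    print(groups)  # expected: [[0], [1], [2], [3]]
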
diff --git a/python/paddle/fluid/dygraph/parallel.py b/python/paddle/fluid/dygraph/parallel.py
index ddb86848f84..0049f387b70 100644
--- a/python/paddle/fluid/dygraph/parallel.py
+++ b/python/paddle/fluid/dygraph/parallel.py
@@ -560,13 +560,19 @@ class DataParallel(layers.Layer):
                  strategy=None,
                  comm_buffer_size=25,
                  last_comm_buffer_size=1,
-                 find_unused_parameters=False):
+                 find_unused_parameters=False,
+                 process_group=None,
+                 gradient_as_buffer_view=False,
+                 static_graph=False):
         super(DataParallel,
               self).__init__(layers.full_name() + "_data_parallel")
 
         self._layers = layers
         self.find_unused_parameters = find_unused_parameters
         self.grad_need_sync = True
+        self.process_group = process_group
+        self.gradient_as_buffer_view = gradient_as_buffer_view
+        self.static_graph = static_graph
 
         # NOTE(chenweihang): The ParallelStrategy here is not strictly a strategy.
         # It just stores some environment variables, which can be constructed by
diff --git a/python/paddle/fluid/tests/unittests/test_imperative_group.py b/python/paddle/fluid/tests/unittests/test_imperative_group.py
index f9635809651..89535797ed0 100644
--- a/python/paddle/fluid/tests/unittests/test_imperative_group.py
+++ b/python/paddle/fluid/tests/unittests/test_imperative_group.py
@@ -26,159 +26,149 @@ import paddle.fluid.dygraph as dygraph
 from paddle.fluid.dygraph.nn import Linear
 import paddle.fluid.core as core
 from paddle.fluid.optimizer import SGDOptimizer
-
-
-class MLP(fluid.Layer):
-    def __init__(self, param_attr=None, bias_attr=None):
-        super(MLP, self).__init__()
-
-        self._linear1 = Linear(784, 10)
-        self._linear2 = Linear(10, 10)
-
-    def forward(self, inputs):
-        y = self._linear1(inputs)
-        y = self._linear2(y)
-        return y
+from paddle.fluid.framework import _test_eager_guard
 
 
 class TestDataParallelGroup(unittest.TestCase):
-    def create_varbase(self, dtype, shape,
-                       type=core.VarDesc.VarType.LOD_TENSOR):
-        return core.VarBase(dtype, shape, "", type, True)
+    def create_varbase(self, dtype, shape):
+        return paddle.rand(shape=shape, dtype=dtype)
+
+    def assign_group_by_size(self, *args):
+        return core.assign_group_by_size(*args)
 
     def test_construct_group0(self):
         # one dtype & one limit capability
         var_list = []
-        var_list.append(self.create_varbase(core.VarDesc.VarType.FP32, [2, 50]))
-        var_list.append(
-            self.create_varbase(core.VarDesc.VarType.FP32, [2, 100]))
-        var_list.append(self.create_varbase(core.VarDesc.VarType.FP32, [2, 50]))
-        var_list.append(self.create_varbase(core.VarDesc.VarType.FP32, [2, 25]))
-        res = core.assign_group_by_size(var_list, [False, False, False, False],
+        var_list.append(self.create_varbase("float32", [2, 50]))
+        var_list.append(self.create_varbase("float32", [2, 100]))
+        var_list.append(self.create_varbase("float32", [2, 50]))
+        var_list.append(self.create_varbase("float32", [2, 25]))
+        res = self.assign_group_by_size(var_list, [False, False, False, False],
                                         [400])
         self.assertEqual([[0], [1], [2], [3]], res)
 
     def test_construct_group1(self):
         # multi dtype & one limit capability
         var_list = []
-        var_list.append(self.create_varbase(core.VarDesc.VarType.FP32, [1, 50]))
-        var_list.append(self.create_varbase(core.VarDesc.VarType.FP64, [1, 25]))
-        var_list.append(self.create_varbase(core.VarDesc.VarType.FP32, [1, 50]))
-        var_list.append(self.create_varbase(core.VarDesc.VarType.FP64, [1, 25]))
-        var_list.append(self.create_varbase(core.VarDesc.VarType.FP32, [1, 50]))
-        var_list.append(self.create_varbase(core.VarDesc.VarType.FP64, [1, 25]))
-        res = core.assign_group_by_size(
+        var_list.append(self.create_varbase("float32", [1, 50]))
+        var_list.append(self.create_varbase("float64", [1, 25]))
+        var_list.append(self.create_varbase("float32", [1, 50]))
+        var_list.append(self.create_varbase("float64", [1, 25]))
+        var_list.append(self.create_varbase("float32", [1, 50]))
+        var_list.append(self.create_varbase("float64", [1, 25]))
+        res = self.assign_group_by_size(
            var_list, [False, False, False, False, False, False], [400])
         self.assertEqual([[0, 2], [1, 3], [4], [5]], res)
 
     def test_construct_group2(self):
         # one dtype & multi limit capability
         var_list = []
-        var_list.append(self.create_varbase(core.VarDesc.VarType.FP32, [2, 50]))
-        var_list.append(self.create_varbase(core.VarDesc.VarType.FP32, [2, 50]))
-        var_list.append(self.create_varbase(core.VarDesc.VarType.FP32, [2, 50]))
-        var_list.append(self.create_varbase(core.VarDesc.VarType.FP32, [2, 50]))
-        res = core.assign_group_by_size(var_list, [False, False, False, False],
+        var_list.append(self.create_varbase("float32", [2, 50]))
+        var_list.append(self.create_varbase("float32", [2, 50]))
+        var_list.append(self.create_varbase("float32", [2, 50]))
+        var_list.append(self.create_varbase("float32", [2, 50]))
+        res = self.assign_group_by_size(var_list, [False, False, False, False],
                                         [400, 800])
         self.assertEqual([[0], [1, 2], [3]], res)
 
     def test_construct_group3(self):
         # multi dtype & multi limit capability
         var_list = []
-        var_list.append(self.create_varbase(core.VarDesc.VarType.FP32, [1, 50]))
-        var_list.append(self.create_varbase(core.VarDesc.VarType.FP64, [1, 25]))
-        var_list.append(self.create_varbase(core.VarDesc.VarType.FP32, [1, 50]))
-        var_list.append(self.create_varbase(core.VarDesc.VarType.FP64, [1, 25]))
-        var_list.append(self.create_varbase(core.VarDesc.VarType.FP32, [1, 50]))
-        var_list.append(self.create_varbase(core.VarDesc.VarType.FP64, [1, 25]))
-        res = core.assign_group_by_size(
+        var_list.append(self.create_varbase("float32", [1, 50]))
+        var_list.append(self.create_varbase("float64", [1, 25]))
+        var_list.append(self.create_varbase("float32", [1, 50]))
+        var_list.append(self.create_varbase("float64", [1, 25]))
+        var_list.append(self.create_varbase("float32", [1, 50]))
+        var_list.append(self.create_varbase("float64", [1, 25]))
+        res = self.assign_group_by_size(
            var_list, [False, False, False, False, False, False], [200, 400])
         self.assertEqual([[0], [1], [2, 4], [3, 5]], res)
 
     def test_construct_group4(self):
         # multi dtype & zero limit capability
         var_list = []
-        var_list.append(self.create_varbase(core.VarDesc.VarType.FP32, [1, 50]))
-        var_list.append(self.create_varbase(core.VarDesc.VarType.FP64, [1, 25]))
-        var_list.append(self.create_varbase(core.VarDesc.VarType.FP32, [1, 50]))
-        var_list.append(self.create_varbase(core.VarDesc.VarType.FP64, [1, 25]))
-        var_list.append(self.create_varbase(core.VarDesc.VarType.FP32, [1, 50]))
-        var_list.append(self.create_varbase(core.VarDesc.VarType.FP64, [1, 25]))
-        res = core.assign_group_by_size(
+        var_list.append(self.create_varbase("float32", [1, 50]))
+        var_list.append(self.create_varbase("float64", [1, 25]))
+        var_list.append(self.create_varbase("float32", [1, 50]))
+        var_list.append(self.create_varbase("float64", [1, 25]))
+        var_list.append(self.create_varbase("float32", [1, 50]))
+        var_list.append(self.create_varbase("float64", [1, 25]))
+        res = self.assign_group_by_size(
            var_list, [False, False, False, False, False, False], [0])
         self.assertEqual([[0], [1], [2], [3], [4], [5]], res)
 
     def test_construct_group5(self):
         # multi dtype & infinite capability
         var_list = []
-        var_list.append(self.create_varbase(core.VarDesc.VarType.FP32, [1, 50]))
-        var_list.append(self.create_varbase(core.VarDesc.VarType.FP64, [1, 25]))
-        var_list.append(self.create_varbase(core.VarDesc.VarType.FP32, [1, 50]))
-        var_list.append(self.create_varbase(core.VarDesc.VarType.FP64, [1, 25]))
-        var_list.append(self.create_varbase(core.VarDesc.VarType.FP32, [1, 50]))
-        var_list.append(self.create_varbase(core.VarDesc.VarType.FP64, [1, 25]))
-        res = core.assign_group_by_size(
+        var_list.append(self.create_varbase("float32", [1, 50]))
+        var_list.append(self.create_varbase("float64", [1, 25]))
+        var_list.append(self.create_varbase("float32", [1, 50]))
+        var_list.append(self.create_varbase("float64", [1, 25]))
+        var_list.append(self.create_varbase("float32", [1, 50]))
+        var_list.append(self.create_varbase("float64", [1, 25]))
+        res = self.assign_group_by_size(
            var_list, [False, False, False, False, False, False], [10000])
         self.assertEqual([[0, 2, 4], [1, 3, 5]], res)
 
     def test_construct_group6(self):
         # multi dtype & limit capability & multi tensor type
         var_list = []
-        var_list.append(
-            self.create_varbase(core.VarDesc.VarType.FP32, [1, 50],
-                                core.VarDesc.VarType.SELECTED_ROWS))
-        var_list.append(self.create_varbase(core.VarDesc.VarType.FP64, [1, 25]))
-        var_list.append(self.create_varbase(core.VarDesc.VarType.FP32, [1, 50]))
-        var_list.append(self.create_varbase(core.VarDesc.VarType.FP64, [1, 25]))
-        var_list.append(self.create_varbase(core.VarDesc.VarType.FP32, [1, 50]))
-        var_list.append(
-            self.create_varbase(core.VarDesc.VarType.FP64, [1, 25],
-                                core.VarDesc.VarType.SELECTED_ROWS))
-        res = core.assign_group_by_size(
+        var_list.append(self.create_varbase(
+            "float32",
+            [1, 50], ))
+        var_list.append(self.create_varbase("float64", [1, 25]))
+        var_list.append(self.create_varbase("float32", [1, 50]))
+        var_list.append(self.create_varbase("float64", [1, 25]))
+        var_list.append(self.create_varbase("float32", [1, 50]))
+        var_list.append(self.create_varbase("float64", [1, 25]))
+        res = self.assign_group_by_size(
            var_list, [True, False, False, False, False, True], [400])
         self.assertEqual([[0], [1, 3], [2, 4], [5]], res)
 
     def test_construct_group7(self):
         # multi dtype & multi limit capability & multi tensor type
         var_list = []
-        var_list.append(
-            self.create_varbase(core.VarDesc.VarType.FP32, [1, 50],
-                                core.VarDesc.VarType.SELECTED_ROWS))
-        var_list.append(self.create_varbase(core.VarDesc.VarType.FP64, [1, 25]))
-        var_list.append(self.create_varbase(core.VarDesc.VarType.FP32, [1, 50]))
-        var_list.append(self.create_varbase(core.VarDesc.VarType.FP64, [1, 25]))
-        var_list.append(self.create_varbase(core.VarDesc.VarType.FP32, [1, 50]))
-        var_list.append(
-            self.create_varbase(core.VarDesc.VarType.FP64, [1, 25],
-                                core.VarDesc.VarType.SELECTED_ROWS))
-        res = core.assign_group_by_size(
+        var_list.append(self.create_varbase("float32", [1, 50]))
+        var_list.append(self.create_varbase("float64", [1, 25]))
+        var_list.append(self.create_varbase("float32", [1, 50]))
+        var_list.append(self.create_varbase("float64", [1, 25]))
+        var_list.append(self.create_varbase("float32", [1, 50]))
+        var_list.append(self.create_varbase("float64", [1, 25]))
+        res = self.assign_group_by_size(
            var_list, [True, False, False, False, False, True], [200, 400])
         self.assertEqual([[0], [1], [2], [3], [4], [5]], res)
 
     def test_construct_group8(self):
         # one dtype & one limit capability & have tensor_indices
         var_list = []
-        var_list.append(self.create_varbase(core.VarDesc.VarType.FP32, [2, 25]))
-        var_list.append(
-            self.create_varbase(core.VarDesc.VarType.FP32, [2, 100]))
-        var_list.append(self.create_varbase(core.VarDesc.VarType.FP32, [2, 50]))
-        var_list.append(self.create_varbase(core.VarDesc.VarType.FP32, [2, 25]))
-        res = core.assign_group_by_size(var_list, [False, False, False, False],
+        var_list.append(self.create_varbase("float32", [2, 25]))
+        var_list.append(self.create_varbase("float32", [2, 100]))
+        var_list.append(self.create_varbase("float32", [2, 50]))
+        var_list.append(self.create_varbase("float32", [2, 25]))
+        res = self.assign_group_by_size(var_list, [False, False, False, False],
                                         [400], [3, 0, 1, 2])
         self.assertEqual([[3, 0], [1], [2]], res)
 
     def test_construct_group9(self):
         # one dtype & one limit capability & have tensor_indices
         var_list = []
-        var_list.append(self.create_varbase(core.VarDesc.VarType.FP32, [2, 25]))
-        var_list.append(self.create_varbase(core.VarDesc.VarType.FP32, [2, 25]))
-        var_list.append(self.create_varbase(core.VarDesc.VarType.FP32, [2, 25]))
-        var_list.append(
-            self.create_varbase(core.VarDesc.VarType.FP32, [2, 1000]))
-        res = core.assign_group_by_size(var_list, [False, False, False, True],
+        var_list.append(self.create_varbase("float32", [2, 25]))
+        var_list.append(self.create_varbase("float32", [2, 25]))
+        var_list.append(self.create_varbase("float32", [2, 25]))
+        var_list.append(self.create_varbase("float32", [2, 1000]))
+        res = self.assign_group_by_size(var_list, [False, False, False, True],
                                         [300], [1, 0, 2, 3])
         self.assertEqual([[1, 0], [3], [2]], res)
 
 
+class TestDataParallelGroupEager(TestDataParallelGroup):
+    def create_varbase(self, dtype, shape):
+        with _test_eager_guard():
+            return paddle.rand(shape=shape, dtype=dtype)
+
+    def assign_group_by_size(self, *args):
+        return core.eager_assign_group_by_size(*args)
+
+
 if __name__ == '__main__':
     unittest.main()
-- 
GitLab
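A worked reading of test_construct_group8 above may help make the tensor_indices argument concrete; the byte arithmetic below is illustrative only and not part of the patch.

# tensor_indices = [3, 0, 1, 2] visits the tensors in that order (limit 400 bytes):
#   index 3: float32 [2, 25]  -> 2 * 25 * 4  = 200   bucket [3],    200 < 400
#   index 0: float32 [2, 25]  -> 200                 bucket [3, 0], 400 >= 400, close
#   index 1: float32 [2, 100] -> 2 * 100 * 4 = 800   bucket [1],    800 >= 400, close
#   index 2: float32 [2, 50]  -> 2 * 50 * 4  = 400   bucket [2],    400 >= 400, close
# Because tensor_indices was supplied, the groups keep construction order rather
# than being re-sorted by first index, giving [[3, 0], [1], [2]] as asserted.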