Unverified commit 4da841e0 authored by ShenLiang, committed by GitHub

[DP] Construct reducer group (#39987)

* add reducer
Parent 657dd5a9
cc_library(processgroup SRCS ProcessGroup.cc DEPS phi phi_api eager_api)
cc_library(eager_reducer SRCS reducer.cc DEPS eager_api processgroup)
if(WITH_NCCL)
cc_library(processgroup_nccl SRCS ProcessGroupNCCL.cc DEPS place cuda_stream enforce collective_helper device_context phi phi_api eager_api)
......
// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/distributed/collective/reducer.h"
#include "paddle/phi/common/data_type.h"
namespace paddle {
namespace distributed {
std::vector<std::vector<size_t>> Eager_AssignGroupBySize(
const std::vector<Tensor> tensors,
const std::vector<bool> &is_sparse_gradient,
const std::vector<size_t> &group_size_limits,
const std::vector<int64_t> &tensor_indices) {
PADDLE_ENFORCE_EQ(
tensors.size(), is_sparse_gradient.size(),
platform::errors::PreconditionNotMet(
"tensors len must be equal to is_sparse_gradient len, but "
"[%lu] != [%lu]",
tensors.size(), is_sparse_gradient.size()));
auto check_perm = [](const std::vector<int64_t> &x) -> bool {
size_t len = x.size();
std::vector<size_t> cnt(len, 0);
for (size_t i = 0; i < len; ++i) {
if (x[i] >= static_cast<int64_t>(len) || x[i] < 0 || cnt[x[i]]) {
return false;
}
cnt[x[i]]++;
}
return true;
};
PADDLE_ENFORCE_EQ(true, check_perm(tensor_indices),
platform::errors::PreconditionNotMet(
"tensor_indices must be a permutation from 0 to %lu",
tensor_indices.size()));
// the return vector
std::vector<std::vector<size_t>> res;
// Key: the var type
// Value: should use which index in group_size_limits for group size limit
std::map<experimental::DataType, size_t> group_limit_index;
// Key: the var type
// Value: <the var index in input tensors, total numel in this group>
std::map<experimental::DataType, std::pair<std::vector<size_t>, size_t>>
next_group;
for (size_t i = 0; i < tensors.size(); ++i) {
const auto &var = tensors[i];
size_t tensor_real_index = i;
if (!tensor_indices.empty()) {
tensor_real_index = tensor_indices[i];
}
if (is_sparse_gradient[tensor_real_index]) {
// we keep sparse var a single group
res.push_back({tensor_real_index});
continue;
}
const auto &var_dtype = var.dtype();
VLOG(3) << "var[" << var.name() << "] 's type is " << var_dtype;
auto &group_info = next_group[var_dtype];
int64_t var_size = -1;
if (var.is_dense_tensor()) {
var_size =
std::dynamic_pointer_cast<phi::DenseTensor>(var.impl())->numel();
} else {
VLOG(3) << "var " << var.name()
<< " is not tensor or selected_rows, so skip it";
continue;
}
group_info.first.push_back(tensor_real_index);
group_info.second += experimental::SizeOf(var_dtype) * var_size;
// group_info.second += framework::SizeOfType(var_dtype) * var_size;
if (group_limit_index.find(var_dtype) == group_limit_index.end()) {
// means it is the first var of var_dtype
group_limit_index[var_dtype] = 0;
}
auto &cur_limit_index = group_limit_index[var_dtype];
if (group_info.second >= group_size_limits[cur_limit_index]) {
// exceed group capacity and create a new group
res.emplace_back(std::move(group_info.first));
group_info = std::pair<std::vector<size_t>, size_t>();
cur_limit_index =
(std::min)(cur_limit_index + 1, group_size_limits.size() - 1);
}
}
// add the final groups
for (auto &e : next_group) {
auto &group_info = e.second;
if (!group_info.first.empty()) {
res.emplace_back(std::move(group_info.first));
}
}
for (const auto &group_index : res) {
PADDLE_ENFORCE_NE(
group_index.empty(), true,
platform::errors::PreconditionNotMet(
"AssignGroupBySize construct empty group, please check."));
}
if (tensor_indices.empty()) {
std::sort(res.begin(), res.end(),
[](const std::vector<size_t> &x, const std::vector<size_t> &y) {
return x.front() < y.front();
});
}
return res;
}
} // namespace distributed
} // namespace paddle
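For reference, the grouping rule implemented above can be sketched in a few lines of plain Python (the names assign_group_by_size_py, sizes and dtypes are illustrative and not part of this patch): sparse gradients always form singleton groups; dense tensors are bucketed per dtype until the accumulated byte size reaches the current entry of group_size_limits, at which point the bucket is emitted and the next limit applies; leftover buckets are flushed at the end and, when no explicit tensor_indices is given, the groups are sorted by their first tensor index.

def assign_group_by_size_py(sizes, dtypes, is_sparse, limits, tensor_indices=None):
    # sizes[i]: byte size of tensor i, dtypes[i]: its dtype key,
    # is_sparse[i]: whether tensor i holds a sparse gradient.
    res = []
    limit_index = {}   # dtype -> which entry of `limits` applies to the next bucket
    buckets = {}       # dtype -> (tensor indices in the open bucket, bytes so far)
    for i in range(len(sizes)):
        real_index = tensor_indices[i] if tensor_indices else i
        if is_sparse[real_index]:
            res.append([real_index])            # sparse vars stay in their own group
            continue
        indices, total = buckets.get(dtypes[i], ([], 0))
        indices = indices + [real_index]
        total += sizes[i]
        cur = limit_index.setdefault(dtypes[i], 0)
        if total >= limits[cur]:
            res.append(indices)                 # bucket is full, emit it
            indices, total = [], 0
            limit_index[dtypes[i]] = min(cur + 1, len(limits) - 1)
        buckets[dtypes[i]] = (indices, total)
    for indices, _ in buckets.values():         # flush the unfinished buckets
        if indices:
            res.append(indices)
    if not tensor_indices:
        res.sort(key=lambda g: g[0])            # keep groups in tensor order
    return res

# Same inputs as test_construct_group1 below: alternating float32/float64
# tensors of 200 bytes each, a single 400-byte limit.
print(assign_group_by_size_py([200] * 6, ["fp32", "fp64"] * 3, [False] * 6, [400]))
# -> [[0, 2], [1, 3], [4], [5]]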
// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <map>
#include <vector>
#include "paddle/fluid/distributed/collective/ProcessGroup.h"
#include "paddle/fluid/eager/api/utils/tensor_utils.h"
namespace paddle {
namespace distributed {
using Tensor = paddle::experimental::Tensor;
std::vector<std::vector<size_t>> Eager_AssignGroupBySize(
const std::vector<Tensor>, const std::vector<bool>& is_sparse_gradient,
const std::vector<size_t>& group_size_limits,
const std::vector<int64_t>& tensor_indices = {});
} // namespace distributed
} // namespace paddle
......@@ -81,7 +81,7 @@ set(PYBIND_SRCS
cuda_streams_py.cc)
if(NOT ON_INFER)
set (PYBIND_DEPS ${PYBIND_DEPS} processgroup)
set (PYBIND_DEPS ${PYBIND_DEPS} processgroup eager_reducer)
if (WITH_NCCL)
set (PYBIND_DEPS ${PYBIND_DEPS} processgroup_nccl)
endif()
......
......@@ -23,6 +23,7 @@ limitations under the License. */
#include "paddle/fluid/distributed/collective/ProcessGroup.h"
#include "paddle/fluid/distributed/collective/Types.h"
#include "paddle/fluid/distributed/collective/reducer.h"
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/tensor.h"
#include "paddle/fluid/imperative/layer.h"
......@@ -143,6 +144,19 @@ void BindDistributed(py::module *m) {
[](distributed::ProcessGroupStrategy &self, int nrings) {
self.nrings_ = nrings;
});
m->def("eager_assign_group_by_size",
[](py::handle py_tensors, std::vector<bool> is_sparse_gradient,
std::vector<size_t> group_size_limits,
std::vector<int64_t> tensor_indices) {
auto tensors = CastPyArg2VectorOfTensor(py_tensors.ptr(), 0);
return distributed::Eager_AssignGroupBySize(
tensors, is_sparse_gradient, group_size_limits, tensor_indices);
},
py::arg("tensors"), py::arg("is_sparse_gradient"),
py::arg("group_size_limits") = std::vector<size_t>{25 * 1024 * 1024},
py::arg("tensor_indices") = std::vector<int64_t>{},
py::call_guard<py::gil_scoped_release>());
}
} // end namespace pybind
......
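From Python, the new binding is called like the existing core.assign_group_by_size but on eager-mode Tensors; a minimal sketch, mirroring the eager test case below (with _test_eager_guard assumed available in this build), is:

import paddle
import paddle.fluid.core as core
from paddle.fluid.framework import _test_eager_guard

with _test_eager_guard():
    # Four float32 tensors of 2 * 50 * 4 = 400 bytes each.
    tensors = [paddle.rand(shape=[2, 50], dtype="float32") for _ in range(4)]
    # group_size_limits defaults to [25 * 1024 * 1024]; tensor_indices defaults to [].
    groups = core.eager_assign_group_by_size(
        tensors, [False, False, False, False], [400])
    print(groups)  # each tensor already reaches the 400-byte limit -> [[0], [1], [2], [3]]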
......@@ -560,13 +560,19 @@ class DataParallel(layers.Layer):
strategy=None,
comm_buffer_size=25,
last_comm_buffer_size=1,
find_unused_parameters=False):
find_unused_parameters=False,
process_group=None,
gradient_as_buffer_view=False,
static_graph=False):
super(DataParallel,
self).__init__(layers.full_name() + "_data_parallel")
self._layers = layers
self.find_unused_parameters = find_unused_parameters
self.grad_need_sync = True
self.process_group = process_group
self.gradient_as_buffer_view = gradient_as_buffer_view
self.static_graph = static_graph
# NOTE(chenweihang): The ParallelStrategy here is not strictly a strategy.
# It just stores some environment variables, which can be constructed by
......
......@@ -26,159 +26,149 @@ import paddle.fluid.dygraph as dygraph
from paddle.fluid.dygraph.nn import Linear
import paddle.fluid.core as core
from paddle.fluid.optimizer import SGDOptimizer
class MLP(fluid.Layer):
def __init__(self, param_attr=None, bias_attr=None):
super(MLP, self).__init__()
self._linear1 = Linear(784, 10)
self._linear2 = Linear(10, 10)
def forward(self, inputs):
y = self._linear1(inputs)
y = self._linear2(y)
return y
from paddle.fluid.framework import _test_eager_guard
class TestDataParallelGroup(unittest.TestCase):
def create_varbase(self, dtype, shape,
type=core.VarDesc.VarType.LOD_TENSOR):
return core.VarBase(dtype, shape, "", type, True)
def create_varbase(self, dtype, shape):
return paddle.rand(shape=shape, dtype=dtype)
def assign_group_by_size(self, *args):
return core.assign_group_by_size(*args)
def test_construct_group0(self):
# one dtype & one limit capability
var_list = []
var_list.append(self.create_varbase(core.VarDesc.VarType.FP32, [2, 50]))
var_list.append(
self.create_varbase(core.VarDesc.VarType.FP32, [2, 100]))
var_list.append(self.create_varbase(core.VarDesc.VarType.FP32, [2, 50]))
var_list.append(self.create_varbase(core.VarDesc.VarType.FP32, [2, 25]))
res = core.assign_group_by_size(var_list, [False, False, False, False],
var_list.append(self.create_varbase("float32", [2, 50]))
var_list.append(self.create_varbase("float32", [2, 100]))
var_list.append(self.create_varbase("float32", [2, 50]))
var_list.append(self.create_varbase("float32", [2, 25]))
res = self.assign_group_by_size(var_list, [False, False, False, False],
[400])
self.assertEqual([[0], [1], [2], [3]], res)
def test_construct_group1(self):
# multi dtype & one limit capability
var_list = []
var_list.append(self.create_varbase(core.VarDesc.VarType.FP32, [1, 50]))
var_list.append(self.create_varbase(core.VarDesc.VarType.FP64, [1, 25]))
var_list.append(self.create_varbase(core.VarDesc.VarType.FP32, [1, 50]))
var_list.append(self.create_varbase(core.VarDesc.VarType.FP64, [1, 25]))
var_list.append(self.create_varbase(core.VarDesc.VarType.FP32, [1, 50]))
var_list.append(self.create_varbase(core.VarDesc.VarType.FP64, [1, 25]))
res = core.assign_group_by_size(
var_list.append(self.create_varbase("float32", [1, 50]))
var_list.append(self.create_varbase("float64", [1, 25]))
var_list.append(self.create_varbase("float32", [1, 50]))
var_list.append(self.create_varbase("float64", [1, 25]))
var_list.append(self.create_varbase("float32", [1, 50]))
var_list.append(self.create_varbase("float64", [1, 25]))
res = self.assign_group_by_size(
var_list, [False, False, False, False, False, False], [400])
self.assertEqual([[0, 2], [1, 3], [4], [5]], res)
def test_construct_group2(self):
# one dtype & multi limit capability
var_list = []
var_list.append(self.create_varbase(core.VarDesc.VarType.FP32, [2, 50]))
var_list.append(self.create_varbase(core.VarDesc.VarType.FP32, [2, 50]))
var_list.append(self.create_varbase(core.VarDesc.VarType.FP32, [2, 50]))
var_list.append(self.create_varbase(core.VarDesc.VarType.FP32, [2, 50]))
res = core.assign_group_by_size(var_list, [False, False, False, False],
var_list.append(self.create_varbase("float32", [2, 50]))
var_list.append(self.create_varbase("float32", [2, 50]))
var_list.append(self.create_varbase("float32", [2, 50]))
var_list.append(self.create_varbase("float32", [2, 50]))
res = self.assign_group_by_size(var_list, [False, False, False, False],
[400, 800])
self.assertEqual([[0], [1, 2], [3]], res)
def test_construct_group3(self):
# multi dtype & multi limit capability
var_list = []
var_list.append(self.create_varbase(core.VarDesc.VarType.FP32, [1, 50]))
var_list.append(self.create_varbase(core.VarDesc.VarType.FP64, [1, 25]))
var_list.append(self.create_varbase(core.VarDesc.VarType.FP32, [1, 50]))
var_list.append(self.create_varbase(core.VarDesc.VarType.FP64, [1, 25]))
var_list.append(self.create_varbase(core.VarDesc.VarType.FP32, [1, 50]))
var_list.append(self.create_varbase(core.VarDesc.VarType.FP64, [1, 25]))
res = core.assign_group_by_size(
var_list.append(self.create_varbase("float32", [1, 50]))
var_list.append(self.create_varbase("float64", [1, 25]))
var_list.append(self.create_varbase("float32", [1, 50]))
var_list.append(self.create_varbase("float64", [1, 25]))
var_list.append(self.create_varbase("float32", [1, 50]))
var_list.append(self.create_varbase("float64", [1, 25]))
res = self.assign_group_by_size(
var_list, [False, False, False, False, False, False], [200, 400])
self.assertEqual([[0], [1], [2, 4], [3, 5]], res)
def test_construct_group4(self):
# multi dtype & zero limit capability
var_list = []
var_list.append(self.create_varbase(core.VarDesc.VarType.FP32, [1, 50]))
var_list.append(self.create_varbase(core.VarDesc.VarType.FP64, [1, 25]))
var_list.append(self.create_varbase(core.VarDesc.VarType.FP32, [1, 50]))
var_list.append(self.create_varbase(core.VarDesc.VarType.FP64, [1, 25]))
var_list.append(self.create_varbase(core.VarDesc.VarType.FP32, [1, 50]))
var_list.append(self.create_varbase(core.VarDesc.VarType.FP64, [1, 25]))
res = core.assign_group_by_size(
var_list.append(self.create_varbase("float32", [1, 50]))
var_list.append(self.create_varbase("float64", [1, 25]))
var_list.append(self.create_varbase("float32", [1, 50]))
var_list.append(self.create_varbase("float64", [1, 25]))
var_list.append(self.create_varbase("float32", [1, 50]))
var_list.append(self.create_varbase("float64", [1, 25]))
res = self.assign_group_by_size(
var_list, [False, False, False, False, False, False], [0])
self.assertEqual([[0], [1], [2], [3], [4], [5]], res)
def test_construct_group5(self):
# multi dtype & infinite capability
var_list = []
var_list.append(self.create_varbase(core.VarDesc.VarType.FP32, [1, 50]))
var_list.append(self.create_varbase(core.VarDesc.VarType.FP64, [1, 25]))
var_list.append(self.create_varbase(core.VarDesc.VarType.FP32, [1, 50]))
var_list.append(self.create_varbase(core.VarDesc.VarType.FP64, [1, 25]))
var_list.append(self.create_varbase(core.VarDesc.VarType.FP32, [1, 50]))
var_list.append(self.create_varbase(core.VarDesc.VarType.FP64, [1, 25]))
res = core.assign_group_by_size(
var_list.append(self.create_varbase("float32", [1, 50]))
var_list.append(self.create_varbase("float64", [1, 25]))
var_list.append(self.create_varbase("float32", [1, 50]))
var_list.append(self.create_varbase("float64", [1, 25]))
var_list.append(self.create_varbase("float32", [1, 50]))
var_list.append(self.create_varbase("float64", [1, 25]))
res = self.assign_group_by_size(
var_list, [False, False, False, False, False, False], [10000])
self.assertEqual([[0, 2, 4], [1, 3, 5]], res)
def test_construct_group6(self):
# multi dtype & limit capability & multi tensor type
var_list = []
var_list.append(
self.create_varbase(core.VarDesc.VarType.FP32, [1, 50],
core.VarDesc.VarType.SELECTED_ROWS))
var_list.append(self.create_varbase(core.VarDesc.VarType.FP64, [1, 25]))
var_list.append(self.create_varbase(core.VarDesc.VarType.FP32, [1, 50]))
var_list.append(self.create_varbase(core.VarDesc.VarType.FP64, [1, 25]))
var_list.append(self.create_varbase(core.VarDesc.VarType.FP32, [1, 50]))
var_list.append(
self.create_varbase(core.VarDesc.VarType.FP64, [1, 25],
core.VarDesc.VarType.SELECTED_ROWS))
res = core.assign_group_by_size(
var_list.append(self.create_varbase("float32", [1, 50]))
var_list.append(self.create_varbase("float64", [1, 25]))
var_list.append(self.create_varbase("float32", [1, 50]))
var_list.append(self.create_varbase("float64", [1, 25]))
var_list.append(self.create_varbase("float32", [1, 50]))
var_list.append(self.create_varbase("float64", [1, 25]))
res = self.assign_group_by_size(
var_list, [True, False, False, False, False, True], [400])
self.assertEqual([[0], [1, 3], [2, 4], [5]], res)
def test_construct_group7(self):
# multi dtype & multi limit capability & multi tensor type
var_list = []
var_list.append(
self.create_varbase(core.VarDesc.VarType.FP32, [1, 50],
core.VarDesc.VarType.SELECTED_ROWS))
var_list.append(self.create_varbase(core.VarDesc.VarType.FP64, [1, 25]))
var_list.append(self.create_varbase(core.VarDesc.VarType.FP32, [1, 50]))
var_list.append(self.create_varbase(core.VarDesc.VarType.FP64, [1, 25]))
var_list.append(self.create_varbase(core.VarDesc.VarType.FP32, [1, 50]))
var_list.append(
self.create_varbase(core.VarDesc.VarType.FP64, [1, 25],
core.VarDesc.VarType.SELECTED_ROWS))
res = core.assign_group_by_size(
var_list.append(self.create_varbase("float32", [1, 50]))
var_list.append(self.create_varbase("float64", [1, 25]))
var_list.append(self.create_varbase("float32", [1, 50]))
var_list.append(self.create_varbase("float64", [1, 25]))
var_list.append(self.create_varbase("float32", [1, 50]))
var_list.append(self.create_varbase("float64", [1, 25]))
res = self.assign_group_by_size(
var_list, [True, False, False, False, False, True], [200, 400])
self.assertEqual([[0], [1], [2], [3], [4], [5]], res)
def test_construct_group8(self):
# one dtype & one limit capability & have tensor_indices
var_list = []
var_list.append(self.create_varbase(core.VarDesc.VarType.FP32, [2, 25]))
var_list.append(
self.create_varbase(core.VarDesc.VarType.FP32, [2, 100]))
var_list.append(self.create_varbase(core.VarDesc.VarType.FP32, [2, 50]))
var_list.append(self.create_varbase(core.VarDesc.VarType.FP32, [2, 25]))
res = core.assign_group_by_size(var_list, [False, False, False, False],
var_list.append(self.create_varbase("float32", [2, 25]))
var_list.append(self.create_varbase("float32", [2, 100]))
var_list.append(self.create_varbase("float32", [2, 50]))
var_list.append(self.create_varbase("float32", [2, 25]))
res = self.assign_group_by_size(var_list, [False, False, False, False],
[400], [3, 0, 1, 2])
self.assertEqual([[3, 0], [1], [2]], res)
def test_construct_group9(self):
# one dtype & one limit capability & have tensor_indices
var_list = []
var_list.append(self.create_varbase(core.VarDesc.VarType.FP32, [2, 25]))
var_list.append(self.create_varbase(core.VarDesc.VarType.FP32, [2, 25]))
var_list.append(self.create_varbase(core.VarDesc.VarType.FP32, [2, 25]))
var_list.append(
self.create_varbase(core.VarDesc.VarType.FP32, [2, 1000]))
res = core.assign_group_by_size(var_list, [False, False, False, True],
var_list.append(self.create_varbase("float32", [2, 25]))
var_list.append(self.create_varbase("float32", [2, 25]))
var_list.append(self.create_varbase("float32", [2, 25]))
var_list.append(self.create_varbase("float32", [2, 1000]))
res = self.assign_group_by_size(var_list, [False, False, False, True],
[300], [1, 0, 2, 3])
self.assertEqual([[1, 0], [3], [2]], res)
class TestDataParallelGroupEager(TestDataParallelGroup):
def create_varbase(self, dtype, shape):
with _test_eager_guard():
return paddle.rand(shape=shape, dtype=dtype)
def assign_group_by_size(self, *args):
return core.eager_assign_group_by_size(*args)
if __name__ == '__main__':
unittest.main()