From d0a9393668e0e6e49cd7355e91cc2c4ea59db622 Mon Sep 17 00:00:00 2001 From: Yancey1989 Date: Sun, 21 Jan 2018 16:31:57 +0800 Subject: [PATCH] Split SelectedRows to multiple pservers --- paddle/operators/recv_op.cc | 2 +- paddle/operators/split_selected_rows_op.cc | 21 ++---- paddle/operators/split_selected_rows_op.h | 68 +++++++++++++------ .../paddle/v2/fluid/distribute_transpiler.py | 32 ++++++--- .../tests/test_split_selected_rows_op.py | 41 +++++------ 5 files changed, 101 insertions(+), 63 deletions(-) diff --git a/paddle/operators/recv_op.cc b/paddle/operators/recv_op.cc index 8d1479bdd6..93a7e84488 100644 --- a/paddle/operators/recv_op.cc +++ b/paddle/operators/recv_op.cc @@ -107,7 +107,7 @@ class RecvOp : public framework::OperatorBase { // TODO(typhoonzero): change this to a while_op for every cluster-batch. bool exit_flag = false; - int64_t barrier_size = param_count * fan_in; + size_t barrier_size = param_count * fan_in; while (!exit_flag) { // Get from multiple trainers, we don't care about the order in which // the gradients arrives, just add suffix 0~n and merge the gradient. diff --git a/paddle/operators/split_selected_rows_op.cc b/paddle/operators/split_selected_rows_op.cc index d9a023987b..0515ea13aa 100644 --- a/paddle/operators/split_selected_rows_op.cc +++ b/paddle/operators/split_selected_rows_op.cc @@ -23,8 +23,6 @@ class SplitSelectedRowsOpMaker : public framework::OpProtoAndCheckerMaker { : OpProtoAndCheckerMaker(proto, op_checker) { AddInput("X", "The input SelectedRows."); AddOutput("Out", "The outputs of input SelectedRows.").AsDuplicable(); - AddAttr>("rows_sections", "Rows section for output.") - .SetDefault(std::vector({})); AddAttr>("height_sections", "Height for each output SelectedRows.") .SetDefault(std::vector({})); @@ -35,16 +33,16 @@ height_sections is only needed when need to split the dims of the original tenso Example: Input: - X.rows = {0, 7, 5} + X.rows = {7, 5} X.height = 12 Attr: - rows_sections = {1, 2} - height_sections = {} + height_sections = {4, 8} Out: - out0.rows = {0} - out0.height = 12 - out1.rows = {7, 5} - out2.height = 12 + out0.rows = {} + out0.height = 4 + + out1.rows = {5, 7} + out2.height = 8 )DOC"); } @@ -61,11 +59,6 @@ class SplitSelectedRowsOp : public framework::OperatorWithKernel { std::vector height_sections = ctx->Attrs().Get>("height_sections"); - std::vector rows_sections = - ctx->Attrs().Get>("rows_sections"); - PADDLE_ENFORCE_EQ( - rows_sections.size(), ctx->Outputs("Out").size(), - "The size of rows section should be the same with Outputs size."); int64_t n = ctx->Outputs("Out").size(); std::vector outs_dims; diff --git a/paddle/operators/split_selected_rows_op.h b/paddle/operators/split_selected_rows_op.h index 1cae53f1af..12e64e2901 100644 --- a/paddle/operators/split_selected_rows_op.h +++ b/paddle/operators/split_selected_rows_op.h @@ -16,40 +16,70 @@ limitations under the License. */ #include #include "paddle/framework/op_registry.h" +#include "paddle/operators/math/selected_rows_functor.h" namespace paddle { namespace operators { +static int FindOutIdx(int row, const std::vector& height_sections) { + int offset = 0; + for (size_t i = 0; i < height_sections.size(); ++i) { + if (row >= offset && row < (offset + height_sections[i])) { + return i; + } + offset += height_sections[i]; + } + return -1; +} + template class SplitSelectedRowsOpKernel : public framework::OpKernel { public: void Compute(const framework::ExecutionContext& ctx) const override { auto* x = ctx.Input("X"); auto outs = ctx.MultiOutput("Out"); - - auto rows_sections = ctx.Attr>("rows_sections"); auto height_sections = ctx.Attr>("height_sections"); - int64_t n = outs.size(); - int offset = 0; + auto x_rows = x->rows(); + std::vector> outs_rows_idx; + outs_rows_idx.resize(outs.size()); - for (int64_t i = 0; i < n; ++i) { - framework::Vector out_rows; - for (int64_t j = 0; j < rows_sections[i]; ++j) { - out_rows.push_back(x->rows()[offset + j]); - } + auto row_numel = x->value().numel() / x->value().dims()[0]; + auto src = x->value().data(); + + for (size_t i = 0; i < x_rows.size(); ++i) { + int out_idx = FindOutIdx(x_rows[i], height_sections); + outs_rows_idx[out_idx].push_back(i); + } + auto place = ctx.GetPlace(); - auto& out = outs[i]; - auto x_dims = x->GetCompleteDims(); - x_dims[0] = rows_sections[i]; - out->mutable_value()->mutable_data(x_dims, ctx.GetPlace()); - framework::Copy(x->value().Slice(offset, rows_sections[i] + offset), - x->place(), ctx.device_context(), out->mutable_value()); - outs[i]->set_rows(out_rows); - if (height_sections.size()) { - outs[i]->set_height(height_sections[i]); + for (size_t i = 0; i < outs_rows_idx.size(); ++i) { + auto rows_idx = outs_rows_idx[i]; + if (rows_idx.size() > 0) { + auto dims = x->GetCompleteDims(); + dims[0] = rows_idx.size(); + outs[i]->mutable_value()->mutable_data(dims, x->place()); + for (auto idx : rows_idx) { + outs[i]->mutable_rows()->push_back(x_rows[idx]); + } + auto dst = outs[i]->mutable_value()->mutable_data(ctx.GetPlace()); + for (size_t j = 0; j < rows_idx.size(); j++) { + if (platform::is_cpu_place(place)) { + memory::Copy(platform::CPUPlace(), dst + j * row_numel, + platform::CPUPlace(), src + rows_idx[j] * row_numel, + sizeof(T) * row_numel); + } else { +#ifdef PADDLE_WITH_CUDA + auto stream = ctx.cuda_device_context().stream(); + memory::Copy(platform::CUDAPlace(), dst + j * row_numel, + platform::CUDAPlace(), src + rows_idx[j] * row_numel, + sizeof(T) * row_numel, stream); +#else + PADDLE_THROW("Paddle is not compiled with GPU"); +#endif + } + } } - offset += rows_sections[i]; } } }; diff --git a/python/paddle/v2/fluid/distribute_transpiler.py b/python/paddle/v2/fluid/distribute_transpiler.py index bd957f88de..5d42725e80 100644 --- a/python/paddle/v2/fluid/distribute_transpiler.py +++ b/python/paddle/v2/fluid/distribute_transpiler.py @@ -18,6 +18,7 @@ import optimizer from layer_helper import LayerHelper from distributed_spliter import * import math +from . import core class VarBlock: @@ -216,15 +217,28 @@ class DistributeTranspiler: if len(splited_vars) <= 1: continue orig_var = program.global_block().vars[varname] - sections = [] - for v in splited_vars: - sections.append(v.shape[0]) - program.global_block().append_op( - type="split", - inputs={"X": orig_var}, - outputs={"Out": splited_vars}, - attrs={"sections": sections} # assume split evenly - ) + if orig_var == core.VarDesc.VarType.SELECTED_ROWS: + height_sections = [] + for v in splited_vars: + height_sections.append(v.shape[0]) + program.global_block().append_op( + type="split_selected_rows", + inputs={"X": orig_var}, + outputs={"Out": splited_vars}, + attrs={"height_sections": height_sections}) + elif orig_var == core.VarDesc.VarType.LOD_TENSOR: + sections = [] + for v in splited_vars: + sections.append(v.shape[0]) + program.global_block().append_op( + type="split", + inputs={"X": orig_var}, + outputs={"Out": splited_vars}, + attrs={"sections": sections} # assume split evenly + ) + else: + AssertionError("Variable type should be in set " + "[LOD_TENSOR, SELECTED_ROWS]") return var_mapping def get_trainer_program(self): diff --git a/python/paddle/v2/fluid/tests/test_split_selected_rows_op.py b/python/paddle/v2/fluid/tests/test_split_selected_rows_op.py index a6cc4f6c6d..7a18dd8ad6 100644 --- a/python/paddle/v2/fluid/tests/test_split_selected_rows_op.py +++ b/python/paddle/v2/fluid/tests/test_split_selected_rows_op.py @@ -34,8 +34,8 @@ class TestSpliteSelectedRows(unittest.TestCase): def check_with_place(self, place): scope = core.Scope() - rows = [0, 5, 7, 4] - height = 10 + rows = [0, 5, 7, 4, 20] + height = 20 row_numel = 2 # initialize input variable X @@ -45,38 +45,41 @@ class TestSpliteSelectedRows(unittest.TestCase): np_array = np.ones((len(rows), row_numel)).astype("float32") np_array[0, 0] = 2.0 np_array[2, 1] = 4.0 + np_array[4, 1] = 8.0 x_tensor = x.get_tensor() x_tensor.set(np_array, place) - rows_sections = [2, 2] - height_sections = [] + height_sections = [5, 5, 5, 5, 3] # initialize output variables [out0, out1] - out0 = scope.var('out0').get_selected_rows() - out1 = scope.var('out1').get_selected_rows() + outs_name = ["out%d" % i for i in xrange(len(height_sections))] + outs = [ + scope.var(var_name).get_selected_rows() for var_name in outs_name + ] # expected output selected rows - expected_out0_rows = [0, 5] - expected_out1_rows = [7, 4] - expected_height = height + expected_out0_rows = [0, 4] + expected_out1_rows = [5, 7] + expected_out4_rows = [20] op = Operator( "split_selected_rows", X="X", - Out=["out0", "out1"], - rows_sections=rows_sections, + Out=outs_name, height_sections=height_sections) op.run(scope, place) - self.assertEqual(out0.rows(), expected_out0_rows) - self.assertEqual(out1.rows(), expected_out1_rows) + self.assertEqual(outs[0].rows(), expected_out0_rows) + self.assertEqual(outs[1].rows(), expected_out1_rows) + self.assertEqual(outs[4].rows(), expected_out4_rows) - self.assertEqual(out0.height(), expected_height) - self.assertEqual(out1.height(), expected_height) + self.assertEqual(outs[0].height(), height_sections[0]) + self.assertEqual(outs[4].height(), height_sections[4]) - self.assertAlmostEqual(2.0, np.array(out0.get_tensor())[0, 0]) - self.assertAlmostEqual(4.0, np.array(out1.get_tensor())[0, 1]) + self.assertAlmostEqual(2.0, np.array(outs[0].get_tensor())[0, 0]) + self.assertAlmostEqual(4.0, np.array(outs[1].get_tensor())[1, 1]) + self.assertAlmostEqual(8.0, np.array(outs[4].get_tensor())[0, 1]) def check_grad_with_place(self, place): scope = core.Scope() @@ -84,8 +87,7 @@ class TestSpliteSelectedRows(unittest.TestCase): row_numel = 2 # attr - rows_sections = [2, 2] - height_sections = [] + height_sections = [5, 5] # initialize input variable X out0_grad = scope.var("out0@GRAD").get_selected_rows() @@ -112,7 +114,6 @@ class TestSpliteSelectedRows(unittest.TestCase): "sum", X=["out0@GRAD", "out1@GRAD"], Out="X@GRAD", - rows_sections=rows_sections, height_sections=height_sections) grad_op.run(scope, place) -- GitLab