From b8a17987ec92d413a403a00117ce495d22ddd13f Mon Sep 17 00:00:00 2001 From: Yu Yang Date: Thu, 18 Jan 2018 06:37:51 +0800 Subject: [PATCH] Feature/parallel for bug fix (#7474) * Fix ParallelDo not support empty input gradient * Polish ParallelDo and fix several bugs * Fix CI * Fix CI --- paddle/framework/lod_tensor.cc | 46 +++--- paddle/operators/parallel_do_op.cc | 135 +++++++++--------- paddle/string/to_string.h | 11 ++ .../paddle/v2/fluid/tests/test_parallel_op.py | 28 ++-- 4 files changed, 128 insertions(+), 92 deletions(-) diff --git a/paddle/framework/lod_tensor.cc b/paddle/framework/lod_tensor.cc index 9a115bb60..3e239e991 100644 --- a/paddle/framework/lod_tensor.cc +++ b/paddle/framework/lod_tensor.cc @@ -291,23 +291,32 @@ std::vector LoDTensor::SplitLoDTensor( const std::vector places) const { check_memory_size(); PADDLE_ENFORCE(lod().empty(), "Disable parallel lod for now"); - PADDLE_ENFORCE(dims()[0] % places.size() == 0, - "Batch size should be divided by places size"); - - std::vector lods; - for (size_t place_idx = 0; place_idx < places.size(); ++place_idx) { - int begin = place_idx * dims()[0] / places.size(); - int end = (place_idx + 1) * dims()[0] / places.size(); + size_t result_size = std::min(static_cast(dims()[0]), places.size()); + size_t remainder = dims()[0] % places.size(); + + std::vector results; + results.reserve(result_size); + + int step_width = static_cast(dims()[0] / result_size); + for (size_t i = 0; i < result_size; ++i) { + int begin = static_cast(i * step_width); + int end = static_cast((i + 1) * step_width); + if (i + 1 == places.size()) { // last + end += remainder; + } auto src = Slice(begin, end); - auto &dst_place = places[place_idx]; + auto &dst_place = places[i]; LoDTensor dst; - framework::Copy(src, dst_place, &dst); - - lods.emplace_back(dst); + if (!(dst_place == place())) { + framework::Copy(src, dst_place, &dst); + } else { // It is no need to copy if src_place and dst_place are same. + dst.ShareDataWith(src); + } + results.emplace_back(dst); } - return lods; + return results; } // TODO(tonyyang-svail): make this function support LoD @@ -318,12 +327,17 @@ void LoDTensor::MergeLoDTensor( framework::DDim new_dim = lod_tensors[0]->dims(); std::type_index new_type = lod_tensors[0]->type(); auto new_layout = lod_tensors[0]->layout(); + int64_t new_height = 0; for (auto *lod : lod_tensors) { - PADDLE_ENFORCE(new_dim == lod->dims()); - PADDLE_ENFORCE(new_type == lod->type()); - PADDLE_ENFORCE(new_layout == lod->layout()); + new_height += lod->dims()[0]; + for (int i = 1; i < new_dim.size(); ++i) { + PADDLE_ENFORCE_EQ(new_dim[i], lod->dims()[i]); + } + + PADDLE_ENFORCE_EQ(new_type, lod->type()); + PADDLE_ENFORCE_EQ(new_layout, lod->layout()); } - new_dim[0] *= lod_tensors.size(); + new_dim[0] = new_height; Resize(new_dim); set_layout(new_layout); diff --git a/paddle/operators/parallel_do_op.cc b/paddle/operators/parallel_do_op.cc index e1bec0421..c2561fa2b 100644 --- a/paddle/operators/parallel_do_op.cc +++ b/paddle/operators/parallel_do_op.cc @@ -30,16 +30,13 @@ static constexpr char kParallelScopes[] = "parallel_scopes"; static constexpr char kParallelBlock[] = "sub_block"; -// using ParallelScopeVar = std::vector; using LoDTensor = framework::LoDTensor; -using OperatorBase = framework::OperatorBase; -void SplitTensorAndMoveTensorToScopes( - const framework::Scope &scope, - const std::vector &sub_scopes, +static void SplitTensorAndMoveTensorToScopes( + const framework::Scope &scope, std::vector *sub_scopes, const std::vector &places, const std::vector &names) { - PADDLE_ENFORCE_EQ(sub_scopes.size(), places.size()); + size_t num_sub_scopes = 0; for (auto &argu : names) { auto *var = scope.FindVar(argu); const auto &tensor = var->Get(); @@ -48,9 +45,21 @@ void SplitTensorAndMoveTensorToScopes( for (auto &lod : lod_tensors) { VLOG(3) << lod.dims(); } + if (num_sub_scopes == 0) { + num_sub_scopes = lod_tensors.size(); + } else { + PADDLE_ENFORCE_EQ(num_sub_scopes, lod_tensors.size()); + } + PADDLE_ENFORCE_NE(num_sub_scopes, 0); + if (sub_scopes->size() == 0) { + sub_scopes->reserve(num_sub_scopes); + for (size_t i = 0; i < num_sub_scopes; ++i) { + sub_scopes->emplace_back(&scope.NewScope()); + } + } - for (size_t i = 0; i < sub_scopes.size(); ++i) { - *sub_scopes[i]->Var(argu)->GetMutable() = lod_tensors[i]; + for (size_t i = 0; i < lod_tensors.size(); ++i) { + *(*sub_scopes)[i]->Var(argu)->GetMutable() = lod_tensors[i]; } } } @@ -70,7 +79,7 @@ class ParallelDoOp : public framework::OperatorBase { const framework::VariableNameMap &inputs, const framework::VariableNameMap &outputs, const framework::AttributeMap &attrs) - : OperatorBase(type, inputs, outputs, attrs) {} + : framework::OperatorBase(type, inputs, outputs, attrs) {} void Run(const framework::Scope &scope, const platform::Place &place) const override { @@ -85,19 +94,17 @@ class ParallelDoOp : public framework::OperatorBase { auto &sub_scopes = *scope.FindVar(Output(kParallelScopes)) ->GetMutable>(); - for (size_t place_idx = 0; place_idx < places.size(); ++place_idx) { - sub_scopes.push_back(&scope.NewScope()); - } // split input - SplitTensorAndMoveTensorToScopes(scope, sub_scopes, places, + SplitTensorAndMoveTensorToScopes(scope, &sub_scopes, places, Inputs(kInputs)); + // copy parameter for (auto ¶m : Inputs(kParameters)) { PADDLE_ENFORCE(scope.FindVar(param)->IsType(), "Only support parameter type as LoDTensor"); auto &src = scope.FindVar(param)->Get(); - for (size_t i = 0; i < places.size(); ++i) { + for (size_t i = 0; i < sub_scopes.size(); ++i) { auto &place = places[i]; auto *sub_scope = sub_scopes[i]; auto *dst = sub_scope->Var(param)->GetMutable(); @@ -108,9 +115,7 @@ class ParallelDoOp : public framework::OperatorBase { std::vector> workers; workers.reserve(places.size()); - for (size_t place_idx = 0; place_idx < places.size(); ++place_idx) { - VLOG(3) << "Run " << place_idx; - + for (size_t place_idx = 0; place_idx < sub_scopes.size(); ++place_idx) { auto &place = places[place_idx]; auto *cur_scope = sub_scopes[place_idx]; @@ -157,21 +162,16 @@ ParallelDo Operator. } }; -class ParallelDoGradOp : public OperatorBase { +class ParallelDoGradOp : public framework::OperatorBase { public: ParallelDoGradOp(const std::string &type, const framework::VariableNameMap &inputs, const framework::VariableNameMap &outputs, const framework::AttributeMap &attrs) - : OperatorBase(type, inputs, outputs, attrs) {} + : framework::OperatorBase(type, inputs, outputs, attrs) {} void Run(const framework::Scope &scope, const platform::Place &place) const override { - // // get device context from pool - // platform::DeviceContextPool &pool = - // platform::DeviceContextPool::Instance(); - // auto &dev_ctx = *pool.Get(place); - auto *block = Attr(kParallelBlock); auto *program = block->Program(); @@ -181,26 +181,16 @@ class ParallelDoGradOp : public OperatorBase { auto &places = scope.FindVar(Input(kPlaces))->Get(); // feed output@grad - SplitTensorAndMoveTensorToScopes(scope, sub_scopes, places, - Inputs(framework::GradVarName(kOutputs))); + SplitTensorAndMoveTensorToScopes( + scope, const_cast *>(&sub_scopes), + places, Inputs(framework::GradVarName(kOutputs))); WaitOnPlaces(places); - // for debugging - for (auto &s : Inputs(framework::GradVarName(kOutputs))) { - VLOG(3) << s; - VLOG(3) << scope.FindVar(s)->Get(); - for (auto *sub_scope : sub_scopes) { - VLOG(3) << sub_scope->FindVar(s)->Get(); - } - } - // exe run std::vector> workers; - for (size_t place_idx = 0; place_idx < places.size(); ++place_idx) { - VLOG(3) << "Run " << place_idx; - - auto &place = places[place_idx]; - auto *cur_scope = sub_scopes[place_idx]; + for (size_t i = 0; i < sub_scopes.size(); ++i) { + auto &place = places[i]; + auto *cur_scope = sub_scopes[i]; // execute workers.emplace_back(framework::Async([program, cur_scope, place, block] { @@ -216,33 +206,38 @@ class ParallelDoGradOp : public OperatorBase { // merge grad for (auto &s : Outputs(framework::GradVarName(kParameters))) { - VLOG(3) << "merge grad " << s; - - auto &t = sub_scopes[0]->FindVar(s)->Get(); - VLOG(3) << t; - - std::string s_buf = s + "@BUF"; - auto *t_buf = sub_scopes[0]->Var(s_buf)->GetMutable(); - - for (size_t place_idx = 1; place_idx < places.size(); ++place_idx) { - auto &tt = sub_scopes[place_idx]->FindVar(s)->Get(); - VLOG(3) << place_idx; - VLOG(3) << tt; - framework::Copy(tt, places[0], t_buf); + auto &result = sub_scopes[0]->FindVar(s)->Get(); + std::string tmp_name; + auto *tmp = sub_scopes[0]->Var(&tmp_name)->GetMutable(); + + for (size_t i = 1; i < sub_scopes.size(); ++i) { + auto &tensor_to_merge = sub_scopes[i]->FindVar(s)->Get(); + if (!(places[i] == places[0])) { + framework::Copy(tensor_to_merge, places[0], tmp); + } else { + tmp->ShareDataWith(tensor_to_merge); + } auto sum_op = framework::OpRegistry::CreateOp( - "sum", {{"X", {s, s_buf}}}, {{"Out", {s}}}, + "sum", {{"X", {s, tmp_name}}}, {{"Out", {s}}}, framework::AttributeMap{}); sum_op->Run(*sub_scopes[0], places[0]); WaitOnPlaces(places); } - VLOG(3) << t; - framework::Copy(t, place, scope.FindVar(s)->GetMutable()); + VLOG(3) << result; + framework::Copy(result, place, scope.FindVar(s)->GetMutable()); } } }; +std::ostream &operator<<(std::ostream &sout, + const std::vector &strs) { + std::copy(strs.begin(), strs.end(), + std::ostream_iterator(sout, ",")); + return sout; +} + class ParallelDoGradOpDescMaker : public framework::SingleGradOpDescMaker { public: using framework::SingleGradOpDescMaker::SingleGradOpDescMaker; @@ -283,18 +278,30 @@ class ParallelDoGradOpShapeInference : public framework::InferShapeBase { void operator()(framework::InferShapeContext *ctx) const override { std::vector input{kParameters, kInputs}; std::vector output{kOutputs}; - for (auto &s : input) { - PADDLE_ENFORCE(ctx->HasInputs(s)); - PADDLE_ENFORCE(ctx->HasOutputs(framework::GradVarName(s)), - "Cannot find the gradient variable %s", - framework::GradVarName(s)); - } + + PADDLE_ENFORCE(ctx->HasInputs(kParameters)); + PADDLE_ENFORCE(ctx->HasOutputs(framework::GradVarName(kParameters))); + PADDLE_ENFORCE(ctx->HasInput(kInputs)); + for (auto &s : output) { PADDLE_ENFORCE(ctx->HasInputs(s)); } - for (auto &s : input) { - ctx->SetOutputsDim(framework::GradVarName(s), ctx->GetInputsDim(s)); + + ctx->SetOutputsDim(framework::GradVarName(kParameters), + ctx->GetInputsDim(kParameters)); + + auto i_dims = ctx->GetInputsDim(kInputs); + auto ig_names = ctx->Outputs(framework::GradVarName(kInputs)); + + for (size_t i = 0; i < ig_names.size(); ++i) { + auto &ig_name = ig_names[i]; + if (ig_name == framework::kEmptyVarName) { + continue; + } + + ctx->SetDims({ig_name}, {i_dims[i]}); } + if (ctx->HasInputs(kParameters)) { PADDLE_ENFORCE(ctx->HasOutputs(framework::GradVarName(kParameters))); ctx->SetOutputsDim(framework::GradVarName(kParameters), diff --git a/paddle/string/to_string.h b/paddle/string/to_string.h index 3b3bcc69a..178edc189 100644 --- a/paddle/string/to_string.h +++ b/paddle/string/to_string.h @@ -15,9 +15,15 @@ limitations under the License. */ #pragma once #include #include +#include namespace paddle { namespace string { +inline std::ostream& operator<<(std::ostream& s, const std::type_index& t) { + s << t.name(); + return s; +} + template inline std::string to_string(T v) { std::ostringstream sout; @@ -25,6 +31,11 @@ inline std::string to_string(T v) { return sout.str(); } +template <> +inline std::string to_string(std::type_index t) { + return t.name(); +} + // Faster std::string/const char* type template <> inline std::string to_string(std::string v) { diff --git a/python/paddle/v2/fluid/tests/test_parallel_op.py b/python/paddle/v2/fluid/tests/test_parallel_op.py index 3c190477d..45196ef6f 100644 --- a/python/paddle/v2/fluid/tests/test_parallel_op.py +++ b/python/paddle/v2/fluid/tests/test_parallel_op.py @@ -151,24 +151,28 @@ class BaseParallelForTest(unittest.TestCase): class ParallelOpTest(BaseParallelForTest): - def test_simple_fc(self): - def __network__(): - x = fluid.layers.data(shape=[784], dtype='float32', name='img') - # FIXME: This is a bug of parallel.do - x.stop_gradient = False - x = yield x - hidden = fluid.layers.fc(input=x, size=200, param_attr='fc1.w') - loss = fluid.layers.mean(x=hidden) - yield loss + @staticmethod + def __network__(): + x = fluid.layers.data(shape=[784], dtype='float32', name='img') + x = yield x + hidden = fluid.layers.fc(input=x, size=200, param_attr='fc1.w') + loss = fluid.layers.mean(x=hidden) + yield loss + def test_simple_fc(self): self.run_test( - callback=__network__, + callback=ParallelOpTest.__network__, feed={ - 'img': - numpy.random.random(size=(128 * 3, 784)).astype('float32') + 'img': numpy.random.random(size=(51, 784)).astype('float32') }, fetch='fc1.w@GRAD') + def test_fc_with_tiny_data(self): + self.run_test( + callback=ParallelOpTest.__network__, + feed={'img': numpy.random.random(size=(1, 784)).astype('float32')}, + fetch='fc1.w@GRAD') + if __name__ == '__main__': unittest.main() -- GitLab