From 0fcdae8418b8bbc06013ca540d8a7b8d2e4d790e Mon Sep 17 00:00:00 2001
From: Qiao Longfei
Date: Tue, 12 Mar 2019 23:08:55 +0800
Subject: [PATCH] add communicator_test

---
 .../operators/distributed/CMakeLists.txt      |   1 +
 .../operators/distributed/communicator.cc     |  62 ----------
 .../operators/distributed/communicator.h      |  61 ++++++++++
 .../distributed/communicator_test.cc          | 110 ++++++++++++++++++
 4 files changed, 172 insertions(+), 62 deletions(-)
 create mode 100644 paddle/fluid/operators/distributed/communicator_test.cc

diff --git a/paddle/fluid/operators/distributed/CMakeLists.txt b/paddle/fluid/operators/distributed/CMakeLists.txt
index 750aac8dd0a..972b4f67a83 100644
--- a/paddle/fluid/operators/distributed/CMakeLists.txt
+++ b/paddle/fluid/operators/distributed/CMakeLists.txt
@@ -55,6 +55,7 @@ cc_library(parameter_prefetch SRCS parameter_prefetch.cc DEPS sendrecvop_rpc mem
 cc_library(parameter_send SRCS parameter_send.cc DEPS sendrecvop_rpc memory)
 cc_library(parameter_recv SRCS parameter_recv.cc DEPS sendrecvop_rpc memory)
 cc_library(communicator SRCS communicator.cc DEPS scope selected_rows tensor variable_helper selected_rows_functor simple_threadpool parameter_send parameter_recv)
+cc_test(communicator_test SRCS communicator_test.cc DEPS communicator)
 if(WITH_GPU)
     cc_test(collective_server_test SRCS collective_server_test.cc
       DEPS sendrecvop_rpc executor ${RPC_DEPS}
diff --git a/paddle/fluid/operators/distributed/communicator.cc b/paddle/fluid/operators/distributed/communicator.cc
index f17af56400e..72f26e91b21 100644
--- a/paddle/fluid/operators/distributed/communicator.cc
+++ b/paddle/fluid/operators/distributed/communicator.cc
@@ -24,9 +24,6 @@ limitations under the License. */
 #include "paddle/fluid/framework/variable_helper.h"
 #include "paddle/fluid/operators/distributed/parameter_recv.h"
 #include "paddle/fluid/operators/distributed/parameter_send.h"
-#include "paddle/fluid/operators/math/math_function.h"
-#include "paddle/fluid/operators/math/selected_rows_functor.h"
-#include "paddle/fluid/platform/device_context.h"
 
 DEFINE_bool(communicator_independent_recv_thread, true,
             "use an independent to recv vars from parameter server");
@@ -43,71 +40,12 @@ namespace paddle {
 namespace operators {
 namespace distributed {
 
-template <typename T, int MajorType = Eigen::RowMajor,
-          typename IndexType = Eigen::DenseIndex>
-using EigenVector = framework::EigenVector<T, MajorType, IndexType>;
-
 inline double GetCurrentUS() {
   struct timeval time;
   gettimeofday(&time, NULL);
   return 1e+6 * time.tv_sec + time.tv_usec;
 }
 
-static inline void MergeVars(const std::string &var_name,
-                             const std::vector<std::shared_ptr<Variable>> &vars,
-                             Scope *scope) {
-  PADDLE_ENFORCE(!vars.empty(), "should have value to merge!");
-  auto cpu_place = platform::CPUPlace();
-  auto &var0 = vars[0];
-  auto *out_var = scope->Var(var_name);
-  if (var0->IsType<framework::LoDTensor>()) {
-    VLOG(3) << "merge " << var_name << " LoDTensor"
-            << var0->Get<framework::LoDTensor>().dims();
-
-    // init output tensor
-    auto *out_t = out_var->GetMutable<framework::LoDTensor>();
-    auto numel = out_t->numel();
-
-    // check the input dims
-    for (auto &var : vars) {
-      auto &var_t = var->Get<framework::LoDTensor>();
-      PADDLE_ENFORCE_EQ(var_t.numel(), numel, "should have the same dims");
-    }
-
-    // set output tensor to 0.
-    auto cpu_ctx = paddle::platform::CPUDeviceContext();
-    math::SetConstant<paddle::platform::CPUDeviceContext, float>
-        constant_functor;
-    constant_functor(cpu_ctx, out_t, static_cast<float>(0));
-
-    // sum all vars to out
-    auto result = EigenVector<float>::Flatten(*out_t);
-    for (auto &var : vars) {
-      auto &in_t = var->Get<framework::LoDTensor>();
-      auto in = EigenVector<float>::Flatten(in_t);
-      result.device(*cpu_ctx.eigen_device()) = result + in;
-    }
-  } else if (var0->IsType<framework::SelectedRows>()) {
-    auto &slr0 = var0->Get<framework::SelectedRows>();
-    auto *out_slr = out_var->GetMutable<framework::SelectedRows>();
-    out_slr->mutable_rows()->clear();
-    out_slr->mutable_value()->mutable_data<float>({{}}, cpu_place);
-    std::vector<const paddle::framework::SelectedRows *> inputs;
-    inputs.reserve(vars.size());
-    for (auto &var : vars) {
-      inputs.push_back(&var->Get<framework::SelectedRows>());
-    }
-    math::scatter::MergeAdd<paddle::platform::CPUDeviceContext, float>
-        merge_add;
-    auto dev_ctx = paddle::platform::CPUDeviceContext();
-    merge_add(dev_ctx, inputs, out_slr, false);
-    VLOG(3) << "merge " << var_name << " SelectedRows height: " << slr0.height()
-            << " dims: " << slr0.value().dims();
-  } else {
-    PADDLE_THROW("unsupported var type!");
-  }
-}
-
 std::unique_ptr<Communicator> Communicator::communicator_(nullptr);
 
 std::once_flag Communicator::init_flag_;
diff --git a/paddle/fluid/operators/distributed/communicator.h b/paddle/fluid/operators/distributed/communicator.h
index 4104cb20a36..3fe2a21232a 100644
--- a/paddle/fluid/operators/distributed/communicator.h
+++ b/paddle/fluid/operators/distributed/communicator.h
@@ -24,6 +24,8 @@ limitations under the License. */
 #include "paddle/fluid/framework/scope.h"
 #include "paddle/fluid/framework/variable.h"
 #include "paddle/fluid/operators/distributed/rpc_common.h"
+#include "paddle/fluid/operators/math/math_function.h"
+#include "paddle/fluid/operators/math/selected_rows_functor.h"
 #include "paddle/fluid/platform/device_context.h"
 #include "paddle/fluid/platform/enforce.h"
 #include "paddle/fluid/platform/place.h"
@@ -91,6 +93,65 @@ class BlockingQueue {
   std::condition_variable cv_;
 };
 
+template <typename T, int MajorType = Eigen::RowMajor,
+          typename IndexType = Eigen::DenseIndex>
+using EigenVector = framework::EigenVector<T, MajorType, IndexType>;
+
+inline void MergeVars(const std::string& var_name,
+                      const std::vector<std::shared_ptr<Variable>>& vars,
+                      Scope* scope) {
+  PADDLE_ENFORCE(!vars.empty(), "should have value to merge!");
+  auto cpu_place = platform::CPUPlace();
+  auto& var0 = vars[0];
+  auto* out_var = scope->Var(var_name);
+  if (var0->IsType<framework::LoDTensor>()) {
+    auto dims = var0->Get<framework::LoDTensor>().dims();
+    VLOG(3) << "merge " << var_name << " LoDTensor " << dims;
+
+    // init output tensor
+    auto* out_t = out_var->GetMutable<framework::LoDTensor>();
+    out_t->mutable_data<float>(dims, cpu_place);
+
+    // check the input dims
+    for (auto& var : vars) {
+      auto& var_t = var->Get<framework::LoDTensor>();
+      PADDLE_ENFORCE_EQ(var_t.dims(), dims, "should have the same dims");
+    }
+
+    // set output tensor to 0.
+    auto cpu_ctx = paddle::platform::CPUDeviceContext();
+    math::SetConstant<paddle::platform::CPUDeviceContext, float>
+        constant_functor;
+    constant_functor(cpu_ctx, out_t, static_cast<float>(0));
+
+    // sum all vars to out
+    auto result = EigenVector<float>::Flatten(*out_t);
+    for (auto& var : vars) {
+      auto& in_t = var->Get<framework::LoDTensor>();
+      auto in = EigenVector<float>::Flatten(in_t);
+      result.device(*cpu_ctx.eigen_device()) = result + in;
+    }
+  } else if (var0->IsType<framework::SelectedRows>()) {
+    auto& slr0 = var0->Get<framework::SelectedRows>();
+    auto* out_slr = out_var->GetMutable<framework::SelectedRows>();
+    out_slr->mutable_rows()->clear();
+    out_slr->mutable_value()->mutable_data<float>({{}}, cpu_place);
+    std::vector<const paddle::framework::SelectedRows*> inputs;
+    inputs.reserve(vars.size());
+    for (auto& var : vars) {
+      inputs.push_back(&var->Get<framework::SelectedRows>());
+    }
+    math::scatter::MergeAdd<paddle::platform::CPUDeviceContext, float>
+        merge_add;
+    auto dev_ctx = paddle::platform::CPUDeviceContext();
+    merge_add(dev_ctx, inputs, out_slr, false);
+    VLOG(3) << "merge " << var_name << " SelectedRows height: " << slr0.height()
+            << " dims: " << slr0.value().dims();
+  } else {
+    PADDLE_THROW("unsupported var type!");
+  }
+}
+
 using RpcCtxMap = std::unordered_map<std::string, RpcContext>;
 
 class Communicator {
diff --git a/paddle/fluid/operators/distributed/communicator_test.cc b/paddle/fluid/operators/distributed/communicator_test.cc
new file mode 100644
index 00000000000..5294ac33d15
--- /dev/null
+++ b/paddle/fluid/operators/distributed/communicator_test.cc
@@ -0,0 +1,110 @@
+// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <gtest/gtest.h>
+#include <algorithm>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "paddle/fluid/operators/distributed/communicator.h"
+
+namespace paddle {
+namespace operators {
+namespace distributed {
+
+using LoDTensor = framework::LoDTensor;
+using SelectedRows = framework::SelectedRows;
+
+TEST(communicator, merge_lod_tensors) {
+  auto cpu_place = platform::CPUPlace();
+  auto dims = framework::make_ddim({2, 3});
+  std::vector<std::shared_ptr<framework::Variable>> in_vars;
+  float out_value = 0;
+  for (auto i = 0; i < 10; ++i) {
+    auto var = std::make_shared<framework::Variable>();
+    in_vars.emplace_back(var);
+    auto *tensor = var->GetMutable<LoDTensor>();
+    auto *data = tensor->mutable_data<float>(dims, cpu_place);
+    for (auto j = 0; j < tensor->numel(); ++j) {
+      data[j] = static_cast<float>(i);
+    }
+    out_value += static_cast<float>(i);
+  }
+  const std::string out_name = "Out";
+  std::unique_ptr<framework::Scope> scope;
+  scope.reset(new framework::Scope());
+  scope->Var(out_name);
+  for (auto i = 0; i < 10; ++i) {
+    MergeVars(out_name, in_vars, scope.get());
+  }
+  auto &out_tensor = scope->FindVar(out_name)->Get<LoDTensor>();
+  auto *out_data = out_tensor.data<float>();
+  ASSERT_EQ(out_tensor.dims(), dims);
+  for (auto i = 0; i < out_tensor.numel(); ++i) {
+    ASSERT_EQ(out_data[i], out_value);
+  }
+}
+
+TEST(communicator, merge_selected_rows) {
+  auto cpu_place = platform::CPUPlace();
+  int64_t width = 10;
+  std::vector<std::shared_ptr<framework::Variable>> in_vars;
+  const int64_t height = 100;
+  for (auto i = 0; i < 10; ++i) {
+    std::vector<int64_t> rows;
+    for (auto k = 0; k <= i; ++k) {
+      rows.push_back(k);
+    }
+    auto var = std::make_shared<framework::Variable>();
+    in_vars.emplace_back(var);
+    auto *slr = var->GetMutable<SelectedRows>();
+    slr->set_height(height);
+    slr->set_rows(rows);
+    auto dims =
+        framework::make_ddim({static_cast<int64_t>(rows.size()), width});
+    auto *data = slr->mutable_value()->mutable_data<float>(dims, cpu_place);
+    for (auto i = 0; i < rows.size(); ++i) {
+      for (auto j = 0; j < width; ++j) {
+        data[i * width + j] = static_cast<float>(rows[i]);
+      }
+    }
+  }
+  const std::string out_name = "Out";
+  std::unique_ptr<framework::Scope> scope;
+  scope.reset(new framework::Scope());
+  scope->Var(out_name);
+  for (auto i = 0; i < 10; ++i) {
+    MergeVars(out_name, in_vars, scope.get());
+  }
+  auto &out_slr = scope->FindVar(out_name)->Get<SelectedRows>();
+  auto &out_t = out_slr.value();
+  auto *out_data = out_t.data<float>();
+  ASSERT_EQ(out_t.dims(), framework::make_ddim({10, width}));
+  std::vector<float> out_values;
+  out_values.reserve(10);
+  for (auto i = 0; i < 10; ++i) {
+    out_values.push_back(static_cast<float>(i * (10 - i)));
+  }
+  for (auto i = 0; i < out_slr.rows().size(); ++i) {
+    ASSERT_EQ(out_slr.rows()[i], i);
+    for (auto j = 0; j < width; ++j) {
+      ASSERT_EQ(out_data[i * width + j], out_values[i]);
+    }
+  }
+}
+
+}  // namespace distributed
+}  // namespace operators
+}  // namespace paddle
--
GitLab