From 95e90aa102a94182210ee9042180b3bb2d82040e Mon Sep 17 00:00:00 2001
From: 123malin
Date: Sun, 20 Oct 2019 14:31:46 +0800
Subject: [PATCH] test=develop, add communicator_is_sgd_optimizer flag (#20677)

* test=develop, communicator_is_sgd_optimizer flags
---
 .../operators/distributed/communicator.cc    |  2 ++
 .../operators/distributed/communicator.h     | 22 ++++++++++++++-----
 .../distributed/communicator_test.cc         |  3 +--
 .../operators/distributed/parameter_send.cc  |  7 ++++++
 paddle/fluid/platform/flags.cc               |  4 +++-
 python/paddle/fluid/__init__.py              |  1 +
 .../fluid/tests/unittests/test_dist_ctr.py   |  3 ++-
 7 files changed, 33 insertions(+), 9 deletions(-)

diff --git a/paddle/fluid/operators/distributed/communicator.cc b/paddle/fluid/operators/distributed/communicator.cc
index 1d6732dd21e..ecca873c9d0 100644
--- a/paddle/fluid/operators/distributed/communicator.cc
+++ b/paddle/fluid/operators/distributed/communicator.cc
@@ -89,6 +89,8 @@ void AsyncCommunicator::InitImpl(const RpcCtxMap &send_varname_to_ctx,
   VLOG(0) << "communicator_fake_rpc: " << FLAGS_communicator_fake_rpc;
   VLOG(0) << "communicator_merge_sparse_grad: "
           << FLAGS_communicator_merge_sparse_grad;
+  VLOG(0) << "communicator_is_sgd_optimizer: "
+          << FLAGS_communicator_is_sgd_optimizer;

   if (send_varname_to_ctx.size() == 0) {
     VLOG(0) << "nothing need to be send, will not start send_thread";
diff --git a/paddle/fluid/operators/distributed/communicator.h b/paddle/fluid/operators/distributed/communicator.h
index 50582e6f34b..be61a0281cd 100644
--- a/paddle/fluid/operators/distributed/communicator.h
+++ b/paddle/fluid/operators/distributed/communicator.h
@@ -24,6 +24,7 @@ limitations under the License. */
 #include <unordered_map>
 #include <utility>
 #include <vector>
+#include "gflags/gflags.h"

 #include "paddle/fluid/framework/scope.h"
 #include "paddle/fluid/framework/variable.h"
@@ -37,6 +38,8 @@ limitations under the License. */
 #include "paddle/fluid/platform/enforce.h"
 #include "paddle/fluid/platform/place.h"

+DECLARE_bool(communicator_is_sgd_optimizer);
+
 namespace paddle {
 namespace operators {
 namespace distributed {
@@ -138,8 +141,10 @@ inline void MergeVars(const std::string& var_name,
       auto in = EigenVector<float>::Flatten(in_t);
       result.device(*cpu_ctx.eigen_device()) = result + in;
     }
-    result.device(*cpu_ctx.eigen_device()) =
-        result / static_cast<float>(vars.size());
+    if (!FLAGS_communicator_is_sgd_optimizer) {
+      result.device(*cpu_ctx.eigen_device()) =
+          result / static_cast<float>(vars.size());
+    }
   } else if (var0->IsType<framework::SelectedRows>()) {
     auto& slr0 = var0->Get<framework::SelectedRows>();
     auto* out_slr = out_var->GetMutable<framework::SelectedRows>();
@@ -151,9 +156,16 @@ inline void MergeVars(const std::string& var_name,
       inputs.push_back(&var->Get<framework::SelectedRows>());
     }
     auto dev_ctx = paddle::platform::CPUDeviceContext();
-    math::scatter::MergeAverage<paddle::platform::CPUDeviceContext, float>
-        merge_average;
-    merge_average(dev_ctx, inputs, out_slr);
+    if (FLAGS_communicator_is_sgd_optimizer) {
+      math::scatter::MergeAdd<paddle::platform::CPUDeviceContext, float>
+          merge_add;
+      merge_add(dev_ctx, inputs, out_slr);
+    } else {
+      math::scatter::MergeAverage<paddle::platform::CPUDeviceContext, float>
+          merge_average;
+      merge_average(dev_ctx, inputs, out_slr);
+    }
+
     VLOG(3) << "merge " << var_name << " SelectedRows height: " << slr0.height()
             << " dims: " << slr0.value().dims();
   } else {
diff --git a/paddle/fluid/operators/distributed/communicator_test.cc b/paddle/fluid/operators/distributed/communicator_test.cc
index 66e36d012b1..5294ac33d15 100644
--- a/paddle/fluid/operators/distributed/communicator_test.cc
+++ b/paddle/fluid/operators/distributed/communicator_test.cc
@@ -42,7 +42,6 @@ TEST(communicator, merge_lod_tensors) {
     }
     out_value += static_cast<float>(i);
   }
-  out_value = out_value / 10.0;
   const std::string out_name = "Out";
   std::unique_ptr<framework::Scope> scope;
   scope.reset(new framework::Scope());
@@ -96,7 +95,7 @@ TEST(communicator, merge_selected_rows) {
   std::vector<float> out_values;
   out_values.reserve(10);
   for (auto i = 0; i < 10; ++i) {
-    out_values.push_back(static_cast<float>((i * (10 - i)) / 10.0));
+    out_values.push_back(static_cast<float>(i * (10 - i)));
   }
   for (auto i = 0; i < out_slr.rows().size(); ++i) {
     ASSERT_EQ(out_slr.rows()[i], i);
diff --git a/paddle/fluid/operators/distributed/parameter_send.cc b/paddle/fluid/operators/distributed/parameter_send.cc
index f79adf70708..56362391a25 100644
--- a/paddle/fluid/operators/distributed/parameter_send.cc
+++ b/paddle/fluid/operators/distributed/parameter_send.cc
@@ -139,6 +139,13 @@ void ParameterSend<T>::operator()(const RpcContext &rpc_ctx,
     auto abs_sections = ToAbsoluteSection(rpc_ctx.height_sections);

     auto &send_rows = send_slr.rows();
+    if (send_rows.size() == 0) {
+      LOG(WARNING) << "WARNING: The variable sent to pserver is empty, which "
+                      "may cause an unknown error. Please check the state of "
+                      "use_double_buffer in pyreader async mode; you need to "
+                      "set it to false.";
+    }
+
     std::vector<std::vector<size_t>> outs_rows_idx;
     std::vector<std::vector<size_t>> outs_dense_idx;

diff --git a/paddle/fluid/platform/flags.cc b/paddle/fluid/platform/flags.cc
index c77d8b4e70b..9b5e0c92fca 100644
--- a/paddle/fluid/platform/flags.cc
+++ b/paddle/fluid/platform/flags.cc
@@ -199,7 +199,9 @@ DEFINE_bool(
  */
DEFINE_int32(communicator_max_merge_var_num, 20,
              "max var num to merge and send");
-
+DEFINE_bool(communicator_is_sgd_optimizer, true,
+            "if the optimizer is sgd, the gradient sent to the server is "
+            "the sum of the gradients calculated by each thread");
 /**
  * Distributed related FLAG
  * Name: FLAGS_communicator_send_queue_size
diff --git a/python/paddle/fluid/__init__.py b/python/paddle/fluid/__init__.py
index 295dae72172..14106668531 100644
--- a/python/paddle/fluid/__init__.py
+++ b/python/paddle/fluid/__init__.py
@@ -206,6 +206,7 @@ def __bootstrap__():
         read_env_flags.append('communicator_fake_rpc')
         read_env_flags.append('communicator_send_wait_times')
         read_env_flags.append('communicator_merge_sparse_grad')
+        read_env_flags.append('communicator_is_sgd_optimizer')
     if core.is_compiled_with_brpc():
         read_env_flags.append('max_body_size')
         #set brpc max body size
diff --git a/python/paddle/fluid/tests/unittests/test_dist_ctr.py b/python/paddle/fluid/tests/unittests/test_dist_ctr.py
index 91947ded353..f1bbce89821 100644
--- a/python/paddle/fluid/tests/unittests/test_dist_ctr.py
+++ b/python/paddle/fluid/tests/unittests/test_dist_ctr.py
@@ -113,7 +113,8 @@ class TestDistCTR2x2_ASYNC2(TestDistBase):
             "FLAGS_communicator_send_queue_size": "2",
             "FLAGS_communicator_max_merge_var_num": "2",
             "FLAGS_communicator_max_send_grad_num_before_recv": "2",
-            "FLAGS_communicator_independent_recv_thread": "0"
+            "FLAGS_communicator_independent_recv_thread": "0",
+            "FLAGS_communicator_is_sgd_optimizer": "0"
         }

         self.check_with_place(
--
GitLab
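
A minimal usage sketch of the new flag, for reference. Assumptions not in the patch: a PaddlePaddle build with distributed support that includes this change, and a distributed async-mode program configured elsewhere. What the patch does establish: the flag defaults to true (gradients are summed via MergeAdd; when false they are averaged via MergeAverage), and __bootstrap__() reads it from the environment as FLAGS_communicator_is_sgd_optimizer, so it must be set before paddle.fluid is imported, just as the unit test above injects it through the env dict passed to check_with_place:

    import os

    # "1" (default): merged gradients are summed (MergeAdd), which suits SGD.
    # "0": merged gradients are averaged (MergeAverage), for other optimizers.
    os.environ["FLAGS_communicator_is_sgd_optimizer"] = "0"

    import paddle.fluid as fluid  # env flags are consumed during __bootstrap__()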