From 513d1f972c45eaaaf6875d38945857245459aa4b Mon Sep 17 00:00:00 2001
From: zhaocaibei123 <48509226+zhaocaibei123@users.noreply.github.com>
Date: Fri, 10 Dec 2021 17:15:06 +0800
Subject: [PATCH] fix pscore geo&lr_decay (#37995)

* fix

* modify log

* fix batch_size
---
 paddle/fluid/distributed/common/cost_timer.h  |  5 ++---
 paddle/fluid/distributed/fleet.cc             |  9 +++++---
 .../fluid/distributed/service/communicator.cc |  4 ++++
 paddle/fluid/operators/pscore/send_op.cc      | 21 ++++++++++++-------
 .../fleet/parameter_server/ir/trainer_pass.py |  2 +-
 5 files changed, 27 insertions(+), 14 deletions(-)

diff --git a/paddle/fluid/distributed/common/cost_timer.h b/paddle/fluid/distributed/common/cost_timer.h
index d7bf4cc11e..5073dc9cf5 100644
--- a/paddle/fluid/distributed/common/cost_timer.h
+++ b/paddle/fluid/distributed/common/cost_timer.h
@@ -75,9 +75,8 @@ class CostTimer {
   }
   ~CostTimer() {
     if (_is_print_cost) {
-      LOG(INFO) << "CostTimer label:" << _label
-                << ", cost:" << butil::gettimeofday_ms() - _start_time_ms
-                << "ms";
+      VLOG(3) << "CostTimer label:" << _label
+              << ", cost:" << butil::gettimeofday_ms() - _start_time_ms << "ms";
     } else {
       *(_profiler_node->recorder) << butil::gettimeofday_ms() - _start_time_ms;
     }
diff --git a/paddle/fluid/distributed/fleet.cc b/paddle/fluid/distributed/fleet.cc
index ba614179b3..9eb6cbecdc 100644
--- a/paddle/fluid/distributed/fleet.cc
+++ b/paddle/fluid/distributed/fleet.cc
@@ -439,13 +439,16 @@ void FleetWrapper::PushSparseFromTensorAsync(
     const LoDTensor* shows, const LoDTensor* clks,
     std::vector<LoDTensor*>* outputs) {
   int batch_size = -1;
+  bool batch_size_consist = true;
   for (auto* input : *inputs) {
     int cur_batch_size =
         input->lod().size() ? input->lod()[0].size() - 1 : input->dims()[0];
     if (batch_size == -1) {
       batch_size = cur_batch_size;
     } else {
-      CHECK(batch_size == cur_batch_size);  // NOLINT
+      // CHECK(batch_size == cur_batch_size);  // NOLINT
+      batch_size_consist = false;
+      break;
     }
   }
   CHECK(batch_size > 0);  // NOLINT
@@ -461,8 +464,8 @@ void FleetWrapper::PushSparseFromTensorAsync(
   for (framework::LoDTensor* g_tensor : *outputs) {
     float* g_ori = g_tensor->data<float>();
     // no cvm
-    if (true) {  // TODO(zhaocaibei123): add config
-                 // scale_sparse_gradient_with_batch_size_
+    if (batch_size_consist) {  // TODO(zhaocaibei123): add config
+                               // scale_sparse_gradient_with_batch_size_
       Eigen::Map<
           Eigen::Matrix<float, Eigen::Dynamic, Eigen::Dynamic, Eigen::RowMajor>>
           g_mat(g_ori, g_tensor->numel() / fea_dim, fea_dim);
diff --git a/paddle/fluid/distributed/service/communicator.cc b/paddle/fluid/distributed/service/communicator.cc
index b4d2fa3e82..eefafafcdf 100644
--- a/paddle/fluid/distributed/service/communicator.cc
+++ b/paddle/fluid/distributed/service/communicator.cc
@@ -949,6 +949,10 @@ void GeoCommunicator::InitDense(std::vector<std::string> &varnames,
     auto *old_var = old_scope_->Var(t);
     old_var->GetMutable<framework::LoDTensor>();
     framework::CopyVariable(*global_var, old_var);
+    // init pserver_scope_
+    auto *pserver_var = pserver_scope_->Var(t);
+    pserver_var->GetMutable<framework::LoDTensor>();
+    framework::CopyVariable(*global_var, pserver_var);
   }
   VLOG(1) << "init dense table " << table_id << " done";
 }
diff --git a/paddle/fluid/operators/pscore/send_op.cc b/paddle/fluid/operators/pscore/send_op.cc
index a496d0d5a0..482c6ba60d 100644
--- a/paddle/fluid/operators/pscore/send_op.cc
+++ b/paddle/fluid/operators/pscore/send_op.cc
@@ -42,17 +42,24 @@ class SendOp : public framework::OperatorBase {
   void RunImpl(const framework::Scope& scope,
                const platform::Place& place) const override {
     auto ins = Inputs("X");
-    // auto is_sparse = Attr<int>("is_sparse");
+    auto is_sparse = Attr<int>("is_sparse");
     auto table_id = Attr<int>("table_id");
     auto send_varnames = Attr<std::vector<std::string>>("send_varnames");
-    auto fleet = paddle::distributed::FleetWrapper::GetInstance();
-    std::vector<::std::future<int32_t>> status;
-    // Note: only send push_dense now!
-    // communicator->Send(ins, scope) can be used to push_sparse or push_dense
-    fleet->PushDenseVarsAsync(scope, table_id, ins, &status, 0, -1);
-
+    // for common_dense_table, distributed_push_sparse op for push sparse in
+    // async
+    if (is_sparse == 0 && send_varnames.size() >= 1 &&
+        send_varnames[0] != "@PS_STEP_COUNTER@") {
+      auto fleet = paddle::distributed::FleetWrapper::GetInstance();
+      std::vector<::std::future<int32_t>> status;
+      fleet->PushDenseVarsAsync(scope, table_id, ins, &status, 0, -1);
+    } else {
+      auto* communicator = paddle::distributed::Communicator::GetInstance();
+      if (communicator->Check(send_varnames)) {
+        communicator->Send(ins, scope);
+      }
+    }
     // auto fleet = paddle::distributed::FleetWrapper::GetInstance();
     // if (is_sparse == 0) {
     //   std::vector<::std::future<int32_t>> status;
diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/ir/trainer_pass.py b/python/paddle/fluid/incubate/fleet/parameter_server/ir/trainer_pass.py
index ff10c8ea09..46f26e8e52 100644
--- a/python/paddle/fluid/incubate/fleet/parameter_server/ir/trainer_pass.py
+++ b/python/paddle/fluid/incubate/fleet/parameter_server/ir/trainer_pass.py
@@ -503,7 +503,7 @@ def append_send_ops_pass(program, config):
         split_dense_table=config.is_heter_ps_mode)
 
     for merged_name, send in sends.items():
-        if send.is_sparse():
+        if send.is_sparse() and not config.is_geo_mode():
             continue
         is_sparse = 1 if send.is_sparse() else 0
         is_sparse = 2 if send.is_distributed() else is_sparse
-- 
GitLab