diff --git a/paddle/fluid/distributed/common/cost_timer.h b/paddle/fluid/distributed/common/cost_timer.h
index d7bf4cc11e0a30bb819cb14005e3c0acf2f4e53d..5073dc9cf50845e5569965ca1251b99df6868608 100644
--- a/paddle/fluid/distributed/common/cost_timer.h
+++ b/paddle/fluid/distributed/common/cost_timer.h
@@ -75,9 +75,8 @@ class CostTimer {
   }
   ~CostTimer() {
     if (_is_print_cost) {
-      LOG(INFO) << "CostTimer label:" << _label
-                << ", cost:" << butil::gettimeofday_ms() - _start_time_ms
-                << "ms";
+      VLOG(3) << "CostTimer label:" << _label
+              << ", cost:" << butil::gettimeofday_ms() - _start_time_ms << "ms";
     } else {
       *(_profiler_node->recorder) << butil::gettimeofday_ms() - _start_time_ms;
     }
diff --git a/paddle/fluid/distributed/fleet.cc b/paddle/fluid/distributed/fleet.cc
index ba614179b33b9991ef354c7c16fdbcb7980c1953..9eb6cbecdc752f96a9e17d9be0d3d5d4c2dfee11 100644
--- a/paddle/fluid/distributed/fleet.cc
+++ b/paddle/fluid/distributed/fleet.cc
@@ -439,13 +439,16 @@ void FleetWrapper::PushSparseFromTensorAsync(
     const LoDTensor* shows, const LoDTensor* clks,
     std::vector<LoDTensor*>* outputs) {
   int batch_size = -1;
+  bool batch_size_consist = true;
   for (auto* input : *inputs) {
     int cur_batch_size =
         input->lod().size() ? input->lod()[0].size() - 1 : input->dims()[0];
     if (batch_size == -1) {
       batch_size = cur_batch_size;
     } else {
-      CHECK(batch_size == cur_batch_size);  // NOLINT
+      // CHECK(batch_size == cur_batch_size);  // NOLINT
+      batch_size_consist = false;
+      break;
     }
   }
   CHECK(batch_size > 0);  // NOLINT
@@ -461,8 +464,8 @@ void FleetWrapper::PushSparseFromTensorAsync(
   for (framework::LoDTensor* g_tensor : *outputs) {
     float* g_ori = g_tensor->data<float>();
     // no cvm
-    if (true) {  // TODO(zhaocaibei123): add config
-                 // scale_sparse_gradient_with_batch_size_
+    if (batch_size_consist) {  // TODO(zhaocaibei123): add config
+                               // scale_sparse_gradient_with_batch_size_
       Eigen::Map<
           Eigen::Matrix<float, Eigen::Dynamic, Eigen::Dynamic, Eigen::RowMajor>>
           g_mat(g_ori, g_tensor->numel() / fea_dim, fea_dim);
diff --git a/paddle/fluid/distributed/service/communicator.cc b/paddle/fluid/distributed/service/communicator.cc
index b4d2fa3e82e88476cf42cc38347cb07cc4ae232c..eefafafcdf6641aa1b26b304fba90a2cc0f19c5b 100644
--- a/paddle/fluid/distributed/service/communicator.cc
+++ b/paddle/fluid/distributed/service/communicator.cc
@@ -949,6 +949,10 @@ void GeoCommunicator::InitDense(std::vector<std::string> &varnames,
     auto *old_var = old_scope_->Var(t);
     old_var->GetMutable<framework::LoDTensor>();
     framework::CopyVariable(*global_var, old_var);
+    // init pserver_scope_
+    auto *pserver_var = pserver_scope_->Var(t);
+    pserver_var->GetMutable<framework::LoDTensor>();
+    framework::CopyVariable(*global_var, pserver_var);
   }
   VLOG(1) << "init dense table " << table_id << " done";
 }
diff --git a/paddle/fluid/operators/pscore/send_op.cc b/paddle/fluid/operators/pscore/send_op.cc
index a496d0d5a078ea3109954581b0b48281079f2623..482c6ba60d26fdab5776e99d036162a7c67b21f8 100644
--- a/paddle/fluid/operators/pscore/send_op.cc
+++ b/paddle/fluid/operators/pscore/send_op.cc
@@ -42,17 +42,24 @@ class SendOp : public framework::OperatorBase {
   void RunImpl(const framework::Scope& scope,
                const platform::Place& place) const override {
     auto ins = Inputs("X");
-    // auto is_sparse = Attr<int>("is_sparse");
+    auto is_sparse = Attr<int>("is_sparse");
     auto table_id = Attr<int>("table_id");
 
     auto send_varnames = Attr<std::vector<std::string>>("send_varnames");
 
-    auto fleet = paddle::distributed::FleetWrapper::GetInstance();
-    std::vector<::std::future<int32_t>> status;
-    // Note: only send push_dense now!
-    // communicator->Send(ins, scope) can be used to push_sparse or push_dense
-    fleet->PushDenseVarsAsync(scope, table_id, ins, &status, 0, -1);
-
+    // for common_dense_table, distributed_push_sparse op for push sparse in
+    // async
+    if (is_sparse == 0 && send_varnames.size() >= 1 &&
+        send_varnames[0] != "@PS_STEP_COUNTER@") {
+      auto fleet = paddle::distributed::FleetWrapper::GetInstance();
+      std::vector<::std::future<int32_t>> status;
+      fleet->PushDenseVarsAsync(scope, table_id, ins, &status, 0, -1);
+    } else {
+      auto* communicator = paddle::distributed::Communicator::GetInstance();
+      if (communicator->Check(send_varnames)) {
+        communicator->Send(ins, scope);
+      }
+    }
     // auto fleet = paddle::distributed::FleetWrapper::GetInstance();
     // if (is_sparse == 0) {
     //   std::vector<::std::future<int32_t>> status;
diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/ir/trainer_pass.py b/python/paddle/fluid/incubate/fleet/parameter_server/ir/trainer_pass.py
index ff10c8ea097ad0ab3385018b89e3f0d9ec7f0408..46f26e8e52cd5c24d368f9ee242c809336304664 100644
--- a/python/paddle/fluid/incubate/fleet/parameter_server/ir/trainer_pass.py
+++ b/python/paddle/fluid/incubate/fleet/parameter_server/ir/trainer_pass.py
@@ -503,7 +503,7 @@ def append_send_ops_pass(program, config):
         split_dense_table=config.is_heter_ps_mode)
 
     for merged_name, send in sends.items():
-        if send.is_sparse():
+        if send.is_sparse() and not config.is_geo_mode():
             continue
         is_sparse = 1 if send.is_sparse() else 0
         is_sparse = 2 if send.is_distributed() else is_sparse
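Illustrative sketch (not part of the patch): after this change, SendOp::RunImpl routes a send to FleetWrapper::PushDenseVarsAsync only when the variables are dense (is_sparse == 0) and are not the @PS_STEP_COUNTER@ barrier variable; everything else, including the sparse sends that geo mode now emits via append_send_ops_pass, falls through to the Communicator. The stand-alone C++ sketch below mirrors that dispatch rule; FakeFleet and FakeCommunicator are hypothetical stand-ins for the real FleetWrapper and Communicator singletons, not Paddle APIs.

    // dispatch_sketch.cc -- illustration only, assuming simplified stand-in types.
    #include <iostream>
    #include <string>
    #include <vector>

    struct FakeFleet {
      void PushDenseVarsAsync(const std::vector<std::string>& vars) {
        std::cout << "push_dense: " << vars.size() << " dense grads\n";
      }
    };

    struct FakeCommunicator {
      // In Paddle, Communicator::Check() verifies a send queue exists for the vars.
      bool Check(const std::vector<std::string>& varnames) const {
        return !varnames.empty();
      }
      void Send(const std::vector<std::string>& vars) {
        std::cout << "communicator send: " << vars.size() << " vars\n";
      }
    };

    // Mirrors the branch added to SendOp::RunImpl: dense, non-step-counter sends go
    // through the fleet push; sparse and step-counter sends go to the communicator.
    void DispatchSend(int is_sparse, const std::vector<std::string>& send_varnames,
                      FakeFleet* fleet, FakeCommunicator* communicator) {
      if (is_sparse == 0 && send_varnames.size() >= 1 &&
          send_varnames[0] != "@PS_STEP_COUNTER@") {
        fleet->PushDenseVarsAsync(send_varnames);
      } else if (communicator->Check(send_varnames)) {
        communicator->Send(send_varnames);
      }
    }

    int main() {
      FakeFleet fleet;
      FakeCommunicator comm;
      DispatchSend(0, {"fc_0.w_0@GRAD"}, &fleet, &comm);      // dense -> fleet push
      DispatchSend(1, {"emb_0.w_0"}, &fleet, &comm);          // sparse -> communicator
      DispatchSend(0, {"@PS_STEP_COUNTER@"}, &fleet, &comm);  // barrier -> communicator
      return 0;
    }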