From 9ba2ae1015d261ea7738fe3f8ba1afbf6ad8581e Mon Sep 17 00:00:00 2001
From: seiriosPlus
Date: Fri, 21 Aug 2020 16:42:45 +0800
Subject: [PATCH] optimize init from pserver

---
 .../operators/distributed/communicator.cc     | 90 ++++++++-----------
 .../operators/distributed/communicator.h      |  6 +-
 .../fleet/meta_optimizers/async_optimizer.py  |  1 -
 .../distribute_transpiler/__init__.py         |  1 -
 4 files changed, 42 insertions(+), 56 deletions(-)

diff --git a/paddle/fluid/operators/distributed/communicator.cc b/paddle/fluid/operators/distributed/communicator.cc
index c144686a353..817071d3a57 100644
--- a/paddle/fluid/operators/distributed/communicator.cc
+++ b/paddle/fluid/operators/distributed/communicator.cc
@@ -74,8 +74,12 @@ void AsyncCommunicator::InitImpl(const RpcCtxMap &send_varname_to_ctx,
   } else {
     recv_threadpool_.reset(new ::ThreadPool(thread_pool_size_));
   }
+
+  InitParams();
 }
 
+void AsyncCommunicator::InitParams() { RecvNoBarrier(); }
+
 AsyncCommunicator::~AsyncCommunicator() {
   running_ = false;
   if (main_thread_) main_thread_->join();
@@ -721,7 +725,7 @@ void GeoCommunicator::RecvDense(const std::string &varname) {
                 t_timestamp->data<float>());
 }
 
-void GeoCommunicator::Init() {
+void GeoCommunicator::InitParams() {
   std::vector<std::future<void>> tasks;
   tasks.reserve(recv_varname_to_ctx_.size());
 
@@ -744,12 +748,17 @@
 }
 
 void GeoCommunicator::InitDense(const std::string varname) {
-  auto *var = old_scope_->Var(varname);
-  var->GetMutable<framework::LoDTensor>();
-
   auto &ctx = recv_varname_to_ctx_.at(varname);
   auto recv = distributed::ParameterRecv<float>();
-  recv(ctx, *old_scope_);
+  recv(ctx, *recv_scope_);
+
+  auto *global_var = recv_scope_->FindVar(varname);
+  global_var->GetMutable<framework::LoDTensor>();
+
+  auto *old_var = old_scope_->Var(varname);
+  old_var->GetMutable<framework::LoDTensor>();
+
+  framework::CopyVariable(*global_var, old_var);
   VLOG(1) << "init dense variable " << varname << " done";
 }
 
@@ -781,68 +790,43 @@ void GeoCommunicator::InitSparse() {
 
   LargeScaleKV::Init(metas);
 
-  distributed::RPCClient *rpc_client =
-      distributed::RPCClient::GetInstance<RPCCLIENT_T>(trainer_id_);
-
-  platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();
-  auto cpu_place = platform::CPUPlace();
-  auto &cpu_ctx = *pool.Get(cpu_place);
-
-  framework::Scope &local_scope = send_scope_->NewScope();
-
   for (auto &meta : metas) {
     auto &ctx = recv_varname_to_ctx_.at(meta.name);
-    auto pserver_num = ctx.splited_varnames.size();
-
-    for (size_t i = 0; i < ctx.splited_varnames.size(); i++) {
-      auto &recv_var_name = ctx.splited_varnames[i];
-      auto *var = local_scope.Var(recv_var_name);
-      var->GetMutable<framework::LoDTensor>();
-
-      distributed::VarHandlePtr ret;
-
-      ret = rpc_client->AsyncGetVarNoBarrier(ctx.epmap[i], cpu_ctx, local_scope,
-                                             recv_var_name, recv_var_name);
-
-      auto *recv_var = local_scope.FindVar(recv_var_name);
-      auto &recv_t = recv_var->Get<framework::LoDTensor>();
+    auto recv = distributed::ParameterRecv<float>();
 
-      auto width = recv_t.dims()[1];
-      auto rows = recv_t.dims()[0];
+    auto *global_var = recv_scope_->FindVar(meta.name);
+    auto global_value = global_var->Get<framework::LoDTensor>();
+    auto rows = global_value.dims()[0];
+    auto dim1 = global_value.dims()[1];
 
-      PADDLE_ENFORCE_EQ(
-          width, meta.value_dims[0],
-          platform::errors::InvalidArgument("sparse params do not match"));
+    recv(ctx, *recv_scope_);
+    VLOG(1) << "recv " << meta.name << " with global scope for init";
 
-      std::vector<int64_t> ids;
+    auto n_rows = global_var->Get<framework::LoDTensor>().dims()[0];
 
-      for (int x = 0; x < rows; x++) {
-        ids.push_back(x * pserver_num + i);
-      }
+    PADDLE_ENFORCE_EQ(
+        rows, n_rows,
+        platform::errors::InvalidArgument(
+            "global var: %s origin dim must equal recved rows", meta.name));
 
-      std::vector<std::vector<std::vector<float> *>> values;
-      auto *ins = distributed::LargeScaleKV::GetInstance();
+    std::vector<int64_t> ids(rows);
+    std::iota(ids.begin(), ids.end(), 0);
 
-      ins->Get(meta.name)->Init(ids);
-      ins->Get(meta.name)->Get(ids, {"Param"}, &values);
+    auto *ins = distributed::LargeScaleKV::GetInstance();
+    std::vector<std::vector<std::vector<float> *>> values;
 
-      PADDLE_ENFORCE_NE(ret->Wait(), 0U, platform::errors::ExecutionTimeout(
-                                             "internal error in RPCClient"));
+    ins->Get(meta.name)->Init(ids);
+    ins->Get(meta.name)->Get(ids, {"Param"}, &values);
 
-      auto blas = math::GetBlas<paddle::platform::CPUDeviceContext, float>(
-          paddle::platform::CPUDeviceContext());
+    auto blas = math::GetBlas<paddle::platform::CPUDeviceContext, float>(
+        paddle::platform::CPUDeviceContext());
 
-      for (size_t k = 0; k < ids.size(); ++k) {
-        blas.VCOPY(width, recv_t.data<float>() + k * width,
-                   values[k][0]->data());
-      }
-
-      local_scope.EraseVars({recv_var_name});
+    for (auto &id : ids) {
+      blas.VCOPY(dim1, global_value.data<float>() + id * dim1,
+                 values[id][0]->data());
     }
   }
 
-  send_scope_->DeleteScope(&local_scope);
-
   VLOG(3) << "init sparse variable done";
 }
diff --git a/paddle/fluid/operators/distributed/communicator.h b/paddle/fluid/operators/distributed/communicator.h
index 98a2aba2ec2..4a9a9eb1701 100644
--- a/paddle/fluid/operators/distributed/communicator.h
+++ b/paddle/fluid/operators/distributed/communicator.h
@@ -19,6 +19,7 @@ limitations under the License. */
 #include <deque>
 #include <map>
 #include <memory>
+#include <numeric>
 #include <set>
 #include <string>
 #include <unordered_map>
@@ -29,6 +30,7 @@ limitations under the License. */
 #include "paddle/fluid/framework/scope.h"
 #include "paddle/fluid/framework/variable.h"
+#include "paddle/fluid/framework/variable_helper.h"
 #include "paddle/fluid/operators/distributed/communicator_common.h"
 #include "paddle/fluid/operators/distributed/distributed.h"
 #include "paddle/fluid/operators/distributed/large_scale_kv.h"
@@ -279,6 +281,8 @@ class AsyncCommunicator : public Communicator {
                 const RpcCtxMap &recv_varname_to_ctx,
                 Scope *recv_scope) override;
 
+  void InitParams();
+
   void MainThread();
 
   void Send(const std::vector<std::string> &var_names,
@@ -435,7 +439,7 @@ class GeoCommunicator : public AsyncCommunicator {
 
   void RecvDense(const std::string &varname);
 
-  void Init();
+  void InitParams();
 
   void InitSparse();
 
diff --git a/python/paddle/distributed/fleet/meta_optimizers/async_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/async_optimizer.py
index b6543549728..0e4876fc650 100644
--- a/python/paddle/distributed/fleet/meta_optimizers/async_optimizer.py
+++ b/python/paddle/distributed/fleet/meta_optimizers/async_optimizer.py
@@ -65,7 +65,6 @@ class AsyncMetaOptimizer(MetaOptimizerBase):
 
             # for startup program
             _startup = worker.fake_init_ops_pass(_startup, compiled_config)
-            _startup = worker.init_from_server_pass(_startup, compiled_config)
             _startup = worker.delet_extra_optimizes_pass(_startup, compiled_config)
 
         else:
diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/__init__.py b/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/__init__.py
index d2c7397c85f..e2d0f675216 100644
--- a/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/__init__.py
+++ b/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/__init__.py
@@ -771,7 +771,6 @@ class ParameterServerOptimizer(DistributedOptimizer):
 
            # for startup program
            _startup = worker.fake_init_ops_pass(_startup, compiled_config)
-           _startup = worker.init_from_server_pass(_startup, compiled_config)
           _startup = worker.delet_extra_optimizes_pass(_startup, compiled_config)
 
        else:
-- 
GitLab
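
Context on the pattern this patch switches to: parameter init is moved out of the startup program (the init_from_server_pass is dropped) and into the communicator, where a single ParameterRecv per variable pulls the full tensor into recv_scope_ and the recved rows are scattered into the LargeScaleKV "Param" slots by row id. The sketch below is a standalone, hypothetical illustration of that row-scatter step only; plain std::vector/std::unordered_map and std::copy stand in for the Paddle tensor, LargeScaleKV, and blas.VCOPY, and none of it is code from this patch.

// Standalone sketch (not part of this patch): mimics the row-scatter step the
// new GeoCommunicator::InitSparse performs after ParameterRecv has pulled the
// full dense tensor into recv_scope_. An unordered_map stands in for the
// LargeScaleKV "Param" table; std::copy stands in for blas.VCOPY.
#include <algorithm>
#include <cstdint>
#include <iostream>
#include <numeric>
#include <unordered_map>
#include <vector>

int main() {
  const int64_t rows = 4;  // global_value.dims()[0] in the patch
  const int64_t dim1 = 3;  // global_value.dims()[1] in the patch

  // Row-major buffer standing in for global_value.data<float>().
  std::vector<float> global_value(rows * dim1);
  std::iota(global_value.begin(), global_value.end(), 0.0f);

  // Stand-in for the per-id "Param" slots returned by LargeScaleKV::Get.
  std::unordered_map<int64_t, std::vector<float>> param_table;

  // ids = [0, rows): every row of the recved tensor seeds one sparse entry.
  std::vector<int64_t> ids(rows);
  std::iota(ids.begin(), ids.end(), 0);

  for (auto id : ids) {
    param_table[id].resize(dim1);
    // Row-wise copy at offset id * dim1, length dim1 (the VCOPY in the patch).
    std::copy(global_value.begin() + id * dim1,
              global_value.begin() + (id + 1) * dim1, param_table[id].begin());
  }

  for (auto id : ids) {
    std::cout << "id " << id << ":";
    for (float v : param_table[id]) std::cout << ' ' << v;
    std::cout << '\n';
  }
  return 0;
}

The ids can simply be 0..rows-1 because ParameterRecv hands back the whole concatenated parameter, unlike the old per-pserver loop that derived ids as x * pserver_num + i; this also appears to be why the patch checks rows against n_rows before copying.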