From bad312c3ef0229fa4844f791fd3f984f0920376e Mon Sep 17 00:00:00 2001 From: Chengmo Date: Tue, 8 Oct 2019 13:10:00 +0800 Subject: [PATCH] [cherry-pick][release/1.6]Speed GEO-SGD (#20177) * Speed GEO-SGD (#20158) * delete debug vlog & add rpc function & fix word2vec bug & speed GEO-SGD --- .../operators/distributed/communicator.cc | 476 ++++++++++-------- .../operators/distributed/communicator.h | 43 +- .../fluid/transpiler/geo_sgd_transpiler.py | 4 +- 3 files changed, 301 insertions(+), 222 deletions(-) diff --git a/paddle/fluid/operators/distributed/communicator.cc b/paddle/fluid/operators/distributed/communicator.cc index 16546bd158..1d6732dd21 100644 --- a/paddle/fluid/operators/distributed/communicator.cc +++ b/paddle/fluid/operators/distributed/communicator.cc @@ -24,6 +24,7 @@ limitations under the License. */ #include "paddle/fluid/framework/tensor_util.h" #include "paddle/fluid/framework/threadpool.h" #include "paddle/fluid/framework/variable_helper.h" +#include "paddle/fluid/operators/distributed/distributed.h" #include "paddle/fluid/operators/distributed/parameter_recv.h" #include "paddle/fluid/operators/distributed/parameter_send.h" @@ -62,23 +63,6 @@ inline void VSUB(int n, const T *x, const T *y, T *z) { } } -inline std::vector bucket(const int v_size, const int b_size) { - int remainder = v_size % b_size; - int bucket = v_size / b_size; - std::vector ret_vec(b_size, bucket); - for (int i = 0; i < remainder; ++i) { - ret_vec[i] = ret_vec[i] + 1; - } - int cur_bucket = 0; - for (int j = 0; j < ret_vec.size(); ++j) { - int tmp = ret_vec[j]; - ret_vec[j] = cur_bucket; - cur_bucket += tmp; - } - ret_vec.push_back(cur_bucket); - return ret_vec; -} - std::once_flag Communicator::init_flag_; std::shared_ptr Communicator::communicator_(nullptr); @@ -447,6 +431,14 @@ void GeoSgdCommunicator::InitImpl( send_var_name, send_var_names, vars_epmap, vars_sections_int, 0); recv_varname_to_ctx_[var_name] = operators::distributed::RpcContext( var_name, vars_names, vars_epmap, vars_sections_int, 0); + + // record sparse section + if (is_sparse) { + need_thread_nums_ += + send_varname_to_ctx_[send_var_name].height_sections.size(); + absolute_section_[var_name] = operators::ToAbsoluteSection( + send_varname_to_ctx_[send_var_name].height_sections); + } } if (send_varname_to_ctx_.size() == 0 && recv_varname_to_ctx_.size() == 0) { @@ -454,17 +446,12 @@ void GeoSgdCommunicator::InitImpl( } send_threadpool_.reset(new ::ThreadPool(FLAGS_communicator_thread_pool_size)); - recv_threadpool_.reset(new ::ThreadPool(FLAGS_communicator_thread_pool_size)); need_push_queue_ = std::make_shared>>( geo_need_push_nums); delta_scope_.reset(new Scope()); old_scope_.reset(new Scope()); pserver_scope_.reset(new Scope()); - - // for coverage test, please ignore follow code - InitImpl(send_varname_to_ctx_, recv_varname_to_ctx_, training_scope_); - InitImpl(program, training_scope_); } void GeoSgdCommunicator::Start() { @@ -521,11 +508,16 @@ void GeoSgdCommunicator::Send(const std::vector &sparse_var_names, const framework::Scope &scope) { // SparseIdsMap = std::unordered_map> std::shared_ptr ids_table = std::make_shared(); + auto before_run_send = GetCurrentUS(); for (size_t i = 0; i < sparse_var_tables.size(); i++) { if (ids_table->find(sparse_var_tables[i]) == ids_table->end()) { // create empty set for new sparse var - ids_table->insert(std::pair>( - sparse_var_tables[i], std::unordered_set{})); + auto splited_var_nums = + recv_varname_to_ctx_[sparse_var_tables[i]].splited_var_names.size(); + ids_table->insert( + std::pair>>( + sparse_var_tables[i], + std::vector>{splited_var_nums})); } auto *var = scope.FindVar(sparse_var_names[i]); auto var_tensor = var->Get(); @@ -533,12 +525,16 @@ void GeoSgdCommunicator::Send(const std::vector &sparse_var_names, int *var_mutable_data = var_tensor.mutable_data(var_tensor.place()); // insert ids which has not been record for (size_t j = 0; j < element_number; j++) { - ids_table->at(sparse_var_tables[i]).insert(var_mutable_data[j]); + auto ep_idx = GetSectionIndex(var_mutable_data[j], + absolute_section_[sparse_var_tables[i]]); + ids_table->at(sparse_var_tables[i])[ep_idx].insert(var_mutable_data[j]); VLOG(4) << "Sparse var " << sparse_var_tables[i] << " insert " << var_mutable_data[j]; } } need_push_queue_->Push(ids_table); + auto after_run_send = GetCurrentUS(); + VLOG(3) << "run send_op use time " << after_run_send - before_run_send; } void GeoSgdCommunicator::SendThread() { @@ -548,13 +544,22 @@ void GeoSgdCommunicator::SendThread() { while (running_) { std::vector> task_futures; task_futures.reserve(send_varname_to_ctx_.size()); - auto before_run_send_graph = GetCurrentUS(); - if (ids_send_vec_.size() < geo_need_push_nums_) { + size_t wait_times = 0; + while (ids_send_vec_.size() < geo_need_push_nums_) { VLOG(4) << "ids_send_vec_ Size: " << ids_send_vec_.size(); if (need_push_queue_->Size() > 0) { + wait_times = 0; ids_send_vec_.push_back(*(need_push_queue_->Pop())); VLOG(4) << "ids_send_vec_ pushed"; + } else if (need_push_queue_->Size() == 0) { + VLOG(3) << "wait_times -> " << wait_times; + if (wait_times >= FLAGS_communicator_send_wait_times) { + break; + } + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + wait_times++; + continue; } } @@ -567,86 +572,62 @@ void GeoSgdCommunicator::SendThread() { for (auto &iter : send_varname_to_ctx_) { auto &var_name = iter.first; - auto send_task = [this, &var_name] { - auto origin_var_name = DeltaVarToVar(var_name); - - if (var_list_[origin_var_name] == true) { - auto ids_set = SparseIdsMerge(ids_send_vec_, origin_var_name); - SendUpdateSparseVars(origin_var_name, ids_set); - } else { - SendUpdateDenseVars(origin_var_name); + if (var_list_[DeltaVarToVar(var_name)] == true) { + // sparse var: merge->send->recv + for (auto &splited_var_name : iter.second.splited_var_names) { + auto send_task = [this, &var_name, &splited_var_name] { + auto before_run_geo = GetCurrentUS(); + auto ids_set = + SparseIdsMerge(ids_send_vec_, var_name, splited_var_name); + SendUpdateSparseVars(var_name, splited_var_name, ids_set); + RecvUpdateSparseVars(var_name, splited_var_name); + auto after_run_geo = GetCurrentUS(); + VLOG(1) << "run GEO-SGD var " << splited_var_name << " use time " + << after_run_geo - before_run_geo; + }; + task_futures.emplace_back( + send_threadpool_->enqueue(std::move(send_task))); } - auto before_send = GetCurrentUS(); - auto send_functor = distributed::ParameterSend(); - auto &ctx = send_varname_to_ctx_.at(var_name); - send_functor(ctx, *delta_scope_.get(), true, 1); - - auto after_send = GetCurrentUS(); - VLOG(3) << "send " << var_name << " use time " - << after_send - before_send; - }; - task_futures.emplace_back( - send_threadpool_->enqueue(std::move(send_task))); + } else { + auto send_task = [this, &var_name] { + auto before_run_geo = GetCurrentUS(); + SendUpdateDenseVars(var_name); + RecvUpdateDenseVars(var_name); + auto after_run_geo = GetCurrentUS(); + VLOG(3) << "run GEO-SGD var " << var_name << " use time " + << after_run_geo - before_run_geo; + }; + task_futures.emplace_back( + send_threadpool_->enqueue(std::move(send_task))); + } } + for (auto &task_f : task_futures) { + task_f.wait(); + } + ids_send_vec_.clear(); } - for (auto &task_f : task_futures) { - task_f.wait(); - have_push_.fetch_add(1, std::memory_order_relaxed); - } - auto after_run_send_graph = GetCurrentUS(); - VLOG(4) << "run send graph use time " - << after_run_send_graph - before_run_send_graph; - Recv(); } } -void GeoSgdCommunicator::Recv() { - auto push_nums = have_push_.load(); - if (push_nums >= send_varname_to_ctx_.size()) { - ids_send_vec_.clear(); - RecvAll(); - have_push_.store(0); - } -} - -void GeoSgdCommunicator::RecvAll() { - if (!running_) return; - auto before_recv = GetCurrentUS(); - std::vector> task_futures; - task_futures.reserve(recv_varname_to_ctx_.size()); - for (auto &iter : recv_varname_to_ctx_) { - auto recv_task = [this, &iter] { - auto &var_name = iter.first; - auto recv_functor = distributed::ParameterRecv(); - auto before_parameter_recv = GetCurrentUS(); - recv_functor(iter.second, *pserver_scope_.get()); - auto after_parameter_recv = GetCurrentUS(); - VLOG(3) << "run parameter recv var " << var_name << " use time " - << after_parameter_recv - before_parameter_recv; - RecvUpdateVars(var_name); - }; - task_futures.emplace_back(recv_threadpool_->enqueue(std::move(recv_task))); - } - for (auto &task : task_futures) { - task.wait(); - } - auto after_recv = GetCurrentUS(); - VLOG(3) << "run recv graph use time " << after_recv - before_recv; -} - std::unordered_set GeoSgdCommunicator::SparseIdsMerge( - const std::vector &ids_send_vec, - const std::string &var_name) { + const std::vector &ids_send_vec, const std::string &var_name, + const std::string &splited_var_name) { // every batch has some sparse id, merge them into one unoredered_set + VLOG(3) << "Sparse Ids merge var: " << var_name + << " splited var: " << splited_var_name; auto before_run_ids_merge_ = GetCurrentUS(); + auto origin_var_name = DeltaVarToVar(var_name); + auto splited_var_index = GetSplitedVarIndex(var_name, splited_var_name); std::unordered_set ids_set; + for (auto ids_map : ids_send_vec) { - for (auto id : ids_map[var_name]) { + for (auto id : ids_map[origin_var_name][splited_var_index]) { ids_set.insert(id); } } auto after_run_ids_merge_ = GetCurrentUS(); - VLOG(3) << "run SparseIdsMerge use time " + VLOG(3) << "run SparseIdsMerge " << splited_var_name << " has nums " + << ids_set.size() << " use time " << after_run_ids_merge_ - before_run_ids_merge_; return ids_set; } @@ -654,24 +635,26 @@ std::unordered_set GeoSgdCommunicator::SparseIdsMerge( void GeoSgdCommunicator::SendUpdateDenseVars(const std::string &var_name) { // calc var_delata = (var_training - var_old)/trainer_nums // calc var_old += var_delta + // var_name: param.delta + auto origin_var_name = DeltaVarToVar(var_name); auto before_run_send_dense = GetCurrentUS(); - auto *var_x = training_scope_->FindVar(var_name); + auto *var_x = training_scope_->FindVar(origin_var_name); auto var_x_tensor = var_x->Get(); - auto *var_y = old_scope_->FindVar(var_name); + auto *var_y = old_scope_->FindVar(origin_var_name); auto var_y_tensor = var_y->Get(); auto cpu_ctx = paddle::platform::CPUDeviceContext(); auto dims = var_x_tensor.dims(); // create temp var for sub - auto *var_y_sub = old_scope_->Var(VarToDeltaVar(var_name)); + auto *var_y_sub = old_scope_->Var(var_name); framework::CopyVariable(*var_y, var_y_sub); auto var_y_sub_tensor = var_y_sub->Get(); // create delta var in delta scope - auto *var_z = delta_scope_->Var(VarToDeltaVar(var_name)); + auto *var_z = delta_scope_->Var(var_name); auto *var_z_tensor = var_z->GetMutable(); var_z_tensor->mutable_data(dims, var_x_tensor.place()); var_z_tensor->set_lod(var_x_tensor.lod()); @@ -702,20 +685,34 @@ void GeoSgdCommunicator::SendUpdateDenseVars(const std::string &var_name) { auto after_run_send_dense = GetCurrentUS(); VLOG(3) << "run send update dense var " << var_name << " use time " << after_run_send_dense - before_run_send_dense; + + auto send_functor = distributed::ParameterSend(); + auto &ctx = send_varname_to_ctx_.at(var_name); + + auto before_send_dense = GetCurrentUS(); + send_functor(ctx, *delta_scope_.get(), true, 1); + auto after_send_denxe = GetCurrentUS(); + VLOG(3) << "send " << var_name << " use time " + << after_send_denxe - before_send_dense; } void GeoSgdCommunicator::SendUpdateSparseVars( - const std::string &var_name, const std::unordered_set &ids_table) { + const std::string &var_name, const std::string &splited_var_name, + const std::unordered_set &ids_table) { // calc var_delata = (var_training - var_old)/trainer_nums // calc var_old += var_delta + // var_name: param.delta, splited_var_name: param.block0.delta + // origin_var_name: param auto before_run_send_sparse = GetCurrentUS(); auto ids_num = ids_table.size(); - VLOG(3) << "Sparse Ids nums is : " << ids_num; - auto *var_x = training_scope_->FindVar(var_name); + VLOG(4) << "Sparse Ids nums is : " << ids_num; + auto origin_var_name = DeltaVarToVar(var_name); + + auto *var_x = training_scope_->FindVar(origin_var_name); auto var_x_tensor = var_x->Get(); - auto *var_y = old_scope_.get()->FindVar(var_name); + auto *var_y = old_scope_.get()->FindVar(origin_var_name); auto var_y_tensor = var_y->Get(); auto dims = var_x_tensor.dims(); @@ -724,7 +721,7 @@ void GeoSgdCommunicator::SendUpdateSparseVars( float *x_value = var_x_tensor.mutable_data(var_x_tensor.place()); float *y_value = var_y_tensor.mutable_data(var_y_tensor.place()); - auto *var_z = delta_scope_->Var(VarToDeltaVar(var_name)); + auto *var_z = delta_scope_->Var(splited_var_name); auto *var_z_select_rows = var_z->GetMutable(); auto *var_z_value = var_z_select_rows->mutable_value(); var_z_value->Resize({static_cast(ids_num), row_numel}); @@ -732,135 +729,157 @@ void GeoSgdCommunicator::SendUpdateSparseVars( std::vector new_rows; new_rows.insert(new_rows.begin(), ids_table.begin(), ids_table.end()); - var_z_select_rows->set_rows(new_rows); - var_z_select_rows->set_height(new_rows.size()); - - // using multi thread speed sparse delta calc - std::vector buts = - bucket(new_rows.size(), FLAGS_communicator_merge_sparse_bucket); - std::vector> fs; - - for (int x = 0; x < buts.size() - 1; x++) { - int start = buts[x]; - int end = buts[x + 1]; - float avg = 1 / static_cast(trainer_nums_); - - fs.push_back( - paddle::framework::Async([&x_value, &y_value, &z_value, &new_rows, - row_numel, start, end, avg]() { - auto cpu_ctx = paddle::platform::CPUDeviceContext(); - auto blas = - math::GetBlas(cpu_ctx); - - for (int y = start; y < end; y++) { - auto ids = new_rows[y]; - - float *x_val = x_value + ids * row_numel; - float *y_val = y_value + ids * row_numel; - float *z_val = z_value + y * row_numel; - - std::vector row_delta(row_numel, 0); - VSUB(row_numel, x_val, y_val, row_delta.data()); - blas.SCAL(row_numel, avg, row_delta.data()); - blas.VADD(row_numel, row_delta.data(), y_val, y_val); - blas.VCOPY(row_numel, row_delta.data(), z_val); - } - })); - } - for (size_t i = 0; i < fs.size(); ++i) { - fs[i].wait(); + + auto cpu_ctx = paddle::platform::CPUDeviceContext(); + auto blas = math::GetBlas(cpu_ctx); + float avg = 1 / static_cast(trainer_nums_); + for (int y = 0; y < new_rows.size(); y++) { + auto ids = new_rows[y]; + + float *x_val = x_value + ids * row_numel; + float *y_val = y_value + ids * row_numel; + float *z_val = z_value + y * row_numel; + + std::vector row_delta(row_numel, 0); + VSUB(row_numel, x_val, y_val, row_delta.data()); + blas.SCAL(row_numel, avg, row_delta.data()); + blas.VADD(row_numel, row_delta.data(), y_val, y_val); + blas.VCOPY(row_numel, row_delta.data(), z_val); } + auto after_run_send_sparse = GetCurrentUS(); - VLOG(3) << "run send update sparse var " << var_name << " use time " + VLOG(3) << "run send update sparse var " << splited_var_name << " use time " << after_run_send_sparse - before_run_send_sparse; + + auto splited_var_index = GetSplitedVarIndex(var_name, splited_var_name); + std::vector send_rows; + send_rows.reserve(new_rows.size()); + for (auto idx : new_rows) { + send_rows.push_back(idx - + absolute_section_[origin_var_name][splited_var_index]); + } + var_z_select_rows->set_rows(send_rows); + var_z_select_rows->set_height( + send_varname_to_ctx_[var_name].height_sections[splited_var_index]); + + auto before_send_sparse = GetCurrentUS(); + RpcSend(var_name, splited_var_name, splited_var_index); + auto after_send_sparse = GetCurrentUS(); + VLOG(3) << "send " << splited_var_name << " has nums " << new_rows.size() + << " use time " << after_send_sparse - before_send_sparse; } -void GeoSgdCommunicator::RecvUpdateVars(const std::string &var_name) { +void GeoSgdCommunicator::RecvUpdateDenseVars(const std::string &var_name) { // calc var_training += var_pserver - var_old // calc var_old = var_pserver + // var_name: param.delta + + // step1: recv dense var from pserver + auto origin_var_name = DeltaVarToVar(var_name); + + auto before_run_recv = GetCurrentUS(); + auto recv_functor = distributed::ParameterRecv(); + recv_functor(recv_varname_to_ctx_[origin_var_name], *pserver_scope_.get()); + auto after_run_recv = GetCurrentUS(); + VLOG(3) << "recv var " << origin_var_name << " use time " + << after_run_recv - before_run_recv; + + // step2: update dense var + auto before_run_update = GetCurrentUS(); + auto *var_x = training_scope_->FindVar(origin_var_name); + auto var_x_tensor = var_x->Get(); + + auto *var_y = old_scope_->FindVar(origin_var_name); + auto var_y_tensor = var_y->Get(); + + auto *var_y_sub = old_scope_->Var(origin_var_name); + framework::CopyVariable(*var_y, var_y_sub); + auto var_y_sub_tensor = var_y_sub->Get(); + + auto *var_z = pserver_scope_.get()->FindVar(origin_var_name); + auto var_z_tensor = var_z->Get(); + + auto cpu_ctx = paddle::platform::CPUDeviceContext(); + auto blas = math::GetBlas(cpu_ctx); + // calc sub = pserver - old + blas.SCAL(var_y_sub_tensor.numel(), -1, + var_y_sub_tensor.mutable_data(var_y_sub_tensor.place())); + blas.VADD(var_y_tensor.numel(), + var_y_sub_tensor.mutable_data(var_y_sub_tensor.place()), + var_z_tensor.mutable_data(var_z_tensor.place()), + var_y_sub_tensor.mutable_data(var_y_sub_tensor.place())); + // calc recv += sub + blas.VADD(var_x_tensor.numel(), + var_x_tensor.mutable_data(var_x_tensor.place()), + var_y_sub_tensor.mutable_data(var_y_sub_tensor.place()), + var_x_tensor.mutable_data(var_x_tensor.place())); + // calc old = pserver + framework::CopyVariable(*var_z, var_y); + auto after_run_update = GetCurrentUS(); + VLOG(3) << "dese var update " << origin_var_name << " use time " + << after_run_update - before_run_update; +} + +void GeoSgdCommunicator::RecvUpdateSparseVars( + const std::string &var_name, const std::string &splited_var_name) { + // step 1: recv splited var from pserver + auto splited_var_index = GetSplitedVarIndex(var_name, splited_var_name); + auto origin_var_name = DeltaVarToVar(var_name); + auto origin_splited_var_name = DeltaVarToVar(splited_var_name); + auto before_run_recv = GetCurrentUS(); + RpcRecv(origin_var_name, origin_splited_var_name, splited_var_index); + auto after_run_recv = GetCurrentUS(); + VLOG(3) << "recv var " << origin_splited_var_name << " use time " + << after_run_recv - before_run_recv; - auto *var_x = training_scope_->FindVar(var_name); + // step 2: update sparse var + auto before_run_update = GetCurrentUS(); + auto *var_x = training_scope_->FindVar(origin_var_name); auto var_x_tensor = var_x->Get(); + auto dims = var_x_tensor.dims(); float *x_value = var_x_tensor.mutable_data(var_x_tensor.place()); - auto *var_y = old_scope_->FindVar(var_name); + auto *var_y = old_scope_->FindVar(origin_var_name); auto var_y_tensor = var_y->Get(); float *y_value = var_y_tensor.mutable_data(var_y_tensor.place()); - if (var_list_[var_name] == true) { - // sparse param - auto *var_z = pserver_scope_.get()->FindVar(var_name); - auto var_z_slr = var_z->GetMutable(); - auto &new_rows = var_z_slr->rows(); - auto *new_value = var_z_slr->mutable_value(); - auto row_numel = new_value->numel() / new_rows.size(); - auto *z_value = new_value->mutable_data(var_x_tensor.place()); - - std::vector buts = - bucket(new_rows.size(), FLAGS_communicator_merge_sparse_bucket); - std::vector> fs; - - for (int x = 0; x < buts.size() - 1; x++) { - int start = buts[x]; - int end = buts[x + 1]; - - fs.push_back(paddle::framework::Async( - [&x_value, &y_value, &z_value, &new_rows, row_numel, start, end]() { - auto cpu_ctx = paddle::platform::CPUDeviceContext(); - auto blas = - math::GetBlas( - cpu_ctx); - - for (int y = start; y < end; y++) { - std::vector row_delta(row_numel, 0); - - auto ids = new_rows[y]; - - float *x_val = x_value + ids * row_numel; - float *y_val = y_value + ids * row_numel; - float *z_val = z_value + y * row_numel; - - VSUB(row_numel, z_val, y_val, row_delta.data()); - blas.VADD(row_numel, row_delta.data(), x_val, x_val); - blas.VCOPY(row_numel, z_val, y_val); - } - })); - } - for (size_t i = 0; i < fs.size(); ++i) { - fs[i].wait(); - } - } else { - // dense param - auto *var_y_sub = old_scope_->Var(VarToDeltaVar(var_name)); - framework::CopyVariable(*var_y, var_y_sub); - auto var_y_sub_tensor = var_y_sub->Get(); - - auto *var_z = pserver_scope_.get()->FindVar(var_name); - auto var_z_tensor = var_z->Get(); - - auto cpu_ctx = paddle::platform::CPUDeviceContext(); - auto blas = - math::GetBlas(cpu_ctx); - // calc sub = pserver - old - blas.SCAL(var_y_sub_tensor.numel(), -1, - var_y_sub_tensor.mutable_data(var_y_sub_tensor.place())); - blas.VADD(var_y_tensor.numel(), - var_y_sub_tensor.mutable_data(var_y_sub_tensor.place()), - var_z_tensor.mutable_data(var_z_tensor.place()), - var_y_sub_tensor.mutable_data(var_y_sub_tensor.place())); - // calc recv += sub - blas.VADD(var_x_tensor.numel(), - var_x_tensor.mutable_data(var_x_tensor.place()), - var_y_sub_tensor.mutable_data(var_y_sub_tensor.place()), - var_x_tensor.mutable_data(var_x_tensor.place())); - // calc old = pserver - framework::CopyVariable(*var_z, var_y); + auto *var_z = pserver_scope_.get()->FindVar(origin_splited_var_name); + auto var_z_slr = var_z->GetMutable(); + auto row_size = var_z_slr->rows().size(); + + std::vector new_rows; + new_rows.reserve(row_size); + + for (auto ids : var_z_slr->rows()) { + new_rows.push_back(ids + + absolute_section_[origin_var_name][splited_var_index]); } - auto after_run_recv = GetCurrentUS(); - VLOG(3) << "run recv update var " << var_name << " use time " - << after_run_recv - before_run_recv; + auto *new_value = var_z_slr->mutable_value(); + auto row_numel = dims[1]; + auto *z_value = new_value->mutable_data(var_x_tensor.place()); + + auto cpu_ctx = paddle::platform::CPUDeviceContext(); + auto blas = math::GetBlas(cpu_ctx); + for (int y = 0; y < new_rows.size(); y++) { + std::vector row_delta(row_numel, 0); + + auto ids = new_rows[y]; + + float *x_val = x_value + ids * row_numel; + float *y_val = y_value + ids * row_numel; + float *z_val = z_value + y * row_numel; + + VSUB(row_numel, z_val, y_val, row_delta.data()); + blas.VADD(row_numel, row_delta.data(), x_val, x_val); + blas.VCOPY(row_numel, z_val, y_val); + } + + auto after_run_update = GetCurrentUS(); + VLOG(3) << "sparse var recv update " << origin_splited_var_name << " has num " + << new_rows.size() << " use time " + << after_run_update - before_run_update; } void GeoSgdCommunicator::GeoSgdSparseParamInit(framework::Scope *scope_x, @@ -893,6 +912,37 @@ void GeoSgdCommunicator::GeoSgdDenseParamInit(framework::Scope *scope_x, framework::CopyVariable(*var_x, var_y); } +void GeoSgdCommunicator::RpcSend(const std::string &origin_var_name, + const std::string &splited_var_name, + const size_t &splited_var_index) { + auto trainer_id = send_varname_to_ctx_[origin_var_name].trainer_id; + auto endpoint = + send_varname_to_ctx_[origin_var_name].epmap[splited_var_index]; + + platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance(); + auto &cpu_ctx_send = *pool.Get(platform::CPUPlace()); + distributed::RPCClient *rpc_client = + distributed::RPCClient::GetInstance(trainer_id); + rpc_client->AsyncSendVar(endpoint, cpu_ctx_send, *delta_scope_.get(), + splited_var_name); +} + +void GeoSgdCommunicator::RpcRecv(const std::string &var_name, + const std::string &splited_var_name, + const size_t &splited_var_index) { + auto train_id = recv_varname_to_ctx_[var_name].trainer_id; + auto endpoint = recv_varname_to_ctx_[var_name].epmap[splited_var_index]; + platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance(); + auto &cpu_ctx_recv = *pool.Get(platform::CPUPlace()); + distributed::RPCClient *rpc_client = + distributed::RPCClient::GetInstance(train_id); + pserver_scope_->Var(splited_var_name); + rpc_client->AsyncGetVar(endpoint, cpu_ctx_recv, *pserver_scope_.get(), + splited_var_name, splited_var_name, splited_var_name); +} + +void GeoSgdCommunicator::Recv() {} + void GeoSgdCommunicator::InitImpl(const RpcCtxMap &send_varname_to_ctx, const RpcCtxMap &recv_varname_to_ctx, Scope *recv_scope) {} diff --git a/paddle/fluid/operators/distributed/communicator.h b/paddle/fluid/operators/distributed/communicator.h index e00d5c9918..50582e6f34 100644 --- a/paddle/fluid/operators/distributed/communicator.h +++ b/paddle/fluid/operators/distributed/communicator.h @@ -27,7 +27,10 @@ limitations under the License. */ #include "paddle/fluid/framework/scope.h" #include "paddle/fluid/framework/variable.h" +#include "paddle/fluid/operators/distributed/distributed.h" +#include "paddle/fluid/operators/distributed/rpc_client.h" #include "paddle/fluid/operators/distributed/rpc_common.h" +#include "paddle/fluid/operators/distributed_ops/send_recv_util.h" #include "paddle/fluid/operators/math/math_function.h" #include "paddle/fluid/operators/math/selected_rows_functor.h" #include "paddle/fluid/platform/device_context.h" @@ -268,7 +271,7 @@ class Communicator { }; using SparseIdsMap = - std::unordered_map>; + std::unordered_map>>; class AsyncCommunicator : public Communicator { public: @@ -348,15 +351,18 @@ class GeoSgdCommunicator : public Communicator { private: void SendThread(); - void RecvAll(); std::unordered_set SparseIdsMerge( const std::vector& ids_send_vec, - const std::string& var_name); + const std::string& var_name, const std::string& splited_var_name); void SendUpdateDenseVars(const std::string& var_name); void SendUpdateSparseVars(const std::string& var_name, + const std::string& splited_var_name, const std::unordered_set& ids_table); - void RecvUpdateVars(const std::string& var_name); + + void RecvUpdateDenseVars(const std::string& var_name); + void RecvUpdateSparseVars(const std::string& var_name, + const std::string& splited_var_name); void GeoSgdDenseParamInit(framework::Scope* scope_x, framework::Scope* scope_y, @@ -366,6 +372,14 @@ class GeoSgdCommunicator : public Communicator { framework::Scope* scope_y, const std::string var_name); + void RpcSend(const std::string& origin_var_name, + const std::string& splited_var_name, + const size_t& splited_var_index); + + void RpcRecv(const std::string& origin_var_name, + const std::string& splited_var_name, + const size_t& splited_var_index); + const std::string VarToDeltaVar(const std::string var_name) { std::string delta_name = var_name; const std::string send_name = delta_name.append(".delta"); @@ -379,6 +393,20 @@ class GeoSgdCommunicator : public Communicator { return param_name; } + size_t GetSplitedVarIndex(const std::string var_name, + const std::string splited_var_name) { + size_t index = 0; + for (size_t i = 0; + i < send_varname_to_ctx_[var_name].splited_var_names.size(); i++) { + if (send_varname_to_ctx_[var_name].splited_var_names[i] == + splited_var_name) { + index = i; + break; + } + } + return index; + } + private: int trainer_nums_ = 1; int geo_need_push_nums_ = 100; @@ -390,8 +418,6 @@ class GeoSgdCommunicator : public Communicator { std::shared_ptr pserver_scope_; // parameter on pserver,gloabl scope RpcCtxMap send_varname_to_ctx_; RpcCtxMap recv_varname_to_ctx_; - - std::atomic_uint have_push_{0}; std::unordered_map var_list_; // if var is sparse, using selected rows, bool=true @@ -399,9 +425,12 @@ class GeoSgdCommunicator : public Communicator { need_push_queue_; std::vector ids_send_vec_; + std::unordered_map> absolute_section_; + std::unique_ptr<::ThreadPool> send_threadpool_{nullptr}; - std::unique_ptr<::ThreadPool> recv_threadpool_{nullptr}; std::unique_ptr send_thread_{nullptr}; + + size_t need_thread_nums_{0}; }; } // namespace distributed diff --git a/python/paddle/fluid/transpiler/geo_sgd_transpiler.py b/python/paddle/fluid/transpiler/geo_sgd_transpiler.py index 658e3efd5b..bd791e50b6 100644 --- a/python/paddle/fluid/transpiler/geo_sgd_transpiler.py +++ b/python/paddle/fluid/transpiler/geo_sgd_transpiler.py @@ -179,8 +179,8 @@ class GeoSgdTranspiler(DistributeTranspiler): return self.vars_info def get_trainer_program(self, wait_port=True): - # if wait_port: - # wait_server_ready(self.pserver_endpoints) + if wait_port: + wait_server_ready(self.pserver_endpoints) return self.origin_program def get_pserver_programs(self, endpoint): -- GitLab