See the License for the specific language governing permissions and limitations under the License. */ #include "paddle/fluid/operators/distributed/communicator.h" #include #include #include // NOLINT #include #include // NOLINT #include #include "paddle/fluid/framework/eigen.h" #include "paddle/fluid/framework/selected_rows.h" #include "paddle/fluid/framework/tensor_util.h" #include "paddle/fluid/framework/threadpool.h" #include "paddle/fluid/framework/variable_helper.h" #include "paddle/fluid/operators/distributed/distributed.h" #include "paddle/fluid/operators/distributed/parameter_recv.h" #include "paddle/fluid/operators/distributed/parameter_send.h" #include "paddle/fluid/string/printf.h" #include "paddle/fluid/string/split.h" namespace paddle { namespace operators { namespace distributed { using Tree = std::map>>; using RpcCtxMap = operators::distributed::RpcCtxMap; inline double GetCurrentUS() { struct timeval time; gettimeofday(&time, NULL); return 1e+6 * time.tv_sec + time.tv_usec; } template inline void VSUB(int n, const T *x, const T *y, T *z) { for (int i = 0; i < n; ++i) { z[i] = x[i] - y[i]; } } Communicator::Communicator() {} Communicator::Communicator(const std::map &envs_) { for (auto &iter : envs_) { envs[iter.first] = iter.second; } } std::once_flag Communicator::init_flag_; std::shared_ptr Communicator::communicator_(nullptr); void AsyncCommunicator::InitImpl(const RpcCtxMap &send_varname_to_ctx, const RpcCtxMap &recv_varname_to_ctx, Scope *recv_scope) { send_varname_to_ctx_ = std::move(send_varname_to_ctx); recv_varname_to_ctx_ = std::move(recv_varname_to_ctx); recv_scope_ = std::move(recv_scope); if (send_varname_to_ctx.size() == 0) { VLOG(0) << "nothing need to be send, will not start send_thread"; } else { send_scope_.reset(new Scope()); for (auto &iter : send_varname_to_ctx_) { send_varname_to_queue_[iter.first] = std::make_shared>>( send_queue_size_); } send_threadpool_.reset(new ::ThreadPool(thread_pool_size_)); } if (recv_varname_to_ctx.size() == 0) { VLOG(0) << "nothing need to be received, will not start recv_thread"; } else { recv_threadpool_.reset(new ::ThreadPool(thread_pool_size_)); } } void AsyncCommunicator::InitImpl(const paddle::framework::ProgramDesc &program, Scope *param_scope) { RpcCtxMap send_varname_to_ctx; RpcCtxMap recv_varname_to_ctx; for (auto *op : program.Block(0).AllOps()) { VLOG(3) << "node name " << op->Type(); if (op->Type() == "send") { auto send_var_name = op->Input("X")[0]; auto send_varnames = boost::get>( op->GetNullableAttr("send_varnames")); auto epmap = boost::get>(op->GetNullableAttr("epmap")); auto height_section = boost::get>(op->GetNullableAttr("sections")); auto trainer_id = boost::get(op->GetNullableAttr("trainer_id")); auto merge_add = boost::get(op->GetNullableAttr("merge_add")); if (!merge_add) { merge_add = is_sgd_optimizer_; } auto use_send_handler = boost::get(op->GetNullableAttr("use_send_handler")); send_varname_to_ctx[send_var_name] = operators::distributed::RpcContext( send_var_name, send_varnames, epmap, height_section, trainer_id, merge_add, use_send_handler); VLOG(3) << "find and init an send op: " << send_varname_to_ctx[send_var_name]; } else if (op->Type() == "recv") { auto do_not_run = boost::get(op->GetNullableAttr("do_not_run")); PADDLE_ENFORCE_GT(do_not_run, 0, platform::errors::InvalidArgument( "recv op's attr `do_not_run` must be True!")); auto recv_var_name = op->Output("Out")[0]; auto recv_varnames = boost::get>( op->GetNullableAttr("recv_varnames")); auto epmap = boost::get>(op->GetNullableAttr("epmap")); auto trainer_id = boost::get(op->GetNullableAttr("trainer_id")); recv_varname_to_ctx[recv_var_name] = operators::distributed::RpcContext( recv_var_name, recv_varnames, epmap, {}, trainer_id); } } // init communicator here if (send_varname_to_ctx.size() == 0 && recv_varname_to_ctx.size() == 0) { LOG(WARNING) << "no var need to send and recv!!"; } operators::distributed::AsyncCommunicator::InitImpl( send_varname_to_ctx, recv_varname_to_ctx, param_scope); } AsyncCommunicator::~AsyncCommunicator() { running_ = false; if (send_thread_) send_thread_->join(); if (recv_thread_) recv_thread_->join(); } void AsyncCommunicator::SendThread() { VLOG(3) << "SendThread start!"; while (running_) { std::vector> task_futures; task_futures.reserve(send_varname_to_ctx_.size()); VLOG(4) << "run send graph"; auto before_run_send_graph = GetCurrentUS(); for (auto &iter : send_varname_to_queue_) { auto &var_name = iter.first; auto &var_queue = iter.second; if (var_queue->Size() > 0) { auto send_task = [this, &var_name, &var_queue] { VLOG(4) << var_name << " merge and send"; std::vector> vars; int merged_var_num = 0; int wait_times = 0; while (merged_var_num < max_merge_var_num_) { if (var_queue->Size() == 0) { VLOG(4) << "wait_times -> " << wait_times; if (wait_times >= send_wait_times_) { break; } std::this_thread::sleep_for(std::chrono::milliseconds(10)); wait_times++; continue; } else { wait_times = 0; vars.push_back(var_queue->Pop()); // only count the send number of the first var if (var_name == send_varname_to_queue_.begin()->first) { grad_num_.fetch_add(1, std::memory_order_relaxed); } merged_var_num++; } } auto before_merge = GetCurrentUS(); auto &ctx = send_varname_to_ctx_.at(var_name); if (ctx.use_send_handler) { MergeVars(var_name, vars, send_scope_.get(), ctx.merge_add); } else { MergeVars(var_name, vars, send_scope_.get(), ctx.merge_add); } auto after_merge = GetCurrentUS(); VLOG(4) << "merge " << merged_var_num << " " << var_name << " use time " << after_merge - before_merge; auto send_functor = distributed::ParameterSend(); send_functor(ctx, *send_scope_, true, 1); auto after_send = GetCurrentUS(); VLOG(4) << "send " << var_name << " use time " << after_send - after_merge; }; task_futures.emplace_back( send_threadpool_->enqueue(std::move(send_task))); } else { VLOG(4) << var_name << " queue empty"; } } for (auto &task_f : task_futures) { task_f.wait(); } auto after_run_send_graph = GetCurrentUS(); VLOG(4) << "run send graph use time " << after_run_send_graph - before_run_send_graph; Recv(); } VLOG(0) << "communicator stopped, send thread exit"; } void AsyncCommunicator::RecvThread() { VLOG(3) << "RecvThread start!"; while (running_) { int grad_num = grad_num_.load(); if (grad_num > min_send_grad_num_before_recv_) { VLOG(1) << "current grad num " << grad_num; RecvAll(); grad_num_.store(0); } else { std::this_thread::sleep_for(std::chrono::milliseconds(10)); } } VLOG(0) << "communicator stopped, recv thread exit"; } void AsyncCommunicator::Recv() { if (independent_recv_thread_) { return; } auto grad_num = grad_num_.load(); if (grad_num > 0) { RecvAll(); grad_num_.store(0); } else { std::this_thread::sleep_for(std::chrono::milliseconds(10)); } } void AsyncCommunicator::RecvAll() { VLOG(3) << "parallel run recv graph"; if (!running_) return; auto before_send = GetCurrentUS(); std::vector> task_futures; task_futures.reserve(recv_varname_to_ctx_.size()); for (auto &iter : recv_varname_to_ctx_) { auto recv_task = [this, &iter] { auto &var_name = iter.first; VLOG(4) << "recv var " << var_name; auto recv_functor = distributed::ParameterRecv(); recv_functor(iter.second, *recv_scope_); }; task_futures.emplace_back(recv_threadpool_->enqueue(std::move(recv_task))); } for (auto &task : task_futures) { task.wait(); } auto after_recv = GetCurrentUS(); VLOG(3) << "run recv graph use time " << after_recv - before_send; } void AsyncCommunicator::Start() { VLOG(0) << "Communicator start"; if (!communicator_) { VLOG(0) << "Communicator is not inited, do nothing"; } else { VLOG(1) << "start send thread and recv thread"; running_ = true; // start send and recv thread send_thread_.reset( new std::thread(std::bind(&AsyncCommunicator::SendThread, this))); if (independent_recv_thread_) { recv_thread_.reset( new std::thread(std::bind(&AsyncCommunicator::RecvThread, this))); } } } void AsyncCommunicator::Stop() { VLOG(0) << "Communicator stop"; running_ = false; if (!communicator_) { VLOG(0) << "Communicator is not inited, do nothing"; } else { if (send_thread_) { VLOG(1) << "stop send thread"; send_thread_->join(); send_thread_.reset(nullptr); } if (recv_thread_) { VLOG(1) << "stop recv thread"; recv_thread_->join(); recv_thread_.reset(nullptr); } } VLOG(0) << "Communicator stop done"; } void AsyncCommunicator::Send(const std::vector &var_names, const std::vector &var_tables, const framework::Scope &scope) { PADDLE_ENFORCE_EQ( var_names.size(), 1, platform::errors::InvalidArgument("var_names.size() == 1 is permitted")); auto var_name = var_names[0]; // push var into send queue by var_name auto *grad_var = scope.FindVar(var_name); PADDLE_ENFORCE_EQ( grad_var->IsInitialized(), true, platform::errors::InvalidArgument("grad var should be inited")); auto tmp_grad_var = std::make_shared(); framework::CopyVariable(*grad_var, tmp_grad_var.get()); auto &queue = send_varname_to_queue_.at(var_name); VLOG(3) << "send " << var_name << " queue size " << queue->Size(); queue->Push(tmp_grad_var); } GeoSgdCommunicator::~GeoSgdCommunicator() { running_ = false; if (send_thread_) send_thread_->join(); } void GeoSgdCommunicator::InitImpl(const paddle::framework::ProgramDesc &program, Scope *recv_scope) { training_scope_ = std::move(recv_scope); auto geo_send_varnames = envs["geo_send_varnames"]; auto varnames = paddle::string::Split(geo_send_varnames, '#'); for (auto &var_name : varnames) { auto var_attr_str = envs.at(var_name); auto var_attrs = paddle::string::Split(var_attr_str, '#'); auto split_varnames = paddle::string::Split(var_attrs[0], '&'); auto sections = paddle::string::Split(var_attrs[1], '&'); auto endpoints = paddle::string::Split(var_attrs[2], '&'); bool is_sparse = static_cast(std::stoi(var_attrs[3])); std::string send_var_name = VarToDeltaVar(var_name); std::vector send_var_names; for (auto origin_var_name : split_varnames) { send_var_names.push_back(VarToDeltaVar(origin_var_name)); } std::vector vars_sections_int = {}; for (std::string str : sections) { int64_t str2i = std::stol(str.c_str()); vars_sections_int.push_back(str2i); } var_list_[var_name] = is_sparse; send_varname_to_ctx_[send_var_name] = operators::distributed::RpcContext( send_var_name, send_var_names, endpoints, vars_sections_int, 0); recv_varname_to_ctx_[var_name] = operators::distributed::RpcContext( var_name, split_varnames, endpoints, vars_sections_int, 0); absolute_section_[var_name] = operators::ToAbsoluteSection( send_varname_to_ctx_[send_var_name].height_sections); vars_first_dimension_[var_name] = 0; for (int64_t section : vars_sections_int) { vars_first_dimension_[var_name] += section; } send_var_nums_ += split_varnames.size(); } if (send_varname_to_ctx_.size() == 0 && recv_varname_to_ctx_.size() == 0) { LOG(WARNING) << "no var need to send and recv!!"; } send_threadpool_.reset(new ::ThreadPool(thread_pool_size_)); need_push_queue_ = std::make_shared>>( geo_need_push_nums_); delta_scope_.reset(new Scope()); old_scope_.reset(new Scope()); pserver_scope_.reset(new Scope()); } void GeoSgdCommunicator::Start() { VLOG(0) << "Geo Sgd Communicator start"; if (!communicator_) { VLOG(0) << "Geo Sgd Communicator is not inited, do nothing"; } else { VLOG(0) << "start send thread "; running_ = true; // start send and recv thread send_thread_.reset( new std::thread(std::bind(&GeoSgdCommunicator::SendThread, this))); } } void GeoSgdCommunicator::Stop() { VLOG(0) << "Geo Sgd Communicator stop"; running_ = false; if (!communicator_) { VLOG(0) << "Geo Sgd Communicator is not inited, do nothing"; } else { if (send_thread_) { VLOG(1) << "stop send thread"; send_thread_->join(); send_thread_.reset(nullptr); } } VLOG(0) << "Geo Sgd Communicator stop done"; } void GeoSgdCommunicator::Send(const std::vector &sparse_var_names, const std::vector &sparse_var_tables, const framework::Scope &scope) { if (sparse_var_names.size() == 1 && sparse_var_names[0] == "param_init") { for (auto &iter : var_list_) { // For sparse param, old_scope store LoDTensor, // pserver_scope store SelectedRows. auto local_var_name = iter.first; if (var_list_[local_var_name] == true) { GeoSgdSparseParamInit(training_scope_, pserver_scope_.get(), local_var_name); } else { GeoSgdDenseParamInit(training_scope_, pserver_scope_.get(), local_var_name); } GeoSgdDenseParamInit(training_scope_, old_scope_.get(), local_var_name); } return; } std::shared_ptr ids_table = std::make_shared(); auto before_run_send = GetCurrentUS(); for (size_t i = 0; i < sparse_var_tables.size(); i++) { if (ids_table->find(sparse_var_tables[i]) == ids_table->end()) { // create empty set for new sparse var auto splited_var_nums = recv_varname_to_ctx_[sparse_var_tables[i]].splited_var_names.size(); ids_table->insert( std::pair>>( sparse_var_tables[i], std::vector>{splited_var_nums})); } auto *var = scope.FindVar(sparse_var_names[i]); auto var_tensor = var->Get(); int element_number = var_tensor.numel(); int *var_mutable_data = var_tensor.mutable_data(var_tensor.place()); // insert ids which has not been record for (int j = 0; j < element_number; j++) { auto ep_idx = GetSectionIndex(var_mutable_data[j], absolute_section_[sparse_var_tables[i]]); ids_table->at(sparse_var_tables[i])[ep_idx].insert(var_mutable_data[j]); VLOG(4) << "Sparse var " << sparse_var_tables[i] << " insert " << var_mutable_data[j]; } } need_push_queue_->Push(ids_table); auto after_run_send = GetCurrentUS(); VLOG(4) << "run send_op use time " << after_run_send - before_run_send; } void GeoSgdCommunicator::SendThread() { VLOG(0) << "SendThread start!"; auto before_run_training = GetCurrentUS(); while (running_) { std::vector> task_futures; task_futures.reserve(send_var_nums_); int wait_times = 0; while (ids_send_vec_.size() < static_cast(geo_need_push_nums_)) { VLOG(4) << "ids_send_vec_ Size: " << ids_send_vec_.size(); if (need_push_queue_->Size() > 0) { wait_times = 0; ids_send_vec_.push_back(*(need_push_queue_->Pop())); VLOG(4) << "ids_send_vec_ pushed"; } else if (need_push_queue_->Size() == 0) { VLOG(4) << "wait_times -> " << wait_times; if (wait_times >= send_wait_times_) { break; } std::this_thread::sleep_for(std::chrono::milliseconds(10)); wait_times++; continue; } } if (ids_send_vec_.size() >= static_cast(geo_need_push_nums_)) { auto after_run_training = GetCurrentUS(); VLOG(4) << "run Training use time " << after_run_training - before_run_training; before_run_training = GetCurrentUS(); VLOG(4) << "Start send after get need_push_num"; for (auto &iter : send_varname_to_ctx_) { auto &var_name = iter.first; if (var_list_[DeltaVarToVar(var_name)] == true) { // sparse var: merge->send->recv for (auto &splited_var_name : iter.second.splited_var_names) { auto send_task = [this, &var_name, &splited_var_name] { auto before_run_geo = GetCurrentUS(); VLOG(4) << "ids_send_vec_ size: " << ids_send_vec_.size(); auto ids_set = SparseIdsMerge(ids_send_vec_, var_name, splited_var_name); SendUpdateSparseVars(var_name, splited_var_name, ids_set); RecvUpdateSparseVars(var_name, splited_var_name); auto after_run_geo = GetCurrentUS(); VLOG(3) << "run GEO-SGD var " << splited_var_name << " use time " << after_run_geo - before_run_geo; }; task_futures.emplace_back( send_threadpool_->enqueue(std::move(send_task))); } } else { for (auto &splited_var_name : iter.second.splited_var_names) { auto send_task = [this, &var_name, &splited_var_name] { auto before_run_geo = GetCurrentUS(); SendUpdateDenseVars(var_name, splited_var_name); RecvUpdateDenseVars(var_name, splited_var_name); auto after_run_geo = GetCurrentUS(); VLOG(3) << "run GEO-SGD var " << splited_var_name << " use time " << after_run_geo - before_run_geo; }; task_futures.emplace_back( send_threadpool_->enqueue(std::move(send_task))); } } } for (auto &task_f : task_futures) { task_f.wait(); } ids_send_vec_.clear(); } } } std::unordered_set GeoSgdCommunicator::SparseIdsMerge( const std::vector &ids_send_vec, const std::string &var_name, const std::string &splited_var_name) { // every batch has some sparse id, merge them into one unoredered_set VLOG(4) << "Sparse Ids merge var: " << var_name << " splited var: " << splited_var_name; auto before_run_ids_merge_ = GetCurrentUS(); auto origin_var_name = DeltaVarToVar(var_name); auto splited_var_index = GetSplitedVarIndex(var_name, splited_var_name); std::unordered_set ids_set; for (auto ids_map : ids_send_vec) { for (auto id : ids_map[origin_var_name][splited_var_index]) { ids_set.insert(id); } } auto after_run_ids_merge_ = GetCurrentUS(); VLOG(4) << "run SparseIdsMerge " << splited_var_name << " has nums " << ids_set.size() << " use time " << after_run_ids_merge_ - before_run_ids_merge_; return ids_set; } void GeoSgdCommunicator::SendUpdateDenseVars( const std::string &var_name, const std::string &splited_var_name) { // calc var_delata = (var_training - var_old)/trainer_nums // calc var_old += var_delta // var_name: param.delta auto origin_var_name = DeltaVarToVar(var_name); auto splited_var_index = GetSplitedVarIndex(var_name, splited_var_name); VLOG(4) << "Dense var: " << var_name << " 's splited var: " << splited_var_name << " splited var index: " << splited_var_index; auto before_run_send_dense = GetCurrentUS(); auto cpu_ctx = paddle::platform::CPUDeviceContext(); auto *var_x = training_scope_->FindVar(origin_var_name); auto var_x_tensor = var_x->Get(); auto *var_y = old_scope_->FindVar(origin_var_name); auto var_y_tensor = var_y->Get(); auto dims = var_x_tensor.dims(); auto total_element = var_x_tensor.numel(); int64_t section = 0; int64_t begin_loc = 0; int64_t dimension = 0; size_t out_num = send_varname_to_ctx_[var_name].height_sections.size(); if (out_num > 1) { section = send_varname_to_ctx_[var_name].height_sections[splited_var_index]; dims[0] = section; begin_loc = absolute_section_[origin_var_name][splited_var_index]; dimension = total_element / vars_first_dimension_[origin_var_name]; total_element = section * dimension; VLOG(4) << "Dense splited var: " << splited_var_name << " section: " << section << " dimension: " << dimension << " begin loc: " << begin_loc << " total_element " << total_element; } auto *var_x_data = var_x_tensor.mutable_data(var_x_tensor.place()) + begin_loc * dimension; VLOG(4) << "Dense splited var: " << splited_var_name << " var_x_data[0] " << var_x_data[0] << " var_x_data[end] " << var_x_data[total_element - 1]; auto *var_y_data = var_y_tensor.mutable_data(var_y_tensor.place()) + begin_loc * dimension; VLOG(4) << "Dense splited var: " << splited_var_name << " var_y_data[0] " << var_y_data[0] << " var_y_data[end] " << var_y_data[total_element - 1]; // create delta var in delta scope auto *var_z_tensor = delta_scope_->Var(splited_var_name)->GetMutable(); var_z_tensor->Resize(dims); var_z_tensor->mutable_data(dims, cpu_ctx.GetPlace()); auto *var_z_data = var_z_tensor->mutable_data(cpu_ctx.GetPlace()); VLOG(4) << "Dense splited var: " << splited_var_name << "var_z_data[0] " << var_z_data[0] << " var_z_data[end] " << var_z_data[total_element - 1]; // calc sub = var_training - var_old auto blas = math::GetBlas(cpu_ctx); blas.VSUB(total_element, var_x_data, var_y_data, var_z_data); VLOG(4) << "Dense splited var: " << splited_var_name << " var_z_data[0] " << var_z_data[0] << " var_z_data[end] " << var_z_data[total_element - 1]; // calc var_delta = sub / trainer_nums float trainer_param = 1.0 / static_cast(trainer_nums_); blas.SCAL(total_element, trainer_param, var_z_data); // calc var_old += var_delta blas.VADD(total_element, var_y_data, var_z_data, var_y_data); VLOG(4) << "Dense splited var: " << splited_var_name << " var_y_data[0] " << var_y_data[0] << " var_y_data[end] " << var_y_data[total_element - 1]; auto after_run_send_dense = GetCurrentUS(); VLOG(4) << "run send update dense var " << var_name << " use time " << after_run_send_dense - before_run_send_dense; auto before_send_dense = GetCurrentUS(); RpcSend(var_name, splited_var_name, splited_var_index); auto after_send_dense = GetCurrentUS(); VLOG(4) << "send " << splited_var_name << " use time " << after_send_dense - before_send_dense; } void GeoSgdCommunicator::SendUpdateSparseVars( const std::string &var_name, const std::string &splited_var_name, const std::unordered_set &ids_table) { // calc var_delata = (var_training - var_old)/trainer_nums // calc var_old += var_delta // var_name: param.delta, splited_var_name: param.block0.delta // origin_var_name: param auto before_run_send_sparse = GetCurrentUS(); auto ids_num = ids_table.size(); VLOG(4) << "Sparse Ids nums is : " << ids_num; auto origin_var_name = DeltaVarToVar(var_name); auto *var_x = training_scope_->FindVar(origin_var_name); auto var_x_tensor = var_x->Get(); auto *var_y = old_scope_.get()->FindVar(origin_var_name); auto var_y_tensor = var_y->Get(); auto dims = var_x_tensor.dims(); auto row_numel = dims[1]; float *x_value = var_x_tensor.mutable_data(var_x_tensor.place()); float *y_value = var_y_tensor.mutable_data(var_y_tensor.place()); auto *var_z = delta_scope_->Var(splited_var_name); auto *var_z_select_rows = var_z->GetMutable(); auto *var_z_value = var_z_select_rows->mutable_value(); var_z_value->Resize({static_cast(ids_num), row_numel}); auto *z_value = var_z_value->mutable_data(var_x_tensor.place()); std::vector new_rows; new_rows.insert(new_rows.begin(), ids_table.begin(), ids_table.end()); auto cpu_ctx = paddle::platform::CPUDeviceContext(); auto blas = math::GetBlas(cpu_ctx); float avg = 1 / static_cast(trainer_nums_); for (size_t y = 0; y < new_rows.size(); y++) { auto ids = new_rows[y]; float *x_val = x_value + ids * row_numel; float *y_val = y_value + ids * row_numel; float *z_val = z_value + y * row_numel; std::vector row_delta(row_numel, 0); blas.VSUB(row_numel, x_val, y_val, row_delta.data()); blas.SCAL(row_numel, avg, row_delta.data()); blas.VADD(row_numel, row_delta.data(), y_val, y_val); blas.VCOPY(row_numel, row_delta.data(), z_val); } auto after_run_send_sparse = GetCurrentUS(); VLOG(4) << "run send update sparse var " << splited_var_name << " use time " << after_run_send_sparse - before_run_send_sparse; auto splited_var_index = GetSplitedVarIndex(var_name, splited_var_name); std::vector send_rows; send_rows.reserve(new_rows.size()); for (auto idx : new_rows) { send_rows.push_back(idx - absolute_section_[origin_var_name][splited_var_index]); } var_z_select_rows->set_rows(send_rows); var_z_select_rows->set_height( send_varname_to_ctx_[var_name].height_sections[splited_var_index]); auto before_send_sparse = GetCurrentUS(); RpcSend(var_name, splited_var_name, splited_var_index); auto after_send_sparse = GetCurrentUS(); VLOG(4) << "send " << splited_var_name << " has nums " << new_rows.size() << " use time " << after_send_sparse - before_send_sparse; } void GeoSgdCommunicator::RecvUpdateDenseVars( const std::string &var_name, const std::string &splited_var_name) { // calc var_training += var_pserver - var_old // calc var_old = var_pserver // var_name: param.delta // step1: recv dense var from pserver auto origin_var_name = DeltaVarToVar(var_name); auto origin_splited_var_name = DeltaVarToVar(splited_var_name); auto splited_var_index = GetSplitedVarIndex(var_name, splited_var_name); auto cpu_ctx = paddle::platform::CPUDeviceContext(); auto before_run_recv = GetCurrentUS(); VLOG(4) << "Dense recv origin_var_name: " << origin_var_name << " origin_splited_var_name: " << origin_splited_var_name << " splited_var_index: " << splited_var_index; RpcRecv(origin_var_name, origin_splited_var_name, splited_var_index); auto after_run_recv = GetCurrentUS(); VLOG(4) << "recv var " << origin_splited_var_name << " use time " << after_run_recv - before_run_recv; // step2: update dense var auto before_run_update = GetCurrentUS(); auto *var_x = training_scope_->FindVar(origin_var_name); auto var_x_tensor = var_x->Get(); auto *var_y = old_scope_->FindVar(origin_var_name); auto var_y_tensor = var_y->Get(); auto *var_z = pserver_scope_.get()->FindVar(origin_splited_var_name); auto var_z_tensor = var_z->Get(); auto dims = var_z_tensor.dims(); auto total_element = var_z_tensor.numel(); int64_t section = 0; int64_t begin_loc = 0; int64_t dimension = 0; size_t out_num = recv_varname_to_ctx_[origin_var_name].height_sections.size(); if (out_num > 1) { section = dims[0]; begin_loc = absolute_section_[origin_var_name][splited_var_index]; dimension = total_element / section; VLOG(4) << "Dense splited var: " << splited_var_name << " section: " << section << " dimension: " << dimension << " begin loc: " << begin_loc << " total_element " << total_element; } auto *var_x_data = var_x_tensor.mutable_data(var_x_tensor.place()) + begin_loc * dimension; VLOG(4) << "Dense splited var: " << splited_var_name << " var_x_data[0] " << var_x_data[0] << " var_x_data[end] " << var_x_data[total_element - 1]; auto *var_y_data = var_y_tensor.mutable_data(var_y_tensor.place()) + begin_loc * dimension; VLOG(4) << "Dense splited var: " << splited_var_name << " var_y_data[0] " << var_y_data[0] << " var_y_data[end] " << var_y_data[total_element - 1]; auto *var_z_data = var_z_tensor.mutable_data(cpu_ctx.GetPlace()); VLOG(4) << "Dense splited var: " << splited_var_name << " var_z_data[0] " << var_z_data[0] << " var_z_data[end] " << var_z_data[total_element - 1]; auto *var_y_sub_tensor = old_scope_->Var(origin_splited_var_name) ->GetMutable(); var_y_sub_tensor->Resize(dims); var_y_sub_tensor->mutable_data(dims, cpu_ctx.GetPlace()); auto *var_y_sub_data = var_y_sub_tensor->mutable_data(cpu_ctx.GetPlace()); VLOG(4) << "Dense splited var: " << splited_var_name << " var_y_sub_data[0] " << var_y_sub_data[0] << " var_y_sub_data[end] " << var_y_sub_data[total_element - 1]; auto blas = math::GetBlas(cpu_ctx); // calc sub = pserver - old blas.VSUB(total_element, var_z_data, var_y_data, var_y_sub_data); VLOG(4) << "Dense splited var: " << splited_var_name << " var_y_sub_data[0] " << var_y_sub_data[0] << " var_y_sub_data[end] " << var_y_sub_data[total_element - 1]; // calc train += sub blas.VADD(total_element, var_x_data, var_y_sub_data, var_x_data); VLOG(4) << "Dense splited var: " << splited_var_name << " var_x_data[0] " << var_x_data[0] << " var_x_data[end] " << var_x_data[total_element - 1]; // calc old = pserver blas.VCOPY(total_element, var_z_data, var_y_data); VLOG(4) << "Dense splited var: " << splited_var_name << " var_y_data[0] " << var_y_data[0] << " var_y_data[end] " << var_y_data[total_element - 1]; auto after_run_update = GetCurrentUS(); VLOG(4) << "dense var update " << origin_splited_var_name << " use time " << after_run_update - before_run_update; } void GeoSgdCommunicator::RecvUpdateSparseVars( const std::string &var_name, const std::string &splited_var_name) { // step 1: recv splited var from pserver auto splited_var_index = GetSplitedVarIndex(var_name, splited_var_name); auto origin_var_name = DeltaVarToVar(var_name); auto origin_splited_var_name = DeltaVarToVar(splited_var_name); auto before_run_recv = GetCurrentUS(); RpcRecv(origin_var_name, origin_splited_var_name, splited_var_index); auto after_run_recv = GetCurrentUS(); VLOG(4) << "recv var " << origin_splited_var_name << " use time " << after_run_recv - before_run_recv; // step 2: update sparse var auto before_run_update = GetCurrentUS(); auto *var_x = training_scope_->FindVar(origin_var_name); auto var_x_tensor = var_x->Get(); auto dims = var_x_tensor.dims(); float *x_value = var_x_tensor.mutable_data(var_x_tensor.place()); auto *var_y = old_scope_->FindVar(origin_var_name); auto var_y_tensor = var_y->Get(); float *y_value = var_y_tensor.mutable_data(var_y_tensor.place()); auto *var_z = pserver_scope_.get()->FindVar(origin_splited_var_name); auto var_z_slr = var_z->GetMutable(); auto row_size = var_z_slr->rows().size(); std::vector new_rows; new_rows.reserve(row_size); for (auto ids : var_z_slr->rows()) { new_rows.push_back(ids + absolute_section_[origin_var_name][splited_var_index]); } auto *new_value = var_z_slr->mutable_value(); auto row_numel = dims[1]; auto *z_value = new_value->mutable_data(var_x_tensor.place()); auto cpu_ctx = paddle::platform::CPUDeviceContext(); auto blas = math::GetBlas(cpu_ctx); for (size_t y = 0; y < new_rows.size(); y++) { std::vector row_delta(row_numel, 0); auto ids = new_rows[y]; float *x_val = x_value + ids * row_numel; float *y_val = y_value + ids * row_numel; float *z_val = z_value + y * row_numel; blas.VSUB(row_numel, z_val, y_val, row_delta.data()); blas.VADD(row_numel, row_delta.data(), x_val, x_val); blas.VCOPY(row_numel, z_val, y_val); } auto after_run_update = GetCurrentUS(); VLOG(4) << "sparse var recv update " << origin_splited_var_name << " has num " << new_rows.size() << " use time " << after_run_update - before_run_update; } void GeoSgdCommunicator::GeoSgdSparseParamInit(framework::Scope *scope_x, framework::Scope *scope_y, const std::string var_name) { // create selectedrows var from lodtensor var info auto *var_x = scope_x->Var(var_name); auto *var_y = scope_y->Var(var_name); auto var_x_tensor = var_x->Get(); auto *var_y_select_rows = var_y->GetMutable(); auto dims = var_x_tensor.dims(); auto rows = dims[0]; auto row_numel = dims[1]; var_y_select_rows->set_height(rows); std::vector new_rows{}; var_y_select_rows->set_rows(new_rows); auto *var_y_value = var_y_select_rows->mutable_value(); var_y_value->Resize({rows, row_numel}); var_y_value->mutable_data(var_x_tensor.place()); } void GeoSgdCommunicator::GeoSgdDenseParamInit(framework::Scope *scope_x, framework::Scope *scope_y, const std::string var_name) { auto *var_x = scope_x->Var(var_name); auto *var_y = scope_y->Var(var_name); framework::CopyVariable(*var_x, var_y); } void GeoSgdCommunicator::RpcSend(const std::string &origin_var_name, const std::string &splited_var_name, const size_t &splited_var_index) { auto trainer_id = send_varname_to_ctx_[origin_var_name].trainer_id; auto endpoint = send_varname_to_ctx_[origin_var_name].epmap[splited_var_index]; platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance(); auto &cpu_ctx_send = *pool.Get(platform::CPUPlace()); distributed::RPCClient *rpc_client = distributed::RPCClient::GetInstance(trainer_id); auto handle = rpc_client->AsyncSendVar(endpoint, cpu_ctx_send, *delta_scope_.get(), splited_var_name); handle->Wait(); } void GeoSgdCommunicator::RpcRecv(const std::string &var_name, const std::string &splited_var_name, const size_t &splited_var_index) { auto train_id = recv_varname_to_ctx_[var_name].trainer_id; auto endpoint = recv_varname_to_ctx_[var_name].epmap[splited_var_index]; platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance(); auto &cpu_ctx_recv = *pool.Get(platform::CPUPlace()); distributed::RPCClient *rpc_client = distributed::RPCClient::GetInstance(train_id); pserver_scope_->Var(splited_var_name); auto handle = rpc_client->AsyncGetVar(endpoint, cpu_ctx_recv, *pserver_scope_.get(), splited_var_name, splited_var_name, splited_var_name); handle->Wait(); } void GeoSgdCommunicator::Recv() {} void HalfAsyncCommunicator::InitImpl(const RpcCtxMap &send_varname_to_ctx, const RpcCtxMap &recv_varname_to_ctx, Scope *recv_scope) { send_varname_to_ctx_ = std::move(send_varname_to_ctx); recv_varname_to_ctx_ = std::move(recv_varname_to_ctx); recv_scope_ = std::move(recv_scope); if (send_varname_to_ctx.size() == 0) { VLOG(0) << "nothing need to be send, will not start send_thread"; } else { send_scope_.reset(new Scope()); for (auto &iter : send_varname_to_ctx_) { send_varname_to_queue_[iter.first] = std::make_shared>>( send_queue_size_); } consume_threadpool_.reset(new ::ThreadPool(thread_pool_size_)); } if (recv_varname_to_ctx.size() == 0) { VLOG(0) << "nothing need to be received, will not start recv_thread"; } else { recv_threadpool_.reset(new ::ThreadPool(thread_pool_size_)); } } void HalfAsyncCommunicator::InitImpl( const paddle::framework::ProgramDesc &program, Scope *param_scope) { RpcCtxMap send_varname_to_ctx; RpcCtxMap recv_varname_to_ctx; for (auto *op : program.Block(0).AllOps()) { VLOG(3) << "node name " << op->Type(); if (op->Type() == "send") { auto send_var_name = op->Input("X")[0]; auto send_varnames = boost::get>( op->GetNullableAttr("send_varnames")); auto epmap = boost::get>(op->GetNullableAttr("epmap")); auto height_section = boost::get>(op->GetNullableAttr("sections")); auto trainer_id = boost::get(op->GetNullableAttr("trainer_id")); send_varname_to_ctx[send_var_name] = operators::distributed::RpcContext( send_var_name, send_varnames, epmap, height_section, trainer_id); VLOG(3) << "find and init an send op: " << send_varname_to_ctx[send_var_name]; } else if (op->Type() == "recv") { auto do_not_run = boost::get(op->GetNullableAttr("do_not_run")); PADDLE_ENFORCE_GT(do_not_run, 0, platform::errors::InvalidArgument( "recv op's attr `do_not_run` must be True!")); auto recv_var_name = op->Output("Out")[0]; auto recv_varnames = boost::get>( op->GetNullableAttr("recv_varnames")); auto epmap = boost::get>(op->GetNullableAttr("epmap")); auto trainer_id = boost::get(op->GetNullableAttr("trainer_id")); recv_varname_to_ctx[recv_var_name] = operators::distributed::RpcContext( recv_var_name, recv_varnames, epmap, {}, trainer_id); VLOG(3) << "find and init an recv op: " << recv_varname_to_ctx[recv_var_name]; } } // init communicator here if (send_varname_to_ctx.size() == 0 && recv_varname_to_ctx.size() == 0) { LOG(WARNING) << "no var need to send and recv!!"; } operators::distributed::HalfAsyncCommunicator::InitImpl( send_varname_to_ctx, recv_varname_to_ctx, param_scope); } HalfAsyncCommunicator::~HalfAsyncCommunicator() { running_ = false; if (consume_thread_) consume_thread_->join(); } void HalfAsyncCommunicator::ConsumeThread() { VLOG(3) << "ConsumeThread start!"; while (running_) { while (running_) { if (barrier_counter_.load() >= barrier_trigger_.load() && barrier_trigger_.load() != 0) { break; } else { std::this_thread::sleep_for(std::chrono::milliseconds(10)); } } std::vector> task_futures; task_futures.reserve(send_varname_to_ctx_.size()); VLOG(3) << "run send graph"; auto before_run_send_graph = GetCurrentUS(); for (auto &iter : send_varname_to_queue_) { auto &var_name = iter.first; auto &var_queue = iter.second; if (var_queue->Size() > 0) { auto send_task = [this, &var_name, &var_queue] { VLOG(3) << var_name << " merge and send"; std::vector> vars; size_t merged_var_num = 0; size_t wait_times = 0; while (merged_var_num < static_cast(max_merge_var_num_)) { if (var_queue->Size() == 0) { VLOG(3) << "wait_times -> " << wait_times; if (wait_times >= static_cast(send_wait_times_)) { break; } std::this_thread::sleep_for(std::chrono::milliseconds(10)); wait_times++; continue; } else { wait_times = 0; vars.push_back(var_queue->Pop()); merged_var_num++; } } auto before_merge = GetCurrentUS(); MergeVars(var_name, vars, send_scope_.get(), false); auto after_merge = GetCurrentUS(); VLOG(3) << "merge " << merged_var_num << " " << var_name << " use time " << after_merge - before_merge; auto send_functor = distributed::ParameterSend(); auto &ctx = send_varname_to_ctx_.at(var_name); send_functor(ctx, *send_scope_, true, 1); auto after_send = GetCurrentUS(); VLOG(3) << "send " << var_name << " use time " << after_send - after_merge; }; task_futures.emplace_back( consume_threadpool_->enqueue(std::move(send_task))); } else { VLOG(4) << var_name << " queue empty"; } } for (auto &task_f : task_futures) { task_f.wait(); } auto after_run_send_graph = GetCurrentUS(); VLOG(3) << "run send graph use time " << after_run_send_graph - before_run_send_graph; BarrierSend(); Recv(); BarrierRecv(); BarrierWeakUp(); } VLOG(0) << "communicator stopped, send thread exit"; } void HalfAsyncCommunicator::Send(const std::vector &var_names, const std::vector &var_tables, const framework::Scope &scope) { PADDLE_ENFORCE_EQ( var_names.size(), 1, platform::errors::InvalidArgument("var_names.size() == 1 is permitted")); auto var_name = var_names[0]; VLOG(3) << "communicator send " << var_name; // push var into send queue by var_name auto *grad_var = scope.FindVar(var_name); PADDLE_ENFORCE_EQ( grad_var->IsInitialized(), true, platform::errors::InvalidArgument("grad var should is not initialized.")); auto tmp_grad_var = std::make_shared(); framework::CopyVariable(*grad_var, tmp_grad_var.get()); auto &queue = send_varname_to_queue_.at(var_name); VLOG(3) << "send " << var_name << " queue size " << queue->Size(); queue->Push(tmp_grad_var); } void HalfAsyncCommunicator::Recv() { VLOG(3) << "parallel run recv graph"; if (!running_) return; auto before_send = GetCurrentUS(); std::vector> task_futures; task_futures.reserve(recv_varname_to_ctx_.size()); for (auto &iter : recv_varname_to_ctx_) { auto recv_task = [this, &iter] { auto &var_name = iter.first; VLOG(4) << "recv var " << var_name; auto recv_functor = distributed::ParameterRecv(); recv_functor(iter.second, *recv_scope_); }; task_futures.emplace_back(recv_threadpool_->enqueue(std::move(recv_task))); } for (auto &task : task_futures) { task.wait(); } auto after_recv = GetCurrentUS(); VLOG(3) << "run recv graph use time " << after_recv - before_send; } void HalfAsyncCommunicator::Barrier() { barrier_counter_++; { std::unique_lock lk(barrier_mutex_); barrier_cond_.wait(lk, [this] { return (barrier_counter_ == 0); }); } } void HalfAsyncCommunicator::BarrierTriggerDecrement() { barrier_trigger_--; VLOG(3) << "BarrierTriggerDecrement decrement barrier trigger to " << barrier_trigger_.load(); } void HalfAsyncCommunicator::BarrierTriggerReset(int initial_val) { barrier_trigger_.store(initial_val); VLOG(3) << "BarrierTriggerReset reset barrier trigger to " << barrier_trigger_.load(); } void HalfAsyncCommunicator::BarrierWeakUp() { barrier_counter_.store(0); barrier_cond_.notify_all(); } void HalfAsyncCommunicator::Start() { VLOG(0) << "Communicator start"; if (!communicator_) { VLOG(0) << "Communicator is not inited, do nothing"; } else { VLOG(1) << "start send thread and recv thread"; BarrierTriggerReset(max_merge_var_num_); running_ = true; consume_thread_.reset(new std::thread( std::bind(&HalfAsyncCommunicator::ConsumeThread, this))); } } void HalfAsyncCommunicator::Stop() { VLOG(0) << "Communicator stop"; running_ = false; if (!communicator_) { VLOG(0) << "Communicator is not inited, do nothing"; } else { if (consume_thread_) { VLOG(4) << "stop send thread"; consume_thread_->join(); consume_thread_.reset(nullptr); } } VLOG(0) << "Communicator stop done"; } void SyncCommunicator::BarrierSend() { if (!running_) return; distributed::RPCClient *rpc_client = distributed::RPCClient::GetInstance(trainer_id_); std::vector rets; for (auto &ep : pserver_endpoints_) { rets.push_back(rpc_client->AsyncSendBatchBarrier(ep)); } for (size_t i = 0; i < rets.size(); i++) { PADDLE_ENFORCE_NE(rets[i]->Wait(), 0U, platform::errors::External( "internal error in RPCClient")); } VLOG(4) << "BarrierSend with SyncCommunicator"; } void SyncCommunicator::BarrierRecv() { if (!running_) return; distributed::RPCClient *rpc_client = distributed::RPCClient::GetInstance(trainer_id_); std::vector rets; for (auto &ep : pserver_endpoints_) { rets.push_back(rpc_client->AsyncSendFetchBarrier(ep)); } for (size_t i = 0; i < rets.size(); i++) { PADDLE_ENFORCE_NE(rets[i]->Wait(), 0U, platform::errors::External( "internal error in RPCClient")); } VLOG(4) << "BarrierRecv with SyncCommunicator"; } SyncCommunicator::~SyncCommunicator() { running_ = false; if (consume_thread_) consume_thread_->join(); } } // namespace distributed } // namespace operators } // namespace paddle