未验证 提交 bad312c3 编写于 作者: C Chengmo 提交者: GitHub

[cherry-pick][release/1.6]Speed GEO-SGD (#20177)

* Speed GEO-SGD (#20158)

* delete debug vlog & add rpc function & fix word2vec bug & speed GEO-SGD
上级 2691de18
...@@ -24,6 +24,7 @@ limitations under the License. */ ...@@ -24,6 +24,7 @@ limitations under the License. */
#include "paddle/fluid/framework/tensor_util.h" #include "paddle/fluid/framework/tensor_util.h"
#include "paddle/fluid/framework/threadpool.h" #include "paddle/fluid/framework/threadpool.h"
#include "paddle/fluid/framework/variable_helper.h" #include "paddle/fluid/framework/variable_helper.h"
#include "paddle/fluid/operators/distributed/distributed.h"
#include "paddle/fluid/operators/distributed/parameter_recv.h" #include "paddle/fluid/operators/distributed/parameter_recv.h"
#include "paddle/fluid/operators/distributed/parameter_send.h" #include "paddle/fluid/operators/distributed/parameter_send.h"
...@@ -62,23 +63,6 @@ inline void VSUB(int n, const T *x, const T *y, T *z) { ...@@ -62,23 +63,6 @@ inline void VSUB(int n, const T *x, const T *y, T *z) {
} }
} }
inline std::vector<int> bucket(const int v_size, const int b_size) {
int remainder = v_size % b_size;
int bucket = v_size / b_size;
std::vector<int> ret_vec(b_size, bucket);
for (int i = 0; i < remainder; ++i) {
ret_vec[i] = ret_vec[i] + 1;
}
int cur_bucket = 0;
for (int j = 0; j < ret_vec.size(); ++j) {
int tmp = ret_vec[j];
ret_vec[j] = cur_bucket;
cur_bucket += tmp;
}
ret_vec.push_back(cur_bucket);
return ret_vec;
}
std::once_flag Communicator::init_flag_; std::once_flag Communicator::init_flag_;
std::shared_ptr<Communicator> Communicator::communicator_(nullptr); std::shared_ptr<Communicator> Communicator::communicator_(nullptr);
...@@ -447,6 +431,14 @@ void GeoSgdCommunicator::InitImpl( ...@@ -447,6 +431,14 @@ void GeoSgdCommunicator::InitImpl(
send_var_name, send_var_names, vars_epmap, vars_sections_int, 0); send_var_name, send_var_names, vars_epmap, vars_sections_int, 0);
recv_varname_to_ctx_[var_name] = operators::distributed::RpcContext( recv_varname_to_ctx_[var_name] = operators::distributed::RpcContext(
var_name, vars_names, vars_epmap, vars_sections_int, 0); var_name, vars_names, vars_epmap, vars_sections_int, 0);
// record sparse section
if (is_sparse) {
need_thread_nums_ +=
send_varname_to_ctx_[send_var_name].height_sections.size();
absolute_section_[var_name] = operators::ToAbsoluteSection(
send_varname_to_ctx_[send_var_name].height_sections);
}
} }
if (send_varname_to_ctx_.size() == 0 && recv_varname_to_ctx_.size() == 0) { if (send_varname_to_ctx_.size() == 0 && recv_varname_to_ctx_.size() == 0) {
...@@ -454,17 +446,12 @@ void GeoSgdCommunicator::InitImpl( ...@@ -454,17 +446,12 @@ void GeoSgdCommunicator::InitImpl(
} }
send_threadpool_.reset(new ::ThreadPool(FLAGS_communicator_thread_pool_size)); send_threadpool_.reset(new ::ThreadPool(FLAGS_communicator_thread_pool_size));
recv_threadpool_.reset(new ::ThreadPool(FLAGS_communicator_thread_pool_size));
need_push_queue_ = need_push_queue_ =
std::make_shared<BlockingQueue<std::shared_ptr<SparseIdsMap>>>( std::make_shared<BlockingQueue<std::shared_ptr<SparseIdsMap>>>(
geo_need_push_nums); geo_need_push_nums);
delta_scope_.reset(new Scope()); delta_scope_.reset(new Scope());
old_scope_.reset(new Scope()); old_scope_.reset(new Scope());
pserver_scope_.reset(new Scope()); pserver_scope_.reset(new Scope());
// for coverage test, please ignore follow code
InitImpl(send_varname_to_ctx_, recv_varname_to_ctx_, training_scope_);
InitImpl(program, training_scope_);
} }
void GeoSgdCommunicator::Start() { void GeoSgdCommunicator::Start() {
...@@ -521,11 +508,16 @@ void GeoSgdCommunicator::Send(const std::vector<std::string> &sparse_var_names, ...@@ -521,11 +508,16 @@ void GeoSgdCommunicator::Send(const std::vector<std::string> &sparse_var_names,
const framework::Scope &scope) { const framework::Scope &scope) {
// SparseIdsMap = std::unordered_map<std::string,std::unordered_set<int64_t>> // SparseIdsMap = std::unordered_map<std::string,std::unordered_set<int64_t>>
std::shared_ptr<SparseIdsMap> ids_table = std::make_shared<SparseIdsMap>(); std::shared_ptr<SparseIdsMap> ids_table = std::make_shared<SparseIdsMap>();
auto before_run_send = GetCurrentUS();
for (size_t i = 0; i < sparse_var_tables.size(); i++) { for (size_t i = 0; i < sparse_var_tables.size(); i++) {
if (ids_table->find(sparse_var_tables[i]) == ids_table->end()) { if (ids_table->find(sparse_var_tables[i]) == ids_table->end()) {
// create empty set for new sparse var // create empty set for new sparse var
ids_table->insert(std::pair<std::string, std::unordered_set<int64_t>>( auto splited_var_nums =
sparse_var_tables[i], std::unordered_set<int64_t>{})); recv_varname_to_ctx_[sparse_var_tables[i]].splited_var_names.size();
ids_table->insert(
std::pair<std::string, std::vector<std::unordered_set<int64_t>>>(
sparse_var_tables[i],
std::vector<std::unordered_set<int64_t>>{splited_var_nums}));
} }
auto *var = scope.FindVar(sparse_var_names[i]); auto *var = scope.FindVar(sparse_var_names[i]);
auto var_tensor = var->Get<framework::LoDTensor>(); auto var_tensor = var->Get<framework::LoDTensor>();
...@@ -533,12 +525,16 @@ void GeoSgdCommunicator::Send(const std::vector<std::string> &sparse_var_names, ...@@ -533,12 +525,16 @@ void GeoSgdCommunicator::Send(const std::vector<std::string> &sparse_var_names,
int *var_mutable_data = var_tensor.mutable_data<int>(var_tensor.place()); int *var_mutable_data = var_tensor.mutable_data<int>(var_tensor.place());
// insert ids which has not been record // insert ids which has not been record
for (size_t j = 0; j < element_number; j++) { for (size_t j = 0; j < element_number; j++) {
ids_table->at(sparse_var_tables[i]).insert(var_mutable_data[j]); auto ep_idx = GetSectionIndex(var_mutable_data[j],
absolute_section_[sparse_var_tables[i]]);
ids_table->at(sparse_var_tables[i])[ep_idx].insert(var_mutable_data[j]);
VLOG(4) << "Sparse var " << sparse_var_tables[i] << " insert " VLOG(4) << "Sparse var " << sparse_var_tables[i] << " insert "
<< var_mutable_data[j]; << var_mutable_data[j];
} }
} }
need_push_queue_->Push(ids_table); need_push_queue_->Push(ids_table);
auto after_run_send = GetCurrentUS();
VLOG(3) << "run send_op use time " << after_run_send - before_run_send;
} }
void GeoSgdCommunicator::SendThread() { void GeoSgdCommunicator::SendThread() {
...@@ -548,13 +544,22 @@ void GeoSgdCommunicator::SendThread() { ...@@ -548,13 +544,22 @@ void GeoSgdCommunicator::SendThread() {
while (running_) { while (running_) {
std::vector<std::future<void>> task_futures; std::vector<std::future<void>> task_futures;
task_futures.reserve(send_varname_to_ctx_.size()); task_futures.reserve(send_varname_to_ctx_.size());
auto before_run_send_graph = GetCurrentUS();
if (ids_send_vec_.size() < geo_need_push_nums_) { size_t wait_times = 0;
while (ids_send_vec_.size() < geo_need_push_nums_) {
VLOG(4) << "ids_send_vec_ Size: " << ids_send_vec_.size(); VLOG(4) << "ids_send_vec_ Size: " << ids_send_vec_.size();
if (need_push_queue_->Size() > 0) { if (need_push_queue_->Size() > 0) {
wait_times = 0;
ids_send_vec_.push_back(*(need_push_queue_->Pop())); ids_send_vec_.push_back(*(need_push_queue_->Pop()));
VLOG(4) << "ids_send_vec_ pushed"; VLOG(4) << "ids_send_vec_ pushed";
} else if (need_push_queue_->Size() == 0) {
VLOG(3) << "wait_times -> " << wait_times;
if (wait_times >= FLAGS_communicator_send_wait_times) {
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(10));
wait_times++;
continue;
} }
} }
...@@ -567,86 +572,62 @@ void GeoSgdCommunicator::SendThread() { ...@@ -567,86 +572,62 @@ void GeoSgdCommunicator::SendThread() {
for (auto &iter : send_varname_to_ctx_) { for (auto &iter : send_varname_to_ctx_) {
auto &var_name = iter.first; auto &var_name = iter.first;
auto send_task = [this, &var_name] { if (var_list_[DeltaVarToVar(var_name)] == true) {
auto origin_var_name = DeltaVarToVar(var_name); // sparse var: merge->send->recv
for (auto &splited_var_name : iter.second.splited_var_names) {
if (var_list_[origin_var_name] == true) { auto send_task = [this, &var_name, &splited_var_name] {
auto ids_set = SparseIdsMerge(ids_send_vec_, origin_var_name); auto before_run_geo = GetCurrentUS();
SendUpdateSparseVars(origin_var_name, ids_set); auto ids_set =
} else { SparseIdsMerge(ids_send_vec_, var_name, splited_var_name);
SendUpdateDenseVars(origin_var_name); SendUpdateSparseVars(var_name, splited_var_name, ids_set);
RecvUpdateSparseVars(var_name, splited_var_name);
auto after_run_geo = GetCurrentUS();
VLOG(1) << "run GEO-SGD var " << splited_var_name << " use time "
<< after_run_geo - before_run_geo;
};
task_futures.emplace_back(
send_threadpool_->enqueue(std::move(send_task)));
} }
auto before_send = GetCurrentUS(); } else {
auto send_functor = distributed::ParameterSend<float>(); auto send_task = [this, &var_name] {
auto &ctx = send_varname_to_ctx_.at(var_name); auto before_run_geo = GetCurrentUS();
send_functor(ctx, *delta_scope_.get(), true, 1); SendUpdateDenseVars(var_name);
RecvUpdateDenseVars(var_name);
auto after_send = GetCurrentUS(); auto after_run_geo = GetCurrentUS();
VLOG(3) << "send " << var_name << " use time " VLOG(3) << "run GEO-SGD var " << var_name << " use time "
<< after_send - before_send; << after_run_geo - before_run_geo;
}; };
task_futures.emplace_back( task_futures.emplace_back(
send_threadpool_->enqueue(std::move(send_task))); send_threadpool_->enqueue(std::move(send_task)));
}
} }
for (auto &task_f : task_futures) {
task_f.wait();
}
ids_send_vec_.clear();
} }
for (auto &task_f : task_futures) {
task_f.wait();
have_push_.fetch_add(1, std::memory_order_relaxed);
}
auto after_run_send_graph = GetCurrentUS();
VLOG(4) << "run send graph use time "
<< after_run_send_graph - before_run_send_graph;
Recv();
} }
} }
void GeoSgdCommunicator::Recv() {
auto push_nums = have_push_.load();
if (push_nums >= send_varname_to_ctx_.size()) {
ids_send_vec_.clear();
RecvAll();
have_push_.store(0);
}
}
void GeoSgdCommunicator::RecvAll() {
if (!running_) return;
auto before_recv = GetCurrentUS();
std::vector<std::future<void>> task_futures;
task_futures.reserve(recv_varname_to_ctx_.size());
for (auto &iter : recv_varname_to_ctx_) {
auto recv_task = [this, &iter] {
auto &var_name = iter.first;
auto recv_functor = distributed::ParameterRecv<float>();
auto before_parameter_recv = GetCurrentUS();
recv_functor(iter.second, *pserver_scope_.get());
auto after_parameter_recv = GetCurrentUS();
VLOG(3) << "run parameter recv var " << var_name << " use time "
<< after_parameter_recv - before_parameter_recv;
RecvUpdateVars(var_name);
};
task_futures.emplace_back(recv_threadpool_->enqueue(std::move(recv_task)));
}
for (auto &task : task_futures) {
task.wait();
}
auto after_recv = GetCurrentUS();
VLOG(3) << "run recv graph use time " << after_recv - before_recv;
}
std::unordered_set<int64_t> GeoSgdCommunicator::SparseIdsMerge( std::unordered_set<int64_t> GeoSgdCommunicator::SparseIdsMerge(
const std::vector<SparseIdsMap> &ids_send_vec, const std::vector<SparseIdsMap> &ids_send_vec, const std::string &var_name,
const std::string &var_name) { const std::string &splited_var_name) {
// every batch has some sparse id, merge them into one unoredered_set // every batch has some sparse id, merge them into one unoredered_set
VLOG(3) << "Sparse Ids merge var: " << var_name
<< " splited var: " << splited_var_name;
auto before_run_ids_merge_ = GetCurrentUS(); auto before_run_ids_merge_ = GetCurrentUS();
auto origin_var_name = DeltaVarToVar(var_name);
auto splited_var_index = GetSplitedVarIndex(var_name, splited_var_name);
std::unordered_set<int64_t> ids_set; std::unordered_set<int64_t> ids_set;
for (auto ids_map : ids_send_vec) { for (auto ids_map : ids_send_vec) {
for (auto id : ids_map[var_name]) { for (auto id : ids_map[origin_var_name][splited_var_index]) {
ids_set.insert(id); ids_set.insert(id);
} }
} }
auto after_run_ids_merge_ = GetCurrentUS(); auto after_run_ids_merge_ = GetCurrentUS();
VLOG(3) << "run SparseIdsMerge use time " VLOG(3) << "run SparseIdsMerge " << splited_var_name << " has nums "
<< ids_set.size() << " use time "
<< after_run_ids_merge_ - before_run_ids_merge_; << after_run_ids_merge_ - before_run_ids_merge_;
return ids_set; return ids_set;
} }
...@@ -654,24 +635,26 @@ std::unordered_set<int64_t> GeoSgdCommunicator::SparseIdsMerge( ...@@ -654,24 +635,26 @@ std::unordered_set<int64_t> GeoSgdCommunicator::SparseIdsMerge(
void GeoSgdCommunicator::SendUpdateDenseVars(const std::string &var_name) { void GeoSgdCommunicator::SendUpdateDenseVars(const std::string &var_name) {
// calc var_delata = (var_training - var_old)/trainer_nums // calc var_delata = (var_training - var_old)/trainer_nums
// calc var_old += var_delta // calc var_old += var_delta
// var_name: param.delta
auto origin_var_name = DeltaVarToVar(var_name);
auto before_run_send_dense = GetCurrentUS(); auto before_run_send_dense = GetCurrentUS();
auto *var_x = training_scope_->FindVar(var_name); auto *var_x = training_scope_->FindVar(origin_var_name);
auto var_x_tensor = var_x->Get<framework::LoDTensor>(); auto var_x_tensor = var_x->Get<framework::LoDTensor>();
auto *var_y = old_scope_->FindVar(var_name); auto *var_y = old_scope_->FindVar(origin_var_name);
auto var_y_tensor = var_y->Get<framework::LoDTensor>(); auto var_y_tensor = var_y->Get<framework::LoDTensor>();
auto cpu_ctx = paddle::platform::CPUDeviceContext(); auto cpu_ctx = paddle::platform::CPUDeviceContext();
auto dims = var_x_tensor.dims(); auto dims = var_x_tensor.dims();
// create temp var for sub // create temp var for sub
auto *var_y_sub = old_scope_->Var(VarToDeltaVar(var_name)); auto *var_y_sub = old_scope_->Var(var_name);
framework::CopyVariable(*var_y, var_y_sub); framework::CopyVariable(*var_y, var_y_sub);
auto var_y_sub_tensor = var_y_sub->Get<framework::LoDTensor>(); auto var_y_sub_tensor = var_y_sub->Get<framework::LoDTensor>();
// create delta var in delta scope // create delta var in delta scope
auto *var_z = delta_scope_->Var(VarToDeltaVar(var_name)); auto *var_z = delta_scope_->Var(var_name);
auto *var_z_tensor = var_z->GetMutable<framework::LoDTensor>(); auto *var_z_tensor = var_z->GetMutable<framework::LoDTensor>();
var_z_tensor->mutable_data<float>(dims, var_x_tensor.place()); var_z_tensor->mutable_data<float>(dims, var_x_tensor.place());
var_z_tensor->set_lod(var_x_tensor.lod()); var_z_tensor->set_lod(var_x_tensor.lod());
...@@ -702,20 +685,34 @@ void GeoSgdCommunicator::SendUpdateDenseVars(const std::string &var_name) { ...@@ -702,20 +685,34 @@ void GeoSgdCommunicator::SendUpdateDenseVars(const std::string &var_name) {
auto after_run_send_dense = GetCurrentUS(); auto after_run_send_dense = GetCurrentUS();
VLOG(3) << "run send update dense var " << var_name << " use time " VLOG(3) << "run send update dense var " << var_name << " use time "
<< after_run_send_dense - before_run_send_dense; << after_run_send_dense - before_run_send_dense;
auto send_functor = distributed::ParameterSend<float>();
auto &ctx = send_varname_to_ctx_.at(var_name);
auto before_send_dense = GetCurrentUS();
send_functor(ctx, *delta_scope_.get(), true, 1);
auto after_send_denxe = GetCurrentUS();
VLOG(3) << "send " << var_name << " use time "
<< after_send_denxe - before_send_dense;
} }
void GeoSgdCommunicator::SendUpdateSparseVars( void GeoSgdCommunicator::SendUpdateSparseVars(
const std::string &var_name, const std::unordered_set<int64_t> &ids_table) { const std::string &var_name, const std::string &splited_var_name,
const std::unordered_set<int64_t> &ids_table) {
// calc var_delata = (var_training - var_old)/trainer_nums // calc var_delata = (var_training - var_old)/trainer_nums
// calc var_old += var_delta // calc var_old += var_delta
// var_name: param.delta, splited_var_name: param.block0.delta
// origin_var_name: param
auto before_run_send_sparse = GetCurrentUS(); auto before_run_send_sparse = GetCurrentUS();
auto ids_num = ids_table.size(); auto ids_num = ids_table.size();
VLOG(3) << "Sparse Ids nums is : " << ids_num; VLOG(4) << "Sparse Ids nums is : " << ids_num;
auto *var_x = training_scope_->FindVar(var_name); auto origin_var_name = DeltaVarToVar(var_name);
auto *var_x = training_scope_->FindVar(origin_var_name);
auto var_x_tensor = var_x->Get<framework::LoDTensor>(); auto var_x_tensor = var_x->Get<framework::LoDTensor>();
auto *var_y = old_scope_.get()->FindVar(var_name); auto *var_y = old_scope_.get()->FindVar(origin_var_name);
auto var_y_tensor = var_y->Get<framework::LoDTensor>(); auto var_y_tensor = var_y->Get<framework::LoDTensor>();
auto dims = var_x_tensor.dims(); auto dims = var_x_tensor.dims();
...@@ -724,7 +721,7 @@ void GeoSgdCommunicator::SendUpdateSparseVars( ...@@ -724,7 +721,7 @@ void GeoSgdCommunicator::SendUpdateSparseVars(
float *x_value = var_x_tensor.mutable_data<float>(var_x_tensor.place()); float *x_value = var_x_tensor.mutable_data<float>(var_x_tensor.place());
float *y_value = var_y_tensor.mutable_data<float>(var_y_tensor.place()); float *y_value = var_y_tensor.mutable_data<float>(var_y_tensor.place());
auto *var_z = delta_scope_->Var(VarToDeltaVar(var_name)); auto *var_z = delta_scope_->Var(splited_var_name);
auto *var_z_select_rows = var_z->GetMutable<framework::SelectedRows>(); auto *var_z_select_rows = var_z->GetMutable<framework::SelectedRows>();
auto *var_z_value = var_z_select_rows->mutable_value(); auto *var_z_value = var_z_select_rows->mutable_value();
var_z_value->Resize({static_cast<int64_t>(ids_num), row_numel}); var_z_value->Resize({static_cast<int64_t>(ids_num), row_numel});
...@@ -732,135 +729,157 @@ void GeoSgdCommunicator::SendUpdateSparseVars( ...@@ -732,135 +729,157 @@ void GeoSgdCommunicator::SendUpdateSparseVars(
std::vector<int64_t> new_rows; std::vector<int64_t> new_rows;
new_rows.insert(new_rows.begin(), ids_table.begin(), ids_table.end()); new_rows.insert(new_rows.begin(), ids_table.begin(), ids_table.end());
var_z_select_rows->set_rows(new_rows);
var_z_select_rows->set_height(new_rows.size()); auto cpu_ctx = paddle::platform::CPUDeviceContext();
auto blas = math::GetBlas<paddle::platform::CPUDeviceContext, float>(cpu_ctx);
// using multi thread speed sparse delta calc float avg = 1 / static_cast<float>(trainer_nums_);
std::vector<int> buts = for (int y = 0; y < new_rows.size(); y++) {
bucket(new_rows.size(), FLAGS_communicator_merge_sparse_bucket); auto ids = new_rows[y];
std::vector<std::future<void>> fs;
float *x_val = x_value + ids * row_numel;
for (int x = 0; x < buts.size() - 1; x++) { float *y_val = y_value + ids * row_numel;
int start = buts[x]; float *z_val = z_value + y * row_numel;
int end = buts[x + 1];
float avg = 1 / static_cast<float>(trainer_nums_); std::vector<float> row_delta(row_numel, 0);
VSUB<float>(row_numel, x_val, y_val, row_delta.data());
fs.push_back( blas.SCAL(row_numel, avg, row_delta.data());
paddle::framework::Async([&x_value, &y_value, &z_value, &new_rows, blas.VADD(row_numel, row_delta.data(), y_val, y_val);
row_numel, start, end, avg]() { blas.VCOPY(row_numel, row_delta.data(), z_val);
auto cpu_ctx = paddle::platform::CPUDeviceContext();
auto blas =
math::GetBlas<paddle::platform::CPUDeviceContext, float>(cpu_ctx);
for (int y = start; y < end; y++) {
auto ids = new_rows[y];
float *x_val = x_value + ids * row_numel;
float *y_val = y_value + ids * row_numel;
float *z_val = z_value + y * row_numel;
std::vector<float> row_delta(row_numel, 0);
VSUB<float>(row_numel, x_val, y_val, row_delta.data());
blas.SCAL(row_numel, avg, row_delta.data());
blas.VADD(row_numel, row_delta.data(), y_val, y_val);
blas.VCOPY(row_numel, row_delta.data(), z_val);
}
}));
}
for (size_t i = 0; i < fs.size(); ++i) {
fs[i].wait();
} }
auto after_run_send_sparse = GetCurrentUS(); auto after_run_send_sparse = GetCurrentUS();
VLOG(3) << "run send update sparse var " << var_name << " use time " VLOG(3) << "run send update sparse var " << splited_var_name << " use time "
<< after_run_send_sparse - before_run_send_sparse; << after_run_send_sparse - before_run_send_sparse;
auto splited_var_index = GetSplitedVarIndex(var_name, splited_var_name);
std::vector<int64_t> send_rows;
send_rows.reserve(new_rows.size());
for (auto idx : new_rows) {
send_rows.push_back(idx -
absolute_section_[origin_var_name][splited_var_index]);
}
var_z_select_rows->set_rows(send_rows);
var_z_select_rows->set_height(
send_varname_to_ctx_[var_name].height_sections[splited_var_index]);
auto before_send_sparse = GetCurrentUS();
RpcSend(var_name, splited_var_name, splited_var_index);
auto after_send_sparse = GetCurrentUS();
VLOG(3) << "send " << splited_var_name << " has nums " << new_rows.size()
<< " use time " << after_send_sparse - before_send_sparse;
} }
void GeoSgdCommunicator::RecvUpdateVars(const std::string &var_name) { void GeoSgdCommunicator::RecvUpdateDenseVars(const std::string &var_name) {
// calc var_training += var_pserver - var_old // calc var_training += var_pserver - var_old
// calc var_old = var_pserver // calc var_old = var_pserver
// var_name: param.delta
// step1: recv dense var from pserver
auto origin_var_name = DeltaVarToVar(var_name);
auto before_run_recv = GetCurrentUS();
auto recv_functor = distributed::ParameterRecv<float>();
recv_functor(recv_varname_to_ctx_[origin_var_name], *pserver_scope_.get());
auto after_run_recv = GetCurrentUS();
VLOG(3) << "recv var " << origin_var_name << " use time "
<< after_run_recv - before_run_recv;
// step2: update dense var
auto before_run_update = GetCurrentUS();
auto *var_x = training_scope_->FindVar(origin_var_name);
auto var_x_tensor = var_x->Get<framework::LoDTensor>();
auto *var_y = old_scope_->FindVar(origin_var_name);
auto var_y_tensor = var_y->Get<framework::LoDTensor>();
auto *var_y_sub = old_scope_->Var(origin_var_name);
framework::CopyVariable(*var_y, var_y_sub);
auto var_y_sub_tensor = var_y_sub->Get<framework::LoDTensor>();
auto *var_z = pserver_scope_.get()->FindVar(origin_var_name);
auto var_z_tensor = var_z->Get<framework::LoDTensor>();
auto cpu_ctx = paddle::platform::CPUDeviceContext();
auto blas = math::GetBlas<paddle::platform::CPUDeviceContext, float>(cpu_ctx);
// calc sub = pserver - old
blas.SCAL(var_y_sub_tensor.numel(), -1,
var_y_sub_tensor.mutable_data<float>(var_y_sub_tensor.place()));
blas.VADD(var_y_tensor.numel(),
var_y_sub_tensor.mutable_data<float>(var_y_sub_tensor.place()),
var_z_tensor.mutable_data<float>(var_z_tensor.place()),
var_y_sub_tensor.mutable_data<float>(var_y_sub_tensor.place()));
// calc recv += sub
blas.VADD(var_x_tensor.numel(),
var_x_tensor.mutable_data<float>(var_x_tensor.place()),
var_y_sub_tensor.mutable_data<float>(var_y_sub_tensor.place()),
var_x_tensor.mutable_data<float>(var_x_tensor.place()));
// calc old = pserver
framework::CopyVariable(*var_z, var_y);
auto after_run_update = GetCurrentUS();
VLOG(3) << "dese var update " << origin_var_name << " use time "
<< after_run_update - before_run_update;
}
void GeoSgdCommunicator::RecvUpdateSparseVars(
const std::string &var_name, const std::string &splited_var_name) {
// step 1: recv splited var from pserver
auto splited_var_index = GetSplitedVarIndex(var_name, splited_var_name);
auto origin_var_name = DeltaVarToVar(var_name);
auto origin_splited_var_name = DeltaVarToVar(splited_var_name);
auto before_run_recv = GetCurrentUS(); auto before_run_recv = GetCurrentUS();
RpcRecv(origin_var_name, origin_splited_var_name, splited_var_index);
auto after_run_recv = GetCurrentUS();
VLOG(3) << "recv var " << origin_splited_var_name << " use time "
<< after_run_recv - before_run_recv;
auto *var_x = training_scope_->FindVar(var_name); // step 2: update sparse var
auto before_run_update = GetCurrentUS();
auto *var_x = training_scope_->FindVar(origin_var_name);
auto var_x_tensor = var_x->Get<framework::LoDTensor>(); auto var_x_tensor = var_x->Get<framework::LoDTensor>();
auto dims = var_x_tensor.dims();
float *x_value = var_x_tensor.mutable_data<float>(var_x_tensor.place()); float *x_value = var_x_tensor.mutable_data<float>(var_x_tensor.place());
auto *var_y = old_scope_->FindVar(var_name); auto *var_y = old_scope_->FindVar(origin_var_name);
auto var_y_tensor = var_y->Get<framework::LoDTensor>(); auto var_y_tensor = var_y->Get<framework::LoDTensor>();
float *y_value = var_y_tensor.mutable_data<float>(var_y_tensor.place()); float *y_value = var_y_tensor.mutable_data<float>(var_y_tensor.place());
if (var_list_[var_name] == true) { auto *var_z = pserver_scope_.get()->FindVar(origin_splited_var_name);
// sparse param auto var_z_slr = var_z->GetMutable<framework::SelectedRows>();
auto *var_z = pserver_scope_.get()->FindVar(var_name); auto row_size = var_z_slr->rows().size();
auto var_z_slr = var_z->GetMutable<framework::SelectedRows>();
auto &new_rows = var_z_slr->rows(); std::vector<int64_t> new_rows;
auto *new_value = var_z_slr->mutable_value(); new_rows.reserve(row_size);
auto row_numel = new_value->numel() / new_rows.size();
auto *z_value = new_value->mutable_data<float>(var_x_tensor.place()); for (auto ids : var_z_slr->rows()) {
new_rows.push_back(ids +
std::vector<int> buts = absolute_section_[origin_var_name][splited_var_index]);
bucket(new_rows.size(), FLAGS_communicator_merge_sparse_bucket);
std::vector<std::future<void>> fs;
for (int x = 0; x < buts.size() - 1; x++) {
int start = buts[x];
int end = buts[x + 1];
fs.push_back(paddle::framework::Async(
[&x_value, &y_value, &z_value, &new_rows, row_numel, start, end]() {
auto cpu_ctx = paddle::platform::CPUDeviceContext();
auto blas =
math::GetBlas<paddle::platform::CPUDeviceContext, float>(
cpu_ctx);
for (int y = start; y < end; y++) {
std::vector<float> row_delta(row_numel, 0);
auto ids = new_rows[y];
float *x_val = x_value + ids * row_numel;
float *y_val = y_value + ids * row_numel;
float *z_val = z_value + y * row_numel;
VSUB(row_numel, z_val, y_val, row_delta.data());
blas.VADD(row_numel, row_delta.data(), x_val, x_val);
blas.VCOPY(row_numel, z_val, y_val);
}
}));
}
for (size_t i = 0; i < fs.size(); ++i) {
fs[i].wait();
}
} else {
// dense param
auto *var_y_sub = old_scope_->Var(VarToDeltaVar(var_name));
framework::CopyVariable(*var_y, var_y_sub);
auto var_y_sub_tensor = var_y_sub->Get<framework::LoDTensor>();
auto *var_z = pserver_scope_.get()->FindVar(var_name);
auto var_z_tensor = var_z->Get<framework::LoDTensor>();
auto cpu_ctx = paddle::platform::CPUDeviceContext();
auto blas =
math::GetBlas<paddle::platform::CPUDeviceContext, float>(cpu_ctx);
// calc sub = pserver - old
blas.SCAL(var_y_sub_tensor.numel(), -1,
var_y_sub_tensor.mutable_data<float>(var_y_sub_tensor.place()));
blas.VADD(var_y_tensor.numel(),
var_y_sub_tensor.mutable_data<float>(var_y_sub_tensor.place()),
var_z_tensor.mutable_data<float>(var_z_tensor.place()),
var_y_sub_tensor.mutable_data<float>(var_y_sub_tensor.place()));
// calc recv += sub
blas.VADD(var_x_tensor.numel(),
var_x_tensor.mutable_data<float>(var_x_tensor.place()),
var_y_sub_tensor.mutable_data<float>(var_y_sub_tensor.place()),
var_x_tensor.mutable_data<float>(var_x_tensor.place()));
// calc old = pserver
framework::CopyVariable(*var_z, var_y);
} }
auto after_run_recv = GetCurrentUS(); auto *new_value = var_z_slr->mutable_value();
VLOG(3) << "run recv update var " << var_name << " use time " auto row_numel = dims[1];
<< after_run_recv - before_run_recv; auto *z_value = new_value->mutable_data<float>(var_x_tensor.place());
auto cpu_ctx = paddle::platform::CPUDeviceContext();
auto blas = math::GetBlas<paddle::platform::CPUDeviceContext, float>(cpu_ctx);
for (int y = 0; y < new_rows.size(); y++) {
std::vector<float> row_delta(row_numel, 0);
auto ids = new_rows[y];
float *x_val = x_value + ids * row_numel;
float *y_val = y_value + ids * row_numel;
float *z_val = z_value + y * row_numel;
VSUB(row_numel, z_val, y_val, row_delta.data());
blas.VADD(row_numel, row_delta.data(), x_val, x_val);
blas.VCOPY(row_numel, z_val, y_val);
}
auto after_run_update = GetCurrentUS();
VLOG(3) << "sparse var recv update " << origin_splited_var_name << " has num "
<< new_rows.size() << " use time "
<< after_run_update - before_run_update;
} }
void GeoSgdCommunicator::GeoSgdSparseParamInit(framework::Scope *scope_x, void GeoSgdCommunicator::GeoSgdSparseParamInit(framework::Scope *scope_x,
...@@ -893,6 +912,37 @@ void GeoSgdCommunicator::GeoSgdDenseParamInit(framework::Scope *scope_x, ...@@ -893,6 +912,37 @@ void GeoSgdCommunicator::GeoSgdDenseParamInit(framework::Scope *scope_x,
framework::CopyVariable(*var_x, var_y); framework::CopyVariable(*var_x, var_y);
} }
void GeoSgdCommunicator::RpcSend(const std::string &origin_var_name,
const std::string &splited_var_name,
const size_t &splited_var_index) {
auto trainer_id = send_varname_to_ctx_[origin_var_name].trainer_id;
auto endpoint =
send_varname_to_ctx_[origin_var_name].epmap[splited_var_index];
platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();
auto &cpu_ctx_send = *pool.Get(platform::CPUPlace());
distributed::RPCClient *rpc_client =
distributed::RPCClient::GetInstance<RPCCLIENT_T>(trainer_id);
rpc_client->AsyncSendVar(endpoint, cpu_ctx_send, *delta_scope_.get(),
splited_var_name);
}
void GeoSgdCommunicator::RpcRecv(const std::string &var_name,
const std::string &splited_var_name,
const size_t &splited_var_index) {
auto train_id = recv_varname_to_ctx_[var_name].trainer_id;
auto endpoint = recv_varname_to_ctx_[var_name].epmap[splited_var_index];
platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();
auto &cpu_ctx_recv = *pool.Get(platform::CPUPlace());
distributed::RPCClient *rpc_client =
distributed::RPCClient::GetInstance<RPCCLIENT_T>(train_id);
pserver_scope_->Var(splited_var_name);
rpc_client->AsyncGetVar(endpoint, cpu_ctx_recv, *pserver_scope_.get(),
splited_var_name, splited_var_name, splited_var_name);
}
void GeoSgdCommunicator::Recv() {}
void GeoSgdCommunicator::InitImpl(const RpcCtxMap &send_varname_to_ctx, void GeoSgdCommunicator::InitImpl(const RpcCtxMap &send_varname_to_ctx,
const RpcCtxMap &recv_varname_to_ctx, const RpcCtxMap &recv_varname_to_ctx,
Scope *recv_scope) {} Scope *recv_scope) {}
......
...@@ -27,7 +27,10 @@ limitations under the License. */ ...@@ -27,7 +27,10 @@ limitations under the License. */
#include "paddle/fluid/framework/scope.h" #include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/framework/variable.h" #include "paddle/fluid/framework/variable.h"
#include "paddle/fluid/operators/distributed/distributed.h"
#include "paddle/fluid/operators/distributed/rpc_client.h"
#include "paddle/fluid/operators/distributed/rpc_common.h" #include "paddle/fluid/operators/distributed/rpc_common.h"
#include "paddle/fluid/operators/distributed_ops/send_recv_util.h"
#include "paddle/fluid/operators/math/math_function.h" #include "paddle/fluid/operators/math/math_function.h"
#include "paddle/fluid/operators/math/selected_rows_functor.h" #include "paddle/fluid/operators/math/selected_rows_functor.h"
#include "paddle/fluid/platform/device_context.h" #include "paddle/fluid/platform/device_context.h"
...@@ -268,7 +271,7 @@ class Communicator { ...@@ -268,7 +271,7 @@ class Communicator {
}; };
using SparseIdsMap = using SparseIdsMap =
std::unordered_map<std::string, std::unordered_set<int64_t>>; std::unordered_map<std::string, std::vector<std::unordered_set<int64_t>>>;
class AsyncCommunicator : public Communicator { class AsyncCommunicator : public Communicator {
public: public:
...@@ -348,15 +351,18 @@ class GeoSgdCommunicator : public Communicator { ...@@ -348,15 +351,18 @@ class GeoSgdCommunicator : public Communicator {
private: private:
void SendThread(); void SendThread();
void RecvAll();
std::unordered_set<int64_t> SparseIdsMerge( std::unordered_set<int64_t> SparseIdsMerge(
const std::vector<SparseIdsMap>& ids_send_vec, const std::vector<SparseIdsMap>& ids_send_vec,
const std::string& var_name); const std::string& var_name, const std::string& splited_var_name);
void SendUpdateDenseVars(const std::string& var_name); void SendUpdateDenseVars(const std::string& var_name);
void SendUpdateSparseVars(const std::string& var_name, void SendUpdateSparseVars(const std::string& var_name,
const std::string& splited_var_name,
const std::unordered_set<int64_t>& ids_table); const std::unordered_set<int64_t>& ids_table);
void RecvUpdateVars(const std::string& var_name);
void RecvUpdateDenseVars(const std::string& var_name);
void RecvUpdateSparseVars(const std::string& var_name,
const std::string& splited_var_name);
void GeoSgdDenseParamInit(framework::Scope* scope_x, void GeoSgdDenseParamInit(framework::Scope* scope_x,
framework::Scope* scope_y, framework::Scope* scope_y,
...@@ -366,6 +372,14 @@ class GeoSgdCommunicator : public Communicator { ...@@ -366,6 +372,14 @@ class GeoSgdCommunicator : public Communicator {
framework::Scope* scope_y, framework::Scope* scope_y,
const std::string var_name); const std::string var_name);
void RpcSend(const std::string& origin_var_name,
const std::string& splited_var_name,
const size_t& splited_var_index);
void RpcRecv(const std::string& origin_var_name,
const std::string& splited_var_name,
const size_t& splited_var_index);
const std::string VarToDeltaVar(const std::string var_name) { const std::string VarToDeltaVar(const std::string var_name) {
std::string delta_name = var_name; std::string delta_name = var_name;
const std::string send_name = delta_name.append(".delta"); const std::string send_name = delta_name.append(".delta");
...@@ -379,6 +393,20 @@ class GeoSgdCommunicator : public Communicator { ...@@ -379,6 +393,20 @@ class GeoSgdCommunicator : public Communicator {
return param_name; return param_name;
} }
size_t GetSplitedVarIndex(const std::string var_name,
const std::string splited_var_name) {
size_t index = 0;
for (size_t i = 0;
i < send_varname_to_ctx_[var_name].splited_var_names.size(); i++) {
if (send_varname_to_ctx_[var_name].splited_var_names[i] ==
splited_var_name) {
index = i;
break;
}
}
return index;
}
private: private:
int trainer_nums_ = 1; int trainer_nums_ = 1;
int geo_need_push_nums_ = 100; int geo_need_push_nums_ = 100;
...@@ -390,8 +418,6 @@ class GeoSgdCommunicator : public Communicator { ...@@ -390,8 +418,6 @@ class GeoSgdCommunicator : public Communicator {
std::shared_ptr<Scope> pserver_scope_; // parameter on pserver,gloabl scope std::shared_ptr<Scope> pserver_scope_; // parameter on pserver,gloabl scope
RpcCtxMap send_varname_to_ctx_; RpcCtxMap send_varname_to_ctx_;
RpcCtxMap recv_varname_to_ctx_; RpcCtxMap recv_varname_to_ctx_;
std::atomic_uint have_push_{0};
std::unordered_map<std::string, bool> std::unordered_map<std::string, bool>
var_list_; // if var is sparse, using selected rows, bool=true var_list_; // if var is sparse, using selected rows, bool=true
...@@ -399,9 +425,12 @@ class GeoSgdCommunicator : public Communicator { ...@@ -399,9 +425,12 @@ class GeoSgdCommunicator : public Communicator {
need_push_queue_; need_push_queue_;
std::vector<SparseIdsMap> ids_send_vec_; std::vector<SparseIdsMap> ids_send_vec_;
std::unordered_map<std::string, std::vector<int64_t>> absolute_section_;
std::unique_ptr<::ThreadPool> send_threadpool_{nullptr}; std::unique_ptr<::ThreadPool> send_threadpool_{nullptr};
std::unique_ptr<::ThreadPool> recv_threadpool_{nullptr};
std::unique_ptr<std::thread> send_thread_{nullptr}; std::unique_ptr<std::thread> send_thread_{nullptr};
size_t need_thread_nums_{0};
}; };
} // namespace distributed } // namespace distributed
......
...@@ -179,8 +179,8 @@ class GeoSgdTranspiler(DistributeTranspiler): ...@@ -179,8 +179,8 @@ class GeoSgdTranspiler(DistributeTranspiler):
return self.vars_info return self.vars_info
def get_trainer_program(self, wait_port=True): def get_trainer_program(self, wait_port=True):
# if wait_port: if wait_port:
# wait_server_ready(self.pserver_endpoints) wait_server_ready(self.pserver_endpoints)
return self.origin_program return self.origin_program
def get_pserver_programs(self, endpoint): def get_pserver_programs(self, endpoint):
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册