未验证 提交 eb05db71 编写于 作者: C Chengmo 提交者: GitHub

Speed GEO-SGD (#20158)

* delete debug vlog & add rpc function & fix word2vec bug & speed GEO-SGD
上级 b28d4a82
......@@ -24,6 +24,7 @@ limitations under the License. */
#include "paddle/fluid/framework/tensor_util.h"
#include "paddle/fluid/framework/threadpool.h"
#include "paddle/fluid/framework/variable_helper.h"
#include "paddle/fluid/operators/distributed/distributed.h"
#include "paddle/fluid/operators/distributed/parameter_recv.h"
#include "paddle/fluid/operators/distributed/parameter_send.h"
......@@ -62,23 +63,6 @@ inline void VSUB(int n, const T *x, const T *y, T *z) {
}
}
inline std::vector<int> bucket(const int v_size, const int b_size) {
int remainder = v_size % b_size;
int bucket = v_size / b_size;
std::vector<int> ret_vec(b_size, bucket);
for (int i = 0; i < remainder; ++i) {
ret_vec[i] = ret_vec[i] + 1;
}
int cur_bucket = 0;
for (int j = 0; j < ret_vec.size(); ++j) {
int tmp = ret_vec[j];
ret_vec[j] = cur_bucket;
cur_bucket += tmp;
}
ret_vec.push_back(cur_bucket);
return ret_vec;
}
std::once_flag Communicator::init_flag_;
std::shared_ptr<Communicator> Communicator::communicator_(nullptr);
......@@ -447,6 +431,14 @@ void GeoSgdCommunicator::InitImpl(
send_var_name, send_var_names, vars_epmap, vars_sections_int, 0);
recv_varname_to_ctx_[var_name] = operators::distributed::RpcContext(
var_name, vars_names, vars_epmap, vars_sections_int, 0);
// record sparse section
if (is_sparse) {
need_thread_nums_ +=
send_varname_to_ctx_[send_var_name].height_sections.size();
absolute_section_[var_name] = operators::ToAbsoluteSection(
send_varname_to_ctx_[send_var_name].height_sections);
}
}
if (send_varname_to_ctx_.size() == 0 && recv_varname_to_ctx_.size() == 0) {
......@@ -454,17 +446,12 @@ void GeoSgdCommunicator::InitImpl(
}
send_threadpool_.reset(new ::ThreadPool(FLAGS_communicator_thread_pool_size));
recv_threadpool_.reset(new ::ThreadPool(FLAGS_communicator_thread_pool_size));
need_push_queue_ =
std::make_shared<BlockingQueue<std::shared_ptr<SparseIdsMap>>>(
geo_need_push_nums);
delta_scope_.reset(new Scope());
old_scope_.reset(new Scope());
pserver_scope_.reset(new Scope());
// for coverage test, please ignore follow code
InitImpl(send_varname_to_ctx_, recv_varname_to_ctx_, training_scope_);
InitImpl(program, training_scope_);
}
void GeoSgdCommunicator::Start() {
......@@ -521,11 +508,16 @@ void GeoSgdCommunicator::Send(const std::vector<std::string> &sparse_var_names,
const framework::Scope &scope) {
// SparseIdsMap = std::unordered_map<std::string,std::unordered_set<int64_t>>
std::shared_ptr<SparseIdsMap> ids_table = std::make_shared<SparseIdsMap>();
auto before_run_send = GetCurrentUS();
for (size_t i = 0; i < sparse_var_tables.size(); i++) {
if (ids_table->find(sparse_var_tables[i]) == ids_table->end()) {
// create empty set for new sparse var
ids_table->insert(std::pair<std::string, std::unordered_set<int64_t>>(
sparse_var_tables[i], std::unordered_set<int64_t>{}));
auto splited_var_nums =
recv_varname_to_ctx_[sparse_var_tables[i]].splited_var_names.size();
ids_table->insert(
std::pair<std::string, std::vector<std::unordered_set<int64_t>>>(
sparse_var_tables[i],
std::vector<std::unordered_set<int64_t>>{splited_var_nums}));
}
auto *var = scope.FindVar(sparse_var_names[i]);
auto var_tensor = var->Get<framework::LoDTensor>();
......@@ -533,12 +525,16 @@ void GeoSgdCommunicator::Send(const std::vector<std::string> &sparse_var_names,
int *var_mutable_data = var_tensor.mutable_data<int>(var_tensor.place());
// insert ids which has not been record
for (size_t j = 0; j < element_number; j++) {
ids_table->at(sparse_var_tables[i]).insert(var_mutable_data[j]);
auto ep_idx = GetSectionIndex(var_mutable_data[j],
absolute_section_[sparse_var_tables[i]]);
ids_table->at(sparse_var_tables[i])[ep_idx].insert(var_mutable_data[j]);
VLOG(4) << "Sparse var " << sparse_var_tables[i] << " insert "
<< var_mutable_data[j];
}
}
need_push_queue_->Push(ids_table);
auto after_run_send = GetCurrentUS();
VLOG(3) << "run send_op use time " << after_run_send - before_run_send;
}
void GeoSgdCommunicator::SendThread() {
......@@ -548,13 +544,22 @@ void GeoSgdCommunicator::SendThread() {
while (running_) {
std::vector<std::future<void>> task_futures;
task_futures.reserve(send_varname_to_ctx_.size());
auto before_run_send_graph = GetCurrentUS();
if (ids_send_vec_.size() < geo_need_push_nums_) {
size_t wait_times = 0;
while (ids_send_vec_.size() < geo_need_push_nums_) {
VLOG(4) << "ids_send_vec_ Size: " << ids_send_vec_.size();
if (need_push_queue_->Size() > 0) {
wait_times = 0;
ids_send_vec_.push_back(*(need_push_queue_->Pop()));
VLOG(4) << "ids_send_vec_ pushed";
} else if (need_push_queue_->Size() == 0) {
VLOG(3) << "wait_times -> " << wait_times;
if (wait_times >= FLAGS_communicator_send_wait_times) {
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(10));
wait_times++;
continue;
}
}
......@@ -567,86 +572,62 @@ void GeoSgdCommunicator::SendThread() {
for (auto &iter : send_varname_to_ctx_) {
auto &var_name = iter.first;
auto send_task = [this, &var_name] {
auto origin_var_name = DeltaVarToVar(var_name);
if (var_list_[origin_var_name] == true) {
auto ids_set = SparseIdsMerge(ids_send_vec_, origin_var_name);
SendUpdateSparseVars(origin_var_name, ids_set);
} else {
SendUpdateDenseVars(origin_var_name);
if (var_list_[DeltaVarToVar(var_name)] == true) {
// sparse var: merge->send->recv
for (auto &splited_var_name : iter.second.splited_var_names) {
auto send_task = [this, &var_name, &splited_var_name] {
auto before_run_geo = GetCurrentUS();
auto ids_set =
SparseIdsMerge(ids_send_vec_, var_name, splited_var_name);
SendUpdateSparseVars(var_name, splited_var_name, ids_set);
RecvUpdateSparseVars(var_name, splited_var_name);
auto after_run_geo = GetCurrentUS();
VLOG(1) << "run GEO-SGD var " << splited_var_name << " use time "
<< after_run_geo - before_run_geo;
};
task_futures.emplace_back(
send_threadpool_->enqueue(std::move(send_task)));
}
auto before_send = GetCurrentUS();
auto send_functor = distributed::ParameterSend<float>();
auto &ctx = send_varname_to_ctx_.at(var_name);
send_functor(ctx, *delta_scope_.get(), true, 1);
auto after_send = GetCurrentUS();
VLOG(3) << "send " << var_name << " use time "
<< after_send - before_send;
};
task_futures.emplace_back(
send_threadpool_->enqueue(std::move(send_task)));
} else {
auto send_task = [this, &var_name] {
auto before_run_geo = GetCurrentUS();
SendUpdateDenseVars(var_name);
RecvUpdateDenseVars(var_name);
auto after_run_geo = GetCurrentUS();
VLOG(3) << "run GEO-SGD var " << var_name << " use time "
<< after_run_geo - before_run_geo;
};
task_futures.emplace_back(
send_threadpool_->enqueue(std::move(send_task)));
}
}
for (auto &task_f : task_futures) {
task_f.wait();
}
ids_send_vec_.clear();
}
for (auto &task_f : task_futures) {
task_f.wait();
have_push_.fetch_add(1, std::memory_order_relaxed);
}
auto after_run_send_graph = GetCurrentUS();
VLOG(4) << "run send graph use time "
<< after_run_send_graph - before_run_send_graph;
Recv();
}
}
void GeoSgdCommunicator::Recv() {
auto push_nums = have_push_.load();
if (push_nums >= send_varname_to_ctx_.size()) {
ids_send_vec_.clear();
RecvAll();
have_push_.store(0);
}
}
void GeoSgdCommunicator::RecvAll() {
if (!running_) return;
auto before_recv = GetCurrentUS();
std::vector<std::future<void>> task_futures;
task_futures.reserve(recv_varname_to_ctx_.size());
for (auto &iter : recv_varname_to_ctx_) {
auto recv_task = [this, &iter] {
auto &var_name = iter.first;
auto recv_functor = distributed::ParameterRecv<float>();
auto before_parameter_recv = GetCurrentUS();
recv_functor(iter.second, *pserver_scope_.get());
auto after_parameter_recv = GetCurrentUS();
VLOG(3) << "run parameter recv var " << var_name << " use time "
<< after_parameter_recv - before_parameter_recv;
RecvUpdateVars(var_name);
};
task_futures.emplace_back(recv_threadpool_->enqueue(std::move(recv_task)));
}
for (auto &task : task_futures) {
task.wait();
}
auto after_recv = GetCurrentUS();
VLOG(3) << "run recv graph use time " << after_recv - before_recv;
}
std::unordered_set<int64_t> GeoSgdCommunicator::SparseIdsMerge(
const std::vector<SparseIdsMap> &ids_send_vec,
const std::string &var_name) {
const std::vector<SparseIdsMap> &ids_send_vec, const std::string &var_name,
const std::string &splited_var_name) {
// every batch has some sparse id, merge them into one unoredered_set
VLOG(3) << "Sparse Ids merge var: " << var_name
<< " splited var: " << splited_var_name;
auto before_run_ids_merge_ = GetCurrentUS();
auto origin_var_name = DeltaVarToVar(var_name);
auto splited_var_index = GetSplitedVarIndex(var_name, splited_var_name);
std::unordered_set<int64_t> ids_set;
for (auto ids_map : ids_send_vec) {
for (auto id : ids_map[var_name]) {
for (auto id : ids_map[origin_var_name][splited_var_index]) {
ids_set.insert(id);
}
}
auto after_run_ids_merge_ = GetCurrentUS();
VLOG(3) << "run SparseIdsMerge use time "
VLOG(3) << "run SparseIdsMerge " << splited_var_name << " has nums "
<< ids_set.size() << " use time "
<< after_run_ids_merge_ - before_run_ids_merge_;
return ids_set;
}
......@@ -654,24 +635,26 @@ std::unordered_set<int64_t> GeoSgdCommunicator::SparseIdsMerge(
void GeoSgdCommunicator::SendUpdateDenseVars(const std::string &var_name) {
// calc var_delata = (var_training - var_old)/trainer_nums
// calc var_old += var_delta
// var_name: param.delta
auto origin_var_name = DeltaVarToVar(var_name);
auto before_run_send_dense = GetCurrentUS();
auto *var_x = training_scope_->FindVar(var_name);
auto *var_x = training_scope_->FindVar(origin_var_name);
auto var_x_tensor = var_x->Get<framework::LoDTensor>();
auto *var_y = old_scope_->FindVar(var_name);
auto *var_y = old_scope_->FindVar(origin_var_name);
auto var_y_tensor = var_y->Get<framework::LoDTensor>();
auto cpu_ctx = paddle::platform::CPUDeviceContext();
auto dims = var_x_tensor.dims();
// create temp var for sub
auto *var_y_sub = old_scope_->Var(VarToDeltaVar(var_name));
auto *var_y_sub = old_scope_->Var(var_name);
framework::CopyVariable(*var_y, var_y_sub);
auto var_y_sub_tensor = var_y_sub->Get<framework::LoDTensor>();
// create delta var in delta scope
auto *var_z = delta_scope_->Var(VarToDeltaVar(var_name));
auto *var_z = delta_scope_->Var(var_name);
auto *var_z_tensor = var_z->GetMutable<framework::LoDTensor>();
var_z_tensor->mutable_data<float>(dims, var_x_tensor.place());
var_z_tensor->set_lod(var_x_tensor.lod());
......@@ -702,20 +685,34 @@ void GeoSgdCommunicator::SendUpdateDenseVars(const std::string &var_name) {
auto after_run_send_dense = GetCurrentUS();
VLOG(3) << "run send update dense var " << var_name << " use time "
<< after_run_send_dense - before_run_send_dense;
auto send_functor = distributed::ParameterSend<float>();
auto &ctx = send_varname_to_ctx_.at(var_name);
auto before_send_dense = GetCurrentUS();
send_functor(ctx, *delta_scope_.get(), true, 1);
auto after_send_denxe = GetCurrentUS();
VLOG(3) << "send " << var_name << " use time "
<< after_send_denxe - before_send_dense;
}
void GeoSgdCommunicator::SendUpdateSparseVars(
const std::string &var_name, const std::unordered_set<int64_t> &ids_table) {
const std::string &var_name, const std::string &splited_var_name,
const std::unordered_set<int64_t> &ids_table) {
// calc var_delata = (var_training - var_old)/trainer_nums
// calc var_old += var_delta
// var_name: param.delta, splited_var_name: param.block0.delta
// origin_var_name: param
auto before_run_send_sparse = GetCurrentUS();
auto ids_num = ids_table.size();
VLOG(3) << "Sparse Ids nums is : " << ids_num;
auto *var_x = training_scope_->FindVar(var_name);
VLOG(4) << "Sparse Ids nums is : " << ids_num;
auto origin_var_name = DeltaVarToVar(var_name);
auto *var_x = training_scope_->FindVar(origin_var_name);
auto var_x_tensor = var_x->Get<framework::LoDTensor>();
auto *var_y = old_scope_.get()->FindVar(var_name);
auto *var_y = old_scope_.get()->FindVar(origin_var_name);
auto var_y_tensor = var_y->Get<framework::LoDTensor>();
auto dims = var_x_tensor.dims();
......@@ -724,7 +721,7 @@ void GeoSgdCommunicator::SendUpdateSparseVars(
float *x_value = var_x_tensor.mutable_data<float>(var_x_tensor.place());
float *y_value = var_y_tensor.mutable_data<float>(var_y_tensor.place());
auto *var_z = delta_scope_->Var(VarToDeltaVar(var_name));
auto *var_z = delta_scope_->Var(splited_var_name);
auto *var_z_select_rows = var_z->GetMutable<framework::SelectedRows>();
auto *var_z_value = var_z_select_rows->mutable_value();
var_z_value->Resize({static_cast<int64_t>(ids_num), row_numel});
......@@ -732,135 +729,157 @@ void GeoSgdCommunicator::SendUpdateSparseVars(
std::vector<int64_t> new_rows;
new_rows.insert(new_rows.begin(), ids_table.begin(), ids_table.end());
var_z_select_rows->set_rows(new_rows);
var_z_select_rows->set_height(new_rows.size());
// using multi thread speed sparse delta calc
std::vector<int> buts =
bucket(new_rows.size(), FLAGS_communicator_merge_sparse_bucket);
std::vector<std::future<void>> fs;
for (int x = 0; x < buts.size() - 1; x++) {
int start = buts[x];
int end = buts[x + 1];
float avg = 1 / static_cast<float>(trainer_nums_);
fs.push_back(
paddle::framework::Async([&x_value, &y_value, &z_value, &new_rows,
row_numel, start, end, avg]() {
auto cpu_ctx = paddle::platform::CPUDeviceContext();
auto blas =
math::GetBlas<paddle::platform::CPUDeviceContext, float>(cpu_ctx);
for (int y = start; y < end; y++) {
auto ids = new_rows[y];
float *x_val = x_value + ids * row_numel;
float *y_val = y_value + ids * row_numel;
float *z_val = z_value + y * row_numel;
std::vector<float> row_delta(row_numel, 0);
VSUB<float>(row_numel, x_val, y_val, row_delta.data());
blas.SCAL(row_numel, avg, row_delta.data());
blas.VADD(row_numel, row_delta.data(), y_val, y_val);
blas.VCOPY(row_numel, row_delta.data(), z_val);
}
}));
}
for (size_t i = 0; i < fs.size(); ++i) {
fs[i].wait();
auto cpu_ctx = paddle::platform::CPUDeviceContext();
auto blas = math::GetBlas<paddle::platform::CPUDeviceContext, float>(cpu_ctx);
float avg = 1 / static_cast<float>(trainer_nums_);
for (int y = 0; y < new_rows.size(); y++) {
auto ids = new_rows[y];
float *x_val = x_value + ids * row_numel;
float *y_val = y_value + ids * row_numel;
float *z_val = z_value + y * row_numel;
std::vector<float> row_delta(row_numel, 0);
VSUB<float>(row_numel, x_val, y_val, row_delta.data());
blas.SCAL(row_numel, avg, row_delta.data());
blas.VADD(row_numel, row_delta.data(), y_val, y_val);
blas.VCOPY(row_numel, row_delta.data(), z_val);
}
auto after_run_send_sparse = GetCurrentUS();
VLOG(3) << "run send update sparse var " << var_name << " use time "
VLOG(3) << "run send update sparse var " << splited_var_name << " use time "
<< after_run_send_sparse - before_run_send_sparse;
auto splited_var_index = GetSplitedVarIndex(var_name, splited_var_name);
std::vector<int64_t> send_rows;
send_rows.reserve(new_rows.size());
for (auto idx : new_rows) {
send_rows.push_back(idx -
absolute_section_[origin_var_name][splited_var_index]);
}
var_z_select_rows->set_rows(send_rows);
var_z_select_rows->set_height(
send_varname_to_ctx_[var_name].height_sections[splited_var_index]);
auto before_send_sparse = GetCurrentUS();
RpcSend(var_name, splited_var_name, splited_var_index);
auto after_send_sparse = GetCurrentUS();
VLOG(3) << "send " << splited_var_name << " has nums " << new_rows.size()
<< " use time " << after_send_sparse - before_send_sparse;
}
void GeoSgdCommunicator::RecvUpdateVars(const std::string &var_name) {
void GeoSgdCommunicator::RecvUpdateDenseVars(const std::string &var_name) {
// calc var_training += var_pserver - var_old
// calc var_old = var_pserver
// var_name: param.delta
// step1: recv dense var from pserver
auto origin_var_name = DeltaVarToVar(var_name);
auto before_run_recv = GetCurrentUS();
auto recv_functor = distributed::ParameterRecv<float>();
recv_functor(recv_varname_to_ctx_[origin_var_name], *pserver_scope_.get());
auto after_run_recv = GetCurrentUS();
VLOG(3) << "recv var " << origin_var_name << " use time "
<< after_run_recv - before_run_recv;
// step2: update dense var
auto before_run_update = GetCurrentUS();
auto *var_x = training_scope_->FindVar(origin_var_name);
auto var_x_tensor = var_x->Get<framework::LoDTensor>();
auto *var_y = old_scope_->FindVar(origin_var_name);
auto var_y_tensor = var_y->Get<framework::LoDTensor>();
auto *var_y_sub = old_scope_->Var(origin_var_name);
framework::CopyVariable(*var_y, var_y_sub);
auto var_y_sub_tensor = var_y_sub->Get<framework::LoDTensor>();
auto *var_z = pserver_scope_.get()->FindVar(origin_var_name);
auto var_z_tensor = var_z->Get<framework::LoDTensor>();
auto cpu_ctx = paddle::platform::CPUDeviceContext();
auto blas = math::GetBlas<paddle::platform::CPUDeviceContext, float>(cpu_ctx);
// calc sub = pserver - old
blas.SCAL(var_y_sub_tensor.numel(), -1,
var_y_sub_tensor.mutable_data<float>(var_y_sub_tensor.place()));
blas.VADD(var_y_tensor.numel(),
var_y_sub_tensor.mutable_data<float>(var_y_sub_tensor.place()),
var_z_tensor.mutable_data<float>(var_z_tensor.place()),
var_y_sub_tensor.mutable_data<float>(var_y_sub_tensor.place()));
// calc recv += sub
blas.VADD(var_x_tensor.numel(),
var_x_tensor.mutable_data<float>(var_x_tensor.place()),
var_y_sub_tensor.mutable_data<float>(var_y_sub_tensor.place()),
var_x_tensor.mutable_data<float>(var_x_tensor.place()));
// calc old = pserver
framework::CopyVariable(*var_z, var_y);
auto after_run_update = GetCurrentUS();
VLOG(3) << "dese var update " << origin_var_name << " use time "
<< after_run_update - before_run_update;
}
void GeoSgdCommunicator::RecvUpdateSparseVars(
const std::string &var_name, const std::string &splited_var_name) {
// step 1: recv splited var from pserver
auto splited_var_index = GetSplitedVarIndex(var_name, splited_var_name);
auto origin_var_name = DeltaVarToVar(var_name);
auto origin_splited_var_name = DeltaVarToVar(splited_var_name);
auto before_run_recv = GetCurrentUS();
RpcRecv(origin_var_name, origin_splited_var_name, splited_var_index);
auto after_run_recv = GetCurrentUS();
VLOG(3) << "recv var " << origin_splited_var_name << " use time "
<< after_run_recv - before_run_recv;
auto *var_x = training_scope_->FindVar(var_name);
// step 2: update sparse var
auto before_run_update = GetCurrentUS();
auto *var_x = training_scope_->FindVar(origin_var_name);
auto var_x_tensor = var_x->Get<framework::LoDTensor>();
auto dims = var_x_tensor.dims();
float *x_value = var_x_tensor.mutable_data<float>(var_x_tensor.place());
auto *var_y = old_scope_->FindVar(var_name);
auto *var_y = old_scope_->FindVar(origin_var_name);
auto var_y_tensor = var_y->Get<framework::LoDTensor>();
float *y_value = var_y_tensor.mutable_data<float>(var_y_tensor.place());
if (var_list_[var_name] == true) {
// sparse param
auto *var_z = pserver_scope_.get()->FindVar(var_name);
auto var_z_slr = var_z->GetMutable<framework::SelectedRows>();
auto &new_rows = var_z_slr->rows();
auto *new_value = var_z_slr->mutable_value();
auto row_numel = new_value->numel() / new_rows.size();
auto *z_value = new_value->mutable_data<float>(var_x_tensor.place());
std::vector<int> buts =
bucket(new_rows.size(), FLAGS_communicator_merge_sparse_bucket);
std::vector<std::future<void>> fs;
for (int x = 0; x < buts.size() - 1; x++) {
int start = buts[x];
int end = buts[x + 1];
fs.push_back(paddle::framework::Async(
[&x_value, &y_value, &z_value, &new_rows, row_numel, start, end]() {
auto cpu_ctx = paddle::platform::CPUDeviceContext();
auto blas =
math::GetBlas<paddle::platform::CPUDeviceContext, float>(
cpu_ctx);
for (int y = start; y < end; y++) {
std::vector<float> row_delta(row_numel, 0);
auto ids = new_rows[y];
float *x_val = x_value + ids * row_numel;
float *y_val = y_value + ids * row_numel;
float *z_val = z_value + y * row_numel;
VSUB(row_numel, z_val, y_val, row_delta.data());
blas.VADD(row_numel, row_delta.data(), x_val, x_val);
blas.VCOPY(row_numel, z_val, y_val);
}
}));
}
for (size_t i = 0; i < fs.size(); ++i) {
fs[i].wait();
}
} else {
// dense param
auto *var_y_sub = old_scope_->Var(VarToDeltaVar(var_name));
framework::CopyVariable(*var_y, var_y_sub);
auto var_y_sub_tensor = var_y_sub->Get<framework::LoDTensor>();
auto *var_z = pserver_scope_.get()->FindVar(var_name);
auto var_z_tensor = var_z->Get<framework::LoDTensor>();
auto cpu_ctx = paddle::platform::CPUDeviceContext();
auto blas =
math::GetBlas<paddle::platform::CPUDeviceContext, float>(cpu_ctx);
// calc sub = pserver - old
blas.SCAL(var_y_sub_tensor.numel(), -1,
var_y_sub_tensor.mutable_data<float>(var_y_sub_tensor.place()));
blas.VADD(var_y_tensor.numel(),
var_y_sub_tensor.mutable_data<float>(var_y_sub_tensor.place()),
var_z_tensor.mutable_data<float>(var_z_tensor.place()),
var_y_sub_tensor.mutable_data<float>(var_y_sub_tensor.place()));
// calc recv += sub
blas.VADD(var_x_tensor.numel(),
var_x_tensor.mutable_data<float>(var_x_tensor.place()),
var_y_sub_tensor.mutable_data<float>(var_y_sub_tensor.place()),
var_x_tensor.mutable_data<float>(var_x_tensor.place()));
// calc old = pserver
framework::CopyVariable(*var_z, var_y);
auto *var_z = pserver_scope_.get()->FindVar(origin_splited_var_name);
auto var_z_slr = var_z->GetMutable<framework::SelectedRows>();
auto row_size = var_z_slr->rows().size();
std::vector<int64_t> new_rows;
new_rows.reserve(row_size);
for (auto ids : var_z_slr->rows()) {
new_rows.push_back(ids +
absolute_section_[origin_var_name][splited_var_index]);
}
auto after_run_recv = GetCurrentUS();
VLOG(3) << "run recv update var " << var_name << " use time "
<< after_run_recv - before_run_recv;
auto *new_value = var_z_slr->mutable_value();
auto row_numel = dims[1];
auto *z_value = new_value->mutable_data<float>(var_x_tensor.place());
auto cpu_ctx = paddle::platform::CPUDeviceContext();
auto blas = math::GetBlas<paddle::platform::CPUDeviceContext, float>(cpu_ctx);
for (int y = 0; y < new_rows.size(); y++) {
std::vector<float> row_delta(row_numel, 0);
auto ids = new_rows[y];
float *x_val = x_value + ids * row_numel;
float *y_val = y_value + ids * row_numel;
float *z_val = z_value + y * row_numel;
VSUB(row_numel, z_val, y_val, row_delta.data());
blas.VADD(row_numel, row_delta.data(), x_val, x_val);
blas.VCOPY(row_numel, z_val, y_val);
}
auto after_run_update = GetCurrentUS();
VLOG(3) << "sparse var recv update " << origin_splited_var_name << " has num "
<< new_rows.size() << " use time "
<< after_run_update - before_run_update;
}
void GeoSgdCommunicator::GeoSgdSparseParamInit(framework::Scope *scope_x,
......@@ -893,6 +912,37 @@ void GeoSgdCommunicator::GeoSgdDenseParamInit(framework::Scope *scope_x,
framework::CopyVariable(*var_x, var_y);
}
void GeoSgdCommunicator::RpcSend(const std::string &origin_var_name,
const std::string &splited_var_name,
const size_t &splited_var_index) {
auto trainer_id = send_varname_to_ctx_[origin_var_name].trainer_id;
auto endpoint =
send_varname_to_ctx_[origin_var_name].epmap[splited_var_index];
platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();
auto &cpu_ctx_send = *pool.Get(platform::CPUPlace());
distributed::RPCClient *rpc_client =
distributed::RPCClient::GetInstance<RPCCLIENT_T>(trainer_id);
rpc_client->AsyncSendVar(endpoint, cpu_ctx_send, *delta_scope_.get(),
splited_var_name);
}
void GeoSgdCommunicator::RpcRecv(const std::string &var_name,
const std::string &splited_var_name,
const size_t &splited_var_index) {
auto train_id = recv_varname_to_ctx_[var_name].trainer_id;
auto endpoint = recv_varname_to_ctx_[var_name].epmap[splited_var_index];
platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();
auto &cpu_ctx_recv = *pool.Get(platform::CPUPlace());
distributed::RPCClient *rpc_client =
distributed::RPCClient::GetInstance<RPCCLIENT_T>(train_id);
pserver_scope_->Var(splited_var_name);
rpc_client->AsyncGetVar(endpoint, cpu_ctx_recv, *pserver_scope_.get(),
splited_var_name, splited_var_name, splited_var_name);
}
void GeoSgdCommunicator::Recv() {}
void GeoSgdCommunicator::InitImpl(const RpcCtxMap &send_varname_to_ctx,
const RpcCtxMap &recv_varname_to_ctx,
Scope *recv_scope) {}
......
......@@ -27,7 +27,10 @@ limitations under the License. */
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/framework/variable.h"
#include "paddle/fluid/operators/distributed/distributed.h"
#include "paddle/fluid/operators/distributed/rpc_client.h"
#include "paddle/fluid/operators/distributed/rpc_common.h"
#include "paddle/fluid/operators/distributed_ops/send_recv_util.h"
#include "paddle/fluid/operators/math/math_function.h"
#include "paddle/fluid/operators/math/selected_rows_functor.h"
#include "paddle/fluid/platform/device_context.h"
......@@ -268,7 +271,7 @@ class Communicator {
};
using SparseIdsMap =
std::unordered_map<std::string, std::unordered_set<int64_t>>;
std::unordered_map<std::string, std::vector<std::unordered_set<int64_t>>>;
class AsyncCommunicator : public Communicator {
public:
......@@ -348,15 +351,18 @@ class GeoSgdCommunicator : public Communicator {
private:
void SendThread();
void RecvAll();
std::unordered_set<int64_t> SparseIdsMerge(
const std::vector<SparseIdsMap>& ids_send_vec,
const std::string& var_name);
const std::string& var_name, const std::string& splited_var_name);
void SendUpdateDenseVars(const std::string& var_name);
void SendUpdateSparseVars(const std::string& var_name,
const std::string& splited_var_name,
const std::unordered_set<int64_t>& ids_table);
void RecvUpdateVars(const std::string& var_name);
void RecvUpdateDenseVars(const std::string& var_name);
void RecvUpdateSparseVars(const std::string& var_name,
const std::string& splited_var_name);
void GeoSgdDenseParamInit(framework::Scope* scope_x,
framework::Scope* scope_y,
......@@ -366,6 +372,14 @@ class GeoSgdCommunicator : public Communicator {
framework::Scope* scope_y,
const std::string var_name);
void RpcSend(const std::string& origin_var_name,
const std::string& splited_var_name,
const size_t& splited_var_index);
void RpcRecv(const std::string& origin_var_name,
const std::string& splited_var_name,
const size_t& splited_var_index);
const std::string VarToDeltaVar(const std::string var_name) {
std::string delta_name = var_name;
const std::string send_name = delta_name.append(".delta");
......@@ -379,6 +393,20 @@ class GeoSgdCommunicator : public Communicator {
return param_name;
}
size_t GetSplitedVarIndex(const std::string var_name,
const std::string splited_var_name) {
size_t index = 0;
for (size_t i = 0;
i < send_varname_to_ctx_[var_name].splited_var_names.size(); i++) {
if (send_varname_to_ctx_[var_name].splited_var_names[i] ==
splited_var_name) {
index = i;
break;
}
}
return index;
}
private:
int trainer_nums_ = 1;
int geo_need_push_nums_ = 100;
......@@ -390,8 +418,6 @@ class GeoSgdCommunicator : public Communicator {
std::shared_ptr<Scope> pserver_scope_; // parameter on pserver,gloabl scope
RpcCtxMap send_varname_to_ctx_;
RpcCtxMap recv_varname_to_ctx_;
std::atomic_uint have_push_{0};
std::unordered_map<std::string, bool>
var_list_; // if var is sparse, using selected rows, bool=true
......@@ -399,9 +425,12 @@ class GeoSgdCommunicator : public Communicator {
need_push_queue_;
std::vector<SparseIdsMap> ids_send_vec_;
std::unordered_map<std::string, std::vector<int64_t>> absolute_section_;
std::unique_ptr<::ThreadPool> send_threadpool_{nullptr};
std::unique_ptr<::ThreadPool> recv_threadpool_{nullptr};
std::unique_ptr<std::thread> send_thread_{nullptr};
size_t need_thread_nums_{0};
};
} // namespace distributed
......
......@@ -179,8 +179,8 @@ class GeoSgdTranspiler(DistributeTranspiler):
return self.vars_info
def get_trainer_program(self, wait_port=True):
# if wait_port:
# wait_server_ready(self.pserver_endpoints)
if wait_port:
wait_server_ready(self.pserver_endpoints)
return self.origin_program
def get_pserver_programs(self, endpoint):
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册