未验证 提交 a86f11b5 编写于 作者: C Chengmo 提交者: GitHub

Speed GEO dense calc & communication (#21579)

* test=develop, speed dense calc & communication
上级 666c3bb9
...@@ -183,20 +183,20 @@ void AsyncCommunicator::SendThread() { ...@@ -183,20 +183,20 @@ void AsyncCommunicator::SendThread() {
while (running_) { while (running_) {
std::vector<std::future<void>> task_futures; std::vector<std::future<void>> task_futures;
task_futures.reserve(send_varname_to_ctx_.size()); task_futures.reserve(send_varname_to_ctx_.size());
VLOG(3) << "run send graph"; VLOG(4) << "run send graph";
auto before_run_send_graph = GetCurrentUS(); auto before_run_send_graph = GetCurrentUS();
for (auto &iter : send_varname_to_queue_) { for (auto &iter : send_varname_to_queue_) {
auto &var_name = iter.first; auto &var_name = iter.first;
auto &var_queue = iter.second; auto &var_queue = iter.second;
if (var_queue->Size() > 0) { if (var_queue->Size() > 0) {
auto send_task = [this, &var_name, &var_queue] { auto send_task = [this, &var_name, &var_queue] {
VLOG(3) << var_name << " merge and send"; VLOG(4) << var_name << " merge and send";
std::vector<std::shared_ptr<Variable>> vars; std::vector<std::shared_ptr<Variable>> vars;
int merged_var_num = 0; int merged_var_num = 0;
int wait_times = 0; int wait_times = 0;
while (merged_var_num < FLAGS_communicator_max_merge_var_num) { while (merged_var_num < FLAGS_communicator_max_merge_var_num) {
if (var_queue->Size() == 0) { if (var_queue->Size() == 0) {
VLOG(3) << "wait_times -> " << wait_times; VLOG(4) << "wait_times -> " << wait_times;
if (wait_times >= FLAGS_communicator_send_wait_times) { if (wait_times >= FLAGS_communicator_send_wait_times) {
break; break;
} }
...@@ -223,14 +223,14 @@ void AsyncCommunicator::SendThread() { ...@@ -223,14 +223,14 @@ void AsyncCommunicator::SendThread() {
ctx.merge_add); ctx.merge_add);
} }
auto after_merge = GetCurrentUS(); auto after_merge = GetCurrentUS();
VLOG(3) << "merge " << merged_var_num << " " << var_name VLOG(4) << "merge " << merged_var_num << " " << var_name
<< " use time " << after_merge - before_merge; << " use time " << after_merge - before_merge;
auto send_functor = distributed::ParameterSend<float>(); auto send_functor = distributed::ParameterSend<float>();
if (!FLAGS_communicator_fake_rpc) { if (!FLAGS_communicator_fake_rpc) {
send_functor(ctx, *send_scope_, true, 1); send_functor(ctx, *send_scope_, true, 1);
} }
auto after_send = GetCurrentUS(); auto after_send = GetCurrentUS();
VLOG(3) << "send " << var_name << " use time " VLOG(4) << "send " << var_name << " use time "
<< after_send - after_merge; << after_send - after_merge;
}; };
task_futures.emplace_back( task_futures.emplace_back(
...@@ -244,7 +244,7 @@ void AsyncCommunicator::SendThread() { ...@@ -244,7 +244,7 @@ void AsyncCommunicator::SendThread() {
} }
auto after_run_send_graph = GetCurrentUS(); auto after_run_send_graph = GetCurrentUS();
VLOG(3) << "run send graph use time " VLOG(4) << "run send graph use time "
<< after_run_send_graph - before_run_send_graph; << after_run_send_graph - before_run_send_graph;
Recv(); Recv();
} }
...@@ -323,7 +323,7 @@ void AsyncCommunicator::RecvAll() { ...@@ -323,7 +323,7 @@ void AsyncCommunicator::RecvAll() {
task.wait(); task.wait();
} }
auto after_recv = GetCurrentUS(); auto after_recv = GetCurrentUS();
VLOG(1) << "run recv graph use time " << after_recv - before_send; VLOG(3) << "run recv graph use time " << after_recv - before_send;
} }
void AsyncCommunicator::Start() { void AsyncCommunicator::Start() {
...@@ -446,13 +446,15 @@ void GeoSgdCommunicator::InitImpl( ...@@ -446,13 +446,15 @@ void GeoSgdCommunicator::InitImpl(
recv_varname_to_ctx_[var_name] = operators::distributed::RpcContext( recv_varname_to_ctx_[var_name] = operators::distributed::RpcContext(
var_name, vars_names, vars_epmap, vars_sections_int, 0); var_name, vars_names, vars_epmap, vars_sections_int, 0);
// record sparse section
if (is_sparse) {
need_thread_nums_ +=
send_varname_to_ctx_[send_var_name].height_sections.size();
absolute_section_[var_name] = operators::ToAbsoluteSection( absolute_section_[var_name] = operators::ToAbsoluteSection(
send_varname_to_ctx_[send_var_name].height_sections); send_varname_to_ctx_[send_var_name].height_sections);
vars_first_dimension_[var_name] = 0;
for (int64_t section : vars_sections_int) {
vars_first_dimension_[var_name] += section;
} }
send_var_nums_ += vars_names.size();
} }
if (send_varname_to_ctx_.size() == 0 && recv_varname_to_ctx_.size() == 0) { if (send_varname_to_ctx_.size() == 0 && recv_varname_to_ctx_.size() == 0) {
...@@ -548,7 +550,7 @@ void GeoSgdCommunicator::Send(const std::vector<std::string> &sparse_var_names, ...@@ -548,7 +550,7 @@ void GeoSgdCommunicator::Send(const std::vector<std::string> &sparse_var_names,
} }
need_push_queue_->Push(ids_table); need_push_queue_->Push(ids_table);
auto after_run_send = GetCurrentUS(); auto after_run_send = GetCurrentUS();
VLOG(3) << "run send_op use time " << after_run_send - before_run_send; VLOG(4) << "run send_op use time " << after_run_send - before_run_send;
} }
void GeoSgdCommunicator::SendThread() { void GeoSgdCommunicator::SendThread() {
...@@ -557,7 +559,7 @@ void GeoSgdCommunicator::SendThread() { ...@@ -557,7 +559,7 @@ void GeoSgdCommunicator::SendThread() {
while (running_) { while (running_) {
std::vector<std::future<void>> task_futures; std::vector<std::future<void>> task_futures;
task_futures.reserve(send_varname_to_ctx_.size()); task_futures.reserve(send_var_nums_);
int wait_times = 0; int wait_times = 0;
while (ids_send_vec_.size() < geo_need_push_nums_) { while (ids_send_vec_.size() < geo_need_push_nums_) {
...@@ -567,7 +569,7 @@ void GeoSgdCommunicator::SendThread() { ...@@ -567,7 +569,7 @@ void GeoSgdCommunicator::SendThread() {
ids_send_vec_.push_back(*(need_push_queue_->Pop())); ids_send_vec_.push_back(*(need_push_queue_->Pop()));
VLOG(4) << "ids_send_vec_ pushed"; VLOG(4) << "ids_send_vec_ pushed";
} else if (need_push_queue_->Size() == 0) { } else if (need_push_queue_->Size() == 0) {
VLOG(3) << "wait_times -> " << wait_times; VLOG(4) << "wait_times -> " << wait_times;
if (wait_times >= FLAGS_communicator_send_wait_times) { if (wait_times >= FLAGS_communicator_send_wait_times) {
break; break;
} }
...@@ -579,10 +581,10 @@ void GeoSgdCommunicator::SendThread() { ...@@ -579,10 +581,10 @@ void GeoSgdCommunicator::SendThread() {
if (ids_send_vec_.size() >= geo_need_push_nums_) { if (ids_send_vec_.size() >= geo_need_push_nums_) {
auto after_run_training = GetCurrentUS(); auto after_run_training = GetCurrentUS();
VLOG(3) << "run Training use time " VLOG(4) << "run Training use time "
<< after_run_training - before_run_training; << after_run_training - before_run_training;
before_run_training = GetCurrentUS(); before_run_training = GetCurrentUS();
VLOG(3) << "Start send after get need_push_num"; VLOG(4) << "Start send after get need_push_num";
for (auto &iter : send_varname_to_ctx_) { for (auto &iter : send_varname_to_ctx_) {
auto &var_name = iter.first; auto &var_name = iter.first;
...@@ -591,30 +593,33 @@ void GeoSgdCommunicator::SendThread() { ...@@ -591,30 +593,33 @@ void GeoSgdCommunicator::SendThread() {
for (auto &splited_var_name : iter.second.splited_var_names) { for (auto &splited_var_name : iter.second.splited_var_names) {
auto send_task = [this, &var_name, &splited_var_name] { auto send_task = [this, &var_name, &splited_var_name] {
auto before_run_geo = GetCurrentUS(); auto before_run_geo = GetCurrentUS();
VLOG(4) << "ids_send_vec_ size: " << ids_send_vec_.size();
auto ids_set = auto ids_set =
SparseIdsMerge(ids_send_vec_, var_name, splited_var_name); SparseIdsMerge(ids_send_vec_, var_name, splited_var_name);
SendUpdateSparseVars(var_name, splited_var_name, ids_set); SendUpdateSparseVars(var_name, splited_var_name, ids_set);
RecvUpdateSparseVars(var_name, splited_var_name); RecvUpdateSparseVars(var_name, splited_var_name);
auto after_run_geo = GetCurrentUS(); auto after_run_geo = GetCurrentUS();
VLOG(1) << "run GEO-SGD var " << splited_var_name << " use time " VLOG(3) << "run GEO-SGD var " << splited_var_name << " use time "
<< after_run_geo - before_run_geo; << after_run_geo - before_run_geo;
}; };
task_futures.emplace_back( task_futures.emplace_back(
send_threadpool_->enqueue(std::move(send_task))); send_threadpool_->enqueue(std::move(send_task)));
} }
} else { } else {
auto send_task = [this, &var_name] { for (auto &splited_var_name : iter.second.splited_var_names) {
auto send_task = [this, &var_name, &splited_var_name] {
auto before_run_geo = GetCurrentUS(); auto before_run_geo = GetCurrentUS();
SendUpdateDenseVars(var_name); SendUpdateDenseVars(var_name, splited_var_name);
RecvUpdateDenseVars(var_name); RecvUpdateDenseVars(var_name, splited_var_name);
auto after_run_geo = GetCurrentUS(); auto after_run_geo = GetCurrentUS();
VLOG(3) << "run GEO-SGD var " << var_name << " use time " VLOG(3) << "run GEO-SGD var " << splited_var_name << " use time "
<< after_run_geo - before_run_geo; << after_run_geo - before_run_geo;
}; };
task_futures.emplace_back( task_futures.emplace_back(
send_threadpool_->enqueue(std::move(send_task))); send_threadpool_->enqueue(std::move(send_task)));
} }
} }
}
for (auto &task_f : task_futures) { for (auto &task_f : task_futures) {
task_f.wait(); task_f.wait();
} }
...@@ -627,31 +632,36 @@ std::unordered_set<int64_t> GeoSgdCommunicator::SparseIdsMerge( ...@@ -627,31 +632,36 @@ std::unordered_set<int64_t> GeoSgdCommunicator::SparseIdsMerge(
const std::vector<SparseIdsMap> &ids_send_vec, const std::string &var_name, const std::vector<SparseIdsMap> &ids_send_vec, const std::string &var_name,
const std::string &splited_var_name) { const std::string &splited_var_name) {
// every batch has some sparse id, merge them into one unoredered_set // every batch has some sparse id, merge them into one unoredered_set
VLOG(3) << "Sparse Ids merge var: " << var_name VLOG(4) << "Sparse Ids merge var: " << var_name
<< " splited var: " << splited_var_name; << " splited var: " << splited_var_name;
auto before_run_ids_merge_ = GetCurrentUS(); auto before_run_ids_merge_ = GetCurrentUS();
auto origin_var_name = DeltaVarToVar(var_name); auto origin_var_name = DeltaVarToVar(var_name);
auto splited_var_index = GetSplitedVarIndex(var_name, splited_var_name); auto splited_var_index = GetSplitedVarIndex(var_name, splited_var_name);
std::unordered_set<int64_t> ids_set; std::unordered_set<int64_t> ids_set;
for (auto ids_map : ids_send_vec) { for (auto ids_map : ids_send_vec) {
for (auto id : ids_map[origin_var_name][splited_var_index]) { for (auto id : ids_map[origin_var_name][splited_var_index]) {
ids_set.insert(id); ids_set.insert(id);
} }
} }
auto after_run_ids_merge_ = GetCurrentUS(); auto after_run_ids_merge_ = GetCurrentUS();
VLOG(3) << "run SparseIdsMerge " << splited_var_name << " has nums " VLOG(4) << "run SparseIdsMerge " << splited_var_name << " has nums "
<< ids_set.size() << " use time " << ids_set.size() << " use time "
<< after_run_ids_merge_ - before_run_ids_merge_; << after_run_ids_merge_ - before_run_ids_merge_;
return ids_set; return ids_set;
} }
void GeoSgdCommunicator::SendUpdateDenseVars(const std::string &var_name) { void GeoSgdCommunicator::SendUpdateDenseVars(
const std::string &var_name, const std::string &splited_var_name) {
// calc var_delata = (var_training - var_old)/trainer_nums // calc var_delata = (var_training - var_old)/trainer_nums
// calc var_old += var_delta // calc var_old += var_delta
// var_name: param.delta // var_name: param.delta
auto origin_var_name = DeltaVarToVar(var_name); auto origin_var_name = DeltaVarToVar(var_name);
auto splited_var_index = GetSplitedVarIndex(var_name, splited_var_name);
VLOG(4) << "Dense var: " << var_name
<< " 's splited var: " << splited_var_name
<< " splited var index: " << splited_var_index;
auto before_run_send_dense = GetCurrentUS(); auto before_run_send_dense = GetCurrentUS();
auto cpu_ctx = paddle::platform::CPUDeviceContext();
auto *var_x = training_scope_->FindVar(origin_var_name); auto *var_x = training_scope_->FindVar(origin_var_name);
auto var_x_tensor = var_x->Get<framework::LoDTensor>(); auto var_x_tensor = var_x->Get<framework::LoDTensor>();
...@@ -659,55 +669,73 @@ void GeoSgdCommunicator::SendUpdateDenseVars(const std::string &var_name) { ...@@ -659,55 +669,73 @@ void GeoSgdCommunicator::SendUpdateDenseVars(const std::string &var_name) {
auto *var_y = old_scope_->FindVar(origin_var_name); auto *var_y = old_scope_->FindVar(origin_var_name);
auto var_y_tensor = var_y->Get<framework::LoDTensor>(); auto var_y_tensor = var_y->Get<framework::LoDTensor>();
auto cpu_ctx = paddle::platform::CPUDeviceContext();
auto dims = var_x_tensor.dims(); auto dims = var_x_tensor.dims();
auto total_element = var_x_tensor.numel();
// create temp var for sub int64_t section = 0;
auto *var_y_sub = old_scope_->Var(var_name); int64_t begin_loc = 0;
framework::CopyVariable(*var_y, var_y_sub); int64_t dimension = 0;
auto var_y_sub_tensor = var_y_sub->Get<framework::LoDTensor>();
size_t out_num = send_varname_to_ctx_[var_name].height_sections.size();
if (out_num > 1) {
section = send_varname_to_ctx_[var_name].height_sections[splited_var_index];
dims[0] = section;
begin_loc = absolute_section_[origin_var_name][splited_var_index];
dimension = total_element / vars_first_dimension_[origin_var_name];
total_element = section * dimension;
VLOG(4) << "Dense splited var: " << splited_var_name
<< " section: " << section << " dimension: " << dimension
<< " begin loc: " << begin_loc << " total_element "
<< total_element;
}
auto *var_x_data = var_x_tensor.mutable_data<float>(var_x_tensor.place()) +
begin_loc * dimension;
VLOG(4) << "Dense splited var: " << splited_var_name << " var_x_data[0] "
<< var_x_data[0] << " var_x_data[end] "
<< var_x_data[total_element - 1];
auto *var_y_data = var_y_tensor.mutable_data<float>(var_y_tensor.place()) +
begin_loc * dimension;
VLOG(4) << "Dense splited var: " << splited_var_name << " var_y_data[0] "
<< var_y_data[0] << " var_y_data[end] "
<< var_y_data[total_element - 1];
// create delta var in delta scope // create delta var in delta scope
auto *var_z = delta_scope_->Var(var_name); auto *var_z_tensor =
auto *var_z_tensor = var_z->GetMutable<framework::LoDTensor>(); delta_scope_->Var(splited_var_name)->GetMutable<framework::LoDTensor>();
var_z_tensor->mutable_data<float>(dims, var_x_tensor.place()); var_z_tensor->Resize(dims);
var_z_tensor->set_lod(var_x_tensor.lod()); var_z_tensor->mutable_data<float>(dims, cpu_ctx.GetPlace());
auto *var_z_data = var_z_tensor->mutable_data<float>(cpu_ctx.GetPlace());
math::SetConstant<paddle::platform::CPUDeviceContext, float> constant_functor; VLOG(4) << "Dense splited var: " << splited_var_name << "var_z_data[0] "
constant_functor(cpu_ctx, var_z_tensor, static_cast<float>(0)); << var_z_data[0] << " var_z_data[end] "
<< var_z_data[total_element - 1];
// calc sub = var_training - var_old // calc sub = var_training - var_old
auto blas = math::GetBlas<paddle::platform::CPUDeviceContext, float>(cpu_ctx); auto blas = math::GetBlas<paddle::platform::CPUDeviceContext, float>(cpu_ctx);
blas.SCAL(var_y_sub_tensor.numel(), -1, blas.VSUB(total_element, var_x_data, var_y_data, var_z_data);
var_y_sub_tensor.mutable_data<float>(var_y_sub_tensor.place())); VLOG(4) << "Dense splited var: " << splited_var_name << " var_z_data[0] "
blas.VADD(var_x_tensor.numel(), << var_z_data[0] << " var_z_data[end] "
var_x_tensor.mutable_data<float>(var_x_tensor.place()), << var_z_data[total_element - 1];
var_y_sub_tensor.mutable_data<float>(var_y_sub_tensor.place()),
var_z_tensor->mutable_data<float>(var_z_tensor->place()));
// calc var_delta = sub / trainer_nums // calc var_delta = sub / trainer_nums
float trainer_param = 1.0 / static_cast<float>(trainer_nums_); float trainer_param = 1.0 / static_cast<float>(trainer_nums_);
blas.SCAL(var_z_tensor->numel(), trainer_param, blas.SCAL(total_element, trainer_param, var_z_data);
var_z_tensor->mutable_data<float>(var_z_tensor->place()));
// calc var_old += var_delta // calc var_old += var_delta
blas.VADD(var_y_tensor.numel(), blas.VADD(total_element, var_y_data, var_z_data, var_y_data);
var_y_tensor.mutable_data<float>(var_y_tensor.place()), VLOG(4) << "Dense splited var: " << splited_var_name << " var_y_data[0] "
var_z_tensor->mutable_data<float>(var_z_tensor->place()), << var_y_data[0] << " var_y_data[end] "
var_y_tensor.mutable_data<float>(var_y_tensor.place())); << var_y_data[total_element - 1];
auto after_run_send_dense = GetCurrentUS(); auto after_run_send_dense = GetCurrentUS();
VLOG(3) << "run send update dense var " << var_name << " use time " VLOG(4) << "run send update dense var " << var_name << " use time "
<< after_run_send_dense - before_run_send_dense; << after_run_send_dense - before_run_send_dense;
auto send_functor = distributed::ParameterSend<float>();
auto &ctx = send_varname_to_ctx_.at(var_name);
auto before_send_dense = GetCurrentUS(); auto before_send_dense = GetCurrentUS();
send_functor(ctx, *delta_scope_.get(), true, 1); RpcSend(var_name, splited_var_name, splited_var_index);
auto after_send_denxe = GetCurrentUS(); auto after_send_dense = GetCurrentUS();
VLOG(3) << "send " << var_name << " use time " VLOG(4) << "send " << splited_var_name << " use time "
<< after_send_denxe - before_send_dense; << after_send_dense - before_send_dense;
} }
void GeoSgdCommunicator::SendUpdateSparseVars( void GeoSgdCommunicator::SendUpdateSparseVars(
...@@ -755,14 +783,14 @@ void GeoSgdCommunicator::SendUpdateSparseVars( ...@@ -755,14 +783,14 @@ void GeoSgdCommunicator::SendUpdateSparseVars(
float *z_val = z_value + y * row_numel; float *z_val = z_value + y * row_numel;
std::vector<float> row_delta(row_numel, 0); std::vector<float> row_delta(row_numel, 0);
VSUB<float>(row_numel, x_val, y_val, row_delta.data()); blas.VSUB(row_numel, x_val, y_val, row_delta.data());
blas.SCAL(row_numel, avg, row_delta.data()); blas.SCAL(row_numel, avg, row_delta.data());
blas.VADD(row_numel, row_delta.data(), y_val, y_val); blas.VADD(row_numel, row_delta.data(), y_val, y_val);
blas.VCOPY(row_numel, row_delta.data(), z_val); blas.VCOPY(row_numel, row_delta.data(), z_val);
} }
auto after_run_send_sparse = GetCurrentUS(); auto after_run_send_sparse = GetCurrentUS();
VLOG(3) << "run send update sparse var " << splited_var_name << " use time " VLOG(4) << "run send update sparse var " << splited_var_name << " use time "
<< after_run_send_sparse - before_run_send_sparse; << after_run_send_sparse - before_run_send_sparse;
auto splited_var_index = GetSplitedVarIndex(var_name, splited_var_name); auto splited_var_index = GetSplitedVarIndex(var_name, splited_var_name);
...@@ -779,23 +807,29 @@ void GeoSgdCommunicator::SendUpdateSparseVars( ...@@ -779,23 +807,29 @@ void GeoSgdCommunicator::SendUpdateSparseVars(
auto before_send_sparse = GetCurrentUS(); auto before_send_sparse = GetCurrentUS();
RpcSend(var_name, splited_var_name, splited_var_index); RpcSend(var_name, splited_var_name, splited_var_index);
auto after_send_sparse = GetCurrentUS(); auto after_send_sparse = GetCurrentUS();
VLOG(3) << "send " << splited_var_name << " has nums " << new_rows.size() VLOG(4) << "send " << splited_var_name << " has nums " << new_rows.size()
<< " use time " << after_send_sparse - before_send_sparse; << " use time " << after_send_sparse - before_send_sparse;
} }
void GeoSgdCommunicator::RecvUpdateDenseVars(const std::string &var_name) { void GeoSgdCommunicator::RecvUpdateDenseVars(
const std::string &var_name, const std::string &splited_var_name) {
// calc var_training += var_pserver - var_old // calc var_training += var_pserver - var_old
// calc var_old = var_pserver // calc var_old = var_pserver
// var_name: param.delta // var_name: param.delta
// step1: recv dense var from pserver // step1: recv dense var from pserver
auto origin_var_name = DeltaVarToVar(var_name); auto origin_var_name = DeltaVarToVar(var_name);
auto origin_splited_var_name = DeltaVarToVar(splited_var_name);
auto splited_var_index = GetSplitedVarIndex(var_name, splited_var_name);
auto cpu_ctx = paddle::platform::CPUDeviceContext();
auto before_run_recv = GetCurrentUS(); auto before_run_recv = GetCurrentUS();
auto recv_functor = distributed::ParameterRecv<float>(); VLOG(4) << "Dense recv origin_var_name: " << origin_var_name
recv_functor(recv_varname_to_ctx_[origin_var_name], *pserver_scope_.get()); << " origin_splited_var_name: " << origin_splited_var_name
<< " splited_var_index: " << splited_var_index;
RpcRecv(origin_var_name, origin_splited_var_name, splited_var_index);
auto after_run_recv = GetCurrentUS(); auto after_run_recv = GetCurrentUS();
VLOG(3) << "recv var " << origin_var_name << " use time " VLOG(4) << "recv var " << origin_splited_var_name << " use time "
<< after_run_recv - before_run_recv; << after_run_recv - before_run_recv;
// step2: update dense var // step2: update dense var
...@@ -806,31 +840,75 @@ void GeoSgdCommunicator::RecvUpdateDenseVars(const std::string &var_name) { ...@@ -806,31 +840,75 @@ void GeoSgdCommunicator::RecvUpdateDenseVars(const std::string &var_name) {
auto *var_y = old_scope_->FindVar(origin_var_name); auto *var_y = old_scope_->FindVar(origin_var_name);
auto var_y_tensor = var_y->Get<framework::LoDTensor>(); auto var_y_tensor = var_y->Get<framework::LoDTensor>();
auto *var_y_sub = old_scope_->Var(origin_var_name); auto *var_z = pserver_scope_.get()->FindVar(origin_splited_var_name);
framework::CopyVariable(*var_y, var_y_sub);
auto var_y_sub_tensor = var_y_sub->Get<framework::LoDTensor>();
auto *var_z = pserver_scope_.get()->FindVar(origin_var_name);
auto var_z_tensor = var_z->Get<framework::LoDTensor>(); auto var_z_tensor = var_z->Get<framework::LoDTensor>();
auto dims = var_z_tensor.dims();
auto total_element = var_z_tensor.numel();
int64_t section = 0;
int64_t begin_loc = 0;
int64_t dimension = 0;
size_t out_num = recv_varname_to_ctx_[origin_var_name].height_sections.size();
if (out_num > 1) {
section = dims[0];
begin_loc = absolute_section_[origin_var_name][splited_var_index];
dimension = total_element / section;
VLOG(4) << "Dense splited var: " << splited_var_name
<< " section: " << section << " dimension: " << dimension
<< " begin loc: " << begin_loc << " total_element "
<< total_element;
}
auto *var_x_data = var_x_tensor.mutable_data<float>(var_x_tensor.place()) +
begin_loc * dimension;
VLOG(4) << "Dense splited var: " << splited_var_name << " var_x_data[0] "
<< var_x_data[0] << " var_x_data[end] "
<< var_x_data[total_element - 1];
auto *var_y_data = var_y_tensor.mutable_data<float>(var_y_tensor.place()) +
begin_loc * dimension;
VLOG(4) << "Dense splited var: " << splited_var_name << " var_y_data[0] "
<< var_y_data[0] << " var_y_data[end] "
<< var_y_data[total_element - 1];
auto *var_z_data = var_z_tensor.mutable_data<float>(cpu_ctx.GetPlace());
VLOG(4) << "Dense splited var: " << splited_var_name << " var_z_data[0] "
<< var_z_data[0] << " var_z_data[end] "
<< var_z_data[total_element - 1];
auto *var_y_sub_tensor = old_scope_->Var(origin_splited_var_name)
->GetMutable<framework::LoDTensor>();
var_y_sub_tensor->Resize(dims);
var_y_sub_tensor->mutable_data<float>(dims, cpu_ctx.GetPlace());
auto *var_y_sub_data =
var_y_sub_tensor->mutable_data<float>(cpu_ctx.GetPlace());
VLOG(4) << "Dense splited var: " << splited_var_name << " var_y_sub_data[0] "
<< var_y_sub_data[0] << " var_y_sub_data[end] "
<< var_y_sub_data[total_element - 1];
auto cpu_ctx = paddle::platform::CPUDeviceContext();
auto blas = math::GetBlas<paddle::platform::CPUDeviceContext, float>(cpu_ctx); auto blas = math::GetBlas<paddle::platform::CPUDeviceContext, float>(cpu_ctx);
// calc sub = pserver - old // calc sub = pserver - old
blas.SCAL(var_y_sub_tensor.numel(), -1, blas.VSUB(total_element, var_z_data, var_y_data, var_y_sub_data);
var_y_sub_tensor.mutable_data<float>(var_y_sub_tensor.place())); VLOG(4) << "Dense splited var: " << splited_var_name << " var_y_sub_data[0] "
blas.VADD(var_y_tensor.numel(), << var_y_sub_data[0] << " var_y_sub_data[end] "
var_y_sub_tensor.mutable_data<float>(var_y_sub_tensor.place()), << var_y_sub_data[total_element - 1];
var_z_tensor.mutable_data<float>(var_z_tensor.place()),
var_y_sub_tensor.mutable_data<float>(var_y_sub_tensor.place())); // calc train += sub
// calc recv += sub blas.VADD(total_element, var_x_data, var_y_sub_data, var_x_data);
blas.VADD(var_x_tensor.numel(), VLOG(4) << "Dense splited var: " << splited_var_name << " var_x_data[0] "
var_x_tensor.mutable_data<float>(var_x_tensor.place()), << var_x_data[0] << " var_x_data[end] "
var_y_sub_tensor.mutable_data<float>(var_y_sub_tensor.place()), << var_x_data[total_element - 1];
var_x_tensor.mutable_data<float>(var_x_tensor.place()));
// calc old = pserver // calc old = pserver
framework::CopyVariable(*var_z, var_y); blas.VCOPY(total_element, var_z_data, var_y_data);
VLOG(4) << "Dense splited var: " << splited_var_name << " var_y_data[0] "
<< var_y_data[0] << " var_y_data[end] "
<< var_y_data[total_element - 1];
auto after_run_update = GetCurrentUS(); auto after_run_update = GetCurrentUS();
VLOG(3) << "dese var update " << origin_var_name << " use time " VLOG(4) << "dense var update " << origin_splited_var_name << " use time "
<< after_run_update - before_run_update; << after_run_update - before_run_update;
} }
...@@ -844,7 +922,7 @@ void GeoSgdCommunicator::RecvUpdateSparseVars( ...@@ -844,7 +922,7 @@ void GeoSgdCommunicator::RecvUpdateSparseVars(
auto before_run_recv = GetCurrentUS(); auto before_run_recv = GetCurrentUS();
RpcRecv(origin_var_name, origin_splited_var_name, splited_var_index); RpcRecv(origin_var_name, origin_splited_var_name, splited_var_index);
auto after_run_recv = GetCurrentUS(); auto after_run_recv = GetCurrentUS();
VLOG(3) << "recv var " << origin_splited_var_name << " use time " VLOG(4) << "recv var " << origin_splited_var_name << " use time "
<< after_run_recv - before_run_recv; << after_run_recv - before_run_recv;
// step 2: update sparse var // step 2: update sparse var
...@@ -885,13 +963,13 @@ void GeoSgdCommunicator::RecvUpdateSparseVars( ...@@ -885,13 +963,13 @@ void GeoSgdCommunicator::RecvUpdateSparseVars(
float *y_val = y_value + ids * row_numel; float *y_val = y_value + ids * row_numel;
float *z_val = z_value + y * row_numel; float *z_val = z_value + y * row_numel;
VSUB(row_numel, z_val, y_val, row_delta.data()); blas.VSUB(row_numel, z_val, y_val, row_delta.data());
blas.VADD(row_numel, row_delta.data(), x_val, x_val); blas.VADD(row_numel, row_delta.data(), x_val, x_val);
blas.VCOPY(row_numel, z_val, y_val); blas.VCOPY(row_numel, z_val, y_val);
} }
auto after_run_update = GetCurrentUS(); auto after_run_update = GetCurrentUS();
VLOG(3) << "sparse var recv update " << origin_splited_var_name << " has num " VLOG(4) << "sparse var recv update " << origin_splited_var_name << " has num "
<< new_rows.size() << " use time " << new_rows.size() << " use time "
<< after_run_update - before_run_update; << after_run_update - before_run_update;
} }
......
...@@ -364,12 +364,14 @@ class GeoSgdCommunicator : public Communicator { ...@@ -364,12 +364,14 @@ class GeoSgdCommunicator : public Communicator {
const std::vector<SparseIdsMap>& ids_send_vec, const std::vector<SparseIdsMap>& ids_send_vec,
const std::string& var_name, const std::string& splited_var_name); const std::string& var_name, const std::string& splited_var_name);
void SendUpdateDenseVars(const std::string& var_name); void SendUpdateDenseVars(const std::string& var_name,
const std::string& splited_var_name);
void SendUpdateSparseVars(const std::string& var_name, void SendUpdateSparseVars(const std::string& var_name,
const std::string& splited_var_name, const std::string& splited_var_name,
const std::unordered_set<int64_t>& ids_table); const std::unordered_set<int64_t>& ids_table);
void RecvUpdateDenseVars(const std::string& var_name); void RecvUpdateDenseVars(const std::string& var_name,
const std::string& splited_var_name);
void RecvUpdateSparseVars(const std::string& var_name, void RecvUpdateSparseVars(const std::string& var_name,
const std::string& splited_var_name); const std::string& splited_var_name);
...@@ -420,21 +422,32 @@ class GeoSgdCommunicator : public Communicator { ...@@ -420,21 +422,32 @@ class GeoSgdCommunicator : public Communicator {
int trainer_nums_ = 1; int trainer_nums_ = 1;
size_t geo_need_push_nums_ = 100; size_t geo_need_push_nums_ = 100;
bool is_geo_sgd_ = false; bool is_geo_sgd_ = false;
Scope* training_scope_; int send_var_nums_ = 0;
std::shared_ptr<Scope> delta_scope_; // parameter local delta: recv - old
std::shared_ptr<Scope>
old_scope_; // parameter local, storage the param after last recv
std::shared_ptr<Scope> pserver_scope_; // parameter on pserver,gloabl scope
RpcCtxMap send_varname_to_ctx_; RpcCtxMap send_varname_to_ctx_;
RpcCtxMap recv_varname_to_ctx_; RpcCtxMap recv_varname_to_ctx_;
std::unordered_map<std::string, bool>
var_list_; // if var is sparse, using selected rows, bool=true // parameter for local training
Scope* training_scope_;
// parameter for delta calc and send
std::shared_ptr<Scope> delta_scope_;
// parameter for storage the pserver param after last recv
std::shared_ptr<Scope> old_scope_;
// parameter on pserver
std::shared_ptr<Scope> pserver_scope_;
// if var is sparse, using selected rows, bool=true
std::unordered_map<std::string, bool> var_list_;
std::shared_ptr<BlockingQueue<std::shared_ptr<SparseIdsMap>>> std::shared_ptr<BlockingQueue<std::shared_ptr<SparseIdsMap>>>
need_push_queue_; need_push_queue_;
std::vector<SparseIdsMap> ids_send_vec_; std::vector<SparseIdsMap> ids_send_vec_;
std::unordered_map<std::string, std::vector<int64_t>> absolute_section_; std::unordered_map<std::string, std::vector<int64_t>> absolute_section_;
std::unordered_map<std::string, int64_t> vars_first_dimension_;
std::unique_ptr<::ThreadPool> send_threadpool_{nullptr}; std::unique_ptr<::ThreadPool> send_threadpool_{nullptr};
std::unique_ptr<std::thread> send_thread_{nullptr}; std::unique_ptr<std::thread> send_thread_{nullptr};
......
...@@ -11,6 +11,9 @@ ...@@ -11,6 +11,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
"""
Distribute CTR model for test fleet api
"""
from __future__ import print_function from __future__ import print_function
...@@ -30,10 +33,22 @@ fluid.default_main_program().random_seed = 1 ...@@ -30,10 +33,22 @@ fluid.default_main_program().random_seed = 1
class TestDistCTR2x2(FleetDistRunnerBase): class TestDistCTR2x2(FleetDistRunnerBase):
"""
For test CTR model, using Fleet api
"""
def net(self, batch_size=4, lr=0.01): def net(self, batch_size=4, lr=0.01):
"""
network definition
Args:
batch_size(int): the size of mini-batch for training
lr(float): learning rate of training
Returns:
avg_cost: LoDTensor of cost.
"""
dnn_input_dim, lr_input_dim, train_file_path = ctr_dataset_reader.prepare_data( dnn_input_dim, lr_input_dim, train_file_path = ctr_dataset_reader.prepare_data(
) )
""" network definition """
dnn_data = fluid.layers.data( dnn_data = fluid.layers.data(
name="dnn_data", name="dnn_data",
shape=[-1, 1], shape=[-1, 1],
...@@ -56,7 +71,8 @@ class TestDistCTR2x2(FleetDistRunnerBase): ...@@ -56,7 +71,8 @@ class TestDistCTR2x2(FleetDistRunnerBase):
datas = [dnn_data, lr_data, label] datas = [dnn_data, lr_data, label]
# build dnn model # build dnn model
dnn_layer_dims = [128, 64, 32, 1] # add 12800 for test huge dense Variable
dnn_layer_dims = [128, 128000, 64, 32, 1]
dnn_embedding = fluid.layers.embedding( dnn_embedding = fluid.layers.embedding(
is_distributed=False, is_distributed=False,
input=dnn_data, input=dnn_data,
...@@ -116,6 +132,11 @@ class TestDistCTR2x2(FleetDistRunnerBase): ...@@ -116,6 +132,11 @@ class TestDistCTR2x2(FleetDistRunnerBase):
wn.write(str(program)) wn.write(str(program))
def do_training(self, fleet): def do_training(self, fleet):
"""
do training using dataset, using fetch handler to catch variable
Args:
fleet(Fleet api): the fleet object of Parameter Server, define distribute training role
"""
dnn_input_dim, lr_input_dim, train_file_path = ctr_dataset_reader.prepare_data( dnn_input_dim, lr_input_dim, train_file_path = ctr_dataset_reader.prepare_data(
) )
...@@ -163,9 +184,7 @@ class TestDistCTR2x2(FleetDistRunnerBase): ...@@ -163,9 +184,7 @@ class TestDistCTR2x2(FleetDistRunnerBase):
exe.train_from_dataset( exe.train_from_dataset(
program=fleet.main_program, program=fleet.main_program,
dataset=dataset, dataset=dataset,
fetch_handler=FH([self.avg_cost.name], fetch_handler=FH([self.avg_cost.name], period_secs=2),
period_secs=2,
return_np=True),
debug=False) debug=False)
pass_time = time.time() - pass_start pass_time = time.time() - pass_start
......
...@@ -46,7 +46,7 @@ class TestDistGeoCtr_2x2(TestFleetBase): ...@@ -46,7 +46,7 @@ class TestDistGeoCtr_2x2(TestFleetBase):
required_envs.update(need_envs) required_envs.update(need_envs)
if check_error_log: if check_error_log:
required_envs["GLOG_v"] = "3" required_envs["GLOG_v"] = "4"
required_envs["GLOG_logtostderr"] = "1" required_envs["GLOG_logtostderr"] = "1"
tr0_losses, tr1_losses = self._run_cluster(model_file, required_envs) tr0_losses, tr1_losses = self._run_cluster(model_file, required_envs)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册