diff --git a/paddle/fluid/operators/distributed/communicator.cc b/paddle/fluid/operators/distributed/communicator.cc index d3d0658edcd204d94af1a87d40c4355a7649ec01..13e89092ca7f8443df65b040096d36f61c1258e8 100644 --- a/paddle/fluid/operators/distributed/communicator.cc +++ b/paddle/fluid/operators/distributed/communicator.cc @@ -183,20 +183,20 @@ void AsyncCommunicator::SendThread() { while (running_) { std::vector> task_futures; task_futures.reserve(send_varname_to_ctx_.size()); - VLOG(3) << "run send graph"; + VLOG(4) << "run send graph"; auto before_run_send_graph = GetCurrentUS(); for (auto &iter : send_varname_to_queue_) { auto &var_name = iter.first; auto &var_queue = iter.second; if (var_queue->Size() > 0) { auto send_task = [this, &var_name, &var_queue] { - VLOG(3) << var_name << " merge and send"; + VLOG(4) << var_name << " merge and send"; std::vector> vars; int merged_var_num = 0; int wait_times = 0; while (merged_var_num < FLAGS_communicator_max_merge_var_num) { if (var_queue->Size() == 0) { - VLOG(3) << "wait_times -> " << wait_times; + VLOG(4) << "wait_times -> " << wait_times; if (wait_times >= FLAGS_communicator_send_wait_times) { break; } @@ -223,14 +223,14 @@ void AsyncCommunicator::SendThread() { ctx.merge_add); } auto after_merge = GetCurrentUS(); - VLOG(3) << "merge " << merged_var_num << " " << var_name + VLOG(4) << "merge " << merged_var_num << " " << var_name << " use time " << after_merge - before_merge; auto send_functor = distributed::ParameterSend(); if (!FLAGS_communicator_fake_rpc) { send_functor(ctx, *send_scope_, true, 1); } auto after_send = GetCurrentUS(); - VLOG(3) << "send " << var_name << " use time " + VLOG(4) << "send " << var_name << " use time " << after_send - after_merge; }; task_futures.emplace_back( @@ -244,7 +244,7 @@ void AsyncCommunicator::SendThread() { } auto after_run_send_graph = GetCurrentUS(); - VLOG(3) << "run send graph use time " + VLOG(4) << "run send graph use time " << after_run_send_graph - before_run_send_graph; Recv(); } @@ -323,7 +323,7 @@ void AsyncCommunicator::RecvAll() { task.wait(); } auto after_recv = GetCurrentUS(); - VLOG(1) << "run recv graph use time " << after_recv - before_send; + VLOG(3) << "run recv graph use time " << after_recv - before_send; } void AsyncCommunicator::Start() { @@ -446,13 +446,15 @@ void GeoSgdCommunicator::InitImpl( recv_varname_to_ctx_[var_name] = operators::distributed::RpcContext( var_name, vars_names, vars_epmap, vars_sections_int, 0); - // record sparse section - if (is_sparse) { - need_thread_nums_ += - send_varname_to_ctx_[send_var_name].height_sections.size(); - absolute_section_[var_name] = operators::ToAbsoluteSection( - send_varname_to_ctx_[send_var_name].height_sections); + absolute_section_[var_name] = operators::ToAbsoluteSection( + send_varname_to_ctx_[send_var_name].height_sections); + + vars_first_dimension_[var_name] = 0; + for (int64_t section : vars_sections_int) { + vars_first_dimension_[var_name] += section; } + + send_var_nums_ += vars_names.size(); } if (send_varname_to_ctx_.size() == 0 && recv_varname_to_ctx_.size() == 0) { @@ -548,7 +550,7 @@ void GeoSgdCommunicator::Send(const std::vector &sparse_var_names, } need_push_queue_->Push(ids_table); auto after_run_send = GetCurrentUS(); - VLOG(3) << "run send_op use time " << after_run_send - before_run_send; + VLOG(4) << "run send_op use time " << after_run_send - before_run_send; } void GeoSgdCommunicator::SendThread() { @@ -557,7 +559,7 @@ void GeoSgdCommunicator::SendThread() { while (running_) { std::vector> task_futures; - task_futures.reserve(send_varname_to_ctx_.size()); + task_futures.reserve(send_var_nums_); int wait_times = 0; while (ids_send_vec_.size() < geo_need_push_nums_) { @@ -567,7 +569,7 @@ void GeoSgdCommunicator::SendThread() { ids_send_vec_.push_back(*(need_push_queue_->Pop())); VLOG(4) << "ids_send_vec_ pushed"; } else if (need_push_queue_->Size() == 0) { - VLOG(3) << "wait_times -> " << wait_times; + VLOG(4) << "wait_times -> " << wait_times; if (wait_times >= FLAGS_communicator_send_wait_times) { break; } @@ -579,10 +581,10 @@ void GeoSgdCommunicator::SendThread() { if (ids_send_vec_.size() >= geo_need_push_nums_) { auto after_run_training = GetCurrentUS(); - VLOG(3) << "run Training use time " + VLOG(4) << "run Training use time " << after_run_training - before_run_training; before_run_training = GetCurrentUS(); - VLOG(3) << "Start send after get need_push_num"; + VLOG(4) << "Start send after get need_push_num"; for (auto &iter : send_varname_to_ctx_) { auto &var_name = iter.first; @@ -591,28 +593,31 @@ void GeoSgdCommunicator::SendThread() { for (auto &splited_var_name : iter.second.splited_var_names) { auto send_task = [this, &var_name, &splited_var_name] { auto before_run_geo = GetCurrentUS(); + VLOG(4) << "ids_send_vec_ size: " << ids_send_vec_.size(); auto ids_set = SparseIdsMerge(ids_send_vec_, var_name, splited_var_name); SendUpdateSparseVars(var_name, splited_var_name, ids_set); RecvUpdateSparseVars(var_name, splited_var_name); auto after_run_geo = GetCurrentUS(); - VLOG(1) << "run GEO-SGD var " << splited_var_name << " use time " + VLOG(3) << "run GEO-SGD var " << splited_var_name << " use time " << after_run_geo - before_run_geo; }; task_futures.emplace_back( send_threadpool_->enqueue(std::move(send_task))); } } else { - auto send_task = [this, &var_name] { - auto before_run_geo = GetCurrentUS(); - SendUpdateDenseVars(var_name); - RecvUpdateDenseVars(var_name); - auto after_run_geo = GetCurrentUS(); - VLOG(3) << "run GEO-SGD var " << var_name << " use time " - << after_run_geo - before_run_geo; - }; - task_futures.emplace_back( - send_threadpool_->enqueue(std::move(send_task))); + for (auto &splited_var_name : iter.second.splited_var_names) { + auto send_task = [this, &var_name, &splited_var_name] { + auto before_run_geo = GetCurrentUS(); + SendUpdateDenseVars(var_name, splited_var_name); + RecvUpdateDenseVars(var_name, splited_var_name); + auto after_run_geo = GetCurrentUS(); + VLOG(3) << "run GEO-SGD var " << splited_var_name << " use time " + << after_run_geo - before_run_geo; + }; + task_futures.emplace_back( + send_threadpool_->enqueue(std::move(send_task))); + } } } for (auto &task_f : task_futures) { @@ -627,31 +632,36 @@ std::unordered_set GeoSgdCommunicator::SparseIdsMerge( const std::vector &ids_send_vec, const std::string &var_name, const std::string &splited_var_name) { // every batch has some sparse id, merge them into one unoredered_set - VLOG(3) << "Sparse Ids merge var: " << var_name + VLOG(4) << "Sparse Ids merge var: " << var_name << " splited var: " << splited_var_name; auto before_run_ids_merge_ = GetCurrentUS(); auto origin_var_name = DeltaVarToVar(var_name); auto splited_var_index = GetSplitedVarIndex(var_name, splited_var_name); std::unordered_set ids_set; - for (auto ids_map : ids_send_vec) { for (auto id : ids_map[origin_var_name][splited_var_index]) { ids_set.insert(id); } } auto after_run_ids_merge_ = GetCurrentUS(); - VLOG(3) << "run SparseIdsMerge " << splited_var_name << " has nums " + VLOG(4) << "run SparseIdsMerge " << splited_var_name << " has nums " << ids_set.size() << " use time " << after_run_ids_merge_ - before_run_ids_merge_; return ids_set; } -void GeoSgdCommunicator::SendUpdateDenseVars(const std::string &var_name) { +void GeoSgdCommunicator::SendUpdateDenseVars( + const std::string &var_name, const std::string &splited_var_name) { // calc var_delata = (var_training - var_old)/trainer_nums // calc var_old += var_delta // var_name: param.delta auto origin_var_name = DeltaVarToVar(var_name); + auto splited_var_index = GetSplitedVarIndex(var_name, splited_var_name); + VLOG(4) << "Dense var: " << var_name + << " 's splited var: " << splited_var_name + << " splited var index: " << splited_var_index; auto before_run_send_dense = GetCurrentUS(); + auto cpu_ctx = paddle::platform::CPUDeviceContext(); auto *var_x = training_scope_->FindVar(origin_var_name); auto var_x_tensor = var_x->Get(); @@ -659,55 +669,73 @@ void GeoSgdCommunicator::SendUpdateDenseVars(const std::string &var_name) { auto *var_y = old_scope_->FindVar(origin_var_name); auto var_y_tensor = var_y->Get(); - auto cpu_ctx = paddle::platform::CPUDeviceContext(); auto dims = var_x_tensor.dims(); + auto total_element = var_x_tensor.numel(); + int64_t section = 0; + int64_t begin_loc = 0; + int64_t dimension = 0; + + size_t out_num = send_varname_to_ctx_[var_name].height_sections.size(); + if (out_num > 1) { + section = send_varname_to_ctx_[var_name].height_sections[splited_var_index]; + dims[0] = section; + begin_loc = absolute_section_[origin_var_name][splited_var_index]; + dimension = total_element / vars_first_dimension_[origin_var_name]; + total_element = section * dimension; + VLOG(4) << "Dense splited var: " << splited_var_name + << " section: " << section << " dimension: " << dimension + << " begin loc: " << begin_loc << " total_element " + << total_element; + } - // create temp var for sub - auto *var_y_sub = old_scope_->Var(var_name); - framework::CopyVariable(*var_y, var_y_sub); - auto var_y_sub_tensor = var_y_sub->Get(); + auto *var_x_data = var_x_tensor.mutable_data(var_x_tensor.place()) + + begin_loc * dimension; + VLOG(4) << "Dense splited var: " << splited_var_name << " var_x_data[0] " + << var_x_data[0] << " var_x_data[end] " + << var_x_data[total_element - 1]; + auto *var_y_data = var_y_tensor.mutable_data(var_y_tensor.place()) + + begin_loc * dimension; + VLOG(4) << "Dense splited var: " << splited_var_name << " var_y_data[0] " + << var_y_data[0] << " var_y_data[end] " + << var_y_data[total_element - 1]; // create delta var in delta scope - auto *var_z = delta_scope_->Var(var_name); - auto *var_z_tensor = var_z->GetMutable(); - var_z_tensor->mutable_data(dims, var_x_tensor.place()); - var_z_tensor->set_lod(var_x_tensor.lod()); + auto *var_z_tensor = + delta_scope_->Var(splited_var_name)->GetMutable(); + var_z_tensor->Resize(dims); + var_z_tensor->mutable_data(dims, cpu_ctx.GetPlace()); + auto *var_z_data = var_z_tensor->mutable_data(cpu_ctx.GetPlace()); - math::SetConstant constant_functor; - constant_functor(cpu_ctx, var_z_tensor, static_cast(0)); + VLOG(4) << "Dense splited var: " << splited_var_name << "var_z_data[0] " + << var_z_data[0] << " var_z_data[end] " + << var_z_data[total_element - 1]; // calc sub = var_training - var_old auto blas = math::GetBlas(cpu_ctx); - blas.SCAL(var_y_sub_tensor.numel(), -1, - var_y_sub_tensor.mutable_data(var_y_sub_tensor.place())); - blas.VADD(var_x_tensor.numel(), - var_x_tensor.mutable_data(var_x_tensor.place()), - var_y_sub_tensor.mutable_data(var_y_sub_tensor.place()), - var_z_tensor->mutable_data(var_z_tensor->place())); + blas.VSUB(total_element, var_x_data, var_y_data, var_z_data); + VLOG(4) << "Dense splited var: " << splited_var_name << " var_z_data[0] " + << var_z_data[0] << " var_z_data[end] " + << var_z_data[total_element - 1]; // calc var_delta = sub / trainer_nums float trainer_param = 1.0 / static_cast(trainer_nums_); - blas.SCAL(var_z_tensor->numel(), trainer_param, - var_z_tensor->mutable_data(var_z_tensor->place())); + blas.SCAL(total_element, trainer_param, var_z_data); // calc var_old += var_delta - blas.VADD(var_y_tensor.numel(), - var_y_tensor.mutable_data(var_y_tensor.place()), - var_z_tensor->mutable_data(var_z_tensor->place()), - var_y_tensor.mutable_data(var_y_tensor.place())); + blas.VADD(total_element, var_y_data, var_z_data, var_y_data); + VLOG(4) << "Dense splited var: " << splited_var_name << " var_y_data[0] " + << var_y_data[0] << " var_y_data[end] " + << var_y_data[total_element - 1]; auto after_run_send_dense = GetCurrentUS(); - VLOG(3) << "run send update dense var " << var_name << " use time " + VLOG(4) << "run send update dense var " << var_name << " use time " << after_run_send_dense - before_run_send_dense; - auto send_functor = distributed::ParameterSend(); - auto &ctx = send_varname_to_ctx_.at(var_name); - auto before_send_dense = GetCurrentUS(); - send_functor(ctx, *delta_scope_.get(), true, 1); - auto after_send_denxe = GetCurrentUS(); - VLOG(3) << "send " << var_name << " use time " - << after_send_denxe - before_send_dense; + RpcSend(var_name, splited_var_name, splited_var_index); + auto after_send_dense = GetCurrentUS(); + VLOG(4) << "send " << splited_var_name << " use time " + << after_send_dense - before_send_dense; } void GeoSgdCommunicator::SendUpdateSparseVars( @@ -755,14 +783,14 @@ void GeoSgdCommunicator::SendUpdateSparseVars( float *z_val = z_value + y * row_numel; std::vector row_delta(row_numel, 0); - VSUB(row_numel, x_val, y_val, row_delta.data()); + blas.VSUB(row_numel, x_val, y_val, row_delta.data()); blas.SCAL(row_numel, avg, row_delta.data()); blas.VADD(row_numel, row_delta.data(), y_val, y_val); blas.VCOPY(row_numel, row_delta.data(), z_val); } auto after_run_send_sparse = GetCurrentUS(); - VLOG(3) << "run send update sparse var " << splited_var_name << " use time " + VLOG(4) << "run send update sparse var " << splited_var_name << " use time " << after_run_send_sparse - before_run_send_sparse; auto splited_var_index = GetSplitedVarIndex(var_name, splited_var_name); @@ -779,23 +807,29 @@ void GeoSgdCommunicator::SendUpdateSparseVars( auto before_send_sparse = GetCurrentUS(); RpcSend(var_name, splited_var_name, splited_var_index); auto after_send_sparse = GetCurrentUS(); - VLOG(3) << "send " << splited_var_name << " has nums " << new_rows.size() + VLOG(4) << "send " << splited_var_name << " has nums " << new_rows.size() << " use time " << after_send_sparse - before_send_sparse; } -void GeoSgdCommunicator::RecvUpdateDenseVars(const std::string &var_name) { +void GeoSgdCommunicator::RecvUpdateDenseVars( + const std::string &var_name, const std::string &splited_var_name) { // calc var_training += var_pserver - var_old // calc var_old = var_pserver // var_name: param.delta // step1: recv dense var from pserver auto origin_var_name = DeltaVarToVar(var_name); + auto origin_splited_var_name = DeltaVarToVar(splited_var_name); + auto splited_var_index = GetSplitedVarIndex(var_name, splited_var_name); + auto cpu_ctx = paddle::platform::CPUDeviceContext(); auto before_run_recv = GetCurrentUS(); - auto recv_functor = distributed::ParameterRecv(); - recv_functor(recv_varname_to_ctx_[origin_var_name], *pserver_scope_.get()); + VLOG(4) << "Dense recv origin_var_name: " << origin_var_name + << " origin_splited_var_name: " << origin_splited_var_name + << " splited_var_index: " << splited_var_index; + RpcRecv(origin_var_name, origin_splited_var_name, splited_var_index); auto after_run_recv = GetCurrentUS(); - VLOG(3) << "recv var " << origin_var_name << " use time " + VLOG(4) << "recv var " << origin_splited_var_name << " use time " << after_run_recv - before_run_recv; // step2: update dense var @@ -806,31 +840,75 @@ void GeoSgdCommunicator::RecvUpdateDenseVars(const std::string &var_name) { auto *var_y = old_scope_->FindVar(origin_var_name); auto var_y_tensor = var_y->Get(); - auto *var_y_sub = old_scope_->Var(origin_var_name); - framework::CopyVariable(*var_y, var_y_sub); - auto var_y_sub_tensor = var_y_sub->Get(); - - auto *var_z = pserver_scope_.get()->FindVar(origin_var_name); + auto *var_z = pserver_scope_.get()->FindVar(origin_splited_var_name); auto var_z_tensor = var_z->Get(); + auto dims = var_z_tensor.dims(); + auto total_element = var_z_tensor.numel(); + + int64_t section = 0; + int64_t begin_loc = 0; + int64_t dimension = 0; + size_t out_num = recv_varname_to_ctx_[origin_var_name].height_sections.size(); + if (out_num > 1) { + section = dims[0]; + begin_loc = absolute_section_[origin_var_name][splited_var_index]; + dimension = total_element / section; + VLOG(4) << "Dense splited var: " << splited_var_name + << " section: " << section << " dimension: " << dimension + << " begin loc: " << begin_loc << " total_element " + << total_element; + } + + auto *var_x_data = var_x_tensor.mutable_data(var_x_tensor.place()) + + begin_loc * dimension; + VLOG(4) << "Dense splited var: " << splited_var_name << " var_x_data[0] " + << var_x_data[0] << " var_x_data[end] " + << var_x_data[total_element - 1]; + + auto *var_y_data = var_y_tensor.mutable_data(var_y_tensor.place()) + + begin_loc * dimension; + VLOG(4) << "Dense splited var: " << splited_var_name << " var_y_data[0] " + << var_y_data[0] << " var_y_data[end] " + << var_y_data[total_element - 1]; + + auto *var_z_data = var_z_tensor.mutable_data(cpu_ctx.GetPlace()); + VLOG(4) << "Dense splited var: " << splited_var_name << " var_z_data[0] " + << var_z_data[0] << " var_z_data[end] " + << var_z_data[total_element - 1]; + + auto *var_y_sub_tensor = old_scope_->Var(origin_splited_var_name) + ->GetMutable(); + var_y_sub_tensor->Resize(dims); + var_y_sub_tensor->mutable_data(dims, cpu_ctx.GetPlace()); + auto *var_y_sub_data = + var_y_sub_tensor->mutable_data(cpu_ctx.GetPlace()); + + VLOG(4) << "Dense splited var: " << splited_var_name << " var_y_sub_data[0] " + << var_y_sub_data[0] << " var_y_sub_data[end] " + << var_y_sub_data[total_element - 1]; - auto cpu_ctx = paddle::platform::CPUDeviceContext(); auto blas = math::GetBlas(cpu_ctx); + // calc sub = pserver - old - blas.SCAL(var_y_sub_tensor.numel(), -1, - var_y_sub_tensor.mutable_data(var_y_sub_tensor.place())); - blas.VADD(var_y_tensor.numel(), - var_y_sub_tensor.mutable_data(var_y_sub_tensor.place()), - var_z_tensor.mutable_data(var_z_tensor.place()), - var_y_sub_tensor.mutable_data(var_y_sub_tensor.place())); - // calc recv += sub - blas.VADD(var_x_tensor.numel(), - var_x_tensor.mutable_data(var_x_tensor.place()), - var_y_sub_tensor.mutable_data(var_y_sub_tensor.place()), - var_x_tensor.mutable_data(var_x_tensor.place())); + blas.VSUB(total_element, var_z_data, var_y_data, var_y_sub_data); + VLOG(4) << "Dense splited var: " << splited_var_name << " var_y_sub_data[0] " + << var_y_sub_data[0] << " var_y_sub_data[end] " + << var_y_sub_data[total_element - 1]; + + // calc train += sub + blas.VADD(total_element, var_x_data, var_y_sub_data, var_x_data); + VLOG(4) << "Dense splited var: " << splited_var_name << " var_x_data[0] " + << var_x_data[0] << " var_x_data[end] " + << var_x_data[total_element - 1]; + // calc old = pserver - framework::CopyVariable(*var_z, var_y); + blas.VCOPY(total_element, var_z_data, var_y_data); + VLOG(4) << "Dense splited var: " << splited_var_name << " var_y_data[0] " + << var_y_data[0] << " var_y_data[end] " + << var_y_data[total_element - 1]; + auto after_run_update = GetCurrentUS(); - VLOG(3) << "dese var update " << origin_var_name << " use time " + VLOG(4) << "dense var update " << origin_splited_var_name << " use time " << after_run_update - before_run_update; } @@ -844,7 +922,7 @@ void GeoSgdCommunicator::RecvUpdateSparseVars( auto before_run_recv = GetCurrentUS(); RpcRecv(origin_var_name, origin_splited_var_name, splited_var_index); auto after_run_recv = GetCurrentUS(); - VLOG(3) << "recv var " << origin_splited_var_name << " use time " + VLOG(4) << "recv var " << origin_splited_var_name << " use time " << after_run_recv - before_run_recv; // step 2: update sparse var @@ -885,13 +963,13 @@ void GeoSgdCommunicator::RecvUpdateSparseVars( float *y_val = y_value + ids * row_numel; float *z_val = z_value + y * row_numel; - VSUB(row_numel, z_val, y_val, row_delta.data()); + blas.VSUB(row_numel, z_val, y_val, row_delta.data()); blas.VADD(row_numel, row_delta.data(), x_val, x_val); blas.VCOPY(row_numel, z_val, y_val); } auto after_run_update = GetCurrentUS(); - VLOG(3) << "sparse var recv update " << origin_splited_var_name << " has num " + VLOG(4) << "sparse var recv update " << origin_splited_var_name << " has num " << new_rows.size() << " use time " << after_run_update - before_run_update; } diff --git a/paddle/fluid/operators/distributed/communicator.h b/paddle/fluid/operators/distributed/communicator.h index 7942c5f166791946de379a78866bb75ffe804a5d..c22ab9a798c6703d860f96865b34323a8cbc7444 100644 --- a/paddle/fluid/operators/distributed/communicator.h +++ b/paddle/fluid/operators/distributed/communicator.h @@ -364,12 +364,14 @@ class GeoSgdCommunicator : public Communicator { const std::vector& ids_send_vec, const std::string& var_name, const std::string& splited_var_name); - void SendUpdateDenseVars(const std::string& var_name); + void SendUpdateDenseVars(const std::string& var_name, + const std::string& splited_var_name); void SendUpdateSparseVars(const std::string& var_name, const std::string& splited_var_name, const std::unordered_set& ids_table); - void RecvUpdateDenseVars(const std::string& var_name); + void RecvUpdateDenseVars(const std::string& var_name, + const std::string& splited_var_name); void RecvUpdateSparseVars(const std::string& var_name, const std::string& splited_var_name); @@ -420,21 +422,32 @@ class GeoSgdCommunicator : public Communicator { int trainer_nums_ = 1; size_t geo_need_push_nums_ = 100; bool is_geo_sgd_ = false; - Scope* training_scope_; - std::shared_ptr delta_scope_; // parameter local delta: recv - old - std::shared_ptr - old_scope_; // parameter local, storage the param after last recv - std::shared_ptr pserver_scope_; // parameter on pserver,gloabl scope + int send_var_nums_ = 0; + RpcCtxMap send_varname_to_ctx_; RpcCtxMap recv_varname_to_ctx_; - std::unordered_map - var_list_; // if var is sparse, using selected rows, bool=true + + // parameter for local training + Scope* training_scope_; + + // parameter for delta calc and send + std::shared_ptr delta_scope_; + + // parameter for storage the pserver param after last recv + std::shared_ptr old_scope_; + + // parameter on pserver + std::shared_ptr pserver_scope_; + + // if var is sparse, using selected rows, bool=true + std::unordered_map var_list_; std::shared_ptr>> need_push_queue_; std::vector ids_send_vec_; std::unordered_map> absolute_section_; + std::unordered_map vars_first_dimension_; std::unique_ptr<::ThreadPool> send_threadpool_{nullptr}; std::unique_ptr send_thread_{nullptr}; diff --git a/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py b/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py index c27f23a2d8c9c226587c24622376089fd918d334..41a689f849e9c7033f6599fca7ed3ec90bec42c6 100644 --- a/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py +++ b/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py @@ -11,6 +11,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +""" +Distribute CTR model for test fleet api +""" from __future__ import print_function @@ -30,10 +33,22 @@ fluid.default_main_program().random_seed = 1 class TestDistCTR2x2(FleetDistRunnerBase): + """ + For test CTR model, using Fleet api + """ + def net(self, batch_size=4, lr=0.01): + """ + network definition + + Args: + batch_size(int): the size of mini-batch for training + lr(float): learning rate of training + Returns: + avg_cost: LoDTensor of cost. + """ dnn_input_dim, lr_input_dim, train_file_path = ctr_dataset_reader.prepare_data( ) - """ network definition """ dnn_data = fluid.layers.data( name="dnn_data", shape=[-1, 1], @@ -56,7 +71,8 @@ class TestDistCTR2x2(FleetDistRunnerBase): datas = [dnn_data, lr_data, label] # build dnn model - dnn_layer_dims = [128, 64, 32, 1] + # add 12800 for test huge dense Variable + dnn_layer_dims = [128, 128000, 64, 32, 1] dnn_embedding = fluid.layers.embedding( is_distributed=False, input=dnn_data, @@ -116,6 +132,11 @@ class TestDistCTR2x2(FleetDistRunnerBase): wn.write(str(program)) def do_training(self, fleet): + """ + do training using dataset, using fetch handler to catch variable + Args: + fleet(Fleet api): the fleet object of Parameter Server, define distribute training role + """ dnn_input_dim, lr_input_dim, train_file_path = ctr_dataset_reader.prepare_data( ) @@ -163,9 +184,7 @@ class TestDistCTR2x2(FleetDistRunnerBase): exe.train_from_dataset( program=fleet.main_program, dataset=dataset, - fetch_handler=FH([self.avg_cost.name], - period_secs=2, - return_np=True), + fetch_handler=FH([self.avg_cost.name], period_secs=2), debug=False) pass_time = time.time() - pass_start diff --git a/python/paddle/fluid/tests/unittests/test_dist_fleet_geo.py b/python/paddle/fluid/tests/unittests/test_dist_fleet_geo.py index 447f1c16e7dfcd0140392ccd34c1d90350f2bd14..402ae15e9c43423da3619a3debaa7edb7d2d8bdb 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_fleet_geo.py +++ b/python/paddle/fluid/tests/unittests/test_dist_fleet_geo.py @@ -46,7 +46,7 @@ class TestDistGeoCtr_2x2(TestFleetBase): required_envs.update(need_envs) if check_error_log: - required_envs["GLOG_v"] = "3" + required_envs["GLOG_v"] = "4" required_envs["GLOG_logtostderr"] = "1" tr0_losses, tr1_losses = self._run_cluster(model_file, required_envs)