diff --git a/paddle/fluid/framework/hogwild_worker.cc b/paddle/fluid/framework/hogwild_worker.cc
index cb33e87f490c259eb2fc7e463c4a7f102ee37ce1..a7138fd2642a8878edb9e2855edd97546b3d9ef9 100644
--- a/paddle/fluid/framework/hogwild_worker.cc
+++ b/paddle/fluid/framework/hogwild_worker.cc
@@ -219,6 +219,10 @@ void HogwildWorker::TrainFiles() {
   device_reader_->Start();
   int cur_batch;
   int batch_cnt = 0;
+
+#if defined(PADDLE_WITH_HETERPS) && defined(PADDLE_WITH_CUDA)
+  platform::SetDeviceId(thread_id_);
+#endif
   while ((cur_batch = device_reader_->Next()) > 0) {
     for (auto &op : ops_) {
       bool need_skip = false;
@@ -244,9 +248,12 @@ void HogwildWorker::TrainFiles() {
     ++batch_cnt;
     PrintFetchVars();
     thread_scope_->DropKids();
+#ifdef PADDLE_WITH_HETERPS
+    dev_ctx_->Wait();
+#endif
   }
   timeline.Pause();
-  VLOG(3) << "worker " << thread_id_ << " train cost " << timeline.ElapsedSec()
+  VLOG(1) << "worker " << thread_id_ << " train cost " << timeline.ElapsedSec()
           << " seconds, ins_num: " << total_ins_num;
 
   if (need_dump_field_ || need_dump_param_) {
diff --git a/paddle/fluid/framework/multi_trainer.cc b/paddle/fluid/framework/multi_trainer.cc
index 7a83fdccc218c48859bf644d282601c828f4bb16..6479f7ae7265483a3d682600a1ea57db452411a2 100644
--- a/paddle/fluid/framework/multi_trainer.cc
+++ b/paddle/fluid/framework/multi_trainer.cc
@@ -148,6 +148,17 @@ void MultiTrainer::InitTrainerEnv(const ProgramDesc& main_program,
     }
   }
 #endif
+  for (auto& var : main_program.Block(0).AllVars()) {
+    if (var->Persistable()) {
+      auto it = std::find(need_merge_var_names_.begin(),
+                          need_merge_var_names_.end(), var->Name());
+      if (it == need_merge_var_names_.end() &&
+          var->GetType() != proto::VarType::SELECTED_ROWS) {
+        VLOG(2) << "train param: " << var->Name();
+        trainable_param_.push_back(var->Name());
+      }
+    }
+  }
 }
 
 void MultiTrainer::InitOtherEnv(const ProgramDesc& main_program) {
@@ -192,18 +203,30 @@ void MultiTrainer::Run() {
 
 #ifdef PADDLE_WITH_HETERPS
 void MultiTrainer::MergeDenseParam() {
-#ifdef PADDLE_WTIH_PSCORE
+#ifdef PADDLE_WITH_PSCORE
   auto communicator = paddle::distributed::Communicator::GetInstance();
-  auto& recv_ctx = communicator->GetRecvCtxMap();
-  Scope* thread_scope = workers_[0]->GetThreadScope();
-  for (auto& iter : recv_ctx) {
-    auto& varnames = iter.second;
-    for (auto& name : varnames) {
+  auto thread_scope = workers_[0]->GetThreadScope();
+  if (communicator == nullptr) {
+    for (auto& name : trainable_param_) {
+      VLOG(2) << "merge var " << name << " to root scope";
       Variable* root_var = root_scope_->FindVar(name);
       LoDTensor* root_tensor = root_var->GetMutable<LoDTensor>();
       Variable* var = thread_scope->FindVar(name);
       LoDTensor* tensor = var->GetMutable<LoDTensor>();
-      TensorCopy((*tensor), root_tensor->place(), root_tensor);
+      TensorCopySync((*tensor), root_tensor->place(), root_tensor);
+    }
+  } else {
+    auto& recv_ctx = communicator->GetRecvCtxMap();
+    for (auto& iter : recv_ctx) {
+      auto& varnames = iter.second;
+      for (auto& name : varnames) {
+        VLOG(2) << "merge var " << name << " to root scope";
+        Variable* root_var = root_scope_->FindVar(name);
+        LoDTensor* root_tensor = root_var->GetMutable<LoDTensor>();
+        Variable* var = thread_scope->FindVar(name);
+        LoDTensor* tensor = var->GetMutable<LoDTensor>();
+        TensorCopySync((*tensor), root_tensor->place(), root_tensor);
+      }
     }
   }
 #endif
@@ -236,11 +259,7 @@ void MultiTrainer::Finalize() {
       }
     }
     LoDTensor* root_tensor = root_var->GetMutable<LoDTensor>();
-#ifdef PADDLE_WITH_HETERPS
-    for (size_t j = 0; j < places_.size(); j++) {
-#else
     for (int j = 1; j < thread_num_; j++) {
-#endif
       Scope* cur_thread_scope = workers_[j]->GetThreadScope();
       Variable* thread_var =
           cur_thread_scope->FindVar(need_merge_var_names_[i]);
diff --git a/paddle/fluid/framework/trainer.h b/paddle/fluid/framework/trainer.h
index b86b4fec8a57188cda66accc38935396d0dd040d..c78f7611b63bee60ec24caf2fa969abd6464f4c3 100644
--- a/paddle/fluid/framework/trainer.h
+++ b/paddle/fluid/framework/trainer.h
@@ -129,6 +129,7 @@ class MultiTrainer : public TrainerBase {
   std::vector<DataFeed*> readers_;
   std::vector<std::shared_ptr<DeviceWorker>> workers_;
   std::vector<std::string> need_merge_var_names_;
+  std::vector<std::string> trainable_param_;
 #ifdef PADDLE_WITH_HETERPS
   std::vector<platform::Place> places_;
 #endif
diff --git a/python/paddle/distributed/passes/ps_trainer_pass.py b/python/paddle/distributed/passes/ps_trainer_pass.py
index 76e617c7dafcf34052a1a7deab0616417710331a..0792a1eddc7fd9fea4ed4376f853a03362faf1df 100755
--- a/python/paddle/distributed/passes/ps_trainer_pass.py
+++ b/python/paddle/distributed/passes/ps_trainer_pass.py
@@ -614,15 +614,24 @@ class PsGpuPass(PassBase):
         return True
 
     def _add_push_box_sparse_op(self, program):
+        insert_index = -1
+        for idx, op in list(enumerate(program.global_block().ops)):
+            if op.type == "lookup_table_grad":
+                insert_index = idx
         for op in program.global_block().ops:
-            if op.type != "pull_box_sparse":
+            if op.type != "pull_box_sparse" and op.type != "pull_gpups_sparse":
                 continue
             grad_op_desc, op_grad_to_var = core.get_grad_op_desc(
                 op.desc, cpt.to_text(set()), [])
             for op_desc in grad_op_desc:
-                new_op_desc = program.global_block().desc.append_op()
+                new_op_desc = program.global_block().desc._insert_op(
+                    insert_index + 1)
                 new_op_desc.copy_from(op_desc)
                 new_op_desc._set_attr(op_role_attr_name, backward)
+                new_op = paddle.fluid.framework.Operator(program.global_block(),
+                                                         new_op_desc)
+                program.global_block().ops.insert(insert_index + 1, new_op)
+        program.global_block()._sync_with_cpp()
 
     def _remove_optimizer_var(self, program):
         embedding_w = {}
diff --git a/python/paddle/distributed/ps/the_one_ps.py b/python/paddle/distributed/ps/the_one_ps.py
index c6df7559a22e8114d085af4de14a4db98d3de3e5..888d517116a15ff89f7fa1fe4a530eba96250e1e 100755
--- a/python/paddle/distributed/ps/the_one_ps.py
+++ b/python/paddle/distributed/ps/the_one_ps.py
@@ -1013,12 +1013,13 @@ class TheOnePSRuntime(RuntimeBase):
 
         if self.context['ps_mode'] == DistributedMode.GEO:
             self._communicator.init_params(init_params)
         else:
-            if role_id == 0:
-                self._init_all_params(scopes, send_ctx, dense_map)
+            if not self.context['use_ps_gpu']:
+                if role_id == 0:
+                    self._init_all_params(scopes, send_ctx, dense_map)
             fleet.util.barrier()
-
-        self._pull_all_dense(scopes, send_ctx, dense_map)
+        if not self.context['use_ps_gpu']:
+            self._pull_all_dense(scopes, send_ctx, dense_map)
         fleet.util.barrier()
 
         if self.context['ps_mode'] == DistributedMode.GEO: