From a647b80afcd1918ab32f89ed1316d7f599a43627 Mon Sep 17 00:00:00 2001 From: Thunderbrook <52529258+Thunderbrook@users.noreply.github.com> Date: Wed, 1 Sep 2021 20:28:13 +0800 Subject: [PATCH] [HeterPs] merge dense && data norm && g2sum (#35029) * merge dense * log level * tensor copy sync * format --- .../framework/fleet/heter_ps/optimizer.cuh.h | 4 +-- paddle/fluid/framework/ps_gpu_trainer.cc | 36 +++++++++++++------ paddle/fluid/framework/trainer.h | 2 ++ .../pslib/optimizer_factory.py | 6 ++-- 4 files changed, 34 insertions(+), 14 deletions(-) diff --git a/paddle/fluid/framework/fleet/heter_ps/optimizer.cuh.h b/paddle/fluid/framework/fleet/heter_ps/optimizer.cuh.h index 362877aa16..374984ecdb 100644 --- a/paddle/fluid/framework/fleet/heter_ps/optimizer.cuh.h +++ b/paddle/fluid/framework/fleet/heter_ps/optimizer.cuh.h @@ -44,7 +44,7 @@ class Optimizer { if (w < optimizer_config::min_bound) w = optimizer_config::min_bound; if (w > optimizer_config::max_bound) w = optimizer_config::max_bound; - add_g2sum = scaled_grad * scaled_grad; + add_g2sum += scaled_grad * scaled_grad; g2sum += add_g2sum; } @@ -64,7 +64,7 @@ class Optimizer { w[i] = optimizer_config::mf_min_bound; if (w[i] > optimizer_config::mf_max_bound) w[i] = optimizer_config::mf_max_bound; - add_g2sum = scaled_grad * scaled_grad; + add_g2sum += scaled_grad * scaled_grad; } g2sum += add_g2sum / n; diff --git a/paddle/fluid/framework/ps_gpu_trainer.cc b/paddle/fluid/framework/ps_gpu_trainer.cc index 39bc3f0406..8b16b6a5d0 100644 --- a/paddle/fluid/framework/ps_gpu_trainer.cc +++ b/paddle/fluid/framework/ps_gpu_trainer.cc @@ -57,8 +57,6 @@ void PSGPUTrainer::Initialize(const TrainerDesc& trainer_desc, trainer_desc.downpour_param().stat_var_names(i)); } VLOG(3) << "going to initialize pull dense worker"; - pull_dense_worker_ = PullDenseWorker::GetInstance(); - pull_dense_worker_->Initialize(trainer_desc); SetDebug(trainer_desc.debug()); trainer_desc_ = trainer_desc; workers_.resize(place_num); @@ -112,15 +110,21 @@ void PSGPUTrainer::InitTrainerEnv(const ProgramDesc& main_program, } } } + for (auto& var : main_program.Block(0).AllVars()) { + if (var->Persistable()) { + auto it = std::find(need_merge_var_names_.begin(), + need_merge_var_names_.end(), var->Name()); + if (it == need_merge_var_names_.end()) { + VLOG(2) << "train param: " << var->Name(); + trainable_param_.push_back(var->Name()); + } + } + } place_ = place; return; } void PSGPUTrainer::InitOtherEnv(const ProgramDesc& main_program) { - pull_dense_worker_->SetRootScope(root_scope_); - for (size_t i = 0; i < places_.size(); ++i) { - pull_dense_worker_->AddThreadScope(workers_[i]->GetThreadScope()); - } VLOG(3) << "init other env done."; } @@ -141,15 +145,27 @@ Scope* PSGPUTrainer::GetWorkerScope(int thread_id) { return nullptr; } template void PSGPUTrainer::MergeToRootScope(LoDTensor* root_tensor, LoDTensor* tensor) { LoDTensor tmp_root; - TensorCopy(*root_tensor, platform::CPUPlace(), &tmp_root); + TensorCopySync(*root_tensor, platform::CPUPlace(), &tmp_root); T* tmp_root_data = tmp_root.data(); LoDTensor tmp_tensor; - TensorCopy(*tensor, platform::CPUPlace(), &tmp_tensor); + TensorCopySync(*tensor, platform::CPUPlace(), &tmp_tensor); T* data = tmp_tensor.data(); for (int i = 0; i < tmp_tensor.numel(); i++) { tmp_root_data[i] += data[i]; } - TensorCopy(tmp_root, platform::CPUPlace(), root_tensor); + TensorCopySync(tmp_root, platform::CPUPlace(), root_tensor); +} + +void PSGPUTrainer::MergeDenseParam() { + auto thread_scope = workers_[0]->GetThreadScope(); + for (auto& name : trainable_param_) { + VLOG(2) << "merge var " << name << " to root scope"; + Variable* root_var = root_scope_->FindVar(name); + LoDTensor* root_tensor = root_var->GetMutable(); + Variable* var = thread_scope->FindVar(name); + LoDTensor* tensor = var->GetMutable(); + TensorCopySync((*tensor), root_tensor->place(), root_tensor); + } } void PSGPUTrainer::Finalize() { @@ -187,7 +203,7 @@ void PSGPUTrainer::Finalize() { _ForEachDataType_(MergeCallback); } } - pull_dense_worker_->MergeDenseParam(); + MergeDenseParam(); root_scope_->DropKids(); } } // namespace framework diff --git a/paddle/fluid/framework/trainer.h b/paddle/fluid/framework/trainer.h index fc8fb9327d..0f34c84549 100644 --- a/paddle/fluid/framework/trainer.h +++ b/paddle/fluid/framework/trainer.h @@ -265,6 +265,7 @@ class PSGPUTrainer : public TrainerBase { } virtual std::string GetDumpPath(int tid) { return ""; } virtual void InitDumpEnv() {} + virtual void MergeDenseParam(); template void MergeToRootScope(LoDTensor* root_tensor, LoDTensor* thread_tensor); @@ -274,6 +275,7 @@ class PSGPUTrainer : public TrainerBase { DownpourWorkerParameter param_; std::map> dense_grad_names_; std::vector need_merge_var_names_; + std::vector trainable_param_; float scale_datanorm_; paddle::platform::Place place_; ProgramDesc program_; diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/optimizer_factory.py b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/optimizer_factory.py index 9a21a5a850..e2fb29c543 100644 --- a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/optimizer_factory.py +++ b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/optimizer_factory.py @@ -412,11 +412,13 @@ class DistributedAdam(DistributedOptimizerImplBase): sparse_table_index = 0 for num in range(len(losses)): loss = losses[num] + parameters = None + if parameter_list != None: + parameters = parameter_list[num] prog_id = str(id(loss.block.program)) # param_grads of program params_grads = sorted( - fluid.backward.append_backward(loss, parameter_list, - no_grad_set), + fluid.backward.append_backward(loss, parameters, no_grad_set), key=lambda x: x[0].name) flag_use_ps_gpu = strategy.get("use_ps_gpu", False) -- GitLab