From c167a4b4dd625b3d0b4d392af8f3f2867439c3d9 Mon Sep 17 00:00:00 2001
From: fuyinno4 <35824027+fuyinno4@users.noreply.github.com>
Date: Thu, 25 Jul 2019 14:18:54 +0800
Subject: [PATCH] Fix shrink-dense and add scale-datanorm (#18746)

Fix FleetWrapper:
1. fix shrink dense: only decay the show statistic (batch_sum)
2. add datanorm scale: divide the datanorm layer's gradients by batch_size
---
 paddle/fluid/framework/device_worker.h        |  1 +
 paddle/fluid/framework/downpour_worker.cc     |  7 ++--
 paddle/fluid/framework/fleet/fleet_wrapper.cc | 36 ++++++++++++++++---
 paddle/fluid/framework/fleet/fleet_wrapper.h  |  6 ++--
 paddle/fluid/framework/trainer_desc.proto     |  1 +
 .../fleet/parameter_server/pslib/__init__.py  | 13 +++----
 .../pslib/optimizer_factory.py                |  1 +
 python/paddle/fluid/trainer_desc.py           |  3 ++
 python/paddle/fluid/trainer_factory.py        |  1 +
 9 files changed, 54 insertions(+), 15 deletions(-)

diff --git a/paddle/fluid/framework/device_worker.h b/paddle/fluid/framework/device_worker.h
index d1543eee5dc..5e547940f41 100644
--- a/paddle/fluid/framework/device_worker.h
+++ b/paddle/fluid/framework/device_worker.h
@@ -182,6 +182,7 @@ class DownpourWorker : public HogwildWorker {
   bool dump_slot_;
   bool need_to_push_sparse_;
   DownpourWorkerParameter param_;
+  float scale_datanorm_;
   // just save the value in param_ for easy access
   std::map<uint64_t, std::string> label_var_name_;
   std::map<uint64_t, std::vector<std::string>> sparse_key_names_;
diff --git a/paddle/fluid/framework/downpour_worker.cc b/paddle/fluid/framework/downpour_worker.cc
index cf0ac4dd8c6..5882dae8524 100644
--- a/paddle/fluid/framework/downpour_worker.cc
+++ b/paddle/fluid/framework/downpour_worker.cc
@@ -64,6 +64,7 @@ void DownpourWorker::Initialize(const TrainerDesc& desc) {
   fleet_ptr_ = FleetWrapper::GetInstance();
   fetch_config_ = desc.fetch_config();
   use_cvm_ = desc.use_cvm();
+  scale_datanorm_ = desc.scale_datanorm();
   dump_slot_ = desc.dump_slot();
 }
 
@@ -298,7 +299,8 @@ void DownpourWorker::TrainFilesWithProfiler() {
         uint64_t tid = static_cast<uint64_t>(
             param_.program_config(0).push_dense_table_id(i));
         fleet_ptr_->PushDenseVarsAsync(
-            *thread_scope_, tid, dense_grad_names_[tid], &push_sparse_status_);
+            *thread_scope_, tid, dense_grad_names_[tid], &push_sparse_status_,
+            scale_datanorm_, cur_batch);
       }
       timeline.Pause();
       push_dense_time += timeline.ElapsedSec();
@@ -467,7 +469,8 @@ void DownpourWorker::TrainFiles() {
         uint64_t tid = static_cast<uint64_t>(
             param_.program_config(0).push_dense_table_id(i));
         fleet_ptr_->PushDenseVarsAsync(
-            *thread_scope_, tid, dense_grad_names_[tid], &push_sparse_status_);
+            *thread_scope_, tid, dense_grad_names_[tid], &push_sparse_status_,
+            scale_datanorm_, cur_batch);
       }
 
       VLOG(3) << "push dense gradient done.";
diff --git a/paddle/fluid/framework/fleet/fleet_wrapper.cc b/paddle/fluid/framework/fleet/fleet_wrapper.cc
index 2e638e9b7f0..0c5eb0da191 100644
--- a/paddle/fluid/framework/fleet/fleet_wrapper.cc
+++ b/paddle/fluid/framework/fleet/fleet_wrapper.cc
@@ -264,7 +264,8 @@ void FleetWrapper::PushDenseVarsSync(
 void FleetWrapper::PushDenseVarsAsync(
     const Scope& scope, const uint64_t table_id,
     const std::vector<std::string>& var_names,
-    std::vector<::std::future<int32_t>>* push_sparse_status) {
+    std::vector<::std::future<int32_t>>* push_sparse_status,
+    float scale_datanorm, int batch_size) {
 #ifdef PADDLE_WITH_PSLIB
   std::vector<paddle::ps::Region> regions;
   for (auto& t : var_names) {
@@ -272,6 +273,20 @@ void FleetWrapper::PushDenseVarsAsync(
     LoDTensor* tensor = var->GetMutable<LoDTensor>();
     int count = tensor->numel();
     float* g = tensor->data<float>();
+    if (scale_datanorm >= 0) {
+      if (t.find(".batch_size@GRAD") != std::string::npos ||
+          t.find(".batch_sum@GRAD") != std::string::npos) {
+        Eigen::Map<Eigen::MatrixXf> mat(g, 1, count);
+        float scale = 1.0 / batch_size;
+        mat *= scale;
+      } else if (t.find(".batch_square_sum@GRAD") != std::string::npos) {
+        VLOG(3) << "epsilon: " << scale_datanorm;
+        for (int i = 0; i < count; ++i) {
+          g[i] = (g[i] - batch_size * scale_datanorm) / batch_size +
+                 batch_size * scale_datanorm;
+        }
+      }
+    }
     paddle::ps::Region reg(g, count);
     regions.emplace_back(std::move(reg));
   }
@@ -508,18 +523,29 @@ void FleetWrapper::ShrinkSparseTable(int table_id) {
 
 void FleetWrapper::ShrinkDenseTable(int table_id, Scope* scope,
                                     std::vector<std::string> var_list,
-                                    float decay) {
+                                    float decay, int emb_dim) {
 #ifdef PADDLE_WITH_PSLIB
   std::vector<paddle::ps::Region> regions;
   for (std::string& name : var_list) {
     if (name.find("batch_sum") != std::string::npos) {
       Variable* var = scope->FindVar(name);
       CHECK(var != nullptr) << "var[" << name << "] not found";
-      VLOG(3) << "prepare shrink dense batch_sum";
+      VLOG(0) << "prepare shrink dense batch_sum";
       LoDTensor* tensor = var->GetMutable<LoDTensor>();
       float* g = tensor->data<float>();
-      Eigen::Map<Eigen::MatrixXf> mat(g, 1, tensor->numel());
-      mat *= decay;
+
+      // show_batch_sum += N * log(decay)
+      std::string size_name = name;
+      size_name.replace(size_name.find("batch_sum"), size_name.length(),
+                        "batch_size");
+      Variable* var_size = scope->FindVar(size_name);
+      CHECK(var_size != nullptr) << "var[" << size_name << "] not found";
+      VLOG(3) << "shrink dense batch_sum: " << name << ", " << size_name;
+      float* g_size = var_size->GetMutable<LoDTensor>()->data<float>();
+
+      for (int k = 0; k < tensor->numel(); k += emb_dim) {
+        g[k] = g[k] + g_size[k] * log(decay);
+      }
       paddle::ps::Region reg(g, tensor->numel());
       regions.emplace_back(std::move(reg));
     } else {
diff --git a/paddle/fluid/framework/fleet/fleet_wrapper.h b/paddle/fluid/framework/fleet/fleet_wrapper.h
index 5b9de11450f..4779978689d 100644
--- a/paddle/fluid/framework/fleet/fleet_wrapper.h
+++ b/paddle/fluid/framework/fleet/fleet_wrapper.h
@@ -82,7 +82,8 @@ class FleetWrapper {
   void PushDenseVarsAsync(
       const Scope& scope, const uint64_t table_id,
       const std::vector<std::string>& var_names,
-      std::vector<::std::future<int32_t>>* push_sparse_status);
+      std::vector<::std::future<int32_t>>* push_sparse_status,
+      float scale_datanorm, int batch_size);
 
   void PushDenseVarsSync(Scope* scope, const uint64_t table_id,
                          const std::vector<std::string>& var_names);
@@ -149,7 +150,8 @@ class FleetWrapper {
   void ShrinkSparseTable(int table_id);
 
   void ShrinkDenseTable(int table_id, Scope* scope,
-                        std::vector<std::string> var_list, float decay);
+                        std::vector<std::string> var_list, float decay,
+                        int emb_dim);
 
   // register client to client communication
   typedef std::function<int32_t(int, int, const std::string&)> MsgHandlerFunc;
diff --git a/paddle/fluid/framework/trainer_desc.proto b/paddle/fluid/framework/trainer_desc.proto
index 600056f62f0..622c6af152a 100644
--- a/paddle/fluid/framework/trainer_desc.proto
+++ b/paddle/fluid/framework/trainer_desc.proto
@@ -34,6 +34,7 @@ message TrainerDesc {
   optional FetchConfig fetch_config = 7;
   optional bool use_cvm = 8 [ default = false ];
   optional bool dump_slot = 9 [ default = false ];
+  optional float scale_datanorm = 10 [ default = -1 ];
 
   // device worker parameters
   optional HogwildWorkerParameter hogwild_param = 101;
diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py
index 48fc302a007..b70a4f5558e 100644
--- a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py
+++ b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py
@@ -227,21 +227,22 @@ class PSLib(Fleet):
                     self._fleet_ptr.shrink_sparse_table(i.table_id)
         self._role_maker._barrier_worker()
 
-    def shrink_dense_table(self, decay, scope=None, table_id=None):
+    def shrink_dense_table(self, decay, emb_dim=11, scope=None, table_id=None):
         """
-        shrink all dense params in pserver by multiplying by decay
+        shrink batch_sum in pserver by multiplying by decay
 
         Args:
             decay(float): the decay rate, usually range in (0, 1)
+            emb_dim(int): one element's length in datanorm layer
             scope(Scope): Scope object, default is fluid.global_scope()
             table_id(int): table id of shrinking dense table. None means shrink all,
                            you should specify it when using multiple scopes,
                            default is None.
 
         Example:
-            >>> fleet.shrink_dense_table(0.98, myscope1, 1)
-            >>> fleet.shrink_dense_table(0.98, myscope1, 2)
-            >>> fleet.shrink_dense_table(0.98, myscope2, 3)
+            >>> fleet.shrink_dense_table(0.98, 11, myscope1, 1)
+            >>> fleet.shrink_dense_table(0.98, 11, myscope1, 2)
+            >>> fleet.shrink_dense_table(0.98, 11, myscope2, 3)
 
         """
         if scope is None:
@@ -260,7 +261,7 @@ class PSLib(Fleet):
                 if skip:
                     continue
                 self._fleet_ptr.shrink_dense_table(i.table_id, scope, var_list,
-                                                   decay)
+                                                   decay, emb_dim)
         self._role_maker._barrier_worker()
 
     def load_one_table(self, table_id, model_path, **kwargs):
diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/optimizer_factory.py b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/optimizer_factory.py
index b9652d0fc3d..3e910551df8 100644
--- a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/optimizer_factory.py
+++ b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/optimizer_factory.py
@@ -162,6 +162,7 @@ class DistributedAdam(DistributedOptimizerImplBase):
         opt_info["fleet_desc"] = ps_param
         opt_info["worker_skipped_ops"] = worker_skipped_ops
         opt_info["use_cvm"] = strategy.get("use_cvm", False)
+        opt_info["scale_datanorm"] = strategy.get("scale_datanorm", -1)
         opt_info["dump_slot"] = False
         if server._server.downpour_server_param.downpour_table_param[
                 0].accessor.accessor_class == "DownpourCtrAccessor":
diff --git a/python/paddle/fluid/trainer_desc.py b/python/paddle/fluid/trainer_desc.py
index 14122aeee07..ec98fba8e69 100644
--- a/python/paddle/fluid/trainer_desc.py
+++ b/python/paddle/fluid/trainer_desc.py
@@ -75,6 +75,9 @@ class TrainerDesc(object):
     def _set_use_cvm(self, use_cvm=False):
         self.proto_desc.use_cvm = use_cvm
 
+    def _set_scale_datanorm(self, scale_datanorm=-1):
+        self.proto_desc.scale_datanorm = scale_datanorm
+
     def _set_dump_slot(self, dump_slot):
         self.proto_desc.dump_slot = dump_slot
 
diff --git a/python/paddle/fluid/trainer_factory.py b/python/paddle/fluid/trainer_factory.py
index c18ce5db47d..f8ca8893121 100644
--- a/python/paddle/fluid/trainer_factory.py
+++ b/python/paddle/fluid/trainer_factory.py
@@ -39,6 +39,7 @@ class TrainerFactory(object):
                 device_worker._set_fleet_desc(opt_info["fleet_desc"])
                 trainer._set_fleet_desc(opt_info["fleet_desc"])
                 trainer._set_use_cvm(opt_info["use_cvm"])
+                trainer._set_scale_datanorm(opt_info["scale_datanorm"])
                 trainer._set_dump_slot(opt_info["dump_slot"])
             trainer._set_device_worker(device_worker)
         return trainer
-- 
GitLab
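
Editor's note on the datanorm scaling above: the PushDenseVarsAsync hunk divides the datanorm layer's batch_size and batch_sum gradients by the batch size, and re-centers the batch_square_sum gradient around the epsilon term batch_size * scale_datanorm before and after that division. The sketch below is a minimal NumPy illustration of that arithmetic, not part of the patch; the gradient names and sample values are hypothetical.

    import numpy as np


    def scale_datanorm_grads(grads, batch_size, scale_datanorm):
        """Illustrative mirror of the datanorm branch in PushDenseVarsAsync.

        grads maps gradient variable names to 1-D float arrays.  When the
        feature is enabled (scale_datanorm >= 0), batch_size/batch_sum
        gradients are divided by the batch size; the batch_square_sum
        gradient is shifted by eps = batch_size * scale_datanorm before the
        division and shifted back afterwards.
        """
        if scale_datanorm < 0:  # disabled, matches the proto default of -1
            return dict(grads)
        eps = batch_size * scale_datanorm
        out = {}
        for name, g in grads.items():
            if name.endswith(".batch_size@GRAD") or name.endswith(".batch_sum@GRAD"):
                out[name] = g / batch_size
            elif name.endswith(".batch_square_sum@GRAD"):
                out[name] = (g - eps) / batch_size + eps
            else:
                out[name] = g
        return out


    # Hypothetical 3-wide datanorm slot, batch of 32 samples.
    grads = {
        "x.batch_size@GRAD": np.full(3, 32.0, dtype=np.float32),
        "x.batch_sum@GRAD": np.array([4.0, 8.0, 16.0], dtype=np.float32),
        "x.batch_square_sum@GRAD": np.array([1.0, 2.0, 3.0], dtype=np.float32),
    }
    print(scale_datanorm_grads(grads, batch_size=32, scale_datanorm=1e-4))

On the Python side, the value reaches the worker through the strategy dict consumed by the distributed optimizer (the "scale_datanorm" key added in optimizer_factory.py, default -1, i.e. disabled), and the shrink path is invoked as in the updated docstring, e.g. fleet.shrink_dense_table(0.98, 11, scope, table_id).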