Unverified commit c167a4b4, authored by F fuyinno4, committed by GitHub

Fix shrink-dense and add scale-datanorm (#18746)

Fix FleetWrapper:
1. Fix shrink dense: decay only the show (batch_sum) statistics, not all dense params.
2. Add datanorm scaling: divide the datanorm layer's gradients by batch_size.
Parent d3ac561d
@@ -182,6 +182,7 @@ class DownpourWorker : public HogwildWorker {
   bool dump_slot_;
   bool need_to_push_sparse_;
   DownpourWorkerParameter param_;
+  float scale_datanorm_;
   // just save the value in param_ for easy access
   std::map<uint64_t, std::string> label_var_name_;
   std::map<uint64_t, std::vector<std::string>> sparse_key_names_;
...
@@ -64,6 +64,7 @@ void DownpourWorker::Initialize(const TrainerDesc& desc) {
   fleet_ptr_ = FleetWrapper::GetInstance();
   fetch_config_ = desc.fetch_config();
   use_cvm_ = desc.use_cvm();
+  scale_datanorm_ = desc.scale_datanorm();
   dump_slot_ = desc.dump_slot();
 }
@@ -298,7 +299,8 @@ void DownpourWorker::TrainFilesWithProfiler() {
       uint64_t tid = static_cast<uint64_t>(
           param_.program_config(0).push_dense_table_id(i));
       fleet_ptr_->PushDenseVarsAsync(
-          *thread_scope_, tid, dense_grad_names_[tid], &push_sparse_status_);
+          *thread_scope_, tid, dense_grad_names_[tid], &push_sparse_status_,
+          scale_datanorm_, cur_batch);
     }
     timeline.Pause();
     push_dense_time += timeline.ElapsedSec();
@@ -467,7 +469,8 @@ void DownpourWorker::TrainFiles() {
       uint64_t tid = static_cast<uint64_t>(
           param_.program_config(0).push_dense_table_id(i));
       fleet_ptr_->PushDenseVarsAsync(
-          *thread_scope_, tid, dense_grad_names_[tid], &push_sparse_status_);
+          *thread_scope_, tid, dense_grad_names_[tid], &push_sparse_status_,
+          scale_datanorm_, cur_batch);
     }
     VLOG(3) << "push dense gradient done.";
...
@@ -264,7 +264,8 @@ void FleetWrapper::PushDenseVarsSync(
 void FleetWrapper::PushDenseVarsAsync(
     const Scope& scope, const uint64_t table_id,
     const std::vector<std::string>& var_names,
-    std::vector<::std::future<int32_t>>* push_sparse_status) {
+    std::vector<::std::future<int32_t>>* push_sparse_status,
+    float scale_datanorm, int batch_size) {
 #ifdef PADDLE_WITH_PSLIB
   std::vector<paddle::ps::Region> regions;
   for (auto& t : var_names) {
@@ -272,6 +273,20 @@ void FleetWrapper::PushDenseVarsAsync(
     LoDTensor* tensor = var->GetMutable<LoDTensor>();
     int count = tensor->numel();
     float* g = tensor->data<float>();
+    if (scale_datanorm >= 0) {
+      if (t.find(".batch_size@GRAD") != std::string::npos ||
+          t.find(".batch_sum@GRAD") != std::string::npos) {
+        Eigen::Map<Eigen::MatrixXf> mat(g, 1, count);
+        float scale = 1.0 / batch_size;
+        mat *= scale;
+      } else if (t.find(".batch_square_sum@GRAD") != std::string::npos) {
+        VLOG(3) << "epsilon: " << scale_datanorm;
+        for (int i = 0; i < count; ++i) {
+          g[i] = (g[i] - batch_size * scale_datanorm) / batch_size +
+                 batch_size * scale_datanorm;
+        }
+      }
+    }
     paddle::ps::Region reg(g, count);
     regions.emplace_back(std::move(reg));
   }
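To see what the new branch does, here is a minimal NumPy sketch of the same arithmetic outside of PSLib (the tensor name, batch size, and epsilon below are illustrative, not taken from the commit): gradients of .batch_size@GRAD and .batch_sum@GRAD are divided by the batch size, while .batch_square_sum@GRAD is shifted down by batch_size * scale_datanorm, divided by batch_size, then shifted back up.

    import numpy as np

    def scale_datanorm_grad(name, g, batch_size, scale_datanorm):
        # Mirrors the branch added to FleetWrapper::PushDenseVarsAsync.
        if scale_datanorm < 0:
            return g  # disabled; the proto default for scale_datanorm is -1
        if ".batch_size@GRAD" in name or ".batch_sum@GRAD" in name:
            return g * (1.0 / batch_size)  # mat *= scale
        if ".batch_square_sum@GRAD" in name:
            eps = scale_datanorm  # logged as "epsilon" in the C++ code
            return (g - batch_size * eps) / batch_size + batch_size * eps
        return g

    g = np.ones(4, dtype=np.float32)
    print(scale_datanorm_grad("emb.batch_sum@GRAD", g, 32, 1e-4))  # each -> 1/32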
@@ -508,18 +523,29 @@ void FleetWrapper::ShrinkSparseTable(int table_id) {
 void FleetWrapper::ShrinkDenseTable(int table_id, Scope* scope,
                                     std::vector<std::string> var_list,
-                                    float decay) {
+                                    float decay, int emb_dim) {
 #ifdef PADDLE_WITH_PSLIB
   std::vector<paddle::ps::Region> regions;
   for (std::string& name : var_list) {
     if (name.find("batch_sum") != std::string::npos) {
       Variable* var = scope->FindVar(name);
       CHECK(var != nullptr) << "var[" << name << "] not found";
-      VLOG(3) << "prepare shrink dense batch_sum";
+      VLOG(0) << "prepare shrink dense batch_sum";
       LoDTensor* tensor = var->GetMutable<LoDTensor>();
       float* g = tensor->data<float>();
-      Eigen::Map<Eigen::MatrixXf> mat(g, 1, tensor->numel());
-      mat *= decay;
+
+      // show_batch_sum += N * log(decay)
+      std::string size_name = name;
+      size_name.replace(size_name.find("batch_sum"), size_name.length(),
+                        "batch_size");
+      Variable* var_size = scope->FindVar(size_name);
+      CHECK(var_size != nullptr) << "var[" << size_name << "] not found";
+      VLOG(3) << "shrink dense batch_sum: " << name << ", " << size_name;
+      float* g_size = var_size->GetMutable<LoDTensor>()->data<float>();
+
+      for (int k = 0; k < tensor->numel(); k += emb_dim) {
+        g[k] = g[k] + g_size[k] * log(decay);
+      }
       paddle::ps::Region reg(g, tensor->numel());
       regions.emplace_back(std::move(reg));
     } else {
...
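The in-line comment show_batch_sum += N * log(decay) summarizes the fix: instead of multiplying the whole tensor by decay, the loop walks batch_sum with stride emb_dim and shifts each touched entry by the matching batch_size value times log(decay). A standalone sketch of that loop, with illustrative sizes and values:

    import numpy as np

    emb_dim = 11        # one element's length in the datanorm layer
    decay = 0.98
    batch_sum = np.random.rand(3 * emb_dim).astype(np.float32)
    batch_size = np.random.rand(3 * emb_dim).astype(np.float32)

    # show_batch_sum += N * log(decay): only every emb_dim-th entry is updated.
    for k in range(0, batch_sum.size, emb_dim):
        batch_sum[k] += batch_size[k] * np.log(decay)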
@@ -82,7 +82,8 @@ class FleetWrapper {
   void PushDenseVarsAsync(
       const Scope& scope, const uint64_t table_id,
       const std::vector<std::string>& var_names,
-      std::vector<::std::future<int32_t>>* push_sparse_status);
+      std::vector<::std::future<int32_t>>* push_sparse_status,
+      float scale_datanorm, int batch_size);
   void PushDenseVarsSync(Scope* scope, const uint64_t table_id,
                          const std::vector<std::string>& var_names);
@@ -149,7 +150,8 @@ class FleetWrapper {
   void ShrinkSparseTable(int table_id);
   void ShrinkDenseTable(int table_id, Scope* scope,
-                        std::vector<std::string> var_list, float decay);
+                        std::vector<std::string> var_list, float decay,
+                        int emb_dim);
   // register client to client communication
   typedef std::function<int32_t(int, int, const std::string&)> MsgHandlerFunc;
...
@@ -34,6 +34,7 @@ message TrainerDesc {
   optional FetchConfig fetch_config = 7;
   optional bool use_cvm = 8 [ default = false ];
   optional bool dump_slot = 9 [ default = false ];
+  optional float scale_datanorm = 10 [ default = -1 ];
   // device worker parameters
   optional HogwildWorkerParameter hogwild_param = 101;
...
@@ -227,21 +227,22 @@ class PSLib(Fleet):
             self._fleet_ptr.shrink_sparse_table(i.table_id)
         self._role_maker._barrier_worker()

-    def shrink_dense_table(self, decay, scope=None, table_id=None):
+    def shrink_dense_table(self, decay, emb_dim=11, scope=None, table_id=None):
         """
-        shrink all dense params in pserver by multiplying by decay
+        shrink batch_sum in pserver by multiplying by decay

         Args:
             decay(float): the decay rate, usually range in (0, 1)
+            emb_dim(int): one element's length in datanorm layer
             scope(Scope): Scope object, default is fluid.global_scope()
             table_id(int): table id of shrinking dense table. None means shrink all,
                            you should specify it when using multiple scopes,
                            default is None.

         Example:
-            >>> fleet.shrink_dense_table(0.98, myscope1, 1)
-            >>> fleet.shrink_dense_table(0.98, myscope1, 2)
-            >>> fleet.shrink_dense_table(0.98, myscope2, 3)
+            >>> fleet.shrink_dense_table(0.98, 11, myscope1, 1)
+            >>> fleet.shrink_dense_table(0.98, 11, myscope1, 2)
+            >>> fleet.shrink_dense_table(0.98, 11, myscope2, 3)
         """
         if scope is None:
@@ -260,7 +261,7 @@ class PSLib(Fleet):
             if skip:
                 continue
             self._fleet_ptr.shrink_dense_table(i.table_id, scope, var_list,
-                                               decay)
+                                               decay, emb_dim)
         self._role_maker._barrier_worker()

     def load_one_table(self, table_id, model_path, **kwargs):
...
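For completeness, a hedged usage sketch of the new signature; the import path matches the pslib package this diff edits, but the scope and table id are placeholders, as in the docstring:

    import paddle.fluid as fluid
    from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet

    scope = fluid.global_scope()
    # decay=0.98, then the new emb_dim argument (default 11).
    fleet.shrink_dense_table(0.98, 11, scope=scope, table_id=1)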
@@ -162,6 +162,7 @@ class DistributedAdam(DistributedOptimizerImplBase):
         opt_info["fleet_desc"] = ps_param
         opt_info["worker_skipped_ops"] = worker_skipped_ops
         opt_info["use_cvm"] = strategy.get("use_cvm", False)
+        opt_info["scale_datanorm"] = strategy.get("scale_datanorm", -1)
         opt_info["dump_slot"] = False
         if server._server.downpour_server_param.downpour_table_param[
                 0].accessor.accessor_class == "DownpourCtrAccessor":
...
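On the user side, the new key simply rides along in the strategy dict consumed above. A minimal sketch, assuming the usual fleet.distributed_optimizer entry point and an already-built loss variable (both placeholders here):

    strategy = {
        "use_cvm": True,
        "scale_datanorm": 1e-4,  # < 0 (the default) leaves gradients untouched
    }
    optimizer = fleet.distributed_optimizer(
        fluid.optimizer.Adam(learning_rate=0.01), strategy)
    optimizer.minimize(loss)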
@@ -75,6 +75,9 @@ class TrainerDesc(object):
     def _set_use_cvm(self, use_cvm=False):
         self.proto_desc.use_cvm = use_cvm

+    def _set_scale_datanorm(self, scale_datanorm=-1):
+        self.proto_desc.scale_datanorm = scale_datanorm
+
     def _set_dump_slot(self, dump_slot):
         self.proto_desc.dump_slot = dump_slot
...
@@ -39,6 +39,7 @@ class TrainerFactory(object):
             device_worker._set_fleet_desc(opt_info["fleet_desc"])
             trainer._set_fleet_desc(opt_info["fleet_desc"])
             trainer._set_use_cvm(opt_info["use_cvm"])
+            trainer._set_scale_datanorm(opt_info["scale_datanorm"])
             trainer._set_dump_slot(opt_info["dump_slot"])
             trainer._set_device_worker(device_worker)
         return trainer