Unverified commit 79f92509, authored by Fan Zhang, committed by GitHub

[CPU-PSLIB] Add config for scale_sparse_grad in config_fleet.py (#34933)

* [CPU-PSLIB] Fix bug for consistency inspection of op's embedding name and sparse table name in config_fleet.py (#34441)

* [CPU-PSLIB] Add config for scale_sparse_grad in config_fleet.py
Parent 61c121cd
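The commit surfaces scale_sparse_gradient_with_batch_size as a strategy key that DistributedAdam reads (see the Python hunks below). A minimal, self-contained sketch of how the key and its default behave — plain dicts only, no PaddlePaddle imports, so the surrounding fleet wiring is assumed rather than shown:

# Sketch of the strategy lookup this commit adds to DistributedAdam (no Paddle
# imports; the dict stands in for the user strategy built in config_fleet.py).
strategy = {"use_cvm": True}  # user config; the new key is omitted on purpose

opt_info = {}
opt_info["use_cvm"] = strategy.get("use_cvm", False)
opt_info["no_cvm"] = strategy.get("no_cvm", False)
# New key: defaults to True, so existing configs keep scaling sparse
# gradients with the batch size unless they opt out explicitly.
opt_info["scale_sparse_gradient_with_batch_size"] = strategy.get(
    "scale_sparse_gradient_with_batch_size", True)

print(opt_info["scale_sparse_gradient_with_batch_size"])  # True

In the PSLib setup this dict is typically the strategy passed to the fleet distributed optimizer, which is what config_fleet.py assembles.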
@@ -148,6 +148,7 @@ class DeviceWorker {
   FetchConfig fetch_config_;
   bool use_cvm_;
   bool no_cvm_;
+  bool scale_sparse_gradient_with_batch_size_;
   std::vector<std::string> all_param_;
 };
...
@@ -78,6 +78,8 @@ void DownpourWorker::Initialize(const TrainerDesc& desc) {
   use_cvm_ = desc.use_cvm();
   // for sparse value accessor, embedding only
   no_cvm_ = desc.no_cvm();
+  scale_sparse_gradient_with_batch_size_ =
+      desc.scale_sparse_gradient_with_batch_size();
   scale_datanorm_ = desc.scale_datanorm();
   dump_slot_ = desc.dump_slot();
   dump_fields_.resize(desc.dump_fields_size());
@@ -614,7 +616,8 @@ void DownpourWorker::TrainFilesWithProfiler() {
           *thread_scope_, tid, features_[tid], feature_labels_[tid],
           sparse_key_names_[tid], sparse_grad_names_[tid], table.emb_dim(),
           &feature_grads_[tid], &push_sparse_status_, cur_batch, use_cvm_,
-          dump_slot_, &sparse_push_keys_[tid], no_cvm_);
+          dump_slot_, &sparse_push_keys_[tid], no_cvm_,
+          scale_sparse_gradient_with_batch_size_);
       timeline.Pause();
       push_sparse_time += timeline.ElapsedSec();
       total_time += timeline.ElapsedSec();
@@ -887,7 +890,8 @@ void DownpourWorker::TrainFiles() {
          *thread_scope_, tid, features_[tid], feature_labels_[tid],
          sparse_key_names_[tid], sparse_grad_names_[tid], table.emb_dim(),
          &feature_grads_[tid], &push_sparse_status_, cur_batch, use_cvm_,
-          dump_slot_, &sparse_push_keys_[tid], no_cvm_);
+          dump_slot_, &sparse_push_keys_[tid], no_cvm_,
+          scale_sparse_gradient_with_batch_size_);
     }
   }
...
@@ -464,11 +464,13 @@ void DownpourWorkerOpt::TrainFiles() {
          break;
        }
      }
+      bool scale_sparse_gradient_with_batch_size_ = true;
      fleet_ptr_->PushSparseVarsWithLabelAsync(
          *thread_scope_, tid, features_[tid], feature_labels_[tid],
          sparse_key_names_[tid], sparse_grad_names_[tid], table.emb_dim(),
          &feature_grads_[tid], &push_sparse_status_, cur_batch, use_cvm_,
-          dump_slot_, &sparse_push_keys_[tid], no_cvm_);
+          dump_slot_, &sparse_push_keys_[tid], no_cvm_,
+          scale_sparse_gradient_with_batch_size_);
     }
   }
...
@@ -531,7 +531,8 @@ void FleetWrapper::PushSparseVarsWithLabelAsync(
     std::vector<std::vector<float>>* push_values,
     std::vector<::std::future<int32_t>>* push_sparse_status,
     const int batch_size, const bool use_cvm, const bool dump_slot,
-    std::vector<uint64_t>* sparse_push_keys, const bool no_cvm) {
+    std::vector<uint64_t>* sparse_push_keys, const bool no_cvm,
+    const bool scale_sparse_gradient_with_batch_size) {
 #ifdef PADDLE_WITH_PSLIB
   int offset = 2;
   int slot_offset = 0;
@@ -595,7 +596,7 @@ void FleetWrapper::PushSparseVarsWithLabelAsync(
     }
     float* g = g_tensor->data<float>();
-    if (scale_sparse_gradient_with_batch_size_ && grad_dim > 0) {
+    if (scale_sparse_gradient_with_batch_size && grad_dim > 0) {
       int dim = emb_dim;
       Eigen::Map<
           Eigen::Matrix<float, Eigen::Dynamic, Eigen::Dynamic, Eigen::RowMajor>>
...
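For context, the branch guarded above rescales the embedding-gradient block by the batch size before the push; the Eigen::Map line that actually applies the factor falls outside this hunk, so the exact expression (and its direction) is treated as an assumption here. A rough NumPy sketch of the idea:

# Rough sketch (NumPy stand-in for the Eigen::Map over g_tensor's buffer).
# Assumption: when the switch is on, the gradient columns are rescaled by the
# batch size; the precise factor lives in code not shown in this hunk.
import numpy as np

batch_size, emb_dim, grad_dim = 32, 9, 8   # illustrative sizes only
g = np.random.rand(batch_size, emb_dim).astype(np.float32)

scale_sparse_gradient_with_batch_size = True
if scale_sparse_gradient_with_batch_size and grad_dim > 0:
    g[:, -grad_dim:] *= batch_size  # assumed direction of the rescale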
@@ -164,7 +164,8 @@ class FleetWrapper {
       std::vector<std::vector<float>>* push_values,
       std::vector<::std::future<int32_t>>* push_sparse_status,
       const int batch_size, const bool use_cvm, const bool dump_slot,
-      std::vector<uint64_t>* sparse_push_keys, const bool no_cvm);
+      std::vector<uint64_t>* sparse_push_keys, const bool no_cvm,
+      const bool scale_sparse_gradient_with_batch_size);
   // Push sparse variables to server in async mode
   void PushSparseFromTensorWithLabelAsync(
...
@@ -51,6 +51,7 @@ message TrainerDesc {
   repeated string loss_names = 23;
   optional string user_define_dump_filename = 24;
+  optional bool scale_sparse_gradient_with_batch_size = 25 [ default = true ];
   // device worker parameters
   optional HogwildWorkerParameter hogwild_param = 101;
...
@@ -593,6 +593,8 @@ class DistributedAdam(DistributedOptimizerImplBase):
         opt_info["worker_skipped_ops"] = worker_skipped_ops
         opt_info["use_cvm"] = strategy.get("use_cvm", False)
         opt_info["no_cvm"] = strategy.get("no_cvm", False)
+        opt_info["scale_sparse_gradient_with_batch_size"] = strategy.get(
+            "scale_sparse_gradient_with_batch_size", True)
         opt_info["stat_var_names"] = strategy.get("stat_var_names", [])
         opt_info["local_tables"] = strategy.get("local_tables", [])
         opt_info["async_tables"] = strategy.get("async_tables", [])
...
@@ -84,6 +84,10 @@ class TrainerDesc(object):
     def _set_no_cvm(self, no_cvm=False):
         self.proto_desc.no_cvm = no_cvm

+    def _set_scale_sparse_grad_with_batch_size(
+            self, scale_sparse_gradient_with_batch_size=True):
+        self.proto_desc.scale_sparse_gradient_with_batch_size = scale_sparse_gradient_with_batch_size
+
     def _set_scale_datanorm(self, scale_datanorm=-1):
         self.proto_desc.scale_datanorm = scale_datanorm
...
@@ -80,6 +80,10 @@ class TrainerFactory(object):
                 trainer._set_use_cvm(opt_info["use_cvm"])
             if opt_info.get("no_cvm") is not None:
                 trainer._set_no_cvm(opt_info["no_cvm"])
+            if opt_info.get(
+                    "scale_sparse_gradient_with_batch_size") is not None:
+                trainer._set_scale_sparse_grad_with_batch_size(opt_info[
+                    "scale_sparse_gradient_with_batch_size"])
             if opt_info.get("scale_datanorm") is not None:
                 trainer._set_scale_datanorm(opt_info["scale_datanorm"])
             if opt_info.get("adjust_ins_weight") is not None:
...
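Putting the Python side together, the key travels from opt_info through the TrainerFactory guard into the TrainerDesc proto field. A self-contained mock of that plumbing — the proto is replaced by a plain namespace, so nothing below is the real PaddlePaddle API:

# Mock of the opt_info -> TrainerDesc flow shown in the two hunks above.
from types import SimpleNamespace

class MockTrainerDesc:
    def __init__(self):
        # Stand-in for proto_desc; the real field defaults to True.
        self.proto_desc = SimpleNamespace(
            scale_sparse_gradient_with_batch_size=True)

    def _set_scale_sparse_grad_with_batch_size(
            self, scale_sparse_gradient_with_batch_size=True):
        self.proto_desc.scale_sparse_gradient_with_batch_size = (
            scale_sparse_gradient_with_batch_size)

opt_info = {"scale_sparse_gradient_with_batch_size": False}
trainer = MockTrainerDesc()
if opt_info.get("scale_sparse_gradient_with_batch_size") is not None:
    trainer._set_scale_sparse_grad_with_batch_size(
        opt_info["scale_sparse_gradient_with_batch_size"])

print(trainer.proto_desc.scale_sparse_gradient_with_batch_size)  # False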