Unverified commit d028214d, authored by Fan Zhang, committed by GitHub

[CPU-PSLIB] Add config for scale_sparse_grad in config_fleet.py,test=develop (#34893)

Parent ae80df91
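For context: this commit plumbs a new boolean switch, `scale_sparse_gradient_with_batch_size` (TrainerDesc proto field 34, default `true`), from the Python-side fleet strategy dict down to `FleetWrapper::PushSparseVarsWithLabelAsync`, where it gates the existing batch-size rescaling of sparse embedding gradients. A minimal sketch of the user-facing side, assuming a PSLib fleet setup (the `fleet.distributed_optimizer` wiring is not part of this diff):

```python
# Sketch only: the strategy dict read by DistributedAdam in the
# optimizer_factory.py hunk below. Omitting the new key keeps the
# previous behavior (scaling enabled, default True).
strategy = {
    "use_cvm": True,    # pre-existing key, shown for context
    "no_cvm": False,    # pre-existing key, shown for context
    "scale_sparse_gradient_with_batch_size": False,  # new key from this commit
}
# In a PSLib job this dict would be passed as
#   fleet.distributed_optimizer(optimizer, strategy=strategy)
# That call and the surrounding setup are assumed, not shown here.
```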
......@@ -212,6 +212,7 @@ class DeviceWorker {
   FetchConfig fetch_config_;
   bool use_cvm_;
   bool no_cvm_;
+  bool scale_sparse_gradient_with_batch_size_;
   TrainerDesc trainer_desc_;
   // dump params or grads for debug
......
......@@ -89,6 +89,8 @@ void DownpourWorker::Initialize(const TrainerDesc& desc) {
   use_cvm_ = desc.use_cvm();
   // for sparse value accessor, embedding only
   no_cvm_ = desc.no_cvm();
+  scale_sparse_gradient_with_batch_size_ =
+      desc.scale_sparse_gradient_with_batch_size();
   scale_datanorm_ = desc.scale_datanorm();
   dump_slot_ = desc.dump_slot();
   adjust_ins_weight_config_ = desc.adjust_ins_weight_config();
......@@ -591,7 +593,8 @@ void DownpourWorker::TrainFilesWithProfiler() {
           *thread_scope_, tid, features_[tid], feature_labels_[tid],
           sparse_key_names_[tid], sparse_grad_names_[tid], table.emb_dim(),
           &feature_grads_[tid], &push_sparse_status_, cur_batch, use_cvm_,
-          dump_slot_, &sparse_push_keys_[tid], no_cvm_);
+          dump_slot_, &sparse_push_keys_[tid], no_cvm_,
+          scale_sparse_gradient_with_batch_size_);
       timeline.Pause();
       push_sparse_time += timeline.ElapsedSec();
       total_time += timeline.ElapsedSec();
......@@ -866,7 +869,8 @@ void DownpourWorker::TrainFiles() {
           *thread_scope_, tid, features_[tid], feature_labels_[tid],
           sparse_key_names_[tid], sparse_grad_names_[tid], table.emb_dim(),
           &feature_grads_[tid], &push_sparse_status_, cur_batch, use_cvm_,
-          dump_slot_, &sparse_push_keys_[tid], no_cvm_);
+          dump_slot_, &sparse_push_keys_[tid], no_cvm_,
+          scale_sparse_gradient_with_batch_size_);
     }
   }
......
......@@ -450,11 +450,13 @@ void DownpourWorkerOpt::TrainFiles() {
           break;
         }
       }
+      bool scale_sparse_gradient_with_batch_size_ = true;
       fleet_ptr_->PushSparseVarsWithLabelAsync(
           *thread_scope_, tid, features_[tid], feature_labels_[tid],
           sparse_key_names_[tid], sparse_grad_names_[tid], table.emb_dim(),
           &feature_grads_[tid], &push_sparse_status_, cur_batch, use_cvm_,
-          dump_slot_, &sparse_push_keys_[tid], no_cvm_);
+          dump_slot_, &sparse_push_keys_[tid], no_cvm_,
+          scale_sparse_gradient_with_batch_size_);
     }
   }
......
......@@ -870,7 +870,8 @@ void FleetWrapper::PushSparseVarsWithLabelAsync(
     std::vector<std::vector<float>>* push_values,
     std::vector<::std::future<int32_t>>* push_sparse_status,
     const int batch_size, const bool use_cvm, const bool dump_slot,
-    std::vector<uint64_t>* sparse_push_keys, const bool no_cvm) {
+    std::vector<uint64_t>* sparse_push_keys, const bool no_cvm,
+    const bool scale_sparse_gradient_with_batch_size) {
 #ifdef PADDLE_WITH_PSLIB
   int offset = 2;
   int slot_offset = 0;
......@@ -939,7 +940,7 @@ void FleetWrapper::PushSparseVarsWithLabelAsync(
     }
     float* g = g_tensor->data<float>();
-    if (scale_sparse_gradient_with_batch_size_ && grad_dim > 0) {
+    if (scale_sparse_gradient_with_batch_size && grad_dim > 0) {
       int dim = emb_dim;
       Eigen::Map<
           Eigen::Matrix<float, Eigen::Dynamic, Eigen::Dynamic, Eigen::RowMajor>>
......
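For intuition about the guarded block: `g` is the flat buffer of gradients being pushed for one sparse table, and the Eigen map views it as a row-major `[rows, emb_dim]` matrix before rescaling by the batch size. The multiplication itself sits just past the end of the shown hunk, so the exact operation below is an assumption; a rough NumPy stand-in:

```python
import numpy as np

def scale_sparse_grad(g, batch_size, emb_dim, grad_dim, scale=True):
    # Stand-in for the guarded Eigen block above: view the flat gradient
    # buffer as a [rows, emb_dim] matrix (the Eigen::Map) and rescale the
    # gradient columns by the batch size when the flag is on.
    if scale and grad_dim > 0:
        g_mat = g.reshape(-1, emb_dim)
        # Assumed scaling; the actual multiply is outside the shown hunk.
        g_mat[:, -grad_dim:] *= batch_size
    return g

g = np.ones(8, dtype=np.float32)  # two rows of a 4-dim embedding gradient
print(scale_sparse_grad(g, batch_size=4, emb_dim=4, grad_dim=4))
```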
......@@ -209,7 +209,8 @@ class FleetWrapper {
       std::vector<std::vector<float>>* push_values,
       std::vector<::std::future<int32_t>>* push_sparse_status,
       const int batch_size, const bool use_cvm, const bool dump_slot,
-      std::vector<uint64_t>* sparse_push_keys, const bool no_cvm);
+      std::vector<uint64_t>* sparse_push_keys, const bool no_cvm,
+      const bool scale_sparse_gradient_with_batch_size);
   // Push sparse variables to server in async mode
   void PushSparseFromTensorWithLabelAsync(
......
......@@ -61,6 +61,7 @@ message TrainerDesc {
   optional bool use_ps_gpu = 32 [ default = false ];
   optional string user_define_dump_filename = 33;
+  optional bool scale_sparse_gradient_with_batch_size = 34 [ default = true ];
   // device worker parameters
   optional HogwildWorkerParameter hogwild_param = 101;
......
......@@ -825,6 +825,8 @@ class DistributedAdam(DistributedOptimizerImplBase):
         opt_info["worker_skipped_ops"] = worker_skipped_ops
         opt_info["use_cvm"] = strategy.get("use_cvm", False)
         opt_info["no_cvm"] = strategy.get("no_cvm", False)
+        opt_info["scale_sparse_gradient_with_batch_size"] = strategy.get(
+            "scale_sparse_gradient_with_batch_size", True)
         opt_info["worker_class"] = strategy.get("worker_class",
                                                 "DownpourWorker")
         opt_info["stat_var_names"] = strategy.get("stat_var_names", [])
......
......@@ -124,6 +124,10 @@ class TrainerDesc(object):
     def _set_no_cvm(self, no_cvm=False):
         self.proto_desc.no_cvm = no_cvm

+    def _set_scale_sparse_grad_with_batch_size(
+            self, scale_sparse_gradient_with_batch_size=True):
+        self.proto_desc.scale_sparse_gradient_with_batch_size = scale_sparse_gradient_with_batch_size
+
     def _set_scale_datanorm(self, scale_datanorm=-1):
         self.proto_desc.scale_datanorm = scale_datanorm
......
......@@ -95,6 +95,10 @@ class TrainerFactory(object):
                 trainer._set_use_cvm(opt_info["use_cvm"])
             if opt_info.get("no_cvm") is not None:
                 trainer._set_no_cvm(opt_info["no_cvm"])
+            if opt_info.get(
+                    "scale_sparse_gradient_with_batch_size") is not None:
+                trainer._set_scale_sparse_grad_with_batch_size(opt_info[
+                    "scale_sparse_gradient_with_batch_size"])
             if opt_info.get("scale_datanorm") is not None:
                 trainer._set_scale_datanorm(opt_info["scale_datanorm"])
             if opt_info.get("adjust_ins_weight") is not None:
......
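Taken together, the plumbing is: the strategy dict entry lands in `opt_info` inside `DistributedAdam`, `TrainerFactory` forwards it via `_set_scale_sparse_grad_with_batch_size` into the `TrainerDesc` proto (field 34, default `true`), `DownpourWorker::Initialize` reads it back in C++, and both training loops pass it as the new final argument of `PushSparseVarsWithLabelAsync`. Note that `DownpourWorkerOpt::TrainFiles` does not read the trainer desc; it hard-codes a local `scale_sparse_gradient_with_batch_size_ = true`, preserving the old behavior on that path.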