Unverified commit 768059b3, authored by J jiaqi, committed by GitHub

adjust ins weight according to nid slot (#18784)

Adjust ins weight according to the nid slot; the user can specify adjust_ins_weight in the fleet strategy, as sketched below.
Parent 08fa98f7
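The adjust_ins_weight settings travel as a plain dict through the fleet strategy: DistributedAdam reads strategy.get("adjust_ins_weight", {}) into opt_info, and TrainerDesc._set_adjust_ins_weight copies the keys into AdjustInsWeightConfig (see the diffs below). A minimal sketch of such a configuration follows; the slot names and numeric values are hypothetical placeholders, and the commented-out pslib fleet optimizer wiring is only an assumption, not part of this commit.

# Sketch: enabling adjust_ins_weight via the fleet strategy dict.
# Key names mirror AdjustInsWeightConfig in trainer_desc.proto;
# the slot names and numbers are hypothetical examples.
strategy = {
    "adjust_ins_weight": {
        "need_adjust": True,              # turn the adjustment on
        "nid_slot": "nid_show_slot",      # hypothetical slot carrying the nid show
        "nid_adjw_threshold": 1000.0,     # boost only instances with show below this
        "nid_adjw_ratio": 20.0,           # strength of the log-based boost
        "ins_weight_slot": "ins_weight",  # hypothetical instance-weight slot
    },
}

# Assumed wiring with the pslib fleet distributed optimizer:
# optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
# optimizer.minimize(loss)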
@@ -176,6 +176,7 @@ class DownpourWorker : public HogwildWorker {
void FillSparseValue(size_t table_id);
void PushGradients();
void CollectLabelInfo(size_t table_id);
void AdjustInsWeight();
private:
bool need_to_push_dense_;
@@ -205,6 +206,10 @@ class DownpourWorker : public HogwildWorker {
std::shared_ptr<PullDenseWorker> _pull_dense_worker;
std::vector<::std::future<int32_t>> push_sparse_status_;
std::vector<::std::future<int32_t>> push_dense_status_;
// adjust ins weight
AdjustInsWeightConfig adjust_ins_weight_config_;
std::vector<float> nid_show_;
};
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
......
@@ -16,6 +16,11 @@ limitations under the License. */
#include "paddle/fluid/framework/device_worker_factory.h"
#include "paddle/fluid/platform/cpu_helper.h"
#if defined _WIN32 || defined __APPLE__
#else
#define _LINUX
#endif
namespace paddle {
namespace framework {
@@ -66,6 +71,7 @@ void DownpourWorker::Initialize(const TrainerDesc& desc) {
use_cvm_ = desc.use_cvm();
scale_datanorm_ = desc.scale_datanorm();
dump_slot_ = desc.dump_slot();
adjust_ins_weight_config_ = desc.adjust_ins_weight_config();
}
void DownpourWorker::CollectLabelInfo(size_t table_idx) {
@@ -150,30 +156,130 @@ void DownpourWorker::FillSparseValue(size_t table_idx) {
auto& tensor_lod = tensor->lod()[0];
LoD data_lod{tensor_lod};
tensor_emb->set_lod(data_lod);
bool is_nid = (adjust_ins_weight_config_.need_adjust() &&
adjust_ins_weight_config_.nid_slot() == emb_slot_name);
if (is_nid) {
nid_show_.clear();
}
int nid_ins_index = 0;
for (int index = 0; index < len; ++index) {
if (use_cvm_) {
if (ids[index] == 0u) {
memcpy(ptr + table.emb_dim() * index, init_value.data(),
sizeof(float) * table.emb_dim());
if (is_nid) {
nid_show_.push_back(-1);
++nid_ins_index;
}
continue;
}
memcpy(ptr + table.emb_dim() * index, fea_value[fea_idx].data(),
sizeof(float) * table.emb_dim());
if (is_nid && index == tensor->lod()[0][nid_ins_index]) {
nid_show_.push_back(fea_value[fea_idx][0]);
++nid_ins_index;
}
fea_idx++;
} else {
if (ids[index] == 0u) {
memcpy(ptr + table.emb_dim() * index, init_value.data() + 2,
sizeof(float) * table.emb_dim());
if (is_nid) {
nid_show_.push_back(-1);
++nid_ins_index;
}
continue;
}
memcpy(ptr + table.emb_dim() * index, fea_value[fea_idx].data() + 2,
sizeof(float) * table.emb_dim());
if (is_nid && index == tensor->lod()[0][nid_ins_index]) {
nid_show_.push_back(fea_value[fea_idx][0]);
++nid_ins_index;
}
fea_idx++;
}
}
}
}
void DownpourWorker::AdjustInsWeight() {
#ifdef _LINUX
// check var and tensor not null
if (!adjust_ins_weight_config_.need_adjust()) {
VLOG(0) << "need_adjust=false, skip adjust ins weight";
return;
}
Variable* nid_var =
thread_scope_->FindVar(adjust_ins_weight_config_.nid_slot());
if (nid_var == nullptr) {
VLOG(0) << "nid slot var " << adjust_ins_weight_config_.nid_slot()
<< " is nullptr, skip adjust ins weight";
return;
}
LoDTensor* nid_tensor = nid_var->GetMutable<LoDTensor>();
if (nid_tensor == nullptr) {
VLOG(0) << "tensor of nid slot var " << adjust_ins_weight_config_.nid_slot()
<< " is nullptr, skip adjust ins weight";
return;
}
Variable* ins_weight_var =
thread_scope_->FindVar(adjust_ins_weight_config_.ins_weight_slot());
if (ins_weight_var == nullptr) {
VLOG(0) << "ins weight var " << adjust_ins_weight_config_.ins_weight_slot()
<< " is nullptr, skip adjust ins weight";
return;
}
LoDTensor* ins_weight_tensor = ins_weight_var->GetMutable<LoDTensor>();
if (ins_weight_tensor == nullptr) {
VLOG(0) << "tensor of ins weight tensor "
<< adjust_ins_weight_config_.ins_weight_slot()
<< " is nullptr, skip adjust ins weight";
return;
}
float* ins_weights = ins_weight_tensor->data<float>();
size_t len = ins_weight_tensor->numel(); // len = batch size
// here we assume nid_show slot only has one feasign in each instance
CHECK(len == nid_show_.size()) << "ins_weight size should be equal to "
<< "nid_show size, " << len << " vs "
<< nid_show_.size();
float nid_adjw_threshold = adjust_ins_weight_config_.nid_adjw_threshold();
float nid_adjw_ratio = adjust_ins_weight_config_.nid_adjw_ratio();
int64_t nid_adjw_num = 0;
double nid_adjw_weight = 0.0;
for (size_t i = 0; i < len; ++i) {
float nid_show = nid_show_[i];
VLOG(3) << "nid_show " << nid_show;
if (nid_show < 0) {
VLOG(3) << "nid_show < 0, continue";
continue;
}
float ins_weight = 1.0;
if (nid_show >= 0 && nid_show < nid_adjw_threshold) {
ins_weight = log(M_E +
(nid_adjw_threshold - nid_show) / nid_adjw_threshold *
nid_adjw_ratio);
// count nid adjw insnum and weight
++nid_adjw_num;
nid_adjw_weight += ins_weight;
// choose large ins weight
VLOG(3) << "ins weight new " << ins_weight << ", ins weight origin "
<< ins_weights[ins_index];
if (ins_weight > ins_weights[ins_index]) {
VLOG(3) << "ins " << ins_index << " weight changes to " << ins_weight;
ins_weights[ins_index] = ins_weight;
}
++ins_index;
}
}
VLOG(3) << "nid adjw info: total_adjw_num: " << nid_adjw_num
<< ", avg_adjw_weight: " << nid_adjw_weight;
#endif
}
void DownpourWorker::TrainFilesWithProfiler() {
VLOG(3) << "Begin to train files with profiler";
platform::SetNumThreads(1);
@@ -202,6 +308,7 @@ void DownpourWorker::TrainFilesWithProfiler() {
double total_time = 0.0;
double read_time = 0.0;
double pull_sparse_time = 0.0;
double adjust_ins_weight_time = 0.0;
double collect_label_time = 0.0;
double fill_sparse_time = 0.0;
double push_sparse_time = 0.0;
@@ -209,8 +316,6 @@ void DownpourWorker::TrainFilesWithProfiler() {
int cur_batch;
int batch_cnt = 0;
uint64_t total_inst = 0;
double op_sum_time = 0;
std::unordered_map<std::string, double> op_to_time;
timeline.Start();
while ((cur_batch = device_reader_->Next()) > 0) {
timeline.Pause();
@@ -245,6 +350,16 @@ void DownpourWorker::TrainFilesWithProfiler() {
timeline.Pause();
fill_sparse_time += timeline.ElapsedSec();
total_time += timeline.ElapsedSec();
timeline.Start();
auto nid_iter = std::find(sparse_value_names_[tid].begin(),
sparse_value_names_[tid].end(),
adjust_ins_weight_config_.nid_slot());
if (nid_iter != sparse_value_names_[tid].end()) {
AdjustInsWeight();
}
timeline.Pause();
adjust_ins_weight_time += timeline.ElapsedSec();
total_time += timeline.ElapsedSec();
}
VLOG(3) << "Fill sparse value for all sparse table done.";
@@ -358,6 +473,8 @@ void DownpourWorker::TrainFilesWithProfiler() {
if (thread_id_ == 0) {
// should be configured here
if (batch_cnt > 0 && batch_cnt % 100 == 0) {
double op_sum_time = 0;
std::unordered_map<std::string, double> op_to_time;
for (size_t i = 0; i < op_total_time.size(); ++i) {
fprintf(stderr, "op_name:[%zu][%s], op_mean_time:[%fs]\n", i,
op_name[i].c_str(), op_total_time[i] / batch_cnt);
@@ -382,10 +499,15 @@ void DownpourWorker::TrainFilesWithProfiler() {
fprintf(stderr, "push dense time: %fs\n", push_dense_time / batch_cnt);
fprintf(stderr, "collect label time: %fs\n",
collect_label_time / batch_cnt);
fprintf(stderr, "adjust ins weight time: %fs\n",
adjust_ins_weight_time / batch_cnt);
fprintf(stderr, "mean read time: %fs\n", read_time / batch_cnt);
fprintf(stderr, "IO percent: %f\n", read_time / total_time * 100);
fprintf(stderr, "op run percent: %f\n", op_sum_time / total_time * 100);
fprintf(stderr, "pull sparse time percent: %f\n",
pull_sparse_time / total_time * 100);
fprintf(stderr, "adjust ins weight time percent: %f\n",
adjust_ins_weight_time / total_time * 100);
fprintf(stderr, "collect label time percent: %f\n",
collect_label_time / total_time * 100);
fprintf(stderr, "fill sparse time percent: %f\n",
@@ -425,6 +547,12 @@ void DownpourWorker::TrainFiles() {
&feature_values_[tid], table.fea_dim());
CollectLabelInfo(i);
FillSparseValue(i);
auto nid_iter = std::find(sparse_value_names_[tid].begin(),
sparse_value_names_[tid].end(),
adjust_ins_weight_config_.nid_slot());
if (nid_iter != sparse_value_names_[tid].end()) {
AdjustInsWeight();
}
}
VLOG(3) << "fill sparse value for all sparse table done.";
......
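For intuition about the weighting rule AdjustInsWeight applies above, a small standalone Python sketch: an instance with 0 <= nid_show < nid_adjw_threshold gets the boosted weight log(e + (threshold - nid_show) / threshold * ratio), but only if that exceeds the weight it already has; padded instances (nid_show < 0) are left untouched. The threshold and ratio values here are hypothetical.

import math

def adjusted_ins_weight(nid_show, old_weight, threshold, ratio):
    # Mirrors the C++ logic: skip padded instances (nid_show < 0) and
    # instances whose show is already at or above the threshold.
    if nid_show < 0 or nid_show >= threshold:
        return old_weight
    new_weight = math.log(math.e + (threshold - nid_show) / threshold * ratio)
    # keep the larger of the existing and the boosted weight
    return max(old_weight, new_weight)

# Hypothetical threshold=1000, ratio=20: the lower the show, the larger the boost.
for show in (0, 500, 999):
    print(show, round(adjusted_ins_weight(show, 1.0, 1000.0, 20.0), 2))
# roughly: 0 -> 3.12, 500 -> 2.54, 999 -> 1.01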
@@ -43,6 +43,8 @@ message TrainerDesc {
optional SectionWorkerParameter section_param = 104;
// datafeed desc
optional DataFeedDesc data_desc = 201;
// adjust ins weight
optional AdjustInsWeightConfig adjust_ins_weight_config = 301;
}
message HogwildWorkerParameter { repeated string skip_ops = 1; }
@@ -88,6 +90,14 @@ message FetchConfig {
optional Method method = 4 [ default = PRINT ];
}
message AdjustInsWeightConfig {
optional bool need_adjust = 1 [ default = false ];
optional string nid_slot = 2 [ default = "" ];
optional float nid_adjw_threshold = 3 [ default = 0.0 ];
optional float nid_adjw_ratio = 4 [ default = 0.0 ];
optional string ins_weight_slot = 5 [ default = "" ];
}
message ProgramConfig {
required string program_id = 1;
repeated int32 push_sparse_table_id = 2;
......
@@ -167,6 +167,7 @@ class DistributedAdam(DistributedOptimizerImplBase):
if server._server.downpour_server_param.downpour_table_param[
0].accessor.accessor_class == "DownpourCtrAccessor":
opt_info["dump_slot"] = True
opt_info["adjust_ins_weight"] = strategy.get("adjust_ins_weight", {})
for loss in losses:
loss.block.program._fleet_opt = opt_info
......
@@ -81,6 +81,18 @@ class TrainerDesc(object):
def _set_dump_slot(self, dump_slot):
self.proto_desc.dump_slot = dump_slot
def _set_adjust_ins_weight(self, config_dict):
self.proto_desc.adjust_ins_weight_config.need_adjust = \
config_dict.get("need_adjust", False)
self.proto_desc.adjust_ins_weight_config.nid_slot = \
config_dict.get("nid_slot", "")
self.proto_desc.adjust_ins_weight_config.nid_adjw_threshold = \
config_dict.get("nid_adjw_threshold", 0.0)
self.proto_desc.adjust_ins_weight_config.nid_adjw_ratio = \
config_dict.get("nid_adjw_ratio", 0.0)
self.proto_desc.adjust_ins_weight_config.ins_weight_slot = \
config_dict.get("ins_weight_slot", "")
def _desc(self):
from google.protobuf import text_format
return self.proto_desc.SerializeToString()
......
@@ -41,5 +41,6 @@ class TrainerFactory(object):
trainer._set_use_cvm(opt_info["use_cvm"])
trainer._set_scale_datanorm(opt_info["scale_datanorm"])
trainer._set_dump_slot(opt_info["dump_slot"])
trainer._set_adjust_ins_weight(opt_info["adjust_ins_weight"])
trainer._set_device_worker(device_worker)
return trainer