diff --git a/paddle/fluid/framework/device_worker.h b/paddle/fluid/framework/device_worker.h index 5e547940f417c7b4ab5c0f007bf1c511f25eab1c..5aa679342256f0bec5c3fcf9050a64c445ab3fbd 100644 --- a/paddle/fluid/framework/device_worker.h +++ b/paddle/fluid/framework/device_worker.h @@ -176,6 +176,7 @@ class DownpourWorker : public HogwildWorker { void FillSparseValue(size_t table_id); void PushGradients(); void CollectLabelInfo(size_t table_id); + void AdjustInsWeight(); private: bool need_to_push_dense_; @@ -205,6 +206,10 @@ class DownpourWorker : public HogwildWorker { std::shared_ptr _pull_dense_worker; std::vector<::std::future> push_sparse_status_; std::vector<::std::future> push_dense_status_; + + // adjust ins weight + AdjustInsWeightConfig adjust_ins_weight_config_; + std::vector nid_show_; }; #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) diff --git a/paddle/fluid/framework/downpour_worker.cc b/paddle/fluid/framework/downpour_worker.cc index 5882dae852421b47944948fcf8a41e663cf806e1..2d9412712c06b87798cfdb19cf05ede5b0b23a57 100644 --- a/paddle/fluid/framework/downpour_worker.cc +++ b/paddle/fluid/framework/downpour_worker.cc @@ -16,6 +16,11 @@ limitations under the License. */ #include "paddle/fluid/framework/device_worker_factory.h" #include "paddle/fluid/platform/cpu_helper.h" +#if defined _WIN32 || defined __APPLE__ +#else +#define _LINUX +#endif + namespace paddle { namespace framework { @@ -66,6 +71,7 @@ void DownpourWorker::Initialize(const TrainerDesc& desc) { use_cvm_ = desc.use_cvm(); scale_datanorm_ = desc.scale_datanorm(); dump_slot_ = desc.dump_slot(); + adjust_ins_weight_config_ = desc.adjust_ins_weight_config(); } void DownpourWorker::CollectLabelInfo(size_t table_idx) { @@ -150,30 +156,130 @@ void DownpourWorker::FillSparseValue(size_t table_idx) { auto& tensor_lod = tensor->lod()[0]; LoD data_lod{tensor_lod}; tensor_emb->set_lod(data_lod); + + bool is_nid = (adjust_ins_weight_config_.need_adjust() && + adjust_ins_weight_config_.nid_slot() == emb_slot_name); + if (is_nid) { + nid_show_.clear(); + } + int nid_ins_index = 0; + for (int index = 0; index < len; ++index) { if (use_cvm_) { if (ids[index] == 0u) { memcpy(ptr + table.emb_dim() * index, init_value.data(), sizeof(float) * table.emb_dim()); + if (is_nid) { + nid_show_.push_back(-1); + ++nid_ins_index; + } continue; } memcpy(ptr + table.emb_dim() * index, fea_value[fea_idx].data(), sizeof(float) * table.emb_dim()); + if (is_nid && index == tensor->lod()[0][nid_ins_index]) { + nid_show_.push_back(fea_value[fea_idx][0]); + ++nid_ins_index; + } fea_idx++; } else { if (ids[index] == 0u) { memcpy(ptr + table.emb_dim() * index, init_value.data() + 2, sizeof(float) * table.emb_dim()); + if (is_nid) { + nid_show_.push_back(-1); + ++nid_ins_index; + } continue; } memcpy(ptr + table.emb_dim() * index, fea_value[fea_idx].data() + 2, sizeof(float) * table.emb_dim()); + if (is_nid && index == tensor->lod()[0][nid_ins_index]) { + nid_show_.push_back(fea_value[fea_idx][0]); + ++nid_ins_index; + } fea_idx++; } } } } +void DownpourWorker::AdjustInsWeight() { +#ifdef _LINUX + // check var and tensor not null + if (!adjust_ins_weight_config_.need_adjust()) { + VLOG(0) << "need_adjust=false, skip adjust ins weight"; + return; + } + Variable* nid_var = + thread_scope_->FindVar(adjust_ins_weight_config_.nid_slot()); + if (nid_var == nullptr) { + VLOG(0) << "nid slot var " << adjust_ins_weight_config_.nid_slot() + << " is nullptr, skip adjust ins weight"; + return; + } + LoDTensor* nid_tensor = nid_var->GetMutable(); + if (nid_tensor == nullptr) { + VLOG(0) << "tensor of nid slot var " << adjust_ins_weight_config_.nid_slot() + << " is nullptr, skip adjust ins weight"; + return; + } + Variable* ins_weight_var = + thread_scope_->FindVar(adjust_ins_weight_config_.ins_weight_slot()); + if (ins_weight_var == nullptr) { + VLOG(0) << "ins weight var " << adjust_ins_weight_config_.ins_weight_slot() + << " is nullptr, skip adjust ins weight"; + return; + } + LoDTensor* ins_weight_tensor = ins_weight_var->GetMutable(); + if (ins_weight_tensor == nullptr) { + VLOG(0) << "tensor of ins weight tensor " + << adjust_ins_weight_config_.ins_weight_slot() + << " is nullptr, skip adjust ins weight"; + return; + } + + float* ins_weights = ins_weight_tensor->data(); + size_t len = ins_weight_tensor->numel(); // len = batch size + // here we assume nid_show slot only has one feasign in each instance + CHECK(len == nid_show_.size()) << "ins_weight size should be equal to " + << "nid_show size, " << len << " vs " + << nid_show_.size(); + float nid_adjw_threshold = adjust_ins_weight_config_.nid_adjw_threshold(); + float nid_adjw_ratio = adjust_ins_weight_config_.nid_adjw_ratio(); + int64_t nid_adjw_num = 0; + double nid_adjw_weight = 0.0; + size_t ins_index = 0; + for (int i = 0; i < len; ++i) { + float nid_show = nid_show_[i]; + VLOG(3) << "nid_show " << nid_show; + if (nid_show < 0) { + VLOG(3) << "nid_show < 0, continue"; + continue; + } + float ins_weight = 1.0; + if (nid_show >= 0 && nid_show < nid_adjw_threshold) { + ins_weight = log(M_E + + (nid_adjw_threshold - nid_show) / nid_adjw_threshold * + nid_adjw_ratio); + // count nid adjw insnum and weight + ++nid_adjw_num; + nid_adjw_weight += ins_weight; + // choose large ins weight + VLOG(3) << "ins weight new " << ins_weight << ", ins weight origin " + << ins_weights[ins_index]; + if (ins_weight > ins_weights[ins_index]) { + VLOG(3) << "ins " << ins_index << " weight changes to " << ins_weight; + ins_weights[ins_index] = ins_weight; + } + ++ins_index; + } + } + VLOG(3) << "nid adjw info: total_adjw_num: " << nid_adjw_num + << ", avg_adjw_weight: " << nid_adjw_weight; +#endif +} + void DownpourWorker::TrainFilesWithProfiler() { VLOG(3) << "Begin to train files with profiler"; platform::SetNumThreads(1); @@ -202,6 +308,7 @@ void DownpourWorker::TrainFilesWithProfiler() { double total_time = 0.0; double read_time = 0.0; double pull_sparse_time = 0.0; + double adjust_ins_weight_time = 0.0; double collect_label_time = 0.0; double fill_sparse_time = 0.0; double push_sparse_time = 0.0; @@ -209,8 +316,6 @@ void DownpourWorker::TrainFilesWithProfiler() { int cur_batch; int batch_cnt = 0; uint64_t total_inst = 0; - double op_sum_time = 0; - std::unordered_map op_to_time; timeline.Start(); while ((cur_batch = device_reader_->Next()) > 0) { timeline.Pause(); @@ -245,6 +350,16 @@ void DownpourWorker::TrainFilesWithProfiler() { timeline.Pause(); fill_sparse_time += timeline.ElapsedSec(); total_time += timeline.ElapsedSec(); + timeline.Start(); + auto nid_iter = std::find(sparse_value_names_[tid].begin(), + sparse_value_names_[tid].end(), + adjust_ins_weight_config_.nid_slot()); + if (nid_iter != sparse_value_names_[tid].end()) { + AdjustInsWeight(); + } + timeline.Pause(); + adjust_ins_weight_time += timeline.ElapsedSec(); + total_time += timeline.ElapsedSec(); } VLOG(3) << "Fill sparse value for all sparse table done."; @@ -358,6 +473,8 @@ void DownpourWorker::TrainFilesWithProfiler() { if (thread_id_ == 0) { // should be configured here if (batch_cnt > 0 && batch_cnt % 100 == 0) { + double op_sum_time = 0; + std::unordered_map op_to_time; for (size_t i = 0; i < op_total_time.size(); ++i) { fprintf(stderr, "op_name:[%zu][%s], op_mean_time:[%fs]\n", i, op_name[i].c_str(), op_total_time[i] / batch_cnt); @@ -382,10 +499,15 @@ void DownpourWorker::TrainFilesWithProfiler() { fprintf(stderr, "push dense time: %fs\n", push_dense_time / batch_cnt); fprintf(stderr, "collect label time: %fs\n", collect_label_time / batch_cnt); + fprintf(stderr, "adjust ins weight time: %fs\n", + adjust_ins_weight_time / batch_cnt); fprintf(stderr, "mean read time: %fs\n", read_time / batch_cnt); fprintf(stderr, "IO percent: %f\n", read_time / total_time * 100); + fprintf(stderr, "op run percent: %f\n", op_sum_time / total_time * 100); fprintf(stderr, "pull sparse time percent: %f\n", pull_sparse_time / total_time * 100); + fprintf(stderr, "adjust ins weight time percent: %f\n", + adjust_ins_weight_time / total_time * 100); fprintf(stderr, "collect label time percent: %f\n", collect_label_time / total_time * 100); fprintf(stderr, "fill sparse time percent: %f\n", @@ -425,6 +547,12 @@ void DownpourWorker::TrainFiles() { &feature_values_[tid], table.fea_dim()); CollectLabelInfo(i); FillSparseValue(i); + auto nid_iter = std::find(sparse_value_names_[tid].begin(), + sparse_value_names_[tid].end(), + adjust_ins_weight_config_.nid_slot()); + if (nid_iter != sparse_value_names_[tid].end()) { + AdjustInsWeight(); + } } VLOG(3) << "fill sparse value for all sparse table done."; diff --git a/paddle/fluid/framework/trainer_desc.proto b/paddle/fluid/framework/trainer_desc.proto index 622c6af152ad7dfef8d68e268b476cf8ced58895..d84708918fb74477e544e97ef30c257cbd60944f 100644 --- a/paddle/fluid/framework/trainer_desc.proto +++ b/paddle/fluid/framework/trainer_desc.proto @@ -43,6 +43,8 @@ message TrainerDesc { optional SectionWorkerParameter section_param = 104; // datafeed desc optional DataFeedDesc data_desc = 201; + // adjust ins weight + optional AdjustInsWeightConfig adjust_ins_weight_config = 301; } message HogwildWorkerParameter { repeated string skip_ops = 1; } @@ -88,6 +90,14 @@ message FetchConfig { optional Method method = 4 [ default = PRINT ]; } +message AdjustInsWeightConfig { + optional bool need_adjust = 1 [ default = false ]; + optional string nid_slot = 2 [ default = "" ]; + optional float nid_adjw_threshold = 3 [ default = 0.0 ]; + optional float nid_adjw_ratio = 4 [ default = 0.0 ]; + optional string ins_weight_slot = 5 [ default = "" ]; +} + message ProgramConfig { required string program_id = 1; repeated int32 push_sparse_table_id = 2; diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/optimizer_factory.py b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/optimizer_factory.py index 3e910551df8cbde1f148b95146408f36d515b1fb..0363ff3761e14936fbbfeef6168ee4d4df8bc0ab 100644 --- a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/optimizer_factory.py +++ b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/optimizer_factory.py @@ -167,6 +167,7 @@ class DistributedAdam(DistributedOptimizerImplBase): if server._server.downpour_server_param.downpour_table_param[ 0].accessor.accessor_class == "DownpourCtrAccessor": opt_info["dump_slot"] = True + opt_info["adjust_ins_weight"] = strategy.get("adjust_ins_weight", {}) for loss in losses: loss.block.program._fleet_opt = opt_info diff --git a/python/paddle/fluid/trainer_desc.py b/python/paddle/fluid/trainer_desc.py index ec98fba8e69ef8e0f916064511111621641591d9..97bd52d5f15c9591eb3337fe3fd0eeb3233385d4 100644 --- a/python/paddle/fluid/trainer_desc.py +++ b/python/paddle/fluid/trainer_desc.py @@ -81,6 +81,18 @@ class TrainerDesc(object): def _set_dump_slot(self, dump_slot): self.proto_desc.dump_slot = dump_slot + def _set_adjust_ins_weight(self, config_dict): + self.proto_desc.adjust_ins_weight_config.need_adjust = \ + config_dict.get("need_adjust", False) + self.proto_desc.adjust_ins_weight_config.nid_slot = \ + config_dict.get("nid_slot", "") + self.proto_desc.adjust_ins_weight_config.nid_adjw_threshold = \ + config_dict.get("nid_adjw_threshold", 0.0) + self.proto_desc.adjust_ins_weight_config.nid_adjw_ratio = \ + config_dict.get("nid_adjw_ratio", 0.0) + self.proto_desc.adjust_ins_weight_config.ins_weight_slot = \ + config_dict.get("ins_weight_slot", "") + def _desc(self): from google.protobuf import text_format return self.proto_desc.SerializeToString() diff --git a/python/paddle/fluid/trainer_factory.py b/python/paddle/fluid/trainer_factory.py index f8ca88931215324b74aedcc7d4054b0855d1d0f8..519672f7c8c24568c38385c58753ee73ba71a89e 100644 --- a/python/paddle/fluid/trainer_factory.py +++ b/python/paddle/fluid/trainer_factory.py @@ -41,5 +41,6 @@ class TrainerFactory(object): trainer._set_use_cvm(opt_info["use_cvm"]) trainer._set_scale_datanorm(opt_info["scale_datanorm"]) trainer._set_dump_slot(opt_info["dump_slot"]) + trainer._set_adjust_ins_weight(opt_info["adjust_ins_weight"]) trainer._set_device_worker(device_worker) return trainer