Unverified · Commit 0ed9b051 authored by Fan Zhang, committed by GitHub

[CPU-PSLIB] Add dump_prob & Modify dump_param (#35414)

* Add dump_prob & Modify dump_param

* Add dump_prob & Modify dump_param

* Add dump_prob & Modify dump_param

* Add dump_prob & Modify dump_param

* Add dump_prob & Modify dump_param
Parent 79f92509
@@ -19,7 +19,7 @@ IF((NOT DEFINED PSLIB_VER) OR (NOT DEFINED PSLIB_URL))
   MESSAGE(STATUS "use pre defined download url")
   SET(PSLIB_VER "0.1.1" CACHE STRING "" FORCE)
   SET(PSLIB_NAME "pslib" CACHE STRING "" FORCE)
-  SET(PSLIB_URL "https://pslib.bj.bcebos.com/pslib.tar.gz" CACHE STRING "" FORCE)
+  SET(PSLIB_URL "https://pslib.bj.bcebos.com/pslib_1.8.5.tar.gz" CACHE STRING "" FORCE)
 ENDIF()
 MESSAGE(STATUS "PSLIB_NAME: ${PSLIB_NAME}, PSLIB_URL: ${PSLIB_URL}")
 SET(PSLIB_SOURCE_DIR "${THIRD_PARTY_PATH}/pslib")
......
@@ -775,8 +775,12 @@ void MultiSlotDataFeed::PutToFeedVec(
                        total_instance * sizeof(int64_t));
     }
-    LoD data_lod{offset};
-    feed_vec_[i]->set_lod(data_lod);
+    // LoD data_lod{offset};
+    // feed_vec_[i]->set_lod(data_lod);
+    if (!use_slots_is_dense_[i]) {
+      LoD data_lod{offset};
+      feed_vec_[i]->set_lod(data_lod);
+    }
     if (use_slots_is_dense_[i]) {
       if (inductive_shape_index_[i] != -1) {
         use_slots_shape_[i][inductive_shape_index_[i]] =
@@ -1101,8 +1105,12 @@ void MultiSlotInMemoryDataFeed::PutToFeedVec(
       CopyToFeedTensor(tensor_ptr, feasign, total_instance * sizeof(int64_t));
     }
     auto& slot_offset = offset[i];
-    LoD data_lod{slot_offset};
-    feed_vec_[i]->set_lod(data_lod);
+    // LoD data_lod{slot_offset};
+    // feed_vec_[i]->set_lod(data_lod);
+    if (!use_slots_is_dense_[i]) {
+      LoD data_lod{slot_offset};
+      feed_vec_[i]->set_lod(data_lod);
+    }
     if (use_slots_is_dense_[i]) {
       if (inductive_shape_index_[i] != -1) {
         use_slots_shape_[i][inductive_shape_index_[i]] =
......
@@ -149,6 +149,7 @@ class DeviceWorker {
   bool use_cvm_;
   bool no_cvm_;
   bool scale_sparse_gradient_with_batch_size_;
+  float dump_prob_;
   std::vector<std::string> all_param_;
 };
@@ -226,6 +227,7 @@ class DownpourWorker : public HogwildWorker {
   void CopySparseTable();
   void CopyDenseTable();
   void CopyDenseVars();
+  void DumpParam(std::ostringstream& os);
   virtual void DumpParam(const int batch_id);
   DownpourWorkerParameter param_;
......
@@ -78,6 +78,11 @@ void DistMultiTrainer::DumpWork(int tid) {
   std::string path = string::format_string(
       "%s/part-%03d-%05d", dump_fields_path_.c_str(), mpi_rank_, tid);
+  if (user_define_dump_filename_ != "") {
+    path = string::format_string("%s/part-%s", dump_fields_path_.c_str(),
+                                 user_define_dump_filename_.c_str());
+  }
   std::shared_ptr<FILE> fp = fs_open_write(path, &err_no, dump_converter_);
   while (1) {
     std::string out_str;
......
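For reference, the branch added above only changes how the dump file name is built: the default `part-%03d-%05d` name (MPI rank plus thread id) is replaced by `part-<user_define_dump_filename>` when that option is set. Below is a minimal Python sketch of the same naming rule; the function name and arguments are illustrative, not part of Paddle:

```python
def dump_file_path(dump_fields_path, mpi_rank, tid, user_define_dump_filename=""):
    """Mimic the path selection in DistMultiTrainer::DumpWork (illustrative only)."""
    if user_define_dump_filename:
        # user-defined name: a single fixed file name, e.g. part-my_dump
        return "%s/part-%s" % (dump_fields_path, user_define_dump_filename)
    # default name: rank- and thread-specific, e.g. part-000-00003
    return "%s/part-%03d-%05d" % (dump_fields_path, mpi_rank, tid)

print(dump_file_path("afs:/user/dump", 0, 3))             # afs:/user/dump/part-000-00003
print(dump_file_path("afs:/user/dump", 0, 3, "my_dump"))  # afs:/user/dump/part-my_dump
```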
@@ -78,6 +78,7 @@ void DownpourWorker::Initialize(const TrainerDesc& desc) {
   use_cvm_ = desc.use_cvm();
   // for sparse value accessor, embedding only
   no_cvm_ = desc.no_cvm();
+  dump_prob_ = desc.dump_prob();
   scale_sparse_gradient_with_batch_size_ =
       desc.scale_sparse_gradient_with_batch_size();
   scale_datanorm_ = desc.scale_datanorm();
@@ -131,6 +132,25 @@ void DownpourWorker::SetNeedDump(bool need_dump_field) {
   need_dump_field_ = need_dump_field;
 }
+void DownpourWorker::DumpParam(std::ostringstream& os) {
+  for (auto& param : dump_param_) {
+    Variable* var = thread_scope_->FindVar(param);
+    if (var == nullptr) {
+      continue;
+    }
+    LoDTensor* tensor = var->GetMutable<LoDTensor>();
+    int64_t len = tensor->numel();
+    std::string tensor_str;
+    try {
+      tensor_str = PrintLodTensor(tensor, 0, len);
+    } catch (std::exception& e) {
+      LOG(WARNING) << "catch exception, param:" << param;
+      continue;
+    }
+    os << "\t" << param << ":" << len << tensor_str;
+  }
+}
+
 void DownpourWorker::DumpParam(const int batch_id) {
   std::ostringstream os;
   for (auto& param : dump_param_) {
@@ -963,43 +983,47 @@ void DownpourWorker::TrainFiles() {
     }
     if (need_dump_field_) {
       size_t batch_size = device_reader_->GetCurBatchSize();
-      std::vector<std::string> ars(batch_size);
+      std::vector<std::ostringstream> ars(batch_size);
       for (auto& ar : ars) {
         ar.clear();
       }
       auto& ins_id_vec = device_reader_->GetInsIdVec();
       auto& ins_content_vec = device_reader_->GetInsContentVec();
       for (size_t i = 0; i < ins_id_vec.size(); i++) {
-        ars[i] += ins_id_vec[i];
-        ars[i] = ars[i] + "\t" + ins_content_vec[i];
-      }
-      for (auto& field : dump_fields_) {
-        Variable* var = thread_scope_->FindVar(field);
-        if (var == nullptr) {
-          continue;
-        }
-        LoDTensor* tensor = var->GetMutable<LoDTensor>();
-        if (!CheckValidOutput(tensor, batch_size)) {
+        srand((unsigned)time(NULL));
+        float random_prob = (float)rand() / RAND_MAX;  // NOLINT
+        if (random_prob >= dump_prob_) {
           continue;
         }
-        for (size_t i = 0; i < batch_size; ++i) {
+        ars[i] << ins_id_vec[i];
+        ars[i] << "\t" << ins_content_vec[i];
+        for (auto& field : dump_fields_) {
+          Variable* var = thread_scope_->FindVar(field);
+          if (var == nullptr) {
+            continue;
+          }
+          LoDTensor* tensor = var->GetMutable<LoDTensor>();
+          if (!CheckValidOutput(tensor, batch_size)) {
+            continue;
+          }
           auto output_dim = tensor->dims()[1];
           std::string output_dimstr =
               boost::lexical_cast<std::string>(output_dim);
-          ars[i] = ars[i] + "\t" + field + ":" + output_dimstr;
+          ars[i] << "\t" << field << ":" << output_dimstr;
           auto bound = GetTensorBound(tensor, i);
-          ars[i] += PrintLodTensor(tensor, bound.first, bound.second);
+          ars[i] << PrintLodTensor(tensor, bound.first, bound.second);
         }
-      }
-      // #pragma omp parallel for
-      for (size_t i = 0; i < ars.size(); i++) {
-        if (ars[i].length() == 0) {
+        if (need_dump_param_ && thread_id_ == 0) {
+          DumpParam(ars[i]);
+        }
+        if (ars[i].str().length() < 2) {
          continue;
        }
-        writer_ << ars[i];
-      }
-      if (need_dump_param_ && thread_id_ == 0) {
-        DumpParam(batch_cnt);
+        writer_ << ars[i].str();
       }
     }
......
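The effect of `dump_prob` in `TrainFiles` above: for every instance a uniform random value in [0, 1] is drawn and the instance is skipped when it is greater than or equal to `dump_prob_`, so roughly a `dump_prob` fraction of instances gets written out, and the default of 1.0 effectively keeps the old dump-everything behaviour. A minimal Python sketch of that sampling rule (names here are illustrative, not the Paddle implementation):

```python
import random

def sample_instances_for_dump(ins_ids, dump_prob=1.0):
    """Keep each instance with probability dump_prob, mirroring the
    `random_prob >= dump_prob_` skip in DownpourWorker::TrainFiles."""
    kept = []
    for ins_id in ins_ids:
        random_prob = random.random()  # uniform in [0, 1)
        if random_prob >= dump_prob:   # skip this instance
            continue
        kept.append(ins_id)
    return kept

ids = ["ins_%d" % i for i in range(10000)]
print(len(sample_instances_for_dump(ids, dump_prob=0.1)))  # roughly 1000
print(len(sample_instances_for_dump(ids, dump_prob=1.0)))  # 10000: nothing skipped
```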
@@ -52,6 +52,7 @@ message TrainerDesc {
   optional string user_define_dump_filename = 24;
   optional bool scale_sparse_gradient_with_batch_size = 25 [ default = true ];
+  optional float dump_prob = 26 [ default = 1.0 ];
   // device worker parameters
   optional HogwildWorkerParameter hogwild_param = 101;
......
@@ -131,6 +131,9 @@ class TrainerDesc(object):
         for loss in loss_names:
             self.proto_desc.loss_names.append(loss)
 
+    def _set_dump_prob(self, dump_prob):
+        self.proto_desc.dump_prob = dump_prob
+
     def _set_adjust_ins_weight(self, config_dict):
         self.proto_desc.adjust_ins_weight_config.need_adjust = \
             config_dict.get("need_adjust", False)
......
@@ -68,10 +68,15 @@ class TrainerFactory(object):
                 trainer._set_dump_fields_path(opt_info["dump_fields_path"])
             if opt_info.get("dump_file_num") is not None:
                 trainer._set_dump_file_num(opt_info["dump_file_num"])
+            if opt_info.get("user_define_dump_filename") is not None:
+                trainer._set_user_define_dump_filename(opt_info[
+                    "user_define_dump_filename"])
             if opt_info.get("dump_converter") is not None:
                 trainer._set_dump_converter(opt_info["dump_converter"])
             if opt_info.get("dump_param") is not None:
                 trainer._set_dump_param(opt_info["dump_param"])
+            if opt_info.get("dump_prob") is not None:
+                trainer._set_dump_prob(opt_info["dump_prob"])
             if "fleet_desc" in opt_info:
                 device_worker._set_fleet_desc(opt_info["fleet_desc"])
......
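The Python-side changes simply forward two more optional keys from `opt_info` to the trainer description. A small sketch of how those keys travel, assuming the caller fills `opt_info` the same way the existing dump options are filled; the `FakeTrainerDesc` class and the `opt_info` contents below are illustrative stand-ins, not Paddle code:

```python
class FakeTrainerDesc(object):
    """Stand-in for TrainerDesc: just records the dump settings it receives."""
    def __init__(self):
        self.settings = {}

    def _set_dump_prob(self, dump_prob):
        self.settings["dump_prob"] = dump_prob

    def _set_user_define_dump_filename(self, name):
        self.settings["user_define_dump_filename"] = name

# Illustrative opt_info dict; the real one is produced by the distributed
# optimizer and consumed by TrainerFactory as in the diff above.
opt_info = {
    "dump_prob": 0.05,                       # dump roughly 5% of instances
    "user_define_dump_filename": "my_dump",  # single user-defined dump file name
}

trainer = FakeTrainerDesc()
if opt_info.get("user_define_dump_filename") is not None:
    trainer._set_user_define_dump_filename(opt_info["user_define_dump_filename"])
if opt_info.get("dump_prob") is not None:
    trainer._set_dump_prob(opt_info["dump_prob"])
print(trainer.settings)  # {'user_define_dump_filename': 'my_dump', 'dump_prob': 0.05}
```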