From 6c6a7a14dc94c122c08cb68146ddbab4a2b3f589 Mon Sep 17 00:00:00 2001 From: xiexionghang Date: Mon, 2 Sep 2019 10:16:06 +0800 Subject: [PATCH] mpi control && trainer-net update && bug fix --- .../feed/accessor/dense_input_accessor.cc | 45 +++++++++- .../feed/accessor/epoch_accessor.cc | 80 +++++++++++++----- .../feed/accessor/epoch_accessor.h | 10 ++- .../feed/accessor/input_data_accessor.h | 3 +- .../feed/accessor/sparse_input_accessor.cc | 70 +++++++++++++-- .../feed/common/pslib_warpper.cc | 1 + .../feed/common/runtime_environment.cc | 19 ++++- .../feed/common/runtime_environment.h | 21 +++++ .../train/custom_trainer/feed/conf/env.conf | 19 +++++ .../custom_trainer/feed/conf/trainer.yaml | 4 +- .../feed/dataset/data_reader.cc | 1 + .../feed/executor/multi_thread_executor.cc | 42 ++++++--- .../feed/executor/multi_thread_executor.h | 23 +++++ .../feed/io/hadoop_file_system.cc | 2 +- .../train/custom_trainer/feed/io/shell.cc | 1 + .../fluid/train/custom_trainer/feed/main.cc | 2 + .../feed/monitor/auc_monitor.cc | 9 +- .../custom_trainer/feed/monitor/auc_monitor.h | 4 +- .../feed/monitor/cost_monitor.cc | 6 +- .../feed/monitor/cost_monitor.h | 4 +- .../custom_trainer/feed/monitor/monitor.h | 5 +- .../feed/process/learner_process.cc | 43 +++++----- .../feed/process/learner_process.h | 2 - .../feed/scripts/compake_runable_package.sh | 44 ++++++++++ .../train/custom_trainer/feed/scripts/join.py | 31 ++++--- .../feed/scripts/model/join/main_program | Bin 73660 -> 74710 bytes .../feed/scripts/model/join/model.yaml | 2 +- .../feed/scripts/model/join/startup_program | Bin 29136 -> 29822 bytes .../feed/scripts/model/join/test_program | Bin 32643 -> 33168 bytes .../feed/scripts/model/update/main_program | Bin 54527 -> 55307 bytes .../feed/scripts/model/update/startup_program | Bin 18651 -> 19151 bytes .../feed/scripts/model/update/test_program | Bin 23432 -> 23822 bytes .../feed/scripts/start_feed_trainer.sh | 52 +++++++++++- .../custom_trainer/feed/scripts/submit_mpi.sh | 32 +++++++ .../custom_trainer/feed/scripts/update.py | 25 ++++-- .../custom_trainer/feed/trainer_context.h | 2 + 36 files changed, 498 insertions(+), 106 deletions(-) create mode 100644 paddle/fluid/train/custom_trainer/feed/conf/env.conf create mode 100755 paddle/fluid/train/custom_trainer/feed/scripts/compake_runable_package.sh create mode 100755 paddle/fluid/train/custom_trainer/feed/scripts/submit_mpi.sh diff --git a/paddle/fluid/train/custom_trainer/feed/accessor/dense_input_accessor.cc b/paddle/fluid/train/custom_trainer/feed/accessor/dense_input_accessor.cc index f1ca59ec..b9a40b0a 100644 --- a/paddle/fluid/train/custom_trainer/feed/accessor/dense_input_accessor.cc +++ b/paddle/fluid/train/custom_trainer/feed/accessor/dense_input_accessor.cc @@ -1,8 +1,12 @@ +#include +#include "gflags/gflags.h" #include "paddle/fluid/train/custom_trainer/feed/accessor/input_data_accessor.h" namespace paddle { namespace custom_trainer { namespace feed { + +DEFINE_string(feed_trainer_debug_dense_name, "", "open dense debug for specif layer_name"); int DenseInputAccessor::initialize(YAML::Node config, std::shared_ptr context_ptr) { @@ -85,7 +89,6 @@ int32_t DenseInputAccessor::forward(SampleInstance* samples, size_t num, } _pull_mutex.unlock(); } - size_t data_buffer_idx = 0; for (auto& variable : _x_variables) { auto* shape_ptr = &(variable.shape[0]); @@ -97,6 +100,26 @@ int32_t DenseInputAccessor::forward(SampleInstance* samples, size_t num, memcpy(var_data, _data_buffer + data_buffer_idx, variable.dim * sizeof(float)); data_buffer_idx += variable.dim; } + if (!FLAGS_feed_trainer_debug_dense_name.empty()) { + data_buffer_idx = 0; + std::stringstream ssm; + for (auto& variable : _x_variables) { + if (variable.name != FLAGS_feed_trainer_debug_dense_name) { + data_buffer_idx += variable.dim; + continue; + } + ssm.str(""); + auto& tensor = ScopeHelper::var(scope, variable.name); + const auto* var_data = tensor.data(); + for (size_t data_idx = 0; data_idx < variable.dim; ++data_idx) { + if (data_idx > 0) + ssm << ","; + ssm << _data_buffer[data_buffer_idx + data_idx]; + } + data_buffer_idx += variable.dim; + VLOG(2) << "[DEBUG]pull_dense: " << ssm.str(); + } + } if (_need_async_pull) { ++_pull_request_num; } @@ -118,7 +141,25 @@ int32_t DenseInputAccessor::backward(SampleInstance* samples, size_t num, } auto* ps_client = _trainer_context->pslib->ps_client(); auto push_status = ps_client->push_dense(regions.data(), regions.size(), _table_id); - //return push_status.get(); + //push_status.get(); + if (!FLAGS_feed_trainer_debug_dense_name.empty()) { + std::stringstream ssm; + for (auto& variable : _x_variables) { + ssm.str(""); + if (variable.name != FLAGS_feed_trainer_debug_dense_name) { + continue; + } + auto& tensor = scope->Var(variable.gradient_name)-> + Get(); + const auto* var_data = tensor.data(); + for (size_t data_idx = 0; data_idx < variable.dim; ++data_idx) { + if (data_idx > 0) + ssm << ","; + ssm << var_data[data_idx]; + } + VLOG(2) << "[DEBUG]push_dense: " << ssm.str(); + } + } return 0; } diff --git a/paddle/fluid/train/custom_trainer/feed/accessor/epoch_accessor.cc b/paddle/fluid/train/custom_trainer/feed/accessor/epoch_accessor.cc index dd2d0e95..16bb5195 100644 --- a/paddle/fluid/train/custom_trainer/feed/accessor/epoch_accessor.cc +++ b/paddle/fluid/train/custom_trainer/feed/accessor/epoch_accessor.cc @@ -15,22 +15,22 @@ namespace feed { return -1; } auto fs = _trainer_context->file_system.get(); - if (config["donefile"]) { - _done_file_path = fs->path_join(_model_root_path, config["donefile"].as()); - } else { - _done_file_path = fs->path_join(_model_root_path, "epoch_donefile.txt"); - } - + _done_file_path = fs->path_join(_model_root_path, config["donefile"].as("epoch_donefile.txt")); if (!fs->exists(_done_file_path)) { VLOG(0) << "missing done file, path:" << _done_file_path; return -1; } - std::string done_text = fs->tail(_done_file_path); _done_status = paddle::string::split_string(done_text, std::string("\t")); _current_epoch_id = get_status(EpochStatusFiled::EpochIdField); _last_checkpoint_epoch_id = get_status(EpochStatusFiled::CheckpointIdField); _last_checkpoint_path = get_status(EpochStatusFiled::CheckpointPathField); + _inference_base_model_key = get_status(EpochStatusFiled::InferenceBaseKeyField); + _inference_model_path = fs->path_join(_model_root_path, config["inference_model_dir"].as("xbox")); + _inference_model_base_done_path = fs->path_join(_inference_model_path, + config["inference_base_done_name"].as("xbox_base_done.txt")); + _inference_model_delta_done_path = fs->path_join(_inference_model_path, + config["inference_delta_done_name"].as("xbox_delta_done.txt")); return 0; } @@ -46,31 +46,64 @@ namespace feed { set_status(EpochStatusFiled::CheckpointIdField, _last_checkpoint_epoch_id); set_status(EpochStatusFiled::CheckpointPathField, _last_checkpoint_path); set_status(EpochStatusFiled::DateField, format_timestamp(epoch_id, "%Y%m%d")); - - // 非主节点不做状态持久化 - if (!_trainer_context->environment->is_master_node(EnvironmentRole::WORKER)) { + set_status(EpochStatusFiled::InferenceBaseKeyField, _inference_base_model_key); + return 0; + } + + int EpochAccessor::update_model_donefile( + uint64_t epoch_id, ModelSaveWay save_way) { + auto* env = _trainer_context->environment.get(); + // 非主节点不做done状态持久化 + if (!env->is_master_node(EnvironmentRole::WORKER)) { return 0; } - auto fs = _trainer_context->file_system.get(); - std::string done_str = paddle::string::join_strings(_done_status, '\t'); + std::string done_str; + std::string donefile; + auto model_path = model_save_path(epoch_id, save_way); + std::string inference_done_format("{\"id\":\"%lu\",\"key\":\"%lu\",\"input\":\"%s/000\",\"record_count\":\"1\",\"file_format\":\"pb\",\"schema_version\":\"2\",\"partition_type\":\"1\",\"job_name\":\"%s\",\"job_id\":\"%s\",\"mpi_size\":\"%d\",\"monitor_data\":\"%s\"}"); + + auto id = time(NULL); + switch (save_way) { + case ModelSaveWay::ModelSaveTrainCheckpoint: + donefile = _done_file_path; + done_str = paddle::string::join_strings(_done_status, '\t'); + break; + case ModelSaveWay::ModelSaveInferenceDelta: + donefile = _inference_model_delta_done_path; + done_str = string::format_string(inference_done_format.c_str(), id, _inference_base_model_key, + model_path.c_str(), env->job_name().c_str(), env->job_id().c_str(), + env->node_num(EnvironmentRole::PSERVER), _trainer_context->monitor_ssm.str().c_str()); + break; + case ModelSaveWay::ModelSaveInferenceBase: + donefile = _inference_model_base_done_path; + _inference_base_model_key = id; + done_str = string::format_string(inference_done_format.c_str(), id, id, + model_path.c_str(), env->job_name().c_str(), env->job_id().c_str(), + env->node_num(EnvironmentRole::PSERVER), _trainer_context->monitor_ssm.str().c_str()); + break; + } // 保留末尾1000数据 - std::string tail_done_info = paddle::string::trim_spaces(fs->tail(_done_file_path, 1000)); + std::string tail_done_info; + auto fs = _trainer_context->file_system.get(); + if (fs->exists(donefile)) { + tail_done_info = paddle::string::trim_spaces(fs->tail(donefile, 1000)); + } if (tail_done_info.size() > 0) { tail_done_info = tail_done_info + "\n" + done_str; } else { tail_done_info = done_str; } - VLOG(2) << "Write epoch donefile to " << _done_file_path << ", str:" << done_str; + VLOG(2) << "Write donefile " << donefile << ", str:" << done_str; bool write_success = false; while (true) { - fs->remove(_done_file_path); - auto fp = fs->open_write(_done_file_path, ""); + fs->remove(donefile); + auto fp = fs->open_write(donefile, ""); if (fwrite(tail_done_info.c_str(), tail_done_info.length(), 1, &*fp) == 1) { break; } sleep(10); } - VLOG(2) << "Write epoch donefile success"; + VLOG(2) << "Write donefile " << donefile << "success"; return 0; } @@ -126,7 +159,10 @@ namespace feed { case ModelSaveWay::ModelSaveInferenceBase: return is_last_epoch(epoch_id); case ModelSaveWay::ModelSaveTrainCheckpoint: - return delta_id(epoch_id) % 8 == 0; + if (is_last_epoch(epoch_id)) { + return true; + } + return delta_id(epoch_id) % 24 == 0; } return false; } @@ -137,11 +173,11 @@ namespace feed { std::string date_with_hour = format_timestamp(epoch_id, "%Y%m%d%H"); switch (save_way) { case ModelSaveWay::ModelSaveInferenceDelta: - return _trainer_context->file_system->path_join(_model_root_path, - string::format_string("xbox/%s/delta-%d", date.c_str(), delta)); + return _trainer_context->file_system->path_join(_inference_model_path, + string::format_string("%s/delta-%d", date.c_str(), delta)); case ModelSaveWay::ModelSaveInferenceBase: - return _trainer_context->file_system->path_join(_model_root_path, - string::format_string("xbox/%s/base", date.c_str())); + return _trainer_context->file_system->path_join(_inference_model_path, + string::format_string("%s/base", date.c_str())); case ModelSaveWay::ModelSaveTrainCheckpoint: return _trainer_context->file_system->path_join(_model_root_path, string::format_string("batch_model/%s", date_with_hour.c_str())); diff --git a/paddle/fluid/train/custom_trainer/feed/accessor/epoch_accessor.h b/paddle/fluid/train/custom_trainer/feed/accessor/epoch_accessor.h index 07b15c62..7d42e73e 100644 --- a/paddle/fluid/train/custom_trainer/feed/accessor/epoch_accessor.h +++ b/paddle/fluid/train/custom_trainer/feed/accessor/epoch_accessor.h @@ -14,7 +14,8 @@ enum class EpochStatusFiled { TimestampField = 1, CheckpointPathField = 2, EpochIdField = 3, - CheckpointIdField = 4 + CheckpointIdField = 4, + InferenceBaseKeyField = 5 }; class EpochAccessor : public Accessor { @@ -62,14 +63,19 @@ public: virtual bool need_save_model(uint64_t epoch_id, ModelSaveWay save_way) = 0; virtual std::string model_save_path(uint64_t epoch_id, ModelSaveWay save_way) = 0; + virtual int update_model_donefile(uint64_t epoch_id, ModelSaveWay save_way); protected: TrainerContext* _trainer_context; std::string _done_file_path; std::string _model_root_path; + std::string _inference_model_path; + std::string _inference_model_base_done_path; + std::string _inference_model_delta_done_path; uint64_t _current_epoch_id = 0; std::string _last_checkpoint_path; uint64_t _last_checkpoint_epoch_id = 0; - std::vector _done_status; //当前完成状态,统一存成string + std::vector _done_status; // 当前完成状态,统一存成string + uint64_t _inference_base_model_key = 0; // 预估模型的base-key }; REGIST_REGISTERER(EpochAccessor); diff --git a/paddle/fluid/train/custom_trainer/feed/accessor/input_data_accessor.h b/paddle/fluid/train/custom_trainer/feed/accessor/input_data_accessor.h index bc2063c4..b49eb469 100644 --- a/paddle/fluid/train/custom_trainer/feed/accessor/input_data_accessor.h +++ b/paddle/fluid/train/custom_trainer/feed/accessor/input_data_accessor.h @@ -102,7 +102,8 @@ public: paddle::framework::Scope* scope); // SparseGradValue会被依次调用,用于整理push的梯度 virtual void fill_gradient(float* push_value, const float* gradient_raw, - paddle::ps::ValueAccessor&, SparseInputVariable&, SampleInstance&) = 0; + paddle::ps::ValueAccessor&, SparseInputVariable&, + SampleInstance&, FeatureItem&) = 0; protected: // 输入层列表 diff --git a/paddle/fluid/train/custom_trainer/feed/accessor/sparse_input_accessor.cc b/paddle/fluid/train/custom_trainer/feed/accessor/sparse_input_accessor.cc index e51fd1b5..35be38bf 100644 --- a/paddle/fluid/train/custom_trainer/feed/accessor/sparse_input_accessor.cc +++ b/paddle/fluid/train/custom_trainer/feed/accessor/sparse_input_accessor.cc @@ -1,10 +1,14 @@ #include #include #include +#include +#include "gflags/gflags.h" #include "paddle/fluid/string/string_helper.h" #include "paddle/fluid/train/custom_trainer/feed/common/scope_helper.h" #include "paddle/fluid/train/custom_trainer/feed/accessor/input_data_accessor.h" +DEFINE_int32(feed_trainer_debug_sparse_slot, 0, "open sparse debug for specif slot"); + namespace paddle { namespace custom_trainer { namespace feed { @@ -99,6 +103,30 @@ int32_t BaseSparseInputAccessor::forward(SampleInstance* samples, } } } + if (FLAGS_feed_trainer_debug_sparse_slot) { + std::stringstream ssm; + for (size_t samp_idx = 0; samp_idx < num; ++samp_idx) { + ssm.str(""); + auto& features = samples[samp_idx].features; + for (auto& feature_item : features) { + for (size_t i = 0; i < _x_variables.size(); ++i) { + auto& variable = _x_variables[i]; + if (feature_item.slot() != FLAGS_feed_trainer_debug_sparse_slot) { + continue; + } + if (variable.slot_idx[feature_item.slot()] < 0) { + continue; + } + ssm << "(" << feature_item.sign() << "," << feature_item.slot(); + for (auto weight : feature_item.weights) { + ssm << "," << weight; + } + ssm << ")"; + } + } + VLOG(2) << "[DEBUG][sparse_slot_pull]" << ssm.str(); + } + } // Variable后置处理 for (size_t i = 0; i < _x_variables.size(); ++i) { auto& variable = _x_variables[i]; @@ -145,12 +173,37 @@ int32_t BaseSparseInputAccessor::backward(SampleInstance* samples, const float* grad_data = var_runtime_data[i].gradient_data + samp_idx * variable.total_dim + variable.slot_dim * slot_idx; fill_gradient(&(feature_item.gradients[0]), grad_data, - *value_accessor, variable, samples[samp_idx]); + *value_accessor, variable, samples[samp_idx], feature_item); keys[key_idx] = feature_item.sign(); push_values[key_idx++] = &(feature_item.gradients[0]); } } } + if (FLAGS_feed_trainer_debug_sparse_slot) { + size_t key_idx = 0; + std::stringstream ssm; + for (size_t samp_idx = 0; samp_idx < num; ++samp_idx) { + ssm.str(""); + auto& features = samples[samp_idx].features; + for (auto& feature_item : features) { + for (size_t i = 0; i < _x_variables.size(); ++i) { + auto& variable = _x_variables[i]; + if (feature_item.slot() != FLAGS_feed_trainer_debug_sparse_slot) { + continue; + } + if (variable.slot_idx[feature_item.slot()] < 0) { + continue; + } + ssm << "(" << feature_item.sign() << "," << feature_item.slot(); + for (auto weight : feature_item.gradients) { + ssm << "," << weight; + } + ssm << ")"; + } + } + VLOG(2) << "[DEBUG][sparse_slot_push]" << ssm.str(); + } + } auto push_status = ps_client->push_sparse(_table_id, keys.data(), (const float**)push_values, key_idx); //auto ret = push_status.get(); @@ -180,8 +233,8 @@ public: } virtual void fill_gradient(float* push_value, const float* gradient_raw, - paddle::ps::ValueAccessor& value_accessor, - SparseInputVariable& variable, SampleInstance& sample) { + paddle::ps::ValueAccessor& value_accessor, SparseInputVariable& variable, + SampleInstance& sample, FeatureItem& feature) { // join阶段不回填梯度 CHECK(false); return; @@ -207,12 +260,13 @@ public: } virtual void fill_gradient(float* push_value, const float* gradient_raw, - paddle::ps::ValueAccessor& value_accessor, - SparseInputVariable& variable, SampleInstance& sample) { - push_value[0] += 1; - push_value[1] += sample.labels[0]; + paddle::ps::ValueAccessor& value_accessor, SparseInputVariable& variable, + SampleInstance& sample, FeatureItem& feature) { + push_value[0] = feature.slot(); + push_value[1] += 1; + push_value[2] += sample.labels[0]; for (size_t i = 0; i < variable.slot_dim; ++i) { - push_value[i + 2] += gradient_raw[i]; + push_value[i + 3] += gradient_raw[i]; } return; } diff --git a/paddle/fluid/train/custom_trainer/feed/common/pslib_warpper.cc b/paddle/fluid/train/custom_trainer/feed/common/pslib_warpper.cc index fd7bd2d6..2f3a584f 100644 --- a/paddle/fluid/train/custom_trainer/feed/common/pslib_warpper.cc +++ b/paddle/fluid/train/custom_trainer/feed/common/pslib_warpper.cc @@ -38,6 +38,7 @@ int PSlib::init_server() { _environment->rank_id(EnvironmentRole::PSERVER)); _server_ptr->start(); } + _environment->barrier(EnvironmentRole::ALL); _environment->ps_environment()->gather_ps_servers(); return 0; } diff --git a/paddle/fluid/train/custom_trainer/feed/common/runtime_environment.cc b/paddle/fluid/train/custom_trainer/feed/common/runtime_environment.cc index 415f9955..bb0342ea 100644 --- a/paddle/fluid/train/custom_trainer/feed/common/runtime_environment.cc +++ b/paddle/fluid/train/custom_trainer/feed/common/runtime_environment.cc @@ -56,7 +56,6 @@ struct mpi_type_trait { return MPI_UNSIGNED_LONG_LONG; } }; - RuntimeEnvironment::RuntimeEnvironment() {} RuntimeEnvironment::~RuntimeEnvironment() {} bool RuntimeEnvironment::is_master_node(EnvironmentRole role) { @@ -87,13 +86,24 @@ public: return 0; } virtual int wireup() { - int hr = MPI_Init(NULL, NULL); + int argc = 0; + char** argv = NULL; + int hr = MPI_Init(&argc, &argv); if (MPI_SUCCESS != hr) { LOG(FATAL) << "MPI_init failed with error code" << hr; return -1; } _roles_node_info.resize(static_cast(EnvironmentRole::ALL) + 1); add_role(EnvironmentRole::ALL); + + char* value = getenv("JOB_ID"); + if (value) { + _job_id = value; + } + value = getenv("JOB_NAME"); + if (value) { + _job_name = value; + } return 0; } @@ -155,6 +165,11 @@ protected: return; } VLOG(static_cast(level)) << log_str; + /* + static std::mutex mtx; + std::lock_guard guard(mtx); + std::err << log_str; + */ } inline MpiNodeInfo& mpi_node_info(EnvironmentRole role) { diff --git a/paddle/fluid/train/custom_trainer/feed/common/runtime_environment.h b/paddle/fluid/train/custom_trainer/feed/common/runtime_environment.h index 993ac36d..c147b91f 100644 --- a/paddle/fluid/train/custom_trainer/feed/common/runtime_environment.h +++ b/paddle/fluid/train/custom_trainer/feed/common/runtime_environment.h @@ -46,6 +46,15 @@ public: virtual ~RuntimeEnvironment(); // 配置初始化 virtual int initialize(YAML::Node config) = 0; + + // job 信息 + virtual std::string job_id() { + return _job_id; + } + virtual std::string job_name() { + return _job_name; + } + // 设置role virtual int add_role(EnvironmentRole role) = 0; // 判断role @@ -90,9 +99,21 @@ public: protected: virtual void print_log(EnvironmentRole role, EnvironmentLogType type, EnvironmentLogLevel level, const std::string& log_str) = 0; + + std::string _job_id = "default_job_id"; + std::string _job_name = "default_job_name"; }; REGIST_REGISTERER(RuntimeEnvironment); +#define ENVLOG_WORKER_ALL_NOTICE \ +environment->log(EnvironmentRole::WORKER, EnvironmentLogType::ALL_LOG, EnvironmentLogType::NOTICE, +#define ENVLOG_WORKER_MASTER_NOTICE \ +environment->log(EnvironmentRole::WORKER, EnvironmentLogType::MASTER_LOG, EnvironmentLogType::NOTICE, +#define ENVLOG_WORKER_ALL_ERROR \ +environment->log(EnvironmentRole::WORKER, EnvironmentLogType::ALL_LOG, EnvironmentLogType::ERROR, +#define ENVLOG_WORKER_MASTER_ERROR \ +environment->log(EnvironmentRole::WORKER, EnvironmentLogType::MASTER_LOG, EnvironmentLogType::ERROR, + std::string format_timestamp(time_t time, const char* format); inline std::string format_timestamp(time_t time, const std::string& format) { return format_timestamp(time, format.c_str()); diff --git a/paddle/fluid/train/custom_trainer/feed/conf/env.conf b/paddle/fluid/train/custom_trainer/feed/conf/env.conf new file mode 100644 index 00000000..f97cf557 --- /dev/null +++ b/paddle/fluid/train/custom_trainer/feed/conf/env.conf @@ -0,0 +1,19 @@ +HPC_HOME=/home/work/xiexionghang/trainer/paddle_trainer/feed_muye/smart_client +HADOOP_HOME=/home/work/xiexionghang/trainer/paddle_trainer/feed_muye/hadoop-client/hadoop/ + +#===============Job-related config====================== +MPI_JOB_NAME=feed_smfw_shoubai_video_cupai_new_arch +MPI_QUEUE=feed5 +MPI_PRIORITY=high +MPI_NODE_NUM=100 +MPI_WALL_TIME=700:00:00 +MPI_NODE_MEM=100000 +MPI_RESOURCE=full + +#===========MPI cluster Server(nmg-off/10g/hlan)========== +MPI_SERVER=yq01-hpc-lvliang01-smart-master.dmop.baidu.com + +#===========Cluster-related (HDFS/MPI Server)============== +HDFS_ROOT=/user/feed/mlarch/mio_temp/$(date +%Y%m%d-%H%M%S-%N) +HADOOP_FS=afs://xingtian.afs.baidu.com:9902 +HADOOP_UGI=mlarch,Fv1M87 diff --git a/paddle/fluid/train/custom_trainer/feed/conf/trainer.yaml b/paddle/fluid/train/custom_trainer/feed/conf/trainer.yaml index c71205b2..5c687fcb 100644 --- a/paddle/fluid/train/custom_trainer/feed/conf/trainer.yaml +++ b/paddle/fluid/train/custom_trainer/feed/conf/trainer.yaml @@ -44,7 +44,7 @@ executor: train_batch_size: 32 input_parse_thread_num: 10 push_gradient_thread_num: 16 - train_thread_num: 16 + train_thread_num: 12 need_dump_all_model: true - name: update class: SimpleExecutor @@ -52,5 +52,5 @@ executor: train_batch_size: 32 input_parse_thread_num: 10 push_gradient_thread_num: 16 - train_thread_num: 16 + train_thread_num: 12 need_dump_all_model: false diff --git a/paddle/fluid/train/custom_trainer/feed/dataset/data_reader.cc b/paddle/fluid/train/custom_trainer/feed/dataset/data_reader.cc index 3898583d..7eb3156b 100755 --- a/paddle/fluid/train/custom_trainer/feed/dataset/data_reader.cc +++ b/paddle/fluid/train/custom_trainer/feed/dataset/data_reader.cc @@ -469,6 +469,7 @@ public: return read_all(file_list, data_channel); } virtual int read_all(const std::vector& file_list, ::paddle::framework::Channel data_channel) { + data_channel->Open(); const int file_list_size = file_list.size(); std::atomic is_failed(false); diff --git a/paddle/fluid/train/custom_trainer/feed/executor/multi_thread_executor.cc b/paddle/fluid/train/custom_trainer/feed/executor/multi_thread_executor.cc index 28d0eb41..af3d4f7c 100644 --- a/paddle/fluid/train/custom_trainer/feed/executor/multi_thread_executor.cc +++ b/paddle/fluid/train/custom_trainer/feed/executor/multi_thread_executor.cc @@ -1,3 +1,4 @@ +#include "paddle/fluid/platform/timer.h" #include "paddle/fluid/train/custom_trainer/feed/io/file_system.h" #include "paddle/fluid/train/custom_trainer/feed/monitor/monitor.h" #include "paddle/fluid/train/custom_trainer/feed/executor/multi_thread_executor.h" @@ -94,20 +95,22 @@ paddle::framework::Channel MultiThreadExecutor::run( [this, parser](DataItem* item, size_t item_num, ScopePoolObj* scope, size_t* scope_num, size_t thread_idx) -> int { *scope_num = 1; + paddle::platform::Timer timer; + timer.Start(); auto scope_obj = _scope_obj_pool->get(); - auto* samples = new SampleInstance[item_num]; + auto* scope_context = new ScopeExecutorContext(item_num); + auto* samples = scope_context->samples(); for (size_t i = 0; i parse_to_sample(item[i], samples[i]) == 0); } for (size_t i = 0; i < _input_accessors.size(); ++i) { _input_accessors[i]->forward(samples, item_num, scope_obj.get()); } - int64_t data_for_scope = (int64_t)samples; + timer.Pause(); + scope_context->prepare_cost_ms = timer.ElapsedMS(); + int64_t data_for_scope = (int64_t)scope_context; ScopeHelper::fill_value(scope_obj.get(), _trainer_context->cpu_place, - "sample_data", data_for_scope); - data_for_scope = (int64_t)item_num; - ScopeHelper::fill_value(scope_obj.get(), _trainer_context->cpu_place, - "sample_num", data_for_scope); + "scope_context", data_for_scope); *scope = std::move(scope_obj); return 0; }); @@ -123,7 +126,14 @@ paddle::framework::Channel MultiThreadExecutor::run( auto* executor = _thread_executors[thread_idx].get(); size_t& out_idx = *out_num; for (out_idx = 0; out_idx < in_num; ++out_idx) { - CHECK(executor->run(in_items[out_idx].get()) == 0); + auto* scope = in_items[out_idx].get(); + auto* scope_ctx = (ScopeExecutorContext*)(*ScopeHelper::get_value( + scope, _trainer_context->cpu_place, "scope_context")); + paddle::platform::Timer timer; + timer.Start(); + CHECK(executor->run(scope) == 0); + timer.Pause(); + scope_ctx->executor_cost_ms = timer.ElapsedMS(); out_items[out_idx] = std::move(in_items[out_idx]); } return 0; @@ -139,20 +149,24 @@ paddle::framework::Channel MultiThreadExecutor::run( int* out_items, size_t* out_num, size_t thread_idx) -> int { size_t& out_idx = *out_num; for (out_idx = 0; out_idx < in_num; ++out_idx) { + paddle::platform::Timer timer; + timer.Start(); auto* scope = in_items[out_idx].get(); - auto sample_num = *ScopeHelper::get_value( - scope, _trainer_context->cpu_place, "sample_num"); + auto* scope_ctx = (ScopeExecutorContext*)(*ScopeHelper::get_value( + scope, _trainer_context->cpu_place, "scope_context")); + auto* samples = scope_ctx->samples(); + auto sample_num = scope_ctx->sample_num(); - auto* samples = (SampleInstance*)(*ScopeHelper::get_value( - scope, _trainer_context->cpu_place, "sample_data")); for (size_t i = 0; i < _input_accessors.size(); ++i) { out_items[out_idx] = _input_accessors[i]-> backward(samples, sample_num, scope); } + timer.Pause(); + scope_ctx->push_gradient_cost_ms = timer.ElapsedMS(); for (auto& monitor : _monitors) { - monitor->add_data(epoch_id, this, samples, sample_num); + monitor->add_data(epoch_id, this, scope_ctx); } - delete[] samples; // 所有pipe完成后,再回收sample + delete scope_ctx; // 所有pipe完成后,再回收sample } return 0; }); @@ -167,6 +181,8 @@ paddle::framework::Channel MultiThreadExecutor::run( monitor->compute_result(); VLOG(2) << "[Monitor]" << _train_exe_name << ", monitor:" << monitor->get_name() << ", result:" << monitor->format_result(); + _trainer_context->monitor_ssm << _train_exe_name << ":" << + monitor->get_name() << ":" << monitor->format_result() << ","; monitor->reset(); } } diff --git a/paddle/fluid/train/custom_trainer/feed/executor/multi_thread_executor.h b/paddle/fluid/train/custom_trainer/feed/executor/multi_thread_executor.h index 12792945..e1fbc42d 100644 --- a/paddle/fluid/train/custom_trainer/feed/executor/multi_thread_executor.h +++ b/paddle/fluid/train/custom_trainer/feed/executor/multi_thread_executor.h @@ -11,6 +11,29 @@ namespace feed { class Monitor; typedef paddle::ps::ObjectPool<::paddle::framework::Scope>::PooledObject ScopePoolObj; +class ScopeExecutorContext { +public: + ScopeExecutorContext(size_t sample_num) { + _samples = new SampleInstance[sample_num]; + _sample_num = sample_num; + } + virtual ~ScopeExecutorContext() { + delete[] _samples; + } + inline SampleInstance* samples() { + return _samples; + } + inline size_t sample_num() { + return _sample_num; + } + size_t executor_cost_ms = 0; + size_t prepare_cost_ms = 0; + size_t push_gradient_cost_ms = 0; +private: + size_t _sample_num = 0; + SampleInstance* _samples = NULL; +}; + class MultiThreadExecutor { public: MultiThreadExecutor() {} diff --git a/paddle/fluid/train/custom_trainer/feed/io/hadoop_file_system.cc b/paddle/fluid/train/custom_trainer/feed/io/hadoop_file_system.cc index 2af8e082..6ef87d7e 100644 --- a/paddle/fluid/train/custom_trainer/feed/io/hadoop_file_system.cc +++ b/paddle/fluid/train/custom_trainer/feed/io/hadoop_file_system.cc @@ -73,7 +73,7 @@ public: } shell_execute(string::format_string( - "%s -rmr %s &>/dev/null; true", _hdfs_command.c_str(), path.c_str())); + "%s -rmr %s &>/dev/null; true", hdfs_command(path).c_str(), path.c_str())); } std::vector list(const std::string& path) override { diff --git a/paddle/fluid/train/custom_trainer/feed/io/shell.cc b/paddle/fluid/train/custom_trainer/feed/io/shell.cc index d6fae5d0..3fc87085 100644 --- a/paddle/fluid/train/custom_trainer/feed/io/shell.cc +++ b/paddle/fluid/train/custom_trainer/feed/io/shell.cc @@ -356,6 +356,7 @@ std::string shell_get_command_output(const std::string& cmd) { return reader.get(); } } + VLOG(2) << "run shell cmd:" << cmd << ", errno:" << err_no; } while (err_no == -1); return ""; #endif diff --git a/paddle/fluid/train/custom_trainer/feed/main.cc b/paddle/fluid/train/custom_trainer/feed/main.cc index 8e8c9851..de71d9ae 100644 --- a/paddle/fluid/train/custom_trainer/feed/main.cc +++ b/paddle/fluid/train/custom_trainer/feed/main.cc @@ -32,6 +32,7 @@ int main(int argc, char* argv[]) { } auto* environment = trainer_context_ptr->environment.get(); environment->wireup(); + VLOG(2) << "node_num: " << environment->node_num(EnvironmentRole::ALL); if (environment->node_num(EnvironmentRole::ALL) == 1) { environment->add_role(EnvironmentRole::WORKER); environment->add_role(EnvironmentRole::PSERVER); @@ -42,6 +43,7 @@ int main(int argc, char* argv[]) { } trainer_context_ptr->pslib.reset(new PSlib()); std::string ps_config = config["environment"]["ps"].as(); + trainer_context_ptr->environment->barrier(EnvironmentRole::ALL); trainer_context_ptr->pslib->initialize(ps_config, environment); //VLOG(3) << "Node Start With Role:" << role; diff --git a/paddle/fluid/train/custom_trainer/feed/monitor/auc_monitor.cc b/paddle/fluid/train/custom_trainer/feed/monitor/auc_monitor.cc index 7ad7054f..3748d9d3 100644 --- a/paddle/fluid/train/custom_trainer/feed/monitor/auc_monitor.cc +++ b/paddle/fluid/train/custom_trainer/feed/monitor/auc_monitor.cc @@ -1,4 +1,5 @@ #include "paddle/fluid/train/custom_trainer/feed/monitor/auc_monitor.h" +#include "paddle/fluid/train/custom_trainer/feed/executor/multi_thread_executor.h" namespace paddle { namespace custom_trainer { @@ -19,7 +20,9 @@ int AucMonitor::initialize(const YAML::Node& config, std::shared_ptrsample_num(); + auto* samples = ctx->samples(); CHECK(num > 0); std::lock_guard lock(_mutex); for (int i = 0; i < num; ++i) { @@ -80,6 +83,10 @@ std::string AucMonitor::format_result() { } void AucMonitor::add_unlocked(double pred, int label) { + if (std::isnan(pred)) { + VLOG(2) << "pred[" << pred << "] outside of [0,1]"; + continue; + } CHECK(pred >= 0 && pred <= 1) << "pred[" << pred << "] outside of [0,1]"; CHECK(label == 0 || label == 1) << "label[" << label << "] invalid"; _table[label][std::min(int(pred * _table_size), _table_size - 1)]++; diff --git a/paddle/fluid/train/custom_trainer/feed/monitor/auc_monitor.h b/paddle/fluid/train/custom_trainer/feed/monitor/auc_monitor.h index 51af181f..c668a731 100644 --- a/paddle/fluid/train/custom_trainer/feed/monitor/auc_monitor.h +++ b/paddle/fluid/train/custom_trainer/feed/monitor/auc_monitor.h @@ -18,8 +18,8 @@ public: std::shared_ptr context_ptr) override; //添加一项记录,统计内容Monitor自行从Executor按需获取 - virtual void add_data(int epoch_id, const MultiThreadExecutor* executor, - SampleInstance* samples, size_t num); + virtual void add_data(int epoch_id, + const MultiThreadExecutor* executor, ScopeExecutorContext*); //是否开始结果统计 virtual bool need_compute_result(int epoch_id); diff --git a/paddle/fluid/train/custom_trainer/feed/monitor/cost_monitor.cc b/paddle/fluid/train/custom_trainer/feed/monitor/cost_monitor.cc index 207bbab8..1c247e0a 100644 --- a/paddle/fluid/train/custom_trainer/feed/monitor/cost_monitor.cc +++ b/paddle/fluid/train/custom_trainer/feed/monitor/cost_monitor.cc @@ -1,4 +1,5 @@ #include "paddle/fluid/train/custom_trainer/feed/monitor/cost_monitor.h" +#include "paddle/fluid/train/custom_trainer/feed/executor/multi_thread_executor.h" namespace paddle { namespace custom_trainer { @@ -12,8 +13,9 @@ int CostMonitor::initialize(const YAML::Node& config, std::shared_ptrsample_num(); + auto* samples = ctx->samples(); CHECK(executor != nullptr); //TODO use paddle time _total_time_ms += 1; diff --git a/paddle/fluid/train/custom_trainer/feed/monitor/cost_monitor.h b/paddle/fluid/train/custom_trainer/feed/monitor/cost_monitor.h index f64d93d0..6c78bfa9 100644 --- a/paddle/fluid/train/custom_trainer/feed/monitor/cost_monitor.h +++ b/paddle/fluid/train/custom_trainer/feed/monitor/cost_monitor.h @@ -18,9 +18,7 @@ public: //添加一项记录,统计内容Monitor自行从Executor按需获取 virtual void add_data(int epoch_id, - const MultiThreadExecutor* executor, - SampleInstance* samples, - size_t num); + const MultiThreadExecutor* executor, ScopeExecutorContext*); //是否开始结果统计 virtual bool need_compute_result(int epoch_id); diff --git a/paddle/fluid/train/custom_trainer/feed/monitor/monitor.h b/paddle/fluid/train/custom_trainer/feed/monitor/monitor.h index 4a963634..fc5258ac 100644 --- a/paddle/fluid/train/custom_trainer/feed/monitor/monitor.h +++ b/paddle/fluid/train/custom_trainer/feed/monitor/monitor.h @@ -10,6 +10,7 @@ namespace paddle { namespace custom_trainer { namespace feed { class MultiThreadExecutor; +class ScopeExecutorContext; class Monitor { public: @@ -25,8 +26,8 @@ public: } //添加一项记录,统计内容Monitor自行从Executor按需获取 - virtual void add_data(int epoch_id, const MultiThreadExecutor* executor, - SampleInstance* samples, size_t num) = 0; + virtual void add_data(int epoch_id, + const MultiThreadExecutor* executor, ScopeExecutorContext*) = 0; //是否对于当前epoch_id进行结果统计 virtual bool need_compute_result(int epoch_id) = 0; diff --git a/paddle/fluid/train/custom_trainer/feed/process/learner_process.cc b/paddle/fluid/train/custom_trainer/feed/process/learner_process.cc index 6eb8fb65..0f7b204e 100755 --- a/paddle/fluid/train/custom_trainer/feed/process/learner_process.cc +++ b/paddle/fluid/train/custom_trainer/feed/process/learner_process.cc @@ -3,6 +3,7 @@ *Train样本 */ #include +#include "paddle/fluid/platform/timer.h" #include "paddle/fluid/train/custom_trainer/feed/io/file_system.h" #include "paddle/fluid/train/custom_trainer/feed/dataset/dataset.h" #include "paddle/fluid/train/custom_trainer/feed/accessor/epoch_accessor.h" @@ -25,26 +26,18 @@ int LearnerProcess::initialize(std::shared_ptr context_ptr) { return 0; } -std::future LearnerProcess::save_model(uint64_t epoch_id, int table_id, ModelSaveWay way) { - std::promise p; - auto ret = p.get_future(); - auto* ps_client = _context_ptr->pslib->ps_client(); - auto* epoch_accessor = _context_ptr->epoch_accessor.get(); - if (epoch_accessor->need_save_model(epoch_id, way)) { - VLOG(2) << "Start save model, table_id:" << table_id; - auto model_dir = epoch_accessor->model_save_path(epoch_id, way); - return ps_client->save(table_id, model_dir, std::to_string((int)way)); - } else { - p.set_value(0); - } - return ret; -} - int LearnerProcess::wait_save_model(uint64_t epoch_id, ModelSaveWay way) { + auto* ps_client = _context_ptr->pslib->ps_client(); auto* environment = _context_ptr->environment.get(); + auto* epoch_accessor = _context_ptr->epoch_accessor.get(); if (!environment->is_master_node(EnvironmentRole::WORKER)) { return 0; } + if (!epoch_accessor->need_save_model(epoch_id, way)) { + return 0; + } + paddle::platform::Timer timer; + timer.Start(); std::set table_set; for (auto& executor : _executors) { const auto& table_accessors = executor->table_accessors(); @@ -56,13 +49,18 @@ int LearnerProcess::wait_save_model(uint64_t epoch_id, ModelSaveWay way) { auto table_num = table_set.size(); std::future rets[table_num]; for (auto table_id : table_set) { - rets[ret_size++] = save_model(epoch_id, table_id, way); + VLOG(2) << "Start save model, table_id:" << table_id; + auto model_dir = epoch_accessor->model_save_path(epoch_id, way); + rets[ret_size++] = ps_client->save(table_id, model_dir, std::to_string((int)way)); } int all_ret = 0; for (int i = 0; i < ret_size; ++i) { rets[i].wait(); all_ret |= rets[i].get(); } + timer.Pause(); + VLOG(2) << "Save Model Cost(s):" << timer.ElapsedSec(); + _context_ptr->epoch_accessor->update_model_donefile(epoch_id, way); return all_ret; } @@ -115,6 +113,7 @@ int LearnerProcess::run() { while (true) { epoch_accessor->next_epoch(); + _context_ptr->monitor_ssm.str(""); bool already_dump_inference_model = false; epoch_id = epoch_accessor->current_epoch_id(); std::string epoch_log_title = paddle::string::format_string( @@ -141,6 +140,8 @@ int LearnerProcess::run() { std::map> backup_input_map; for (auto& executor : _executors) { environment->barrier(EnvironmentRole::WORKER); + paddle::platform::Timer timer; + timer.Start(); VLOG(2) << "Start executor:" << executor->train_exe_name(); auto data_name = executor->train_data_name(); paddle::framework::Channel input_channel; @@ -150,12 +151,12 @@ int LearnerProcess::run() { input_channel = dataset->fetch_data(data_name, epoch_id); } input_channel = executor->run(input_channel, dataset->data_parser(data_name)); - VLOG(2) << "End executor:" << executor->train_exe_name(); + timer.Pause(); + VLOG(2) << "End executor:" << executor->train_exe_name() << ", cost" << timer.ElapsedSec(); // 等待异步梯度完成 _context_ptr->ps_client()->flush(); environment->barrier(EnvironmentRole::WORKER); - if (executor->is_dump_all_model()) { already_dump_inference_model = true; wait_save_model(epoch_id, ModelSaveWay::ModelSaveInferenceDelta); @@ -167,16 +168,12 @@ int LearnerProcess::run() { //Step3. Dump Model For Delta&&Checkpoint { - if (!already_dump_inference_model) { - already_dump_inference_model = true; - wait_save_model(epoch_id, ModelSaveWay::ModelSaveInferenceDelta); - } + wait_save_model(epoch_id, ModelSaveWay::ModelSaveInferenceBase); wait_save_model(epoch_id, ModelSaveWay::ModelSaveTrainCheckpoint); environment->barrier(EnvironmentRole::WORKER); epoch_accessor->epoch_done(epoch_id); environment->barrier(EnvironmentRole::WORKER); - } //Step4. Output Monitor && RunStatus diff --git a/paddle/fluid/train/custom_trainer/feed/process/learner_process.h b/paddle/fluid/train/custom_trainer/feed/process/learner_process.h index 86f0378a..49b69562 100644 --- a/paddle/fluid/train/custom_trainer/feed/process/learner_process.h +++ b/paddle/fluid/train/custom_trainer/feed/process/learner_process.h @@ -22,8 +22,6 @@ protected: virtual int load_model(uint64_t epoch_id); // 同步保存所有模型 virtual int wait_save_model(uint64_t epoch_id, ModelSaveWay way); -// 异步保存指定模型 -virtual std::future save_model(uint64_t epoch_id, int table_id, ModelSaveWay way); private: std::vector> _executors; diff --git a/paddle/fluid/train/custom_trainer/feed/scripts/compake_runable_package.sh b/paddle/fluid/train/custom_trainer/feed/scripts/compake_runable_package.sh new file mode 100755 index 00000000..d7c746fc --- /dev/null +++ b/paddle/fluid/train/custom_trainer/feed/scripts/compake_runable_package.sh @@ -0,0 +1,44 @@ +#!/bin/bash +#用于运行期的hadoop访问 +TRAINER_HODOOP_HOME="" +#用于跟据网络脚本生成模型 +TRAINER_PYTHON_HOME="/home/xiexionghang/paddle/py-paddle/" + +#环境准备 +if [ ! -f ${TRAINER_PYTHON_HOME}/python/bin/paddle ];then + echo "Miss File: ${TRAINER_PYTHON_HOME}/python/bin/paddle" + echo "TRAINER_PYTHON_HOME:${TRAINER_PYTHON_HOME} is invalid, Fix it, or Get From here:" + echo "wget ftp://cp01-arch-gr06.epc.baidu.com/home/xiexionghang/paddle/py-paddle.tar.gz" + echo "Then set TRAINER_PYTHON_HOME" + exit 0 +fi +TRAINER_PYTHON_BIN=${TRAINER_PYTHON_HOME}/python/bin/python +# for bad paddle 这里需要想办法解决,paddle的前置目录太多 +if [ ! -f ../../../third_party/install/pslib/lib/libps.so ];then + mkdir -p ../../../third_party/install/pslib/lib/ + ln -s ${TRAINER_PYTHON_HOME}/third_party/install/pslib/lib/libps.so ../../../third_party/install/pslib/lib/libps.so +fi + + +#生成模型配置 +#这里按名匹配 可能会出现匹配错误&兼容性差的问题,最好是先python解析yaml文件 +items=`grep " name:" conf/trainer.yaml | awk -F ':' '{print $2}' |awk '{sub("^ *","");sub(" *$","");print}'` +for item in ${items[@]}; +do + if [ ! -f scripts/${item}.py ];then + echo "Missing model_net config: scripts/${item}.py, skip it $item" + continue + fi + rm -rf model/$item + ${TRAINER_PYTHON_BIN} scripts/create_programs.py scripts/${item}.py + if [ $? -ne 0 ];then + echo "Create model with scripts/${item}.py failed" + exit 1 + fi +done + +#输出package包 +rm -rf package +mkdir package +cp -r bin conf tool scripts model so package +cp -r ${TRAINER_HODOOP_HOME} package/hadoop-client diff --git a/paddle/fluid/train/custom_trainer/feed/scripts/join.py b/paddle/fluid/train/custom_trainer/feed/scripts/join.py index 42279410..94001fd7 100644 --- a/paddle/fluid/train/custom_trainer/feed/scripts/join.py +++ b/paddle/fluid/train/custom_trainer/feed/scripts/join.py @@ -26,22 +26,33 @@ def inference(): # TODO: build network here cvm_input = fluid.layers.data(name='cvm_input', shape=[sparse_cvm_dim(sparse_cvm)], dtype='float32', stop_gradient=False) - net = cvm_input net = fluid.layers.data_norm(input=net, name="bn6048", epsilon=1e-4, param_attr={"batch_size":1e4, "batch_sum_default":0.0, "batch_square":1e4}) - net = fluid.layers.fc(net, 511, act='relu', name='fc_1') - net = fluid.layers.fc(net, 255, act='relu', name='fc_2') - net = fluid.layers.fc(net, 255, act='relu', name='fc_3') - net = fluid.layers.fc(net, 127, act='relu', name='fc_4') - net = fluid.layers.fc(net, 127, act='relu', name='fc_5') - net = fluid.layers.fc(net, 127, act='relu', name='fc_6') - net = fluid.layers.fc(net, 127, act='relu', name='fc_7') - + lr_x = 1.0 + init_range = 0.2 + fc_layers_size = [511, 255, 255, 127, 127, 127, 127] + fc_layers_act = ["relu"] * len(fc_layers_size) + scales_tmp = [net.shape[1]] + fc_layers_size + scales = [] + for i in range(len(scales_tmp)): + scales.append(init_range / (scales_tmp[i] ** 0.5)) + for i in range(len(fc_layers_size)): + net = fluid.layers.fc( + input = net, + size = fc_layers_size[i], + name = 'fc_' + str(i+1), + act = fc_layers_act[i], + param_attr = \ + fluid.ParamAttr(learning_rate=lr_x, \ + initializer=fluid.initializer.NormalInitializer(loc=0.0, scale=1.0 * scales[i])), + bias_attr = \ + fluid.ParamAttr(learning_rate=lr_x, \ + initializer=fluid.initializer.NormalInitializer(loc=0.0, scale=1.0 * scales[i]))) ctr_output = fluid.layers.fc(net, 1, act='sigmoid', name='ctr') accessors = [ - { "class": "AbacusSparseUpdateAccessor", "input": "sparses", "table_id": 0, "need_gradient": False}, + { "class": "AbacusSparseJoinAccessor", "input": "sparses", "table_id": 0, "need_gradient": False}, { "class": "DenseInputAccessor", "input": "vars", "table_id": 1, "need_gradient": True, "async_pull": True}, { "class": "DenseInputAccessor", "input": "sums", "table_id": 2, "need_gradient": True, "async_pull": True}, { "class": "LabelInputAccessor", "input": "labels"} diff --git a/paddle/fluid/train/custom_trainer/feed/scripts/model/join/main_program b/paddle/fluid/train/custom_trainer/feed/scripts/model/join/main_program index dd455fb1e9d6d8d46f8782b7813b82a3e7c2badc..e828a50d5a918890d2751715cefff2f7b5c75cd8 100644 GIT binary patch literal 74710 zcmeHQU2GfKbrx;QlCCY8rtP)Ob-nCtv!WD-BB|dsqWma9k^+qr#1GlUVlf?&m-58J zAu}_Sr5K5~K!GmMhZY42G(k~do0k?v5+E&**Yv3`L4gGAV_%D+ukB+1_uPB#aPItx zDlAd7ef(fhGiUC(zu!II{W*vI>%Ux>ohm*L{}swrw$R$|nzq{?1Yh{-x4!u0(i^ig z8E`D$cKA2(C@Bv7Dn%#*@eRp7aJV*E) zHrEQgTF~vAjnYj><{G;a*IOt~-I^-@4x8_^%+1=aSx1$ZL3Wm10NH27sq!+r0d2b=up*3rwIqZ~7dmA9T*d3R*RHsrens1e3y zWp^(=KMz%l4I;Z|qCxP@hG@ye8!YOoENbHzwTwkQvX<|$qPfRi?pXnEoBNh$HnS7{ z_-z;nkTs|0`z8!PEo%kn`B-DBZjb7xZ>o+cy4yU|${d>!qqk5L{SAx0o>BDM^0QU> zCc=*Ag<%KY%xr@yyP1P5r6`q;--cA)Wp{mq9qxHO&usPlz^r4h!jYT_56}l=@Ni^h z;~162%Yvl460sW55XSluAo(16u+oEAaHJlCU5%I8{vJ=37kg(j(K|tptW#`G1!PAx zCl*O|)~q7wgEOaNiK0p96c~%cIInM`=-!fjRdye&g$UtfABxrRaSAWHz1B<;Bw-G4tX=L%z{wkbQ1%seBZ*>|tITqN z#c>ky3FANRH%(6UFWkEw`R9O(&*96#NN#Zk~N=qR-N@VsA zQ38FYJ+jfUT2${CktoV4t89cU8oA0SMA@8%`4oF24#Y4I;RpKUt14Prp!hashN{=n z#>#q)U2-+5TMdZ{Fq_3J{ZZZHh{2yz*EsmGTOs(4=vJSW=&Ou4EKsh!4C5_no;5U# zl+;Pk7=`+yQbX0)`BhEitHwSnEiOz~vql6I=!vObJt{Fd!5xv9n*EO|hOmRsbdfDy zN;Ed$H|9(6fe}d=|2D_&vbi0>O&rX_n{OR$!TK&+8gD*d_UgV;%Me zLGj%3g$G~y+P94>>{739dOe2=JHI$>d@{o>!xK>C_<_}WT%6wdyF#V%72DwzW93n= z%U2F;eqi_9y+@Y2x6-%TZHKS)4?i>f6+G&>D~`Pj55>=BZF9x90}cYM$JQSA!*4py zz;3T}Jgdu}^t{Kl{-JTN;@B>)G&Z;D_bRqqvHE@Pw#{CjRp6f$1fIYB_=)B1`L#VB zR1NtAzIxDe`O*^G`PHdr73z+A<|FR(xu@lJZ8amW9uEedTk*NmsdX*eHTzy~4~klA z4Lpy#fw}AST92zswP6vf%c5NOD$C1QLMYCUPro?+itB1+SzX&)lcgE<9IfV`8ock@ zPR}*&SJbcXLoN@(?(v9|liH@t?`ONc_Q2tvlPQX% ztNx*1gATI4#Efflh`KOt{8q2eVK5g~jCnSrMzWwzMv={^CMp&_b@La$-ZK7XjsZ-( zmxc*8FVI6Q1>w5HRd)UxgP;^quX_1>ci^P4;yPP6FkNV?R?jipcGm~Z1>+{W+Ua>M zZg%>sO%uQd76gkIzd2=q_+eT+k?+j1{g6N@7eB`iN`H94InGOLPS%VV`v5*uQy+a^z!5$>NIf|$Uf7m0zhvx+ z04NQ3bZ8Rm4Jl}9l&^4ycVR+(VnZy)#gKQ!g^1u)dLgsn3r9v-9~rn8Vi}x!q1Daqndx1csF<7EW(`|iA zN69{HmRYU9-nRlU*1^0{c5A#GRUszIbT7*OwwTg84%leimD4(bE3%TwwDcIeAUdG5 zz5a79389TNW;V+gJnjsH#c2Cxzl(Vql*lMH%)l>)kEWGmtZre-Uu zGYCXuX>Dz|wnj9TXsouTmNs@yg2pP~(V;0pW7WnB(b(5#Ri9`qU%|Z}xq^^1+K9%A zov--rfi$LvI}@+uhD9EYRRF2eSbRh@_SM|c2JkoA#c(|9&_o+ks`DddWTPJkbSf{ZH)goI_-f+@! zPu;I48cQ^mXe`m#@zGcXJUTQbXsmLMKs1(UEYVnT6K|DvV`(>5J8P!^GHuQgjU^gO zG&Xe#nhq6bUCN!%JF_~CMWE9T6{9~dZ8uhV`5+ogG?r*A(b)0PSOq*0jYZ#hDQKd* zu{bVY3s)g@sF)5F=dbj?u3k{`c4L)OGo8jN-(_?+_LcOcM2Cv$Q1R>H`bnp;3e_T8 z5q)}VwxaR{Ni>#dEYVn^vE!q$3V3vAO6;vCAUhjtrtHf=w1N z-p(+B^_hp;`4bAaGdW(}C)H1}lNWBMDWgyfomEQLl@L8tMS_Ar6c}#@eSg+Et*Xm| zdld-M1HRX{7kzuL+q2t?_bMX3)%GHMxdA!q9v3JJoRM4{6@ zJ^bGkI%0A_hE5Nk+38`zQS^0u<`EP1v%>#Amq5!#3Ll&088ug(L?E4C5*khp8q#6m z6Fw|V7&jpp_h(65YYLouzclAqyWBCs*e+F&jo$WUs9lqLUeAL7uYO?GYeBbf*5msa z{$TfoA}(GVMlX9F{wt&-&Hf`X(#e6xpNu__S7UPngDj^yrDT^FW7yw*x(IQLArRvS zUE3W%l*U2up$Nq|$YH8m8NGw0~cZ?_Ar98c#`C)Da}>eg}}z!5wnHZ`cS5!K*RmRQbikGo>_Ox~r6}?ujleX4zd4^fe!EMj}$9 zd=XpcBn+{CBBYrdr1|MtAPvSdOpW(!q)}o7HGZvsXaHb|{oM(5@A0>zpr%o}g2vCR z1X_Uh@`Wz9+_XLNb3%#9L5Y7JKT6a$*U@+o-9sWnaM=ENV2u^?S zeOTHK0uTO%Uuw>Vci$KU@D~0SHp(A{#zG=^z4Fi+oA{+Z@g!-Z2<;H5UKkFzcpi3R zv4T$K$Tbh(Fqg)gia3v8y>H@%+2A%OBn=Zt>oI%2?lSb1Og>bJi5B@K9!9r^V9F4E9-`K?`U6OK%DBX){YMsPy>w6zf3!u1 zfj85}Gj^NJLT8YAN^!L#Y@=8MHsB$wC61tFz#rOkr8hz= zt#nz~Z*X2M!)5~$Zci1Jxjm>?_D^BmoW4fSlC#Kn;<7*RypLBeUj<1EA53i?> zkb?$sscKWMLbz;1D@tadZ{4gz^SR}|?NTACM69m+B znSm}dbRRqI7re~y7mTG`e>B;f8naFzR~D#+J0l)<1}8?We?upECub9ai1i5*>mTIo zQ?JAQEm-G|-}x$-z}3+-F9(34tf{fMJ}1|LXgI=Z_|5ou_z&sGd9g8 z!uHhEN1r$1%sj-lBr}h}Sr*a}GdJXsVOZ3DKrW!j1r)h}dU@}-WioR_z$7!@5Ifrm zW{#4X%v_3(m6=P8(u9n36EpWGXorQEIWhD63&*rMUNAD4xsI`HxI3N`J1uUnDxoZ9 zE<>#$Jda@JQc}SxG4nom+on3yjTh%$qd30A%=vbsRW{g6oUq0PsZj z%tLHTGV>UmWg#6gb7JPFaHFTOj&2uHL|0zz!C3(Og8~rzZ0xtaS2yZ%bGy^h?-@Pf zP9J>Mr7~RYd)$Pq+WVGpC<8gBMa?r=)XY2$m^d+*v1g75m}KVb8O$6dGnu&*A1gDL z8l?#t=_Y1Q%zSlSzbd(FTi_Q9?@KD56pPk@?Ss=SID>0~Zv#)dqxjYML3Vvh2iZ7? zf}@K#+^TX&tQEiSMcLmLhl)E6l#aXdU@;-^v^lYG zs)VwbxfDMZGnbMIR*9JtGq)XHF;*V+x_srp<_BQl?VVsur{rPHH3V;EcpC8=5a2wh zAzTNMd3h%0}vNn2Hen%%w(YLPolYnG-W7W=_WIOU%r5jE$K2YLc0+WiWFkl*P=Y__3I| zlvJ=v%$%4xF>~58_xARem~mRnJjAvnGmpVpoSEY*V&=rmiJ22KcRL&$k+ygh4BwYE zVoN47*I_PV=BpXZ93?ZExfCBOGnX2r2^r}oW=_nUm^m@?mzbIB7#lJ3W|En&W-xOl zl*P=Y__3I|lvJ=v%$%4xF>_+(r_0PkY)dlp7@WnKIldxhPRyK`IWhBBmYM4?7cui@ z1~W&=OlB^{$I8s5MrlGux`~++Gbd(F%={&0<~qhk%)F6g=FJRdu7t9fxfDMZGnbMI zR*9JtGbd(F%=~njd5CREW*&pHI5Wpr#LS7A6Ei1f{>n0Q9p)lt-pF9)D4EI3rTAEx zxzs34$VfLab7JPi%!!%5EX*9@@@Ctkh_UR3h>un@_qfYFE8uMr3DRupAPwZX^wuy; z+Kr@W7E5mkQhzL#E+rMjhTo8%_6qzXZ*X+yoV>xAClEpH&o8sPY;MO2T8|#u-{YmL zyY5DPZL78`o|ry7VoPiR-+y;tdHms^Te=w+lFv_WHGuqQFo?IJ2emg~kWhtz%7%86>y7xx zE8#-<45-y+jN+~6QHL&Jlu(84ZC{k2n84v3wCIXECGpV@q^CEZybGk#`D+a_TBqR~ zX|m+?Q#I5DG;s5y#J>1UlDCAYp?$QkuR4XTT?N{7*Y)q|*q1%`Z=sx>7Ccx^TI(4& zxxUs3Qq)X39^XNyy8w0u(TO&CC*zhjWd?aM?fOL#mY4Yv7KmK1W} z7u$SQszM@By!p7B3E#G;*|OatSG2ZdSYmIPF7kXrA=A+`?U)s;TE7gbWc*JBIz~iZ zsN#H0lL7lhB5UIe=bIsEp4^cHIChq3HY?zXa=(SsP=t}x%p{UXBsS7dW-S&`!c$pb Y!e=CRDgg_R=0dlL{pSduw!>xk3vrxKBme*a diff --git a/paddle/fluid/train/custom_trainer/feed/scripts/model/join/model.yaml b/paddle/fluid/train/custom_trainer/feed/scripts/model/join/model.yaml index e13df085..c8093f88 100644 --- a/paddle/fluid/train/custom_trainer/feed/scripts/model/join/model.yaml +++ b/paddle/fluid/train/custom_trainer/feed/scripts/model/join/model.yaml @@ -1,6 +1,6 @@ aa_Attention: Do Not Modify This File Manually, Unless You Really Know It input_accessor: -- class: AbacusSparseUpdateAccessor +- class: AbacusSparseJoinAccessor input: - name: cvm_input slot_dim: 11 diff --git a/paddle/fluid/train/custom_trainer/feed/scripts/model/join/startup_program b/paddle/fluid/train/custom_trainer/feed/scripts/model/join/startup_program index 3f0c50e77b41889f790d9c8001e43b7b275fc152..a747784f491e64bf91931900f55e70a88e2126b2 100644 GIT binary patch literal 29822 zcmeHQ&u<$=6povuamV>#%%N$h(6SMV6NPmg$4Of13Kc055)=;HP&FE_$Ig=VuGt@n z1A_DzaR70JJ4oE1clZZDNJxldubhxT0&xp(XX0eNahz>QT-Ur@;@#Pqc{4lnee=!k zzDNH4Es-2bAGH1wGgrx&?m1eSE@en^Z00fkqTzXA=CKV2!(hX8 zk+du-m<00(NrHI?!|8+I2FTPxG&LbpWwFcAVvjTCBN_K+p=2mlW-N5d_?)n3MP!@+ zd#;eNz@9`JZj$~S6x(0!{SPQy_jEA3`?O6hc~!Y zp>~5P@W(vQaZ6h}%&ECrjeA*DoPm6IZHv#(lRIBrsIi7+cm^|JY&vb`-jLRcZlhF~ z>Uz+PHHTFUZh57*9iM}BUtOqm9lLU^uquqx9nL&X8!YH=k2}K1PWl?dtSp`eK zzfem}w#yy5$;}4*D>Ijedb6;+e!9U63&9Q7sAp3-3bH$fw@E|okX;Qe_^D?t2pOM0 zG@doYuq_+R4UgTDI1w}fI(m~ecqzcSMWy2H!f(TB_+Bs%HE|5R;L;#NYxMkZ@hx(5 zmKADFp(yV~RTskrW+P>~XKXVM06-}z@(yb4t<46P1$INbj5-AvalDe{*J;&+mUD~3 zSr=}Q*@BgKZ6jQ1aar7d!>V$JTRIn21L0BI_n?EIT7evz7(}jC)w34&78O7Yu&p%b z8nwD@ROS{H3r2cr4sy;xZM0QeYa%!{%+9(SYV8dLeN6W~R^6 zi5G|9Pfd|gF|3PddYKp(q#C2*+_{UN7JnSNxAAz2ySBhb#V%T8rA~ieIYzqvf<% z$%bZLNiO(9XSBPp!er)a$9g<#+Ine8TLNJPn-{#4*Ort8`P+TRxHlhc4xk1&RRg36 z@m)^qTV}C54_hL`8&n8zBZEp@=|28jR=IC?bwGd2rjO^@7YfJ^DQ_#O^90 z#G7C-FdIb#V)kHQHi`%o5hx=1Rz!&RgJTd9iU=g6!5}0Q5hx;}ue;{gpC%C@J{pOE z*(f3qvj+pSQAD7KKoQY95plopM)%j5zHNJ*Nqln`gOE@}AR!F~A)$ys5z$v7q7!1n zw`;9$)eaKPJ@wPJk0Qi3)G@7!Pxj`k+ll2xlQN7zt$r zGSXl$63Pga5q&QsroxcjJV-=zgmB^%1GP~{AZiZ=YNL)o9f3NccRFI^uk?wh5#4lz za7h@0kx)kV#PCTSA$-inKyB0!h}wgJ+NdK?N1%@AosRhV z%U55HI%x{)h~dcd2;qY~1|y-4Kt>u2MnWBdI-;+1L?_4&>4^Q==E1P~56L_)oxPqG z)#A)p55J%z?nn0gnqS9Q509~cz2I06pZ7j8!pflw;0p5?wZI#xa9SD)5P4?1r?7?Hj}-`4B*j1 zf|&dp`}6M*#Jfmb6xfMkA_~81z}y2#N|^enq#E!zHt1A{v_ZgbwqlVr37$B%hsOkw#`+}d=ERaa<$ zpbAtznF=`u)~ZHOgFUbxU`^~jwVH~jRI{aRowBC6?56TfE$77U51^NGm$E9Z)*w+8 ztits^UfA1dRaqobSnDmcR`%4Y{!?&nk2dO zR&r`tuii?o+8|LCZG5&Ib!e?BOSr{b7UZv2lh@@r?&os{NICalA00zYoZ&;CT9?&O diff --git a/paddle/fluid/train/custom_trainer/feed/scripts/model/join/test_program b/paddle/fluid/train/custom_trainer/feed/scripts/model/join/test_program index 6860dda25a06b7aadfdb4ceb97572ec21e9f19b3..6f6b0a9c760d96f87b7dc6ec9c543ceec60f1db0 100644 GIT binary patch literal 33168 zcmeI5-Hzi(6~~k5?&(g=M`yCT>Y0Ebcm1G6r1QPQCNFX63#0`iGF1g(Y;06S*uopZ7(r~IyB`Mo+b`brMojHA#w#!wg z%2hv~zw5Y)J^%TQl|uQ&=&xA)3M)Br*ogZB%dD)eELY!Pt1C-digK}BxL>HQGuCsg zommM#k_oq1**fAr5AB$Dty4R++Lil|@0+Zg%=h`*V|% z0LE5C7h8Z{207qaGj$dQ$P01L^y=ib4OR#H+3wVShP`K3Hp4gni>Jc1!-_+2_G9F5lQ( zeEf}Xe$QBCw}OEc1|ApB&GM4*`(<_;jzB?P6x+^8dFjy~i?!N&uE%Re^El}9=9$aS z+`vCNw*8~#!0vWE-W;62GyWAH2Y%CY583DNBRA%dz&Wvxcr?1E=MCL%vlrTZ z{xk?r8iR9VtLC{rubKM?t*x5t*X+T7`&}y-up0c?aU4dSlc#og6g7@`TsPzqxax7> z^NkJm=x>*5DzqIBtz+&Dc&J&svpvPukjKN&uSMMJHTt&eTZ1q-f~q#0VHk2hwhq0( zIjL_n##O9uih6C;HaD?`P@O+qes!Xz?`w70-r3)gwHfyut>vE?d=R-_;2RHX>eo-e z%Ck}T_%Lu2d6_aN*Y9ztvBPCf0~;E_68Ek1%s52%0|uG#(Bx3?=*X~db$QPoda?1a z)quV*Iw8lKOe{*aMkPBT2j6eh#*tQ`ci)vp91dNtYuV73BKs6A8QTRI=>@e8-YfAB znp=kgzV+z^m&+5AU2udy*YwTVx2pw+q2HR~5U;~UU z-~4XDfaK@dy89bBp$&Xv(c(;*5vQ#2ks9y#Ixr$g;ANY zVZ^QeiP!agInFDrBwI#|eSlA)@UyQA9O1FE%#ri*Vn?R=n6Xs>C=EP1nuMd4L6ccs zZ& z%x#2^^3x7oJ4*Va&(`fTH(GyKgD0Q`K64lg8eM@Wns(iG4|GZn;Ej&$#O|pbgR>6q zjq+QQ^{9(vqE1^A^E={^-t)jmXxvjlV>BB}AtK`t5t6YaV=rCC zD)3BXEQTCs&_uJbuTgLs$yk!H6nB@7yUUZYh|t7rEFK~mOEOl(PycUG(>bv>Ce6l< zf_o{xPX+DgGuk}3pkQNN#ww~!d`0}oP4gAiCp08uNyd_lrJ!Q*Wl#DGhs&9bRp8Ol zl$woI9~;wbEX~GV%-3ysei{W8Q+(Y#$Jgb_SOrKus2C5Cj3pUMGBy(hO+m%eKFXcb zZ)SBFtEe{d6>*6RO~$HqEl9?aj3pUMGWP0ZtOAdYrj(3T>)w!zB^gUHR($lbO+m#J zRIGh6Hd+`g6;n|01+6tov$1N0Dq7-c`f8zbIx&$q8;b}{1Qp{UlCdOXNyd_leN|+v zqT0k)#EmU9Us0_+PcoKdEXi1su~#Q!6?k+srDkI>1<6>Fu_R+<%qz{tzMgwhOQXv zJb8KB?X;zHGhVDxE?o(G@Tr}kU?U3LZwJ@@Z1kME&*QBcY|;a3uXolX_oyGZ-Sw@S z*x#zN4p*+jr((3Z&i}c&&Y#(6uR*o-ZcC$(5UgnmUAA8RH*Fm;KaiodoUUyxC*mls yb$snRChF?+U!@c+%?uyg)n)apx`>T*J|-H@4-IJr;tO7Zm>4%7jH}#ODf}0pSzf^a delta 1532 zcmbQx%+&m!k%jB`KgNwL-#I6LEEUSHc zb%dGnQYZ73Ny?ZS8tNz{CYRW%7p3Nus_Q7^CFZ8us;4E#8>&yfX!vXL1AZ^Y<&(7q zCNnJ6O#eA1M@Vf(^N|U% zkBm%Barnpx*+-w`u=iow`_kM6^*bAN?^x z_Yo;ZKz#Jf9NkA|2p>sF8=2#XA`@f}HP~SF5V0n~{Bzh5-9IFl0P)URH+1iq6XhLK zWbe3pWAzTPCc(VZ6@cy?5=?-2rz8~JI~GKF#|+s!d{J1vL##V!Z diff --git a/paddle/fluid/train/custom_trainer/feed/scripts/model/update/main_program b/paddle/fluid/train/custom_trainer/feed/scripts/model/update/main_program index f2061d99b8103af8aa0a4b748e12b6e14c4a4f71..7b5a31c35d856102ee088e2d8f845a7a75714483 100644 GIT binary patch literal 55307 zcmeHQO>7&-6&5YalE#+BFm7x!Zb~fBh*A|&q$Jx3%KjKdiUNgG^pqe77AtZoZME_K^18_S$RF+>tTM5=Gn32SePQ zoq6;3&HHBFyvKg>$N8nX;^W|7q5KuL+HLENn%^7fTi^Kh_rLkA(v79X@-5$62HV&Am?UnFUWTV@?B;NJl{f?S#x~JJni*DU3ASD1yUTq28GLc%6*wbE6Pkg+A+pz|=sK}fZL z#;5|YqGGbB73Fzg`-px?wGWowNRV6#FSr>vpO6_ ziAtB(6+n?=J9ElBo|mm7C*r8qd6BKyeb3X|zKgRZ8bJj)@vsvmg5~uj)RRv<+2JE6 z-ixAN#cV0zNDn4T*sXH@juKoQqZbXla z0vY%i5b$a+?_{rM&Jvy!7tNi#HCM@LRL3L=9VZ_`xL7oB5=Rg&R*v#5c7<0URNSJK zoR=iuh6{kYW*4+yF0-qw@MbBRR&0e8-YW6cAJ?hp*!&wqzriDKpX-q6-o+2#H~ zcl)-f9~y3PLA%DT9_h}o2eZM}J66ww0aMVfv*)^fw{7a(!FE&c4}H+gikH4O$CDqY zldrHvnJXV|+VkuR{B(?->9zXJ-!SO5MU;aVgsoT~bJmh~iYj^6N<(rV8{lM5az2KX! zJ+wOYt_uP@?7I(YgJW&8Vq1<`Y23NHwOO&8iZK|NPDk$#SOxwWzVCXi2M-N*->dDL zepM4s;H&$6$6Q}$_x^sSqT0r>>FNijJuqEG+MVqTSzXf~x=zJ2?QX4SSdKn$`}MT=|kQKY?A zX}k||eGtku9CQrdR5IAPBfhtst_js_n<5*_v3$$0tsk0hOZ0k8c~ZOIcYB8YohVHe znx?f?gCXF-I5b<0+E!%)|MuRt-dX28tp{VW?V2D#(7c`^%B!ymQTB$G-O&vgMxJpP z$Rni#GSlr>TG&ne2MS$<0=_%GfX3Y&Q3CH@QBP46jE#7aJh#^;$qk$CtG#|_Xq#UX zIr6Nl-mzDMv2nQ0v?60e&(wPl?2hAzsa|5sqLujcFBZN$H}}EUA=M&2J4!t{E?#Vj zG=F1km4lOlJt{i!6+DGbjq<8#n>`r54=s=arapkk%CQb(z|aASdWH`u!jWa=DD^Nc zdJ5$pFA@0U;pEi8=Xc>GKMUB;sZL`X@kgTKo@ID(^nY*7II_I8*DA0G>aaq&J_n>J z;C>P)ckik|IRJIlX#3Wo;lt7hh$2^scs;5-8L!i3Bz}vp>RlUFGSd+&J5-1871e<0 z<7=i)l+c?PyU53&(7^t^oJ7$^8aA8di>_%8HOR;Ebl>z)Nn2&h9Lw9o0Lmvo$bS$* z=4z5TkaBkF*+=Jskh`eXNMwmXN9YTvBEv&Kkw2Whcu7#ajcU8p9;FT(hr&o;frimf z!o86G%qH&rDus2;NMpoZUqxe-TT{emiO&+BB|bY>CP5}zeLOMG^!d{$zQicSeWE3J#@eD>Kf4k12Ed=_l7o-O;AN%XqB^Vz@x zGo{oKpCvxaO&C9);bP8lbA!%j$?i{>8lI-z-z2!F@>#?><$PAyRjXTh@{$PgS>m(A zXNk{FmCs7-iTEtug;W$XOhMv&3gp z2GV42oO4m`9PY7wUOtOhr|?m(AXNk{FmCs7-iTEsf$4Q|R@mb=t z#An6b&TX5;DOH2iaxM6V; zht8U}{+GO(%nr{}4 zhqHr+bSw6hZ^aVI%?!%@B8hBGt{J{nQv6U?kdx+noQ~f)^M-pT@(KEU=v*&|%kj97 zc8B~Y;-#|#kRMOo&s^iqZS=gGa)T}yCOX3QTJNoab8T=H`gYH9hTunZ=)c3Ag+9#X zt9P5ir>F1@-yJ&O&$Ew?efg6lTTPW+6_HU|5x)g**4&+YOZ*Q7zZ(>se`l+)g-(Fu z9)v}$xC}B!E{pIit;a;IZpfNH(c5JISz<47$He)FQyPa!jq)Yzozqap{!CaiJ6QAk z^S~P1#Y)ZhY^;&V2yBd=>WqCxZOnc%4s05wRaAh&h#}>RJ=1W~I>{%56SIR8f0;f` zY~9(tjo|=bcnECpj|XN;6QhN{fZ!aUlW1>36osSrVT)Ttvv?2wlB+hndt>OsTlmfO zl)XS*NbRfN#fTHJ#Xr;$l6nd+A0jGnrQI_You!FO_QMV+0?bkg5sL^C3bJigP&SU2Oh z=?mH?6QUGiOKNlLMhKM2Fl9^Z(pv&T%h%M9H?X~hXnB&jH?+rf(x72e$1U39!a$4! zTc-kBKV~dxW6@*~+%{DBl5ms(R2T)Zd_|r+hDzK?^(8udoFRRQ4j<|85uzwQh5mO7 z+Gi{q@(|b(CTSS}C}X?65NK+pSftjt-UbTC$@R9E6H!_33Bz45n7bD%StE0{8A4Dp zXCrgAD6;UgdFQI2spf3|TF%B@$&tgTu-SJMZibY36{1$lG`QKAK;j@Oc6kXxS|j>q zBPb+91P#78gK*b{>~Pm~!C{7=nl3od1&33KZT%r*sk`@0_M*bCQ`nUSYm#9v^Wd$g zMz1M?>kJ_d(d$#D*A%&Wb|Y68w4Yzj0YOQ*SGeu(l2GgTHLxs9l%)H5NV->o_GUe@^Qkq^TM|xV?*x@D0%>INs1mrv@EbAimnMe zVsK%7pPWRIlPGc$^+Y>~%B1KBf=P;wAv3ZtF_WT;HzKo&h2*^o_o)dB{j?gku7DX2k+7O|m1M^IZE>cQlC5k>UosQlgi2HAN zr|ubYGEEe{ZP;e3QQN9)R6PE-_qO%U`Z`>nBsYO4bQ2gqMGxRDNzr47mIZc1(TSpS zV*bh?xb;JEc45T=52x0l!9C1ST%so_E>R{$M-WU>^zB;u6dhA$Qgo4gsuW$6C=|)4 zY@+By(J3e<=#O;DbY7=SlU)2BL(x^3WuZlTv*z~(x)Mchy9WF@Vdo?hWl?mId?Jc2 zQc7ecicS>WvSFjGe$elk^&`tX0H^dx-;y7ml|8|SUnFM z3^llWA;P!w!ndEylM4?DVWfMgwicS=rR9GXFBIT~{2DY}Yu5k+rgP;^Y0Nzp~}sZw-NqEIBGvWcP-MJI|*6#WUJ=%1zUqN^|) zgp5Ay!3qSSOjLm8hv=$<&LB+|L2qzAKM_F}DJ8B3-_T8Th#e_I$B>yjouQMNyWKgN LnS1HN(%k<6n@Oq3 delta 2407 zcmb`}O=uHA6u|M$YziinYP49Qu_QK-f&sgm4-2J`iv{aJgrf1%vMs47aU)_b6|@_Q zSPTe#B2`dCdI^YBo)ksvB~Sz_ilts8kc*->4-vIJXufP_lPS!Iw`G?7!!K_#yVn<# z-&sX*{K!65Mjdp@4hL5~FzuKnX*{0D5O}yYM@~cZg^Ox^pnup!jX0PmUGA`|20Z%s z#R30?cp~Kwcntkqe84|3KC1dbUs;ER*bvFiZU^!S^=BD^9_J+@T|#}uO~5_UPTs;` zN0K}*wes}eGPaierX4alFATeyc|-%}<&M8PEgarLr!hKsw~xaUFR{Aa-Bz7srHcEC zUhwra<7lwAKD#}8wL@Frf-LTSfP)h+vAEBw6(?D#;s)ovP>k?!st_*3;?DPTaiS#_ zw-{~2NmZ26wuA?VpEUpBf|`&k%(|jOT$gBxbzMEy=#r{na=1uoFjO|+scUJ1>oHX7 z3w;C%V?OK}Nm&-LFcws3M1>{e07fUGw)%C9U~6KLk_eb*1Ng;=wbt*QAec*qm zlaseBX(+t+p?PJaaj_SFO&?_z(}Zr=AwP4+dfW()qx2(o5n6P>x0z08yRpnU0p(io z65D9rUg_Pv`SU;QlrnJYb|X+?-&TQh_wqcPRHuw1dF+|rK{znc6x3chWmtE%5hi;2 zZBDNL5f>)asli?&7bcowVardgFsXM$bz;d}CwyPxY)Q#h4QJ#yIPnyVJM+qlllb1) h9Gm%uH#YGPmrAHY`-slfcFu+rZ`Eu^Y&+?;mOn({Rha+) diff --git a/paddle/fluid/train/custom_trainer/feed/scripts/model/update/startup_program b/paddle/fluid/train/custom_trainer/feed/scripts/model/update/startup_program index 5a47d80a18c8a96074fab78e553dc6a1e212230f..2da50a2803f53c50979049e4d2d0f92490ec615f 100644 GIT binary patch literal 19151 zcmeHPzi-<{6t-e3R!;swxI6 zdJrXhZhYvh+X3U$u5UN%>o(&yjuRSF?&By%Hhf5$CZe@>K$4DdJ2b62>J0cjjyPRi zMGwBY(qtKDDI)>rYi#)@?z9He~wCm1Y`{Lz>`S8pQBthX|S*&Bo^T`3bMB$qVk{ zG{SloR1a9Xi*x z{!Hc#{bC^o7)jC=&qCY|py!lhI4j@+@gA&Xu@6%-7j{dpC&t?}q1>Y?I8XwPvJ{30 z0xRrr)ie@r+pBV#uG@eakX>sR-B>HsoOv{Rg8SCg@)Xd(t#*`n z6nk-I-TZjUSwO{o5@ggWyfXzKr;LhfzFOsOK^~|H3a&-x%W@yb0LfMsGR~m65GOE% zQFxYQ{<>vI=6puXfmv>9Hk%D~t+64Q5i5&vkjxlpauAXmdd;6evgDGMqHdh06@5l6 z*&e$LaLPjx2w9RC6o$I;dZ2J0t|BxeDD@RQ&!Op%alqo=zkl+xBacW|AKeEeKSkyv zz~*Q3ON+YXm&!$ng34ltWI`~)&jsW^P1S#aL>UgqjixTF4Iqyy^~M3nrs{t^`K?Rn zF;@LgPeshxvR^aXX-%!l+1%o5>8d|>#(fNBCJ8=xDa)%tak@`SGeCOFcprM(N63J16df~|Vq1?|& zQX{;YlfFPkB)qOjFr1|CIgWn&aU7iV{}%}cv`_9!U(35Umj&8=QsQVpN~oQV5n$U$ ziHmkS^0mv${SE`$*AcdjlrU1lNQr;Fr*K)Ul-qh3loDz`d<0H1Qo?Z3WNd01DdD^M z8p_=NIT0q8v=Mw+pnai~P!B(i0NX}N7}%Z+*fvtaNC_h)#wsPAHXdEPEAeCBu7r93 zcmz%|Qo?Z3WN?y^5=KhIXCHyBZ~uoBWC!99rYY!vfw$Ty> zwBC&PkmxSy=yoEDH$3%5 zInL2az%~=2d delta 1210 zcmX>L>GZ|GGPi(#*Y0S9!5R*AGWBTNef|5+eMj#`Y@=_-ovPnrBnd>MdCYRW%7p3Nu zs_Q7^CFZ8us)MwvZw@qY4pj9?+)X)%zPYn@1)#5?63N??S`q>btpGmY8=x6n3 W3ere^2J%q+d_$(I1Q diff --git a/paddle/fluid/train/custom_trainer/feed/scripts/model/update/test_program b/paddle/fluid/train/custom_trainer/feed/scripts/model/update/test_program index 9904b7d2a5a61ecda34d2c8e10342483ff961ba3..e0ce42330ed277e4190c41f6cffc4ac5251b311b 100644 GIT binary patch literal 23822 zcmeHPPj4GV6t|nCcBW0^rj?qS9#)pXPGr_z+o6AiLF%Cr5-J=)2(32jv9rql)9yNn zL8v`d;sbDjegHTi?i}ID6~R|P;yaW#Gi!Ui&YDz>9aEl9_RQOv_h#Pw_We9Fk39Q1 zo6i(p3_saok>tCkzNa4R^{uPbK;31s%Ict|Ob9&H zQX|KQx?50pgKR?Gu-z9~Xd<(niJNbo+Pr9_$s5sTHFBHlp>aK~Kwmks#`;jgO zvHmj#@-0w53TDX(GjEd|lS@u#7RKt9sG6KjRw1>WGMU3694nh5C}v%DkC`(o^_(I% z-E)ed==lxqDot+x26)3d$uVyLcN}@Gn7^6&%kx*CIe)@68%ft>qbeHFX1eAI_c&QH z&0U1#9d2HOn|BS&BP7pP0$7;nF->jJjyyp#Jy|S&;`w^u*;GF=g2GzqCb@Q^yM0GD zJzMWsP6%U`E!`s5yIx>Yz3Vp{y4R0jSr;yUoMGaFY4Ht`i(6$QUb;c9!cW(5XlQyq zEnM2o)=JNFywo&oJB$qTu(0;<`>diovTUl9Y6qS}YbO>xu{^hTV7R@SZ*)2~t@(qu zN8h3Y&#l?kJ`}RA4fSpaV4@ySHf(4EuwzOa+FfU01;cLhwV{*3`PPE53)HS)cE)nbwCsX$BND7!GeJ zsdE25S7pC%*&W>gHwulTp+2#8@FwhfN?S}5`yrIBKnEZGdk5OR7Vm-0FSnC-1->!a zC2Q@`Sy~fA_pal0`Zj%!x5%2Vgo988-#FSKr2--Skm}B%-Em#M)VIhwA0@W@3)wd_ znWyiAR8EwgOqC1@t8FgxD{%;J(KeXaP$fKxAW}gAW)w zAdzE4fFcZ8HcqArr&XUqxfgi^pER6YIQaZ9TIBP9{S7J&OnpxnDB1En9paxf$7$cW(82#6BDB_{2tFfnFRb*z4yz16!m zyveA`-|S!x;;RS)7LTu|#zcv@iI7$1gFFKJYdwjg+B9r7imL&&`z2_{3iXJF5r1}* ztTQY(`$H%{1490e2$>p5ra;R4!n5ZWf{=S6tcj6j3_3hs5Fs)Y0*d@JfAx}}_>KtM zl)*TQP6QIVibQzA)iG)i+mRO>|*(Bj6E_s zCHSn6KtB6F`ZEprEUk&ZTzR0J4Be2=zK;JFz6zg>F;eEUq6qoy>$%;4d^QP+$Y;fd z$bW91^U9F zN!d)n{n-%rDVBVnB5i**#z>jZiX!B*$Y&?|<+CyN$mo>dvqA#-Eb>|8v-~V<6ZdCve>Pi^&l|@W zDf3xTgnSnHEb`f@1JZcjIOU?;1>9qMSw0&B+c;trm-Qt+E3S{@d1E|pjOUH&<3JK)1$Y+tyBA*pmE|AYo^4S<8Wj-s4kk2BYMLvssma-ky<;=@+7C!q- zx<4BO8@bBzL5-4?6+}9yQIfL@XR}{pHmK!EH?XYPpxj|@qv>Q&B2~dCuM$sD0GavX zRArDRMP!2#7t++OS#fDpRkW%CX#v{h(CRs!)hSmMmZYp*h9}E_0GRse6;nU`NQQao z;$(_-35OVQP2$kJdF$Vp)ns{K2JgvU*gaW9QpoZ+|0H^63D)F8#D9$Pc*c0CO(A5X in2X<5=aJ><7sSKm!9%Nd_sxUF8PYw)}^ft24QOL{#qO{ba)V$L?^8m)NQorRJ2X>nP+U=BC=J zrzOW5Zr&5<%E-8KGCSvFrl%a6v8lkNF<8P8Al<;!Q<4Yf@_ zDv(yTF)1}pwiMoi7I>DV1)hy9u4^|L8Cpr&Y>~1!U$Yh2H6fpt-PnQA) diff --git a/paddle/fluid/train/custom_trainer/feed/scripts/start_feed_trainer.sh b/paddle/fluid/train/custom_trainer/feed/scripts/start_feed_trainer.sh index 1d4c2114..a00f6a9f 100755 --- a/paddle/fluid/train/custom_trainer/feed/scripts/start_feed_trainer.sh +++ b/paddle/fluid/train/custom_trainer/feed/scripts/start_feed_trainer.sh @@ -1,3 +1,51 @@ #!/bin/bash -export LD_LIBRARY_PATH=LD_LIBRARY_PATH:./so -./bin/feed_trainer "$@" +BIN_FILE=feed_trainer +work_dir=`pwd` + +function usage() { + echo -e "\033[41mUSAGE: sh scripts/start_feed_trainer.sh [run_mode]\033[0m" + echo "run_mode=mpi, run job in mpi cluster" + echo "run_mode=mpi_tmp, run 1 node with mpi in /tmp" + echo "run_mode=local, run 1 node in local" + echo "Example: sh scripts/start_feed_trainer.sh local" + exit 0 +} +if [ $# -lt 1 ];then + run_mode="mpi" +else + run_mode="$1" +fi + +export PATH=/usr/local/openmpi/bin:$PATH +export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/local/openmpi/lib/ +if [ "${run_mode}" = "mpi" ];then + mpirun mv package/* . + mpirun mkdir -p log + export HADOOP_HOME="./hadoop-client/hadoop" + export PATH=$HADOOP_HOME/bin/:./bin:$PATH + export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:./so + mpirun sed -i 's/LocalRuntimeEnvironment/MPIRuntimeEnvironment/g' conf/*.yaml + export HADOOP_HOME="./hadoop-client/hadoop" + export PATH=$HADOOP_HOME/bin/:/bin:$PATH + export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:./so + + GLOG_logtostderr=0 mpirun -npernode 2 -timestamp-output -tag-output --prefix $work_dir ./bin/feed_trainer --log_dir=log +elif [ "${run_mode}" = "mpi_tmp" ];then + mv package/* . + mkdir temp + export HADOOP_HOME="$work_dir/hadoop-client/hadoop" + export PATH=$HADOOP_HOME/bin/:/bin:$PATH + export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:${work_dir}/so + sed -i 's/LocalRuntimeEnvironment/MPIRuntimeEnvironment/g' conf/*.yaml + mpirun -npernode 2 -timestamp-output -tag-output --prefix $work_dir --mca orte_tmpdir_base ${work_dir}/temp scripts/start_feed_trainer.sh random_log +elif [ "${run_mode}" = "local" ];then + sed -i 's/MPIRuntimeEnvironment/LocalRuntimeEnvironment/g' conf/*.yaml + export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:${work_dir}/so + mkdir log + ./bin/feed_trainer --log_dir=log +elif [ "${run_mode}" = "random_log" ];then + log_dir="log/log.${RANDOM}" + ./bin/feed_trainer --log_dir=log +else + usage +fi diff --git a/paddle/fluid/train/custom_trainer/feed/scripts/submit_mpi.sh b/paddle/fluid/train/custom_trainer/feed/scripts/submit_mpi.sh new file mode 100755 index 00000000..df7c6a86 --- /dev/null +++ b/paddle/fluid/train/custom_trainer/feed/scripts/submit_mpi.sh @@ -0,0 +1,32 @@ +#!/bin/bash + +export PATH=/bin/:$PATH +set -x + +source conf/env.conf + +echo "# This file is automatically generated. Don't change it." > conf/qsub_f.conf +echo "SERVER=$MPI_SERVER" >> conf/qsub_f.conf +echo "QUEUE=$MPI_QUEUE" >> conf/qsub_f.conf +echo "PRIORITY=$MPI_PRIORITY" >> conf/qsub_f.conf + +export HADOOP_HOME=$HADOOP_HOME + +sh scripts/compake_runable_package.sh + +$HPC_HOME/bin/qsub_f \ + -N $MPI_JOB_NAME \ + --conf conf/qsub_f.conf \ + --hdfs $HADOOP_FS \ + --ugi $HADOOP_UGI \ + --hout $HDFS_ROOT \ + --files package \ + -l nodes=$MPI_NODE_NUM,walltime=$MPI_WALL_TIME,pmem-hard=$MPI_NODE_MEM,pcpu-soft=180,pnetin-soft=1000,pnetout-soft=1000 \ + scripts/start_feed_trainer.sh + +if [ $? -ne 0 ]; then + echo -e "qsub_f failed, please check the config or get help from abacus RD\n" + exit -1 +fi + +exit 0 diff --git a/paddle/fluid/train/custom_trainer/feed/scripts/update.py b/paddle/fluid/train/custom_trainer/feed/scripts/update.py index c89ada5d..cb2cbacb 100644 --- a/paddle/fluid/train/custom_trainer/feed/scripts/update.py +++ b/paddle/fluid/train/custom_trainer/feed/scripts/update.py @@ -25,11 +25,26 @@ def inference(): cvm_input = fluid.layers.data(name='cvm_input', shape=[sparse_cvm_dim(sparse_cvm)], dtype='float32', stop_gradient=False) net = cvm_input - net = fluid.layers.fc(net, 511, act='relu', name='fc_1') - net = fluid.layers.fc(net, 255, act='relu', name='fc_2') - net = fluid.layers.fc(net, 127, act='relu', name='fc_3') - net = fluid.layers.fc(net, 127, act='relu', name='fc_4') - net = fluid.layers.fc(net, 127, act='relu', name='fc_5') + lr_x = 1.0 + init_range = 0.2 + fc_layers_size = [511, 255, 127, 127, 127] + fc_layers_act = ["relu"] * len(fc_layers_size) + scales_tmp = [net.shape[1]] + fc_layers_size + scales = [] + for i in range(len(scales_tmp)): + scales.append(init_range / (scales_tmp[i] ** 0.5)) + for i in range(len(fc_layers_size)): + net = fluid.layers.fc( + input = net, + size = fc_layers_size[i], + name = 'fc_' + str(i+1), + act = fc_layers_act[i], + param_attr = \ + fluid.ParamAttr(learning_rate=lr_x, \ + initializer=fluid.initializer.NormalInitializer(loc=0.0, scale=1.0 * scales[i])), + bias_attr = \ + fluid.ParamAttr(learning_rate=lr_x, \ + initializer=fluid.initializer.NormalInitializer(loc=0.0, scale=1.0 * scales[i]))) ctr_output = fluid.layers.fc(net, 1, act='sigmoid', name='ctr') diff --git a/paddle/fluid/train/custom_trainer/feed/trainer_context.h b/paddle/fluid/train/custom_trainer/feed/trainer_context.h index f9c86a45..030df06b 100755 --- a/paddle/fluid/train/custom_trainer/feed/trainer_context.h +++ b/paddle/fluid/train/custom_trainer/feed/trainer_context.h @@ -2,6 +2,7 @@ #include #include #include +#include #include "paddle/fluid/platform/place.h" #include "paddle/fluid/train/custom_trainer/feed/common/yaml_helper.h" #include "paddle/fluid/train/custom_trainer/feed/common/pslib_warpper.h" @@ -48,6 +49,7 @@ public: paddle::platform::CPUPlace cpu_place; std::shared_ptr pslib; + std::stringstream monitor_ssm; //记录monitor信息 std::shared_ptr dataset; //训练样本 std::shared_ptr file_system; //文件操作辅助类 std::shared_ptr epoch_accessor; //训练轮次控制 -- GitLab