diff --git a/paddle/fluid/train/custom_trainer/feed/accessor/dense_input_accessor.cc b/paddle/fluid/train/custom_trainer/feed/accessor/dense_input_accessor.cc index f1ca59ecd0fa5582f4d83a11c842473f1538e4bd..b9a40b0a032377b660a859398e51a12a7c808749 100644 --- a/paddle/fluid/train/custom_trainer/feed/accessor/dense_input_accessor.cc +++ b/paddle/fluid/train/custom_trainer/feed/accessor/dense_input_accessor.cc @@ -1,8 +1,12 @@ +#include +#include "gflags/gflags.h" #include "paddle/fluid/train/custom_trainer/feed/accessor/input_data_accessor.h" namespace paddle { namespace custom_trainer { namespace feed { + +DEFINE_string(feed_trainer_debug_dense_name, "", "open dense debug for specif layer_name"); int DenseInputAccessor::initialize(YAML::Node config, std::shared_ptr context_ptr) { @@ -85,7 +89,6 @@ int32_t DenseInputAccessor::forward(SampleInstance* samples, size_t num, } _pull_mutex.unlock(); } - size_t data_buffer_idx = 0; for (auto& variable : _x_variables) { auto* shape_ptr = &(variable.shape[0]); @@ -97,6 +100,26 @@ int32_t DenseInputAccessor::forward(SampleInstance* samples, size_t num, memcpy(var_data, _data_buffer + data_buffer_idx, variable.dim * sizeof(float)); data_buffer_idx += variable.dim; } + if (!FLAGS_feed_trainer_debug_dense_name.empty()) { + data_buffer_idx = 0; + std::stringstream ssm; + for (auto& variable : _x_variables) { + if (variable.name != FLAGS_feed_trainer_debug_dense_name) { + data_buffer_idx += variable.dim; + continue; + } + ssm.str(""); + auto& tensor = ScopeHelper::var(scope, variable.name); + const auto* var_data = tensor.data(); + for (size_t data_idx = 0; data_idx < variable.dim; ++data_idx) { + if (data_idx > 0) + ssm << ","; + ssm << _data_buffer[data_buffer_idx + data_idx]; + } + data_buffer_idx += variable.dim; + VLOG(2) << "[DEBUG]pull_dense: " << ssm.str(); + } + } if (_need_async_pull) { ++_pull_request_num; } @@ -118,7 +141,25 @@ int32_t DenseInputAccessor::backward(SampleInstance* samples, size_t num, } auto* ps_client = 
_trainer_context->pslib->ps_client(); auto push_status = ps_client->push_dense(regions.data(), regions.size(), _table_id); - //return push_status.get(); + //push_status.get(); + if (!FLAGS_feed_trainer_debug_dense_name.empty()) { + std::stringstream ssm; + for (auto& variable : _x_variables) { + ssm.str(""); + if (variable.name != FLAGS_feed_trainer_debug_dense_name) { + continue; + } + auto& tensor = scope->Var(variable.gradient_name)-> + Get(); + const auto* var_data = tensor.data(); + for (size_t data_idx = 0; data_idx < variable.dim; ++data_idx) { + if (data_idx > 0) + ssm << ","; + ssm << var_data[data_idx]; + } + VLOG(2) << "[DEBUG]push_dense: " << ssm.str(); + } + } return 0; } diff --git a/paddle/fluid/train/custom_trainer/feed/accessor/epoch_accessor.cc b/paddle/fluid/train/custom_trainer/feed/accessor/epoch_accessor.cc index dd2d0e95b1c9a9b109ea6346993027b16541f5e4..16bb5195c040d4554a297e4ed1d2a0ef0e3a1a1c 100644 --- a/paddle/fluid/train/custom_trainer/feed/accessor/epoch_accessor.cc +++ b/paddle/fluid/train/custom_trainer/feed/accessor/epoch_accessor.cc @@ -15,22 +15,22 @@ namespace feed { return -1; } auto fs = _trainer_context->file_system.get(); - if (config["donefile"]) { - _done_file_path = fs->path_join(_model_root_path, config["donefile"].as()); - } else { - _done_file_path = fs->path_join(_model_root_path, "epoch_donefile.txt"); - } - + _done_file_path = fs->path_join(_model_root_path, config["donefile"].as("epoch_donefile.txt")); if (!fs->exists(_done_file_path)) { VLOG(0) << "missing done file, path:" << _done_file_path; return -1; } - std::string done_text = fs->tail(_done_file_path); _done_status = paddle::string::split_string(done_text, std::string("\t")); _current_epoch_id = get_status(EpochStatusFiled::EpochIdField); _last_checkpoint_epoch_id = get_status(EpochStatusFiled::CheckpointIdField); _last_checkpoint_path = get_status(EpochStatusFiled::CheckpointPathField); + _inference_base_model_key = 
get_status(EpochStatusFiled::InferenceBaseKeyField); + _inference_model_path = fs->path_join(_model_root_path, config["inference_model_dir"].as("xbox")); + _inference_model_base_done_path = fs->path_join(_inference_model_path, + config["inference_base_done_name"].as("xbox_base_done.txt")); + _inference_model_delta_done_path = fs->path_join(_inference_model_path, + config["inference_delta_done_name"].as("xbox_delta_done.txt")); return 0; } @@ -46,31 +46,64 @@ namespace feed { set_status(EpochStatusFiled::CheckpointIdField, _last_checkpoint_epoch_id); set_status(EpochStatusFiled::CheckpointPathField, _last_checkpoint_path); set_status(EpochStatusFiled::DateField, format_timestamp(epoch_id, "%Y%m%d")); - - // 非主节点不做状态持久化 - if (!_trainer_context->environment->is_master_node(EnvironmentRole::WORKER)) { + set_status(EpochStatusFiled::InferenceBaseKeyField, _inference_base_model_key); + return 0; + } + + int EpochAccessor::update_model_donefile( + uint64_t epoch_id, ModelSaveWay save_way) { + auto* env = _trainer_context->environment.get(); + // 非主节点不做done状态持久化 + if (!env->is_master_node(EnvironmentRole::WORKER)) { return 0; } - auto fs = _trainer_context->file_system.get(); - std::string done_str = paddle::string::join_strings(_done_status, '\t'); + std::string done_str; + std::string donefile; + auto model_path = model_save_path(epoch_id, save_way); + std::string inference_done_format("{\"id\":\"%lu\",\"key\":\"%lu\",\"input\":\"%s/000\",\"record_count\":\"1\",\"file_format\":\"pb\",\"schema_version\":\"2\",\"partition_type\":\"1\",\"job_name\":\"%s\",\"job_id\":\"%s\",\"mpi_size\":\"%d\",\"monitor_data\":\"%s\"}"); + + auto id = time(NULL); + switch (save_way) { + case ModelSaveWay::ModelSaveTrainCheckpoint: + donefile = _done_file_path; + done_str = paddle::string::join_strings(_done_status, '\t'); + break; + case ModelSaveWay::ModelSaveInferenceDelta: + donefile = _inference_model_delta_done_path; + done_str = string::format_string(inference_done_format.c_str(), id, 
_inference_base_model_key, + model_path.c_str(), env->job_name().c_str(), env->job_id().c_str(), + env->node_num(EnvironmentRole::PSERVER), _trainer_context->monitor_ssm.str().c_str()); + break; + case ModelSaveWay::ModelSaveInferenceBase: + donefile = _inference_model_base_done_path; + _inference_base_model_key = id; + done_str = string::format_string(inference_done_format.c_str(), id, id, + model_path.c_str(), env->job_name().c_str(), env->job_id().c_str(), + env->node_num(EnvironmentRole::PSERVER), _trainer_context->monitor_ssm.str().c_str()); + break; + } // 保留末尾1000数据 - std::string tail_done_info = paddle::string::trim_spaces(fs->tail(_done_file_path, 1000)); + std::string tail_done_info; + auto fs = _trainer_context->file_system.get(); + if (fs->exists(donefile)) { + tail_done_info = paddle::string::trim_spaces(fs->tail(donefile, 1000)); + } if (tail_done_info.size() > 0) { tail_done_info = tail_done_info + "\n" + done_str; } else { tail_done_info = done_str; } - VLOG(2) << "Write epoch donefile to " << _done_file_path << ", str:" << done_str; + VLOG(2) << "Write donefile " << donefile << ", str:" << done_str; bool write_success = false; while (true) { - fs->remove(_done_file_path); - auto fp = fs->open_write(_done_file_path, ""); + fs->remove(donefile); + auto fp = fs->open_write(donefile, ""); if (fwrite(tail_done_info.c_str(), tail_done_info.length(), 1, &*fp) == 1) { break; } sleep(10); } - VLOG(2) << "Write epoch donefile success"; + VLOG(2) << "Write donefile " << donefile << "success"; return 0; } @@ -126,7 +159,10 @@ namespace feed { case ModelSaveWay::ModelSaveInferenceBase: return is_last_epoch(epoch_id); case ModelSaveWay::ModelSaveTrainCheckpoint: - return delta_id(epoch_id) % 8 == 0; + if (is_last_epoch(epoch_id)) { + return true; + } + return delta_id(epoch_id) % 24 == 0; } return false; } @@ -137,11 +173,11 @@ namespace feed { std::string date_with_hour = format_timestamp(epoch_id, "%Y%m%d%H"); switch (save_way) { case 
ModelSaveWay::ModelSaveInferenceDelta: - return _trainer_context->file_system->path_join(_model_root_path, - string::format_string("xbox/%s/delta-%d", date.c_str(), delta)); + return _trainer_context->file_system->path_join(_inference_model_path, + string::format_string("%s/delta-%d", date.c_str(), delta)); case ModelSaveWay::ModelSaveInferenceBase: - return _trainer_context->file_system->path_join(_model_root_path, - string::format_string("xbox/%s/base", date.c_str())); + return _trainer_context->file_system->path_join(_inference_model_path, + string::format_string("%s/base", date.c_str())); case ModelSaveWay::ModelSaveTrainCheckpoint: return _trainer_context->file_system->path_join(_model_root_path, string::format_string("batch_model/%s", date_with_hour.c_str())); diff --git a/paddle/fluid/train/custom_trainer/feed/accessor/epoch_accessor.h b/paddle/fluid/train/custom_trainer/feed/accessor/epoch_accessor.h index 07b15c62c59228b3c6532550b4554542648f91e5..7d42e73e7c44e0914973b7aaf7d1c6f8521d707a 100644 --- a/paddle/fluid/train/custom_trainer/feed/accessor/epoch_accessor.h +++ b/paddle/fluid/train/custom_trainer/feed/accessor/epoch_accessor.h @@ -14,7 +14,8 @@ enum class EpochStatusFiled { TimestampField = 1, CheckpointPathField = 2, EpochIdField = 3, - CheckpointIdField = 4 + CheckpointIdField = 4, + InferenceBaseKeyField = 5 }; class EpochAccessor : public Accessor { @@ -62,14 +63,19 @@ public: virtual bool need_save_model(uint64_t epoch_id, ModelSaveWay save_way) = 0; virtual std::string model_save_path(uint64_t epoch_id, ModelSaveWay save_way) = 0; + virtual int update_model_donefile(uint64_t epoch_id, ModelSaveWay save_way); protected: TrainerContext* _trainer_context; std::string _done_file_path; std::string _model_root_path; + std::string _inference_model_path; + std::string _inference_model_base_done_path; + std::string _inference_model_delta_done_path; uint64_t _current_epoch_id = 0; std::string _last_checkpoint_path; uint64_t _last_checkpoint_epoch_id = 0; 
- std::vector _done_status; //当前完成状态,统一存成string + std::vector _done_status; // 当前完成状态,统一存成string + uint64_t _inference_base_model_key = 0; // 预估模型的base-key }; REGIST_REGISTERER(EpochAccessor); diff --git a/paddle/fluid/train/custom_trainer/feed/accessor/input_data_accessor.h b/paddle/fluid/train/custom_trainer/feed/accessor/input_data_accessor.h index bc2063c4288b2623bfd477c65afc5ad0be1dac19..b49eb469abe8338728c7fd36eeb86357df281c43 100644 --- a/paddle/fluid/train/custom_trainer/feed/accessor/input_data_accessor.h +++ b/paddle/fluid/train/custom_trainer/feed/accessor/input_data_accessor.h @@ -102,7 +102,8 @@ public: paddle::framework::Scope* scope); // SparseGradValue会被依次调用,用于整理push的梯度 virtual void fill_gradient(float* push_value, const float* gradient_raw, - paddle::ps::ValueAccessor&, SparseInputVariable&, SampleInstance&) = 0; + paddle::ps::ValueAccessor&, SparseInputVariable&, + SampleInstance&, FeatureItem&) = 0; protected: // 输入层列表 diff --git a/paddle/fluid/train/custom_trainer/feed/accessor/sparse_input_accessor.cc b/paddle/fluid/train/custom_trainer/feed/accessor/sparse_input_accessor.cc index e51fd1b59b52e8d4a39e655bd3a8a7abdc5de5f6..35be38bf315d3745be01473f2637ea57807ef499 100644 --- a/paddle/fluid/train/custom_trainer/feed/accessor/sparse_input_accessor.cc +++ b/paddle/fluid/train/custom_trainer/feed/accessor/sparse_input_accessor.cc @@ -1,10 +1,14 @@ #include #include #include +#include +#include "gflags/gflags.h" #include "paddle/fluid/string/string_helper.h" #include "paddle/fluid/train/custom_trainer/feed/common/scope_helper.h" #include "paddle/fluid/train/custom_trainer/feed/accessor/input_data_accessor.h" +DEFINE_int32(feed_trainer_debug_sparse_slot, 0, "open sparse debug for specif slot"); + namespace paddle { namespace custom_trainer { namespace feed { @@ -99,6 +103,30 @@ int32_t BaseSparseInputAccessor::forward(SampleInstance* samples, } } } + if (FLAGS_feed_trainer_debug_sparse_slot) { + std::stringstream ssm; + for (size_t samp_idx = 0; 
samp_idx < num; ++samp_idx) { + ssm.str(""); + auto& features = samples[samp_idx].features; + for (auto& feature_item : features) { + for (size_t i = 0; i < _x_variables.size(); ++i) { + auto& variable = _x_variables[i]; + if (feature_item.slot() != FLAGS_feed_trainer_debug_sparse_slot) { + continue; + } + if (variable.slot_idx[feature_item.slot()] < 0) { + continue; + } + ssm << "(" << feature_item.sign() << "," << feature_item.slot(); + for (auto weight : feature_item.weights) { + ssm << "," << weight; + } + ssm << ")"; + } + } + VLOG(2) << "[DEBUG][sparse_slot_pull]" << ssm.str(); + } + } // Variable后置处理 for (size_t i = 0; i < _x_variables.size(); ++i) { auto& variable = _x_variables[i]; @@ -145,12 +173,37 @@ int32_t BaseSparseInputAccessor::backward(SampleInstance* samples, const float* grad_data = var_runtime_data[i].gradient_data + samp_idx * variable.total_dim + variable.slot_dim * slot_idx; fill_gradient(&(feature_item.gradients[0]), grad_data, - *value_accessor, variable, samples[samp_idx]); + *value_accessor, variable, samples[samp_idx], feature_item); keys[key_idx] = feature_item.sign(); push_values[key_idx++] = &(feature_item.gradients[0]); } } } + if (FLAGS_feed_trainer_debug_sparse_slot) { + size_t key_idx = 0; + std::stringstream ssm; + for (size_t samp_idx = 0; samp_idx < num; ++samp_idx) { + ssm.str(""); + auto& features = samples[samp_idx].features; + for (auto& feature_item : features) { + for (size_t i = 0; i < _x_variables.size(); ++i) { + auto& variable = _x_variables[i]; + if (feature_item.slot() != FLAGS_feed_trainer_debug_sparse_slot) { + continue; + } + if (variable.slot_idx[feature_item.slot()] < 0) { + continue; + } + ssm << "(" << feature_item.sign() << "," << feature_item.slot(); + for (auto weight : feature_item.gradients) { + ssm << "," << weight; + } + ssm << ")"; + } + } + VLOG(2) << "[DEBUG][sparse_slot_push]" << ssm.str(); + } + } auto push_status = ps_client->push_sparse(_table_id, keys.data(), (const float**)push_values, 
key_idx); //auto ret = push_status.get(); @@ -180,8 +233,8 @@ public: } virtual void fill_gradient(float* push_value, const float* gradient_raw, - paddle::ps::ValueAccessor& value_accessor, - SparseInputVariable& variable, SampleInstance& sample) { + paddle::ps::ValueAccessor& value_accessor, SparseInputVariable& variable, + SampleInstance& sample, FeatureItem& feature) { // join阶段不回填梯度 CHECK(false); return; @@ -207,12 +260,13 @@ public: } virtual void fill_gradient(float* push_value, const float* gradient_raw, - paddle::ps::ValueAccessor& value_accessor, - SparseInputVariable& variable, SampleInstance& sample) { - push_value[0] += 1; - push_value[1] += sample.labels[0]; + paddle::ps::ValueAccessor& value_accessor, SparseInputVariable& variable, + SampleInstance& sample, FeatureItem& feature) { + push_value[0] = feature.slot(); + push_value[1] += 1; + push_value[2] += sample.labels[0]; for (size_t i = 0; i < variable.slot_dim; ++i) { - push_value[i + 2] += gradient_raw[i]; + push_value[i + 3] += gradient_raw[i]; } return; } diff --git a/paddle/fluid/train/custom_trainer/feed/common/pslib_warpper.cc b/paddle/fluid/train/custom_trainer/feed/common/pslib_warpper.cc index fd7bd2d659b0ad0e87376517ee4ba16122630359..2f3a584f3a41c33d58af105c2aaea8b2b730eb95 100644 --- a/paddle/fluid/train/custom_trainer/feed/common/pslib_warpper.cc +++ b/paddle/fluid/train/custom_trainer/feed/common/pslib_warpper.cc @@ -38,6 +38,7 @@ int PSlib::init_server() { _environment->rank_id(EnvironmentRole::PSERVER)); _server_ptr->start(); } + _environment->barrier(EnvironmentRole::ALL); _environment->ps_environment()->gather_ps_servers(); return 0; } diff --git a/paddle/fluid/train/custom_trainer/feed/common/runtime_environment.cc b/paddle/fluid/train/custom_trainer/feed/common/runtime_environment.cc index 415f9955b8a5dc1dc90951d6216781d6907e6e37..bb0342ea445a8703294730df6459d4c9ef6f6d62 100644 --- a/paddle/fluid/train/custom_trainer/feed/common/runtime_environment.cc +++ 
b/paddle/fluid/train/custom_trainer/feed/common/runtime_environment.cc @@ -56,7 +56,6 @@ struct mpi_type_trait { return MPI_UNSIGNED_LONG_LONG; } }; - RuntimeEnvironment::RuntimeEnvironment() {} RuntimeEnvironment::~RuntimeEnvironment() {} bool RuntimeEnvironment::is_master_node(EnvironmentRole role) { @@ -87,13 +86,24 @@ public: return 0; } virtual int wireup() { - int hr = MPI_Init(NULL, NULL); + int argc = 0; + char** argv = NULL; + int hr = MPI_Init(&argc, &argv); if (MPI_SUCCESS != hr) { LOG(FATAL) << "MPI_init failed with error code" << hr; return -1; } _roles_node_info.resize(static_cast(EnvironmentRole::ALL) + 1); add_role(EnvironmentRole::ALL); + + char* value = getenv("JOB_ID"); + if (value) { + _job_id = value; + } + value = getenv("JOB_NAME"); + if (value) { + _job_name = value; + } return 0; } @@ -155,6 +165,11 @@ protected: return; } VLOG(static_cast(level)) << log_str; + /* + static std::mutex mtx; + std::lock_guard guard(mtx); + std::err << log_str; + */ } inline MpiNodeInfo& mpi_node_info(EnvironmentRole role) { diff --git a/paddle/fluid/train/custom_trainer/feed/common/runtime_environment.h b/paddle/fluid/train/custom_trainer/feed/common/runtime_environment.h index 993ac36deba6d21390b19cb98e316d076e457b1d..c147b91f7bfd257bb84adc5f2298fdb6145de0a4 100644 --- a/paddle/fluid/train/custom_trainer/feed/common/runtime_environment.h +++ b/paddle/fluid/train/custom_trainer/feed/common/runtime_environment.h @@ -46,6 +46,15 @@ public: virtual ~RuntimeEnvironment(); // 配置初始化 virtual int initialize(YAML::Node config) = 0; + + // job 信息 + virtual std::string job_id() { + return _job_id; + } + virtual std::string job_name() { + return _job_name; + } + // 设置role virtual int add_role(EnvironmentRole role) = 0; // 判断role @@ -90,9 +99,21 @@ public: protected: virtual void print_log(EnvironmentRole role, EnvironmentLogType type, EnvironmentLogLevel level, const std::string& log_str) = 0; + + std::string _job_id = "default_job_id"; + std::string _job_name = 
"default_job_name"; }; REGIST_REGISTERER(RuntimeEnvironment); +#define ENVLOG_WORKER_ALL_NOTICE \ +environment->log(EnvironmentRole::WORKER, EnvironmentLogType::ALL_LOG, EnvironmentLogLevel::NOTICE, +#define ENVLOG_WORKER_MASTER_NOTICE \ +environment->log(EnvironmentRole::WORKER, EnvironmentLogType::MASTER_LOG, EnvironmentLogLevel::NOTICE, +#define ENVLOG_WORKER_ALL_ERROR \ +environment->log(EnvironmentRole::WORKER, EnvironmentLogType::ALL_LOG, EnvironmentLogLevel::ERROR, +#define ENVLOG_WORKER_MASTER_ERROR \ +environment->log(EnvironmentRole::WORKER, EnvironmentLogType::MASTER_LOG, EnvironmentLogLevel::ERROR, + std::string format_timestamp(time_t time, const char* format); inline std::string format_timestamp(time_t time, const std::string& format) { return format_timestamp(time, format.c_str()); diff --git a/paddle/fluid/train/custom_trainer/feed/conf/env.conf b/paddle/fluid/train/custom_trainer/feed/conf/env.conf new file mode 100644 index 0000000000000000000000000000000000000000..f97cf5570b0dabf29b5a48ea69ec01c1415aaf0c --- /dev/null +++ b/paddle/fluid/train/custom_trainer/feed/conf/env.conf @@ -0,0 +1,19 @@ +HPC_HOME=/home/work/xiexionghang/trainer/paddle_trainer/feed_muye/smart_client +HADOOP_HOME=/home/work/xiexionghang/trainer/paddle_trainer/feed_muye/hadoop-client/hadoop/ + +#===============Job-related config====================== +MPI_JOB_NAME=feed_smfw_shoubai_video_cupai_new_arch +MPI_QUEUE=feed5 +MPI_PRIORITY=high +MPI_NODE_NUM=100 +MPI_WALL_TIME=700:00:00 +MPI_NODE_MEM=100000 +MPI_RESOURCE=full + +#===========MPI cluster Server(nmg-off/10g/hlan)========== +MPI_SERVER=yq01-hpc-lvliang01-smart-master.dmop.baidu.com + +#===========Cluster-related (HDFS/MPI Server)============== +HDFS_ROOT=/user/feed/mlarch/mio_temp/$(date +%Y%m%d-%H%M%S-%N) +HADOOP_FS=afs://xingtian.afs.baidu.com:9902 +HADOOP_UGI=mlarch,Fv1M87 diff --git a/paddle/fluid/train/custom_trainer/feed/conf/trainer.yaml b/paddle/fluid/train/custom_trainer/feed/conf/trainer.yaml index
c71205b2468b16226143f660fa806ecd959f9080..5c687fcbe1f3d4d671e1cde0ab7e194e91896efe 100644 --- a/paddle/fluid/train/custom_trainer/feed/conf/trainer.yaml +++ b/paddle/fluid/train/custom_trainer/feed/conf/trainer.yaml @@ -44,7 +44,7 @@ executor: train_batch_size: 32 input_parse_thread_num: 10 push_gradient_thread_num: 16 - train_thread_num: 16 + train_thread_num: 12 need_dump_all_model: true - name: update class: SimpleExecutor @@ -52,5 +52,5 @@ executor: train_batch_size: 32 input_parse_thread_num: 10 push_gradient_thread_num: 16 - train_thread_num: 16 + train_thread_num: 12 need_dump_all_model: false diff --git a/paddle/fluid/train/custom_trainer/feed/dataset/data_reader.cc b/paddle/fluid/train/custom_trainer/feed/dataset/data_reader.cc index 3898583d82ccc3f08787d5eaddd260c3e83a2aa3..7eb3156bf6eea99cd20cfee3f601ee4268cca919 100755 --- a/paddle/fluid/train/custom_trainer/feed/dataset/data_reader.cc +++ b/paddle/fluid/train/custom_trainer/feed/dataset/data_reader.cc @@ -469,6 +469,7 @@ public: return read_all(file_list, data_channel); } virtual int read_all(const std::vector& file_list, ::paddle::framework::Channel data_channel) { + data_channel->Open(); const int file_list_size = file_list.size(); std::atomic is_failed(false); diff --git a/paddle/fluid/train/custom_trainer/feed/executor/multi_thread_executor.cc b/paddle/fluid/train/custom_trainer/feed/executor/multi_thread_executor.cc index 28d0eb4189a9e0c37dd64083127af2fed4a4464f..af3d4f7cbf18bde80dadf10fcd11ba5135110b97 100644 --- a/paddle/fluid/train/custom_trainer/feed/executor/multi_thread_executor.cc +++ b/paddle/fluid/train/custom_trainer/feed/executor/multi_thread_executor.cc @@ -1,3 +1,4 @@ +#include "paddle/fluid/platform/timer.h" #include "paddle/fluid/train/custom_trainer/feed/io/file_system.h" #include "paddle/fluid/train/custom_trainer/feed/monitor/monitor.h" #include "paddle/fluid/train/custom_trainer/feed/executor/multi_thread_executor.h" @@ -94,20 +95,22 @@ paddle::framework::Channel 
MultiThreadExecutor::run( [this, parser](DataItem* item, size_t item_num, ScopePoolObj* scope, size_t* scope_num, size_t thread_idx) -> int { *scope_num = 1; + paddle::platform::Timer timer; + timer.Start(); auto scope_obj = _scope_obj_pool->get(); - auto* samples = new SampleInstance[item_num]; + auto* scope_context = new ScopeExecutorContext(item_num); + auto* samples = scope_context->samples(); for (size_t i = 0; i parse_to_sample(item[i], samples[i]) == 0); } for (size_t i = 0; i < _input_accessors.size(); ++i) { _input_accessors[i]->forward(samples, item_num, scope_obj.get()); } - int64_t data_for_scope = (int64_t)samples; + timer.Pause(); + scope_context->prepare_cost_ms = timer.ElapsedMS(); + int64_t data_for_scope = (int64_t)scope_context; ScopeHelper::fill_value(scope_obj.get(), _trainer_context->cpu_place, - "sample_data", data_for_scope); - data_for_scope = (int64_t)item_num; - ScopeHelper::fill_value(scope_obj.get(), _trainer_context->cpu_place, - "sample_num", data_for_scope); + "scope_context", data_for_scope); *scope = std::move(scope_obj); return 0; }); @@ -123,7 +126,14 @@ paddle::framework::Channel MultiThreadExecutor::run( auto* executor = _thread_executors[thread_idx].get(); size_t& out_idx = *out_num; for (out_idx = 0; out_idx < in_num; ++out_idx) { - CHECK(executor->run(in_items[out_idx].get()) == 0); + auto* scope = in_items[out_idx].get(); + auto* scope_ctx = (ScopeExecutorContext*)(*ScopeHelper::get_value( + scope, _trainer_context->cpu_place, "scope_context")); + paddle::platform::Timer timer; + timer.Start(); + CHECK(executor->run(scope) == 0); + timer.Pause(); + scope_ctx->executor_cost_ms = timer.ElapsedMS(); out_items[out_idx] = std::move(in_items[out_idx]); } return 0; @@ -139,20 +149,24 @@ paddle::framework::Channel MultiThreadExecutor::run( int* out_items, size_t* out_num, size_t thread_idx) -> int { size_t& out_idx = *out_num; for (out_idx = 0; out_idx < in_num; ++out_idx) { + paddle::platform::Timer timer; + timer.Start(); auto* 
scope = in_items[out_idx].get(); - auto sample_num = *ScopeHelper::get_value( - scope, _trainer_context->cpu_place, "sample_num"); + auto* scope_ctx = (ScopeExecutorContext*)(*ScopeHelper::get_value( + scope, _trainer_context->cpu_place, "scope_context")); + auto* samples = scope_ctx->samples(); + auto sample_num = scope_ctx->sample_num(); - auto* samples = (SampleInstance*)(*ScopeHelper::get_value( - scope, _trainer_context->cpu_place, "sample_data")); for (size_t i = 0; i < _input_accessors.size(); ++i) { out_items[out_idx] = _input_accessors[i]-> backward(samples, sample_num, scope); } + timer.Pause(); + scope_ctx->push_gradient_cost_ms = timer.ElapsedMS(); for (auto& monitor : _monitors) { - monitor->add_data(epoch_id, this, samples, sample_num); + monitor->add_data(epoch_id, this, scope_ctx); } - delete[] samples; // 所有pipe完成后,再回收sample + delete scope_ctx; // 所有pipe完成后,再回收sample } return 0; }); @@ -167,6 +181,8 @@ paddle::framework::Channel MultiThreadExecutor::run( monitor->compute_result(); VLOG(2) << "[Monitor]" << _train_exe_name << ", monitor:" << monitor->get_name() << ", result:" << monitor->format_result(); + _trainer_context->monitor_ssm << _train_exe_name << ":" << + monitor->get_name() << ":" << monitor->format_result() << ","; monitor->reset(); } } diff --git a/paddle/fluid/train/custom_trainer/feed/executor/multi_thread_executor.h b/paddle/fluid/train/custom_trainer/feed/executor/multi_thread_executor.h index 12792945d9613ee892ff174f7881ab3bc73c3b7a..e1fbc42d699ed852d7bee9f9c6d64221e55b824b 100644 --- a/paddle/fluid/train/custom_trainer/feed/executor/multi_thread_executor.h +++ b/paddle/fluid/train/custom_trainer/feed/executor/multi_thread_executor.h @@ -11,6 +11,29 @@ namespace feed { class Monitor; typedef paddle::ps::ObjectPool<::paddle::framework::Scope>::PooledObject ScopePoolObj; +class ScopeExecutorContext { +public: + ScopeExecutorContext(size_t sample_num) { + _samples = new SampleInstance[sample_num]; + _sample_num = sample_num; + } + 
virtual ~ScopeExecutorContext() { + delete[] _samples; + } + inline SampleInstance* samples() { + return _samples; + } + inline size_t sample_num() { + return _sample_num; + } + size_t executor_cost_ms = 0; + size_t prepare_cost_ms = 0; + size_t push_gradient_cost_ms = 0; +private: + size_t _sample_num = 0; + SampleInstance* _samples = NULL; +}; + class MultiThreadExecutor { public: MultiThreadExecutor() {} diff --git a/paddle/fluid/train/custom_trainer/feed/io/hadoop_file_system.cc b/paddle/fluid/train/custom_trainer/feed/io/hadoop_file_system.cc index 2af8e08231f114166ac39808079528f981101a2d..6ef87d7ec9e9f13d05f71d39075f68ddd6131a2b 100644 --- a/paddle/fluid/train/custom_trainer/feed/io/hadoop_file_system.cc +++ b/paddle/fluid/train/custom_trainer/feed/io/hadoop_file_system.cc @@ -73,7 +73,7 @@ public: } shell_execute(string::format_string( - "%s -rmr %s &>/dev/null; true", _hdfs_command.c_str(), path.c_str())); + "%s -rmr %s &>/dev/null; true", hdfs_command(path).c_str(), path.c_str())); } std::vector list(const std::string& path) override { diff --git a/paddle/fluid/train/custom_trainer/feed/io/shell.cc b/paddle/fluid/train/custom_trainer/feed/io/shell.cc index d6fae5d0ab1d0df8c84e04b90a2ddf85e6c3a57b..3fc87085a4adf19e44bf0932c1f7b945cad74afb 100644 --- a/paddle/fluid/train/custom_trainer/feed/io/shell.cc +++ b/paddle/fluid/train/custom_trainer/feed/io/shell.cc @@ -356,6 +356,7 @@ std::string shell_get_command_output(const std::string& cmd) { return reader.get(); } } + VLOG(2) << "run shell cmd:" << cmd << ", errno:" << err_no; } while (err_no == -1); return ""; #endif diff --git a/paddle/fluid/train/custom_trainer/feed/main.cc b/paddle/fluid/train/custom_trainer/feed/main.cc index 8e8c9851db6560c9bf9a29b322dc163ad49410cd..de71d9aeaa2e05053a8fc6279a4b54c0f387ec38 100644 --- a/paddle/fluid/train/custom_trainer/feed/main.cc +++ b/paddle/fluid/train/custom_trainer/feed/main.cc @@ -32,6 +32,7 @@ int main(int argc, char* argv[]) { } auto* environment = 
trainer_context_ptr->environment.get(); environment->wireup(); + VLOG(2) << "node_num: " << environment->node_num(EnvironmentRole::ALL); if (environment->node_num(EnvironmentRole::ALL) == 1) { environment->add_role(EnvironmentRole::WORKER); environment->add_role(EnvironmentRole::PSERVER); @@ -42,6 +43,7 @@ } trainer_context_ptr->pslib.reset(new PSlib()); std::string ps_config = config["environment"]["ps"].as(); + trainer_context_ptr->environment->barrier(EnvironmentRole::ALL); trainer_context_ptr->pslib->initialize(ps_config, environment); //VLOG(3) << "Node Start With Role:" << role; diff --git a/paddle/fluid/train/custom_trainer/feed/monitor/auc_monitor.cc b/paddle/fluid/train/custom_trainer/feed/monitor/auc_monitor.cc index 7ad7054fe639c7a1d162db89c10d55386da964ca..3748d9d3eccf50a106efc75d14d01d3cb9cceed1 100644 --- a/paddle/fluid/train/custom_trainer/feed/monitor/auc_monitor.cc +++ b/paddle/fluid/train/custom_trainer/feed/monitor/auc_monitor.cc @@ -1,4 +1,5 @@ #include "paddle/fluid/train/custom_trainer/feed/monitor/auc_monitor.h" +#include "paddle/fluid/train/custom_trainer/feed/executor/multi_thread_executor.h" namespace paddle { namespace custom_trainer { @@ -19,7 +20,9 @@ int AucMonitor::initialize(const YAML::Node& config, std::shared_ptrsample_num(); + auto* samples = ctx->samples(); CHECK(num > 0); std::lock_guard lock(_mutex); for (int i = 0; i < num; ++i) { @@ -80,6 +83,10 @@ std::string AucMonitor::format_result() { } void AucMonitor::add_unlocked(double pred, int label) { + if (std::isnan(pred)) { + VLOG(2) << "pred[" << pred << "] outside of [0,1]"; + return; + } CHECK(pred >= 0 && pred <= 1) << "pred[" << pred << "] outside of [0,1]"; CHECK(label == 0 || label == 1) << "label[" << label << "] invalid"; _table[label][std::min(int(pred * _table_size), _table_size - 1)]++; diff --git a/paddle/fluid/train/custom_trainer/feed/monitor/auc_monitor.h b/paddle/fluid/train/custom_trainer/feed/monitor/auc_monitor.h index
51af181ff3f80ea593b55c70cdc64a185efe4489..c668a731a9fb37cb6a27c00a97a0a62dfee0a681 100644 --- a/paddle/fluid/train/custom_trainer/feed/monitor/auc_monitor.h +++ b/paddle/fluid/train/custom_trainer/feed/monitor/auc_monitor.h @@ -18,8 +18,8 @@ public: std::shared_ptr context_ptr) override; //添加一项记录,统计内容Monitor自行从Executor按需获取 - virtual void add_data(int epoch_id, const MultiThreadExecutor* executor, - SampleInstance* samples, size_t num); + virtual void add_data(int epoch_id, + const MultiThreadExecutor* executor, ScopeExecutorContext*); //是否开始结果统计 virtual bool need_compute_result(int epoch_id); diff --git a/paddle/fluid/train/custom_trainer/feed/monitor/cost_monitor.cc b/paddle/fluid/train/custom_trainer/feed/monitor/cost_monitor.cc index 207bbab82af95e8f331b0313b3f20e57803464b4..1c247e0af5b9d822a75adcde6af804a3f01f150e 100644 --- a/paddle/fluid/train/custom_trainer/feed/monitor/cost_monitor.cc +++ b/paddle/fluid/train/custom_trainer/feed/monitor/cost_monitor.cc @@ -1,4 +1,5 @@ #include "paddle/fluid/train/custom_trainer/feed/monitor/cost_monitor.h" +#include "paddle/fluid/train/custom_trainer/feed/executor/multi_thread_executor.h" namespace paddle { namespace custom_trainer { @@ -12,8 +13,9 @@ int CostMonitor::initialize(const YAML::Node& config, std::shared_ptrsample_num(); + auto* samples = ctx->samples(); CHECK(executor != nullptr); //TODO use paddle time _total_time_ms += 1; diff --git a/paddle/fluid/train/custom_trainer/feed/monitor/cost_monitor.h b/paddle/fluid/train/custom_trainer/feed/monitor/cost_monitor.h index f64d93d05eecaad8ea23567415f2adc508215c74..6c78bfa92ee87c6149116560099b6ab082b15c0d 100644 --- a/paddle/fluid/train/custom_trainer/feed/monitor/cost_monitor.h +++ b/paddle/fluid/train/custom_trainer/feed/monitor/cost_monitor.h @@ -18,9 +18,7 @@ public: //添加一项记录,统计内容Monitor自行从Executor按需获取 virtual void add_data(int epoch_id, - const MultiThreadExecutor* executor, - SampleInstance* samples, - size_t num); + const MultiThreadExecutor* executor, 
ScopeExecutorContext*); //是否开始结果统计 virtual bool need_compute_result(int epoch_id); diff --git a/paddle/fluid/train/custom_trainer/feed/monitor/monitor.h b/paddle/fluid/train/custom_trainer/feed/monitor/monitor.h index 4a9636344e2518870eb73efca6191c6dc5a2dd54..fc5258ac97912583e8e33abf4406a904fd64d380 100644 --- a/paddle/fluid/train/custom_trainer/feed/monitor/monitor.h +++ b/paddle/fluid/train/custom_trainer/feed/monitor/monitor.h @@ -10,6 +10,7 @@ namespace paddle { namespace custom_trainer { namespace feed { class MultiThreadExecutor; +class ScopeExecutorContext; class Monitor { public: @@ -25,8 +26,8 @@ public: } //添加一项记录,统计内容Monitor自行从Executor按需获取 - virtual void add_data(int epoch_id, const MultiThreadExecutor* executor, - SampleInstance* samples, size_t num) = 0; + virtual void add_data(int epoch_id, + const MultiThreadExecutor* executor, ScopeExecutorContext*) = 0; //是否对于当前epoch_id进行结果统计 virtual bool need_compute_result(int epoch_id) = 0; diff --git a/paddle/fluid/train/custom_trainer/feed/process/learner_process.cc b/paddle/fluid/train/custom_trainer/feed/process/learner_process.cc index 6eb8fb653e1c310271a98a43d0c342587389f64e..0f7b204efc0f6509359dd6e6493dcf5164fba87a 100755 --- a/paddle/fluid/train/custom_trainer/feed/process/learner_process.cc +++ b/paddle/fluid/train/custom_trainer/feed/process/learner_process.cc @@ -3,6 +3,7 @@ *Train样本 */ #include +#include "paddle/fluid/platform/timer.h" #include "paddle/fluid/train/custom_trainer/feed/io/file_system.h" #include "paddle/fluid/train/custom_trainer/feed/dataset/dataset.h" #include "paddle/fluid/train/custom_trainer/feed/accessor/epoch_accessor.h" @@ -25,26 +26,18 @@ int LearnerProcess::initialize(std::shared_ptr context_ptr) { return 0; } -std::future LearnerProcess::save_model(uint64_t epoch_id, int table_id, ModelSaveWay way) { - std::promise p; - auto ret = p.get_future(); - auto* ps_client = _context_ptr->pslib->ps_client(); - auto* epoch_accessor = _context_ptr->epoch_accessor.get(); - if 
(epoch_accessor->need_save_model(epoch_id, way)) { - VLOG(2) << "Start save model, table_id:" << table_id; - auto model_dir = epoch_accessor->model_save_path(epoch_id, way); - return ps_client->save(table_id, model_dir, std::to_string((int)way)); - } else { - p.set_value(0); - } - return ret; -} - int LearnerProcess::wait_save_model(uint64_t epoch_id, ModelSaveWay way) { + auto* ps_client = _context_ptr->pslib->ps_client(); auto* environment = _context_ptr->environment.get(); + auto* epoch_accessor = _context_ptr->epoch_accessor.get(); if (!environment->is_master_node(EnvironmentRole::WORKER)) { return 0; } + if (!epoch_accessor->need_save_model(epoch_id, way)) { + return 0; + } + paddle::platform::Timer timer; + timer.Start(); std::set table_set; for (auto& executor : _executors) { const auto& table_accessors = executor->table_accessors(); @@ -56,13 +49,18 @@ int LearnerProcess::wait_save_model(uint64_t epoch_id, ModelSaveWay way) { auto table_num = table_set.size(); std::future rets[table_num]; for (auto table_id : table_set) { - rets[ret_size++] = save_model(epoch_id, table_id, way); + VLOG(2) << "Start save model, table_id:" << table_id; + auto model_dir = epoch_accessor->model_save_path(epoch_id, way); + rets[ret_size++] = ps_client->save(table_id, model_dir, std::to_string((int)way)); } int all_ret = 0; for (int i = 0; i < ret_size; ++i) { rets[i].wait(); all_ret |= rets[i].get(); } + timer.Pause(); + VLOG(2) << "Save Model Cost(s):" << timer.ElapsedSec(); + _context_ptr->epoch_accessor->update_model_donefile(epoch_id, way); return all_ret; } @@ -115,6 +113,7 @@ int LearnerProcess::run() { while (true) { epoch_accessor->next_epoch(); + _context_ptr->monitor_ssm.str(""); bool already_dump_inference_model = false; epoch_id = epoch_accessor->current_epoch_id(); std::string epoch_log_title = paddle::string::format_string( @@ -141,6 +140,8 @@ int LearnerProcess::run() { std::map> backup_input_map; for (auto& executor : _executors) { 
environment->barrier(EnvironmentRole::WORKER); + paddle::platform::Timer timer; + timer.Start(); VLOG(2) << "Start executor:" << executor->train_exe_name(); auto data_name = executor->train_data_name(); paddle::framework::Channel input_channel; @@ -150,12 +151,12 @@ int LearnerProcess::run() { input_channel = dataset->fetch_data(data_name, epoch_id); } input_channel = executor->run(input_channel, dataset->data_parser(data_name)); - VLOG(2) << "End executor:" << executor->train_exe_name(); + timer.Pause(); + VLOG(2) << "End executor:" << executor->train_exe_name() << ", cost" << timer.ElapsedSec(); // 等待异步梯度完成 _context_ptr->ps_client()->flush(); environment->barrier(EnvironmentRole::WORKER); - if (executor->is_dump_all_model()) { already_dump_inference_model = true; wait_save_model(epoch_id, ModelSaveWay::ModelSaveInferenceDelta); @@ -167,16 +168,12 @@ int LearnerProcess::run() { //Step3. Dump Model For Delta&&Checkpoint { - if (!already_dump_inference_model) { - already_dump_inference_model = true; - wait_save_model(epoch_id, ModelSaveWay::ModelSaveInferenceDelta); - } + wait_save_model(epoch_id, ModelSaveWay::ModelSaveInferenceBase); wait_save_model(epoch_id, ModelSaveWay::ModelSaveTrainCheckpoint); environment->barrier(EnvironmentRole::WORKER); epoch_accessor->epoch_done(epoch_id); environment->barrier(EnvironmentRole::WORKER); - } //Step4. 
Output Monitor && RunStatus diff --git a/paddle/fluid/train/custom_trainer/feed/process/learner_process.h b/paddle/fluid/train/custom_trainer/feed/process/learner_process.h index 86f0378a305d2604082a4f1b03be9e7f1e93ab19..49b6956206dca6fd74b7df90f62e8995285e3c1f 100644 --- a/paddle/fluid/train/custom_trainer/feed/process/learner_process.h +++ b/paddle/fluid/train/custom_trainer/feed/process/learner_process.h @@ -22,8 +22,6 @@ protected: virtual int load_model(uint64_t epoch_id); // 同步保存所有模型 virtual int wait_save_model(uint64_t epoch_id, ModelSaveWay way); -// 异步保存指定模型 -virtual std::future save_model(uint64_t epoch_id, int table_id, ModelSaveWay way); private: std::vector> _executors; diff --git a/paddle/fluid/train/custom_trainer/feed/scripts/compake_runable_package.sh b/paddle/fluid/train/custom_trainer/feed/scripts/compake_runable_package.sh new file mode 100755 index 0000000000000000000000000000000000000000..d7c746fcbfabb11aa74737a65fa4308905b48ecb --- /dev/null +++ b/paddle/fluid/train/custom_trainer/feed/scripts/compake_runable_package.sh @@ -0,0 +1,44 @@ +#!/bin/bash +#用于运行期的hadoop访问 +TRAINER_HODOOP_HOME="" +#用于跟据网络脚本生成模型 +TRAINER_PYTHON_HOME="/home/xiexionghang/paddle/py-paddle/" + +#环境准备 +if [ ! -f ${TRAINER_PYTHON_HOME}/python/bin/paddle ];then + echo "Miss File: ${TRAINER_PYTHON_HOME}/python/bin/paddle" + echo "TRAINER_PYTHON_HOME:${TRAINER_PYTHON_HOME} is invalid, Fix it, or Get From here:" + echo "wget ftp://cp01-arch-gr06.epc.baidu.com/home/xiexionghang/paddle/py-paddle.tar.gz" + echo "Then set TRAINER_PYTHON_HOME" + exit 0 +fi +TRAINER_PYTHON_BIN=${TRAINER_PYTHON_HOME}/python/bin/python +# for bad paddle 这里需要想办法解决,paddle的前置目录太多 +if [ ! 
-f ../../../third_party/install/pslib/lib/libps.so ];then + mkdir -p ../../../third_party/install/pslib/lib/ + ln -s ${TRAINER_PYTHON_HOME}/third_party/install/pslib/lib/libps.so ../../../third_party/install/pslib/lib/libps.so +fi + + +#生成模型配置 +#这里按名匹配 可能会出现匹配错误&兼容性差的问题,最好是先python解析yaml文件 +items=`grep " name:" conf/trainer.yaml | awk -F ':' '{print $2}' |awk '{sub("^ *","");sub(" *$","");print}'` +for item in ${items[@]}; +do + if [ ! -f scripts/${item}.py ];then + echo "Missing model_net config: scripts/${item}.py, skip it $item" + continue + fi + rm -rf model/$item + ${TRAINER_PYTHON_BIN} scripts/create_programs.py scripts/${item}.py + if [ $? -ne 0 ];then + echo "Create model with scripts/${item}.py failed" + exit 1 + fi +done + +#输出package包 +rm -rf package +mkdir package +cp -r bin conf tool scripts model so package +cp -r ${TRAINER_HODOOP_HOME} package/hadoop-client diff --git a/paddle/fluid/train/custom_trainer/feed/scripts/join.py b/paddle/fluid/train/custom_trainer/feed/scripts/join.py index 42279410da99e7049fd12e0b7bb1e46fe40afe62..94001fd75560d934e4bc0ab6ae48402f8aec7ac8 100644 --- a/paddle/fluid/train/custom_trainer/feed/scripts/join.py +++ b/paddle/fluid/train/custom_trainer/feed/scripts/join.py @@ -26,22 +26,33 @@ def inference(): # TODO: build network here cvm_input = fluid.layers.data(name='cvm_input', shape=[sparse_cvm_dim(sparse_cvm)], dtype='float32', stop_gradient=False) - net = cvm_input net = fluid.layers.data_norm(input=net, name="bn6048", epsilon=1e-4, param_attr={"batch_size":1e4, "batch_sum_default":0.0, "batch_square":1e4}) - net = fluid.layers.fc(net, 511, act='relu', name='fc_1') - net = fluid.layers.fc(net, 255, act='relu', name='fc_2') - net = fluid.layers.fc(net, 255, act='relu', name='fc_3') - net = fluid.layers.fc(net, 127, act='relu', name='fc_4') - net = fluid.layers.fc(net, 127, act='relu', name='fc_5') - net = fluid.layers.fc(net, 127, act='relu', name='fc_6') - net = fluid.layers.fc(net, 127, act='relu', name='fc_7') - + lr_x = 
1.0 + init_range = 0.2 + fc_layers_size = [511, 255, 255, 127, 127, 127, 127] + fc_layers_act = ["relu"] * len(fc_layers_size) + scales_tmp = [net.shape[1]] + fc_layers_size + scales = [] + for i in range(len(scales_tmp)): + scales.append(init_range / (scales_tmp[i] ** 0.5)) + for i in range(len(fc_layers_size)): + net = fluid.layers.fc( + input = net, + size = fc_layers_size[i], + name = 'fc_' + str(i+1), + act = fc_layers_act[i], + param_attr = \ + fluid.ParamAttr(learning_rate=lr_x, \ + initializer=fluid.initializer.NormalInitializer(loc=0.0, scale=1.0 * scales[i])), + bias_attr = \ + fluid.ParamAttr(learning_rate=lr_x, \ + initializer=fluid.initializer.NormalInitializer(loc=0.0, scale=1.0 * scales[i]))) ctr_output = fluid.layers.fc(net, 1, act='sigmoid', name='ctr') accessors = [ - { "class": "AbacusSparseUpdateAccessor", "input": "sparses", "table_id": 0, "need_gradient": False}, + { "class": "AbacusSparseJoinAccessor", "input": "sparses", "table_id": 0, "need_gradient": False}, { "class": "DenseInputAccessor", "input": "vars", "table_id": 1, "need_gradient": True, "async_pull": True}, { "class": "DenseInputAccessor", "input": "sums", "table_id": 2, "need_gradient": True, "async_pull": True}, { "class": "LabelInputAccessor", "input": "labels"} diff --git a/paddle/fluid/train/custom_trainer/feed/scripts/model/join/main_program b/paddle/fluid/train/custom_trainer/feed/scripts/model/join/main_program index dd455fb1e9d6d8d46f8782b7813b82a3e7c2badc..e828a50d5a918890d2751715cefff2f7b5c75cd8 100644 Binary files a/paddle/fluid/train/custom_trainer/feed/scripts/model/join/main_program and b/paddle/fluid/train/custom_trainer/feed/scripts/model/join/main_program differ diff --git a/paddle/fluid/train/custom_trainer/feed/scripts/model/join/model.yaml b/paddle/fluid/train/custom_trainer/feed/scripts/model/join/model.yaml index e13df08563cbda5bf25d5ad430c430ae3418f6f5..c8093f88f28e20c0e7303c6641819893742a0d8d 100644 --- 
a/paddle/fluid/train/custom_trainer/feed/scripts/model/join/model.yaml +++ b/paddle/fluid/train/custom_trainer/feed/scripts/model/join/model.yaml @@ -1,6 +1,6 @@ aa_Attention: Do Not Modify This File Manually, Unless You Really Know It input_accessor: -- class: AbacusSparseUpdateAccessor +- class: AbacusSparseJoinAccessor input: - name: cvm_input slot_dim: 11 diff --git a/paddle/fluid/train/custom_trainer/feed/scripts/model/join/startup_program b/paddle/fluid/train/custom_trainer/feed/scripts/model/join/startup_program index 3f0c50e77b41889f790d9c8001e43b7b275fc152..a747784f491e64bf91931900f55e70a88e2126b2 100644 Binary files a/paddle/fluid/train/custom_trainer/feed/scripts/model/join/startup_program and b/paddle/fluid/train/custom_trainer/feed/scripts/model/join/startup_program differ diff --git a/paddle/fluid/train/custom_trainer/feed/scripts/model/join/test_program b/paddle/fluid/train/custom_trainer/feed/scripts/model/join/test_program index 6860dda25a06b7aadfdb4ceb97572ec21e9f19b3..6f6b0a9c760d96f87b7dc6ec9c543ceec60f1db0 100644 Binary files a/paddle/fluid/train/custom_trainer/feed/scripts/model/join/test_program and b/paddle/fluid/train/custom_trainer/feed/scripts/model/join/test_program differ diff --git a/paddle/fluid/train/custom_trainer/feed/scripts/model/update/main_program b/paddle/fluid/train/custom_trainer/feed/scripts/model/update/main_program index f2061d99b8103af8aa0a4b748e12b6e14c4a4f71..7b5a31c35d856102ee088e2d8f845a7a75714483 100644 Binary files a/paddle/fluid/train/custom_trainer/feed/scripts/model/update/main_program and b/paddle/fluid/train/custom_trainer/feed/scripts/model/update/main_program differ diff --git a/paddle/fluid/train/custom_trainer/feed/scripts/model/update/startup_program b/paddle/fluid/train/custom_trainer/feed/scripts/model/update/startup_program index 5a47d80a18c8a96074fab78e553dc6a1e212230f..2da50a2803f53c50979049e4d2d0f92490ec615f 100644 Binary files 
a/paddle/fluid/train/custom_trainer/feed/scripts/model/update/startup_program and b/paddle/fluid/train/custom_trainer/feed/scripts/model/update/startup_program differ diff --git a/paddle/fluid/train/custom_trainer/feed/scripts/model/update/test_program b/paddle/fluid/train/custom_trainer/feed/scripts/model/update/test_program index 9904b7d2a5a61ecda34d2c8e10342483ff961ba3..e0ce42330ed277e4190c41f6cffc4ac5251b311b 100644 Binary files a/paddle/fluid/train/custom_trainer/feed/scripts/model/update/test_program and b/paddle/fluid/train/custom_trainer/feed/scripts/model/update/test_program differ diff --git a/paddle/fluid/train/custom_trainer/feed/scripts/start_feed_trainer.sh b/paddle/fluid/train/custom_trainer/feed/scripts/start_feed_trainer.sh index 1d4c21148e05e2c9cba0c3a132d53b2871635963..a00f6a9faf5a4b4e4e5cbc661aa66983adfa1fec 100755 --- a/paddle/fluid/train/custom_trainer/feed/scripts/start_feed_trainer.sh +++ b/paddle/fluid/train/custom_trainer/feed/scripts/start_feed_trainer.sh @@ -1,3 +1,51 @@ #!/bin/bash -export LD_LIBRARY_PATH=LD_LIBRARY_PATH:./so -./bin/feed_trainer "$@" +BIN_FILE=feed_trainer +work_dir=`pwd` + +function usage() { + echo -e "\033[41mUSAGE: sh scripts/start_feed_trainer.sh [run_mode]\033[0m" + echo "run_mode=mpi, run job in mpi cluster" + echo "run_mode=mpi_tmp, run 1 node with mpi in /tmp" + echo "run_mode=local, run 1 node in local" + echo "Example: sh scripts/start_feed_trainer.sh local" + exit 0 +} +if [ $# -lt 1 ];then + run_mode="mpi" +else + run_mode="$1" +fi + +export PATH=/usr/local/openmpi/bin:$PATH +export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/local/openmpi/lib/ +if [ "${run_mode}" = "mpi" ];then + mpirun mv package/* . 
+ mpirun mkdir -p log + export HADOOP_HOME="./hadoop-client/hadoop" + export PATH=$HADOOP_HOME/bin/:./bin:$PATH + export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:./so + mpirun sed -i 's/LocalRuntimeEnvironment/MPIRuntimeEnvironment/g' conf/*.yaml + export HADOOP_HOME="./hadoop-client/hadoop" + export PATH=$HADOOP_HOME/bin/:/bin:$PATH + export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:./so + + GLOG_logtostderr=0 mpirun -npernode 2 -timestamp-output -tag-output --prefix $work_dir ./bin/feed_trainer --log_dir=log +elif [ "${run_mode}" = "mpi_tmp" ];then + mv package/* . + mkdir temp + export HADOOP_HOME="$work_dir/hadoop-client/hadoop" + export PATH=$HADOOP_HOME/bin/:/bin:$PATH + export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:${work_dir}/so + sed -i 's/LocalRuntimeEnvironment/MPIRuntimeEnvironment/g' conf/*.yaml + mpirun -npernode 2 -timestamp-output -tag-output --prefix $work_dir --mca orte_tmpdir_base ${work_dir}/temp scripts/start_feed_trainer.sh random_log +elif [ "${run_mode}" = "local" ];then + sed -i 's/MPIRuntimeEnvironment/LocalRuntimeEnvironment/g' conf/*.yaml + export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:${work_dir}/so + mkdir log + ./bin/feed_trainer --log_dir=log +elif [ "${run_mode}" = "random_log" ];then + log_dir="log/log.${RANDOM}" + ./bin/feed_trainer --log_dir=log +else + usage +fi diff --git a/paddle/fluid/train/custom_trainer/feed/scripts/submit_mpi.sh b/paddle/fluid/train/custom_trainer/feed/scripts/submit_mpi.sh new file mode 100755 index 0000000000000000000000000000000000000000..df7c6a8687b5eb386077d98eedc4cb775bf2d76a --- /dev/null +++ b/paddle/fluid/train/custom_trainer/feed/scripts/submit_mpi.sh @@ -0,0 +1,32 @@ +#!/bin/bash + +export PATH=/bin/:$PATH +set -x + +source conf/env.conf + +echo "# This file is automatically generated. Don't change it." 
> conf/qsub_f.conf +echo "SERVER=$MPI_SERVER" >> conf/qsub_f.conf +echo "QUEUE=$MPI_QUEUE" >> conf/qsub_f.conf +echo "PRIORITY=$MPI_PRIORITY" >> conf/qsub_f.conf + +export HADOOP_HOME=$HADOOP_HOME + +sh scripts/compake_runable_package.sh + +$HPC_HOME/bin/qsub_f \ + -N $MPI_JOB_NAME \ + --conf conf/qsub_f.conf \ + --hdfs $HADOOP_FS \ + --ugi $HADOOP_UGI \ + --hout $HDFS_ROOT \ + --files package \ + -l nodes=$MPI_NODE_NUM,walltime=$MPI_WALL_TIME,pmem-hard=$MPI_NODE_MEM,pcpu-soft=180,pnetin-soft=1000,pnetout-soft=1000 \ + scripts/start_feed_trainer.sh + +if [ $? -ne 0 ]; then + echo -e "qsub_f failed, please check the config or get help from abacus RD\n" + exit -1 +fi + +exit 0 diff --git a/paddle/fluid/train/custom_trainer/feed/scripts/update.py b/paddle/fluid/train/custom_trainer/feed/scripts/update.py index c89ada5de133dd35e466f59d7ebf935d679b1302..cb2cbacb5bdbcc42fdff930898e71af6885d1bec 100644 --- a/paddle/fluid/train/custom_trainer/feed/scripts/update.py +++ b/paddle/fluid/train/custom_trainer/feed/scripts/update.py @@ -25,11 +25,26 @@ def inference(): cvm_input = fluid.layers.data(name='cvm_input', shape=[sparse_cvm_dim(sparse_cvm)], dtype='float32', stop_gradient=False) net = cvm_input - net = fluid.layers.fc(net, 511, act='relu', name='fc_1') - net = fluid.layers.fc(net, 255, act='relu', name='fc_2') - net = fluid.layers.fc(net, 127, act='relu', name='fc_3') - net = fluid.layers.fc(net, 127, act='relu', name='fc_4') - net = fluid.layers.fc(net, 127, act='relu', name='fc_5') + lr_x = 1.0 + init_range = 0.2 + fc_layers_size = [511, 255, 127, 127, 127] + fc_layers_act = ["relu"] * len(fc_layers_size) + scales_tmp = [net.shape[1]] + fc_layers_size + scales = [] + for i in range(len(scales_tmp)): + scales.append(init_range / (scales_tmp[i] ** 0.5)) + for i in range(len(fc_layers_size)): + net = fluid.layers.fc( + input = net, + size = fc_layers_size[i], + name = 'fc_' + str(i+1), + act = fc_layers_act[i], + param_attr = \ + fluid.ParamAttr(learning_rate=lr_x, \ + 
initializer=fluid.initializer.NormalInitializer(loc=0.0, scale=1.0 * scales[i])), + bias_attr = \ + fluid.ParamAttr(learning_rate=lr_x, \ + initializer=fluid.initializer.NormalInitializer(loc=0.0, scale=1.0 * scales[i]))) ctr_output = fluid.layers.fc(net, 1, act='sigmoid', name='ctr') diff --git a/paddle/fluid/train/custom_trainer/feed/trainer_context.h b/paddle/fluid/train/custom_trainer/feed/trainer_context.h index f9c86a45a307791d6700e8bac9759e2184022368..030df06b9724fa37c3c553b8f3df828fe352f130 100755 --- a/paddle/fluid/train/custom_trainer/feed/trainer_context.h +++ b/paddle/fluid/train/custom_trainer/feed/trainer_context.h @@ -2,6 +2,7 @@ #include #include #include +#include #include "paddle/fluid/platform/place.h" #include "paddle/fluid/train/custom_trainer/feed/common/yaml_helper.h" #include "paddle/fluid/train/custom_trainer/feed/common/pslib_warpper.h" @@ -48,6 +49,7 @@ public: paddle::platform::CPUPlace cpu_place; std::shared_ptr pslib; + std::stringstream monitor_ssm; //记录monitor信息 std::shared_ptr dataset; //训练样本 std::shared_ptr file_system; //文件操作辅助类 std::shared_ptr epoch_accessor; //训练轮次控制