提交 6c6a7a14 编写于 作者: X xiexionghang

MPI control support, trainer-net updates, and bug fixes

上级 50e6bfc0
#include <sstream>
#include "gflags/gflags.h"
#include "paddle/fluid/train/custom_trainer/feed/accessor/input_data_accessor.h"
namespace paddle {
namespace custom_trainer {
namespace feed {
DEFINE_string(feed_trainer_debug_dense_name, "", "open dense debug for specif layer_name");
int DenseInputAccessor::initialize(YAML::Node config,
std::shared_ptr<TrainerContext> context_ptr) {
......@@ -85,7 +89,6 @@ int32_t DenseInputAccessor::forward(SampleInstance* samples, size_t num,
}
_pull_mutex.unlock();
}
size_t data_buffer_idx = 0;
for (auto& variable : _x_variables) {
auto* shape_ptr = &(variable.shape[0]);
......@@ -97,6 +100,26 @@ int32_t DenseInputAccessor::forward(SampleInstance* samples, size_t num,
memcpy(var_data, _data_buffer + data_buffer_idx, variable.dim * sizeof(float));
data_buffer_idx += variable.dim;
}
if (!FLAGS_feed_trainer_debug_dense_name.empty()) {
data_buffer_idx = 0;
std::stringstream ssm;
for (auto& variable : _x_variables) {
if (variable.name != FLAGS_feed_trainer_debug_dense_name) {
data_buffer_idx += variable.dim;
continue;
}
ssm.str("");
auto& tensor = ScopeHelper::var<paddle::framework::LoDTensor>(scope, variable.name);
const auto* var_data = tensor.data<float>();
for (size_t data_idx = 0; data_idx < variable.dim; ++data_idx) {
if (data_idx > 0)
ssm << ",";
ssm << _data_buffer[data_buffer_idx + data_idx];
}
data_buffer_idx += variable.dim;
VLOG(2) << "[DEBUG]pull_dense: " << ssm.str();
}
}
if (_need_async_pull) {
++_pull_request_num;
}
......@@ -118,7 +141,25 @@ int32_t DenseInputAccessor::backward(SampleInstance* samples, size_t num,
}
auto* ps_client = _trainer_context->pslib->ps_client();
auto push_status = ps_client->push_dense(regions.data(), regions.size(), _table_id);
//return push_status.get();
//push_status.get();
if (!FLAGS_feed_trainer_debug_dense_name.empty()) {
std::stringstream ssm;
for (auto& variable : _x_variables) {
ssm.str("");
if (variable.name != FLAGS_feed_trainer_debug_dense_name) {
continue;
}
auto& tensor = scope->Var(variable.gradient_name)->
Get<paddle::framework::LoDTensor>();
const auto* var_data = tensor.data<float>();
for (size_t data_idx = 0; data_idx < variable.dim; ++data_idx) {
if (data_idx > 0)
ssm << ",";
ssm << var_data[data_idx];
}
VLOG(2) << "[DEBUG]push_dense: " << ssm.str();
}
}
return 0;
}
......
......@@ -15,22 +15,22 @@ namespace feed {
return -1;
}
auto fs = _trainer_context->file_system.get();
if (config["donefile"]) {
_done_file_path = fs->path_join(_model_root_path, config["donefile"].as<std::string>());
} else {
_done_file_path = fs->path_join(_model_root_path, "epoch_donefile.txt");
}
_done_file_path = fs->path_join(_model_root_path, config["donefile"].as<std::string>("epoch_donefile.txt"));
if (!fs->exists(_done_file_path)) {
VLOG(0) << "missing done file, path:" << _done_file_path;
return -1;
}
std::string done_text = fs->tail(_done_file_path);
_done_status = paddle::string::split_string(done_text, std::string("\t"));
_current_epoch_id = get_status<uint64_t>(EpochStatusFiled::EpochIdField);
_last_checkpoint_epoch_id = get_status<uint64_t>(EpochStatusFiled::CheckpointIdField);
_last_checkpoint_path = get_status<std::string>(EpochStatusFiled::CheckpointPathField);
_inference_base_model_key = get_status<uint64_t>(EpochStatusFiled::InferenceBaseKeyField);
_inference_model_path = fs->path_join(_model_root_path, config["inference_model_dir"].as<std::string>("xbox"));
_inference_model_base_done_path = fs->path_join(_inference_model_path,
config["inference_base_done_name"].as<std::string>("xbox_base_done.txt"));
_inference_model_delta_done_path = fs->path_join(_inference_model_path,
config["inference_delta_done_name"].as<std::string>("xbox_delta_done.txt"));
return 0;
}
......@@ -46,31 +46,64 @@ namespace feed {
set_status(EpochStatusFiled::CheckpointIdField, _last_checkpoint_epoch_id);
set_status(EpochStatusFiled::CheckpointPathField, _last_checkpoint_path);
set_status(EpochStatusFiled::DateField, format_timestamp(epoch_id, "%Y%m%d"));
// 非主节点不做状态持久化
if (!_trainer_context->environment->is_master_node(EnvironmentRole::WORKER)) {
set_status(EpochStatusFiled::InferenceBaseKeyField, _inference_base_model_key);
return 0;
}
int EpochAccessor::update_model_donefile(
uint64_t epoch_id, ModelSaveWay save_way) {
auto* env = _trainer_context->environment.get();
// 非主节点不做done状态持久化
if (!env->is_master_node(EnvironmentRole::WORKER)) {
return 0;
}
auto fs = _trainer_context->file_system.get();
std::string done_str = paddle::string::join_strings(_done_status, '\t');
std::string done_str;
std::string donefile;
auto model_path = model_save_path(epoch_id, save_way);
std::string inference_done_format("{\"id\":\"%lu\",\"key\":\"%lu\",\"input\":\"%s/000\",\"record_count\":\"1\",\"file_format\":\"pb\",\"schema_version\":\"2\",\"partition_type\":\"1\",\"job_name\":\"%s\",\"job_id\":\"%s\",\"mpi_size\":\"%d\",\"monitor_data\":\"%s\"}");
auto id = time(NULL);
switch (save_way) {
case ModelSaveWay::ModelSaveTrainCheckpoint:
donefile = _done_file_path;
done_str = paddle::string::join_strings(_done_status, '\t');
break;
case ModelSaveWay::ModelSaveInferenceDelta:
donefile = _inference_model_delta_done_path;
done_str = string::format_string(inference_done_format.c_str(), id, _inference_base_model_key,
model_path.c_str(), env->job_name().c_str(), env->job_id().c_str(),
env->node_num(EnvironmentRole::PSERVER), _trainer_context->monitor_ssm.str().c_str());
break;
case ModelSaveWay::ModelSaveInferenceBase:
donefile = _inference_model_base_done_path;
_inference_base_model_key = id;
done_str = string::format_string(inference_done_format.c_str(), id, id,
model_path.c_str(), env->job_name().c_str(), env->job_id().c_str(),
env->node_num(EnvironmentRole::PSERVER), _trainer_context->monitor_ssm.str().c_str());
break;
}
// 保留末尾1000数据
std::string tail_done_info = paddle::string::trim_spaces(fs->tail(_done_file_path, 1000));
std::string tail_done_info;
auto fs = _trainer_context->file_system.get();
if (fs->exists(donefile)) {
tail_done_info = paddle::string::trim_spaces(fs->tail(donefile, 1000));
}
if (tail_done_info.size() > 0) {
tail_done_info = tail_done_info + "\n" + done_str;
} else {
tail_done_info = done_str;
}
VLOG(2) << "Write epoch donefile to " << _done_file_path << ", str:" << done_str;
VLOG(2) << "Write donefile " << donefile << ", str:" << done_str;
bool write_success = false;
while (true) {
fs->remove(_done_file_path);
auto fp = fs->open_write(_done_file_path, "");
fs->remove(donefile);
auto fp = fs->open_write(donefile, "");
if (fwrite(tail_done_info.c_str(), tail_done_info.length(), 1, &*fp) == 1) {
break;
}
sleep(10);
}
VLOG(2) << "Write epoch donefile success";
VLOG(2) << "Write donefile " << donefile << "success";
return 0;
}
......@@ -126,7 +159,10 @@ namespace feed {
case ModelSaveWay::ModelSaveInferenceBase:
return is_last_epoch(epoch_id);
case ModelSaveWay::ModelSaveTrainCheckpoint:
return delta_id(epoch_id) % 8 == 0;
if (is_last_epoch(epoch_id)) {
return true;
}
return delta_id(epoch_id) % 24 == 0;
}
return false;
}
......@@ -137,11 +173,11 @@ namespace feed {
std::string date_with_hour = format_timestamp(epoch_id, "%Y%m%d%H");
switch (save_way) {
case ModelSaveWay::ModelSaveInferenceDelta:
return _trainer_context->file_system->path_join(_model_root_path,
string::format_string("xbox/%s/delta-%d", date.c_str(), delta));
return _trainer_context->file_system->path_join(_inference_model_path,
string::format_string("%s/delta-%d", date.c_str(), delta));
case ModelSaveWay::ModelSaveInferenceBase:
return _trainer_context->file_system->path_join(_model_root_path,
string::format_string("xbox/%s/base", date.c_str()));
return _trainer_context->file_system->path_join(_inference_model_path,
string::format_string("%s/base", date.c_str()));
case ModelSaveWay::ModelSaveTrainCheckpoint:
return _trainer_context->file_system->path_join(_model_root_path,
string::format_string("batch_model/%s", date_with_hour.c_str()));
......
......@@ -14,7 +14,8 @@ enum class EpochStatusFiled {
TimestampField = 1,
CheckpointPathField = 2,
EpochIdField = 3,
CheckpointIdField = 4
CheckpointIdField = 4,
InferenceBaseKeyField = 5
};
class EpochAccessor : public Accessor {
......@@ -62,14 +63,19 @@ public:
virtual bool need_save_model(uint64_t epoch_id, ModelSaveWay save_way) = 0;
virtual std::string model_save_path(uint64_t epoch_id, ModelSaveWay save_way) = 0;
virtual int update_model_donefile(uint64_t epoch_id, ModelSaveWay save_way);
protected:
TrainerContext* _trainer_context;
std::string _done_file_path;
std::string _model_root_path;
std::string _inference_model_path;
std::string _inference_model_base_done_path;
std::string _inference_model_delta_done_path;
uint64_t _current_epoch_id = 0;
std::string _last_checkpoint_path;
uint64_t _last_checkpoint_epoch_id = 0;
std::vector<std::string> _done_status; //当前完成状态,统一存成string
std::vector<std::string> _done_status; // 当前完成状态,统一存成string
uint64_t _inference_base_model_key = 0; // 预估模型的base-key
};
REGIST_REGISTERER(EpochAccessor);
......
......@@ -102,7 +102,8 @@ public:
paddle::framework::Scope* scope);
// SparseGradValue会被依次调用,用于整理push的梯度
virtual void fill_gradient(float* push_value, const float* gradient_raw,
paddle::ps::ValueAccessor&, SparseInputVariable&, SampleInstance&) = 0;
paddle::ps::ValueAccessor&, SparseInputVariable&,
SampleInstance&, FeatureItem&) = 0;
protected:
// 输入层列表
......
#include <math.h>
#include <vector>
#include <utility>
#include <sstream>
#include "gflags/gflags.h"
#include "paddle/fluid/string/string_helper.h"
#include "paddle/fluid/train/custom_trainer/feed/common/scope_helper.h"
#include "paddle/fluid/train/custom_trainer/feed/accessor/input_data_accessor.h"
DEFINE_int32(feed_trainer_debug_sparse_slot, 0, "open sparse debug for specif slot");
namespace paddle {
namespace custom_trainer {
namespace feed {
......@@ -99,6 +103,30 @@ int32_t BaseSparseInputAccessor::forward(SampleInstance* samples,
}
}
}
if (FLAGS_feed_trainer_debug_sparse_slot) {
std::stringstream ssm;
for (size_t samp_idx = 0; samp_idx < num; ++samp_idx) {
ssm.str("");
auto& features = samples[samp_idx].features;
for (auto& feature_item : features) {
for (size_t i = 0; i < _x_variables.size(); ++i) {
auto& variable = _x_variables[i];
if (feature_item.slot() != FLAGS_feed_trainer_debug_sparse_slot) {
continue;
}
if (variable.slot_idx[feature_item.slot()] < 0) {
continue;
}
ssm << "(" << feature_item.sign() << "," << feature_item.slot();
for (auto weight : feature_item.weights) {
ssm << "," << weight;
}
ssm << ")";
}
}
VLOG(2) << "[DEBUG][sparse_slot_pull]" << ssm.str();
}
}
// Variable后置处理
for (size_t i = 0; i < _x_variables.size(); ++i) {
auto& variable = _x_variables[i];
......@@ -145,12 +173,37 @@ int32_t BaseSparseInputAccessor::backward(SampleInstance* samples,
const float* grad_data = var_runtime_data[i].gradient_data +
samp_idx * variable.total_dim + variable.slot_dim * slot_idx;
fill_gradient(&(feature_item.gradients[0]), grad_data,
*value_accessor, variable, samples[samp_idx]);
*value_accessor, variable, samples[samp_idx], feature_item);
keys[key_idx] = feature_item.sign();
push_values[key_idx++] = &(feature_item.gradients[0]);
}
}
}
if (FLAGS_feed_trainer_debug_sparse_slot) {
size_t key_idx = 0;
std::stringstream ssm;
for (size_t samp_idx = 0; samp_idx < num; ++samp_idx) {
ssm.str("");
auto& features = samples[samp_idx].features;
for (auto& feature_item : features) {
for (size_t i = 0; i < _x_variables.size(); ++i) {
auto& variable = _x_variables[i];
if (feature_item.slot() != FLAGS_feed_trainer_debug_sparse_slot) {
continue;
}
if (variable.slot_idx[feature_item.slot()] < 0) {
continue;
}
ssm << "(" << feature_item.sign() << "," << feature_item.slot();
for (auto weight : feature_item.gradients) {
ssm << "," << weight;
}
ssm << ")";
}
}
VLOG(2) << "[DEBUG][sparse_slot_push]" << ssm.str();
}
}
auto push_status = ps_client->push_sparse(_table_id,
keys.data(), (const float**)push_values, key_idx);
//auto ret = push_status.get();
......@@ -180,8 +233,8 @@ public:
}
virtual void fill_gradient(float* push_value, const float* gradient_raw,
paddle::ps::ValueAccessor& value_accessor,
SparseInputVariable& variable, SampleInstance& sample) {
paddle::ps::ValueAccessor& value_accessor, SparseInputVariable& variable,
SampleInstance& sample, FeatureItem& feature) {
// join阶段不回填梯度
CHECK(false);
return;
......@@ -207,12 +260,13 @@ public:
}
virtual void fill_gradient(float* push_value, const float* gradient_raw,
paddle::ps::ValueAccessor& value_accessor,
SparseInputVariable& variable, SampleInstance& sample) {
push_value[0] += 1;
push_value[1] += sample.labels[0];
paddle::ps::ValueAccessor& value_accessor, SparseInputVariable& variable,
SampleInstance& sample, FeatureItem& feature) {
push_value[0] = feature.slot();
push_value[1] += 1;
push_value[2] += sample.labels[0];
for (size_t i = 0; i < variable.slot_dim; ++i) {
push_value[i + 2] += gradient_raw[i];
push_value[i + 3] += gradient_raw[i];
}
return;
}
......
......@@ -38,6 +38,7 @@ int PSlib::init_server() {
_environment->rank_id(EnvironmentRole::PSERVER));
_server_ptr->start();
}
_environment->barrier(EnvironmentRole::ALL);
_environment->ps_environment()->gather_ps_servers();
return 0;
}
......
......@@ -56,7 +56,6 @@ struct mpi_type_trait<unsigned long long> {
return MPI_UNSIGNED_LONG_LONG;
}
};
RuntimeEnvironment::RuntimeEnvironment() {}
RuntimeEnvironment::~RuntimeEnvironment() {}
bool RuntimeEnvironment::is_master_node(EnvironmentRole role) {
......@@ -87,13 +86,24 @@ public:
return 0;
}
virtual int wireup() {
int hr = MPI_Init(NULL, NULL);
int argc = 0;
char** argv = NULL;
int hr = MPI_Init(&argc, &argv);
if (MPI_SUCCESS != hr) {
LOG(FATAL) << "MPI_init failed with error code" << hr;
return -1;
}
_roles_node_info.resize(static_cast<int>(EnvironmentRole::ALL) + 1);
add_role(EnvironmentRole::ALL);
char* value = getenv("JOB_ID");
if (value) {
_job_id = value;
}
value = getenv("JOB_NAME");
if (value) {
_job_name = value;
}
return 0;
}
......@@ -155,6 +165,11 @@ protected:
return;
}
VLOG(static_cast<int>(level)) << log_str;
/*
static std::mutex mtx;
std::lock_guard<std::mutex> guard(mtx);
std::err << log_str;
*/
}
inline MpiNodeInfo& mpi_node_info(EnvironmentRole role) {
......
......@@ -46,6 +46,15 @@ public:
virtual ~RuntimeEnvironment();
// 配置初始化
virtual int initialize(YAML::Node config) = 0;
// Job info accessors.
// Cluster job id, populated from the JOB_ID environment variable during
// wireup() (falls back to the "default_job_id" member default).
virtual std::string job_id() {
return _job_id;
}
// Cluster job name, populated from the JOB_NAME environment variable during
// wireup() (falls back to the "default_job_name" member default).
virtual std::string job_name() {
return _job_name;
}
// 设置role
virtual int add_role(EnvironmentRole role) = 0;
// 判断role
......@@ -90,9 +99,21 @@ public:
protected:
virtual void print_log(EnvironmentRole role, EnvironmentLogType type,
EnvironmentLogLevel level, const std::string& log_str) = 0;
std::string _job_id = "default_job_id";
std::string _job_name = "default_job_name";
};
REGIST_REGISTERER(RuntimeEnvironment);
// Convenience wrappers around RuntimeEnvironment::log() for worker-role
// logging. NOTE: the macros intentionally leave the argument list open —
// the call site supplies the format string/arguments and the closing ")".
// Bug fix: the third argument is the log *level*, whose type is
// EnvironmentLogLevel (see print_log's signature), not EnvironmentLogType;
// the original passed EnvironmentLogType::NOTICE / ::ERROR, which would not
// compile at any use site.
#define ENVLOG_WORKER_ALL_NOTICE \
environment->log(EnvironmentRole::WORKER, EnvironmentLogType::ALL_LOG, EnvironmentLogLevel::NOTICE,
#define ENVLOG_WORKER_MASTER_NOTICE \
environment->log(EnvironmentRole::WORKER, EnvironmentLogType::MASTER_LOG, EnvironmentLogLevel::NOTICE,
#define ENVLOG_WORKER_ALL_ERROR \
environment->log(EnvironmentRole::WORKER, EnvironmentLogType::ALL_LOG, EnvironmentLogLevel::ERROR,
#define ENVLOG_WORKER_MASTER_ERROR \
environment->log(EnvironmentRole::WORKER, EnvironmentLogType::MASTER_LOG, EnvironmentLogLevel::ERROR,
std::string format_timestamp(time_t time, const char* format);
inline std::string format_timestamp(time_t time, const std::string& format) {
return format_timestamp(time, format.c_str());
......
# Submit-side environment for the MPI training job.
# Local tool locations (HPC smart_client and hadoop client).
HPC_HOME=/home/work/xiexionghang/trainer/paddle_trainer/feed_muye/smart_client
HADOOP_HOME=/home/work/xiexionghang/trainer/paddle_trainer/feed_muye/hadoop-client/hadoop/
#===============Job-related config======================
MPI_JOB_NAME=feed_smfw_shoubai_video_cupai_new_arch
MPI_QUEUE=feed5
MPI_PRIORITY=high
# Number of MPI nodes requested for the job.
MPI_NODE_NUM=100
# Wall-clock limit (HHH:MM:SS).
MPI_WALL_TIME=700:00:00
# Per-node memory request (presumably MB — confirm with cluster docs).
MPI_NODE_MEM=100000
MPI_RESOURCE=full
#===========MPI cluster Server(nmg-off/10g/hlan)==========
MPI_SERVER=yq01-hpc-lvliang01-smart-master.dmop.baidu.com
#===========Cluster-related (HDFS/MPI Server)==============
# Unique per-submission scratch dir (timestamped to avoid collisions).
HDFS_ROOT=/user/feed/mlarch/mio_temp/$(date +%Y%m%d-%H%M%S-%N)
HADOOP_FS=afs://xingtian.afs.baidu.com:9902
# SECURITY(review): plaintext "user,password" UGI committed to the repo —
# consider moving this credential out of version control.
HADOOP_UGI=mlarch,Fv1M87
......@@ -44,7 +44,7 @@ executor:
train_batch_size: 32
input_parse_thread_num: 10
push_gradient_thread_num: 16
train_thread_num: 16
train_thread_num: 12
need_dump_all_model: true
- name: update
class: SimpleExecutor
......@@ -52,5 +52,5 @@ executor:
train_batch_size: 32
input_parse_thread_num: 10
push_gradient_thread_num: 16
train_thread_num: 16
train_thread_num: 12
need_dump_all_model: false
......@@ -469,6 +469,7 @@ public:
return read_all(file_list, data_channel);
}
virtual int read_all(const std::vector<std::string>& file_list, ::paddle::framework::Channel<DataItem> data_channel) {
data_channel->Open();
const int file_list_size = file_list.size();
std::atomic<bool> is_failed(false);
......
#include "paddle/fluid/platform/timer.h"
#include "paddle/fluid/train/custom_trainer/feed/io/file_system.h"
#include "paddle/fluid/train/custom_trainer/feed/monitor/monitor.h"
#include "paddle/fluid/train/custom_trainer/feed/executor/multi_thread_executor.h"
......@@ -94,20 +95,22 @@ paddle::framework::Channel<DataItem> MultiThreadExecutor::run(
[this, parser](DataItem* item, size_t item_num,
ScopePoolObj* scope, size_t* scope_num, size_t thread_idx) -> int {
*scope_num = 1;
paddle::platform::Timer timer;
timer.Start();
auto scope_obj = _scope_obj_pool->get();
auto* samples = new SampleInstance[item_num];
auto* scope_context = new ScopeExecutorContext(item_num);
auto* samples = scope_context->samples();
for (size_t i = 0; i <item_num; ++i) {
CHECK(parser->parse_to_sample(item[i], samples[i]) == 0);
}
for (size_t i = 0; i < _input_accessors.size(); ++i) {
_input_accessors[i]->forward(samples, item_num, scope_obj.get());
}
int64_t data_for_scope = (int64_t)samples;
timer.Pause();
scope_context->prepare_cost_ms = timer.ElapsedMS();
int64_t data_for_scope = (int64_t)scope_context;
ScopeHelper::fill_value(scope_obj.get(), _trainer_context->cpu_place,
"sample_data", data_for_scope);
data_for_scope = (int64_t)item_num;
ScopeHelper::fill_value(scope_obj.get(), _trainer_context->cpu_place,
"sample_num", data_for_scope);
"scope_context", data_for_scope);
*scope = std::move(scope_obj);
return 0;
});
......@@ -123,7 +126,14 @@ paddle::framework::Channel<DataItem> MultiThreadExecutor::run(
auto* executor = _thread_executors[thread_idx].get();
size_t& out_idx = *out_num;
for (out_idx = 0; out_idx < in_num; ++out_idx) {
CHECK(executor->run(in_items[out_idx].get()) == 0);
auto* scope = in_items[out_idx].get();
auto* scope_ctx = (ScopeExecutorContext*)(*ScopeHelper::get_value<int64_t>(
scope, _trainer_context->cpu_place, "scope_context"));
paddle::platform::Timer timer;
timer.Start();
CHECK(executor->run(scope) == 0);
timer.Pause();
scope_ctx->executor_cost_ms = timer.ElapsedMS();
out_items[out_idx] = std::move(in_items[out_idx]);
}
return 0;
......@@ -139,20 +149,24 @@ paddle::framework::Channel<DataItem> MultiThreadExecutor::run(
int* out_items, size_t* out_num, size_t thread_idx) -> int {
size_t& out_idx = *out_num;
for (out_idx = 0; out_idx < in_num; ++out_idx) {
paddle::platform::Timer timer;
timer.Start();
auto* scope = in_items[out_idx].get();
auto sample_num = *ScopeHelper::get_value<int64_t>(
scope, _trainer_context->cpu_place, "sample_num");
auto* scope_ctx = (ScopeExecutorContext*)(*ScopeHelper::get_value<int64_t>(
scope, _trainer_context->cpu_place, "scope_context"));
auto* samples = scope_ctx->samples();
auto sample_num = scope_ctx->sample_num();
auto* samples = (SampleInstance*)(*ScopeHelper::get_value<int64_t>(
scope, _trainer_context->cpu_place, "sample_data"));
for (size_t i = 0; i < _input_accessors.size(); ++i) {
out_items[out_idx] = _input_accessors[i]->
backward(samples, sample_num, scope);
}
timer.Pause();
scope_ctx->push_gradient_cost_ms = timer.ElapsedMS();
for (auto& monitor : _monitors) {
monitor->add_data(epoch_id, this, samples, sample_num);
monitor->add_data(epoch_id, this, scope_ctx);
}
delete[] samples; // 所有pipe完成后,再回收sample
delete scope_ctx; // 所有pipe完成后,再回收sample
}
return 0;
});
......@@ -167,6 +181,8 @@ paddle::framework::Channel<DataItem> MultiThreadExecutor::run(
monitor->compute_result();
VLOG(2) << "[Monitor]" << _train_exe_name << ", monitor:" << monitor->get_name()
<< ", result:" << monitor->format_result();
_trainer_context->monitor_ssm << _train_exe_name << ":" <<
monitor->get_name() << ":" << monitor->format_result() << ",";
monitor->reset();
}
}
......
......@@ -11,6 +11,29 @@ namespace feed {
class Monitor;
typedef paddle::ps::ObjectPool<::paddle::framework::Scope>::PooledObject ScopePoolObj;
// Per-scope execution context: owns the SampleInstance batch that flows
// through the prepare -> executor -> push-gradient pipeline stages, and
// carries per-stage timing stats that the monitors read afterwards.
class ScopeExecutorContext {
public:
    explicit ScopeExecutorContext(size_t sample_num) {
        _samples = new SampleInstance[sample_num];
        _sample_num = sample_num;
    }
    virtual ~ScopeExecutorContext() {
        delete[] _samples;
    }
    // This class owns the raw _samples array; a shallow copy would
    // double-free it on destruction, so copying is forbidden (Rule of Five).
    ScopeExecutorContext(const ScopeExecutorContext&) = delete;
    ScopeExecutorContext& operator=(const ScopeExecutorContext&) = delete;
    // Borrowed pointer to the batch; ownership stays with this context.
    inline SampleInstance* samples() {
        return _samples;
    }
    // Number of samples in the batch.
    inline size_t sample_num() {
        return _sample_num;
    }
    size_t executor_cost_ms = 0;       // time spent in executor->run()
    size_t prepare_cost_ms = 0;        // time spent parsing + accessor forward
    size_t push_gradient_cost_ms = 0;  // time spent in accessor backward
private:
    size_t _sample_num = 0;
    SampleInstance* _samples = nullptr;
};
class MultiThreadExecutor {
public:
MultiThreadExecutor() {}
......
......@@ -73,7 +73,7 @@ public:
}
shell_execute(string::format_string(
"%s -rmr %s &>/dev/null; true", _hdfs_command.c_str(), path.c_str()));
"%s -rmr %s &>/dev/null; true", hdfs_command(path).c_str(), path.c_str()));
}
std::vector<std::string> list(const std::string& path) override {
......
......@@ -356,6 +356,7 @@ std::string shell_get_command_output(const std::string& cmd) {
return reader.get();
}
}
VLOG(2) << "run shell cmd:" << cmd << ", errno:" << err_no;
} while (err_no == -1);
return "";
#endif
......
......@@ -32,6 +32,7 @@ int main(int argc, char* argv[]) {
}
auto* environment = trainer_context_ptr->environment.get();
environment->wireup();
VLOG(2) << "node_num: " << environment->node_num(EnvironmentRole::ALL);
if (environment->node_num(EnvironmentRole::ALL) == 1) {
environment->add_role(EnvironmentRole::WORKER);
environment->add_role(EnvironmentRole::PSERVER);
......@@ -42,6 +43,7 @@ int main(int argc, char* argv[]) {
}
trainer_context_ptr->pslib.reset(new PSlib());
std::string ps_config = config["environment"]["ps"].as<std::string>();
trainer_context_ptr->environment->barrier(EnvironmentRole::ALL);
trainer_context_ptr->pslib->initialize(ps_config, environment);
//VLOG(3) << "Node Start With Role:" << role;
......
#include "paddle/fluid/train/custom_trainer/feed/monitor/auc_monitor.h"
#include "paddle/fluid/train/custom_trainer/feed/executor/multi_thread_executor.h"
namespace paddle {
namespace custom_trainer {
......@@ -19,7 +20,9 @@ int AucMonitor::initialize(const YAML::Node& config, std::shared_ptr<TrainerCont
}
void AucMonitor::add_data(int epoch_id,
const MultiThreadExecutor* executor, SampleInstance* samples, size_t num) {
const MultiThreadExecutor* executor, ScopeExecutorContext* ctx) {
auto num = ctx->sample_num();
auto* samples = ctx->samples();
CHECK(num > 0);
std::lock_guard<std::mutex> lock(_mutex);
for (int i = 0; i < num; ++i) {
......@@ -80,6 +83,10 @@ std::string AucMonitor::format_result() {
}
void AucMonitor::add_unlocked(double pred, int label) {
if (std::isnan(pred)) {
VLOG(2) << "pred[" << pred << "] outside of [0,1]";
continue;
}
CHECK(pred >= 0 && pred <= 1) << "pred[" << pred << "] outside of [0,1]";
CHECK(label == 0 || label == 1) << "label[" << label << "] invalid";
_table[label][std::min(int(pred * _table_size), _table_size - 1)]++;
......
......@@ -18,8 +18,8 @@ public:
std::shared_ptr<TrainerContext> context_ptr) override;
//添加一项记录,统计内容Monitor自行从Executor按需获取
virtual void add_data(int epoch_id, const MultiThreadExecutor* executor,
SampleInstance* samples, size_t num);
virtual void add_data(int epoch_id,
const MultiThreadExecutor* executor, ScopeExecutorContext*);
//是否开始结果统计
virtual bool need_compute_result(int epoch_id);
......
#include "paddle/fluid/train/custom_trainer/feed/monitor/cost_monitor.h"
#include "paddle/fluid/train/custom_trainer/feed/executor/multi_thread_executor.h"
namespace paddle {
namespace custom_trainer {
......@@ -12,8 +13,9 @@ int CostMonitor::initialize(const YAML::Node& config, std::shared_ptr<TrainerCon
}
void CostMonitor::add_data(int epoch_id,
const MultiThreadExecutor* executor,
SampleInstance* samples, size_t num) {
const MultiThreadExecutor* executor, ScopeExecutorContext* ctx) {
auto num = ctx->sample_num();
auto* samples = ctx->samples();
CHECK(executor != nullptr);
//TODO use paddle time
_total_time_ms += 1;
......
......@@ -18,9 +18,7 @@ public:
//添加一项记录,统计内容Monitor自行从Executor按需获取
virtual void add_data(int epoch_id,
const MultiThreadExecutor* executor,
SampleInstance* samples,
size_t num);
const MultiThreadExecutor* executor, ScopeExecutorContext*);
//是否开始结果统计
virtual bool need_compute_result(int epoch_id);
......
......@@ -10,6 +10,7 @@ namespace paddle {
namespace custom_trainer {
namespace feed {
class MultiThreadExecutor;
class ScopeExecutorContext;
class Monitor {
public:
......@@ -25,8 +26,8 @@ public:
}
//添加一项记录,统计内容Monitor自行从Executor按需获取
virtual void add_data(int epoch_id, const MultiThreadExecutor* executor,
SampleInstance* samples, size_t num) = 0;
virtual void add_data(int epoch_id,
const MultiThreadExecutor* executor, ScopeExecutorContext*) = 0;
//是否对于当前epoch_id进行结果统计
virtual bool need_compute_result(int epoch_id) = 0;
......
......@@ -3,6 +3,7 @@
*Train样本
*/
#include <omp.h>
#include "paddle/fluid/platform/timer.h"
#include "paddle/fluid/train/custom_trainer/feed/io/file_system.h"
#include "paddle/fluid/train/custom_trainer/feed/dataset/dataset.h"
#include "paddle/fluid/train/custom_trainer/feed/accessor/epoch_accessor.h"
......@@ -25,26 +26,18 @@ int LearnerProcess::initialize(std::shared_ptr<TrainerContext> context_ptr) {
return 0;
}
// Asynchronously saves one parameter table for the given epoch and save-way.
// When the epoch accessor says a save is needed, returns the ps-client's save
// future directly (save-way is forwarded to the server as a string tag);
// otherwise returns an already-satisfied future holding 0 ("nothing to do").
std::future<int> LearnerProcess::save_model(uint64_t epoch_id, int table_id, ModelSaveWay way) {
// Local promise backs the no-op path only; on the save path the local
// promise is discarded unused and the ps-client's future is returned.
std::promise<int> p;
auto ret = p.get_future();
auto* ps_client = _context_ptr->pslib->ps_client();
auto* epoch_accessor = _context_ptr->epoch_accessor.get();
if (epoch_accessor->need_save_model(epoch_id, way)) {
VLOG(2) << "Start save model, table_id:" << table_id;
auto model_dir = epoch_accessor->model_save_path(epoch_id, way);
return ps_client->save(table_id, model_dir, std::to_string((int)way));
} else {
// No save required: fulfil the future immediately with success (0).
p.set_value(0);
}
return ret;
}
int LearnerProcess::wait_save_model(uint64_t epoch_id, ModelSaveWay way) {
auto* ps_client = _context_ptr->pslib->ps_client();
auto* environment = _context_ptr->environment.get();
auto* epoch_accessor = _context_ptr->epoch_accessor.get();
if (!environment->is_master_node(EnvironmentRole::WORKER)) {
return 0;
}
if (!epoch_accessor->need_save_model(epoch_id, way)) {
return 0;
}
paddle::platform::Timer timer;
timer.Start();
std::set<uint32_t> table_set;
for (auto& executor : _executors) {
const auto& table_accessors = executor->table_accessors();
......@@ -56,13 +49,18 @@ int LearnerProcess::wait_save_model(uint64_t epoch_id, ModelSaveWay way) {
auto table_num = table_set.size();
std::future<int> rets[table_num];
for (auto table_id : table_set) {
rets[ret_size++] = save_model(epoch_id, table_id, way);
VLOG(2) << "Start save model, table_id:" << table_id;
auto model_dir = epoch_accessor->model_save_path(epoch_id, way);
rets[ret_size++] = ps_client->save(table_id, model_dir, std::to_string((int)way));
}
int all_ret = 0;
for (int i = 0; i < ret_size; ++i) {
rets[i].wait();
all_ret |= rets[i].get();
}
timer.Pause();
VLOG(2) << "Save Model Cost(s):" << timer.ElapsedSec();
_context_ptr->epoch_accessor->update_model_donefile(epoch_id, way);
return all_ret;
}
......@@ -115,6 +113,7 @@ int LearnerProcess::run() {
while (true) {
epoch_accessor->next_epoch();
_context_ptr->monitor_ssm.str("");
bool already_dump_inference_model = false;
epoch_id = epoch_accessor->current_epoch_id();
std::string epoch_log_title = paddle::string::format_string(
......@@ -141,6 +140,8 @@ int LearnerProcess::run() {
std::map<std::string, paddle::framework::Channel<DataItem>> backup_input_map;
for (auto& executor : _executors) {
environment->barrier(EnvironmentRole::WORKER);
paddle::platform::Timer timer;
timer.Start();
VLOG(2) << "Start executor:" << executor->train_exe_name();
auto data_name = executor->train_data_name();
paddle::framework::Channel<DataItem> input_channel;
......@@ -150,12 +151,12 @@ int LearnerProcess::run() {
input_channel = dataset->fetch_data(data_name, epoch_id);
}
input_channel = executor->run(input_channel, dataset->data_parser(data_name));
VLOG(2) << "End executor:" << executor->train_exe_name();
timer.Pause();
VLOG(2) << "End executor:" << executor->train_exe_name() << ", cost" << timer.ElapsedSec();
// 等待异步梯度完成
_context_ptr->ps_client()->flush();
environment->barrier(EnvironmentRole::WORKER);
if (executor->is_dump_all_model()) {
already_dump_inference_model = true;
wait_save_model(epoch_id, ModelSaveWay::ModelSaveInferenceDelta);
......@@ -167,16 +168,12 @@ int LearnerProcess::run() {
//Step3. Dump Model For Delta&&Checkpoint
{
if (!already_dump_inference_model) {
already_dump_inference_model = true;
wait_save_model(epoch_id, ModelSaveWay::ModelSaveInferenceDelta);
}
wait_save_model(epoch_id, ModelSaveWay::ModelSaveInferenceBase);
wait_save_model(epoch_id, ModelSaveWay::ModelSaveTrainCheckpoint);
environment->barrier(EnvironmentRole::WORKER);
epoch_accessor->epoch_done(epoch_id);
environment->barrier(EnvironmentRole::WORKER);
}
//Step4. Output Monitor && RunStatus
......
......@@ -22,8 +22,6 @@ protected:
virtual int load_model(uint64_t epoch_id);
// 同步保存所有模型
virtual int wait_save_model(uint64_t epoch_id, ModelSaveWay way);
// 异步保存指定模型
virtual std::future<int> save_model(uint64_t epoch_id, int table_id, ModelSaveWay way);
private:
std::vector<std::shared_ptr<MultiThreadExecutor>> _executors;
......
#!/bin/bash
# Build script: generates model programs from the network scripts and
# assembles the runtime "package" directory.

# Hadoop client used at runtime (copied into the output package below).
TRAINER_HODOOP_HOME=""   # NOTE(review): name looks like a typo for HADOOP — confirm before renaming
# Python home used to generate model programs from the network scripts.
TRAINER_PYTHON_HOME="/home/xiexionghang/paddle/py-paddle/"

# Environment check: the packaged paddle python binary must exist.
if [ ! -f "${TRAINER_PYTHON_HOME}/python/bin/paddle" ]; then
    echo "Miss File: ${TRAINER_PYTHON_HOME}/python/bin/paddle"
    echo "TRAINER_PYTHON_HOME:${TRAINER_PYTHON_HOME} is invalid, Fix it, or Get From here:"
    echo "wget ftp://cp01-arch-gr06.epc.baidu.com/home/xiexionghang/paddle/py-paddle.tar.gz"
    echo "Then set TRAINER_PYTHON_HOME"
    # Bug fix: this is an error path, so exit non-zero (was "exit 0",
    # which let callers treat the failed setup as success).
    exit 1
fi
TRAINER_PYTHON_BIN="${TRAINER_PYTHON_HOME}/python/bin/python"

# Workaround for paddle's hard-coded relative third_party layout:
# link libps.so into the expected ../../../third_party path.
if [ ! -f ../../../third_party/install/pslib/lib/libps.so ]; then
    mkdir -p ../../../third_party/install/pslib/lib/
    ln -s "${TRAINER_PYTHON_HOME}/third_party/install/pslib/lib/libps.so" ../../../third_party/install/pslib/lib/libps.so
fi

# Generate one model program per executor name found in trainer.yaml.
# Matching by name is fragile (comment in the original notes this too);
# parsing the yaml with python first would be more robust.
items=`grep " name:" conf/trainer.yaml | awk -F ':' '{print $2}' |awk '{sub("^ *","");sub(" *$","");print}'`
for item in ${items[@]};
do
    if [ ! -f scripts/${item}.py ]; then
        echo "Missing model_net config: scripts/${item}.py, skip it $item"
        continue
    fi
    rm -rf "model/$item"
    ${TRAINER_PYTHON_BIN} scripts/create_programs.py scripts/${item}.py
    if [ $? -ne 0 ]; then
        echo "Create model with scripts/${item}.py failed"
        exit 1
    fi
done

# Assemble the output package.
rm -rf package
mkdir package
cp -r bin conf tool scripts model so package
cp -r ${TRAINER_HODOOP_HOME} package/hadoop-client
......@@ -26,22 +26,33 @@ def inference():
# TODO: build network here
cvm_input = fluid.layers.data(name='cvm_input', shape=[sparse_cvm_dim(sparse_cvm)], dtype='float32', stop_gradient=False)
net = cvm_input
net = fluid.layers.data_norm(input=net, name="bn6048", epsilon=1e-4,
param_attr={"batch_size":1e4, "batch_sum_default":0.0, "batch_square":1e4})
net = fluid.layers.fc(net, 511, act='relu', name='fc_1')
net = fluid.layers.fc(net, 255, act='relu', name='fc_2')
net = fluid.layers.fc(net, 255, act='relu', name='fc_3')
net = fluid.layers.fc(net, 127, act='relu', name='fc_4')
net = fluid.layers.fc(net, 127, act='relu', name='fc_5')
net = fluid.layers.fc(net, 127, act='relu', name='fc_6')
net = fluid.layers.fc(net, 127, act='relu', name='fc_7')
lr_x = 1.0
init_range = 0.2
fc_layers_size = [511, 255, 255, 127, 127, 127, 127]
fc_layers_act = ["relu"] * len(fc_layers_size)
scales_tmp = [net.shape[1]] + fc_layers_size
scales = []
for i in range(len(scales_tmp)):
scales.append(init_range / (scales_tmp[i] ** 0.5))
for i in range(len(fc_layers_size)):
net = fluid.layers.fc(
input = net,
size = fc_layers_size[i],
name = 'fc_' + str(i+1),
act = fc_layers_act[i],
param_attr = \
fluid.ParamAttr(learning_rate=lr_x, \
initializer=fluid.initializer.NormalInitializer(loc=0.0, scale=1.0 * scales[i])),
bias_attr = \
fluid.ParamAttr(learning_rate=lr_x, \
initializer=fluid.initializer.NormalInitializer(loc=0.0, scale=1.0 * scales[i])))
ctr_output = fluid.layers.fc(net, 1, act='sigmoid', name='ctr')
accessors = [
{ "class": "AbacusSparseUpdateAccessor", "input": "sparses", "table_id": 0, "need_gradient": False},
{ "class": "AbacusSparseJoinAccessor", "input": "sparses", "table_id": 0, "need_gradient": False},
{ "class": "DenseInputAccessor", "input": "vars", "table_id": 1, "need_gradient": True, "async_pull": True},
{ "class": "DenseInputAccessor", "input": "sums", "table_id": 2, "need_gradient": True, "async_pull": True},
{ "class": "LabelInputAccessor", "input": "labels"}
......
#Attention: Do Not Modify This File Manually, Unless You Really Know It
input_accessor:
- class: AbacusSparseUpdateAccessor
- class: AbacusSparseJoinAccessor
input:
- name: cvm_input
slot_dim: 11
......
#!/bin/bash
# Launcher for the feed trainer. Run modes:
#   mpi        - run inside an mpi cluster (default; invoked on every node)
#   mpi_tmp    - run 1 node with mpi, using ${work_dir}/temp as mpi tmpdir
#   local      - run 1 single local process
#   random_log - (internal) run with a per-process random log dir
BIN_FILE=feed_trainer
work_dir=`pwd`

function usage() {
    echo -e "\033[41mUSAGE: sh scripts/start_feed_trainer.sh [run_mode]\033[0m"
    echo "run_mode=mpi, run job in mpi cluster"
    echo "run_mode=mpi_tmp, run 1 node with mpi in /tmp"
    echo "run_mode=local, run 1 node in local"
    echo "Example: sh scripts/start_feed_trainer.sh local"
    exit 0
}

# Default to mpi mode when no argument is given
if [ $# -lt 1 ];then
    run_mode="mpi"
else
    run_mode="$1"
fi

export PATH=/usr/local/openmpi/bin:$PATH
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/local/openmpi/lib/
if [ "${run_mode}" = "mpi" ];then
    # unpack the shipped package on every node, then launch 2 ranks per node
    mpirun mv package/* .
    mpirun mkdir -p log
    export HADOOP_HOME="./hadoop-client/hadoop"
    export PATH=$HADOOP_HOME/bin/:./bin:$PATH
    export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:./so
    # switch configs to the MPI runtime environment on all nodes
    mpirun sed -i 's/LocalRuntimeEnvironment/MPIRuntimeEnvironment/g' conf/*.yaml
    export HADOOP_HOME="./hadoop-client/hadoop"
    export PATH=$HADOOP_HOME/bin/:/bin:$PATH
    export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:./so
    GLOG_logtostderr=0 mpirun -npernode 2 -timestamp-output -tag-output --prefix $work_dir ./bin/feed_trainer --log_dir=log
elif [ "${run_mode}" = "mpi_tmp" ];then
    mv package/* .
    mkdir temp
    export HADOOP_HOME="$work_dir/hadoop-client/hadoop"
    export PATH=$HADOOP_HOME/bin/:/bin:$PATH
    export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:${work_dir}/so
    sed -i 's/LocalRuntimeEnvironment/MPIRuntimeEnvironment/g' conf/*.yaml
    # re-enter this script per rank in random_log mode so logs don't collide
    mpirun -npernode 2 -timestamp-output -tag-output --prefix $work_dir --mca orte_tmpdir_base ${work_dir}/temp scripts/start_feed_trainer.sh random_log
elif [ "${run_mode}" = "local" ];then
    sed -i 's/MPIRuntimeEnvironment/LocalRuntimeEnvironment/g' conf/*.yaml
    export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:${work_dir}/so
    # -p: don't fail if the log dir already exists from a previous run
    mkdir -p log
    ./bin/feed_trainer --log_dir=log
elif [ "${run_mode}" = "random_log" ];then
    # pick a random log dir so multiple ranks on one node don't collide;
    # previously the computed log_dir was ignored and every rank wrote to ./log
    log_dir="log/log.${RANDOM}"
    mkdir -p ${log_dir}
    ./bin/feed_trainer --log_dir=${log_dir}
else
    usage
fi
#!/bin/bash
# Submit the feed trainer job to the MPI cluster via qsub_f.
# Reads cluster settings from conf/env.conf, regenerates conf/qsub_f.conf,
# builds the runnable ./package, then submits it with start_feed_trainer.sh
# as the per-node entry point.
export PATH=/bin/:$PATH
set -x
source conf/env.conf

# conf/qsub_f.conf is derived from env.conf every time; never edit it by hand
echo "# This file is automatically generated. Don't change it." > conf/qsub_f.conf
echo "SERVER=$MPI_SERVER" >> conf/qsub_f.conf
echo "QUEUE=$MPI_QUEUE" >> conf/qsub_f.conf
echo "PRIORITY=$MPI_PRIORITY" >> conf/qsub_f.conf
export HADOOP_HOME=$HADOOP_HOME

# Build ./package (binaries/configs/models/hadoop-client) before submitting
sh scripts/compake_runable_package.sh

$HPC_HOME/bin/qsub_f \
    -N $MPI_JOB_NAME \
    --conf conf/qsub_f.conf \
    --hdfs $HADOOP_FS \
    --ugi $HADOOP_UGI \
    --hout $HDFS_ROOT \
    --files package \
    -l nodes=$MPI_NODE_NUM,walltime=$MPI_WALL_TIME,pmem-hard=$MPI_NODE_MEM,pcpu-soft=180,pnetin-soft=1000,pnetout-soft=1000 \
    scripts/start_feed_trainer.sh
if [ $? -ne 0 ]; then
    echo -e "qsub_f failed, please check the config or get help from abacus RD\n"
    # portable non-zero status: `exit -1` is non-portable and wraps to 255
    exit 1
fi
exit 0
......@@ -25,11 +25,26 @@ def inference():
cvm_input = fluid.layers.data(name='cvm_input', shape=[sparse_cvm_dim(sparse_cvm)], dtype='float32', stop_gradient=False)
net = cvm_input
net = fluid.layers.fc(net, 511, act='relu', name='fc_1')
net = fluid.layers.fc(net, 255, act='relu', name='fc_2')
net = fluid.layers.fc(net, 127, act='relu', name='fc_3')
net = fluid.layers.fc(net, 127, act='relu', name='fc_4')
net = fluid.layers.fc(net, 127, act='relu', name='fc_5')
lr_x = 1.0
init_range = 0.2
fc_layers_size = [511, 255, 127, 127, 127]
fc_layers_act = ["relu"] * len(fc_layers_size)
scales_tmp = [net.shape[1]] + fc_layers_size
scales = []
for i in range(len(scales_tmp)):
scales.append(init_range / (scales_tmp[i] ** 0.5))
for i in range(len(fc_layers_size)):
net = fluid.layers.fc(
input = net,
size = fc_layers_size[i],
name = 'fc_' + str(i+1),
act = fc_layers_act[i],
param_attr = \
fluid.ParamAttr(learning_rate=lr_x, \
initializer=fluid.initializer.NormalInitializer(loc=0.0, scale=1.0 * scales[i])),
bias_attr = \
fluid.ParamAttr(learning_rate=lr_x, \
initializer=fluid.initializer.NormalInitializer(loc=0.0, scale=1.0 * scales[i])))
ctr_output = fluid.layers.fc(net, 1, act='sigmoid', name='ctr')
......
......@@ -2,6 +2,7 @@
#include <string>
#include <memory>
#include <vector>
#include <sstream>
#include "paddle/fluid/platform/place.h"
#include "paddle/fluid/train/custom_trainer/feed/common/yaml_helper.h"
#include "paddle/fluid/train/custom_trainer/feed/common/pslib_warpper.h"
......@@ -48,6 +49,7 @@ public:
paddle::platform::CPUPlace cpu_place;
std::shared_ptr<PSlib> pslib;
std::stringstream monitor_ssm; //记录monitor信息
std::shared_ptr<Dataset> dataset; //训练样本
std::shared_ptr<FileSystem> file_system; //文件操作辅助类
std::shared_ptr<EpochAccessor> epoch_accessor; //训练轮次控制
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册