diff --git a/paddle/fluid/train/custom_trainer/feed/common/runtime_environment.cc b/paddle/fluid/train/custom_trainer/feed/common/runtime_environment.cc old mode 100644 new mode 100755 index bb0342ea445a8703294730df6459d4c9ef6f6d62..ccf5620758b96a6947bac0f8e97ca7af61fa6881 --- a/paddle/fluid/train/custom_trainer/feed/common/runtime_environment.cc +++ b/paddle/fluid/train/custom_trainer/feed/common/runtime_environment.cc @@ -161,7 +161,12 @@ public: protected: virtual void print_log(EnvironmentRole role, EnvironmentLogType type, EnvironmentLogLevel level, const std::string& log_str) { - if (type == EnvironmentLogType::MASTER_LOG && !is_master_node(role)) { + if (type == EnvironmentLogType::MASTER_LOG) { + if (is_master_node(role)) { + fprintf(stdout, log_str.c_str()); + fprintf(stdout, "\n"); + fflush(stdout); + } return; } VLOG(static_cast(level)) << log_str; diff --git a/paddle/fluid/train/custom_trainer/feed/common/runtime_environment.h b/paddle/fluid/train/custom_trainer/feed/common/runtime_environment.h old mode 100644 new mode 100755 index d6d91bc66c0fefbb4b0db76a1f66c75d04fc69f3..164e70b088318810aab27b7e6e0e9137ccf14302 --- a/paddle/fluid/train/custom_trainer/feed/common/runtime_environment.h +++ b/paddle/fluid/train/custom_trainer/feed/common/runtime_environment.h @@ -75,8 +75,8 @@ public: // 环境定制化log template void log(EnvironmentRole role, EnvironmentLogType type, - EnvironmentLogLevel level, const char* fmt, ARGS && ... args) { - print_log(role, type, level, paddle::string::format_string(fmt, args...)); + EnvironmentLogLevel level, ARGS && ... args) { + print_log(role, type, level, paddle::string::format_string(args...)); } // 多线程可调用接口 End @@ -106,14 +106,14 @@ protected: }; REGIST_REGISTERER(RuntimeEnvironment); -#define ENVLOG_WORKER_ALL_NOTICE \ -environment->log(EnvironmentRole::WORKER, EnvironmentLogType::ALL_LOG, EnvironmentLogType::NOTICE, -#define ENVLOG_WORKER_MASTER_NOTICE \ -environment->log(EnvironmentRole::WORKER, EnvironmentLogType::MASTER_LOG, EnvironmentLogType::NOTICE, -#define ENVLOG_WORKER_ALL_ERROR \ -environment->log(EnvironmentRole::WORKER, EnvironmentLogType::ALL_LOG, EnvironmentLogType::ERROR, -#define ENVLOG_WORKER_MASTER_ERROR \ -environment->log(EnvironmentRole::WORKER, EnvironmentLogType::MASTER_LOG, EnvironmentLogType::ERROR, +#define ENVLOG_WORKER_ALL_NOTICE(...) \ +environment->log(EnvironmentRole::WORKER, EnvironmentLogType::ALL_LOG, EnvironmentLogLevel::NOTICE, __VA_ARGS__); +#define ENVLOG_WORKER_MASTER_NOTICE(...) \ +environment->log(EnvironmentRole::WORKER, EnvironmentLogType::MASTER_LOG, EnvironmentLogLevel::NOTICE, __VA_ARGS__); +#define ENVLOG_WORKER_ALL_ERROR(...) \ +environment->log(EnvironmentRole::WORKER, EnvironmentLogType::ALL_LOG, EnvironmentLogLevel::ERROR, __VA_ARGS__); +#define ENVLOG_WORKER_MASTER_ERROR(...) \ +environment->log(EnvironmentRole::WORKER, EnvironmentLogType::MASTER_LOG, EnvironmentLogLevel::ERROR, __VA_ARGS__); std::string format_timestamp(time_t time, const char* format); inline std::string format_timestamp(time_t time, const std::string& format) { diff --git a/paddle/fluid/train/custom_trainer/feed/executor/multi_thread_executor.cc b/paddle/fluid/train/custom_trainer/feed/executor/multi_thread_executor.cc old mode 100644 new mode 100755 index ae607ccb2d899af71cee57b61b67ff77f1b657a4..9ff97f792703a2d956e838ed285a38bf9f723860 --- a/paddle/fluid/train/custom_trainer/feed/executor/multi_thread_executor.cc +++ b/paddle/fluid/train/custom_trainer/feed/executor/multi_thread_executor.cc @@ -137,7 +137,7 @@ paddle::framework::Channel MultiThreadExecutor::run( paddle::framework::Channel input, const DataParser* parser) { uint64_t epoch_id = _trainer_context->epoch_accessor->current_epoch_id(); - + auto* environment = _trainer_context->environment.get(); // 输入流 PipelineOptions input_pipe_option; input_pipe_option.need_hold_input_data = true; @@ -243,8 +243,8 @@ paddle::framework::Channel MultiThreadExecutor::run( for (auto& monitor : _monitors) { if (monitor->need_compute_result(epoch_id)) { monitor->compute_result(); - VLOG(2) << "[Monitor]" << _train_exe_name << ", monitor:" << monitor->get_name() - << ", result:" << monitor->format_result(); + ENVLOG_WORKER_MASTER_NOTICE("[Monitor]%s, monitor:%s, , result:%s", + _train_exe_name.c_str(), monitor->get_name().c_str(), monitor->format_result().c_str()); _trainer_context->monitor_ssm << _train_exe_name << ":" << monitor->get_name() << ":" << monitor->format_result() << ","; monitor->reset(); diff --git a/paddle/fluid/train/custom_trainer/feed/monitor/cost_monitor.h b/paddle/fluid/train/custom_trainer/feed/monitor/cost_monitor.h old mode 100644 new mode 100755 index 6c78bfa92ee87c6149116560099b6ab082b15c0d..44e31544f902d3229fb72fc4af45376ca75fdaf8 --- a/paddle/fluid/train/custom_trainer/feed/monitor/cost_monitor.h +++ b/paddle/fluid/train/custom_trainer/feed/monitor/cost_monitor.h @@ -46,7 +46,7 @@ private: uint64_t _total_time_ms; uint64_t _total_cnt; uint64_t _avg_time_ms; - uint32_t _compute_interval; + uint32_t _compute_interval; }; } // namespace feed diff --git a/paddle/fluid/train/custom_trainer/feed/monitor/monitor.h b/paddle/fluid/train/custom_trainer/feed/monitor/monitor.h old mode 100644 new mode 100755 index fc5258ac97912583e8e33abf4406a904fd64d380..ab698130dd43215d1cfa3f75125558ce193c3c51 --- a/paddle/fluid/train/custom_trainer/feed/monitor/monitor.h +++ b/paddle/fluid/train/custom_trainer/feed/monitor/monitor.h @@ -5,6 +5,7 @@ #include "paddle/fluid/train/custom_trainer/feed/trainer_context.h" #include "paddle/fluid/train/custom_trainer/feed/executor/executor.h" #include "paddle/fluid/train/custom_trainer/feed/dataset/data_reader.h" +#include "paddle/fluid/train/custom_trainer/feed/common/runtime_environment.h" namespace paddle { namespace custom_trainer { diff --git a/paddle/fluid/train/custom_trainer/feed/process/learner_process.cc b/paddle/fluid/train/custom_trainer/feed/process/learner_process.cc index 954c1011ec9edd0df0061a093fa19f96a980d938..41233b41366b09dff234a71c740d6cf836b44b6f 100755 --- a/paddle/fluid/train/custom_trainer/feed/process/learner_process.cc +++ b/paddle/fluid/train/custom_trainer/feed/process/learner_process.cc @@ -77,6 +77,8 @@ int LearnerProcess::load_model(uint64_t epoch_id) { auto* fs = _context_ptr->file_system.get(); std::set loaded_table_set; auto model_dir = _context_ptr->epoch_accessor->checkpoint_path(); + paddle::platform::Timer timer; + timer.Start(); for (auto& executor : _executors) { const auto& table_accessors = executor->table_accessors(); for (auto& itr : table_accessors) { @@ -90,6 +92,7 @@ int LearnerProcess::load_model(uint64_t epoch_id) { auto scope = std::move(executor->fetch_scope()); CHECK(itr.second[0]->create(scope.get()) == 0); } else { + ENVLOG_WORKER_MASTER_NOTICE("Loading model %s", model_dir.c_str()); auto status = _context_ptr->ps_client()->load(itr.first, model_dir, std::to_string((int)ModelSaveWay::ModelSaveTrainCheckpoint)); CHECK(status.get() == 0) << "table load failed, id:" << itr.first; @@ -97,6 +100,8 @@ int LearnerProcess::load_model(uint64_t epoch_id) { loaded_table_set.insert(itr.first); } } + timer.Pause(); + ENVLOG_WORKER_MASTER_NOTICE("Finished loading model, cost:%f", timer.ElapsedSec()); return 0; } @@ -106,9 +111,7 @@ int LearnerProcess::run() { auto* epoch_accessor = _context_ptr->epoch_accessor.get(); uint64_t epoch_id = epoch_accessor->current_epoch_id(); - environment->log(EnvironmentRole::WORKER, EnvironmentLogType::MASTER_LOG, EnvironmentLogLevel::NOTICE, - "Resume train with epoch_id:%d %s", epoch_id, _context_ptr->epoch_accessor->text(epoch_id).c_str()); - + ENVLOG_WORKER_MASTER_NOTICE("Resume train with epoch_id:%d %s", epoch_id, _context_ptr->epoch_accessor->text(epoch_id).c_str()); //尝试加载模型 or 初始化 CHECK(load_model(epoch_id) == 0); environment->barrier(EnvironmentRole::WORKER); @@ -125,19 +128,16 @@ int LearnerProcess::run() { std::string epoch_log_title = paddle::string::format_string( "train epoch_id:%d label:%s", epoch_id, epoch_accessor->text(epoch_id).c_str()); std::string data_path = paddle::string::to_string(dataset->epoch_data_path(epoch_id)); - + ENVLOG_WORKER_MASTER_NOTICE(" ==== begin %s ====", epoch_accessor->text(epoch_id).c_str()); //Step1. 等待样本ready { - environment->log(EnvironmentRole::WORKER, EnvironmentLogType::MASTER_LOG, EnvironmentLogLevel::NOTICE, - "%s, wait data ready:%s", epoch_log_title.c_str(), data_path.c_str()); + ENVLOG_WORKER_MASTER_NOTICE(" %s, wait data ready:%s", epoch_log_title.c_str(), data_path.c_str()); while (dataset->epoch_data_status(epoch_id) != DatasetStatus::Ready) { sleep(30); dataset->pre_detect_data(epoch_id); - environment->log(EnvironmentRole::WORKER, EnvironmentLogType::MASTER_LOG, EnvironmentLogLevel::NOTICE, - "data not ready, wait 30s"); + ENVLOG_WORKER_MASTER_NOTICE(" epoch_id:%d data not ready, wait 30s", epoch_id); } - environment->log(EnvironmentRole::WORKER, EnvironmentLogType::MASTER_LOG, EnvironmentLogLevel::NOTICE, - "Start %s, data is ready", epoch_log_title.c_str()); + ENVLOG_WORKER_MASTER_NOTICE(" Start %s, data is ready", epoch_log_title.c_str()); environment->barrier(EnvironmentRole::WORKER); } @@ -148,7 +148,7 @@ int LearnerProcess::run() { environment->barrier(EnvironmentRole::WORKER); paddle::platform::Timer timer; timer.Start(); - VLOG(2) << "Start executor:" << executor->train_exe_name(); + ENVLOG_WORKER_MASTER_NOTICE("Start executor:%s", executor->train_exe_name().c_str()); auto data_name = executor->train_data_name(); paddle::framework::Channel input_channel; if (backup_input_map.count(data_name)) { @@ -158,7 +158,7 @@ int LearnerProcess::run() { } input_channel = executor->run(input_channel, dataset->data_parser(data_name)); timer.Pause(); - VLOG(2) << "End executor:" << executor->train_exe_name() << ", cost:" << timer.ElapsedSec(); + ENVLOG_WORKER_MASTER_NOTICE("End executor:%s, cost:%f", executor->train_exe_name().c_str(), timer.ElapsedSec()); // 等待异步梯度完成 _context_ptr->ps_client()->flush(); @@ -186,21 +186,22 @@ int LearnerProcess::run() { environment->is_master_node(EnvironmentRole::WORKER)) { paddle::platform::Timer timer; timer.Start(); - VLOG(2) << "Start shrink table"; + ENVLOG_WORKER_MASTER_NOTICE("Start shrink table"); for (auto& executor : _executors) { const auto& table_accessors = executor->table_accessors(); for (auto& itr : table_accessors) { CHECK(itr.second[0]->shrink() == 0); } } - VLOG(2) << "End shrink table, cost:" << timer.ElapsedSec(); + timer.Pause(); + ENVLOG_WORKER_MASTER_NOTICE("End shrink table, cost:%f", timer.ElapsedSec()); } environment->barrier(EnvironmentRole::WORKER); epoch_accessor->epoch_done(epoch_id); environment->barrier(EnvironmentRole::WORKER); } - + ENVLOG_WORKER_MASTER_NOTICE(" ==== end %s ====", epoch_accessor->text(epoch_id).c_str()); //Step4. Output Monitor && RunStatus //TODO }