From f7d671a6e4574d53f62c85afa16c6509966deda9 Mon Sep 17 00:00:00 2001 From: yaopenghui Date: Mon, 26 Aug 2019 13:16:02 +0800 Subject: [PATCH] add auc monitor --- .../feed/common/runtime_environment.cc | 67 ++++++++ .../feed/common/runtime_environment.h | 4 + .../custom_trainer/feed/dataset/data_reader.h | 1 + .../feed/monitor/auc_monitor.cc | 162 ++++++++++++++++++ .../custom_trainer/feed/monitor/auc_monitor.h | 37 +++- .../custom_trainer/feed/monitor/monitor.h | 8 +- 6 files changed, 271 insertions(+), 8 deletions(-) create mode 100644 paddle/fluid/train/custom_trainer/feed/monitor/auc_monitor.cc diff --git a/paddle/fluid/train/custom_trainer/feed/common/runtime_environment.cc b/paddle/fluid/train/custom_trainer/feed/common/runtime_environment.cc index 83435853..0ffb7c92 100644 --- a/paddle/fluid/train/custom_trainer/feed/common/runtime_environment.cc +++ b/paddle/fluid/train/custom_trainer/feed/common/runtime_environment.cc @@ -5,6 +5,58 @@ namespace paddle { namespace custom_trainer { namespace feed { +template +struct mpi_type_trait { +}; +template<> +struct mpi_type_trait { + static MPI_Datatype type() { + return MPI_DOUBLE; + } +}; +template<> +struct mpi_type_trait { + static MPI_Datatype type() { + return MPI_FLOAT; + } +}; +template<> +struct mpi_type_trait { + static MPI_Datatype type() { + return MPI_INT; + } +}; +template<> +struct mpi_type_trait { + static MPI_Datatype type() { + return MPI_UNSIGNED; + } +}; +template<> +struct mpi_type_trait { + static MPI_Datatype type() { + return MPI_LONG_LONG; + } +}; +template<> +struct mpi_type_trait { + static MPI_Datatype type() { + return MPI_UNSIGNED_LONG_LONG; + } +}; +template<> +struct mpi_type_trait { + static MPI_Datatype type() { + return MPI_LONG_LONG; + } +}; +template<> +struct mpi_type_trait { + static MPI_Datatype type() { + return MPI_UNSIGNED_LONG_LONG; + } +}; + RuntimeEnvironment::RuntimeEnvironment() {} RuntimeEnvironment::~RuntimeEnvironment() {} bool RuntimeEnvironment::is_master_node(EnvironmentRole role) { @@ -79,6 +131,15 @@ public: MPI_Bcast(ar.Buffer(), len, MPI_BYTE, root_id, node_info.mpi_comm); } + virtual double all_reduce_ele(double x) { + double tot; + MPI_Allreduce(&x, &tot, 1, mpi_type_trait::type(), MPI_SUM, MPI_COMM_WORLD); + return tot; + } + virtual void all_reduce_arr(double* x, int n) { + MPI_Allreduce(MPI_IN_PLACE, x, n, mpi_type_trait::type(), MPI_SUM, MPI_COMM_WORLD); + } + protected: virtual void print_log(EnvironmentRole role, EnvironmentLogType type, EnvironmentLogLevel level, const std::string& log_str) { @@ -123,6 +184,12 @@ public: virtual void bcast(paddle::framework::BinaryArchive& ar, int root_id, EnvironmentRole role) { return; } + virtual double all_reduce_ele(double x) { + return x; + } + virtual void all_reduce_arr(double* x, int n) { + return; + } protected: virtual void print_log(EnvironmentRole role, EnvironmentLogType type, EnvironmentLogLevel level, const std::string& log_str) { diff --git a/paddle/fluid/train/custom_trainer/feed/common/runtime_environment.h b/paddle/fluid/train/custom_trainer/feed/common/runtime_environment.h index d0277483..1107dbef 100644 --- a/paddle/fluid/train/custom_trainer/feed/common/runtime_environment.h +++ b/paddle/fluid/train/custom_trainer/feed/common/runtime_environment.h @@ -67,6 +67,10 @@ public: virtual void barrier(EnvironmentRole role) = 0; //bcast 广播 virtual void bcast(paddle::framework::BinaryArchive& ar, int root_id, EnvironmentRole role) = 0; + //all_reduce sum element 规约元素 + virtual double all_reduce_ele(double x) = 0; + //all_reduce sum array 规约数组 + virtual void all_reduce_arr(double* x, int n) = 0; //接口只允许在主线程调用 End protected: virtual void print_log(EnvironmentRole role, EnvironmentLogType type, diff --git a/paddle/fluid/train/custom_trainer/feed/dataset/data_reader.h b/paddle/fluid/train/custom_trainer/feed/dataset/data_reader.h index ea868f90..109bcbb0 100755 --- a/paddle/fluid/train/custom_trainer/feed/dataset/data_reader.h +++ b/paddle/fluid/train/custom_trainer/feed/dataset/data_reader.h @@ -51,6 +51,7 @@ private: struct SampleInstance { std::string id; + std::vector predicts; std::vector labels; std::vector features; std::vector embedx; diff --git a/paddle/fluid/train/custom_trainer/feed/monitor/auc_monitor.cc b/paddle/fluid/train/custom_trainer/feed/monitor/auc_monitor.cc new file mode 100644 index 00000000..cb472fe7 --- /dev/null +++ b/paddle/fluid/train/custom_trainer/feed/monitor/auc_monitor.cc @@ -0,0 +1,162 @@ +#include "paddle/fluid/train/custom_trainer/feed/monitor/auc_monitor.h" + +namespace paddle { +namespace custom_trainer { +namespace feed { + +int AucMonitor::initialize(const YAML::Node& config, std::shared_ptr context_ptr) { + Monitor::initialize(config, context_ptr); + _target_name = config["target"].as(); + _label_name = config["label"].as(); + _table_size = 1000000; + if (config["table_size"]) { + _table_size = config["table_size"].as(); + } + set_table_size(_table_size); + _compute_interval = 3600; + if (config["compute_interval"]) { + uint32_t interval = config["compute_interval"].as(); + if (interval != 3600 || interval != 86400) { + LOG(FATAL) << " AucMonitor config compute_interval just support hour: 3600 or day: 86400. "; + return -1; + } + _compute_interval = interval; + } +} + +void AucMonitor::add_data(int epoch_id, const Executor* executor, SampleInstance* instance, size_t num) { + if (executor == nullptr + || instance == nullptr + || instance->predicts.empty() + || instance->labels.empty() + || num <= 0 + || instance->predicts.size() < num + || instance->labels.size() < num) { + LOG(FATAL) << "AucMonitor add predict data is invalid, predicts or labels is empty, num[" << num << "]"; + return; + } + std::lock_guard lock(_mutex); + for (int i = 0; i < num; ++i) { + add_unlocked(instance->predicts[i], std::lround(instance->labels[i])); + } +} + +bool AucMonitor::need_compute_result(int epoch_id, EpochAccessor* accessor) { + CHECK(accessor != nullptr); + uint64_t epoch_time = accessor->epoch_timestamp(epoch_id); + if (epoch_time % _compute_interval != 0) { + return false; + } + return true; +} + +void AucMonitor::compute_result() { + double* table[2] = {&_table[0][0], &_table[1][0]}; + for (int i = 0; i < 2; i++) { + Monitor::_context_ptr->environment->all_reduce_arr(table[i], _table_size); + } + double area = 0; + double fp = 0; + double tp = 0; + for (int i = _table_size - 1; i >= 0; i--) { + double newfp = fp + table[0][i]; + double newtp = tp + table[1][i]; + area += (newfp - fp) * (tp + newtp) / 2; + fp = newfp; + tp = newtp; + } + _auc = area / (fp * tp); + _mae = Monitor::_context_ptr->environment->all_reduce_ele(_local_abserr) / (fp + tp); + _rmse = sqrt(Monitor::_context_ptr->environment->all_reduce_ele(_local_sqrerr) / (fp + tp)); + _rmse = sqrt(_rmse / (fp + tp)); + _actual_ctr = tp / (fp + tp); + _predicted_ctr = Monitor::_context_ptr->environment->all_reduce_ele(_local_pred) / (fp + tp); + _size = fp + tp; + calculate_bucket_error(); +} + +std::string AucMonitor::format_result() { + double copc = 0.0; + if (fabs(_predicted_ctr) > 1e-6) { + copc = _actual_ctr / _predicted_ctr; + } + char buf[10240]; + snprintf(buf, 10240 * sizeof(char), "%s: AUC=%.6f BUCKET_ERROR=%.6f MAE=%.6f RMSE=%.6f " + "Actual CTR=%.6f Predicted CTR=%.6f COPC=%.6f INS Count=%.0f", + Monitor::_name.c_str(), + _auc, + _bucket_error, + _mae, + _rmse, + _actual_ctr, + _predicted_ctr, + copc, + _size); + + return std::string(buf); +} + +void AucMonitor::add_unlocked(double pred, int label) { + CHECK(pred >= 0 && pred <= 1) << "pred[" << pred << "] outside of [0,1]"; + CHECK(label == 0 || label == 1) << "label[" << label << "] invalid"; + _table[label][std::min(int(pred * _table_size), _table_size - 1)]++; + _local_abserr += fabs(pred - label); + _local_sqrerr += (pred - label) * (pred - label); + _local_pred += pred; +} + +void AucMonitor::calculate_bucket_error() { + double last_ctr = -1; + double impression_sum = 0; + double ctr_sum = 0.0; + double click_sum = 0.0; + double error_sum = 0.0; + double error_count = 0; + double* table[2] = {&_table[0][0], &_table[1][0]}; + for (int i = 0; i < _table_size; i++) { + double click = table[1][i]; + double show = table[0][i] + table[1][i]; + double ctr = (double)i / _table_size; + if (fabs(ctr - last_ctr) > kMaxSpan) { + last_ctr = ctr; + impression_sum = 0.0; + ctr_sum = 0.0; + click_sum = 0.0; + } + impression_sum += show; + ctr_sum += ctr * show; + click_sum += click; + double adjust_ctr = ctr_sum / impression_sum; + double relative_error = sqrt((1 - adjust_ctr) / (adjust_ctr * impression_sum)); + if (relative_error < kRelativeErrorBound) { + double actual_ctr = click_sum / impression_sum; + double relative_ctr_error = fabs(actual_ctr / adjust_ctr - 1); + error_sum += relative_ctr_error * impression_sum; + error_count += impression_sum; + last_ctr = -1; + } + } + _bucket_error = error_count > 0 ? error_sum / error_count : 0.0; +} + +void AucMonitor::set_table_size(int table_size) { + CHECK(table_size >= 1); + _table_size = table_size; + for (int i = 0; i < 2; i++) { + _table[i] = std::vector(); + } + reset(); +} + +void AucMonitor::reset() { + for (int i = 0; i < 2; i++) { + _table[i].assign(_table_size, 0.0); + } + _local_abserr = 0; + _local_sqrerr = 0; + _local_pred = 0; +} + +} // namespace feed +} // namespace custom_trainer +} // namespace paddle diff --git a/paddle/fluid/train/custom_trainer/feed/monitor/auc_monitor.h b/paddle/fluid/train/custom_trainer/feed/monitor/auc_monitor.h index fef362cb..aa2bd34c 100644 --- a/paddle/fluid/train/custom_trainer/feed/monitor/auc_monitor.h +++ b/paddle/fluid/train/custom_trainer/feed/monitor/auc_monitor.h @@ -1,5 +1,6 @@ #pragma once #include +#include //std::lround #include "paddle/fluid/train/custom_trainer/feed/monitor/monitor.h" namespace paddle { @@ -14,14 +15,13 @@ public: virtual ~AucMonitor() {} virtual int initialize(const YAML::Node& config, - std::shared_ptr context_ptr) { - Monitor::initialize(config, context_ptr); - //一些额外配置 对于AUC主要是target && label 信息 - return 0; - } + std::shared_ptr context_ptr) override; //添加一项记录,统计内容Monitor自行从Executor按需获取 - virtual void add_data(int epoch_id, const Executor* executor); + virtual void add_data(int epoch_id, + const Executor* executor, + SampleInstance* instance, + size_t num); //是否开始结果统计 virtual bool need_compute_result(int epoch_id, EpochAccessor* accessor); @@ -31,6 +31,31 @@ public: virtual std::string format_result(); virtual void reset(); + +protected: + std::string _label_name; + std::string _target_name; + std::string _name; + std::string _output_var; + std::mutex _mutex; + double _local_abserr, _local_sqrerr, _local_pred; + double _auc; + double _mae; + double _rmse; + double _actual_ctr, _predicted_ctr; + double _size; + double _bucket_error; + int _table_size; + void add_unlocked(double pred, int label); + +private: + void calculate_bucket_error(); + void set_table_size(int table_size); + + uint32_t _compute_interval; + std::vector _table[2]; + static constexpr double kRelativeErrorBound = 0.05; + static constexpr double kMaxSpan = 0.01; }; } // namespace feed diff --git a/paddle/fluid/train/custom_trainer/feed/monitor/monitor.h b/paddle/fluid/train/custom_trainer/feed/monitor/monitor.h index a7b9186c..d6dcdf38 100644 --- a/paddle/fluid/train/custom_trainer/feed/monitor/monitor.h +++ b/paddle/fluid/train/custom_trainer/feed/monitor/monitor.h @@ -1,8 +1,10 @@ #pragma once #include +#include "paddle/fluid/train/custom_trainer/feed/accessor/epoch_accessor.h" #include "paddle/fluid/train/custom_trainer/feed/common/registerer.h" #include "paddle/fluid/train/custom_trainer/feed/trainer_context.h" #include "paddle/fluid/train/custom_trainer/feed/executor/executor.h" +#include "paddle/fluid/train/custom_trainer/feed/dataset/data_reader.h" namespace paddle { namespace custom_trainer { @@ -15,12 +17,13 @@ public: virtual int initialize(const YAML::Node& config, std::shared_ptr context_ptr) { - _name = conf["name"].as(); + _name = config["name"].as(); + _context_ptr = context_ptr; return 0; } //添加一项记录,统计内容Monitor自行从Executor按需获取 - virtual void add_data(int epoch_id, const Executor* executor) = 0; + virtual void add_data(int epoch_id, const Executor* executor, SampleInstance* instance, size_t num) = 0; //是否对于当前epoch_id进行结果统计 virtual bool need_compute_result(int epoch_id, EpochAccessor* accessor) = 0; @@ -37,6 +40,7 @@ public: protected: std::string _name; + std::shared_ptr _context_ptr; }; REGISTER_REGISTERER(Monitor); -- GitLab