提交 3d4e61ef 编写于 作者: X xiexionghang

fix code style

上级 d1e62a0a
......@@ -126,7 +126,7 @@ namespace feed {
case ModelSaveWay::ModelSaveInferenceBase:
return is_last_epoch(epoch_id);
case ModelSaveWay::ModelSaveTrainCheckpoint:
return ((epoch_id / SecondsPerHour) % 8) == 0;
return delta_id(epoch_id) % 8 == 0;
}
return false;
}
......
/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
// Hide BLOG
......
......@@ -139,14 +139,13 @@ public:
ar.SetCursor(ar.Buffer());
MPI_Bcast(ar.Buffer(), len, MPI_BYTE, root_id, node_info.mpi_comm);
}
// Sum-reduce a single double across all ranks in MPI_COMM_WORLD; every rank
// receives the global sum. Blocking collective: all ranks must call it.
virtual double all_reduce_ele(double x) {
double tot = 0.0;
// MPI_Allreduce combines each rank's x with MPI_SUM and writes the result to tot on every rank.
MPI_Allreduce(&x, &tot, 1, mpi_type_trait<double>::type(), MPI_SUM, MPI_COMM_WORLD);
return tot;
}
virtual void all_reduce_arr(double* x, int n) {
MPI_Allreduce(MPI_IN_PLACE, x, n, mpi_type_trait<double>::type(), MPI_SUM, MPI_COMM_WORLD);
// In-place reduce of an n-element double array across the nodes of `role`.
// Only ReduceOperator::SUM is implemented; any other op aborts via CHECK.
// Blocking MPI collective: every node in the role's communicator must call it.
virtual void all_reduce_in_place(double* x, int n, ReduceOperator op, EnvironmentRole role) {
    auto& node_info = mpi_node_info(role);
    if (op == ReduceOperator::SUM) {
        // MPI_IN_PLACE: x is both the send and the receive buffer.
        MPI_Allreduce(MPI_IN_PLACE, x, n, MPI_DOUBLE, MPI_SUM, node_info.mpi_comm);
    } else {
        CHECK(false) << "unsupported operator";
    }
}
protected:
......@@ -201,10 +200,7 @@ public:
virtual void bcast(paddle::framework::BinaryArchive& ar, int root_id, EnvironmentRole role) {
return;
}
// Single-process (local) environment: a reduction over one node is the identity.
virtual double all_reduce_ele(double x) {
return x;
}
virtual void all_reduce_arr(double* x, int n) {
virtual void all_reduce_in_place(double* x, int n, ReduceOperator op, EnvironmentRole role) {
return;
}
protected:
......
......@@ -27,7 +27,7 @@ enum class EnvironmentLogType {
ALL_LOG = 1 //所有节点都会对外输出
};
//保持该枚举值的连续递增,且ALL在尾部
// 保持该枚举值的连续递增,且ALL在尾部
enum class EnvironmentRole {
WORKER = 0, //训练Worker
PSERVER = 1, //参数服务器
......@@ -35,6 +35,11 @@ enum class EnvironmentRole {
ALL = 2 //所有角色,请保持在枚举尾部
};
// Operation type for reduce collectives
enum class ReduceOperator {
SUM = 0 // summation
};
class RuntimeEnvironment {
public:
RuntimeEnvironment();
......@@ -72,10 +77,15 @@ public:
virtual void barrier(EnvironmentRole role) = 0;
// bcast 广播
virtual void bcast(paddle::framework::BinaryArchive& ar, int root_id, EnvironmentRole role) = 0;
// all_reduce sum element 规约元素
virtual double all_reduce_ele(double x) = 0;
// all_reduce sum array 规约数组
virtual void all_reduce_arr(double* x, int n) = 0;
// 全局reduce操作, 返回reduce结果
// Global reduce of a single scalar; returns the reduced value.
// Implemented as a thin wrapper over the in-place array reduce,
// using a one-element buffer seeded with x.
virtual double all_reduce(double x, ReduceOperator op, EnvironmentRole role) {
    double reduced = x;
    all_reduce_in_place(&reduced, 1, op, role);
    return reduced;
}
// 全局reduce,就地执行
virtual void all_reduce_in_place(double* x, int n,
ReduceOperator op, EnvironmentRole role) = 0;
// 接口只允许在主线程调用 End
protected:
virtual void print_log(EnvironmentRole role, EnvironmentLogType type,
......
-log_dir=log
-v=4
-v=2
-logbufsecs=0
-pslib_push_dense_merge_limit=1
-pslib_push_sparse_merge_limit=1
#include "paddle/fluid/train/custom_trainer/feed/io/file_system.h"
#include "paddle/fluid/train/custom_trainer/feed/monitor/monitor.h"
#include "paddle/fluid/train/custom_trainer/feed/executor/multi_thread_executor.h"
namespace paddle {
......@@ -63,11 +64,23 @@ int MultiThreadExecutor::initialize(YAML::Node exe_config,
}
}
// Monitor组件
for (const auto& monitor_config : _model_config["monitor"]) {
auto monitor_class = monitor_config["class"].as<std::string>();
auto* monitor_ptr = CREATE_INSTANCE(Monitor, monitor_class);
_monitors.emplace_back(monitor_ptr);
CHECK(monitor_ptr->initialize(monitor_config, context_ptr) == 0)
<< "Monitor init Failed, class:" << monitor_class;
}
return ret;
}
paddle::framework::Channel<DataItem> MultiThreadExecutor::run(
paddle::framework::Channel<DataItem> input, const DataParser* parser) {
uint64_t epoch_id = _trainer_context->epoch_accessor->current_epoch_id();
// 输入流
PipelineOptions input_pipe_option;
input_pipe_option.need_hold_input_data = true;
input_pipe_option.batch_size = 1;
......@@ -97,6 +110,7 @@ paddle::framework::Channel<DataItem> MultiThreadExecutor::run(
return 0;
});
// 训练流
PipelineOptions train_pipe_option;
train_pipe_option.input_output_rate = 1;
train_pipe_option.thread_num = _train_thread_num;
......@@ -108,19 +122,20 @@ paddle::framework::Channel<DataItem> MultiThreadExecutor::run(
auto* executor = _thread_executors[thread_idx].get();
size_t& out_idx = *out_num;
for (out_idx = 0; out_idx < in_num; ++out_idx) {
//CHECK(executor->run(in_items[out_idx].get()) == 0);
CHECK(executor->run(in_items[out_idx].get()) == 0);
out_items[out_idx] = std::move(in_items[out_idx]);
}
return 0;
});
// 梯度回传流
PipelineOptions gradient_pipe_option;
gradient_pipe_option.input_output_rate = 1;
gradient_pipe_option.thread_num = _push_gradient_thread_num;
gradient_pipe_option.buffer_batch_count = 2 * _train_thread_num;
auto gradient_pipe = std::make_shared<Pipeline<ScopePoolObj, int>>();
gradient_pipe->connect_to(*train_pipe, gradient_pipe_option,
[this] (ScopePoolObj* in_items, size_t in_num,
[epoch_id, this] (ScopePoolObj* in_items, size_t in_num,
int* out_items, size_t* out_num, size_t thread_idx) -> int {
size_t& out_idx = *out_num;
for (out_idx = 0; out_idx < in_num; ++out_idx) {
......@@ -134,14 +149,27 @@ paddle::framework::Channel<DataItem> MultiThreadExecutor::run(
out_items[out_idx] = _input_accessors[i]->
backward(samples, sample_num, scope);
}
for (auto& monitor : _monitors) {
monitor->add_data(epoch_id, this, samples, sample_num);
}
delete[] samples; // 所有pipe完成后,再回收sample
}
return 0;
});
// 等待训练流结束
std::vector<int> gradient_status;
while (gradient_pipe->read(gradient_status) > 0) {
}
// 输出相关监控&统计项
for (auto& monitor : _monitors) {
if (monitor->need_compute_result(epoch_id)) {
monitor->compute_result();
VLOG(2) << "[Monitor]" << _train_exe_name << ", monitor:" << monitor->get_name()
<< ", result:" << monitor->format_result();
monitor->reset();
}
}
return input_pipe->backup_channel();
}
......
......@@ -8,6 +8,7 @@ namespace paddle {
namespace custom_trainer {
namespace feed {
class Monitor;
typedef paddle::ps::ObjectPool<::paddle::framework::Scope>::PooledObject ScopePoolObj;
class MultiThreadExecutor {
......@@ -50,6 +51,7 @@ protected:
YAML::Node _model_config;
std::string _train_exe_name;
TrainerContext* _trainer_context = nullptr;
std::vector<std::shared_ptr<Monitor>> _monitors;
std::vector<std::shared_ptr<Executor>> _thread_executors;
std::vector<std::shared_ptr<DataInputAccessor>> _input_accessors;
std::map<uint32_t, std::vector<DataInputAccessor*>> _table_to_accessors;
......
......@@ -6,8 +6,8 @@ namespace feed {
int AucMonitor::initialize(const YAML::Node& config, std::shared_ptr<TrainerContext> context_ptr) {
Monitor::initialize(config, context_ptr);
_target_idx = config["target_idx"].as<int32_t>();
_target_name = config["target"].as<std::string>();
_label_name = config["label"].as<std::string>();
_table_size = 1000000;
if (config["table_size"]) {
_table_size = config["table_size"].as<int>();
......@@ -15,45 +15,34 @@ int AucMonitor::initialize(const YAML::Node& config, std::shared_ptr<TrainerCont
set_table_size(_table_size);
_compute_interval = 3600;
if (config["compute_interval"]) {
uint32_t interval = config["compute_interval"].as<uint32_t>();
if (interval != 3600 || interval != 86400) {
LOG(FATAL) << " AucMonitor config compute_interval just support hour: 3600 or day: 86400. ";
return -1;
}
_compute_interval = interval;
_compute_interval = config["compute_interval"].as<uint32_t>();
CHECK(_compute_interval % 60 == 0);
}
return 0;
}
void AucMonitor::add_data(int epoch_id, const Executor* executor, SampleInstance* instance, size_t num) {
if (executor == nullptr
|| instance == nullptr
|| instance->predicts.empty()
|| instance->labels.empty()
|| num <= 0
|| instance->predicts.size() < num
|| instance->labels.size() < num) {
LOG(FATAL) << "AucMonitor add predict data is invalid, predicts or labels is empty, num[" << num << "]";
return;
}
void AucMonitor::add_data(int epoch_id,
const MultiThreadExecutor* executor, SampleInstance* samples, size_t num) {
CHECK(num > 0);
std::lock_guard<std::mutex> lock(_mutex);
for (int i = 0; i < num; ++i) {
add_unlocked(instance->predicts[i], std::lround(instance->labels[i]));
auto& instance = samples[i];
add_unlocked(instance.predicts[_target_idx], std::lround(instance.labels[_target_idx]));
}
}
bool AucMonitor::need_compute_result(int epoch_id, EpochAccessor* accessor) {
CHECK(accessor != nullptr);
uint64_t epoch_time = accessor->epoch_timestamp(epoch_id);
if (epoch_time % _compute_interval != 0) {
return false;
}
return true;
// An epoch's results are computed only when its timestamp lands exactly on a
// _compute_interval boundary (e.g. every hour / every day).
bool AucMonitor::need_compute_result(int epoch_id) {
    CHECK(_epoch_accessor != nullptr);
    const uint64_t ts = _epoch_accessor->epoch_timestamp(epoch_id);
    if (ts % _compute_interval != 0) {
        return false;
    }
    return true;
}
void AucMonitor::compute_result() {
auto* environment = Monitor::_context_ptr->environment.get();
double* table[2] = {&_table[0][0], &_table[1][0]};
for (int i = 0; i < 2; i++) {
Monitor::_context_ptr->environment->all_reduce_arr(table[i], _table_size);
environment->all_reduce_in_place(table[i],
_table_size, ReduceOperator::SUM, EnvironmentRole::WORKER);
}
double area = 0;
double fp = 0;
......@@ -66,11 +55,14 @@ void AucMonitor::compute_result() {
tp = newtp;
}
_auc = area / (fp * tp);
_mae = Monitor::_context_ptr->environment->all_reduce_ele(_local_abserr) / (fp + tp);
_rmse = sqrt(Monitor::_context_ptr->environment->all_reduce_ele(_local_sqrerr) / (fp + tp));
_mae = environment->all_reduce(_local_abserr,
ReduceOperator::SUM, EnvironmentRole::WORKER) / (fp + tp);
_rmse = sqrt(environment->all_reduce(_local_sqrerr,
ReduceOperator::SUM, EnvironmentRole::WORKER) / (fp + tp));
_rmse = sqrt(_rmse / (fp + tp));
_actual_ctr = tp / (fp + tp);
_predicted_ctr = Monitor::_context_ptr->environment->all_reduce_ele(_local_pred) / (fp + tp);
_predicted_ctr = environment->all_reduce(_local_pred,
ReduceOperator::SUM, EnvironmentRole::WORKER) / (fp + tp);
_size = fp + tp;
calculate_bucket_error();
}
......@@ -81,9 +73,8 @@ std::string AucMonitor::format_result() {
copc = _actual_ctr / _predicted_ctr;
}
char buf[10240];
snprintf(buf, 10240 * sizeof(char), "%s: AUC=%.6f BUCKET_ERROR=%.6f MAE=%.6f RMSE=%.6f "
snprintf(buf, 10240 * sizeof(char), "AUC=%.6f BUCKET_ERROR=%.6f MAE=%.6f RMSE=%.6f "
"Actual CTR=%.6f Predicted CTR=%.6f COPC=%.6f INS Count=%.0f",
Monitor::_name.c_str(),
_auc,
_bucket_error,
_mae,
......@@ -157,6 +148,8 @@ void AucMonitor::reset() {
_local_pred = 0;
}
REGIST_CLASS(Monitor, AucMonitor);
} // namespace feed
} // namespace custom_trainer
} // namespace paddle
......@@ -18,13 +18,11 @@ public:
std::shared_ptr<TrainerContext> context_ptr) override;
//添加一项记录,统计内容Monitor自行从Executor按需获取
virtual void add_data(int epoch_id,
const Executor* executor,
SampleInstance* instance,
size_t num);
virtual void add_data(int epoch_id, const MultiThreadExecutor* executor,
SampleInstance* samples, size_t num);
//是否开始结果统计
virtual bool need_compute_result(int epoch_id, EpochAccessor* accessor);
virtual bool need_compute_result(int epoch_id);
//统计当前结果
virtual void compute_result();
//基于现有结果,输出格式化的统计信息
......@@ -33,7 +31,7 @@ public:
virtual void reset();
protected:
std::string _label_name;
uint32_t _target_idx;
std::string _target_name;
std::string _name;
std::string _output_var;
......@@ -47,7 +45,7 @@ protected:
double _bucket_error;
int _table_size;
void add_unlocked(double pred, int label);
private:
void calculate_bucket_error();
void set_table_size(int table_size);
......
......@@ -9,6 +9,7 @@
namespace paddle {
namespace custom_trainer {
namespace feed {
class MultiThreadExecutor;
class Monitor {
public:
......@@ -19,14 +20,16 @@ public:
std::shared_ptr<TrainerContext> context_ptr) {
_name = config["name"].as<std::string>();
_context_ptr = context_ptr;
_epoch_accessor = _context_ptr->epoch_accessor.get();
return 0;
}
//添加一项记录,统计内容Monitor自行从Executor按需获取
virtual void add_data(int epoch_id, const Executor* executor, SampleInstance* instance, size_t num) = 0;
virtual void add_data(int epoch_id, const MultiThreadExecutor* executor,
SampleInstance* samples, size_t num) = 0;
//是否对于当前epoch_id进行结果统计
virtual bool need_compute_result(int epoch_id, EpochAccessor* accessor) = 0;
virtual bool need_compute_result(int epoch_id) = 0;
//统计当前结果
virtual void compute_result() = 0;
//基于现有结果,输出格式化的统计信息
......@@ -40,6 +43,7 @@ public:
protected:
std::string _name;
EpochAccessor* _epoch_accessor = nullptr;
std::shared_ptr<TrainerContext> _context_ptr;
};
......
......@@ -95,7 +95,8 @@ class ModelBuilder:
main_program = fluid.Program()
startup_program = fluid.Program()
with fluid.program_guard(main_program, startup_program):
input_accessor, sparses, inputs, outputs = self._inference()
#TODO return dict maybe better ?
input_accessor, sparses, inputs, outputs, monitors = self._inference()
test_program = main_program.clone(for_test=True)
loss, labels = self._loss_function(*outputs)
......@@ -134,7 +135,11 @@ class ModelBuilder:
accessor["input"] = [
{"label_name": label.name, "shape": label.shape, "output_name": output.name }
for (label, output) in zip(labels, outputs) ]
for monitor in monitors:
idx = outputs.index(monitor['target'])
monitor["target_idx"] = idx
monitor["target"] = outputs[idx].name
model_desc_path = os.path.join(self._save_path, 'model.yaml')
model_desc = {
......@@ -142,7 +147,9 @@ class ModelBuilder:
'outputs': [{"name": var.name, "shape": var.shape} for var in outputs],
'labels': [{"name": var.name, "shape": var.shape} for var in labels],
'loss': loss.name,
'input_accessor': input_accessor
'input_accessor': input_accessor,
'monitor': monitors,
'aa_Attention' : 'Do Not Modify This File Manually, Unless You Really Know It'
}
with open(model_desc_path, 'w') as f:
......
......@@ -32,7 +32,7 @@ def inference():
net = fluid.layers.fc(net, 128, act='relu', name='fc_7')
ctr_output = fluid.layers.fc(net, 1, act='sigmoid', name='ctr')
return [cvm_input], [ctr_output]
return [], [], [cvm_input], [ctr_output], monitors
def loss_function(ctr_output):
"""
......
......@@ -46,7 +46,11 @@ def inference():
{ "class": "DenseInputAccessor", "input": "sums", "table_id": 2, "need_gradient": True, "async_pull": True},
{ "class": "LabelInputAccessor", "input": "labels"}
]
return accessors, [sparse_cvm], [cvm_input], [ctr_output]
monitors = [
{ "name": "epoch_auc", "class": "AucMonitor", "target": ctr_output, "compute_interval": 600 },
{ "name": "day_auc", "class": "AucMonitor", "target": ctr_output, "compute_interval": 86400 }
]
return accessors, [sparse_cvm], [cvm_input], [ctr_output], monitors
def loss_function(ctr_output):
"""
......
aa_Attention: Do Not Modify This File Manually, Unless You Really Know It
input_accessor:
- class: AbacusSparseUpdateAccessor
input:
......@@ -98,6 +99,10 @@ labels:
- name: label_ctr
shape: [-1, 1]
loss: loss_ctr
monitor:
- {class: AucMonitor, compute_interval: 3600, name: epoch_auc, target: ctr.tmp_2,
target_idx: 0}
- {class: AucMonitor, compute_interval: 86400, name: day_auc, target: ctr.tmp_2, target_idx: 0}
outputs:
- name: ctr.tmp_2
shape: [-1, 1]
aa_Attention: Do Not Modify This File Manually, Unless You Really Know It
input_accessor:
- class: AbacusSparseUpdateAccessor
input:
......@@ -79,6 +80,10 @@ labels:
- name: label_ctr
shape: [-1, 1]
loss: loss_ctr
monitor:
- {class: AucMonitor, compute_interval: 3600, name: epoch_auc, target: ctr.tmp_2,
target_idx: 0}
- {class: AucMonitor, compute_interval: 86400, name: day_auc, target: ctr.tmp_2, target_idx: 0}
outputs:
- name: ctr.tmp_2
shape: [-1, 1]
......@@ -38,8 +38,12 @@ def inference():
{ "class": "DenseInputAccessor", "input": "vars", "table_id": 3, "need_gradient": True, "async_pull": True},
{ "class": "LabelInputAccessor", "input": "labels"}
]
monitors = [
{ "name": "epoch_auc", "class": "AucMonitor", "target": ctr_output, "compute_interval": 600 },
{ "name": "day_auc", "class": "AucMonitor", "target": ctr_output, "compute_interval": 86400 }
]
return accessors, [sparse_cvm], [cvm_input], [ctr_output]
return accessors, [sparse_cvm], [cvm_input], [ctr_output], monitors
def loss_function(ctr_output):
"""
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册