Commit cec6dc35 authored by xiexionghang

pipeline trainer

......@@ -79,7 +79,6 @@ NEED_OUTPUT("baidu/third-party/mklml")
NEED_OUTPUT("baidu/third-party/openmpi")
OUTPUT('paddle/fluid/train/custom_trainer/feed/conf', '$OUT')
OUTPUT('paddle/fluid/train/custom_trainer/feed/scripts', '$OUT')
OUTPUT('paddle/fluid/train/custom_trainer/feed/so', '$OUT')
def UT_FILE(filename):
UT_DIR = 'paddle/fluid/train/custom_trainer/feed/unit_test'
......@@ -93,9 +92,21 @@ CPPFLAGS_STR = '-DHPPL_STUB_FUNC -DLAPACK_FOUND -DPADDLE_DISABLE_PROFILER -DPADD
CFLAGS_STR = '-m64 -fPIC -fno-omit-frame-pointer -Werror -Wall -Wextra -Wnon-virtual-dtor -Wdelete-non-virtual-dtor -Wno-unused-parameter -Wno-unused-function -Wno-error=literal-suffix -Wno-error=sign-compare -Wno-error=unused-local-typedefs -Wno-error=maybe-uninitialized -fopenmp -mavx -O0 -DNDEBUG '
CXXFLAGS_STR = '-std=c++11 ' + CFLAGS_STR
Application('feed_trainer', Sources('paddle/fluid/train/custom_trainer/feed/main.cc', custom_trainer_src), CppFlags(CPPFLAGS_STR), CFlags(CFLAGS_STR), CxxFlags(CXXFLAGS_STR), Libs(src_libs=['paddle/fluid/train/custom_trainer/feed/so/libpaddle_fluid_avx_mklml.so']), Libs(module='baidu/third-party/openmpi', libs=['libmpi.so', 'libmpi_cxx.so', 'libopen-pal.so', 'libopen-rte.so']))
SharedLibrary("paddle_fluid_avx_mklml", PreBuilt(True))
application_args = [
CppFlags(CPPFLAGS_STR),
CFlags(CFLAGS_STR),
CxxFlags(CXXFLAGS_STR),
Libs(libs=['libpaddle_fluid_avx_mklml.so']),
Libs(module='baidu/third-party/openmpi', libs=['libmpi.so', 'libmpi_cxx.so', 'libopen-pal.so', 'libopen-rte.so']),
]
StaticLibrary("feed_trainer", Sources(custom_trainer_src), *application_args)
Application('feed_trainer', Sources('paddle/fluid/train/custom_trainer/feed/main.cc'), WholeArchives("$OUT/lib/libfeed_trainer.a"), *application_args)
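# The trainer objects are packed into libfeed_trainer.a and pulled in whole via WholeArchives,
# so the Application above and the unit_test target below link the same compiled objects.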
#feed unit test
UT_MAIN = UT_FILE('main.cc')
#UTApplication('unit_test', Sources(UT_MAIN, GLOB(UT_FILE('test_*.cc')), custom_trainer_src), CppFlags(CPPFLAGS_STR), CFlags(CFLAGS_STR), CxxFlags(CXXFLAGS_STR), Libs(src_libs=['paddle/fluid/train/custom_trainer/feed/so/libpaddle_fluid_avx_mklml.so']))
# bug: shared libraries can not be found when run on the server
UTApplication('unit_test', UTOnServer(False), Sources(UT_FILE('main.cc'), GLOB(UT_FILE('test_*.cc'))), WholeArchives("$OUT/lib/libfeed_trainer.a"), *application_args)
#include "paddle/fluid/train/custom_trainer/feed/accessor/input_data_accessor.h"
namespace paddle {
namespace custom_trainer {
namespace feed {
int DenseInputAccessor::initialize(YAML::Node config,
std::shared_ptr<TrainerContext> context_ptr) {
CHECK(DataInputAccessor::initialize(config, context_ptr) == 0);
CHECK(config["input"] && config["input"].Type() == YAML::NodeType::Map);
_total_dim = 0;
_pull_request_num.store(0);
for (const auto& input : config["input"]) {
DenseInputVariable variable;
variable.name = input.first.as<std::string>();
variable.gradient_name = paddle::framework::GradVarName(variable.name);
variable.shape = input.second["shape"].as<std::vector<int>>();
variable.dim = 1;
for (int i = 0; i < variable.shape.size(); ++i) {
if (variable.shape[i] <= 0) {
variable.shape[i] = 1;
}
variable.dim *= variable.shape[i];
}
_total_dim += variable.dim;
}
if (config["async_pull"] && config["async_pull"].as<bool>()) {
_need_async_pull = true;
}
return 0;
}
// Pull data via RPC; must be called from a single thread
int32_t DenseInputAccessor::pull_dense(size_t table_id) {
float* data_buffer = NULL;
if (_data_buffer != nullptr) {
data_buffer = _data_buffer;
} else {
data_buffer = new float[_total_dim];
}
size_t data_buffer_idx = 0;
std::vector<paddle::ps::Region> regions;
for (auto& variable : _x_variables) {
regions.emplace_back(data_buffer + data_buffer_idx, variable.dim);
data_buffer_idx += variable.dim;
}
    auto* ps_client = _trainer_context->pslib->ps_client();
    auto pull_status = ps_client->pull_dense(regions.data(), regions.size(), table_id);
    auto ret = pull_status.get();
    // Publish the buffer only after a successful pull, so forward() never reads a null _data_buffer
    if (ret == 0) _data_buffer = data_buffer;
    return ret;
}
int32_t DenseInputAccessor::forward(SampleInstance* samples, size_t num,
paddle::framework::Scope* scope) {
    // The first pull is synchronous; later pulls happen asynchronously
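    // Double-checked locking: only the first caller performs the blocking pull and starts
    // the background refresh thread; later calls see a non-null buffer and skip the lock.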
if (_data_buffer == nullptr) {
_pull_mutex.lock();
if (_data_buffer == nullptr) {
CHECK(pull_dense(_table_id) == 0);
_async_pull_thread = std::make_shared<std::thread>(
[this]() {
while (_need_async_pull) {
if (_pull_request_num > 0) {
pull_dense(_table_id);
_pull_request_num = 0;
} else {
usleep(50000);
}
}
});
}
_pull_mutex.unlock();
}
size_t data_buffer_idx = 0;
for (auto& variable : _x_variables) {
auto* shape_ptr = &(variable.shape[0]);
paddle::framework::DDim ddim(shape_ptr, variable.shape.size());
auto* tensor = ScopeHelper::resize_lod_tensor(scope, variable.name, ddim);
auto* var_data = tensor->mutable_data<float>(_trainer_context->cpu_place);
memcpy(var_data, _data_buffer + data_buffer_idx, variable.dim * sizeof(float));
data_buffer_idx += variable.dim;
}
if (_need_async_pull) {
++_pull_request_num;
}
return 0;
}
int32_t DenseInputAccessor::backward(SampleInstance* samples, size_t num,
paddle::framework::Scope* scope) {
if (!_need_gradient) {
return 0;
}
size_t data_buffer_idx = 0;
std::vector<paddle::ps::Region> regions;
for (auto& variable : _x_variables) {
auto* tensor = scope->Var(variable.gradient_name)->
GetMutable<paddle::framework::LoDTensor>();
auto* grad_data = tensor->mutable_data<float>(_trainer_context->cpu_place);
regions.emplace_back(grad_data, variable.dim);
}
auto* ps_client = _trainer_context->pslib->ps_client();
auto push_status = ps_client->push_dense(regions.data(), regions.size(), _table_id);
return push_status.get();
}
int32_t EbdVariableInputAccessor::forward(SampleInstance* samples, size_t num,
paddle::framework::Scope* scope) {
CHECK(_x_variables.size() == 1);
CHECK(_x_variables[0].shape.size() == 1);
auto& variable = _x_variables[0];
auto* tensor = ScopeHelper::resize_lod_tensor(scope,
variable.name, {num, variable.shape[0]});
auto* var_data = tensor->mutable_data<float>(_trainer_context->cpu_place);
for (size_t i = 0; i < num; ++i) {
auto& sample = samples[i];
CHECK(sample.embedx.size() == variable.dim);
memcpy(var_data, sample.embedx.data(), variable.dim * sizeof(float));
var_data += variable.dim;
}
return 0;
}
int32_t EbdVariableInputAccessor::backward(SampleInstance* samples, size_t num,
paddle::framework::Scope* scope) {
return 0;
}
REGIST_CLASS(DataInputAccessor, DenseInputAccessor);
REGIST_CLASS(DataInputAccessor, EbdVariableInputAccessor);
} // namespace feed
} // namespace custom_trainer
} // namespace paddle
#pragma once
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/train/custom_trainer/feed/dataset/data_reader.h"
#include "paddle/fluid/framework/operator.h"
#include "paddle/fluid/train/custom_trainer/feed/accessor/accessor.h"
#include "paddle/fluid/train/custom_trainer/feed/dataset/data_reader.h"
#include "paddle/fluid/train/custom_trainer/feed/common/scope_helper.h"
namespace paddle {
namespace custom_trainer {
......@@ -12,20 +14,126 @@ public:
DataInputAccessor() {}
virtual ~DataInputAccessor() {}
virtual int initialize(const YAML::Node& config,
std::shared_ptr<TrainerContext> context_ptr);
virtual int initialize(YAML::Node config,
std::shared_ptr<TrainerContext> context_ptr) {
_trainer_context = context_ptr.get();
_table_id = config["table_id"].as<int>();
_need_gradient = config["need_gradient"].as<bool>();
return 0;
}
    // Forward pass: typically fills the inputs; called before the training network runs
virtual int32_t forward(const SampleInstance* samples,
::paddle::framework::Scope* scope, size_t table_id, size_t num) = 0;
virtual int32_t forward(SampleInstance* samples, size_t num,
::paddle::framework::Scope* scope) = 0;
    // Backward pass: typically pushes gradient updates; called after the training network runs
virtual int32_t backward(const SampleInstance* samples,
::paddle::framework::Scope* scope, size_t table_id, size_t num) = 0;
virtual int32_t backward(SampleInstance* samples, size_t num,
::paddle::framework::Scope* scope) = 0;
protected:
size_t _table_id;
bool _need_gradient = false;
TrainerContext* _trainer_context = nullptr;
};
REGIST_REGISTERER(DataInputAccessor);
struct SparseInputVariable {
size_t slot_dim;
size_t total_dim;
std::string name;
std::string gradient_name;
std::vector<int32_t> slot_idx;
std::vector<uint16_t> slot_list;
};
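// Example layout: slots "12,34" with slot_dim = 8 give slot_list = {12, 34}, total_dim = 16,
// slot_idx[12] = 0 and slot_idx[34] = 1, while every other slot_idx entry stays -1.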
struct SparseVarRuntimeData {
uint32_t row_size;
uint32_t total_size;
float* variable_data;
float* gradient_data;
};
class BaseSparseInputAccessor : public DataInputAccessor {
public:
BaseSparseInputAccessor() {}
virtual ~BaseSparseInputAccessor() {}
virtual int initialize(YAML::Node config,
std::shared_ptr<TrainerContext> context_ptr);
    // Fills the inputs during the forward pass
virtual int32_t forward(SampleInstance* samples, size_t num,
paddle::framework::Scope* scope);
    // Receives the pulled value of a single sparse key and fills the corresponding sparse value
virtual void fill_input(float* var_data, const float* pull_raw,
paddle::ps::ValueAccessor&, SparseInputVariable&, SampleInstance&) = 0;
    // Called after all sparse values are filled; allows further global post-processing
virtual void post_process_input(float* var_data, SparseInputVariable&, SampleInstance*, size_t num) = 0;
    // Pushes gradients during the backward pass
virtual int32_t backward(SampleInstance* samples, size_t num,
paddle::framework::Scope* scope);
    // Called once per sparse gradient value to assemble the gradient that will be pushed
virtual void fill_gradient(float* push_value, const float* gradient_raw,
paddle::ps::ValueAccessor&, SparseInputVariable&, SampleInstance&) = 0;
protected:
    // List of input-layer variables
std::vector<SparseInputVariable> _x_variables;
};
struct DenseInputVariable {
size_t dim;
std::string name;
std::vector<int> shape;
std::string gradient_name;
};
class DenseInputAccessor : public DataInputAccessor {
public:
DenseInputAccessor() {}
virtual ~DenseInputAccessor() {
if (_data_buffer) {
delete[] _data_buffer;
}
_need_async_pull = false;
if (_async_pull_thread) {
_async_pull_thread->join();
}
}
virtual int initialize(YAML::Node config,
std::shared_ptr<TrainerContext> context_ptr);
virtual int32_t forward(SampleInstance* samples, size_t num,
paddle::framework::Scope* scope);
virtual int32_t backward(SampleInstance* samples, size_t num,
paddle::framework::Scope* scope);
protected:
virtual int32_t pull_dense(size_t table_id);
size_t _total_dim = 0;
std::mutex _pull_mutex;
bool _need_async_pull = false;
float* _data_buffer = nullptr;
std::atomic<int> _pull_request_num;
std::vector<DenseInputVariable> _x_variables;
std::shared_ptr<std::thread> _async_pull_thread;
};
class EbdVariableInputAccessor : public DenseInputAccessor {
public:
EbdVariableInputAccessor() {}
virtual ~EbdVariableInputAccessor() {}
virtual int32_t forward(SampleInstance* samples, size_t num,
paddle::framework::Scope* scope);
virtual int32_t backward(SampleInstance* samples, size_t num,
paddle::framework::Scope* scope);
};
} // namespace feed
} // namespace custom_trainer
} // namespace paddle
#include <math.h>
#include <vector>
#include <utility>
#include "paddle/fluid/string/string_helper.h"
#include "paddle/fluid/train/custom_trainer/feed/common/scope_helper.h"
#include "paddle/fluid/train/custom_trainer/feed/accessor/input_data_accessor.h"
namespace paddle {
namespace custom_trainer {
namespace feed {
class CommonSparseInputAccessor : public DataInputAccessor {
public:
CommonSparseInputAccessor() {}
virtual ~CommonSparseInputAccessor() {}
virtual int initialize(const YAML::Node& config,
std::shared_ptr<TrainerContext> context_ptr) {
CHECK(config["sparse_input"] && config["sparse_input"].Type() == YAML::NodeType::Map);
for (auto& input : config["sparse_input"]) {
std::pair<std::string, std::vector<uint16_t>> sparse_slots;
sparse_slots.first = input.first.as<std::string>();
std::string slots_str = input.second["slots"].as<std::string>();
std::vector<std::string> slots = paddle::string::split_string(slots_str, ",");
for (int i = 0; i < slots.size(); ++i) {
sparse_slots.second.push_back((uint16_t)atoi(slots[i].c_str()));
int BaseSparseInputAccessor::initialize(YAML::Node config,
std::shared_ptr<TrainerContext> context_ptr) {
CHECK(DataInputAccessor::initialize(config, context_ptr) == 0);
CHECK(config["input"] && config["input"].Type() == YAML::NodeType::Map);
for (const auto& input : config["input"]) {
SparseInputVariable variable;
variable.name = input.first.as<std::string>();
variable.gradient_name = paddle::framework::GradVarName(variable.name);
std::string slots_str = input.second["slots"].as<std::string>();
std::vector<std::string> slots = paddle::string::split_string(slots_str, ",");
variable.slot_idx.resize(UINT16_MAX, -1);
for (int i = 0; i < slots.size(); ++i) {
auto slot = (uint16_t)atoi(slots[i].c_str());
variable.slot_idx[slot] = i;
variable.slot_list.push_back(slot);
}
variable.slot_dim = input.second["slot_dim"].as<int>();
variable.total_dim = variable.slot_list.size() * variable.slot_dim;
_x_variables.push_back(variable);
}
return 0;
}
// Pull sparse data
int32_t BaseSparseInputAccessor::forward(SampleInstance* samples,
size_t num, paddle::framework::Scope* scope) {
CHECK(num > 0);
auto* ps_client = _trainer_context->pslib->ps_client();
auto* value_accessor = ps_client->table_accessor(_table_id);
size_t key_num = 0;
for (size_t i = 0; i < num; ++i) {
key_num += samples[i].features.size();
}
std::vector<uint64_t> keys(key_num);
float** pull_values = new float*[key_num];
auto pull_value_dim = value_accessor->select_dim();
    // Fill the sparse-key pull request
size_t key_idx = 0;
for (size_t i = 0; i < num; ++i) {
auto& features = samples[i].features;
for (auto& feature_item : features) {
feature_item.weights.resize(pull_value_dim, 0.0);
keys[key_idx] = feature_item.feature_sign;
pull_values[key_idx++] = &(feature_item.weights[0]);
}
}
auto pull_status = ps_client->pull_sparse(pull_values, _table_id, keys.data(), key_num);
auto ret = pull_status.get();
delete[] pull_values;
if (ret != 0) {
return ret;
}
auto* runtime_data_ptr = new std::vector<SparseVarRuntimeData>();
auto& var_runtime_data = *runtime_data_ptr;
var_runtime_data.resize(_x_variables.size());
int64_t runtime_data_for_scope = (int64_t)runtime_data_ptr;
ScopeHelper::fill_value(scope, _trainer_context->cpu_place,
"sparse_runtime_data", runtime_data_for_scope);
    // Initialize the variable buffers
for (size_t i = 0; i < _x_variables.size(); ++i) {
const auto& variable = _x_variables[i];
var_runtime_data[i].row_size = num;
var_runtime_data[i].total_size = num * variable.total_dim;
auto* tensor = ScopeHelper::resize_lod_tensor(
scope, variable.name, {num, variable.total_dim});
auto* grad_tensor = ScopeHelper::resize_lod_tensor(
scope, variable.gradient_name, {num, variable.total_dim});
var_runtime_data[i].variable_data = tensor->mutable_data<float>(_trainer_context->cpu_place);
var_runtime_data[i].gradient_data = grad_tensor->mutable_data<float>(_trainer_context->cpu_place);
memset((void*) var_runtime_data[i].variable_data, 0, var_runtime_data[i].total_size * sizeof(float));
memset((void*) var_runtime_data[i].gradient_data, 0, var_runtime_data[i].total_size * sizeof(float));
}
    // Copy the pulled parameters into the variables
for (size_t samp_idx = 0; samp_idx < num; ++samp_idx) {
auto& features = samples[samp_idx].features;
for (auto& feature_item : features) {
for (size_t i = 0; i < _x_variables.size(); ++i) {
auto& variable = _x_variables[i];
auto slot_idx = variable.slot_idx[feature_item.slot_id];
if (slot_idx < 0) {
continue;
}
                float* item_data = var_runtime_data[i].variable_data +
                    samp_idx * variable.total_dim + variable.slot_dim * slot_idx;  // per-sample stride is total_dim
fill_input(item_data, &(feature_item.weights[0]), *value_accessor, variable, samples[samp_idx]);
}
}
return 0;
}
    // Post-process the variables
for (size_t i = 0; i < _x_variables.size(); ++i) {
auto& variable = _x_variables[i];
post_process_input(var_runtime_data[i].variable_data, variable, samples, num);
}
return 0;
}
// Pull sparse data
virtual int32_t forward(const SampleInstance* samples,
::paddle::framework::Scope* scope, size_t table_id, size_t num) {
// pull
// Push sparse updates
int32_t BaseSparseInputAccessor::backward(SampleInstance* samples,
size_t num, paddle::framework::Scope* scope) {
int64_t runtime_data_for_scope = *ScopeHelper::get_value<int64_t>(
scope, _trainer_context->cpu_place, "sparse_runtime_data");
auto* runtime_data_ptr = (std::vector<SparseVarRuntimeData>*)runtime_data_for_scope;
auto& var_runtime_data = *runtime_data_ptr;
    DoneGuard guard([runtime_data_ptr](){
delete runtime_data_ptr;
});
if (!_need_gradient) {
return 0;
}
auto* ps_client = _trainer_context->pslib->ps_client();
auto* value_accessor = ps_client->table_accessor(_table_id);
// Push sparse updates
virtual int32_t backward(const SampleInstance* samples,
::paddle::framework::Scope* scope, size_t table_id, size_t num) {
return 0;
}
size_t key_num = 0;
for (size_t i = 0; i < num; ++i) {
key_num += samples[i].features.size();
}
std::vector<uint64_t> keys(key_num);
float** push_values = new float*[key_num];
auto push_value_dim = value_accessor->update_dim();
size_t key_idx = 0;
for (size_t samp_idx = 0; samp_idx < num; ++samp_idx) {
auto& features = samples[samp_idx].features;
for (auto& feature_item : features) {
feature_item.gradients.resize(push_value_dim, 0.0);
for (size_t i = 0; i < _x_variables.size(); ++i) {
auto& variable = _x_variables[i];
auto slot_idx = variable.slot_idx[feature_item.slot_id];
if (slot_idx < 0) {
continue;
}
                const float* grad_data = var_runtime_data[i].gradient_data +
                    samp_idx * variable.total_dim + variable.slot_dim * slot_idx;  // per-sample stride is total_dim
fill_gradient(&(feature_item.gradients[0]), grad_data,
*value_accessor, variable, samples[samp_idx]);
keys[key_idx] = feature_item.feature_sign;
push_values[key_idx++] = &(feature_item.gradients[0]);
}
}
}
auto push_status = ps_client->push_sparse(_table_id,
keys.data(), (const float**)push_values, key_num);
auto ret = push_status.get();
delete[] push_values;
return ret;
}
protected:
class AbacusSparseJoinAccessor : public BaseSparseInputAccessor {
public:
AbacusSparseJoinAccessor() {}
virtual ~AbacusSparseJoinAccessor() {}
virtual void fill_input(float* var_data, const float* pull_raw,
paddle::ps::ValueAccessor& value_accessor,
SparseInputVariable& variable, SampleInstance& sample) {
for (size_t i = 0; i < variable.slot_dim; ++i) {
var_data[i] += pull_raw[i];
}
}
virtual void post_process_input(float* var_data,
SparseInputVariable& variable, SampleInstance* samples, size_t num) {
for (size_t i = 0; i < num * variable.slot_list.size(); ++i) {
var_data[0] = log(var_data[0] + 1); // show
var_data[1] = log(var_data[1] + 1) - var_data[0]; // ctr
var_data += variable.slot_dim;
}
}
virtual void fill_gradient(float* push_value, const float* gradient_raw,
paddle::ps::ValueAccessor& value_accessor,
SparseInputVariable& variable, SampleInstance& sample) {
        // The join stage does not push gradients back
CHECK(false);
return;
}
};
REGIST_CLASS(DataInputAccessor, AbacusSparseJoinAccessor);
// List of input-layer variables
// <data_name, slot_id_list>
std::vector<std::pair<std::string, std::vector<uint16_t> > > _x_variables;
class AbacusSparseUpdateAccessor : public BaseSparseInputAccessor {
public:
AbacusSparseUpdateAccessor() {}
virtual ~AbacusSparseUpdateAccessor() {}
virtual void fill_input(float* var_data, const float* pull_raw,
paddle::ps::ValueAccessor& value_accessor,
SparseInputVariable& variable, SampleInstance& sample) {
for (size_t i = 0; i < variable.slot_dim; ++i) {
var_data[i] += pull_raw[i + 2];
}
}
virtual void post_process_input(float* var_data,
SparseInputVariable& variable, SampleInstance* samples, size_t num) {
return;
}
virtual void fill_gradient(float* push_value, const float* gradient_raw,
paddle::ps::ValueAccessor& value_accessor,
SparseInputVariable& variable, SampleInstance& sample) {
push_value[0] += 1;
push_value[1] += sample.labels[0];
for (size_t i = 0; i < variable.slot_dim; ++i) {
push_value[i + 2] += gradient_raw[i];
}
return;
}
};
REGIST_CLASS(DataInputAccessor, AbacusSparseUpdateAccessor);
} // namespace feed
} // namespace custom_trainer
......
......@@ -6,12 +6,22 @@ namespace paddle {
namespace custom_trainer {
namespace feed {
class DoneGuard {
public:
DoneGuard(std::function<void()> func) : _func(func) {}
virtual ~DoneGuard() { _func(); }
private:
std::function<void()> _func;
};
class PipelineOptions {
public:
PipelineOptions() = default;
    uint32_t buffer_data_num = 400 ; //number of items buffered; must be larger than batch_size
    uint32_t batch_size = 100 ; //batch size when reading from the pipe
    bool need_hold_input_data = false; //whether to keep the input stream data; otherwise it is released after consumption
    uint32_t batch_size = 10; // batch size emitted by the pipe
    uint32_t thread_num = 1; // number of concurrent converter threads
    float input_output_rate = 1; // input/output QPS ratio
    uint32_t buffer_batch_count = 4; // number of batches the pipe buffers ahead
    bool need_hold_input_data = false; // whether to keep the input stream data; otherwise it is released after consumption
};
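// Rough sizing under these options (see initialize() below): each converter call reads up to
// batch_size * input_output_rate input items and emits batches of batch_size, while the
// input/output buffers each hold buffer_batch_count batches.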
/*
......@@ -29,7 +39,8 @@ public:
Pipeline() {}
Pipeline(Pipeline&&) = delete;
Pipeline(const Pipeline&) = delete;
typedef std::function<int(const TypeIn*, TypeOut*, size_t num)> PipeDataConverter;
typedef std::function<int(TypeIn*, size_t in_num,
TypeOut*, size_t* out_num, size_t thread_idx)> PipeDataConverter;
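    // A minimal converter sketch for the new signature (copy-through; names are illustrative,
    // and a real converter may change the batch size):
    //   PipeDataConverter copy_through = [](TypeIn* in, size_t in_num,
    //           TypeOut* out, size_t* out_num, size_t thread_idx) -> int {
    //       *out_num = in_num;
    //       for (size_t i = 0; i < in_num; ++i) {
    //           out[i] = TypeOut(in[i]);  // assumes TypeOut is constructible from TypeIn
    //       }
    //       return 0;
    //   };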
int initialize(const PipelineOptions& options,
::paddle::framework::Channel<TypeIn> input_channel,
......@@ -42,18 +53,12 @@ public:
_converter = data_converter;
_input_channel = input_channel;
_output_channel = ::paddle::framework::MakeChannel<TypeOut>();
auto batch_size = options.batch_size;
auto buffer_data_num = options.buffer_data_num;
_input_channel->SetBlockSize(batch_size);
_output_channel->SetBlockSize(batch_size);
_input_data_buffer.resize(buffer_data_num);
_output_data_buffer.resize(buffer_data_num);
if (buffer_data_num / batch_size < 3) {
buffer_data_num = batch_size * 3;
}
buffer_data_num = (buffer_data_num / batch_size) * batch_size;
_output_channel->SetCapacity(buffer_data_num);
_output_channel->SetBlockSize(options.batch_size);
size_t input_batch_size = options.batch_size * options.input_output_rate;
_input_channel->SetBlockSize(input_batch_size);
_input_data_buffer.resize(input_batch_size * options.buffer_batch_count);
_output_data_buffer.resize(options.batch_size * options.buffer_batch_count);
_output_channel->SetCapacity(_output_data_buffer.size());
CHECK(_input_channel != nullptr) << " Input Channel is null";
_convert_thread = std::make_shared<std::thread>([this](){
async_convert_data();
......@@ -63,7 +68,9 @@ public:
template <class PreTypeIn>
int connect_to(Pipeline<PreTypeIn, TypeIn>& pre_pipeline,
PipeDataConverter data_converter) {
PipelineOptions& options, PipeDataConverter data_converter) {
        // Keep the global batch size consistent across pipeline stages
options.batch_size = pre_pipeline.options().batch_size / options.input_output_rate;
        return initialize(options, pre_pipeline.output_chnnel(), data_converter);
}
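    // e.g. if the upstream stage emits batches of 32 and this stage's input_output_rate is 32,
    // this stage's batch_size becomes 1 (one output batch consumes 32 upstream items).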
......@@ -87,24 +94,26 @@ public:
inline ::paddle::framework::Channel<TypeOut> output_chnnel() {
return _output_channel;
}
    // Returns a backup of the data consumed from input_channel
inline ::paddle::framework::Channel<TypeIn> backup_channel() {
return _input_channel_backup;
}
private:
void async_convert_data() {
size_t convete_batch_size = _input_data_buffer.size() / 4;
if (convete_batch_size < _options.batch_size * 3) {
convete_batch_size = 3 * _options.batch_size;
}
convete_batch_size = (convete_batch_size / _options.batch_size) * _options.batch_size;
size_t input_batch_size = _options.batch_size * _options.input_output_rate;
while (!_is_read_end) {
while (_output_channel->Size() < _input_data_buffer.size()) {
size_t read_size = _input_channel->
Read(convete_batch_size, &_input_data_buffer[0]);
Read(input_batch_size, &_input_data_buffer[0]);
if (read_size == 0) {
_is_read_end = true;
break;
}
CHECK(_converter(&_input_data_buffer[0], &_output_data_buffer[0],
read_size) == 0) << "Data Converter Do Failed";
_output_channel->WriteMove(read_size, &_output_data_buffer[0]);
size_t write_size = 0;
CHECK(_converter(&_input_data_buffer[0], read_size,
&_output_data_buffer[0], &write_size, 0) == 0) << "Data Converter Do Failed";
_output_channel->WriteMove(write_size, &_output_data_buffer[0]);
if (_options.need_hold_input_data) {
_input_channel_backup->WriteMove(read_size, &_input_data_buffer[0]);
}
......
......@@ -15,8 +15,6 @@ namespace paddle {
namespace custom_trainer {
namespace feed {
class paddle::ps::PSEnvironment;
enum class EnvironmentLogLevel {
FATAL = 0,
ERROR = 1,
......
#pragma once
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/framework/lod_tensor.h"
namespace paddle {
namespace custom_trainer {
namespace feed {
class ScopeHelper {
public:
    //Fetch a var directly
template <class T>
static const T& var(paddle::framework::Scope* scope, const std::string& name) {
return scope->Var(name)->Get<T>();
}
template <class T>
static T* mutable_var(paddle::framework::Scope* scope, const std::string& name) {
return scope->Var(name)->GetMutable<T>();
}
template <class T>
static T* resize_variable(paddle::framework::Scope* scope,
const std::string& name, const paddle::framework::DDim& dim) {
auto* tensor = scope->Var(name)->GetMutable<T>();
tensor->Resize(dim);
return tensor;
}
static paddle::framework::LoDTensor* resize_lod_tensor(
paddle::framework::Scope* scope,
const std::string& name, const paddle::framework::DDim& dim) {
return resize_variable<paddle::framework::LoDTensor>(scope, name, dim);
}
template <class T>
static void fill_value(paddle::framework::Scope* scope,
paddle::platform::Place place, const std::string& name, T& value) {
auto* tensor = resize_variable<paddle::framework::Tensor>(scope, name, { 1 });
T* data = tensor->mutable_data<T>(place);
*data = value;
return;
}
template <class T>
static T* get_value(paddle::framework::Scope* scope,
paddle::platform::Place place, const std::string& name) {
auto* tensor = scope->Var(name)->GetMutable<paddle::framework::Tensor>();
return tensor->mutable_data<T>(place);
}
};
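// Usage sketch (scope, place and the variable name are illustrative): store a scalar in a scope
// and read it back.
//   int64_t flag = 1;
//   ScopeHelper::fill_value(scope, place, "my_flag", flag);
//   int64_t* stored = ScopeHelper::get_value<int64_t>(scope, place, "my_flag");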
} // namespace feed
} // namespace custom_trainer
} // namespace paddle
......@@ -131,53 +131,66 @@ public:
return read_all(file_list, data_channel);
}
virtual int read_all(const std::vector<std::string>& file_list, ::paddle::framework::Channel<DataItem> data_channel) {
auto deleter = [](framework::ChannelWriter<DataItem> *writer) {
if (writer) {
writer->Flush();
VLOG(3) << "writer auto flush";
}
delete writer;
};
std::unique_ptr<framework::ChannelWriter<DataItem>, decltype(deleter)> writer(new framework::ChannelWriter<DataItem>(data_channel.get()), deleter);
DataItem data_item;
int file_list_size = file_list.size();
const int file_list_size = file_list.size();
std::atomic<bool> is_failed(false);
const int max_threads = omp_get_max_threads();
std::vector<framework::ChannelWriter<DataItem>> writers; // writer is not thread safe
writers.reserve(max_threads);
for (int i = 0; i < max_threads; ++i) {
writers.emplace_back(data_channel.get());
}
VLOG(5) << "file_list: " << string::join_strings(file_list, ' ');
#pragma omp parallel for
for (int i = 0; i < file_list_size; ++i) {
if (is_failed) {
continue;
}
const int thread_num = omp_get_thread_num();
framework::ChannelWriter<DataItem> *writer = nullptr;
if (thread_num < max_threads) {
writer = &writers[thread_num];
}
const auto& filepath = file_list[i];
if (!is_failed) {
std::shared_ptr<FILE> fin = _file_system->open_read(filepath, _pipeline_cmd);
if (fin == nullptr) {
VLOG(2) << "fail to open file: " << filepath << ", with cmd: " << _pipeline_cmd;
is_failed = true;
std::shared_ptr<FILE> fin = _file_system->open_read(filepath, _pipeline_cmd);
if (fin == nullptr) {
VLOG(2) << "fail to open file: " << filepath << ", with cmd: " << _pipeline_cmd;
is_failed = true;
continue;
}
char *buffer = nullptr;
size_t buffer_size = 0;
ssize_t line_len = 0;
while ((line_len = getline(&buffer, &buffer_size, fin.get())) != -1) {
                // Strip the trailing newline
if (line_len > 0 && buffer[line_len - 1] == '\n') {
buffer[--line_len] = '\0';
}
                // Skip empty lines
if (line_len <= 0) {
continue;
}
char *buffer = nullptr;
size_t buffer_size = 0;
ssize_t line_len = 0;
while ((line_len = getline(&buffer, &buffer_size, fin.get())) != -1) {
if (line_len > 0 && buffer[line_len - 1] == '\n') {
buffer[--line_len] = '\0';
}
if (line_len <= 0) {
continue;
}
if (_parser->parse(buffer, line_len, data_item) == 0) {
DataItem data_item;
if (_parser->parse(buffer, line_len, data_item) == 0) {
VLOG(5) << "parse data: " << data_item.id << " " << data_item.data << ", filename: " << filepath << ", thread_num: " << thread_num << ", max_threads: " << max_threads;
if (writer == nullptr) {
if (!data_channel->Put(std::move(data_item))) {
VLOG(2) << "fail to put data, thread_num: " << thread_num;
}
} else {
(*writer) << std::move(data_item);
}
}
if (buffer != nullptr) {
free(buffer);
buffer = nullptr;
buffer_size = 0;
}
if (ferror(fin.get()) != 0) {
VLOG(2) << "fail to read file: " << filepath;
is_failed = true;
continue;
}
}
if (buffer != nullptr) {
free(buffer);
buffer = nullptr;
buffer_size = 0;
}
if (ferror(fin.get()) != 0) {
VLOG(2) << "fail to read file: " << filepath;
is_failed = true;
continue;
}
if (_file_system->err_no() != 0) {
_file_system->reset_err_no();
......@@ -185,10 +198,14 @@ public:
continue;
}
}
writer->Flush();
if (!(*writer)) {
VLOG(2) << "fail when write to channel";
is_failed = true;
// omp end
for (int i = 0; i < max_threads; ++i) {
writers[i].Flush();
if (!writers[i]) {
VLOG(2) << "writer " << i << " is failed";
is_failed = true;
}
}
data_channel->Close();
return is_failed ? -1 : 0;
......
......@@ -20,11 +20,13 @@ class TrainerContext;
struct FeatureItem {
uint64_t feature_sign;
uint16_t slot_id;
std::vector<float> weights;
std::vector<float> gradients;
};
struct SampleInstance {
std::string id;
std::vector<float> lables;
std::vector<float> labels;
std::vector<FeatureItem> features;
std::vector<float> embedx;
};
......
......@@ -53,25 +53,9 @@ inline ::paddle::framework::Channel<DataItem> Dataset::fetch_data(
return _data_containers[data_name]->fetch(epoch_id);
}
SampleInstancePipe Dataset::fetch_sample(
const std::string& data_name, uint32_t batch_size, uint64_t epoch_id) {
inline const DataParser* Dataset::data_parser(const std::string& data_name) {
auto* data_container = _data_containers[data_name].get();
auto data_channel = data_container->fetch(epoch_id);
const auto* data_parser = data_container->data_parser();
PipelineOptions options;
options.batch_size = batch_size;
options.need_hold_input_data = true;
options.buffer_data_num = batch_size * 10;
SampleInstancePipe pipe = make_sample_instance_channel();
pipe->initialize(options, data_channel,
[data_parser] (const DataItem* data, SampleInstance* sample, size_t num) -> int {
int ret = 0;
for (int i = 0; i < num; ++i, ++data, ++sample) {
ret |= data_parser->parse_to_sample(*data, *sample);
}
return ret;
});
return pipe;
return data_container->data_parser();
}
......
......@@ -33,10 +33,9 @@ public:
virtual ::paddle::framework::Channel<DataItem> fetch_data(
const std::string& data_name, uint64_t epoch_id);
    //Returns the standard sample stream as a pipeline; the pipeline converts data asynchronously
virtual SampleInstancePipe fetch_sample(
const std::string& data_name, uint32_t batch_size, uint64_t epoch_id);
    //Get the DataItem parser
virtual const DataParser* data_parser(const std::string& data_name);
private:
std::unordered_map<std::string, std::shared_ptr<DatasetContainer>> _data_containers;
};
......
......@@ -54,8 +54,6 @@ public:
paddle::framework::InitDevices(false);
if (exe_config["num_threads"]) {
paddle::platform::SetNumThreads(exe_config["num_threads"].as<int>());
} else {
paddle::platform::SetNumThreads(1);
}
if (!exe_config["startup_program"] ||
......@@ -66,13 +64,11 @@ public:
try {
_context.reset(new SimpleExecutor::Context(context_ptr->cpu_place));
auto startup_program = Load(&_context->executor, exe_config["startup_program"].as<std::string>());
if (startup_program == nullptr) {
_context->startup_program = Load(&_context->executor, exe_config["startup_program"].as<std::string>());
if (_context->startup_program == nullptr) {
VLOG(2) << "fail to load startup_program: " << exe_config["startup_program"].as<std::string>();
return -1;
}
_context->executor.Run(*startup_program, this->scope(), 0, false, true);
_context->main_program = Load(&_context->executor, exe_config["main_program"].as<std::string>());
if (_context->main_program == nullptr) {
......@@ -80,7 +76,6 @@ public:
return -1;
}
_context->prepare_context = _context->executor.Prepare(*_context->main_program, 0);
_context->executor.CreateVariables(*_context->main_program, this->scope(), 0);
} catch (::paddle::platform::EnforceNotMet& err) {
VLOG(2) << err.what();
_context.reset(nullptr);
......@@ -89,18 +84,23 @@ public:
return 0;
}
virtual int run() {
virtual int initialize_scope(::paddle::framework::Scope* scope) {
_context->executor.Run(*_context->startup_program, scope, 0, false, true);
_context->executor.CreateVariables(*_context->main_program, scope, 0);
return 0;
}
virtual int run(::paddle::framework::Scope* scope) {
if (_context == nullptr) {
VLOG(2) << "need initialize before run";
return -1;
}
try {
_context->executor.RunPreparedContext(_context->prepare_context.get(), this->scope(),
_context->executor.RunPreparedContext(_context->prepare_context.get(), scope,
false, /* don't create local scope each time*/
false /* don't create variable each time */);
// For some other vector like containers not cleaned after each batch.
_context->tensor_array_batch_cleaner.CollectNoTensorVars(this->scope());
_context->tensor_array_batch_cleaner.CollectNoTensorVars(scope);
_context->tensor_array_batch_cleaner.ResetNoTensorVars();
} catch (::paddle::platform::EnforceNotMet& err) {
VLOG(2) << err.what();
......@@ -115,6 +115,7 @@ protected:
const ::paddle::platform::Place& place;
::paddle::framework::Executor executor;
::std::unique_ptr<::paddle::framework::ProgramDesc> main_program;
::std::unique_ptr<::paddle::framework::ProgramDesc> startup_program;
::std::unique_ptr<framework::ExecutorPrepareContext> prepare_context;
details::TensorArrayBatchCleaner tensor_array_batch_cleaner;
};
......
......@@ -13,32 +13,16 @@ public:
Executor() {}
virtual ~Executor() {}
    //Initialization, including loading the training network & configuration
    // Initialization, including loading the training network & configuration
virtual int initialize(YAML::Node exe_config,
std::shared_ptr<TrainerContext> context_ptr) = 0;
    //The scope can be used to fill & fetch vars
virtual ::paddle::framework::Scope* scope() {
return &_scope;
}
    //Fetch a var directly
template <class T>
const T& var(const std::string& name) {
return _scope.Var(name)->Get<T>();
}
template <class T>
T* mutable_var(const std::string& name) {
return _scope.Var(name)->GetMutable<T>();
}
    // Initialize the scope; later training runs reuse it without re-initialization
virtual int initialize_scope(::paddle::framework::Scope* scope) = 0;
    // Run training
virtual int run(::paddle::framework::Scope* scope) = 0;
    //Run training
virtual int run() = 0;
virtual bool is_dump_all_model() {
return false;
}
protected:
::paddle::framework::Scope _scope;
};
REGIST_REGISTERER(Executor);
......
#include "paddle/fluid/train/custom_trainer/feed/io/file_system.h"
#include "paddle/fluid/train/custom_trainer/feed/executor/multi_thread_executor.h"
namespace paddle {
namespace custom_trainer {
namespace feed {
int MultiThreadExecutor::initialize(YAML::Node exe_config,
std::shared_ptr<TrainerContext> context_ptr) {
int ret = 0;
_trainer_context = context_ptr.get();
_train_data_name = exe_config["train_data_name"].as<std::string>();
_train_batch_size = exe_config["train_batch_size"].as<int>();
_input_parse_thread_num = exe_config["input_parse_thread_num"].as<int>();
    _push_gradient_thread_num = exe_config["push_gradient_thread_num"].as<int>();
_train_thread_num = exe_config["train_thread_num"].as<int>();
_need_dump_all_model = exe_config["need_dump_all_model"].as<bool>();
CHECK(_train_thread_num > 0 && _train_batch_size > 0);
_thread_executors.resize(_train_thread_num);
auto e_class = exe_config["class"].as<std::string>();
_train_exe_name = exe_config["name"].as<std::string>();
omp_set_num_threads(_train_thread_num);
#pragma omp parallel for
for (int i = 0; i < _train_thread_num; ++i) {
auto* e_ptr = CREATE_INSTANCE(Executor, e_class);
_thread_executors[i].reset(e_ptr);
if (e_ptr->initialize(exe_config, context_ptr) != 0) {
ret = -1;
}
}
CHECK(ret == 0);
_scope_obj_pool.config(
[this]() -> ::paddle::framework::Scope* {
auto* scope = new ::paddle::framework::Scope();
_thread_executors[0]->initialize_scope(scope);
return scope;
}, _train_thread_num * 8);
std::string model_config_path = _trainer_context->file_system->path_join(
"./model", string::format_string("%s.yaml", _train_exe_name.c_str()));
CHECK(_trainer_context->file_system->exists(model_config_path))
<< "miss model config file:" << model_config_path;
_model_config = YAML::LoadFile(model_config_path);
    _input_accessors.reserve(_model_config["input_accessor"].size());
for (const auto& accessor_config : _model_config["input_accessor"]) {
auto accessor_class = accessor_config["class"].as<std::string>();
_input_accessors.emplace_back(CREATE_INSTANCE(DataInputAccessor, accessor_class));
CHECK(_input_accessors.back()->initialize(accessor_config, context_ptr) == 0)
<< "InputAccessor init Failed, class:" << accessor_class;
}
return ret;
}
paddle::framework::Channel<DataItem> MultiThreadExecutor::run(
paddle::framework::Channel<DataItem> input, const DataParser* parser) {
PipelineOptions input_pipe_option;
input_pipe_option.need_hold_input_data = true;
input_pipe_option.batch_size = _train_batch_size;
input_pipe_option.thread_num = _input_parse_thread_num;
input_pipe_option.input_output_rate = _train_batch_size;
input_pipe_option.buffer_batch_count = _train_thread_num;
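    // Stage 1: parse raw DataItems into SampleInstances, run the input accessors' forward pass,
    // and stash each batch in a pooled scope.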
auto input_pipe = std::make_shared<Pipeline<DataItem, ScopePoolObj>>();
input_pipe->initialize(input_pipe_option, input,
[this, parser](DataItem* item, size_t item_num,
ScopePoolObj* scope, size_t* scope_num, size_t thread_idx) -> int {
*scope_num = 1;
auto scope_obj = _scope_obj_pool.get();
auto* samples = new SampleInstance[item_num];
for (size_t i = 0; i <item_num; ++i) {
CHECK(parser->parse_to_sample(item[i], samples[i]) == 0);
}
for (size_t i = 0; i < _input_accessors.size(); ++i) {
_input_accessors[i]->forward(samples, item_num, scope_obj.get());
}
int64_t data_for_scope = (int64_t)samples;
ScopeHelper::fill_value(scope_obj.get(), _trainer_context->cpu_place,
"sample_data", data_for_scope);
data_for_scope = (int64_t)item_num;
ScopeHelper::fill_value(scope_obj.get(), _trainer_context->cpu_place,
"sample_num", data_for_scope);
*scope = std::move(scope_obj);
return 0;
});
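    // Stage 2: each train thread runs the prepared network on one scope at a time.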
PipelineOptions train_pipe_option;
train_pipe_option.input_output_rate = 1;
train_pipe_option.thread_num = _train_thread_num;
train_pipe_option.buffer_batch_count = 2 * _train_thread_num;
auto train_pipe = std::make_shared<Pipeline<ScopePoolObj, ScopePoolObj>>();
train_pipe->connect_to(*input_pipe, train_pipe_option,
[this] (ScopePoolObj* in_items, size_t in_num,
ScopePoolObj* out_items, size_t* out_num, size_t thread_idx) -> int {
auto* executor = _thread_executors[thread_idx].get();
size_t& out_idx = *out_num;
for (out_idx = 0; out_idx < in_num; ++out_idx) {
executor->run(in_items[out_idx].get());
out_items[out_idx] = std::move(in_items[out_idx]);
}
return 0;
});
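    // Stage 3: push gradients back through the input accessors and release the per-batch samples.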
PipelineOptions gradient_pipe_option;
gradient_pipe_option.input_output_rate = 1;
gradient_pipe_option.thread_num = _push_gradient_thread_num;
gradient_pipe_option.buffer_batch_count = 2 * _train_thread_num;
auto gradient_pipe = std::make_shared<Pipeline<ScopePoolObj, int>>();
gradient_pipe->connect_to(*train_pipe, gradient_pipe_option,
[this] (ScopePoolObj* in_items, size_t in_num,
int* out_items, size_t* out_num, size_t thread_idx) -> int {
size_t& out_idx = *out_num;
for (out_idx = 0; out_idx < in_num; ++out_idx) {
auto* scope = in_items[out_idx].get();
auto sample_num = *ScopeHelper::get_value<int64_t>(
scope, _trainer_context->cpu_place, "sample_num");
auto* samples = (SampleInstance*)(*ScopeHelper::get_value<int64_t>(
scope, _trainer_context->cpu_place, "sample_data"));
for (size_t i = 0; i < _input_accessors.size(); ++i) {
out_items[out_idx] = _input_accessors[i]->
backward(samples, sample_num, scope);
}
            delete[] samples; // reclaim the samples only after all pipe stages have finished with them
}
return 0;
});
std::vector<int> gradient_status;
while (gradient_pipe->read(gradient_status) > 0) {
}
return input_pipe->backup_channel();
}
} // namespace feed
} // namespace custom_trainer
} // namespace paddle
#pragma once
#include <functional>
#include "paddle/fluid/framework/channel.h"
#include "paddle/fluid/train/custom_trainer/feed/executor/executor.h"
#include "paddle/fluid/train/custom_trainer/feed/accessor/input_data_accessor.h"
namespace paddle {
namespace custom_trainer {
namespace feed {
class MultiThreadExecutor {
public:
MultiThreadExecutor() {}
virtual ~MultiThreadExecutor() {}
    //Initialization, including loading the training network & configuration
virtual int initialize(YAML::Node exe_config,
std::shared_ptr<TrainerContext> context_ptr);
    //Run training
virtual paddle::framework::Channel<DataItem> run(
paddle::framework::Channel<DataItem> input, const DataParser* parser);
virtual bool is_dump_all_model() {
return _need_dump_all_model;
}
virtual const std::string& train_data_name() {
return _train_data_name;
}
protected:
std::string _train_data_name;
size_t _train_batch_size = 32;
size_t _train_thread_num = 12;
size_t _input_parse_thread_num = 10;
size_t _push_gradient_thread_num = 10;
bool _need_dump_all_model = false;
YAML::Node _model_config;
std::string _train_exe_name;
TrainerContext* _trainer_context = nullptr;
std::vector<std::shared_ptr<Executor>> _thread_executors;
std::vector<std::shared_ptr<DataInputAccessor>> _input_accessors;
paddle::ps::ObjectPool<::paddle::framework::Scope> _scope_obj_pool;
typedef paddle::ps::ObjectPool<::paddle::framework::Scope>::PooledObject ScopePoolObj;
};
} // namespace feed
} // namespace custom_trainer
} // namespace paddle
/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/train/custom_trainer/feed/io/file_system.h"
#include <string>
......
/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/train/custom_trainer/feed/io/file_system.h"
#include <string>
......
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <memory>
......
/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/train/custom_trainer/feed/io/file_system.h"
#include <string>
......
/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/train/custom_trainer/feed/io/file_system.h"
#include <string>
......
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/train/custom_trainer/feed/io/shell.h"
namespace paddle {
......@@ -179,14 +165,14 @@ static int shell_popen_fork_internal(
if (child_end != child_std_end) {
if (dup2(child_end, child_std_end) != child_std_end) {
return -1;
exit(127);
}
close(child_end);
}
close_open_fds_internal();
if (execl("/bin/bash", "bash", "-c", real_cmd, NULL) < 0) {
return -1;
exit(127);
}
exit(127);
#endif
......
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <fcntl.h>
......
......@@ -14,23 +14,11 @@ namespace feed {
int LearnerProcess::initialize(std::shared_ptr<TrainerContext> context_ptr) {
int ret = Process::initialize(context_ptr);
auto& config = _context_ptr->trainer_config;
_train_thread_num = config["train_thread_num"].as<int>();
_threads_executor.resize(_train_thread_num);
if (config["executor"]) {
_executor_num = config["executor"].size();
omp_set_num_threads(_train_thread_num);
#pragma omp parallel for
for (int i = 0; i < _train_thread_num; ++i) {
_threads_executor[i].resize(_executor_num);
for (int e = 0; e < _executor_num; ++e) {
auto e_class = config["executor"][e]["class"].as<std::string>();
auto* e_ptr = CREATE_INSTANCE(Executor, e_class);
_threads_executor[i][e].reset(e_ptr);
if (e_ptr->initialize(config["executor"][e], context_ptr) != 0) {
ret = -1;
}
}
_executors.resize(config["executor"].size());
for (size_t i = 0; i < _executors.size(); ++i) {
_executors[i].reset(new MultiThreadExecutor());
CHECK(_executors[i]->initialize(config["executor"][i], context_ptr) == 0);
}
}
return 0;
......@@ -40,8 +28,7 @@ std::future<int> LearnerProcess::save_model(uint64_t epoch_id, int table_id, Mod
std::promise<int> p;
auto ret = p.get_future();
if (_context_ptr->epoch_accessor->need_save_model(epoch_id, way)) {
//TODO
//context_ptr->pslib_client()->save();
LOG(NOTICE) << "save table, table_id:" << table_id;
} else {
p.set_value(0);
}
......@@ -106,23 +93,22 @@ int LearnerProcess::run() {
//Step2. Run the training networks
{
for (int i = 0; i < _executor_num; ++i) {
std::vector<std::shared_ptr<std::thread>> train_threads(_train_thread_num);
for (int thread_id = 0; thread_id < _train_thread_num; ++thread_id) {
train_threads[i].reset(new std::thread([this](int exe_idx, int thread_idx) {
auto* executor = _threads_executor[thread_idx][exe_idx].get();
run_executor(executor);
}, i, thread_id));
}
for (int i = 0; i < _train_thread_num; ++i) {
train_threads[i]->join();
}
std::map<std::string, paddle::framework::Channel<DataItem>> backup_input_map;
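        // Executors that consume the same data name reuse the backed-up channel produced by the
        // previous executor's run instead of fetching the dataset again.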
for (auto& executor : _executors) {
environment->barrier(EnvironmentRole::WORKER);
if (_threads_executor[0][i]->is_dump_all_model()) {
auto data_name = executor->train_data_name();
paddle::framework::Channel<DataItem> input_channel;
if (backup_input_map.count(data_name)) {
input_channel = backup_input_map[data_name];
} else {
input_channel = dataset->fetch_data(data_name, epoch_id);
}
input_channel = executor->run(input_channel, dataset->data_parser(data_name));
if (executor->is_dump_all_model()) {
already_dump_inference_model = true;
wait_save_model(epoch_id, ModelSaveWay::ModelSaveInferenceDelta);
}
backup_input_map[data_name] = input_channel;
environment->barrier(EnvironmentRole::WORKER);
}
}
......@@ -144,11 +130,6 @@ int LearnerProcess::run() {
return 0;
}
int LearnerProcess::run_executor(Executor* executor) {
//TODO
return 0;
}
} // namespace feed
} // namespace custom_trainer
} // namespace paddle
......@@ -4,13 +4,11 @@
*/
#pragma once
#include "paddle/fluid/train/custom_trainer/feed/process/process.h"
#include "paddle/fluid/train/custom_trainer/feed/executor/executor.h"
#include "paddle/fluid/train/custom_trainer/feed/executor/multi_thread_executor.h"
namespace paddle {
namespace custom_trainer {
namespace feed {
typedef std::vector<std::shared_ptr<Executor>> MultiExecutor;
class LearnerProcess : public Process {
public:
LearnerProcess() {}
......@@ -20,19 +18,13 @@ public:
virtual int initialize(std::shared_ptr<TrainerContext> context_ptr);
protected:
    //Save all models synchronously
    // Save all models synchronously
virtual int wait_save_model(uint64_t epoch_id, ModelSaveWay way);
    //Save the specified model asynchronously
    // Save the specified model asynchronously
virtual std::future<int> save_model(uint64_t epoch_id, int table_id, ModelSaveWay way);
    //Run the specified training network
virtual int run_executor(Executor* executor);
private:
    int _executor_num = 0; //number of training networks to run
    int _train_thread_num = 1;//number of parallel training threads
std::vector<MultiExecutor> _threads_executor;
std::vector<std::shared_ptr<MultiThreadExecutor>> _executors;
};
} // namespace feed
......
#!/usr/bin/env python
#-*- coding:utf-8 -*-
from __future__ import print_function, division
import os
import sys
import paddle
from paddle import fluid
import yaml
def print_help(this_name):
"""Print help
"""
dirname = os.path.dirname(this_name)
print("Usage: {} <network building filename> [model_dir]\n".format(this_name))
print(" example: {} {}".format(this_name, os.path.join(dirname, 'example.py')))
def inference_warpper(filename):
"""Build inference network(without loss and optimizer)
Args:
filename: path of file which defined real inference function
Returns:
list<Variable>: inputs
and
list<Variable>: outputs
"""
with open(filename, 'r') as f:
code = f.read()
compiled = compile(code, filename, 'exec')
scope = dict()
exec(compiled, scope)
return scope['inference']()
def main(argv):
"""Create programs
Args:
argv: arg list, length should be 2
"""
if len(argv) < 2 or not os.path.exists(argv[1]):
print_help(argv[0])
exit(1)
network_build_file = argv[1]
if len(argv) > 2:
model_dir = argv[2]
else:
model_dir = './model'
main_program = fluid.Program()
startup_program = fluid.Program()
with fluid.program_guard(main_program, startup_program):
inputs, outputs = inference_warpper(network_build_file)
test_program = main_program.clone(for_test=True)
labels = list()
losses = list()
for output in outputs:
label = fluid.layers.data(name='label_' + output.name, shape=output.shape, dtype='float32')
loss = fluid.layers.square_error_cost(input=output, label=label)
loss = fluid.layers.mean(loss, name='loss_' + output.name)
labels.append(label)
losses.append(loss)
loss_all = fluid.layers.sum(losses)
optimizer = fluid.optimizer.SGD(learning_rate=1.0)
params_grads = optimizer.backward(loss_all)
if not os.path.exists(model_dir):
os.mkdir(model_dir)
programs = {
'startup_program': startup_program,
'main_program': main_program,
'test_program': test_program,
}
for save_path, program in programs.items():
with open(os.path.join(model_dir, save_path), 'w') as f:
f.write(program.desc.serialize_to_string())
model_desc_path = os.path.join(model_dir, 'model.yaml')
model_desc = {
'inputs': [{"name": var.name, "shape": var.shape} for var in inputs],
'outputs': [{"name": var.name, "shape": var.shape, "label_name": label.name, "loss_name": loss.name} for var, label, loss in zip(outputs, labels, losses)],
'loss_all': loss_all.name,
}
with open(model_desc_path, 'w') as f:
yaml.safe_dump(model_desc, f, encoding='utf-8', allow_unicode=True)
if __name__ == "__main__":
main(sys.argv)
#!/usr/bin/env python
#-*- coding:utf-8 -*-
"""
This is an example of network building
"""
from __future__ import print_function, division
import paddle
from paddle import fluid
def inference():
"""Build inference network(without loss and optimizer)
Returns:
list<Variable>: inputs
and
list<Variable>: outputs
"""
# TODO: build network here
cvm_input = fluid.layers.data(name='cvm_input', shape=[4488], dtype='float32')
net = cvm_input
net = fluid.layers.fc(net, 512, act='relu')
net = fluid.layers.fc(net, 256, act='relu')
net = fluid.layers.fc(net, 256, act='relu')
net = fluid.layers.fc(net, 128, act='relu')
net = fluid.layers.fc(net, 128, act='relu')
net = fluid.layers.fc(net, 128, act='relu')
net = fluid.layers.fc(net, 128, act='relu')
ctr_output = fluid.layers.fc(net, 1, act='sigmoid', name='ctr')
return [cvm_input], [ctr_output]
#include <iostream>
#include <fstream>
#include <gtest/gtest.h>
#include "paddle/fluid/train/custom_trainer/feed/executor/executor.h"
#include "paddle/fluid/framework/tensor_util.h"
#include "paddle/fluid/framework/program_desc.h"
#include "paddle/fluid/train/custom_trainer/feed/io/file_system.h"
#include "paddle/fluid/train/custom_trainer/feed/io/shell.h"
#include "paddle/fluid/train/custom_trainer/feed/common/scope_helper.h"
#include "paddle/fluid/string/string_helper.h"
namespace paddle {
namespace custom_trainer {
namespace feed {
namespace {
const char feed_path[] = "paddle/fluid/train/custom_trainer/feed";
const char test_data_dir[] = "test_data";
const char main_program_path[] = "test_data/main_program";
const char startup_program_path[] = "test_data/startup_program";
const char model_desc_path[] = "test_data/model.yaml";
}
class CreateProgramsTest : public testing::Test
{
public:
static void SetUpTestCase()
{
std::unique_ptr<FileSystem> fs(CREATE_INSTANCE(FileSystem, "LocalFileSystem"));
if (fs->exists("./scripts/create_programs.py")) {
shell_execute(string::format_string("python ./scripts/create_programs.py ./scripts/example.py %s", test_data_dir));
} else if (fs->exists(string::format_string("%s/scripts/create_programs.py", feed_path))) {
shell_execute(string::format_string("python %s/scripts/create_programs.py %s/scripts/example.py %s", feed_path, feed_path, test_data_dir));
}
}
static void TearDownTestCase()
{
std::unique_ptr<FileSystem> fs(CREATE_INSTANCE(FileSystem, "LocalFileSystem"));
fs->remove(test_data_dir);
}
virtual void SetUp()
{
context_ptr.reset(new TrainerContext());
}
virtual void TearDown()
{
context_ptr = nullptr;
}
std::shared_ptr<TrainerContext> context_ptr;
};
TEST_F(CreateProgramsTest, example_network) {
std::unique_ptr<Executor> executor(CREATE_INSTANCE(Executor, "SimpleExecutor"));
ASSERT_NE(nullptr, executor);
auto config = YAML::Load(string::format_string("{thread_num: 2, startup_program: %s, main_program: %s}", startup_program_path, main_program_path));
auto model_desc = YAML::LoadFile(model_desc_path);
ASSERT_EQ(0, executor->initialize(config, context_ptr));
std::string input_name = "cvm_input";
ASSERT_TRUE(model_desc["inputs"]);
ASSERT_EQ(1, model_desc["inputs"].size());
ASSERT_TRUE(model_desc["inputs"][0]["name"]);
ASSERT_TRUE(model_desc["inputs"][0]["shape"]);
ASSERT_EQ(input_name, model_desc["inputs"][0]["name"].as<std::string>());
std::vector<int> input_shape = model_desc["inputs"][0]["shape"].as<std::vector<int>>(std::vector<int>());
ASSERT_EQ(2, input_shape.size());
ASSERT_EQ(-1, input_shape[0]);
ASSERT_EQ(4488, input_shape[1]);
ASSERT_TRUE(model_desc["loss_all"]);
auto loss_all_name = model_desc["loss_all"].as<std::string>();
ASSERT_TRUE(model_desc["outputs"]);
ASSERT_EQ(1, model_desc["outputs"].size());
ASSERT_TRUE(model_desc["outputs"][0]["name"]);
ASSERT_TRUE(model_desc["outputs"][0]["shape"]);
ASSERT_TRUE(model_desc["outputs"][0]["label_name"]);
ASSERT_TRUE(model_desc["outputs"][0]["loss_name"]);
auto ctr_output_label_name = model_desc["outputs"][0]["label_name"].as<std::string>();
auto ctr_output_loss_name = model_desc["outputs"][0]["loss_name"].as<std::string>();
auto ctr_output_name = model_desc["outputs"][0]["name"].as<std::string>();
std::vector<int> output_shape = model_desc["outputs"][0]["shape"].as<std::vector<int>>(std::vector<int>());
ASSERT_EQ(2, output_shape.size());
ASSERT_EQ(-1, output_shape[0]);
ASSERT_EQ(1, output_shape[1]);
paddle::framework::Scope scope;
executor->initialize_scope(&scope);
auto input_var = ScopeHelper::mutable_var<::paddle::framework::LoDTensor>(&scope, input_name);
auto label_var = ScopeHelper::mutable_var<::paddle::framework::LoDTensor>(&scope, ctr_output_label_name);
ASSERT_NE(nullptr, input_var);
ASSERT_NE(nullptr, label_var);
input_var->Resize({1, input_shape[1]});
auto input_data = input_var->mutable_data<float>(context_ptr->cpu_place);
ASSERT_NE(nullptr, input_data);
for (int i = 0; i < input_shape[1]; ++i) {
input_data[i] = 0.1;
}
label_var->Resize({1, 1});
auto label_data = label_var->mutable_data<float>(context_ptr->cpu_place);
ASSERT_NE(nullptr, label_data);
label_data[0] = 0.5;
ASSERT_EQ(0, executor->run(&scope));
auto loss_var = ScopeHelper::var<::paddle::framework::LoDTensor>(&scope, ctr_output_loss_name);
auto loss = loss_var.data<float>()[0];
auto loss_all_var = ScopeHelper::var<::paddle::framework::LoDTensor>(&scope, loss_all_name);
auto loss_all = loss_all_var.data<float>()[0];
auto ctr_output_var = ScopeHelper::var<::paddle::framework::LoDTensor>(&scope, ctr_output_name);
auto ctr_output = ctr_output_var.data<float>()[0];
std::cout << "loss: " << loss << std::endl;
std::cout << "ctr_output: " << ctr_output << std::endl;
ASSERT_NEAR(loss, loss_all, 1e-9);
}
} // namespace feed
} // namespace custom_trainer
} // namespace paddle
/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include <iostream>
#include <fstream>
#include <gtest/gtest.h>
......
/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include <iostream>
#include <fstream>
#include <algorithm>
......@@ -39,6 +25,9 @@ class DataReaderOmpTest : public testing::Test {
public:
static void SetUpTestCase() {
std::unique_ptr<FileSystem> fs(CREATE_INSTANCE(FileSystem, "LocalFileSystem"));
if (fs->exists(test_data_dir)) {
fs->remove(test_data_dir);
}
fs->mkdir(test_data_dir);
shell_set_verbose(true);
std_items.clear();
......@@ -92,6 +81,15 @@ public:
return true;
}
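// Drains the channel into items; the commented-out ChannelReader loop below is an equivalent alternative.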
static void read_all(framework::Channel<DataItem>& channel, std::vector<DataItem>& items) {
channel->ReadAll(items);
// framework::ChannelReader<DataItem> reader(channel.get());
// DataItem data_item;
// while (reader >> data_item) {
// items.push_back(std::move(data_item));
// }
}
static bool is_same_with_std_items(const std::vector<DataItem>& items) {
return is_same(items, std_items);
}
......@@ -100,6 +98,14 @@ public:
return is_same(items, sorted_std_items);
}
static std::string to_string(const std::vector<DataItem>& items) {
std::string items_str = "";
for (const auto& item : items) {
items_str.append(item.id);
}
return items_str;
}
static std::vector<DataItem> std_items;
static std::vector<DataItem> sorted_std_items;
std::shared_ptr<TrainerContext> context_ptr;
......@@ -130,22 +136,16 @@ TEST_F(DataReaderOmpTest, LineDataReaderSingleThread) {
ASSERT_EQ(string::format_string("%s/%s.txt", test_data_dir, std_items[i].id.c_str()), data_file_list[i]);
}
int same_count = 0;
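// Read the whole directory n_run times with a single reader thread; the item order should be deterministic.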
for (int i = 0; i < n_run; ++i) {
auto channel = framework::MakeChannel<DataItem>(128);
ASSERT_NE(nullptr, channel);
ASSERT_EQ(0, data_reader->read_all(test_data_dir, channel));
std::vector<DataItem> items;
channel->ReadAll(items);
read_all(channel, items);
if (is_same_with_std_items(items)) {
++same_count;
}
ASSERT_TRUE(is_same_with_std_items(items));
}
// identical in all n_run runs
ASSERT_EQ(n_run, same_count);
}
TEST_F(DataReaderOmpTest, LineDataReaderMuiltThread) {
......@@ -181,30 +181,32 @@ TEST_F(DataReaderOmpTest, LineDataReaderMuiltThread) {
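// With 4 OpenMP threads and block size 1 the read order may differ between runs,
// but sorting by id must always recover the reference order.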
omp_set_num_threads(4);
channel->SetBlockSize(1);
ASSERT_EQ(0, data_reader->read_all(test_data_dir, channel));
std::vector<DataItem> items;
channel->ReadAll(items);
read_all(channel, items);
ASSERT_EQ(std_items_size, items.size());
if (is_same_with_std_items(items)) {
++same_count;
}
VLOG(5) << "before sort items: " << to_string(items);
std::sort(items.begin(), items.end(), [] (const DataItem& a, const DataItem& b) {
return a.id < b.id;
});
if (is_same_with_sorted_std_items(items)) {
++sort_same_count;
bool is_same_with_std = is_same_with_sorted_std_items(items);
if (!is_same_with_std) {
VLOG(5) << "after sort items: " << to_string(items);
}
// after sorting they are all identical
ASSERT_TRUE(is_same_with_std);
}
// some of the n_run runs differ (evidence of multi-threading)
ASSERT_EQ(4, omp_get_max_threads());
ASSERT_GT(n_run, same_count);
// but after sorting they are all identical
ASSERT_EQ(n_run, sort_same_count);
}
} // namespace feed
......
/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include <iostream>
#include <fstream>
#include <gtest/gtest.h>
#include "paddle/fluid/train/custom_trainer/feed/trainer_context.h"
#include "paddle/fluid/train/custom_trainer/feed/executor/executor.h"
#include "paddle/fluid/train/custom_trainer/feed/common/scope_helper.h"
#include "paddle/fluid/framework/tensor_util.h"
#include "paddle/fluid/framework/program_desc.h"
#include "paddle/fluid/train/custom_trainer/feed/io/file_system.h"
......@@ -62,6 +49,7 @@ public:
op->SetInput("X", {"x"});
op->SetOutput("Out", {"mean"});
op->CheckAttrs();
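// Register the "mean" output variable in the block before serializing the program.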
load_block->Var("mean");
std::ofstream fout(main_program_path, std::ios::out | std::ios::binary);
ASSERT_TRUE(fout);
fout << main_program->Proto()->SerializeAsString();
......@@ -105,14 +93,15 @@ TEST_F(SimpleExecutorTest, run) {
auto config = YAML::Load(string::format_string("{thread_num: 2, startup_program: %s, main_program: %s}", startup_program_path, main_program_path));
ASSERT_EQ(0, executor->initialize(config, context_ptr));
auto x_var = executor->mutable_var<::paddle::framework::LoDTensor>("x");
executor->mutable_var<::paddle::framework::LoDTensor>("mean");
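// The scope is now created by the test and handed to the executor via initialize_scope() and run().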
paddle::framework::Scope scope;
executor->initialize_scope(&scope);
auto x_var = ScopeHelper::mutable_var<::paddle::framework::LoDTensor>(&scope, std::string("x"));
ASSERT_NE(nullptr, x_var);
int x_len = 10;
x_var->Resize({1, x_len});
auto x_data = x_var->mutable_data<float>(context_ptr->cpu_place);
ASSERT_NE(nullptr, x_data);
std::cout << "x: ";
for (int i = 0; i < x_len; ++i) {
x_data[i] = i;
......@@ -120,9 +109,9 @@ TEST_F(SimpleExecutorTest, run) {
}
std::cout << std::endl;
ASSERT_EQ(0, executor->run());
ASSERT_EQ(0, executor->run(&scope));
auto mean_var = executor->var<::paddle::framework::LoDTensor>("mean");
auto mean_var = ScopeHelper::var<::paddle::framework::LoDTensor>(&scope, std::string("mean"));
auto mean = mean_var.data<float>()[0];
std::cout << "mean: " << mean << std::endl;
ASSERT_NEAR(4.5, mean, 1e-9);
......
......@@ -5,6 +5,10 @@ cp baidu_third-party_mklml/so/* so
rm -rf baidu_third-party_mklml
cp baidu_third-party_openmpi/so/* so
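# presumably the binaries resolve the openmpi libraries by their .so.0 sonames, so symlink those names to the copied files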
ln -s libmpi.so so/libmpi.so.0
ln -s libmpi_cxx.so so/libmpi_cxx.so.0
ln -s libopen-pal.so so/libopen-pal.so.0
ln -s libopen-rte.so so/libopen-rte.so.0
rm -rf baidu_third-party_openmpi
rm lib/libfake_paddle_proto.a
......