diff --git a/BCLOUD b/BCLOUD index e797fdf1d7c69313543cd2d06c0c5766e81e346d..4687106f33f74c5405c2a99c1ef7fe4707142d6c 100644 --- a/BCLOUD +++ b/BCLOUD @@ -79,7 +79,6 @@ NEED_OUTPUT("baidu/third-party/mklml") NEED_OUTPUT("baidu/third-party/openmpi") OUTPUT('paddle/fluid/train/custom_trainer/feed/conf', '$OUT') OUTPUT('paddle/fluid/train/custom_trainer/feed/scripts', '$OUT') -OUTPUT('paddle/fluid/train/custom_trainer/feed/so', '$OUT') def UT_FILE(filename): UT_DIR = 'paddle/fluid/train/custom_trainer/feed/unit_test' @@ -93,9 +92,21 @@ CPPFLAGS_STR = '-DHPPL_STUB_FUNC -DLAPACK_FOUND -DPADDLE_DISABLE_PROFILER -DPADD CFLAGS_STR = '-m64 -fPIC -fno-omit-frame-pointer -Werror -Wall -Wextra -Wnon-virtual-dtor -Wdelete-non-virtual-dtor -Wno-unused-parameter -Wno-unused-function -Wno-error=literal-suffix -Wno-error=sign-compare -Wno-error=unused-local-typedefs -Wno-error=maybe-uninitialized -fopenmp -mavx -O0 -DNDEBUG ' CXXFLAGS_STR = '-std=c++11 ' + CFLAGS_STR -Application('feed_trainer', Sources('paddle/fluid/train/custom_trainer/feed/main.cc', custom_trainer_src), CppFlags(CPPFLAGS_STR), CFlags(CFLAGS_STR), CxxFlags(CXXFLAGS_STR), Libs(src_libs=['paddle/fluid/train/custom_trainer/feed/so/libpaddle_fluid_avx_mklml.so']), Libs(module='baidu/third-party/openmpi', libs=['libmpi.so', 'libmpi_cxx.so', 'libopen-pal.so', 'libopen-rte.so'])) +SharedLibrary("paddle_fluid_avx_mklml", PreBuilt(True)) +application_args = [ + CppFlags(CPPFLAGS_STR), + CFlags(CFLAGS_STR), + CxxFlags(CXXFLAGS_STR), + Libs(libs=['libpaddle_fluid_avx_mklml.so']), + Libs(module='baidu/third-party/openmpi', libs=['libmpi.so', 'libmpi_cxx.so', 'libopen-pal.so', 'libopen-rte.so']), +] + +StaticLibrary("feed_trainer", Sources(custom_trainer_src), *application_args) + +Application('feed_trainer', Sources('paddle/fluid/train/custom_trainer/feed/main.cc'), WholeArchives("$OUT/lib/libfeed_trainer.a"), *application_args) #feed unit test -UT_MAIN = UT_FILE('main.cc') -#UTApplication('unit_test', Sources(UT_MAIN, 
GLOB(UT_FILE('test_*.cc')), custom_trainer_src), CppFlags(CPPFLAGS_STR), CFlags(CFLAGS_STR), CxxFlags(CXXFLAGS_STR), Libs(src_libs=['paddle/fluid/train/custom_trainer/feed/so/libpaddle_fluid_avx_mklml.so'])) + +# bug: shared librarys can not be found when run on server +UTApplication('unit_test', UTOnServer(False), Sources(UT_FILE('main.cc'), GLOB(UT_FILE('test_*.cc'))), WholeArchives("$OUT/lib/libfeed_trainer.a"), *application_args) diff --git a/paddle/fluid/train/custom_trainer/feed/so/libpaddle_fluid_avx_mklml.so b/lib/libpaddle_fluid_avx_mklml.so similarity index 100% rename from paddle/fluid/train/custom_trainer/feed/so/libpaddle_fluid_avx_mklml.so rename to lib/libpaddle_fluid_avx_mklml.so diff --git a/paddle/fluid/train/custom_trainer/feed/accessor/dense_input_accessor.cc b/paddle/fluid/train/custom_trainer/feed/accessor/dense_input_accessor.cc new file mode 100644 index 0000000000000000000000000000000000000000..c74a679c9f3a9334615a66f99a4618e931bbdfaf --- /dev/null +++ b/paddle/fluid/train/custom_trainer/feed/accessor/dense_input_accessor.cc @@ -0,0 +1,135 @@ +#include "paddle/fluid/train/custom_trainer/feed/accessor/input_data_accessor.h" + +namespace paddle { +namespace custom_trainer { +namespace feed { + +int DenseInputAccessor::initialize(YAML::Node config, + std::shared_ptr context_ptr) { + CHECK(DataInputAccessor::initialize(config, context_ptr) == 0); + CHECK(config["input"] && config["input"].Type() == YAML::NodeType::Map); + _total_dim = 0; + _pull_request_num.store(0); + for (const auto& input : config["input"]) { + DenseInputVariable variable; + variable.name = input.first.as(); + variable.gradient_name = paddle::framework::GradVarName(variable.name); + variable.shape = input.second["shape"].as>(); + variable.dim = 1; + for (int i = 0; i < variable.shape.size(); ++i) { + if (variable.shape[i] <= 0) { + variable.shape[i] = 1; + } + variable.dim *= variable.shape[i]; + } + _total_dim += variable.dim; + } + if (config["async_pull"] && 
config["async_pull"].as()) { + _need_async_pull = true; + } + return 0; +} + +// rpc拉取数据,需保证单线程运行 +int32_t DenseInputAccessor::pull_dense(size_t table_id) { + float* data_buffer = NULL; + if (_data_buffer != nullptr) { + data_buffer = _data_buffer; + } else { + data_buffer = new float[_total_dim]; + } + + size_t data_buffer_idx = 0; + std::vector regions; + for (auto& variable : _x_variables) { + regions.emplace_back(data_buffer + data_buffer_idx, variable.dim); + data_buffer_idx += variable.dim; + } + auto* ps_client = _trainer_context->pslib->ps_client(); + auto push_status = ps_client->pull_dense(regions.data(), regions.size(), table_id); + return push_status.get(); +} + +int32_t DenseInputAccessor::forward(SampleInstance* samples, size_t num, + paddle::framework::Scope* scope) { + // 首次同步pull,之后异步pull + if (_data_buffer == nullptr) { + _pull_mutex.lock(); + if (_data_buffer == nullptr) { + CHECK(pull_dense(_table_id) == 0); + _async_pull_thread = std::make_shared( + [this]() { + while (_need_async_pull) { + if (_pull_request_num > 0) { + pull_dense(_table_id); + _pull_request_num = 0; + } else { + usleep(50000); + } + } + }); + } + _pull_mutex.unlock(); + } + + size_t data_buffer_idx = 0; + for (auto& variable : _x_variables) { + auto* shape_ptr = &(variable.shape[0]); + paddle::framework::DDim ddim(shape_ptr, variable.shape.size()); + auto* tensor = ScopeHelper::resize_lod_tensor(scope, variable.name, ddim); + auto* var_data = tensor->mutable_data(_trainer_context->cpu_place); + memcpy(var_data, _data_buffer + data_buffer_idx, variable.dim * sizeof(float)); + data_buffer_idx += variable.dim; + } + if (_need_async_pull) { + ++_pull_request_num; + } + return 0; +} + +int32_t DenseInputAccessor::backward(SampleInstance* samples, size_t num, + paddle::framework::Scope* scope) { + if (!_need_gradient) { + return 0; + } + size_t data_buffer_idx = 0; + std::vector regions; + for (auto& variable : _x_variables) { + auto* tensor = scope->Var(variable.gradient_name)-> + 
GetMutable(); + auto* grad_data = tensor->mutable_data(_trainer_context->cpu_place); + regions.emplace_back(grad_data, variable.dim); + } + auto* ps_client = _trainer_context->pslib->ps_client(); + auto push_status = ps_client->push_dense(regions.data(), regions.size(), _table_id); + return push_status.get(); +} + +int32_t EbdVariableInputAccessor::forward(SampleInstance* samples, size_t num, + paddle::framework::Scope* scope) { + CHECK(_x_variables.size() == 1); + CHECK(_x_variables[0].shape.size() == 1); + auto& variable = _x_variables[0]; + auto* tensor = ScopeHelper::resize_lod_tensor(scope, + variable.name, {num, variable.shape[0]}); + auto* var_data = tensor->mutable_data(_trainer_context->cpu_place); + for (size_t i = 0; i < num; ++i) { + auto& sample = samples[i]; + CHECK(sample.embedx.size() == variable.dim); + memcpy(var_data, sample.embedx.data(), variable.dim * sizeof(float)); + var_data += variable.dim; + } + return 0; +} + +int32_t EbdVariableInputAccessor::backward(SampleInstance* samples, size_t num, + paddle::framework::Scope* scope) { + return 0; +} + +REGIST_CLASS(DataInputAccessor, DenseInputAccessor); +REGIST_CLASS(DataInputAccessor, EbdVariableInputAccessor); + +} // namespace feed +} // namespace custom_trainer +} // namespace paddle diff --git a/paddle/fluid/train/custom_trainer/feed/accessor/input_data_accessor.h b/paddle/fluid/train/custom_trainer/feed/accessor/input_data_accessor.h index a91a5f4620e20755089a2fdb53a19bfd7184ef80..1cca4b1d5d76ea12b50f8028fe3f28d27c38f9ac 100644 --- a/paddle/fluid/train/custom_trainer/feed/accessor/input_data_accessor.h +++ b/paddle/fluid/train/custom_trainer/feed/accessor/input_data_accessor.h @@ -1,7 +1,9 @@ #pragma once #include "paddle/fluid/framework/scope.h" -#include "paddle/fluid/train/custom_trainer/feed/dataset/data_reader.h" +#include "paddle/fluid/framework/operator.h" #include "paddle/fluid/train/custom_trainer/feed/accessor/accessor.h" +#include 
"paddle/fluid/train/custom_trainer/feed/dataset/data_reader.h" +#include "paddle/fluid/train/custom_trainer/feed/common/scope_helper.h" namespace paddle { namespace custom_trainer { @@ -12,20 +14,126 @@ public: DataInputAccessor() {} virtual ~DataInputAccessor() {} - virtual int initialize(const YAML::Node& config, - std::shared_ptr context_ptr); + virtual int initialize(YAML::Node config, + std::shared_ptr context_ptr) { + _trainer_context = context_ptr.get(); + _table_id = config["table_id"].as(); + _need_gradient = config["need_gradient"].as(); + return 0; + } // 前向, 一般用于填充输入,在训练网络执行前调用 - virtual int32_t forward(const SampleInstance* samples, - ::paddle::framework::Scope* scope, size_t table_id, size_t num) = 0; + virtual int32_t forward(SampleInstance* samples, size_t num, + ::paddle::framework::Scope* scope) = 0; // 后向,一般用于更新梯度,在训练网络执行后调用 - virtual int32_t backward(const SampleInstance* samples, - ::paddle::framework::Scope* scope, size_t table_id, size_t num) = 0; + virtual int32_t backward(SampleInstance* samples, size_t num, + ::paddle::framework::Scope* scope) = 0; protected: + size_t _table_id; + bool _need_gradient = false; + TrainerContext* _trainer_context = nullptr; }; REGIST_REGISTERER(DataInputAccessor); + +struct SparseInputVariable { + size_t slot_dim; + size_t total_dim; + std::string name; + std::string gradient_name; + std::vector slot_idx; + std::vector slot_list; +}; + +struct SparseVarRuntimeData { + uint32_t row_size; + uint32_t total_size; + float* variable_data; + float* gradient_data; +}; + +class BaseSparseInputAccessor : public DataInputAccessor { +public: + BaseSparseInputAccessor() {} + virtual ~BaseSparseInputAccessor() {} + + virtual int initialize(YAML::Node config, + std::shared_ptr context_ptr); + + // forword过程的input填充 + virtual int32_t forward(SampleInstance* samples, size_t num, + paddle::framework::Scope* scope); + // 取得单个SparseKey的PullValue, 实现单个SparseValue的填充 + virtual void fill_input(float* var_data, const float* 
pull_raw, + paddle::ps::ValueAccessor&, SparseInputVariable&, SampleInstance&) = 0; + // 所有SparseValue填充完成后,调用,可进一步全局处理 + virtual void post_process_input(float* var_data, SparseInputVariable&, SampleInstance*, size_t num) = 0; + + // backward过程的梯度push + virtual int32_t backward(SampleInstance* samples, size_t num, + paddle::framework::Scope* scope); + // SparseGradValue会被依次调用,用于整理push的梯度 + virtual void fill_gradient(float* push_value, const float* gradient_raw, + paddle::ps::ValueAccessor&, SparseInputVariable&, SampleInstance&) = 0; + +protected: + // 输入层列表 + std::vector _x_variables; +}; + +struct DenseInputVariable { + size_t dim; + std::string name; + std::vector shape; + std::string gradient_name; +}; + +class DenseInputAccessor : public DataInputAccessor { +public: + DenseInputAccessor() {} + virtual ~DenseInputAccessor() { + if (_data_buffer) { + delete[] _data_buffer; + } + _need_async_pull = false; + if (_async_pull_thread) { + _async_pull_thread->join(); + } + } + + virtual int initialize(YAML::Node config, + std::shared_ptr context_ptr); + + virtual int32_t forward(SampleInstance* samples, size_t num, + paddle::framework::Scope* scope); + + virtual int32_t backward(SampleInstance* samples, size_t num, + paddle::framework::Scope* scope); +protected: + virtual int32_t pull_dense(size_t table_id); + + size_t _total_dim = 0; + std::mutex _pull_mutex; + bool _need_async_pull = false; + float* _data_buffer = nullptr; + std::atomic _pull_request_num; + std::vector _x_variables; + std::shared_ptr _async_pull_thread; +}; + +class EbdVariableInputAccessor : public DenseInputAccessor { +public: + EbdVariableInputAccessor() {} + virtual ~EbdVariableInputAccessor() {} + + virtual int32_t forward(SampleInstance* samples, size_t num, + paddle::framework::Scope* scope); + + virtual int32_t backward(SampleInstance* samples, size_t num, + paddle::framework::Scope* scope); +}; + } // namespace feed } // namespace custom_trainer } // namespace paddle diff --git 
a/paddle/fluid/train/custom_trainer/feed/accessor/sparse_input_accessor.cc b/paddle/fluid/train/custom_trainer/feed/accessor/sparse_input_accessor.cc index 63b896b8e3977ef15695fcddb8b5ee697db24f89..2ff68d9fcdb2cc95ba4b188b8389e520759eb370 100644 --- a/paddle/fluid/train/custom_trainer/feed/accessor/sparse_input_accessor.cc +++ b/paddle/fluid/train/custom_trainer/feed/accessor/sparse_input_accessor.cc @@ -1,50 +1,223 @@ +#include #include #include #include "paddle/fluid/string/string_helper.h" +#include "paddle/fluid/train/custom_trainer/feed/common/scope_helper.h" #include "paddle/fluid/train/custom_trainer/feed/accessor/input_data_accessor.h" namespace paddle { namespace custom_trainer { namespace feed { -class CommonSparseInputAccessor : public DataInputAccessor { -public: - CommonSparseInputAccessor() {} - virtual ~CommonSparseInputAccessor() {} - virtual int initialize(const YAML::Node& config, - std::shared_ptr context_ptr) { - CHECK(config["sparse_input"] && config["sparse_input"].Type() == YAML::NodeType::Map); - for (auto& input : config["sparse_input"]) { - std::pair> sparse_slots; - sparse_slots.first = input.first.as(); - std::string slots_str = input.second["slots"].as(); - std::vector slots = paddle::string::split_string(slots_str, ","); - for (int i = 0; i < slots.size(); ++i) { - sparse_slots.second.push_back((uint16_t)atoi(slots[i].c_str())); +int BaseSparseInputAccessor::initialize(YAML::Node config, + std::shared_ptr context_ptr) { + CHECK(DataInputAccessor::initialize(config, context_ptr) == 0); + CHECK(config["input"] && config["input"].Type() == YAML::NodeType::Map); + for (const auto& input : config["input"]) { + SparseInputVariable variable; + variable.name = input.first.as(); + variable.gradient_name = paddle::framework::GradVarName(variable.name); + std::string slots_str = input.second["slots"].as(); + std::vector slots = paddle::string::split_string(slots_str, ","); + variable.slot_idx.resize(UINT16_MAX, -1); + for (int i = 0; i < 
slots.size(); ++i) { + auto slot = (uint16_t)atoi(slots[i].c_str()); + variable.slot_idx[slot] = i; + variable.slot_list.push_back(slot); + } + variable.slot_dim = input.second["slot_dim"].as(); + variable.total_dim = variable.slot_list.size() * variable.slot_dim; + _x_variables.push_back(variable); + } + return 0; +} + +// 取sparse数据 +int32_t BaseSparseInputAccessor::forward(SampleInstance* samples, + size_t num, paddle::framework::Scope* scope) { + CHECK(num > 0); + auto* ps_client = _trainer_context->pslib->ps_client(); + auto* value_accessor = ps_client->table_accessor(_table_id); + size_t key_num = 0; + for (size_t i = 0; i < num; ++i) { + key_num += samples[i].features.size(); + } + std::vector keys(key_num); + float** pull_values = new float*[key_num]; + auto pull_value_dim = value_accessor->select_dim(); + + // 填入sparseKey Request + size_t key_idx = 0; + for (size_t i = 0; i < num; ++i) { + auto& features = samples[i].features; + for (auto& feature_item : features) { + feature_item.weights.resize(pull_value_dim, 0.0); + keys[key_idx] = feature_item.feature_sign; + pull_values[key_idx++] = &(feature_item.weights[0]); + } + } + auto pull_status = ps_client->pull_sparse(pull_values, _table_id, keys.data(), key_num); + auto ret = pull_status.get(); + delete[] pull_values; + if (ret != 0) { + return ret; + } + + auto* runtime_data_ptr = new std::vector(); + auto& var_runtime_data = *runtime_data_ptr; + var_runtime_data.resize(_x_variables.size()); + int64_t runtime_data_for_scope = (int64_t)runtime_data_ptr; + ScopeHelper::fill_value(scope, _trainer_context->cpu_place, + "sparse_runtime_data", runtime_data_for_scope); + // Variable空间初始化 + for (size_t i = 0; i < _x_variables.size(); ++i) { + const auto& variable = _x_variables[i]; + var_runtime_data[i].row_size = num; + var_runtime_data[i].total_size = num * variable.total_dim; + auto* tensor = ScopeHelper::resize_lod_tensor( + scope, variable.name, {num, variable.total_dim}); + auto* grad_tensor = 
ScopeHelper::resize_lod_tensor( + scope, variable.gradient_name, {num, variable.total_dim}); + var_runtime_data[i].variable_data = tensor->mutable_data(_trainer_context->cpu_place); + var_runtime_data[i].gradient_data = grad_tensor->mutable_data(_trainer_context->cpu_place); + memset((void*) var_runtime_data[i].variable_data, 0, var_runtime_data[i].total_size * sizeof(float)); + memset((void*) var_runtime_data[i].gradient_data, 0, var_runtime_data[i].total_size * sizeof(float)); + } + // 参数填入Variable + for (size_t samp_idx = 0; samp_idx < num; ++samp_idx) { + auto& features = samples[samp_idx].features; + for (auto& feature_item : features) { + for (size_t i = 0; i < _x_variables.size(); ++i) { + auto& variable = _x_variables[i]; + auto slot_idx = variable.slot_idx[feature_item.slot_id]; + if (slot_idx < 0) { + continue; + } + float* item_data = var_runtime_data[i].variable_data + + samp_idx * var_runtime_data[i].total_size + variable.slot_dim * slot_idx; + fill_input(item_data, &(feature_item.weights[0]), *value_accessor, variable, samples[samp_idx]); } } - return 0; } + // Variable后置处理 + for (size_t i = 0; i < _x_variables.size(); ++i) { + auto& variable = _x_variables[i]; + post_process_input(var_runtime_data[i].variable_data, variable, samples, num); + } + return 0; +} - // 取sparse数据 - virtual int32_t forward(const SampleInstance* samples, - ::paddle::framework::Scope* scope, size_t table_id, size_t num) { - // pull +// 更新spare数据 +int32_t BaseSparseInputAccessor::backward(SampleInstance* samples, + size_t num, paddle::framework::Scope* scope) { + int64_t runtime_data_for_scope = *ScopeHelper::get_value( + scope, _trainer_context->cpu_place, "sparse_runtime_data"); + auto* runtime_data_ptr = (std::vector*)runtime_data_for_scope; + auto& var_runtime_data = *runtime_data_ptr; + DoneGuard gurad([runtime_data_ptr](){ + delete runtime_data_ptr; + }); + if (!_need_gradient) { return 0; } + auto* ps_client = _trainer_context->pslib->ps_client(); + auto* value_accessor 
= ps_client->table_accessor(_table_id); - // 更新spare数据 - virtual int32_t backward(const SampleInstance* samples, - ::paddle::framework::Scope* scope, size_t table_id, size_t num) { - return 0; - } + size_t key_num = 0; + for (size_t i = 0; i < num; ++i) { + key_num += samples[i].features.size(); + } + std::vector keys(key_num); + float** push_values = new float*[key_num]; + auto push_value_dim = value_accessor->update_dim(); + + size_t key_idx = 0; + for (size_t samp_idx = 0; samp_idx < num; ++samp_idx) { + auto& features = samples[samp_idx].features; + for (auto& feature_item : features) { + feature_item.gradients.resize(push_value_dim, 0.0); + for (size_t i = 0; i < _x_variables.size(); ++i) { + auto& variable = _x_variables[i]; + auto slot_idx = variable.slot_idx[feature_item.slot_id]; + if (slot_idx < 0) { + continue; + } + const float* grad_data = var_runtime_data[i].gradient_data + + samp_idx * var_runtime_data[i].total_size + variable.slot_dim * slot_idx; + fill_gradient(&(feature_item.gradients[0]), grad_data, + *value_accessor, variable, samples[samp_idx]); + keys[key_idx] = feature_item.feature_sign; + push_values[key_idx++] = &(feature_item.gradients[0]); + } + } + } + auto push_status = ps_client->push_sparse(_table_id, + keys.data(), (const float**)push_values, key_num); + auto ret = push_status.get(); + delete[] push_values; + return ret; +} -protected: +class AbacusSparseJoinAccessor : public BaseSparseInputAccessor { +public: + AbacusSparseJoinAccessor() {} + virtual ~AbacusSparseJoinAccessor() {} + virtual void fill_input(float* var_data, const float* pull_raw, + paddle::ps::ValueAccessor& value_accessor, + SparseInputVariable& variable, SampleInstance& sample) { + for (size_t i = 0; i < variable.slot_dim; ++i) { + var_data[i] += pull_raw[i]; + } + } + + virtual void post_process_input(float* var_data, + SparseInputVariable& variable, SampleInstance* samples, size_t num) { + for (size_t i = 0; i < num * variable.slot_list.size(); ++i) { + 
var_data[0] = log(var_data[0] + 1); // show + var_data[1] = log(var_data[1] + 1) - var_data[0]; // ctr + var_data += variable.slot_dim; + } + } + + virtual void fill_gradient(float* push_value, const float* gradient_raw, + paddle::ps::ValueAccessor& value_accessor, + SparseInputVariable& variable, SampleInstance& sample) { + // join阶段不回填梯度 + CHECK(false); + return; + } +}; +REGIST_CLASS(DataInputAccessor, AbacusSparseJoinAccessor); - // 输入层列表 - // - std::vector > > _x_variables; +class AbacusSparseUpdateAccessor : public BaseSparseInputAccessor { +public: + AbacusSparseUpdateAccessor() {} + virtual ~AbacusSparseUpdateAccessor() {} + virtual void fill_input(float* var_data, const float* pull_raw, + paddle::ps::ValueAccessor& value_accessor, + SparseInputVariable& variable, SampleInstance& sample) { + for (size_t i = 0; i < variable.slot_dim; ++i) { + var_data[i] += pull_raw[i + 2]; + } + } + + virtual void post_process_input(float* var_data, + SparseInputVariable& variable, SampleInstance* samples, size_t num) { + return; + } + + virtual void fill_gradient(float* push_value, const float* gradient_raw, + paddle::ps::ValueAccessor& value_accessor, + SparseInputVariable& variable, SampleInstance& sample) { + push_value[0] += 1; + push_value[1] += sample.labels[0]; + for (size_t i = 0; i < variable.slot_dim; ++i) { + push_value[i + 2] += gradient_raw[i]; + } + return; + } }; +REGIST_CLASS(DataInputAccessor, AbacusSparseUpdateAccessor); } // namespace feed } // namespace custom_trainer diff --git a/paddle/fluid/train/custom_trainer/feed/common/pipeline.h b/paddle/fluid/train/custom_trainer/feed/common/pipeline.h index 39e669954d99b1aa7500fb3d7c2c83ce5eeda0c5..1db754903b9feaecfa654ba985b180a524075a43 100644 --- a/paddle/fluid/train/custom_trainer/feed/common/pipeline.h +++ b/paddle/fluid/train/custom_trainer/feed/common/pipeline.h @@ -6,12 +6,22 @@ namespace paddle { namespace custom_trainer { namespace feed { +class DoneGuard { +public: + DoneGuard(std::function func) : 
_func(func) {} + virtual ~DoneGuard() { _func(); } +private: + std::function _func; +}; + class PipelineOptions { public: PipelineOptions() = default; - uint32_t buffer_data_num = 400 ; //缓冲区数据个数,需大于batch_size - uint32_t batch_size = 100 ; //从pipe读数据的batch大小 - bool need_hold_input_data = false; //是否保存input流数据,否则消费后释放 + uint32_t batch_size = 10; // pipe输出的batch大小 + uint32_t thread_num = 1; // converter的并发线程数 + float input_output_rate = 1; // 输入/输出 qps流量比 + uint32_t buffer_batch_count = 4; // pipe预存count组batch数据 + bool need_hold_input_data = false; // 是否保存input流数据,否则消费后释放 }; /* @@ -29,7 +39,8 @@ public: Pipeline() {} Pipeline(Pipeline&&) = delete; Pipeline(const Pipeline&) = delete; - typedef std::function PipeDataConverter; + typedef std::function PipeDataConverter; int initialize(const PipelineOptions& options, ::paddle::framework::Channel input_channel, @@ -42,18 +53,12 @@ public: _converter = data_converter; _input_channel = input_channel; _output_channel = ::paddle::framework::MakeChannel(); - - auto batch_size = options.batch_size; - auto buffer_data_num = options.buffer_data_num; - _input_channel->SetBlockSize(batch_size); - _output_channel->SetBlockSize(batch_size); - _input_data_buffer.resize(buffer_data_num); - _output_data_buffer.resize(buffer_data_num); - if (buffer_data_num / batch_size < 3) { - buffer_data_num = batch_size * 3; - } - buffer_data_num = (buffer_data_num / batch_size) * batch_size; - _output_channel->SetCapacity(buffer_data_num); + _output_channel->SetBlockSize(options.batch_size); + size_t input_batch_size = options.batch_size * options.input_output_rate; + _input_channel->SetBlockSize(input_batch_size); + _input_data_buffer.resize(input_batch_size * options.buffer_batch_count); + _output_data_buffer.resize(options.batch_size * options.buffer_batch_count); + _output_channel->SetCapacity(_output_data_buffer.size()); CHECK(_input_channel != nullptr) << " Input Channel is null"; _convert_thread = std::make_shared([this](){ 
async_convert_data(); @@ -63,7 +68,9 @@ public: template int connect_to(Pipeline& pre_pipeline, - PipeDataConverter data_converter) { + PipelineOptions& options, PipeDataConverter data_converter) { + // 保证全局batch一致 + options.batch_size = pre_pipeline.options().batch_size / options.input_output_rate; return initialize(pre_pipeline.options(), pre_pipeline.output_chnnel(), data_converter); } @@ -87,24 +94,26 @@ public: inline ::paddle::framework::Channel output_chnnel() { return _output_channel; } + + // 返回对input_channel的消费备份 + inline ::paddle::framework::Channel backup_channel() { + return _input_channel_backup; + } private: void async_convert_data() { - size_t convete_batch_size = _input_data_buffer.size() / 4; - if (convete_batch_size < _options.batch_size * 3) { - convete_batch_size = 3 * _options.batch_size; - } - convete_batch_size = (convete_batch_size / _options.batch_size) * _options.batch_size; + size_t input_batch_size = _options.batch_size * _options.input_output_rate; while (!_is_read_end) { while (_output_channel->Size() < _input_data_buffer.size()) { size_t read_size = _input_channel-> - Read(convete_batch_size, &_input_data_buffer[0]); + Read(input_batch_size, &_input_data_buffer[0]); if (read_size == 0) { _is_read_end = true; break; } - CHECK(_converter(&_input_data_buffer[0], &_output_data_buffer[0], - read_size) == 0) << "Data Converter Do Failed"; - _output_channel->WriteMove(read_size, &_output_data_buffer[0]); + size_t write_size = 0; + CHECK(_converter(&_input_data_buffer[0], read_size, + &_output_data_buffer[0], &write_size, 0) == 0) << "Data Converter Do Failed"; + _output_channel->WriteMove(write_size, &_output_data_buffer[0]); if (_options.need_hold_input_data) { _input_channel_backup->WriteMove(read_size, &_input_data_buffer[0]); } diff --git a/paddle/fluid/train/custom_trainer/feed/common/runtime_environment.h b/paddle/fluid/train/custom_trainer/feed/common/runtime_environment.h index 
5e567c787d7db0fde69fe9ecdfaef21de9061f74..e4ac68a90dd1cd11d722d2e7bf16bb64ce0d25f9 100644 --- a/paddle/fluid/train/custom_trainer/feed/common/runtime_environment.h +++ b/paddle/fluid/train/custom_trainer/feed/common/runtime_environment.h @@ -15,8 +15,6 @@ namespace paddle { namespace custom_trainer { namespace feed { -class paddle::ps::PSEnvironment; - enum class EnvironmentLogLevel { FATAL = 0, ERROR = 1, diff --git a/paddle/fluid/train/custom_trainer/feed/common/scope_helper.h b/paddle/fluid/train/custom_trainer/feed/common/scope_helper.h new file mode 100644 index 0000000000000000000000000000000000000000..307a835ac91ff10bfc85e57e039a3b00e326a113 --- /dev/null +++ b/paddle/fluid/train/custom_trainer/feed/common/scope_helper.h @@ -0,0 +1,55 @@ +#pragma once +#include "paddle/fluid/framework/scope.h" +#include "paddle/fluid/framework/lod_tensor.h" + +namespace paddle { +namespace custom_trainer { +namespace feed { + +class ScopeHelper { +public: + //直接取var + template + static const T& var(paddle::framework::Scope* scope, const std::string& name) { + return scope->Var(name)->Get(); + } + template + static T* mutable_var(paddle::framework::Scope* scope, const std::string& name) { + return scope->Var(name)->GetMutable(); + } + + template + static T* resize_variable(paddle::framework::Scope* scope, + const std::string& name, const paddle::framework::DDim& dim) { + auto* tensor = scope->Var(name)->GetMutable(); + tensor->Resize(dim); + return tensor; + } + + static paddle::framework::LoDTensor* resize_lod_tensor( + paddle::framework::Scope* scope, + const std::string& name, const paddle::framework::DDim& dim) { + return resize_variable(scope, name, dim); + } + + template + static void fill_value(paddle::framework::Scope* scope, + paddle::platform::Place place, const std::string& name, T& value) { + auto* tensor = resize_variable(scope, name, { 1 }); + T* data = tensor->mutable_data(place); + *data = value; + return; + } + + template + static T* 
get_value(paddle::framework::Scope* scope, + paddle::platform::Place place, const std::string& name) { + auto* tensor = scope->Var(name)->GetMutable(); + return tensor->mutable_data(place); + } + +}; + +} // namespace feed +} // namespace custom_trainer +} // namespace paddle diff --git a/paddle/fluid/train/custom_trainer/feed/dataset/data_reader.cc b/paddle/fluid/train/custom_trainer/feed/dataset/data_reader.cc index 7e03677054395e63dc15090b7a64fd21cf93d48c..46794a84d39b7b19aa4d61ffbe5d66427d18032a 100644 --- a/paddle/fluid/train/custom_trainer/feed/dataset/data_reader.cc +++ b/paddle/fluid/train/custom_trainer/feed/dataset/data_reader.cc @@ -131,53 +131,66 @@ public: return read_all(file_list, data_channel); } virtual int read_all(const std::vector& file_list, ::paddle::framework::Channel data_channel) { - auto deleter = [](framework::ChannelWriter *writer) { - if (writer) { - writer->Flush(); - VLOG(3) << "writer auto flush"; - } - delete writer; - }; - std::unique_ptr, decltype(deleter)> writer(new framework::ChannelWriter(data_channel.get()), deleter); - DataItem data_item; - - int file_list_size = file_list.size(); + const int file_list_size = file_list.size(); std::atomic is_failed(false); + const int max_threads = omp_get_max_threads(); + std::vector> writers; // writer is not thread safe + writers.reserve(max_threads); + for (int i = 0; i < max_threads; ++i) { + writers.emplace_back(data_channel.get()); + } + VLOG(5) << "file_list: " << string::join_strings(file_list, ' '); #pragma omp parallel for for (int i = 0; i < file_list_size; ++i) { + if (is_failed) { + continue; + } + const int thread_num = omp_get_thread_num(); + framework::ChannelWriter *writer = nullptr; + if (thread_num < max_threads) { + writer = &writers[thread_num]; + } const auto& filepath = file_list[i]; - if (!is_failed) { - std::shared_ptr fin = _file_system->open_read(filepath, _pipeline_cmd); - if (fin == nullptr) { - VLOG(2) << "fail to open file: " << filepath << ", with cmd: " << 
_pipeline_cmd; - is_failed = true; + std::shared_ptr fin = _file_system->open_read(filepath, _pipeline_cmd); + if (fin == nullptr) { + VLOG(2) << "fail to open file: " << filepath << ", with cmd: " << _pipeline_cmd; + is_failed = true; + continue; + } + char *buffer = nullptr; + size_t buffer_size = 0; + ssize_t line_len = 0; + while ((line_len = getline(&buffer, &buffer_size, fin.get())) != -1) { + // 去掉行位回车 + if (line_len > 0 && buffer[line_len - 1] == '\n') { + buffer[--line_len] = '\0'; + } + // 忽略空行 + if (line_len <= 0) { continue; } - char *buffer = nullptr; - size_t buffer_size = 0; - ssize_t line_len = 0; - while ((line_len = getline(&buffer, &buffer_size, fin.get())) != -1) { - if (line_len > 0 && buffer[line_len - 1] == '\n') { - buffer[--line_len] = '\0'; - } - if (line_len <= 0) { - continue; - } - if (_parser->parse(buffer, line_len, data_item) == 0) { + DataItem data_item; + if (_parser->parse(buffer, line_len, data_item) == 0) { + VLOG(5) << "parse data: " << data_item.id << " " << data_item.data << ", filename: " << filepath << ", thread_num: " << thread_num << ", max_threads: " << max_threads; + if (writer == nullptr) { + if (!data_channel->Put(std::move(data_item))) { + VLOG(2) << "fail to put data, thread_num: " << thread_num; + } + } else { (*writer) << std::move(data_item); } } - if (buffer != nullptr) { - free(buffer); - buffer = nullptr; - buffer_size = 0; - } - if (ferror(fin.get()) != 0) { - VLOG(2) << "fail to read file: " << filepath; - is_failed = true; - continue; - } + } + if (buffer != nullptr) { + free(buffer); + buffer = nullptr; + buffer_size = 0; + } + if (ferror(fin.get()) != 0) { + VLOG(2) << "fail to read file: " << filepath; + is_failed = true; + continue; } if (_file_system->err_no() != 0) { _file_system->reset_err_no(); @@ -185,10 +198,14 @@ public: continue; } } - writer->Flush(); - if (!(*writer)) { - VLOG(2) << "fail when write to channel"; - is_failed = true; + // omp end + + for (int i = 0; i < max_threads; ++i) { + 
writers[i].Flush(); + if (!writers[i]) { + VLOG(2) << "writer " << i << " is failed"; + is_failed = true; + } } data_channel->Close(); return is_failed ? -1 : 0; diff --git a/paddle/fluid/train/custom_trainer/feed/dataset/data_reader.h b/paddle/fluid/train/custom_trainer/feed/dataset/data_reader.h index 7b6d1c3f679d6e4295cbb8acd0d81a192f9293a4..f588fa4af8c35f0d76e4a5a3d64cb8a70396d15d 100644 --- a/paddle/fluid/train/custom_trainer/feed/dataset/data_reader.h +++ b/paddle/fluid/train/custom_trainer/feed/dataset/data_reader.h @@ -20,11 +20,13 @@ class TrainerContext; struct FeatureItem { uint64_t feature_sign; uint16_t slot_id; + std::vector weights; + std::vector gradients; }; struct SampleInstance { std::string id; - std::vector lables; + std::vector labels; std::vector features; std::vector embedx; }; diff --git a/paddle/fluid/train/custom_trainer/feed/dataset/dataset.cc b/paddle/fluid/train/custom_trainer/feed/dataset/dataset.cc index fc29f30852245916c89b6a724b5f4aa23c2dd6c6..cb9bd06e4bacff520643ac0cde675a6233e84b3e 100644 --- a/paddle/fluid/train/custom_trainer/feed/dataset/dataset.cc +++ b/paddle/fluid/train/custom_trainer/feed/dataset/dataset.cc @@ -53,25 +53,9 @@ inline ::paddle::framework::Channel Dataset::fetch_data( return _data_containers[data_name]->fetch(epoch_id); } -SampleInstancePipe Dataset::fetch_sample( - const std::string& data_name, uint32_t batch_size, uint64_t epoch_id) { +inline const DataParser* Dataset::data_parser(const std::string& data_name) { auto* data_container = _data_containers[data_name].get(); - auto data_channel = data_container->fetch(epoch_id); - const auto* data_parser = data_container->data_parser(); - PipelineOptions options; - options.batch_size = batch_size; - options.need_hold_input_data = true; - options.buffer_data_num = batch_size * 10; - SampleInstancePipe pipe = make_sample_instance_channel(); - pipe->initialize(options, data_channel, - [data_parser] (const DataItem* data, SampleInstance* sample, size_t num) -> int { 
- int ret = 0; - for (int i = 0; i < num; ++i, ++data, ++sample) { - ret |= data_parser->parse_to_sample(*data, *sample); - } - return ret; - }); - return pipe; + return data_container->data_parser(); } diff --git a/paddle/fluid/train/custom_trainer/feed/dataset/dataset.h b/paddle/fluid/train/custom_trainer/feed/dataset/dataset.h index f8ea62c65202cf04622bb3e5e39ae82913a13feb..23a786e023fbad9821d4ad11f4bd7f54c41ebcb7 100644 --- a/paddle/fluid/train/custom_trainer/feed/dataset/dataset.h +++ b/paddle/fluid/train/custom_trainer/feed/dataset/dataset.h @@ -33,10 +33,9 @@ public: virtual ::paddle::framework::Channel fetch_data( const std::string& data_name, uint64_t epoch_id); - //以管道形式返回标准样本流,管道内会对数据做异步转换 - virtual SampleInstancePipe fetch_sample( - const std::string& data_name, uint32_t batch_size, uint64_t epoch_id); - + //获取DataItem解析器 + virtual const DataParser* data_parser(const std::string& data_name); + private: std::unordered_map> _data_containers; }; diff --git a/paddle/fluid/train/custom_trainer/feed/executor/executor.cc b/paddle/fluid/train/custom_trainer/feed/executor/executor.cc index 73f8a601d10f160c764f76db94c9196624c05f30..0db6b9e887f8108a3320c3466202f15b3399cedf 100644 --- a/paddle/fluid/train/custom_trainer/feed/executor/executor.cc +++ b/paddle/fluid/train/custom_trainer/feed/executor/executor.cc @@ -54,8 +54,6 @@ public: paddle::framework::InitDevices(false); if (exe_config["num_threads"]) { paddle::platform::SetNumThreads(exe_config["num_threads"].as()); - } else { - paddle::platform::SetNumThreads(1); } if (!exe_config["startup_program"] || @@ -66,13 +64,11 @@ public: try { _context.reset(new SimpleExecutor::Context(context_ptr->cpu_place)); - auto startup_program = Load(&_context->executor, exe_config["startup_program"].as()); - if (startup_program == nullptr) { + _context->startup_program = Load(&_context->executor, exe_config["startup_program"].as()); + if (_context->startup_program == nullptr) { VLOG(2) << "fail to load startup_program: " << 
exe_config["startup_program"].as(); return -1; } - - _context->executor.Run(*startup_program, this->scope(), 0, false, true); _context->main_program = Load(&_context->executor, exe_config["main_program"].as()); if (_context->main_program == nullptr) { @@ -80,7 +76,6 @@ public: return -1; } _context->prepare_context = _context->executor.Prepare(*_context->main_program, 0); - _context->executor.CreateVariables(*_context->main_program, this->scope(), 0); } catch (::paddle::platform::EnforceNotMet& err) { VLOG(2) << err.what(); _context.reset(nullptr); @@ -89,18 +84,23 @@ public: return 0; } - virtual int run() { + virtual int initialize_scope(::paddle::framework::Scope* scope) { + _context->executor.Run(*_context->startup_program, scope, 0, false, true); + _context->executor.CreateVariables(*_context->main_program, scope, 0); + return 0; + } + virtual int run(::paddle::framework::Scope* scope) { if (_context == nullptr) { VLOG(2) << "need initialize before run"; return -1; } try { - _context->executor.RunPreparedContext(_context->prepare_context.get(), this->scope(), + _context->executor.RunPreparedContext(_context->prepare_context.get(), scope, false, /* don't create local scope each time*/ false /* don't create variable each time */); // For some other vector like containers not cleaned after each batch. 
- _context->tensor_array_batch_cleaner.CollectNoTensorVars(this->scope()); + _context->tensor_array_batch_cleaner.CollectNoTensorVars(scope); _context->tensor_array_batch_cleaner.ResetNoTensorVars(); } catch (::paddle::platform::EnforceNotMet& err) { VLOG(2) << err.what(); @@ -115,6 +115,7 @@ protected: const ::paddle::platform::Place& place; ::paddle::framework::Executor executor; ::std::unique_ptr<::paddle::framework::ProgramDesc> main_program; + ::std::unique_ptr<::paddle::framework::ProgramDesc> startup_program; ::std::unique_ptr prepare_context; details::TensorArrayBatchCleaner tensor_array_batch_cleaner; }; diff --git a/paddle/fluid/train/custom_trainer/feed/executor/executor.h b/paddle/fluid/train/custom_trainer/feed/executor/executor.h index 3ae40817eb56ca8738fbec13b92d8be86f5b6094..2a620f62192e1eb3795046f581d99716036a36c5 100644 --- a/paddle/fluid/train/custom_trainer/feed/executor/executor.h +++ b/paddle/fluid/train/custom_trainer/feed/executor/executor.h @@ -13,32 +13,16 @@ public: Executor() {} virtual ~Executor() {} - //初始化,包括进行训练网络&配置加载工作 + // 初始化,包括进行训练网络&配置加载工作 virtual int initialize(YAML::Node exe_config, std::shared_ptr context_ptr) = 0; - //scope 可用于填充&取 var - virtual ::paddle::framework::Scope* scope() { - return &_scope; - } - //直接取var - template - const T& var(const std::string& name) { - return _scope.Var(name)->Get(); - } - template - T* mutable_var(const std::string& name) { - return _scope.Var(name)->GetMutable(); - } + // 初始化scope, 后续反复执行训练,不再初始化 + virtual int initialize_scope(::paddle::framework::Scope* scope) = 0; + + // 执行训练 + virtual int run(::paddle::framework::Scope* scope) = 0; - //执行训练 - virtual int run() = 0; - - virtual bool is_dump_all_model() { - return false; - } -protected: - ::paddle::framework::Scope _scope; }; REGIST_REGISTERER(Executor); diff --git a/paddle/fluid/train/custom_trainer/feed/executor/multi_thread_executor.cc b/paddle/fluid/train/custom_trainer/feed/executor/multi_thread_executor.cc new file mode 100644 
index 0000000000000000000000000000000000000000..c0ecb05dc16fbd4fd0e8dcf874f5a8ef35dd01a8 --- /dev/null +++ b/paddle/fluid/train/custom_trainer/feed/executor/multi_thread_executor.cc @@ -0,0 +1,138 @@ +#include "paddle/fluid/train/custom_trainer/feed/io/file_system.h" +#include "paddle/fluid/train/custom_trainer/feed/executor/multi_thread_executor.h" + +namespace paddle { +namespace custom_trainer { +namespace feed { + +int MultiThreadExecutor::initialize(YAML::Node exe_config, + std::shared_ptr context_ptr) { + int ret = 0; + _trainer_context = context_ptr.get(); + _train_data_name = exe_config["train_data_name"].as(); + _train_batch_size = exe_config["train_batch_size"].as(); + _input_parse_thread_num = exe_config["input_parse_thread_num"].as(); + _push_gradient_thread_num = exe_config["push_gradient_thread_num"].as(); + _train_thread_num = exe_config["train_thread_num"].as(); + _need_dump_all_model = exe_config["need_dump_all_model"].as(); + CHECK(_train_thread_num > 0 && _train_batch_size > 0); + _thread_executors.resize(_train_thread_num); + auto e_class = exe_config["class"].as(); + _train_exe_name = exe_config["name"].as(); + + omp_set_num_threads(_train_thread_num); + #pragma omp parallel for + for (int i = 0; i < _train_thread_num; ++i) { + auto* e_ptr = CREATE_INSTANCE(Executor, e_class); + _thread_executors[i].reset(e_ptr); + if (e_ptr->initialize(exe_config, context_ptr) != 0) { + ret = -1; + } + } + CHECK(ret == 0); + _scope_obj_pool.config( + [this]() -> ::paddle::framework::Scope* { + auto* scope = new ::paddle::framework::Scope(); + _thread_executors[0]->initialize_scope(scope); + return scope; + }, _train_thread_num * 8); + + std::string model_config_path = _trainer_context->file_system->path_join( + "./model", string::format_string("%s.yaml", _train_exe_name.c_str())); + CHECK(_trainer_context->file_system->exists(model_config_path)) + << "miss model config file:" << model_config_path; + _model_config = YAML::LoadFile(model_config_path); + 
_input_accessors.reserve(_model_config["input_accessor"].size()); + for (const auto& accessor_config : _model_config["input_accessor"]) { + auto accessor_class = accessor_config["class"].as(); + _input_accessors.emplace_back(CREATE_INSTANCE(DataInputAccessor, accessor_class)); + CHECK(_input_accessors.back()->initialize(accessor_config, context_ptr) == 0) + << "InputAccessor init Failed, class:" << accessor_class; + } + + return ret; +} + +paddle::framework::Channel MultiThreadExecutor::run( + paddle::framework::Channel input, const DataParser* parser) { + PipelineOptions input_pipe_option; + input_pipe_option.need_hold_input_data = true; + input_pipe_option.batch_size = _train_batch_size; + input_pipe_option.thread_num = _input_parse_thread_num; + input_pipe_option.input_output_rate = _train_batch_size; + input_pipe_option.buffer_batch_count = _train_thread_num; + auto input_pipe = std::make_shared>(); + input_pipe->initialize(input_pipe_option, input, + [this, parser](DataItem* item, size_t item_num, + ScopePoolObj* scope, size_t* scope_num, size_t thread_idx) -> int { + *scope_num = 1; + auto scope_obj = _scope_obj_pool.get(); + auto* samples = new SampleInstance[item_num]; + for (size_t i = 0; i < item_num; ++i) { + CHECK(parser->parse_to_sample(item[i], samples[i]) == 0); + } + for (size_t i = 0; i < _input_accessors.size(); ++i) { + _input_accessors[i]->forward(samples, item_num, scope_obj.get()); + } + int64_t data_for_scope = (int64_t)samples; + ScopeHelper::fill_value(scope_obj.get(), _trainer_context->cpu_place, + "sample_data", data_for_scope); + data_for_scope = (int64_t)item_num; + ScopeHelper::fill_value(scope_obj.get(), _trainer_context->cpu_place, + "sample_num", data_for_scope); + *scope = std::move(scope_obj); + return 0; + }); + + PipelineOptions train_pipe_option; + train_pipe_option.input_output_rate = 1; + train_pipe_option.thread_num = _train_thread_num; + train_pipe_option.buffer_batch_count = 2 * _train_thread_num; + auto train_pipe = std::make_shared>(); + 
train_pipe->connect_to(*input_pipe, train_pipe_option, + [this] (ScopePoolObj* in_items, size_t in_num, + ScopePoolObj* out_items, size_t* out_num, size_t thread_idx) -> int { + auto* executor = _thread_executors[thread_idx].get(); + size_t& out_idx = *out_num; + for (out_idx = 0; out_idx < in_num; ++out_idx) { + executor->run(in_items[out_idx].get()); + out_items[out_idx] = std::move(in_items[out_idx]); + } + return 0; + }); + + PipelineOptions gradient_pipe_option; + gradient_pipe_option.input_output_rate = 1; + gradient_pipe_option.thread_num = _push_gradient_thread_num; + gradient_pipe_option.buffer_batch_count = 2 * _train_thread_num; + auto gradient_pipe = std::make_shared>(); + gradient_pipe->connect_to(*train_pipe, gradient_pipe_option, + [this] (ScopePoolObj* in_items, size_t in_num, + int* out_items, size_t* out_num, size_t thread_idx) -> int { + size_t& out_idx = *out_num; + for (out_idx = 0; out_idx < in_num; ++out_idx) { + auto* scope = in_items[out_idx].get(); + auto sample_num = *ScopeHelper::get_value( + scope, _trainer_context->cpu_place, "sample_num"); + + auto* samples = (SampleInstance*)(*ScopeHelper::get_value( + scope, _trainer_context->cpu_place, "sample_data")); + for (size_t i = 0; i < _input_accessors.size(); ++i) { + out_items[out_idx] = _input_accessors[i]-> + backward(samples, sample_num, scope); + } + delete[] samples; // 所有pipe完成后,再回收sample + } + + return 0; + }); + + std::vector gradient_status; + while (gradient_pipe->read(gradient_status) > 0) { + } + return input_pipe->backup_channel(); +} + +} // namespace feed +} // namespace custom_trainer +} // namespace paddle diff --git a/paddle/fluid/train/custom_trainer/feed/executor/multi_thread_executor.h b/paddle/fluid/train/custom_trainer/feed/executor/multi_thread_executor.h new file mode 100644 index 0000000000000000000000000000000000000000..77f07c4d3113c85de0c29fe9a263d720527277ba --- /dev/null +++ b/paddle/fluid/train/custom_trainer/feed/executor/multi_thread_executor.h @@ -0,0 
+1,50 @@ +#pragma once +#include +#include "paddle/fluid/framework/channel.h" +#include "paddle/fluid/train/custom_trainer/feed/executor/executor.h" +#include "paddle/fluid/train/custom_trainer/feed/accessor/input_data_accessor.h" + +namespace paddle { +namespace custom_trainer { +namespace feed { + +class MultiThreadExecutor { +public: + MultiThreadExecutor() {} + virtual ~MultiThreadExecutor() {} + + //初始化,包括进行训练网络&配置加载工作 + virtual int initialize(YAML::Node exe_config, + std::shared_ptr context_ptr); + + //执行训练 + virtual paddle::framework::Channel run( + paddle::framework::Channel input, const DataParser* parser); + + virtual bool is_dump_all_model() { + return _need_dump_all_model; + } + + virtual const std::string& train_data_name() { + return _train_data_name; + } +protected: + std::string _train_data_name; + size_t _train_batch_size = 32; + size_t _train_thread_num = 12; + size_t _input_parse_thread_num = 10; + size_t _push_gradient_thread_num = 10; + bool _need_dump_all_model = false; + + YAML::Node _model_config; + std::string _train_exe_name; + TrainerContext* _trainer_context = nullptr; + std::vector> _thread_executors; + std::vector> _input_accessors; + paddle::ps::ObjectPool<::paddle::framework::Scope> _scope_obj_pool; + typedef paddle::ps::ObjectPool<::paddle::framework::Scope>::PooledObject ScopePoolObj; +}; + +} // namespace feed +} // namespace custom_trainer +} // namespace paddle diff --git a/paddle/fluid/train/custom_trainer/feed/io/auto_file_system.cc b/paddle/fluid/train/custom_trainer/feed/io/auto_file_system.cc index a588b4d16cb3598799872bb3b8441b395295d307..a64a73efebb97a6025e7965c0800de15ba9ac531 100644 --- a/paddle/fluid/train/custom_trainer/feed/io/auto_file_system.cc +++ b/paddle/fluid/train/custom_trainer/feed/io/auto_file_system.cc @@ -1,17 +1,3 @@ -/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. 
- -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. */ - #include "paddle/fluid/train/custom_trainer/feed/io/file_system.h" #include diff --git a/paddle/fluid/train/custom_trainer/feed/io/file_system.cc b/paddle/fluid/train/custom_trainer/feed/io/file_system.cc index 2014cd23be58056cb619ae0e56766522479826a3..619499555dc3d5be2d7439f313867570d2c5abd8 100644 --- a/paddle/fluid/train/custom_trainer/feed/io/file_system.cc +++ b/paddle/fluid/train/custom_trainer/feed/io/file_system.cc @@ -1,17 +1,3 @@ -/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
*/ - #include "paddle/fluid/train/custom_trainer/feed/io/file_system.h" #include diff --git a/paddle/fluid/train/custom_trainer/feed/io/file_system.h b/paddle/fluid/train/custom_trainer/feed/io/file_system.h index 4a157d697e0f0dcfa3459172eae17b8c042af55b..e3b813349ef6691a12d71a26aaa5607f4390076b 100644 --- a/paddle/fluid/train/custom_trainer/feed/io/file_system.h +++ b/paddle/fluid/train/custom_trainer/feed/io/file_system.h @@ -1,17 +1,3 @@ -// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - #pragma once #include diff --git a/paddle/fluid/train/custom_trainer/feed/io/hadoop_file_system.cc b/paddle/fluid/train/custom_trainer/feed/io/hadoop_file_system.cc index d61e1fe1be08bef601ef695256d610013d8dd457..97c12968d061d2c9040fefe69694769aa42b4d1e 100644 --- a/paddle/fluid/train/custom_trainer/feed/io/hadoop_file_system.cc +++ b/paddle/fluid/train/custom_trainer/feed/io/hadoop_file_system.cc @@ -1,17 +1,3 @@ -/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-See the License for the specific language governing permissions and -limitations under the License. */ - #include "paddle/fluid/train/custom_trainer/feed/io/file_system.h" #include diff --git a/paddle/fluid/train/custom_trainer/feed/io/local_file_system.cc b/paddle/fluid/train/custom_trainer/feed/io/local_file_system.cc index 78cd8357c979ee58587e0261802236ab84fddce2..2da715061280119dde1155aeb015d6eb0844feea 100644 --- a/paddle/fluid/train/custom_trainer/feed/io/local_file_system.cc +++ b/paddle/fluid/train/custom_trainer/feed/io/local_file_system.cc @@ -1,17 +1,3 @@ -/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. */ - #include "paddle/fluid/train/custom_trainer/feed/io/file_system.h" #include diff --git a/paddle/fluid/train/custom_trainer/feed/io/shell.cc b/paddle/fluid/train/custom_trainer/feed/io/shell.cc index 6ed61d67d141bf863d62aab984ee52521639b595..d6fae5d0ab1d0df8c84e04b90a2ddf85e6c3a57b 100644 --- a/paddle/fluid/train/custom_trainer/feed/io/shell.cc +++ b/paddle/fluid/train/custom_trainer/feed/io/shell.cc @@ -1,17 +1,3 @@ -// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - #include "paddle/fluid/train/custom_trainer/feed/io/shell.h" namespace paddle { @@ -179,14 +165,14 @@ static int shell_popen_fork_internal( if (child_end != child_std_end) { if (dup2(child_end, child_std_end) != child_std_end) { - return -1; + exit(127); } close(child_end); } close_open_fds_internal(); if (execl("/bin/bash", "bash", "-c", real_cmd, NULL) < 0) { - return -1; + exit(127); } exit(127); #endif diff --git a/paddle/fluid/train/custom_trainer/feed/io/shell.h b/paddle/fluid/train/custom_trainer/feed/io/shell.h index 930d64d515f96348f295631afe2231c4dfe8a3c9..7eca3d8eef61382c0c04b26b796698508974ee75 100644 --- a/paddle/fluid/train/custom_trainer/feed/io/shell.h +++ b/paddle/fluid/train/custom_trainer/feed/io/shell.h @@ -1,17 +1,3 @@ -// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- #pragma once #include diff --git a/paddle/fluid/train/custom_trainer/feed/process/learner_process.cc b/paddle/fluid/train/custom_trainer/feed/process/learner_process.cc index 2b11f61b3fa15ceb1c1cba9727635cfa1004f0c4..e54c87233388976aa4bc37b96c341048714036cd 100644 --- a/paddle/fluid/train/custom_trainer/feed/process/learner_process.cc +++ b/paddle/fluid/train/custom_trainer/feed/process/learner_process.cc @@ -14,23 +14,11 @@ namespace feed { int LearnerProcess::initialize(std::shared_ptr context_ptr) { int ret = Process::initialize(context_ptr); auto& config = _context_ptr->trainer_config; - _train_thread_num = config["train_thread_num"].as(); - _threads_executor.resize(_train_thread_num); - if (config["executor"]) { - _executor_num = config["executor"].size(); - omp_set_num_threads(_train_thread_num); - #pragma omp parallel for - for (int i = 0; i < _train_thread_num; ++i) { - _threads_executor[i].resize(_executor_num); - for (int e = 0; e < _executor_num; ++e) { - auto e_class = config["executor"][e]["class"].as(); - auto* e_ptr = CREATE_INSTANCE(Executor, e_class); - _threads_executor[i][e].reset(e_ptr); - if (e_ptr->initialize(config["executor"][e], context_ptr) != 0) { - ret = -1; - } - } + _executors.resize(config["executor"].size()); + for (size_t i = 0; i < _executors.size(); ++i) { + _executors[i].reset(new MultiThreadExecutor()); + CHECK(_executors[i]->initialize(config["executor"][i], context_ptr) == 0); } } return 0; @@ -40,8 +28,7 @@ std::future LearnerProcess::save_model(uint64_t epoch_id, int table_id, Mod std::promise p; auto ret = p.get_future(); if (_context_ptr->epoch_accessor->need_save_model(epoch_id, way)) { - //TODO - //context_ptr->pslib_client()->save(); + LOG(NOTICE) << "save table, table_id:" << table_id; } else { p.set_value(0); } @@ -106,23 +93,22 @@ int LearnerProcess::run() { //Step2. 
运行训练网络 { - for (int i = 0; i < _executor_num; ++i) { - std::vector> train_threads(_train_thread_num); - for (int thread_id = 0; thread_id < _train_thread_num; ++thread_id) { - train_threads[i].reset(new std::thread([this](int exe_idx, int thread_idx) { - auto* executor = _threads_executor[thread_idx][exe_idx].get(); - run_executor(executor); - }, i, thread_id)); - } - for (int i = 0; i < _train_thread_num; ++i) { - train_threads[i]->join(); - } + std::map> backup_input_map; + for (auto& executor : _executors) { environment->barrier(EnvironmentRole::WORKER); - - if (_threads_executor[0][i]->is_dump_all_model()) { + auto data_name = executor->train_data_name(); + paddle::framework::Channel input_channel; + if (backup_input_map.count(data_name)) { + input_channel = backup_input_map[data_name]; + } else { + input_channel = dataset->fetch_data(data_name, epoch_id); + } + input_channel = executor->run(input_channel, dataset->data_parser(data_name)); + if (executor->is_dump_all_model()) { already_dump_inference_model = true; wait_save_model(epoch_id, ModelSaveWay::ModelSaveInferenceDelta); } + backup_input_map[data_name] = input_channel; environment->barrier(EnvironmentRole::WORKER); } } @@ -144,11 +130,6 @@ int LearnerProcess::run() { return 0; } -int LearnerProcess::run_executor(Executor* executor) { - //TODO - return 0; -} - } // namespace feed } // namespace custom_trainer } // namespace paddle diff --git a/paddle/fluid/train/custom_trainer/feed/process/learner_process.h b/paddle/fluid/train/custom_trainer/feed/process/learner_process.h index 7addb601e9cc2cee07194ae262fe622c00aab4bf..b7e50cc98b65ce0b50615e56944bdd4bf2e7a255 100644 --- a/paddle/fluid/train/custom_trainer/feed/process/learner_process.h +++ b/paddle/fluid/train/custom_trainer/feed/process/learner_process.h @@ -4,13 +4,11 @@ */ #pragma once #include "paddle/fluid/train/custom_trainer/feed/process/process.h" -#include "paddle/fluid/train/custom_trainer/feed/executor/executor.h" +#include 
"paddle/fluid/train/custom_trainer/feed/executor/multi_thread_executor.h" namespace paddle { namespace custom_trainer { namespace feed { - -typedef std::vector> MultiExecutor; class LearnerProcess : public Process { public: LearnerProcess() {} @@ -20,19 +18,13 @@ public: virtual int initialize(std::shared_ptr context_ptr); protected: -//同步保存所有模型 +// 同步保存所有模型 virtual int wait_save_model(uint64_t epoch_id, ModelSaveWay way); -//异步保存指定模型 +// 异步保存指定模型 virtual std::future save_model(uint64_t epoch_id, int table_id, ModelSaveWay way); -//执行指定训练网络 -virtual int run_executor(Executor* executor); - - private: - int _executor_num = 0; //需要执行训练的网络个数 - int _train_thread_num = 1;//并行训练线程数 - std::vector _threads_executor; + std::vector> _executors; }; } // namespace feed diff --git a/paddle/fluid/train/custom_trainer/feed/scripts/create_programs.py b/paddle/fluid/train/custom_trainer/feed/scripts/create_programs.py new file mode 100644 index 0000000000000000000000000000000000000000..6f3e16e4f04e9557fc499e2d4f6afd7d6652355a --- /dev/null +++ b/paddle/fluid/train/custom_trainer/feed/scripts/create_programs.py @@ -0,0 +1,98 @@ +#!/usr/bin/env python +#-*- coding:utf-8 -*- + +from __future__ import print_function, division +import os +import sys +import paddle +from paddle import fluid +import yaml + +def print_help(this_name): + """Print help + """ + dirname = os.path.dirname(this_name) + print("Usage: {} [model_dir]\n".format(this_name)) + print(" example: {} {}".format(this_name, os.path.join(dirname, 'example.py'))) + + + +def inference_warpper(filename): + """Build inference network(without loss and optimizer) + Args: + filename: path of file which defined real inference function + Returns: + list: inputs + and + list: outputs + """ + + with open(filename, 'r') as f: + code = f.read() + compiled = compile(code, filename, 'exec') + + scope = dict() + exec(compiled, scope) + return scope['inference']() + +def main(argv): + """Create programs + Args: + argv: arg list, length should 
be 2 + """ + if len(argv) < 2 or not os.path.exists(argv[1]): + print_help(argv[0]) + exit(1) + network_build_file = argv[1] + + if len(argv) > 2: + model_dir = argv[2] + else: + model_dir = './model' + + main_program = fluid.Program() + startup_program = fluid.Program() + with fluid.program_guard(main_program, startup_program): + inputs, outputs = inference_warpper(network_build_file) + + test_program = main_program.clone(for_test=True) + + labels = list() + losses = list() + for output in outputs: + label = fluid.layers.data(name='label_' + output.name, shape=output.shape, dtype='float32') + loss = fluid.layers.square_error_cost(input=output, label=label) + loss = fluid.layers.mean(loss, name='loss_' + output.name) + + labels.append(label) + losses.append(loss) + + loss_all = fluid.layers.sum(losses) + optimizer = fluid.optimizer.SGD(learning_rate=1.0) + params_grads = optimizer.backward(loss_all) + + if not os.path.exists(model_dir): + os.mkdir(model_dir) + + programs = { + 'startup_program': startup_program, + 'main_program': main_program, + 'test_program': test_program, + } + for save_path, program in programs.items(): + with open(os.path.join(model_dir, save_path), 'w') as f: + f.write(program.desc.serialize_to_string()) + + model_desc_path = os.path.join(model_dir, 'model.yaml') + model_desc = { + 'inputs': [{"name": var.name, "shape": var.shape} for var in inputs], + 'outputs': [{"name": var.name, "shape": var.shape, "label_name": label.name, "loss_name": loss.name} for var, label, loss in zip(outputs, labels, losses)], + 'loss_all': loss_all.name, + } + + with open(model_desc_path, 'w') as f: + yaml.safe_dump(model_desc, f, encoding='utf-8', allow_unicode=True) + + +if __name__ == "__main__": + main(sys.argv) diff --git a/paddle/fluid/train/custom_trainer/feed/scripts/example.py b/paddle/fluid/train/custom_trainer/feed/scripts/example.py new file mode 100644 index 0000000000000000000000000000000000000000..6773d036465fa2f6cac897f50bc35a21a90b88f8 --- 
/dev/null +++ b/paddle/fluid/train/custom_trainer/feed/scripts/example.py @@ -0,0 +1,33 @@ +#!/usr/bin/env python +#-*- coding:utf-8 -*- + +""" +This is an example of network building +""" + +from __future__ import print_function, division +import paddle +from paddle import fluid + +def inference(): + """Build inference network(without loss and optimizer) + + Returns: + list: inputs + and + list: outputs + """ + # TODO: build network here + cvm_input = fluid.layers.data(name='cvm_input', shape=[4488], dtype='float32') + + net = cvm_input + net = fluid.layers.fc(net, 512, act='relu') + net = fluid.layers.fc(net, 256, act='relu') + net = fluid.layers.fc(net, 256, act='relu') + net = fluid.layers.fc(net, 128, act='relu') + net = fluid.layers.fc(net, 128, act='relu') + net = fluid.layers.fc(net, 128, act='relu') + net = fluid.layers.fc(net, 128, act='relu') + + ctr_output = fluid.layers.fc(net, 1, act='sigmoid', name='ctr') + return [cvm_input], [ctr_output] diff --git a/paddle/fluid/train/custom_trainer/feed/unit_test/test_create_programs.cc b/paddle/fluid/train/custom_trainer/feed/unit_test/test_create_programs.cc new file mode 100644 index 0000000000000000000000000000000000000000..d04e2f5cda143c9bf95b7ed08e01cdd3760027d0 --- /dev/null +++ b/paddle/fluid/train/custom_trainer/feed/unit_test/test_create_programs.cc @@ -0,0 +1,131 @@ +#include +#include +#include + +#include "paddle/fluid/train/custom_trainer/feed/executor/executor.h" +#include "paddle/fluid/framework/tensor_util.h" +#include "paddle/fluid/framework/program_desc.h" +#include "paddle/fluid/train/custom_trainer/feed/io/file_system.h" +#include "paddle/fluid/train/custom_trainer/feed/io/shell.h" +#include "paddle/fluid/train/custom_trainer/feed/common/scope_helper.h" +#include "paddle/fluid/string/string_helper.h" + +namespace paddle { +namespace custom_trainer { +namespace feed { + +namespace { +const char feed_path[] = "paddle/fluid/train/custom_trainer/feed"; +const char test_data_dir[] = "test_data"; 
+const char main_program_path[] = "test_data/main_program"; +const char startup_program_path[] = "test_data/startup_program"; +const char model_desc_path[] = "test_data/model.yaml"; +} + +class CreateProgramsTest : public testing::Test +{ +public: + static void SetUpTestCase() + { + std::unique_ptr fs(CREATE_INSTANCE(FileSystem, "LocalFileSystem")); + if (fs->exists("./scripts/create_programs.py")) { + shell_execute(string::format_string("python ./scripts/create_programs.py ./scripts/example.py %s", test_data_dir)); + } else if (fs->exists(string::format_string("%s/scripts/create_programs.py", feed_path))) { + shell_execute(string::format_string("python %s/scripts/create_programs.py %s/scripts/example.py %s", feed_path, feed_path, test_data_dir)); + } + } + + static void TearDownTestCase() + { + std::unique_ptr fs(CREATE_INSTANCE(FileSystem, "LocalFileSystem")); + fs->remove(test_data_dir); + } + + virtual void SetUp() + { + context_ptr.reset(new TrainerContext()); + } + + virtual void TearDown() + { + context_ptr = nullptr; + } + + std::shared_ptr context_ptr; +}; + +TEST_F(CreateProgramsTest, example_network) { + std::unique_ptr executor(CREATE_INSTANCE(Executor, "SimpleExecutor")); + ASSERT_NE(nullptr, executor); + + auto config = YAML::Load(string::format_string("{thread_num: 2, startup_program: %s, main_program: %s}", startup_program_path, main_program_path)); + auto model_desc = YAML::LoadFile(model_desc_path); + ASSERT_EQ(0, executor->initialize(config, context_ptr)); + + std::string input_name = "cvm_input"; + ASSERT_TRUE(model_desc["inputs"]); + ASSERT_EQ(1, model_desc["inputs"].size()); + ASSERT_TRUE(model_desc["inputs"][0]["name"]); + ASSERT_TRUE(model_desc["inputs"][0]["shape"]); + ASSERT_EQ(input_name, model_desc["inputs"][0]["name"].as()); + std::vector input_shape = model_desc["inputs"][0]["shape"].as>(std::vector()); + ASSERT_EQ(2, input_shape.size()); + ASSERT_EQ(-1, input_shape[0]); + ASSERT_EQ(4488, input_shape[1]); + + 
ASSERT_TRUE(model_desc["loss_all"]); + auto loss_all_name = model_desc["loss_all"].as(); + + ASSERT_TRUE(model_desc["outputs"]); + ASSERT_EQ(1, model_desc["outputs"].size()); + ASSERT_TRUE(model_desc["outputs"][0]["name"]); + ASSERT_TRUE(model_desc["outputs"][0]["shape"]); + ASSERT_TRUE(model_desc["outputs"][0]["label_name"]); + ASSERT_TRUE(model_desc["outputs"][0]["loss_name"]); + + auto ctr_output_label_name = model_desc["outputs"][0]["label_name"].as(); + auto ctr_output_loss_name = model_desc["outputs"][0]["loss_name"].as(); + auto ctr_output_name = model_desc["outputs"][0]["name"].as(); + std::vector output_shape = model_desc["outputs"][0]["shape"].as>(std::vector()); + ASSERT_EQ(2, output_shape.size()); + ASSERT_EQ(-1, output_shape[0]); + ASSERT_EQ(1, output_shape[1]); + + paddle::framework::Scope scope; + executor->initialize_scope(&scope); + auto input_var = ScopeHelper::mutable_var<::paddle::framework::LoDTensor>(&scope, input_name); + auto label_var = ScopeHelper::mutable_var<::paddle::framework::LoDTensor>(&scope, ctr_output_label_name); + ASSERT_NE(nullptr, input_var); + ASSERT_NE(nullptr, label_var); + + input_var->Resize({1, input_shape[1]}); + auto input_data = input_var->mutable_data(context_ptr->cpu_place); + ASSERT_NE(nullptr, input_data); + for (int i = 0; i < input_shape[1]; ++i) { + input_data[i] = 0.1; + } + + label_var->Resize({1, 1}); + auto label_data = label_var->mutable_data(context_ptr->cpu_place); + ASSERT_NE(nullptr, label_data); + label_data[0] = 0.5; + + ASSERT_EQ(0, executor->run(&scope)); + + auto loss_var = ScopeHelper::var<::paddle::framework::LoDTensor>(&scope, ctr_output_loss_name); + auto loss = loss_var.data()[0]; + + auto loss_all_var = ScopeHelper::var<::paddle::framework::LoDTensor>(&scope, loss_all_name); + auto loss_all = loss_all_var.data()[0]; + + auto ctr_output_var = ScopeHelper::var<::paddle::framework::LoDTensor>(&scope, ctr_output_name); + auto ctr_output = ctr_output_var.data()[0]; + + std::cout << "loss: " << 
loss << std::endl; + std::cout << "ctr_output: " << ctr_output << std::endl; + ASSERT_NEAR(loss, loss_all, 1e-9); +} + +} // namespace feed +} // namespace custom_trainer +} // namespace paddle diff --git a/paddle/fluid/train/custom_trainer/feed/unit_test/test_datareader.cc b/paddle/fluid/train/custom_trainer/feed/unit_test/test_datareader.cc index 0e183d93f7afb7daf59b4c46dbc3fa6659bf2952..03b47a1e950bb8912b9a307fcbcde0872302a0e1 100644 --- a/paddle/fluid/train/custom_trainer/feed/unit_test/test_datareader.cc +++ b/paddle/fluid/train/custom_trainer/feed/unit_test/test_datareader.cc @@ -1,17 +1,3 @@ -/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. */ - #include #include #include diff --git a/paddle/fluid/train/custom_trainer/feed/unit_test/test_datareader_omp.cc b/paddle/fluid/train/custom_trainer/feed/unit_test/test_datareader_omp.cc index b181f7b73faa1a4ab0e03bd63b1ed9cb9584c438..bee271ac80736759b0bc72ac57f6113c05629474 100644 --- a/paddle/fluid/train/custom_trainer/feed/unit_test/test_datareader_omp.cc +++ b/paddle/fluid/train/custom_trainer/feed/unit_test/test_datareader_omp.cc @@ -1,17 +1,3 @@ -/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. 
-You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. */ - #include #include #include @@ -39,6 +25,9 @@ class DataReaderOmpTest : public testing::Test { public: static void SetUpTestCase() { std::unique_ptr fs(CREATE_INSTANCE(FileSystem, "LocalFileSystem")); + if (fs->exists(test_data_dir)) { + fs->remove(test_data_dir); + } fs->mkdir(test_data_dir); shell_set_verbose(true); std_items.clear(); @@ -92,6 +81,15 @@ public: return true; } + static void read_all(framework::Channel& channel, std::vector& items) { + channel->ReadAll(items); + // framework::ChannelReader reader(channel.get()); + // DataItem data_item; + // while (reader >> data_item) { + // items.push_back(std::move(data_item)); + // } + } + static bool is_same_with_std_items(const std::vector& items) { return is_same(items, std_items); } @@ -100,6 +98,14 @@ public: return is_same(items, sorted_std_items); } + static std::string to_string(const std::vector& items) { + std::string items_str = ""; + for (const auto& item : items) { + items_str.append(item.id); + } + return items_str; + } + static std::vector std_items; static std::vector sorted_std_items; std::shared_ptr context_ptr; @@ -130,22 +136,16 @@ TEST_F(DataReaderOmpTest, LineDataReaderSingleThread) { ASSERT_EQ(string::format_string("%s/%s.txt", test_data_dir, std_items[i].id.c_str()), data_file_list[i]); } - int same_count = 0; for (int i = 0; i < n_run; ++i) { auto channel = framework::MakeChannel(128); ASSERT_NE(nullptr, channel); ASSERT_EQ(0, data_reader->read_all(test_data_dir, channel)); std::vector items; - channel->ReadAll(items); + read_all(channel, items); - if (is_same_with_std_items(items)) { - 
++same_count; - } + ASSERT_TRUE(is_same_with_std_items(items)); } - - // n_run 次都相同 - ASSERT_EQ(n_run, same_count); } TEST_F(DataReaderOmpTest, LineDataReaderMuiltThread) { @@ -181,30 +181,32 @@ TEST_F(DataReaderOmpTest, LineDataReaderMuiltThread) { omp_set_num_threads(4); + channel->SetBlockSize(1); ASSERT_EQ(0, data_reader->read_all(test_data_dir, channel)); std::vector items; - channel->ReadAll(items); + read_all(channel, items); + + ASSERT_EQ(std_items_size, items.size()); if (is_same_with_std_items(items)) { ++same_count; } - + VLOG(5) << "before sort items: " << to_string(items); std::sort(items.begin(), items.end(), [] (const DataItem& a, const DataItem& b) { return a.id < b.id; }); - - if (is_same_with_sorted_std_items(items)) { - ++sort_same_count; + bool is_same_with_std = is_same_with_sorted_std_items(items); + if (!is_same_with_std) { + VLOG(5) << "after sort items: " << to_string(items); } - + // after sorting, the items must always be identical + ASSERT_TRUE(is_same_with_std); } // n_run次有不同的(证明是多线程) ASSERT_EQ(4, omp_get_max_threads()); ASSERT_GT(n_run, same_count); - // 但排序后都是相同的 - ASSERT_EQ(n_run, sort_same_count); } } // namespace feed diff --git a/paddle/fluid/train/custom_trainer/feed/unit_test/test_executor.cc b/paddle/fluid/train/custom_trainer/feed/unit_test/test_executor.cc index 385d9f95cb9c34b52e7ff568d9f29ac49fa60f56..50313f35983ead3126dc9ebd4daf871a09b9795a 100644 --- a/paddle/fluid/train/custom_trainer/feed/unit_test/test_executor.cc +++ b/paddle/fluid/train/custom_trainer/feed/unit_test/test_executor.cc @@ -1,23 +1,10 @@ -/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. 
-You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. */ - #include #include #include #include "paddle/fluid/train/custom_trainer/feed/trainer_context.h" #include "paddle/fluid/train/custom_trainer/feed/executor/executor.h" +#include "paddle/fluid/train/custom_trainer/feed/common/scope_helper.h" #include "paddle/fluid/framework/tensor_util.h" #include "paddle/fluid/framework/program_desc.h" #include "paddle/fluid/train/custom_trainer/feed/io/file_system.h" @@ -62,6 +49,7 @@ public: op->SetInput("X", {"x"}); op->SetOutput("Out", {"mean"}); op->CheckAttrs(); + load_block->Var("mean"); std::ofstream fout(main_program_path, std::ios::out | std::ios::binary); ASSERT_TRUE(fout); fout << main_program->Proto()->SerializeAsString(); @@ -105,14 +93,15 @@ TEST_F(SimpleExecutorTest, run) { auto config = YAML::Load(string::format_string("{thread_num: 2, startup_program: %s, main_program: %s}", startup_program_path, main_program_path)); ASSERT_EQ(0, executor->initialize(config, context_ptr)); - - auto x_var = executor->mutable_var<::paddle::framework::LoDTensor>("x"); - executor->mutable_var<::paddle::framework::LoDTensor>("mean"); + paddle::framework::Scope scope; + executor->initialize_scope(&scope); + auto x_var = ScopeHelper::mutable_var<::paddle::framework::LoDTensor>(&scope, std::string("x")); ASSERT_NE(nullptr, x_var); int x_len = 10; - x_var->Resize({1, x_len}); - auto x_data = x_var->mutable_data(context_ptr->cpu_place); + x_var->Resize({1, x_len}); + auto x_data = x_var->mutable_data(context_ptr->cpu_place); + ASSERT_NE(nullptr, x_data); std::cout << "x: "; for (int i = 0; i < x_len; ++i) { x_data[i] = i; @@ -120,9 +109,9 @@ 
TEST_F(SimpleExecutorTest, run) { } std::cout << std::endl; - ASSERT_EQ(0, executor->run()); + ASSERT_EQ(0, executor->run(&scope)); - auto mean_var = executor->var<::paddle::framework::LoDTensor>("mean"); + auto mean_var = ScopeHelper::var<::paddle::framework::LoDTensor>(&scope, std::string("mean")); auto mean = mean_var.data()[0]; std::cout << "mean: " << mean << std::endl; ASSERT_NEAR(4.5, mean, 1e-9); diff --git a/release.bcloud b/release.bcloud index 6b6e0e1fd812ccc26388db33221e709a1f538b00..4b37842abb2ce5fa88c7fa30ab19c80d0e783f8a 100755 --- a/release.bcloud +++ b/release.bcloud @@ -5,6 +5,10 @@ cp baidu_third-party_mklml/so/* so rm -rf baidu_third-party_mklml cp baidu_third-party_openmpi/so/* so +ln -s libmpi.so so/libmpi.so.0 +ln -s libmpi_cxx.so so/libmpi_cxx.so.0 +ln -s libopen-pal.so so/libopen-pal.so.0 +ln -s libopen-rte.so so/libopen-rte.so.0 rm -rf baidu_third-party_openmpi rm lib/libfake_paddle_proto.a