From c9b799896e6b78a4248cd8c9288ab6adacf628ad Mon Sep 17 00:00:00 2001 From: dongdaxiang Date: Thu, 13 Dec 2018 15:33:45 +0800 Subject: [PATCH] fix tag in async_executor --- paddle/fluid/framework/async_executor.cc | 238 ++++++++--------- paddle/fluid/framework/async_executor.h | 5 +- .../fluid/framework/executor_thread_worker.cc | 249 +++++++++--------- .../fluid/framework/executor_thread_worker.h | 178 ++++++------- 4 files changed, 336 insertions(+), 334 deletions(-) diff --git a/paddle/fluid/framework/async_executor.cc b/paddle/fluid/framework/async_executor.cc index fe6488f4b6f..0fe7f3bd5c0 100644 --- a/paddle/fluid/framework/async_executor.cc +++ b/paddle/fluid/framework/async_executor.cc @@ -102,139 +102,139 @@ void AsyncExecutor::GatherServers( } void AsyncExecutor::InitParamConfig() { - for (int i = 0; i < - _pslib_ptr->get_param()->server_param().\ - downpour_server_param().\ - downpour_table_param_size(); - ++i) { - if (_pslib_ptr->get_param()->server_param().\ - downpour_server_param().downpour_table_param(i).\ - table_class().find("SparseTable") != -1) { - _param_config.fea_dim = _pslib_ptr->get_param()->server_param().\ - downpour_server_param().\ - downpour_table_param(i).\ - accessor().fea_dim(); - break; - } + for (int i = 0; i < + _pslib_ptr->get_param()->server_param(). \ + downpour_server_param(). \ + downpour_table_param_size(); + ++i) { + if (_pslib_ptr->get_param()->server_param(). \ + downpour_server_param().downpour_table_param(i). \ + table_class().find("SparseTable") != -1) { + _param_config.fea_dim = _pslib_ptr->get_param()->server_param(). \ + downpour_server_param(). \ + downpour_table_param(i). \ + accessor().fea_dim(); + break; } - _param_config.slot_dim = _param_config.fea_dim - 2; - _param_config.tmp_push_dense_wait_times = static_cast( - _pslib_ptr->get_param()->trainer_param().push_dense_per_batch()); - _param_config.tmp_push_sparse_wait_times = static_cast( - _pslib_ptr->get_param()->trainer_param().push_sparse_per_batch()); - - for (auto t = 0u; - t < _pslib_ptr->get_param()->trainer_param().skip_op_size(); - ++t) { - _param_config.skip_op.push_back( - _pslib_ptr->get_param()->trainer_param().skip_op(t)); + } + _param_config.slot_dim = _param_config.fea_dim - 2; + _param_config.tmp_push_dense_wait_times = static_cast( + _pslib_ptr->get_param()->trainer_param().push_dense_per_batch()); + _param_config.tmp_push_sparse_wait_times = static_cast( + _pslib_ptr->get_param()->trainer_param().push_sparse_per_batch()); + + for (auto t = 0u; + t < _pslib_ptr->get_param()->trainer_param().skip_op_size(); + ++t) { + _param_config.skip_op.push_back( + _pslib_ptr->get_param()->trainer_param().skip_op(t)); + } + + for (auto t = 0u; + t < _pslib_ptr->get_param()->trainer_param().sparse_table_size(); + ++t) { + auto& table = _pslib_ptr->get_param()->trainer_param().sparse_table(t); + std::vector tmp_sparse_variable_name; + for (int i = 0u; i < table.slot_value_size(); ++i) { + tmp_sparse_variable_name.push_back(table.slot_value(i)); + _param_config.slot_alias_to_table[table.slot_key(i)] = + table.table_id(); } - - for (auto t = 0u; - t < _pslib_ptr->get_param()->trainer_param().sparse_table_size(); - ++t) { - auto& table = _pslib_ptr->get_param()->trainer_param().sparse_table(t); - std::vector tmp_sparse_variable_name; - for (int i = 0u; i < table.slot_value_size(); ++i) { - tmp_sparse_variable_name.push_back(table.slot_value(i)); - _param_config.slot_alias_to_table[table.slot_key(i)] = - table.table_id(); - } - std::vector tmp_sparse_gradient_variable_name; - for (auto i = 0u; i < table.slot_gradient_size(); ++i) { - tmp_sparse_gradient_variable_name.push_back( - table.slot_gradient(i)); - } - _param_config.slot_input_vec[table.table_id()] = - std::move(tmp_sparse_variable_name); - _param_config.gradient_var[table.table_id()] = - std::move(tmp_sparse_gradient_variable_name); - _param_config.sparse_table_id.push_back(table.table_id()); + std::vector tmp_sparse_gradient_variable_name; + for (auto i = 0u; i < table.slot_gradient_size(); ++i) { + tmp_sparse_gradient_variable_name.push_back( + table.slot_gradient(i)); } - - for (auto t = 0u; - t < _pslib_ptr->get_param()->trainer_param().dense_table_size(); - ++t) { - auto& table = _pslib_ptr->get_param()->trainer_param().dense_table(t); - std::vector tmp_dense_variable_name; - for (int i = 0u; i < table.dense_variable_name_size(); ++i) { - tmp_dense_variable_name.push_back(table.dense_variable_name(i)); - } - std::vector tmp_dense_gradient_variable_name; - for (auto i = 0u; i < table.dense_gradient_variable_name_size(); ++i) { - tmp_dense_gradient_variable_name.push_back( - table.dense_gradient_variable_name(i)); - } - _param_config.dense_variable_name[table.table_id()] = - std::move(tmp_dense_variable_name); - _param_config.dense_gradient_variable_name[table.table_id()] = - std::move(tmp_dense_gradient_variable_name); - _param_config.dense_table_id.push_back(table.table_id()); - _param_config.dense_table_size.push_back(table.fea_dim()); + _param_config.slot_input_vec[table.table_id()] = + std::move(tmp_sparse_variable_name); + _param_config.gradient_var[table.table_id()] = + std::move(tmp_sparse_gradient_variable_name); + _param_config.sparse_table_id.push_back(table.table_id()); + } + + for (auto t = 0u; + t < _pslib_ptr->get_param()->trainer_param().dense_table_size(); + ++t) { + auto& table = _pslib_ptr->get_param()->trainer_param().dense_table(t); + std::vector tmp_dense_variable_name; + for (int i = 0u; i < table.dense_variable_name_size(); ++i) { + tmp_dense_variable_name.push_back(table.dense_variable_name(i)); + } + std::vector tmp_dense_gradient_variable_name; + for (auto i = 0u; i < table.dense_gradient_variable_name_size(); ++i) { + tmp_dense_gradient_variable_name.push_back( + table.dense_gradient_variable_name(i)); } + _param_config.dense_variable_name[table.table_id()] = + std::move(tmp_dense_variable_name); + _param_config.dense_gradient_variable_name[table.table_id()] = + std::move(tmp_dense_gradient_variable_name); + _param_config.dense_table_id.push_back(table.table_id()); + _param_config.dense_table_size.push_back(table.fea_dim()); + } } void AsyncExecutor::InitModel() { - for (auto table_id : _param_config.dense_table_id) { - std::vector regions; - for (auto& t : _param_config.dense_variable_name[table_id]) { - Variable* var = root_scope_->FindVar(t); - CHECK(var != nullptr) << "var[" << t << "] not found"; - LoDTensor* tensor = var->GetMutable(); - - float* g = tensor->data(); - CHECK(g != nullptr) << "var[" << t << "] value not initialized"; - - float init_range = 0.2; - int rown = tensor->dims()[0]; - init_range /= sqrt(rown); - - std::normal_distribution ndistr(0.0, 1.0); - for (auto i = 0u; i < tensor->numel(); ++i) { - g[i] = ndistr(local_random_engine()) * init_range; - } - - paddle::ps::Region reg(g, tensor->numel()); - regions.emplace_back(std::move(reg)); - } - - auto push_status = - _pslib_ptr->_worker_ptr->push_dense_param( - regions.data(), regions.size(), table_id); - push_status.wait(); - auto status = push_status.get(); - if (status != 0) { - LOG(FATAL) << "push dense param failed, status[" << status << "]"; - exit(-1); - } + for (auto table_id : _param_config.dense_table_id) { + std::vector regions; + for (auto& t : _param_config.dense_variable_name[table_id]) { + Variable* var = root_scope_->FindVar(t); + CHECK(var != nullptr) << "var[" << t << "] not found"; + LoDTensor* tensor = var->GetMutable(); + + float* g = tensor->data(); + CHECK(g != nullptr) << "var[" << t << "] value not initialized"; + + float init_range = 0.2; + int rown = tensor->dims()[0]; + init_range /= sqrt(rown); + + std::normal_distribution ndistr(0.0, 1.0); + for (auto i = 0u; i < tensor->numel(); ++i) { + g[i] = ndistr(local_random_engine()) * init_range; + } + + paddle::ps::Region reg(g, tensor->numel()); + regions.emplace_back(std::move(reg)); } + + auto push_status = + _pslib_ptr->_worker_ptr->push_dense_param( + regions.data(), regions.size(), table_id); + push_status.wait(); + auto status = push_status.get(); + if (status != 0) { + LOG(FATAL) << "push dense param failed, status[" << status << "]"; + exit(-1); + } + } } void AsyncExecutor::SaveModel(const std::string& path) { - auto ret = _pslib_ptr->_worker_ptr->flush(); - ret.wait(); - ret = _pslib_ptr->_worker_ptr->save(path, 0); - ret.wait(); - int32_t feasign_cnt = ret.get(); - if (feasign_cnt == -1) { // (colourful-tree) TODO should be feasign_cnt < 0 - LOG(FATAL) << "save model failed"; - exit(-1); - } + auto ret = _pslib_ptr->_worker_ptr->flush(); + ret.wait(); + ret = _pslib_ptr->_worker_ptr->save(path, 0); + ret.wait(); + int32_t feasign_cnt = ret.get(); + if (feasign_cnt == -1) { // (colourful-tree) TODO should be feasign_cnt < 0 + LOG(FATAL) << "save model failed"; + exit(-1); + } } void AsyncExecutor::PrepareDenseThread(const std::string& mode) { - if (mode == "mpi") { - DensePullThreadParam param; - param.ps_client = _pslib_ptr->_worker_ptr;; - param.threshold = 1; - param.training_thread_num = actual_thread_num; - param.root_scope = root_scope_; - param.dense_params = &_param_config.dense_variable_name; - - _pull_dense_thread = std::shared_ptr( - new DensePullThread(param)); - _pull_dense_thread->start(); - } + if (mode == "mpi") { + DensePullThreadParam param; + param.ps_client = _pslib_ptr->_worker_ptr;; + param.threshold = 1; + param.training_thread_num = actual_thread_num; + param.root_scope = root_scope_; + param.dense_params = &_param_config.dense_variable_name; + + _pull_dense_thread = std::shared_ptr( + new DensePullThread(param)); + _pull_dense_thread->start(); + } } #endif diff --git a/paddle/fluid/framework/async_executor.h b/paddle/fluid/framework/async_executor.h index d6f16d91338..12642126411 100644 --- a/paddle/fluid/framework/async_executor.h +++ b/paddle/fluid/framework/async_executor.h @@ -45,7 +45,8 @@ inline std::default_random_engine& local_random_engine() { engine_wrapper_t() { static std::atomic x(0); std::seed_seq sseq = {x++, x++, x++, - static_cast(current_realtime() * 1000)}; + static_cast( + current_realtime() * 1000)}; engine.seed(sseq); } }; @@ -77,6 +78,7 @@ class AsyncExecutor { void SaveModel(const std::string& path); void InitParamConfig(); #endif + private: void CreateThreads(ExecutorThreadWorker* worker, const ProgramDesc& main_program, @@ -87,6 +89,7 @@ class AsyncExecutor { #ifdef PADDLE_WITH_PSLIB void PrepareDenseThread(const std::string& mode); #endif + public: #ifdef PADDLE_WITH_PSLIB std::shared_ptr _pslib_ptr; diff --git a/paddle/fluid/framework/executor_thread_worker.cc b/paddle/fluid/framework/executor_thread_worker.cc index a58c2692204..59679842bc1 100644 --- a/paddle/fluid/framework/executor_thread_worker.cc +++ b/paddle/fluid/framework/executor_thread_worker.cc @@ -33,87 +33,87 @@ namespace framework { #ifdef PADDLE_WITH_PSLIB int DensePullThread::start() { - _running = true; - _t = std::thread(&DensePullThread::run, this); - return 0; + _running = true; + _t = std::thread(&DensePullThread::run, this); + return 0; } void DensePullThread::run() { - while (_running) { - _pull_dense_status.resize(0); - for (auto& t : _dense_variable_name) { - if (check_update_param(t.first)) { - auto status = pull_dense(t.first); - _pull_dense_status.emplace_back(std::move(status)); - reset_thread_version(t.first); - } - } - if (_pull_dense_status.size() != 0) { - wait_all(); - } - - usleep(_sleep_time_ms * 1000); + while (_running) { + _pull_dense_status.resize(0); + for (auto& t : _dense_variable_name) { + if (check_update_param(t.first)) { + auto status = pull_dense(t.first); + _pull_dense_status.emplace_back(std::move(status)); + reset_thread_version(t.first); + } + } + if (_pull_dense_status.size() != 0) { + wait_all(); } + + usleep(_sleep_time_ms * 1000); + } } bool DensePullThread::check_update_param(uint64_t table_id) { - { - std::lock_guard lock(_mutex_for_version); - auto& version = _training_versions[table_id]; - _current_version[table_id] = - *(std::min_element(version.begin(), version.end())); - } - if (_current_version[table_id] - _last_versions[table_id] < _threshold) { - return false; - } - return true; + { + std::lock_guard lock(_mutex_for_version); + auto& version = _training_versions[table_id]; + _current_version[table_id] = + *(std::min_element(version.begin(), version.end())); + } + if (_current_version[table_id] - _last_versions[table_id] < _threshold) { + return false; + } + return true; } void DensePullThread::reset_thread_version(uint64_t table_id) { - std::lock_guard lock(_mutex_for_version); - _last_versions[table_id] = _current_version[table_id]; + std::lock_guard lock(_mutex_for_version); + _last_versions[table_id] = _current_version[table_id]; } std::future DensePullThread::pull_dense(uint64_t table_id) { - auto& regions = _regions[table_id]; - regions.clear(); - auto& variables = _dense_variable_name[table_id]; - regions.resize(variables.size()); - - for (auto i = 0u; i < variables.size(); ++i) { - auto& t = variables[i]; - Variable* var = _root_scope->FindVar(t); - LoDTensor* tensor = var->GetMutable(); - - float* w = tensor->data(); - paddle::ps::Region reg(w, tensor->numel()); - regions[i] = std::move(reg); - } - return _ps_client->pull_dense(regions.data(), regions.size(), table_id); + auto& regions = _regions[table_id]; + regions.clear(); + auto& variables = _dense_variable_name[table_id]; + regions.resize(variables.size()); + + for (auto i = 0u; i < variables.size(); ++i) { + auto& t = variables[i]; + Variable* var = _root_scope->FindVar(t); + LoDTensor* tensor = var->GetMutable(); + + float* w = tensor->data(); + paddle::ps::Region reg(w, tensor->numel()); + regions[i] = std::move(reg); + } + return _ps_client->pull_dense(regions.data(), regions.size(), table_id); } void DensePullThread::wait_all() { - for (auto& t : _pull_dense_status) { - t.wait(); - auto status = t.get(); - if (status != 0) { - LOG(WARNING) << "pull dense failed times:" << - ++_pull_dense_fail_times; - } + for (auto& t : _pull_dense_status) { + t.wait(); + auto status = t.get(); + if (status != 0) { + LOG(WARNING) << "pull dense failed times:" << + ++_pull_dense_fail_times; } - - if (_pull_dense_fail_times > 20) { - LOG(FATAL) << "pull dense failed times more than 20 times"; - exit(-1); - } - - _pull_dense_status.resize(0); + } + + if (_pull_dense_fail_times > 20) { + LOG(FATAL) << "pull dense failed times more than 20 times"; + exit(-1); + } + + _pull_dense_status.resize(0); } void DensePullThread::increase_thread_version( int thread_id, uint64_t table_id) { - std::lock_guard lock(_mutex_for_version); - _training_versions[table_id][thread_id]++; + std::lock_guard lock(_mutex_for_version); + _training_versions[table_id][thread_id]++; } -#endif +#endif void ExecutorThreadWorker::CreateThreadOperators(const ProgramDesc& program) { auto& block = program.Block(0); @@ -336,56 +336,56 @@ void AsyncExecutorThreadWorker::TrainFiles() { void AsyncExecutorThreadWorker::SetPSlibPtr( std::shared_ptr pslib_ptr) { - _pslib_ptr = pslib_ptr; + _pslib_ptr = pslib_ptr; } void AsyncExecutorThreadWorker::SetPullDenseThread( std::shared_ptr dpt) { - _pull_dense_thread = dpt; + _pull_dense_thread = dpt; } void AsyncExecutorThreadWorker::TrainOneNetwork() { - PrepareParams(); - - for (auto& op : ops_) { - if (op->Type().find("sgd") != std::string::npos) { - continue; - } - bool need_skip = false; - for (auto t = 0u; t < _param_config->skip_op.size(); ++t) { - if (op->Type().find(_param_config->skip_op[t]) != - std::string::npos) { - need_skip = true; - break; - } - } - if (!need_skip) { - op->Run(*thread_scope_, place_); - } + PrepareParams(); + + for (auto& op : ops_) { + if (op->Type().find("sgd") != std::string::npos) { + continue; + } + bool need_skip = false; + for (auto t = 0u; t < _param_config->skip_op.size(); ++t) { + if (op->Type().find(_param_config->skip_op[t]) != + std::string::npos) { + need_skip = true; + break; + } + } + if (!need_skip) { + op->Run(*thread_scope_, place_); } - UpdateParams(); + } + UpdateParams(); } void AsyncExecutorThreadWorker::SetParamConfig( AsyncWorkerParamConfig* param_config) { - _param_config = param_config; + _param_config = param_config; } void AsyncExecutorThreadWorker::PrepareParams() { - for (auto table_id : _param_config->sparse_table_id) { - PullSparse(table_id); - for (auto& t : _pull_sparse_status) { - t.wait(); - auto status = t.get(); - if (status != 0) { - LOG(ERROR) << "pull sparse failed, status[" << status << "]"; - exit(-1); - } - } + for (auto table_id : _param_config->sparse_table_id) { + PullSparse(table_id); + for (auto& t : _pull_sparse_status) { + t.wait(); + auto status = t.get(); + if (status != 0) { + LOG(ERROR) << "pull sparse failed, status[" << status << "]"; + exit(-1); + } } - _pull_sparse_status.resize(0); + } + _pull_sparse_status.resize(0); - for (auto table_id : _param_config->sparse_table_id) { - FillSparse(table_id); - } + for (auto table_id : _param_config->sparse_table_id) { + FillSparse(table_id); + } } void AsyncExecutorThreadWorker::UpdateParams() { @@ -426,21 +426,20 @@ void AsyncExecutorThreadWorker::UpdateParams() { } void AsyncExecutorThreadWorker::PushDense(int table_id) { - std::vector regions; - for (auto& t : _param_config->dense_gradient_variable_name[table_id]) { - Variable* var = thread_scope_->FindVar(t); - CHECK(var != nullptr) << "var[" << t << "] not found"; - LoDTensor* tensor = var->GetMutable(); - int count = tensor->numel(); - float* g = tensor->data(); - paddle::ps::Region reg(g, count); - regions.emplace_back(std::move(reg)); - } - - auto status = _pslib_ptr->_worker_ptr->push_dense( - regions.data(), regions.size(), table_id); - _push_dense_status.push_back(std::move(status)); - + std::vector regions; + for (auto& t : _param_config->dense_gradient_variable_name[table_id]) { + Variable* var = thread_scope_->FindVar(t); + CHECK(var != nullptr) << "var[" << t << "] not found"; + LoDTensor* tensor = var->GetMutable(); + int count = tensor->numel(); + float* g = tensor->data(); + paddle::ps::Region reg(g, count); + regions.emplace_back(std::move(reg)); + } + + auto status = _pslib_ptr->_worker_ptr->push_dense( + regions.data(), regions.size(), table_id); + _push_dense_status.push_back(std::move(status)); } void AsyncExecutorThreadWorker::PullSparse(int table_id) { @@ -643,24 +642,24 @@ void AsyncExecutorThreadWorker::check_pull_push_memory( const std::vector& features, std::vector>& push_g, int dim) { - push_g.resize(features.size() + 1); - for (auto& t : push_g) { - t.resize(dim); - } + push_g.resize(features.size() + 1); + for (auto& t : push_g) { + t.resize(dim); + } } void AsyncExecutorThreadWorker::check_pull_push_memory( - const std::vector& features, - std::vector& push_g, - int dim) { - if (features.size() > push_g.size()) { - push_g.reserve(features.size() + 1); - auto size = features.size() - push_g.size() + 1; - for (auto i = 0u; i < size; ++i) { - float* ptr = new float[dim]; - push_g.push_back(ptr); - } + const std::vector& features, + std::vector& push_g, + int dim) { + if (features.size() > push_g.size()) { + push_g.reserve(features.size() + 1); + auto size = features.size() - push_g.size() + 1; + for (auto i = 0u; i < size; ++i) { + float* ptr = new float[dim]; + push_g.push_back(ptr); } + } } #endif diff --git a/paddle/fluid/framework/executor_thread_worker.h b/paddle/fluid/framework/executor_thread_worker.h index c23eb09470d..20410b4c069 100644 --- a/paddle/fluid/framework/executor_thread_worker.h +++ b/paddle/fluid/framework/executor_thread_worker.h @@ -67,79 +67,79 @@ struct DensePullThreadParam { class DensePullThread { public: explicit DensePullThread(const DensePullThreadParam& param) : - _running(false) { - _ps_client = param.ps_client; - _threshold = param.threshold; - _thread_num = param.training_thread_num; - _root_scope = param.root_scope; - _sleep_time_ms = param.sleep_time_ms; - - for (auto& t : *param.dense_params) { - _dense_variable_name[t.first].insert( - _dense_variable_name[t.first].end(), - t.second.begin(), t.second.end()); - _training_versions[t.first].resize(_thread_num, 0); - _last_versions[t.first] = 0; - _current_version[t.first] = 0; - } + _running(false) { + _ps_client = param.ps_client; + _threshold = param.threshold; + _thread_num = param.training_thread_num; + _root_scope = param.root_scope; + _sleep_time_ms = param.sleep_time_ms; + + for (auto& t : *param.dense_params) { + _dense_variable_name[t.first].insert( + _dense_variable_name[t.first].end(), + t.second.begin(), t.second.end()); + _training_versions[t.first].resize(_thread_num, 0); + _last_versions[t.first] = 0; + _current_version[t.first] = 0; } - - int start(); - - void stop() { - if (_running) { - _running = false; - _t.join(); - } + } + + int start(); + + void stop() { + if (_running) { + _running = false; + _t.join(); } - - void increase_thread_version(int thread_id, uint64_t table_id); - void reset_thread_version(uint64_t table_id); - std::future pull_dense(uint64_t table_id); - void pull_dense2(uint64_t table_id); - void wait_all(); - + } + + void increase_thread_version(int thread_id, uint64_t table_id); + void reset_thread_version(uint64_t table_id); + std::future pull_dense(uint64_t table_id); + void pull_dense2(uint64_t table_id); + void wait_all(); + private: - void run(); - bool check_update_param(uint64_t table_id); - + void run(); + bool check_update_param(uint64_t table_id); + private: - std::shared_ptr _ps_client; - int _thread_num; - int _threshold; - int _sleep_time_ms; - Scope* _root_scope; - bool _running; - - std::map _last_versions; - std::map _current_version; - std::mutex _mutex_for_version; - std::map> _training_versions; - std::map> _dense_variable_name; - - std::thread _t; - - std::vector<::std::future> _pull_dense_status; - - std::map> _regions; - uint32_t _pull_dense_fail_times = 0; - - std::vector _base_norm_param; - std::vector _mean; - std::vector _scale; - float _squared_sum_epsilon = 1e-4; - std::mutex _mutex_for_mean_scale; - - float _total_batch_num = 0; + std::shared_ptr _ps_client; + int _thread_num; + int _threshold; + int _sleep_time_ms; + Scope* _root_scope; + bool _running; + + std::map _last_versions; + std::map _current_version; + std::mutex _mutex_for_version; + std::map> _training_versions; + std::map> _dense_variable_name; + + std::thread _t; + + std::vector<::std::future> _pull_dense_status; + + std::map> _regions; + uint32_t _pull_dense_fail_times = 0; + + std::vector _base_norm_param; + std::vector _mean; + std::vector _scale; + float _squared_sum_epsilon = 1e-4; + std::mutex _mutex_for_mean_scale; + + float _total_batch_num = 0; }; #endif class ExecutorThreadWorker { public: - ExecutorThreadWorker() - : thread_id_(-1), root_scope_(NULL), thread_scope_(NULL), debug_(false) {} +ExecutorThreadWorker() + : thread_id_(-1), root_scope_(NULL), thread_scope_(NULL), debug_(false) {} virtual ~ExecutorThreadWorker() {} - + void CreateThreadResource(const framework::ProgramDesc& program, const paddle::platform::Place& place); void SetThreadId(int tid); @@ -160,7 +160,7 @@ class ExecutorThreadWorker { void SetFetchVarNames(const std::vector& fetch_var_names); #ifdef PADDLE_WITH_PSLIB virtual void SetPSlibPtr( - std::shared_ptr pslib_ptr) {}; + std::shared_ptr pslib_ptr) {} virtual void SetPullDenseThread( std::shared_ptr dpt) {} virtual void SetParamConfig( @@ -218,32 +218,32 @@ class AsyncExecutorThreadWorker: public ExecutorThreadWorker { void check_pull_push_memory(const std::vector& features, std::vector>& push_g, int dim); - void collect_feasign_info(int table_id); - + void collect_feasign_info(int table_id); + private: - struct FeasignInfo { - uint32_t slot; - uint32_t ins; - int64_t label; - }; - - std::map> _features; - std::map> _fea_info; - std::map>> _feature_value; - std::map>> _feature_push_value; - - - std::shared_ptr _pslib_ptr; - - std::shared_ptr _pull_dense_thread; - - std::vector<::std::future> _pull_sparse_status; - std::vector<::std::future> _pull_dense_status; - std::vector<::std::future> _push_sparse_status; - std::vector<::std::future> _push_dense_status; - - AsyncWorkerParamConfig* _param_config; - + struct FeasignInfo { + uint32_t slot; + uint32_t ins; + int64_t label; + }; + + std::map> _features; + std::map> _fea_info; + std::map>> _feature_value; + std::map>> _feature_push_value; + + + std::shared_ptr _pslib_ptr; + + std::shared_ptr _pull_dense_thread; + + std::vector<::std::future> _pull_sparse_status; + std::vector<::std::future> _pull_dense_status; + std::vector<::std::future> _push_sparse_status; + std::vector<::std::future> _push_dense_status; + + AsyncWorkerParamConfig* _param_config; + }; #endif -- GitLab