diff --git a/paddle/fluid/framework/CMakeLists.txt b/paddle/fluid/framework/CMakeLists.txt
index f3d66cd88301dd8424ea40e12b3290b2073e7ed8..ab237f768a9394502e7c2bd9a45517b2fd4bf4ec 100644
--- a/paddle/fluid/framework/CMakeLists.txt
+++ b/paddle/fluid/framework/CMakeLists.txt
@@ -1,6 +1,6 @@
-# windows treat symbolic file as a real file, which is different with unix
-# We create a hidden file and compile it instead of origin source file.
+#windows treat symbolic file as a real file, which is different with unix
+#We create a hidden file and compile it instead of origin source file.
 function(windows_symbolic TARGET)
   set(oneValueArgs "")
   set(multiValueArgs SRCS DEPS)
@@ -11,7 +11,7 @@ function(windows_symbolic TARGET)
       message(FATAL " ${src}.cc and ${src}.cu must exsits, and ${src}.cu must be symbolic file.")
     endif()

-    # only copy the xx.cu to .xx.cu when the content are modified
+#only copy the xx.cu to.xx.cu when the content are modified
     set(copy_flag 1)
     if (EXISTS ${CMAKE_CURRENT_SOURCE_DIR}/.${src}.cu)
       file(READ ${CMAKE_CURRENT_SOURCE_DIR}/${src}.cc SOURCE_STR)
@@ -32,7 +32,7 @@ endfunction()

 add_subdirectory(ir)
 add_subdirectory(details)
-# ddim lib
+#ddim lib
 proto_library(framework_proto SRCS framework.proto)
 proto_library(async_executor_proto SRCS data_feed.proto)

@@ -89,8 +89,8 @@ nv_test(data_device_transform_test SRCS data_device_transform_test.cu

 if(WITH_GPU)
   if (WIN32)
-    # windows treat symbolic file as a real file, which is different with unix
-    # We create a hidden file and compile it instead of origin source file.
+#windows treat symbolic file as a real file, which is different with unix
+#We create a hidden file and compile it instead of origin source file.
     windows_symbolic(hidden_file SRCS data_type_transform.cu)
     nv_library(data_type_transform SRCS .data_type_transform.cu DEPS tensor)
     add_dependencies(data_type_transform hidden_file)
@@ -137,7 +137,8 @@ cc_library(op_registry SRCS op_registry.cc DEPS op_proto_maker op_info operator
 nv_test(op_registry_test SRCS op_registry_test.cc DEPS op_registry)

 py_proto_compile(framework_py_proto SRCS framework.proto data_feed.proto)
-# Generate an empty __init__.py to make framework_py_proto as a valid python module.
+#Generate an empty \
+  __init__.py to make framework_py_proto as a valid python module.
 add_custom_target(framework_py_proto_init ALL COMMAND ${CMAKE_COMMAND} -E touch __init__.py)
 add_dependencies(framework_py_proto framework_py_proto_init)
 if (NOT WIN32)
diff --git a/paddle/fluid/framework/async_executor.cc b/paddle/fluid/framework/async_executor.cc
index 0fe7f3bd5c0c5aef58b5e47da8ce1915ab096417..e2756cafa20c3317f13a2fa9c3e0919b58167d7d 100644
--- a/paddle/fluid/framework/async_executor.cc
+++ b/paddle/fluid/framework/async_executor.cc
@@ -30,7 +30,7 @@ limitations under the License. */
*/ #include "paddle/fluid/platform/place.h" #include "paddle/fluid/pybind/pybind.h" #ifdef PADDLE_WITH_PSLIB -#include "pslib.h" +#include #endif namespace paddle { @@ -70,50 +70,52 @@ void PrepareReaders(std::vector>& readers, // NOLINT #ifdef PADDLE_WITH_PSLIB void AsyncExecutor::InitServer(const std::string& dist_desc, int index) { - _pslib_ptr = - std::shared_ptr( - new paddle::distributed::PSlib()); - _pslib_ptr->init_server(dist_desc, index); - InitParamConfig(); + _pslib_ptr = std::shared_ptr( + new paddle::distributed::PSlib()); + _pslib_ptr->init_server(dist_desc, index); + InitParamConfig(); } void AsyncExecutor::InitWorker(const std::string& dist_desc, const std::vector& host_sign_list, int node_num, int index) { - _pslib_ptr = std::shared_ptr( - new paddle::distributed::PSlib()); - _pslib_ptr->init_worker( - dist_desc, (uint64_t*)(host_sign_list.data()), node_num, index); + _pslib_ptr = std::shared_ptr( + new paddle::distributed::PSlib()); + _pslib_ptr->init_worker(dist_desc, + static_cast(host_sign_list.data()), + node_num, index); - InitParamConfig(); + InitParamConfig(); } -uint64_t AsyncExecutor::StartServer() { - return _pslib_ptr->run_server(); -} +uint64_t AsyncExecutor::StartServer() { return _pslib_ptr->run_server(); } -void AsyncExecutor::StopServer() { - _pslib_ptr->stop_server(); -} +void AsyncExecutor::StopServer() { _pslib_ptr->stop_server(); } -void AsyncExecutor::GatherServers( - const std::vector& host_sign_list, int node_num) { - _pslib_ptr->gather_servers((uint64_t*)(host_sign_list.data()), node_num); +void AsyncExecutor::GatherServers(const std::vector& host_sign_list, + int node_num) { + _pslib_ptr->gather_servers(static_cast(host_sign_list.data()), + node_num); } void AsyncExecutor::InitParamConfig() { - for (int i = 0; i < - _pslib_ptr->get_param()->server_param(). \ - downpour_server_param(). \ - downpour_table_param_size(); + for (int i = 0; i < _pslib_ptr->get_param() + ->server_param() + .downpour_server_param() + .downpour_table_param_size(); ++i) { - if (_pslib_ptr->get_param()->server_param(). \ - downpour_server_param().downpour_table_param(i). \ - table_class().find("SparseTable") != -1) { - _param_config.fea_dim = _pslib_ptr->get_param()->server_param(). \ - downpour_server_param(). \ - downpour_table_param(i). 
\ - accessor().fea_dim(); + if (_pslib_ptr->get_param() + ->server_param() + .downpour_server_param() + .downpour_table_param(i) + .table_class() + .find("SparseTable") != -1) { + _param_config.fea_dim = _pslib_ptr->get_param() + ->server_param() + .downpour_server_param() + .downpour_table_param(i) + .accessor() + .fea_dim(); break; } } @@ -122,28 +124,24 @@ void AsyncExecutor::InitParamConfig() { _pslib_ptr->get_param()->trainer_param().push_dense_per_batch()); _param_config.tmp_push_sparse_wait_times = static_cast( _pslib_ptr->get_param()->trainer_param().push_sparse_per_batch()); - - for (auto t = 0u; - t < _pslib_ptr->get_param()->trainer_param().skip_op_size(); + + for (auto t = 0u; t < _pslib_ptr->get_param()->trainer_param().skip_op_size(); ++t) { _param_config.skip_op.push_back( _pslib_ptr->get_param()->trainer_param().skip_op(t)); } - + for (auto t = 0u; - t < _pslib_ptr->get_param()->trainer_param().sparse_table_size(); - ++t) { + t < _pslib_ptr->get_param()->trainer_param().sparse_table_size(); ++t) { auto& table = _pslib_ptr->get_param()->trainer_param().sparse_table(t); std::vector tmp_sparse_variable_name; for (int i = 0u; i < table.slot_value_size(); ++i) { tmp_sparse_variable_name.push_back(table.slot_value(i)); - _param_config.slot_alias_to_table[table.slot_key(i)] = - table.table_id(); + _param_config.slot_alias_to_table[table.slot_key(i)] = table.table_id(); } std::vector tmp_sparse_gradient_variable_name; for (auto i = 0u; i < table.slot_gradient_size(); ++i) { - tmp_sparse_gradient_variable_name.push_back( - table.slot_gradient(i)); + tmp_sparse_gradient_variable_name.push_back(table.slot_gradient(i)); } _param_config.slot_input_vec[table.table_id()] = std::move(tmp_sparse_variable_name); @@ -151,10 +149,9 @@ void AsyncExecutor::InitParamConfig() { std::move(tmp_sparse_gradient_variable_name); _param_config.sparse_table_id.push_back(table.table_id()); } - + for (auto t = 0u; - t < _pslib_ptr->get_param()->trainer_param().dense_table_size(); - ++t) { + t < _pslib_ptr->get_param()->trainer_param().dense_table_size(); ++t) { auto& table = _pslib_ptr->get_param()->trainer_param().dense_table(t); std::vector tmp_dense_variable_name; for (int i = 0u; i < table.dense_variable_name_size(); ++i) { @@ -181,26 +178,25 @@ void AsyncExecutor::InitModel() { Variable* var = root_scope_->FindVar(t); CHECK(var != nullptr) << "var[" << t << "] not found"; LoDTensor* tensor = var->GetMutable(); - + float* g = tensor->data(); CHECK(g != nullptr) << "var[" << t << "] value not initialized"; float init_range = 0.2; int rown = tensor->dims()[0]; init_range /= sqrt(rown); - + std::normal_distribution ndistr(0.0, 1.0); for (auto i = 0u; i < tensor->numel(); ++i) { g[i] = ndistr(local_random_engine()) * init_range; } - + paddle::ps::Region reg(g, tensor->numel()); regions.emplace_back(std::move(reg)); } - - auto push_status = - _pslib_ptr->_worker_ptr->push_dense_param( - regions.data(), regions.size(), table_id); + + auto push_status = _pslib_ptr->_worker_ptr->push_dense_param( + regions.data(), regions.size(), table_id); push_status.wait(); auto status = push_status.get(); if (status != 0) { @@ -225,14 +221,14 @@ void AsyncExecutor::SaveModel(const std::string& path) { void AsyncExecutor::PrepareDenseThread(const std::string& mode) { if (mode == "mpi") { DensePullThreadParam param; - param.ps_client = _pslib_ptr->_worker_ptr;; + param.ps_client = _pslib_ptr->_worker_ptr; param.threshold = 1; param.training_thread_num = actual_thread_num; param.root_scope = root_scope_; param.dense_params = 
&_param_config.dense_variable_name; - - _pull_dense_thread = std::shared_ptr( - new DensePullThread(param)); + + _pull_dense_thread = + std::shared_ptr(new DensePullThread(param)); _pull_dense_thread->start(); } } @@ -243,8 +239,7 @@ void AsyncExecutor::RunFromFile(const ProgramDesc& main_program, const std::vector& filelist, const int thread_num, const std::vector& fetch_var_names, - const std::string& mode, - const bool debug) { + const std::string& mode, const bool debug) { std::vector threads; auto& block = main_program.Block(0); @@ -293,9 +288,9 @@ void AsyncExecutor::RunFromFile(const ProgramDesc& main_program, for (auto& worker : workers) { #ifdef PADDLE_WITH_PSLIB if (mode == "mpi") { - worker.reset(new AsyncExecutorThreadWorker); + worker.reset(new AsyncExecutorThreadWorker); } else { - worker.reset(new ExecutorThreadWorker); + worker.reset(new ExecutorThreadWorker); } #else worker.reset(new ExecutorThreadWorker); @@ -308,7 +303,6 @@ void AsyncExecutor::RunFromFile(const ProgramDesc& main_program, fetch_var_names, root_scope_, thidx, debug); } - // start executing ops in multiple threads for (int thidx = 0; thidx < actual_thread_num; ++thidx) { threads.push_back( diff --git a/paddle/fluid/framework/executor_thread_worker.cc b/paddle/fluid/framework/executor_thread_worker.cc index 59679842bc17217ac0991caf61d89e01bc63e8a5..a945562926002ba6a5cf19d10cb9eb69e8556daa 100644 --- a/paddle/fluid/framework/executor_thread_worker.cc +++ b/paddle/fluid/framework/executor_thread_worker.cc @@ -13,6 +13,7 @@ See the License for the specific language governing permissions and limitations under the License. */ #include "paddle/fluid/framework/executor_thread_worker.h" +#include #include "google/protobuf/io/zero_copy_stream_impl.h" #include "google/protobuf/message.h" #include "google/protobuf/text_format.h" @@ -51,7 +52,7 @@ void DensePullThread::run() { if (_pull_dense_status.size() != 0) { wait_all(); } - + usleep(_sleep_time_ms * 1000); } } @@ -77,12 +78,12 @@ std::future DensePullThread::pull_dense(uint64_t table_id) { regions.clear(); auto& variables = _dense_variable_name[table_id]; regions.resize(variables.size()); - + for (auto i = 0u; i < variables.size(); ++i) { auto& t = variables[i]; Variable* var = _root_scope->FindVar(t); LoDTensor* tensor = var->GetMutable(); - + float* w = tensor->data(); paddle::ps::Region reg(w, tensor->numel()); regions[i] = std::move(reg); @@ -95,21 +96,20 @@ void DensePullThread::wait_all() { t.wait(); auto status = t.get(); if (status != 0) { - LOG(WARNING) << "pull dense failed times:" << - ++_pull_dense_fail_times; + LOG(WARNING) << "pull dense failed times:" << ++_pull_dense_fail_times; } } - + if (_pull_dense_fail_times > 20) { LOG(FATAL) << "pull dense failed times more than 20 times"; exit(-1); } - + _pull_dense_status.resize(0); } -void DensePullThread::increase_thread_version( - int thread_id, uint64_t table_id) { +void DensePullThread::increase_thread_version(int thread_id, + uint64_t table_id) { std::lock_guard lock(_mutex_for_version); _training_versions[table_id][thread_id]++; } @@ -174,7 +174,6 @@ void ExecutorThreadWorker::SetFetchVarNames( fetch_var_names.end()); } - void ExecutorThreadWorker::SetDevice() { #if defined _WIN32 || defined __APPLE__ return; @@ -344,15 +343,14 @@ void AsyncExecutorThreadWorker::SetPullDenseThread( } void AsyncExecutorThreadWorker::TrainOneNetwork() { PrepareParams(); - + for (auto& op : ops_) { if (op->Type().find("sgd") != std::string::npos) { continue; } bool need_skip = false; for (auto t = 0u; t < 
-      if (op->Type().find(_param_config->skip_op[t]) !=
-          std::string::npos) {
+      if (op->Type().find(_param_config->skip_op[t]) != std::string::npos) {
         need_skip = true;
         break;
       }
     }
@@ -436,14 +434,13 @@ void AsyncExecutorThreadWorker::PushDense(int table_id) {
     paddle::ps::Region reg(g, count);
     regions.emplace_back(std::move(reg));
   }
-
-  auto status = _pslib_ptr->_worker_ptr->push_dense(
-      regions.data(), regions.size(), table_id);
+
+  auto status = _pslib_ptr->_worker_ptr->push_dense(regions.data(),
+                                                    regions.size(), table_id);
   _push_dense_status.push_back(std::move(status));
 }

 void AsyncExecutorThreadWorker::PullSparse(int table_id) {
-
   auto& features = _features[table_id];
   auto& feature_value = _feature_value[table_id];
   auto fea_dim = _param_config->fea_dim;
@@ -451,8 +448,7 @@ void AsyncExecutorThreadWorker::PullSparse(int table_id) {
   features.clear();
   features.resize(0);
   features.reserve(MAX_FEASIGN_NUM);
-  const std::vector<std::string>& feed_vec =
-      thread_reader_->GetUseSlotAlias();
+  const std::vector<std::string>& feed_vec = thread_reader_->GetUseSlotAlias();
   // slot_idx = 0 is label TODO
   for (auto slot_idx = 1u; slot_idx < feed_vec.size(); ++slot_idx) {
     Variable* var = thread_scope_->FindVar(feed_vec[slot_idx]);
@@ -468,20 +464,20 @@ void AsyncExecutorThreadWorker::PullSparse(int table_id) {
       features.push_back(static_cast<uint64_t>(ids[i]));
     }
   }
-  check_pull_push_memory(features, feature_value, fea_dim);
-
+  check_pull_push_memory(features, &feature_value, fea_dim);
+
   std::vector<float*> pull_feature_value;
   for (auto i = 0u; i < features.size(); ++i) {
     pull_feature_value.push_back(feature_value[i].data());
   }
-
+
   auto status = _pslib_ptr->_worker_ptr->pull_sparse(
       pull_feature_value.data(), table_id, features.data(), features.size());
   _pull_sparse_status.push_back(std::move(status));
-
+
   auto& push_g = _feature_push_value[table_id];
-  check_pull_push_memory(features, push_g, fea_dim);
-
+  check_pull_push_memory(features, &push_g, fea_dim);
+
   collect_feasign_info(table_id);
 }
@@ -490,15 +486,14 @@ void AsyncExecutorThreadWorker::FillSparse(int table_id) {
   auto fea_dim = _param_config->fea_dim;
   auto& features = _features[table_id];
   auto& fea_value = _feature_value[table_id];
-
+
   CHECK(features.size() > 0) << "feature size check failed";
-
+
   auto fea_idx = 0u;
-
+
   std::vector<float> init_value(fea_dim);
-
-  const std::vector<std::string>& feed_vec =
-      thread_reader_->GetUseSlotAlias();
+
+  const std::vector<std::string>& feed_vec = thread_reader_->GetUseSlotAlias();
   // slot_idx = 0 is label TODO
   for (auto slot_idx = 1u; slot_idx < feed_vec.size(); ++slot_idx) {
     Variable* var = thread_scope_->FindVar(feed_vec[slot_idx]);
@@ -508,22 +503,22 @@ void AsyncExecutorThreadWorker::FillSparse(int table_id) {
     Variable* var_emb = thread_scope_->FindVar(
         _param_config->slot_input_vec[table_id][slot_idx - 1]);
     LoDTensor* tensor_emb = var_emb->GetMutable<LoDTensor>();
-    float* ptr = tensor_emb->mutable_data<float>(
-        {len, slot_dim}, platform::CPUPlace());
+    float* ptr =
+        tensor_emb->mutable_data<float>({len, slot_dim}, platform::CPUPlace());
     memset(ptr, 0, sizeof(float) * len * slot_dim);
     auto& tensor_lod = tensor->lod()[0];
-
+
     LoD data_lod{tensor_lod};
     tensor_emb->set_lod(data_lod);
-
+
     for (auto index = 0u; index < len; ++index) {
       if (ids[index] == 0u) {
-        memcpy(ptr + slot_dim * index,
-            init_value.data() + 2, sizeof(float) * slot_dim);
+        memcpy(ptr + slot_dim * index, init_value.data() + 2,
+               sizeof(float) * slot_dim);
         continue;
       }
-      memcpy(ptr + slot_dim * index,
-          fea_value[fea_idx].data() + 2, sizeof(float) * slot_dim);
+      memcpy(ptr + slot_dim * index, fea_value[fea_idx].data() + 2,
+             sizeof(float) * slot_dim);
       fea_idx++;
     }
   }
@@ -534,35 +529,38 @@ void AsyncExecutorThreadWorker::PushSparse(int table_id) {
   auto fea_dim = _param_config->fea_dim;
   auto& features = _features[table_id];
   auto& push_g = _feature_push_value[table_id];
-  check_pull_push_memory(features, push_g, fea_dim);
-  CHECK(push_g.size() == features.size() + 1) <<
-      "push_g size:" << push_g.size() << " features size:" << features.size();
+  check_pull_push_memory(features, &push_g, fea_dim);
+  CHECK(push_g.size() == features.size() + 1)
+      << "push_g size:" << push_g.size()
+      << " features size:" << features.size();
   uint64_t fea_idx = 0u;
   auto& fea_info = _fea_info[table_id];
   int offset = 2;
   const std::vector<std::string>& feed_vec = thread_reader_->GetUseSlotAlias();
-  // slot_idx = 0 is label
+  // slot_idx = 0 is label
   for (auto slot_idx = 1u; slot_idx < feed_vec.size(); ++slot_idx) {
-    if (_param_config->slot_alias_to_table.find(
-        feed_vec[slot_idx]) == _param_config->slot_alias_to_table.end()) {
-      LOG(ERROR) << "ERROR slot_idx:" << slot_idx <<
-          " name:" << feed_vec[slot_idx];
-    } else if (
-        _param_config->slot_alias_to_table[feed_vec[slot_idx]] != table_id) {
+    if (_param_config->slot_alias_to_table.find(feed_vec[slot_idx]) ==
+        _param_config->slot_alias_to_table.end()) {
+      LOG(ERROR) << "ERROR slot_idx:" << slot_idx
+                 << " name:" << feed_vec[slot_idx];
+    } else if (_param_config->slot_alias_to_table[feed_vec[slot_idx]] !=
+               table_id) {
       continue;
     }
     Variable* g_var = thread_scope_->FindVar(
         _param_config->gradient_var[table_id][slot_idx - 1]);
-    CHECK(g_var != nullptr) << "var[" <<
-        _param_config->gradient_var[table_id][slot_idx - 1] << "] not found";
+    CHECK(g_var != nullptr)
+        << "var[" << _param_config->gradient_var[table_id][slot_idx - 1]
+        << "] not found";
     LoDTensor* g_tensor = g_var->GetMutable<LoDTensor>();
     if (g_tensor == NULL) {
-      LOG(ERROR) << "var[" <<
-          _param_config->gradient_var[table_id][slot_idx - 1] << "] not found";
+      LOG(ERROR) << "var["
+                 << _param_config->gradient_var[table_id][slot_idx - 1]
+                 << "] not found";
       exit(-1);
     }
     float* g = g_tensor->data<float>();
-
+
     Variable* var = thread_scope_->FindVar(feed_vec[slot_idx]);
     CHECK(var != nullptr) << "var[" << feed_vec[slot_idx] << "] not found";
     LoDTensor* tensor = var->GetMutable<LoDTensor>();
@@ -571,42 +569,40 @@ void AsyncExecutorThreadWorker::PushSparse(int table_id) {
       exit(-1);
     }
     int len = tensor->numel();
-    CHECK(slot_dim * len == g_tensor->numel()) <<
-        "len:" << len << " g_numel:" << g_tensor->numel();
-    CHECK(len == tensor->numel()) << "len:" <<
-        len << "t_numel:" << tensor->numel();
+    CHECK(slot_dim * len == g_tensor->numel())
+        << "len:" << len << " g_numel:" << g_tensor->numel();
+    CHECK(len == tensor->numel()) << "len:" << len
+                                  << "t_numel:" << tensor->numel();
     int64_t* ids = tensor->data<int64_t>();
     for (auto id_idx = 0u; id_idx < len; ++id_idx) {
       if (ids[id_idx] == 0) {
         g += slot_dim;
         continue;
       }
-      memcpy(push_g[fea_idx].data() + offset,
-          g, sizeof(float) * slot_dim);
+      memcpy(push_g[fea_idx].data() + offset, g, sizeof(float) * slot_dim);
       push_g[fea_idx][0] = 1.0f;
-      CHECK(fea_idx < fea_info.size()) << "fea_idx:" <<
-          fea_idx << " size:" << fea_info.size();
+      CHECK(fea_idx < fea_info.size()) << "fea_idx:" << fea_idx
+                                       << " size:" << fea_info.size();
       push_g[fea_idx][1] = static_cast<float>(fea_info[fea_idx].label);
       g += slot_dim;
       fea_idx++;
     }
   }
-  CHECK(fea_idx == features.size()) << "fea_idx:" <<
-      fea_idx << " features size:" << features.size();
+  CHECK(fea_idx == features.size()) << "fea_idx:" << fea_idx
+                                    << " features size:" << features.size();
   CHECK_GT(features.size(), 0);
-
+
   std::vector<float*> push_g_vec;
   for (auto i = 0u; i < features.size(); ++i) {
     push_g_vec.push_back(push_g[i].data());
   }
   auto status = _pslib_ptr->_worker_ptr->push_sparse(
-      table_id, features.data(),
-      (const float**)push_g_vec.data(), features.size());
+      table_id, features.data(), (const float**)push_g_vec.data(),
+      features.size());
   _push_sparse_status.push_back(std::move(status));
 }

-void AsyncExecutorThreadWorker::collect_feasign_info(
-    int table_id) {
+void AsyncExecutorThreadWorker::collect_feasign_info(int table_id) {
   auto& fea_info = _fea_info[table_id];
   auto& feature = _features[table_id];
   fea_info.resize(feature.size());
@@ -614,13 +610,13 @@ void AsyncExecutorThreadWorker::collect_feasign_info(
   Variable* var = thread_scope_->FindVar(feed_vec[0]);
   LoDTensor* tensor = var->GetMutable<LoDTensor>();
   int64_t* label = tensor->data<int64_t>();
-
+
   int global_index = 0;
   for (auto slot_idx = 1u; slot_idx < feed_vec.size(); ++slot_idx) {
     Variable* var = thread_scope_->FindVar(feed_vec[slot_idx]);
     LoDTensor* tensor = var->GetMutable<LoDTensor>();
     int64_t* ids = tensor->data<int64_t>();
-
+
     int fea_idx = 0;
     for (auto ins_idx = 1u; ins_idx < tensor->lod()[0].size(); ++ins_idx) {
       for (; fea_idx < tensor->lod()[0][ins_idx]; ++fea_idx) {
@@ -628,36 +624,33 @@ void AsyncExecutorThreadWorker::collect_feasign_info(
           continue;
         }
         FeasignInfo info{slot_idx, ins_idx, label[ins_idx - 1]};
-
+
         fea_info[global_index++] = std::move(info);
       }
     }
   }
-  CHECK(global_index == feature.size()) <<
-      "expect fea info size:" << feature.size()
-      << " real:" << global_index;
+  CHECK(global_index == feature.size())
+      << "expect fea info size:" << feature.size() << " real:" << global_index;
 }

 void AsyncExecutorThreadWorker::check_pull_push_memory(
-    const std::vector<uint64_t>& features,
-    std::vector<std::vector<float>>& push_g,
-    int dim) {
-  push_g.resize(features.size() + 1);
-  for (auto& t : push_g) {
+    const std::vector<uint64_t>& features,
+    std::vector<std::vector<float>>* push_g, int dim) {
+  push_g->resize(features.size() + 1);
+  for (auto& t : *push_g) {
     t.resize(dim);
   }
 }

 void AsyncExecutorThreadWorker::check_pull_push_memory(
-    const std::vector<uint64_t>& features,
-    std::vector<float*>& push_g,
+    const std::vector<uint64_t>& features, std::vector<float*>* push_g,
     int dim) {
-  if (features.size() > push_g.size()) {
-    push_g.reserve(features.size() + 1);
-    auto size = features.size() - push_g.size() + 1;
+  if (features.size() > push_g->size()) {
+    push_g->reserve(features.size() + 1);
+    auto size = features.size() - push_g->size() + 1;
     for (auto i = 0u; i < size; ++i) {
       float* ptr = new float[dim];
-      push_g.push_back(ptr);
+      push_g->push_back(ptr);
     }
   }
 }
diff --git a/paddle/fluid/framework/executor_thread_worker.h b/paddle/fluid/framework/executor_thread_worker.h
index 20410b4c069b5a730bb3f374888bd2c1aa683d06..30b81ad88035eacc7a8efbe6d20f03d362122003 100644
--- a/paddle/fluid/framework/executor_thread_worker.h
+++ b/paddle/fluid/framework/executor_thread_worker.h
@@ -26,7 +26,7 @@ limitations under the License. */
*/ #include "paddle/fluid/framework/program_desc.h" #include "paddle/fluid/framework/scope.h" #ifdef PADDLE_WITH_PSLIB -#include "pslib.h" +#include #endif namespace paddle { @@ -34,75 +34,74 @@ namespace framework { void CreateTensor(Variable* var, proto::VarType::Type var_type); #ifdef PADDLE_WITH_PSLIB -const static uint32_t MAX_FEASIGN_NUM = 1000 * 100 * 100; +static const uint32_t MAX_FEASIGN_NUM = 1000 * 100 * 100; struct AsyncWorkerParamConfig { int slot_dim; int fea_dim; int32_t tmp_push_dense_wait_times; int32_t tmp_push_sparse_wait_times; - + std::vector skip_op; - + std::map> dense_variable_name; std::map> dense_gradient_variable_name; - std::vector dense_table_id; + std::vector dense_table_id; // fea_dim for each dense table - std::vector dense_table_size; - std::vector sparse_table_id; + std::vector dense_table_size; + std::vector sparse_table_id; std::map> slot_input_vec; std::map> gradient_var; std::map slot_alias_to_table; }; struct DensePullThreadParam { - std::shared_ptr ps_client; - int threshold; - int training_thread_num; - Scope* root_scope; - std::map>* dense_params; - int sleep_time_ms = 2; + std::shared_ptr ps_client; + int threshold; + int training_thread_num; + Scope* root_scope; + std::map>* dense_params; + int sleep_time_ms = 2; }; class DensePullThread { public: - explicit DensePullThread(const DensePullThreadParam& param) : - _running(false) { + explicit DensePullThread(const DensePullThreadParam& param) + : _running(false) { _ps_client = param.ps_client; _threshold = param.threshold; _thread_num = param.training_thread_num; _root_scope = param.root_scope; _sleep_time_ms = param.sleep_time_ms; - + for (auto& t : *param.dense_params) { - _dense_variable_name[t.first].insert( - _dense_variable_name[t.first].end(), - t.second.begin(), t.second.end()); + _dense_variable_name[t.first].insert(_dense_variable_name[t.first].end(), + t.second.begin(), t.second.end()); _training_versions[t.first].resize(_thread_num, 0); _last_versions[t.first] = 0; _current_version[t.first] = 0; } } - + int start(); - + void stop() { if (_running) { _running = false; _t.join(); } } - + void increase_thread_version(int thread_id, uint64_t table_id); void reset_thread_version(uint64_t table_id); std::future pull_dense(uint64_t table_id); void pull_dense2(uint64_t table_id); void wait_all(); - + private: void run(); bool check_update_param(uint64_t table_id); - + private: std::shared_ptr _ps_client; int _thread_num; @@ -113,33 +112,33 @@ class DensePullThread { std::map _last_versions; std::map _current_version; - std::mutex _mutex_for_version; + std::mutex _mutex_for_version; std::map> _training_versions; std::map> _dense_variable_name; - + std::thread _t; - + std::vector<::std::future> _pull_dense_status; - + std::map> _regions; - uint32_t _pull_dense_fail_times = 0; - - std::vector _base_norm_param; - std::vector _mean; - std::vector _scale; + uint32_t _pull_dense_fail_times = 0; + + std::vector _base_norm_param; + std::vector _mean; + std::vector _scale; float _squared_sum_epsilon = 1e-4; std::mutex _mutex_for_mean_scale; - + float _total_batch_num = 0; }; #endif class ExecutorThreadWorker { public: -ExecutorThreadWorker() - : thread_id_(-1), root_scope_(NULL), thread_scope_(NULL), debug_(false) {} + ExecutorThreadWorker() + : thread_id_(-1), root_scope_(NULL), thread_scope_(NULL), debug_(false) {} virtual ~ExecutorThreadWorker() {} - + void CreateThreadResource(const framework::ProgramDesc& program, const paddle::platform::Place& place); void SetThreadId(int tid); @@ -161,10 +160,8 
@@ ExecutorThreadWorker() #ifdef PADDLE_WITH_PSLIB virtual void SetPSlibPtr( std::shared_ptr pslib_ptr) {} - virtual void SetPullDenseThread( - std::shared_ptr dpt) {} - virtual void SetParamConfig( - AsyncWorkerParamConfig * param_config) {} + virtual void SetPullDenseThread(std::shared_ptr dpt) {} + virtual void SetParamConfig(AsyncWorkerParamConfig* param_config) {} #endif private: @@ -195,7 +192,7 @@ ExecutorThreadWorker() }; #ifdef PADDLE_WITH_PSLIB -class AsyncExecutorThreadWorker: public ExecutorThreadWorker { +class AsyncExecutorThreadWorker : public ExecutorThreadWorker { public: AsyncExecutorThreadWorker() {} virtual ~AsyncExecutorThreadWorker() {} @@ -210,40 +207,35 @@ class AsyncExecutorThreadWorker: public ExecutorThreadWorker { void FillSparse(int table_id); void PushSparse(int table_id); void PushDense(int table_id); - - void check_pull_push_memory( - const std::vector& features, - std::vector& push_g, - int dim); + void check_pull_push_memory(const std::vector& features, - std::vector>& push_g, - int dim); + std::vector* push_g, int dim); + void check_pull_push_memory(const std::vector& features, + std::vector>* push_g, int dim); void collect_feasign_info(int table_id); - + private: struct FeasignInfo { uint32_t slot; uint32_t ins; int64_t label; }; - - std::map> _features; - std::map> _fea_info; + + std::map> _features; + std::map> _fea_info; std::map>> _feature_value; std::map>> _feature_push_value; - - - std::shared_ptr _pslib_ptr; - - std::shared_ptr _pull_dense_thread; - - std::vector<::std::future> _pull_sparse_status; - std::vector<::std::future> _pull_dense_status; - std::vector<::std::future> _push_sparse_status; - std::vector<::std::future> _push_dense_status; - - AsyncWorkerParamConfig* _param_config; - + + std::shared_ptr _pslib_ptr; + + std::shared_ptr _pull_dense_thread; + + std::vector<::std::future> _pull_sparse_status; + std::vector<::std::future> _pull_dense_status; + std::vector<::std::future> _push_sparse_status; + std::vector<::std::future> _push_dense_status; + + AsyncWorkerParamConfig* _param_config; }; #endif
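
Reviewer note on the substantive (non-formatting) change above: the two `check_pull_push_memory` overloads now take their output buffer as a pointer (`std::vector<...>* push_g`) rather than a non-const reference, which is what cpplint's `runtime/references` rule asks for — callers must write `&push_g`, so the in-place mutation is visible at the call site (the `(uint64_t*)` → `static_cast` rewrites likewise come from cpplint's casting rule). Below is a minimal sketch of the same pattern; the names `grow_buffers` and `buffers` are hypothetical and not part of this patch:

```cpp
#include <cstdint>
#include <vector>

// Output parameter passed by pointer (Google C++ style): the explicit
// '&' at the call site signals that 'buffers' is modified in place.
void grow_buffers(const std::vector<uint64_t>& features,
                  std::vector<std::vector<float>>* buffers, int dim) {
  buffers->resize(features.size() + 1);  // one spare slot, as in the patch
  for (auto& buffer : *buffers) {
    buffer.resize(dim);  // each feature sign gets a dim-sized float buffer
  }
}

int main() {
  std::vector<uint64_t> features = {101, 102, 103};
  std::vector<std::vector<float>> buffers;
  grow_buffers(features, &buffers, 8);  // '&' makes the mutation obvious
  return 0;
}
```

Everything else in the remaining hunks is clang-format reflow and whitespace cleanup with no behavior change.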