diff --git a/paddle/fluid/framework/executor_thread_worker.cc b/paddle/fluid/framework/executor_thread_worker.cc index 592a416d6dcc408ea0536169ba03a7a8e8519ac4..412f4a2b6ed7c5516b752f5d91b643d3490c56cb 100644 --- a/paddle/fluid/framework/executor_thread_worker.cc +++ b/paddle/fluid/framework/executor_thread_worker.cc @@ -58,7 +58,8 @@ bool DensePullThread::check_update_param(uint64_t table_id) { { std::lock_guard lock(_mutex_for_version); auto& version = _training_versions[table_id]; - _current_version[table_id] = *(std::min_element(version.begin(), version.end())); + _current_version[table_id] = + *(std::min_element(version.begin(), version.end())); } if (_current_version[table_id] - _last_versions[table_id] < _threshold) { return false; @@ -93,7 +94,8 @@ void DensePullThread::wait_all() { t.wait(); auto status = t.get(); if (status != 0) { - LOG(WARNING) << "pull dense failed times:" << ++_pull_dense_fail_times; + LOG(WARNING) << "pull dense failed times:" << + ++_pull_dense_fail_times; } } @@ -105,7 +107,8 @@ void DensePullThread::wait_all() { _pull_dense_status.resize(0); } -void DensePullThread::increase_thread_version(int thread_id, uint64_t table_id) { +void DensePullThread::increase_thread_version( + int thread_id, uint64_t table_id) { std::lock_guard lock(_mutex_for_version); _training_versions[table_id][thread_id]++; } @@ -169,10 +172,6 @@ void ExecutorThreadWorker::SetFetchVarNames( fetch_var_names.end()); } -void ExecutorThreadWorker::SetPSlibPtr(std::shared_ptr pslib_ptr) { - -} - void ExecutorThreadWorker::SetDevice() { #if defined _WIN32 || defined __APPLE__ @@ -332,10 +331,12 @@ void AsyncExecutorThreadWorker::TrainFiles() { } // end while () } -void AsyncExecutorThreadWorker::SetPSlibPtr(std::shared_ptr pslib_ptr) { +void AsyncExecutorThreadWorker::SetPSlibPtr( + std::shared_ptr pslib_ptr) { _pslib_ptr = pslib_ptr; } -void AsyncExecutorThreadWorker::SetPullDenseThread(std::shared_ptr dpt) { +void AsyncExecutorThreadWorker::SetPullDenseThread( + std::shared_ptr dpt) { _pull_dense_thread = dpt; } void AsyncExecutorThreadWorker::TrainOneNetwork() { @@ -347,7 +348,8 @@ void AsyncExecutorThreadWorker::TrainOneNetwork() { } bool need_skip = false; for (auto t = 0u; t < _param_config->skip_op.size(); ++t) { - if (op->Type().find(_param_config->skip_op[t]) != std::string::npos) { + if (op->Type().find(_param_config->skip_op[t]) != + std::string::npos) { need_skip = true; break; } @@ -359,13 +361,13 @@ void AsyncExecutorThreadWorker::TrainOneNetwork() { UpdateParams(); } - -void AsyncExecutorThreadWorker::SetParamConfig(AsyncWorkerParamConfig* param_config) { +void AsyncExecutorThreadWorker::SetParamConfig( + AsyncWorkerParamConfig* param_config) { _param_config = param_config; } void AsyncExecutorThreadWorker::PrepareParams() { - for (auto table_id: _param_config->sparse_table_id) { + for (auto table_id : _param_config->sparse_table_id) { PullSparse(table_id); for (auto& t : _pull_sparse_status) { t.wait(); @@ -378,7 +380,7 @@ void AsyncExecutorThreadWorker::PrepareParams() { } _pull_sparse_status.resize(0); - for (auto table_id: _param_config->sparse_table_id) { + for (auto table_id : _param_config->sparse_table_id) { FillSparse(table_id); } } @@ -440,180 +442,198 @@ void AsyncExecutorThreadWorker::PushDense(int table_id) { void AsyncExecutorThreadWorker::PullSparse(int table_id) { - auto& features = _features[table_id]; - auto& feature_value = _feature_value[table_id]; - auto fea_dim = _param_config->fea_dim; - // slot id starts from 1 - features.clear(); - features.resize(0); - features.reserve(MAX_FEASIGN_NUM); - const std::vector& feed_vec = thread_reader_->GetUseSlotAlias(); - // slot_idx = 0 is label TODO - for (auto slot_idx = 1u; slot_idx < feed_vec.size(); ++slot_idx) { - Variable* var = thread_scope_->FindVar(feed_vec[slot_idx]); - LoDTensor* tensor = var->GetMutable(); - int64_t* ids = tensor->data(); - int len = tensor->numel(); - for (auto i = 0u; i < len; ++i) { - //todo: current trick - filter feasign=use_slot_mod(bug: datafeed fill use_slot_mod for empty slot) - if (ids[i] == 0u) { - continue; - } - features.push_back(static_cast(ids[i])); - } - } - check_pull_push_memory(features, feature_value, fea_dim); - - std::vector pull_feature_value; - for (auto i = 0u; i < features.size(); ++i) { - pull_feature_value.push_back(feature_value[i].data()); - } - for (int i = 0; i < features.size(); ++i) { + auto& features = _features[table_id]; + auto& feature_value = _feature_value[table_id]; + auto fea_dim = _param_config->fea_dim; + // slot id starts from 1 + features.clear(); + features.resize(0); + features.reserve(MAX_FEASIGN_NUM); + const std::vector& feed_vec = + thread_reader_->GetUseSlotAlias(); + // slot_idx = 0 is label TODO + for (auto slot_idx = 1u; slot_idx < feed_vec.size(); ++slot_idx) { + Variable* var = thread_scope_->FindVar(feed_vec[slot_idx]); + LoDTensor* tensor = var->GetMutable(); + int64_t* ids = tensor->data(); + int len = tensor->numel(); + for (auto i = 0u; i < len; ++i) { + // todo(colourful-tree): current trick - filter feasign=use_slot_mod( + // bug: datafeed fill use_slot_mod for empty slot) + if (ids[i] == 0u) { + continue; + } + features.push_back(static_cast(ids[i])); } - auto status = _pslib_ptr->_worker_ptr->pull_sparse( - pull_feature_value.data(), table_id, features.data(), features.size()); - _pull_sparse_status.push_back(std::move(status)); - - auto& push_g = _feature_push_value[table_id]; - check_pull_push_memory(features, push_g, fea_dim); - - collect_feasign_info(table_id); + } + check_pull_push_memory(features, feature_value, fea_dim); + + std::vector pull_feature_value; + for (auto i = 0u; i < features.size(); ++i) { + pull_feature_value.push_back(feature_value[i].data()); + } + + auto status = _pslib_ptr->_worker_ptr->pull_sparse( + pull_feature_value.data(), table_id, features.data(), features.size()); + _pull_sparse_status.push_back(std::move(status)); + + auto& push_g = _feature_push_value[table_id]; + check_pull_push_memory(features, push_g, fea_dim); + + collect_feasign_info(table_id); } void AsyncExecutorThreadWorker::FillSparse(int table_id) { - auto slot_dim = _param_config->slot_dim; - auto fea_dim = _param_config->fea_dim; - auto& features = _features[table_id]; - auto& fea_value = _feature_value[table_id]; - - CHECK(features.size() > 0) << "feature size check failed"; - - auto fea_idx = 0u; - - std::vector init_value(fea_dim); - - const std::vector& feed_vec = thread_reader_->GetUseSlotAlias(); - // slot_idx = 0 is label TODO - for (auto slot_idx = 1u; slot_idx < feed_vec.size(); ++slot_idx) { - Variable* var = thread_scope_->FindVar(feed_vec[slot_idx]); - LoDTensor* tensor = var->GetMutable(); - int64_t* ids = tensor->data(); - int len = tensor->numel(); - Variable* var_emb = thread_scope_->FindVar(_param_config->slot_input_vec[table_id][slot_idx - 1]); - LoDTensor* tensor_emb = var_emb->GetMutable(); - float* ptr = tensor_emb->mutable_data({len, slot_dim}, platform::CPUPlace()); - memset(ptr, 0, sizeof(float) * len * slot_dim); - auto& tensor_lod = tensor->lod()[0]; - - LoD data_lod{tensor_lod}; - tensor_emb->set_lod(data_lod); - - for (auto index = 0u; index < len; ++index){ - if (ids[index] == 0u) { - memcpy(ptr + slot_dim * index, init_value.data() + 2, sizeof(float) * slot_dim); - continue; - } - memcpy(ptr + slot_dim * index, fea_value[fea_idx].data() + 2, sizeof(float) * slot_dim); - fea_idx++; - } + auto slot_dim = _param_config->slot_dim; + auto fea_dim = _param_config->fea_dim; + auto& features = _features[table_id]; + auto& fea_value = _feature_value[table_id]; + + CHECK(features.size() > 0) << "feature size check failed"; + + auto fea_idx = 0u; + + std::vector init_value(fea_dim); + + const std::vector& feed_vec = + thread_reader_->GetUseSlotAlias(); + // slot_idx = 0 is label TODO + for (auto slot_idx = 1u; slot_idx < feed_vec.size(); ++slot_idx) { + Variable* var = thread_scope_->FindVar(feed_vec[slot_idx]); + LoDTensor* tensor = var->GetMutable(); + int64_t* ids = tensor->data(); + int len = tensor->numel(); + Variable* var_emb = thread_scope_->FindVar( + _param_config->slot_input_vec[table_id][slot_idx - 1]); + LoDTensor* tensor_emb = var_emb->GetMutable(); + float* ptr = tensor_emb->mutable_data( + {len, slot_dim}, platform::CPUPlace()); + memset(ptr, 0, sizeof(float) * len * slot_dim); + auto& tensor_lod = tensor->lod()[0]; + + LoD data_lod{tensor_lod}; + tensor_emb->set_lod(data_lod); + + for (auto index = 0u; index < len; ++index) { + if (ids[index] == 0u) { + memcpy(ptr + slot_dim * index, + init_value.data() + 2, sizeof(float) * slot_dim); + continue; + } + memcpy(ptr + slot_dim * index, + fea_value[fea_idx].data() + 2, sizeof(float) * slot_dim); + fea_idx++; } + } } void AsyncExecutorThreadWorker::PushSparse(int table_id) { - auto slot_dim = _param_config->slot_dim; - auto fea_dim = _param_config->fea_dim; - auto& features = _features[table_id]; - CHECK(features.size() < 1000000) << "features size is too big, may be wrong:" << features.size(); - auto& push_g = _feature_push_value[table_id]; - check_pull_push_memory(features, push_g, fea_dim); - CHECK(push_g.size() == features.size() + 1) << "push_g size:" << push_g.size() << " features size:" << features.size(); - uint64_t fea_idx = 0u; - auto& fea_info = _fea_info[table_id]; - int offset = 2; - const std::vector& feed_vec = thread_reader_->GetUseSlotAlias(); - // slot_idx = 0 is label - for (auto slot_idx = 1u; slot_idx < feed_vec.size(); ++slot_idx) { - if (_param_config->slot_alias_to_table.find(feed_vec[slot_idx]) == _param_config->slot_alias_to_table.end()) { - LOG(ERROR) << "ERROR slot_idx:" << slot_idx << " name:" << feed_vec[slot_idx]; - } else if (_param_config->slot_alias_to_table[feed_vec[slot_idx]] != table_id) { - continue; - } - Variable* g_var = thread_scope_->FindVar(_param_config->gradient_var[table_id][slot_idx - 1]); - CHECK(g_var != nullptr) << "var[" << _param_config->gradient_var[table_id][slot_idx - 1] << "] not found"; - LoDTensor* g_tensor = g_var->GetMutable(); - if (g_tensor == NULL) { - LOG(ERROR) << "var[" << _param_config->gradient_var[table_id][slot_idx - 1] << "] not found"; - exit(-1); - } - float* g = g_tensor->data(); - - Variable* var = thread_scope_->FindVar(feed_vec[slot_idx]); - CHECK(var != nullptr) << "var[" << feed_vec[slot_idx] << "] not found"; - LoDTensor* tensor = var->GetMutable(); - if (tensor == NULL) { - LOG(ERROR) << "var[" << feed_vec[slot_idx] << "] not found"; - exit(-1); - } - int len = tensor->numel(); - CHECK(slot_dim * len == g_tensor->numel()) << "len:" << len << " g_numel:" << g_tensor->numel(); - CHECK(len == tensor->numel()) << "len:" << len << "t_numel:" << tensor->numel(); - int64_t* ids = tensor->data(); - for (auto id_idx = 0u; id_idx < len; ++id_idx){ - if (ids[id_idx] == 0) { - g += slot_dim; - continue; - } - memcpy(push_g[fea_idx].data() + offset, g, sizeof(float) * slot_dim); - push_g[fea_idx][0] = 1.0f; - CHECK(fea_idx < fea_info.size()) << "fea_idx:" << fea_idx << " size:" << fea_info.size(); - push_g[fea_idx][1] = static_cast(fea_info[fea_idx].label); - g += slot_dim; - fea_idx++; - } + auto slot_dim = _param_config->slot_dim; + auto fea_dim = _param_config->fea_dim; + auto& features = _features[table_id]; + auto& push_g = _feature_push_value[table_id]; + check_pull_push_memory(features, push_g, fea_dim); + CHECK(push_g.size() == features.size() + 1) << + "push_g size:" << push_g.size() << " features size:" << features.size(); + uint64_t fea_idx = 0u; + auto& fea_info = _fea_info[table_id]; + int offset = 2; + const std::vector& feed_vec = thread_reader_->GetUseSlotAlias(); + // slot_idx = 0 is label + for (auto slot_idx = 1u; slot_idx < feed_vec.size(); ++slot_idx) { + if (_param_config->slot_alias_to_table.find( + feed_vec[slot_idx]) == _param_config->slot_alias_to_table.end()) { + LOG(ERROR) << "ERROR slot_idx:" << slot_idx << + " name:" << feed_vec[slot_idx]; + } else if ( + _param_config->slot_alias_to_table[feed_vec[slot_idx]] != table_id) { + continue; } - CHECK(fea_idx == features.size()) << "fea_idx:" << fea_idx << " features size:" << features.size(); - CHECK(features.size() > 0); - - std::vector push_g_vec; - for (auto i = 0u; i < features.size(); ++i) { - push_g_vec.push_back(push_g[i].data()); + Variable* g_var = thread_scope_->FindVar( + _param_config->gradient_var[table_id][slot_idx - 1]); + CHECK(g_var != nullptr) << "var[" << + _param_config->gradient_var[table_id][slot_idx - 1] << "] not found"; + LoDTensor* g_tensor = g_var->GetMutable(); + if (g_tensor == NULL) { + LOG(ERROR) << "var[" << + _param_config->gradient_var[table_id][slot_idx - 1] << "] not found"; + exit(-1); + } + float* g = g_tensor->data(); + + Variable* var = thread_scope_->FindVar(feed_vec[slot_idx]); + CHECK(var != nullptr) << "var[" << feed_vec[slot_idx] << "] not found"; + LoDTensor* tensor = var->GetMutable(); + if (tensor == NULL) { + LOG(ERROR) << "var[" << feed_vec[slot_idx] << "] not found"; + exit(-1); + } + int len = tensor->numel(); + CHECK(slot_dim * len == g_tensor->numel()) << + "len:" << len << " g_numel:" << g_tensor->numel(); + CHECK(len == tensor->numel()) << "len:" << + len << "t_numel:" << tensor->numel(); + int64_t* ids = tensor->data(); + for (auto id_idx = 0u; id_idx < len; ++id_idx) { + if (ids[id_idx] == 0) { + g += slot_dim; + continue; + } + memcpy(push_g[fea_idx].data() + offset, + g, sizeof(float) * slot_dim); + push_g[fea_idx][0] = 1.0f; + CHECK(fea_idx < fea_info.size()) << "fea_idx:" << + fea_idx << " size:" << fea_info.size(); + push_g[fea_idx][1] = static_cast(fea_info[fea_idx].label); + g += slot_dim; + fea_idx++; } - auto status = _pslib_ptr->_worker_ptr->push_sparse( - table_id, features.data(), (const float**)push_g_vec.data(), features.size()); - _push_sparse_status.push_back(std::move(status)); + } + CHECK(fea_idx == features.size()) << "fea_idx:" << + fea_idx << " features size:" << features.size(); + CHECK_GT(features.size(), 0); + + std::vector push_g_vec; + for (auto i = 0u; i < features.size(); ++i) { + push_g_vec.push_back(push_g[i].data()); + } + auto status = _pslib_ptr->_worker_ptr->push_sparse( + table_id, features.data(), + (const float**)push_g_vec.data(), features.size()); + _push_sparse_status.push_back(std::move(status)); } void AsyncExecutorThreadWorker::collect_feasign_info( - int table_id) { - auto& fea_info = _fea_info[table_id]; - auto& feature = _features[table_id]; - fea_info.resize(feature.size()); - - const std::vector& feed_vec = thread_reader_->GetUseSlotAlias(); - Variable* var = thread_scope_->FindVar(feed_vec[0]); + int table_id) { + auto& fea_info = _fea_info[table_id]; + auto& feature = _features[table_id]; + fea_info.resize(feature.size()); + const std::vector& feed_vec = thread_reader_->GetUseSlotAlias(); + Variable* var = thread_scope_->FindVar(feed_vec[0]); + LoDTensor* tensor = var->GetMutable(); + int64_t* label = tensor->data(); + + int global_index = 0; + for (auto slot_idx = 1u; slot_idx < feed_vec.size(); ++slot_idx) { + Variable* var = thread_scope_->FindVar(feed_vec[slot_idx]); LoDTensor* tensor = var->GetMutable(); - int64_t* label = tensor->data(); - - int global_index = 0; - for (auto slot_idx = 1u; slot_idx < feed_vec.size(); ++slot_idx) { - Variable* var = thread_scope_->FindVar(feed_vec[slot_idx]); - LoDTensor* tensor = var->GetMutable(); - int64_t* ids = tensor->data(); - - int fea_idx = 0; - for (auto ins_idx = 1u; ins_idx < tensor->lod()[0].size(); ++ins_idx) { - for (; fea_idx < tensor->lod()[0][ins_idx]; ++fea_idx) { - if (ids[fea_idx] == 0u) { - continue; - } - FeasignInfo info{slot_idx, ins_idx, label[ins_idx - 1]}; - - fea_info[global_index++] = std::move(info); - } + int64_t* ids = tensor->data(); + + int fea_idx = 0; + for (auto ins_idx = 1u; ins_idx < tensor->lod()[0].size(); ++ins_idx) { + for (; fea_idx < tensor->lod()[0][ins_idx]; ++fea_idx) { + if (ids[fea_idx] == 0u) { + continue; } + FeasignInfo info{slot_idx, ins_idx, label[ins_idx - 1]}; + + fea_info[global_index++] = std::move(info); + } } - CHECK(global_index == feature.size()) << "expect fea info size:" << feature.size() - << " real:" << global_index; + } + CHECK(global_index == feature.size()) << + "expect fea info size:" << feature.size() + << " real:" << global_index; } void AsyncExecutorThreadWorker::check_pull_push_memory( diff --git a/paddle/fluid/framework/executor_thread_worker.h b/paddle/fluid/framework/executor_thread_worker.h index 4e9c2622b0e42e499d4fe4c836b3c37ba67bec91..b6c4f950ecc05171056092a1a871a7f976728f12 100644 --- a/paddle/fluid/framework/executor_thread_worker.h +++ b/paddle/fluid/framework/executor_thread_worker.h @@ -35,21 +35,22 @@ const static uint32_t MAX_FEASIGN_NUM = 1000 * 100 * 100; void CreateTensor(Variable* var, proto::VarType::Type var_type); struct AsyncWorkerParamConfig { - int slot_dim; - int fea_dim; - int32_t tmp_push_dense_wait_times; - int32_t tmp_push_sparse_wait_times; - - std::vector skip_op; - - std::map> dense_variable_name; - std::map> dense_gradient_variable_name; - std::vector dense_table_id; - std::vector dense_table_size; // fea_dim for each dense table - std::vector sparse_table_id; - std::map> slot_input_vec; //6048slot 6050slot //name - std::map> gradient_var; //6048slot_embed - std::map slot_alias_to_table; //TODO done + int slot_dim; + int fea_dim; + int32_t tmp_push_dense_wait_times; + int32_t tmp_push_sparse_wait_times; + + std::vector skip_op; + + std::map> dense_variable_name; + std::map> dense_gradient_variable_name; + std::vector dense_table_id; + // fea_dim for each dense table + std::vector dense_table_size; + std::vector sparse_table_id; + std::map> slot_input_vec; + std::map> gradient_var; + std::map slot_alias_to_table; }; struct DensePullThreadParam { @@ -62,8 +63,8 @@ struct DensePullThreadParam { }; class DensePullThread { -public: - DensePullThread(DensePullThreadParam& param) : + public: + explicit DensePullThread(const DensePullThreadParam& param) : _running(false) { _ps_client = param.ps_client; _threshold = param.threshold; @@ -96,11 +97,11 @@ public: void pull_dense2(uint64_t table_id); void wait_all(); -private: + private: void run(); bool check_update_param(uint64_t table_id); -private: + private: std::shared_ptr _ps_client; int _thread_num; int _threshold; @@ -153,9 +154,13 @@ class ExecutorThreadWorker { virtual void TrainFiles(); // set fetch variable names from python interface assigned by users void SetFetchVarNames(const std::vector& fetch_var_names); - virtual void SetPSlibPtr(std::shared_ptr pslib_ptr); - virtual void SetPullDenseThread(std::shared_ptr dpt) {}; - virtual void SetParamConfig(AsyncWorkerParamConfig* param_config) {}; + virtual void SetPSlibPtr( + std::shared_ptr pslib_ptr); + virtual void SetPullDenseThread( + std::shared_ptr dpt) {} + virtual void SetParamConfig( + AsyncWorkerParamConfig * param_config) {} + private: void CreateThreadScope(const framework::ProgramDesc& program); void CreateThreadOperators(const framework::ProgramDesc& program); @@ -178,32 +183,37 @@ class ExecutorThreadWorker { Scope* root_scope_; // a thread scope, father scope is global score which is shared Scope* thread_scope_; - //private: std::vector fetch_var_names_; std::vector> fetch_values_; bool debug_; }; class AsyncExecutorThreadWorker: public ExecutorThreadWorker { -public: - AsyncExecutorThreadWorker(){}; - virtual ~AsyncExecutorThreadWorker() {} - void SetPSlibPtr(std::shared_ptr pslib_ptr); - void SetPullDenseThread(std::shared_ptr dpt); - void SetParamConfig(AsyncWorkerParamConfig* param_config); - void TrainFiles(); - void TrainOneNetwork(); - void PrepareParams(); - void UpdateParams(); - void PullSparse(int table_id); - void FillSparse(int table_id); - void PushSparse(int table_id); - void PushDense(int table_id); - - void check_pull_push_memory(std::vector& features, std::vector& push_g, int dim); - void check_pull_push_memory(std::vector& features, std::vector>& push_g, int dim); + public: + AsyncExecutorThreadWorker() {} + virtual ~AsyncExecutorThreadWorker() {} + void SetPSlibPtr(std::shared_ptr pslib_ptr); + void SetPullDenseThread(std::shared_ptr dpt); + void SetParamConfig(AsyncWorkerParamConfig* param_config); + void TrainFiles(); + void TrainOneNetwork(); + void PrepareParams(); + void UpdateParams(); + void PullSparse(int table_id); + void FillSparse(int table_id); + void PushSparse(int table_id); + void PushDense(int table_id); + + void check_pull_push_memory( + const std::vector& features, + std::vector& push_g, + int dim); + void check_pull_push_memory(const std::vector& features, + std::vector>& push_g, + int dim); void collect_feasign_info(int table_id); -private: + + private: struct FeasignInfo { uint32_t slot; uint32_t ins;