diff --git a/paddle/fluid/framework/executor_thread_worker.cc b/paddle/fluid/framework/executor_thread_worker.cc
index 86ac93be3e4b1b92d090b3a056ee1d5303067e6b..592a416d6dcc408ea0536169ba03a7a8e8519ac4 100644
--- a/paddle/fluid/framework/executor_thread_worker.cc
+++ b/paddle/fluid/framework/executor_thread_worker.cc
@@ -303,7 +303,7 @@ void ExecutorThreadWorker::SetRootScope(Scope* g_scope) {
   root_scope_ = g_scope;
 }
 
-//AsyncExecutor
+// AsyncExecutor
 void AsyncExecutorThreadWorker::TrainFiles() {
   SetDevice();
 
@@ -330,7 +330,6 @@ void AsyncExecutorThreadWorker::TrainFiles() {
       print_fetch_var(thread_scope_, fetch_var_names_[i]);
     }  // end for (int i = 0...)
   }  // end while ()
-  LOG(ERROR) << "TRAIN DONE";
 }
 
 void AsyncExecutorThreadWorker::SetPSlibPtr(std::shared_ptr pslib_ptr) {
@@ -360,44 +359,12 @@ void AsyncExecutorThreadWorker::TrainOneNetwork() {
   UpdateParams();
 }
 
-void AsyncExecutorThreadWorker::BindingSlotVariableMemory() {
-  /*
-  std::vector ins_slot_offset(batch_size + 1, 0);
-  for (auto i = 1u; i <= batch_size; ++i) {
-    ins_slot_offset[i] += ins_slot_offset[i - 1] + slot_dim;
-  }
-
-  std::vector tensor_lod(batch_size + 1, 0);
-  for (auto i = 1u; i <= batch_size; ++i) {
-    tensor_lod[i] += tensor_lod[i - 1] + 1;
-  }
-
-  auto& used_slots = reader->get_use_slot_alias();
-  slot_input_vec.resize(used_slots.size() - 1);
-  for (auto slot_idx = 1u; slot_idx < used_slots.size(); ++slot_idx) {
-    auto var = slot_input_variable_name[slot_idx];
-
-    auto v = thread_scope->FindVar(var);
-    CHECK(v != nullptr) << "var[" << var << "] not found";
-
-    LoDTensor* tensor = v->GetMutable<LoDTensor>();
-    float* tensor_ptr = tensor->mutable_data<float>({batch_size, slot_dim}, platform::CPUPlace());
-    memset(tensor_ptr, 0, sizeof(float) * ins_slot_offset.back());
-
-    LoD data_lod{tensor_lod};
-    tensor->set_lod(data_lod);
-
-    slot_input_vec[slot_idx - 1].reset(tensor);
-  }
-  */
-}
-
 void AsyncExecutorThreadWorker::SetParamConfig(
     AsyncWorkerParamConfig* param_config) {
   _param_config = param_config;
 }
 
 void AsyncExecutorThreadWorker::PrepareParams() {
-  //int table_id = 0; //TODO
   for (auto table_id: _param_config->sparse_table_id) {
     PullSparse(table_id);
     for (auto& t : _pull_sparse_status) {
@@ -423,9 +390,7 @@ void AsyncExecutorThreadWorker::UpdateParams() {
   for (auto i : _param_config->dense_table_id) {
     PushDense(i);
   }
-  // _param_config->tmp_push_dense_wait_times
   int32_t tmp_push_dense_wait_times = -1;
-  // _param_config->tmp_push_sparse_wait_times
   int32_t tmp_push_sparse_wait_times = -1;
   static uint32_t push_dense_wait_times =
       static_cast<uint32_t>(tmp_push_dense_wait_times);
@@ -509,17 +474,15 @@ void AsyncExecutorThreadWorker::PullSparse(int table_id) {
       pull_feature_value.data(), table_id, features.data(), features.size());
   _pull_sparse_status.push_back(std::move(status));
 
-  //to save time
   auto& push_g = _feature_push_value[table_id];
   check_pull_push_memory(features, push_g, fea_dim);
 
-  //binding_slot_embed_with_concat(); TODO
-  collect_feasign_info(table_id); //TODO
+  collect_feasign_info(table_id);
 }
 
 void AsyncExecutorThreadWorker::FillSparse(int table_id) {
-  auto slot_dim = _param_config->slot_dim; // TODO
-  auto fea_dim = _param_config->fea_dim; //TODO
+  auto slot_dim = _param_config->slot_dim;
+  auto fea_dim = _param_config->fea_dim;
   auto& features = _features[table_id];
   auto& fea_value = _feature_value[table_id];
 
@@ -544,53 +507,35 @@ void AsyncExecutorThreadWorker::FillSparse(int table_id) {
 
     LoD data_lod{tensor_lod};
    tensor_emb->set_lod(data_lod);
-    //float* ptr = tensor_emb->data<float>();
 
     for (auto index = 0u; index < len; ++index){
-      //if (_current_train_job.use_cvm_feature()) {
-      //  if (ids[index] == 0u) {
-      //    memcpy(ptr + slot_dim * index, init_value.data(), sizeof(float) * slot_dim);
-      //    continue;
-      //  }
-      //  memcpy(ptr + slot_dim * index, fea_value[fea_idx].data(), sizeof(float) * slot_dim);
-      //  (ptr + slot_dim * index)[0] = log((ptr + slot_dim * index)[0] + 1);
-      //  (ptr + slot_dim * index)[1] = log((ptr + slot_dim * index)[1] + 1) - (ptr + slot_dim * index)[0];
-      //  fea_idx++;
-      //} else {
-        if (ids[index] == 0u) {
-          memcpy(ptr + slot_dim * index, init_value.data() + 2, sizeof(float) * slot_dim);
-          continue;
-        }
-        memcpy(ptr + slot_dim * index, fea_value[fea_idx].data() + 2, sizeof(float) * slot_dim);
-        fea_idx++;
-      //}
+      if (ids[index] == 0u) {
+        memcpy(ptr + slot_dim * index, init_value.data() + 2, sizeof(float) * slot_dim);
+        continue;
+      }
+      memcpy(ptr + slot_dim * index, fea_value[fea_idx].data() + 2, sizeof(float) * slot_dim);
+      fea_idx++;
     }
   }
 }
 
 void AsyncExecutorThreadWorker::PushSparse(int table_id) {
-  auto slot_dim = _param_config->slot_dim; //TODO
-  auto fea_dim = _param_config->fea_dim;//_current_train_job.fea_dim();TODO
+  auto slot_dim = _param_config->slot_dim;
+  auto fea_dim = _param_config->fea_dim;
   auto& features = _features[table_id];
-  CHECK(features.size() < 1000000) << "features size:" << features.size();
-  //std::vector gradient_var;
-  //auto& gradient_var = GlobalConfig::instance().input_gradient_variable_name; //TODO
+  CHECK(features.size() < 1000000) << "features size is too big, may be wrong:" << features.size();
   auto& push_g = _feature_push_value[table_id];
   check_pull_push_memory(features, push_g, fea_dim);
   CHECK(push_g.size() == features.size() + 1) << "push_g size:" << push_g.size() << " features size:" << features.size();
 
   uint64_t fea_idx = 0u;
   auto& fea_info = _fea_info[table_id];
-  int offset = 0;
-  //if (!_current_train_job.use_cvm_feature()) { //TODO
-    offset = 2;
-  //}
+  int offset = 2;
   const std::vector<std::string>& feed_vec = thread_reader_->GetUseSlotAlias();
-  // slot_idx = 0 is label TODO
+  // slot_idx = 0 is label
   for (auto slot_idx = 1u; slot_idx < feed_vec.size(); ++slot_idx) {
     if (_param_config->slot_alias_to_table.find(feed_vec[slot_idx]) == _param_config->slot_alias_to_table.end()) {
       LOG(ERROR) << "ERROR slot_idx:" << slot_idx << " name:" << feed_vec[slot_idx];
     } else if (_param_config->slot_alias_to_table[feed_vec[slot_idx]] != table_id) {
-      LOG(ERROR) << "ERROR continue";
       continue;
     }
     Variable* g_var = thread_scope_->FindVar(_param_config->gradient_var[table_id][slot_idx - 1]);
@@ -609,7 +554,6 @@ void AsyncExecutorThreadWorker::PushSparse(int table_id) {
       LOG(ERROR) << "var[" << feed_vec[slot_idx] << "] not found";
       exit(-1);
     }
-    //int len = tensor->lod()[0].back();
     int len = tensor->numel();
     CHECK(slot_dim * len == g_tensor->numel()) << "len:" << len << " g_numel:" << g_tensor->numel();
     CHECK(len == tensor->numel()) << "len:" << len << "t_numel:" << tensor->numel();
diff --git a/paddle/fluid/framework/executor_thread_worker.h b/paddle/fluid/framework/executor_thread_worker.h
index 0c9a47690bec42981c397cc17f25799680dcba9e..4e9c2622b0e42e499d4fe4c836b3c37ba67bec91 100644
--- a/paddle/fluid/framework/executor_thread_worker.h
+++ b/paddle/fluid/framework/executor_thread_worker.h
@@ -155,7 +155,6 @@ class ExecutorThreadWorker {
   void SetFetchVarNames(const std::vector<std::string>& fetch_var_names);
   virtual void SetPSlibPtr(std::shared_ptr pslib_ptr);
   virtual void SetPullDenseThread(std::shared_ptr dpt) {};
-  virtual void BindingSlotVariableMemory() {};
   virtual void SetParamConfig(AsyncWorkerParamConfig* param_config) {};
 private:
   void CreateThreadScope(const framework::ProgramDesc& program);
@@ -191,7 +190,6 @@ public:
   virtual ~AsyncExecutorThreadWorker() {}
   void SetPSlibPtr(std::shared_ptr pslib_ptr);
   void SetPullDenseThread(std::shared_ptr dpt);
-  void BindingSlotVariableMemory();
   void SetParamConfig(AsyncWorkerParamConfig* param_config);
   void TrainFiles();
   void TrainOneNetwork();
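
Context for the hard-coded offset = 2 in PushSparse and the data() + 2 copies kept in FillSparse: each value pulled from the sparse table is assumed to carry two leading statistics (the values the removed use_cvm_feature branch log-transformed) in front of the slot_dim embedding floats, so only the tail is copied into the slot tensor and the padding id 0 falls back to the initial value. The sketch below is illustrative only and not code from this patch; the helper name and parameters are made up.

#include <cstdint>
#include <cstring>
#include <vector>

// Illustrative sketch of the copy pattern FillSparse keeps after this cleanup.
// Assumed pulled-value layout: [stat0, stat1, emb_0 ... emb_{slot_dim-1}].
void fill_slot_embedding(const std::vector<uint64_t>& ids,
                         const std::vector<std::vector<float>>& fea_value,
                         const std::vector<float>& init_value,
                         int slot_dim, float* ptr) {
  uint64_t fea_idx = 0u;
  for (size_t index = 0; index < ids.size(); ++index) {
    if (ids[index] == 0u) {
      // Padding id: copy the default value, skipping the two leading stats.
      std::memcpy(ptr + slot_dim * index, init_value.data() + 2,
                  sizeof(float) * slot_dim);
      continue;
    }
    std::memcpy(ptr + slot_dim * index, fea_value[fea_idx].data() + 2,
                sizeof(float) * slot_dim);
    ++fea_idx;
  }
}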