diff --git a/paddle/fluid/framework/async_executor.cc b/paddle/fluid/framework/async_executor.cc
index f0ca375f950dc98533a4cd79e372eadef9770dc2..6efe5cafe722c34184d48c60b1e05c37529eed2a 100644
--- a/paddle/fluid/framework/async_executor.cc
+++ b/paddle/fluid/framework/async_executor.cc
@@ -111,7 +111,7 @@ void AsyncExecutor::InitParamConfig() {
       std::vector<std::string> tmp_sparse_variable_name;
       for (int i = 0u; i < table.slot_value_size(); ++i) {
         tmp_sparse_variable_name.push_back(table.slot_value(i));
-        _param_config.slot_alias_to_table[table.slot_value(i)] = table.table_id();
+        _param_config.slot_alias_to_table[table.slot_key(i)] = table.table_id();
       }
       std::vector<std::string> tmp_sparse_gradient_variable_name;
       for (auto i = 0u; i < table.slot_gradient_size(); ++i) {
diff --git a/paddle/fluid/framework/executor_thread_worker.cc b/paddle/fluid/framework/executor_thread_worker.cc
index a0455b26efd0371f1d3a1be39c986613ac25c182..7004ecf23b0007252a464d31ea6786ebc28924a4 100644
--- a/paddle/fluid/framework/executor_thread_worker.cc
+++ b/paddle/fluid/framework/executor_thread_worker.cc
@@ -330,6 +330,7 @@ void AsyncExecutorThreadWorker::TrainFiles() {
       print_fetch_var(thread_scope_, fetch_var_names_[i]);
     }  // end for (int i = 0...)
   }  // end while ()
+  LOG(ERROR) << "TRAIN DONE";
 }
 
 void AsyncExecutorThreadWorker::SetPSlibPtr(std::shared_ptr<paddle::distributed::PSlib> pslib_ptr) {
@@ -571,25 +572,30 @@ void AsyncExecutorThreadWorker::FillSparse(int table_id) {
 void AsyncExecutorThreadWorker::PushSparse(int table_id) {
   auto slot_dim = _param_config->slot_dim; //TODO
   auto fea_dim = _param_config->fea_dim;//_current_train_job.fea_dim();TODO
-  auto& features = _features[table_id];
+  auto& features = _features[table_id];
+  CHECK(features.size() < 1000000) << "features size:" << features.size();
   //std::vector<std::string> gradient_var;
   //auto& gradient_var = GlobalConfig::instance().input_gradient_variable_name; //TODO
-  auto& push_g = _feature_push_value[table_id];
+  auto& push_g = _feature_push_value[table_id];
   check_pull_push_memory(features, push_g, fea_dim);
+  CHECK(push_g.size() == features.size() + 1) << "push_g size:" << push_g.size() << " features size:" << features.size();
   uint64_t fea_idx = 0u;
-  auto& fea_info = _fea_info[table_id]; //TODO
+  auto& fea_info = _fea_info[table_id];
   int offset = 0;
   //if (!_current_train_job.use_cvm_feature()) { //TODO
   offset = 2;
   //}
-  const std::vector<std::string>& feed_vec = thread_reader_->GetUseSlotAlias();
   // slot_idx = 0 is label TODO
   for (auto slot_idx = 1u; slot_idx < feed_vec.size(); ++slot_idx) {
-    if (_param_config->slot_alias_to_table[feed_vec[slot_idx]] != table_id) {
+    if (_param_config->slot_alias_to_table.find(feed_vec[slot_idx]) == _param_config->slot_alias_to_table.end()) {
+      LOG(ERROR) << "ERROR slot_idx:" << slot_idx << " name:" << feed_vec[slot_idx];
+    } else if (_param_config->slot_alias_to_table[feed_vec[slot_idx]] != table_id) {
+      LOG(ERROR) << "ERROR continue";
       continue;
     }
-    Variable* g_var = thread_scope_->FindVar(_param_config->gradient_var[table_id][slot_idx - 1]);
+    Variable* g_var = thread_scope_->FindVar(_param_config->gradient_var[table_id][slot_idx - 1]);
+    CHECK(g_var != nullptr) << "var[" << _param_config->gradient_var[table_id][slot_idx - 1] << "] not found";
     LoDTensor* g_tensor = g_var->GetMutable<LoDTensor>();
     if (g_tensor == NULL) {
       LOG(ERROR) << "var[" << _param_config->gradient_var[table_id][slot_idx - 1] << "] not found";
@@ -598,13 +604,16 @@ void AsyncExecutorThreadWorker::PushSparse(int table_id) {
     float* g = g_tensor->data<float>();
     Variable* var = thread_scope_->FindVar(feed_vec[slot_idx]);
+    CHECK(var != nullptr) << "var[" << feed_vec[slot_idx] << "] not found";
     LoDTensor* tensor = var->GetMutable<LoDTensor>();
     if (tensor == NULL) {
       LOG(ERROR) << "var[" << feed_vec[slot_idx] << "] not found";
       exit(-1);
     }
-    int len = tensor->lod()[0].back();
-    assert(slot_dim * len == g_tensor->numel());
+    //int len = tensor->lod()[0].back();
+    int len = tensor->numel();
+    CHECK(slot_dim * len == g_tensor->numel()) << "len:" << len << " g_numel:" << g_tensor->numel();
+    CHECK(len == tensor->numel()) << "len:" << len << "t_numel:" << tensor->numel();
     int64_t* ids = tensor->data<int64_t>();
     for (auto id_idx = 0u; id_idx < len; ++id_idx){
       if (ids[id_idx] == 0) {
@@ -613,12 +622,13 @@ void AsyncExecutorThreadWorker::PushSparse(int table_id) {
       }
       memcpy(push_g[fea_idx].data() + offset, g, sizeof(float) * slot_dim);
       push_g[fea_idx][0] = 1.0f;
+      CHECK(fea_idx < fea_info.size()) << "fea_idx:" << fea_idx << " size:" << fea_info.size();
       push_g[fea_idx][1] = static_cast<float>(fea_info[fea_idx].label);
       g += slot_dim;
       fea_idx++;
     }
   }
-  assert(fea_idx == features.size());
+  CHECK(fea_idx == features.size()) << "fea_idx:" << fea_idx << " features size:" << features.size();
   CHECK(features.size() > 0);
 
   std::vector<float*> push_g_vec;
diff --git a/paddle/fluid/framework/executor_thread_worker.h b/paddle/fluid/framework/executor_thread_worker.h
index b3ee9dfaec9953f11360eba30dc98fafb8076b79..0c9a47690bec42981c397cc17f25799680dcba9e 100644
--- a/paddle/fluid/framework/executor_thread_worker.h
+++ b/paddle/fluid/framework/executor_thread_worker.h
@@ -49,7 +49,7 @@ struct AsyncWorkerParamConfig {
   std::vector<uint64_t> sparse_table_id;
   std::map<uint64_t, std::vector<std::string>> slot_input_vec; //6048slot 6050slot //name
   std::map<uint64_t, std::vector<std::string>> gradient_var; //6048slot_embed
-  std::unordered_map<std::string, uint64_t> slot_alias_to_table; //TODO done
+  std::map<std::string, uint64_t> slot_alias_to_table; //TODO done
 };
 
 struct DensePullThreadParam {
diff --git a/python/paddle/fluid/async_executor.py b/python/paddle/fluid/async_executor.py
index af42d2912fd2fae53dba94d8b439ff1a52b8a87c..13d876e57beec94d13e3f0c03e3f3820962d443f 100644
--- a/python/paddle/fluid/async_executor.py
+++ b/python/paddle/fluid/async_executor.py
@@ -153,7 +153,7 @@ class AsyncExecutor(object):
                                      data_feed.desc(), filelist, thread_num,
                                      fetch_var_names, mode, debug)
 
-    def download_data(self, afs_path, local_path, fs_default_name, ugi, hadoop_home="$HADOOP_HOME", process_num=12):
+    def download_data(self, afs_path, local_path, fs_default_name, ugi, file_cnt, hadoop_home="$HADOOP_HOME", process_num=12):
         if self.instance is None:
             raise ValueError('instance is None, please run config_distributed_nodes init instance')
 
@@ -169,6 +169,7 @@ class AsyncExecutor(object):
             local_path,
             self.instance.get_worker_index(),
             self.instance.get_node_cnt() / 2,
+            file_cnt,
             multi_processes=process_num)
         #self.instance.barrier_all() #wait for download_data #TODO only barriere worker
         self.instance.barrier_worker() #wait for download_data #TODO only barriere worker
diff --git a/python/paddle/fluid/contrib/utils/hdfs_utils.py b/python/paddle/fluid/contrib/utils/hdfs_utils.py
index ff1a2d3e4ad88a237fa1dc74ff5039a9aa456dea..42b4d7feab669f5c041a31a17fd4c2b16b38c19d 100644
--- a/python/paddle/fluid/contrib/utils/hdfs_utils.py
+++ b/python/paddle/fluid/contrib/utils/hdfs_utils.py
@@ -427,6 +427,7 @@ def multi_download(client,
                    local_path,
                    trainer_id,
                    trainers,
+                   file_cnt,
                    multi_processes=5):
     """
     multi_download
@@ -435,6 +436,7 @@ def multi_download(client,
     :param local_path: path on local
     :param trainer_id: current trainer id
     :param trainers: all trainers number
+    :param file_cnt: all file number
     :param multi_processes: the download data process at the same time, default=5
     :return: None
     """
@@ -450,7 +452,7 @@ def multi_download(client,
         client.make_local_dirs(local_path)
         _logger.info("Make local dir {} successfully".format(local_path))
 
-    all_need_download = client.lsr(hdfs_path, sort=True)
+    all_need_download = client.lsr(hdfs_path, sort=True)[:file_cnt]
     need_download = all_need_download[trainer_id::trainers]
     _logger.info("Get {} files From all {} files need to be download from {}".
                  format(len(need_download), len(all_need_download), hdfs_path))
@@ -501,6 +503,7 @@ if __name__ == "__main__":
         "/home/xx/data1",
         1,
         5,
+        100,
         multi_processes=5)
 
     multi_upload(client, "/user/com/train-25/model", "/home/xx/data1")
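
Usage note (not part of the patch): after this change, AsyncExecutor.download_data() and hdfs_utils.multi_download() take an extra file_cnt argument. Internally the sorted HDFS listing is truncated to file_cnt entries (client.lsr(hdfs_path, sort=True)[:file_cnt]) before the files are sharded across trainers with [trainer_id::trainers], so file_cnt bounds the total number of files considered, not the per-trainer count. Below is a minimal calling sketch, assuming a distributed AsyncExecutor initialized via config_distributed_nodes() as async_executor.py expects; the paths, ugi string, and counts are placeholders, not values from this patch.

from paddle.fluid.async_executor import AsyncExecutor

exe = AsyncExecutor()
exe.config_distributed_nodes()

# Consider at most 100 files from the sorted AFS listing, then shard them across workers.
exe.download_data(
    "/user/xxx/train_data",        # afs_path (placeholder)
    "./local_train_data",          # local_path (placeholder)
    "afs://example.afs.com:9902",  # fs_default_name (placeholder)
    "user,password",               # ugi (placeholder)
    file_cnt=100,                  # new argument introduced by this patch
    process_num=12)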