Commit 7bd16e3a authored by heqiaozhi

fix some bug & add log

Parent 5d3ecbfd
@@ -111,7 +111,7 @@ void AsyncExecutor::InitParamConfig() {
     std::vector<std::string> tmp_sparse_variable_name;
     for (int i = 0u; i < table.slot_value_size(); ++i) {
       tmp_sparse_variable_name.push_back(table.slot_value(i));
-      _param_config.slot_alias_to_table[table.slot_value(i)] = table.table_id();
+      _param_config.slot_alias_to_table[table.slot_key(i)] = table.table_id();
     }
     std::vector<std::string> tmp_sparse_gradient_variable_name;
     for (auto i = 0u; i < table.slot_gradient_size(); ++i) {
......
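Note: the hunk above keys slot_alias_to_table by table.slot_key(i) instead of table.slot_value(i), so the alias-to-table lookups performed later in PushSparse (below) can resolve the feed slot names to a table id. A minimal, self-contained sketch of that populate-then-lookup pattern; the slot names and table ids here are made up for illustration:

#include <cstdint>
#include <iostream>
#include <map>
#include <string>

int main() {
  // Stand-in for AsyncWorkerParamConfig::slot_alias_to_table.
  std::map<std::string, uint64_t> slot_alias_to_table;

  // Populate keyed by the slot key (the alias the reader feeds by),
  // not by the slot value variable name.
  slot_alias_to_table["6048"] = 0;  // hypothetical slot key -> table id
  slot_alias_to_table["6050"] = 0;

  // Later, PushSparse-style lookup by feed alias.
  auto it = slot_alias_to_table.find("6048");
  if (it != slot_alias_to_table.end()) {
    std::cout << "slot 6048 -> table " << it->second << std::endl;
  }
  return 0;
}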
@@ -330,6 +330,7 @@ void AsyncExecutorThreadWorker::TrainFiles() {
       print_fetch_var(thread_scope_, fetch_var_names_[i]);
     } // end for (int i = 0...)
   } // end while ()
+  LOG(ERROR) << "TRAIN DONE";
 }
 void AsyncExecutorThreadWorker::SetPSlibPtr(std::shared_ptr<paddle::distributed::PSlib> pslib_ptr) {
@@ -572,24 +573,29 @@ void AsyncExecutorThreadWorker::PushSparse(int table_id) {
   auto slot_dim = _param_config->slot_dim; //TODO
   auto fea_dim = _param_config->fea_dim;//_current_train_job.fea_dim();TODO
   auto& features = _features[table_id];
+  CHECK(features.size() < 1000000) << "features size:" << features.size();
   //std::vector<std::string> gradient_var;
   //auto& gradient_var = GlobalConfig::instance().input_gradient_variable_name; //TODO
   auto& push_g = _feature_push_value[table_id];
   check_pull_push_memory(features, push_g, fea_dim);
+  CHECK(push_g.size() == features.size() + 1) << "push_g size:" << push_g.size() << " features size:" << features.size();
   uint64_t fea_idx = 0u;
-  auto& fea_info = _fea_info[table_id]; //TODO
+  auto& fea_info = _fea_info[table_id];
   int offset = 0;
   //if (!_current_train_job.use_cvm_feature()) { //TODO
   offset = 2;
   //}
   const std::vector<std::string>& feed_vec = thread_reader_->GetUseSlotAlias();
   // slot_idx = 0 is label TODO
   for (auto slot_idx = 1u; slot_idx < feed_vec.size(); ++slot_idx) {
-    if (_param_config->slot_alias_to_table[feed_vec[slot_idx]] != table_id) {
+    if (_param_config->slot_alias_to_table.find(feed_vec[slot_idx]) == _param_config->slot_alias_to_table.end()) {
+      LOG(ERROR) << "ERROR slot_idx:" << slot_idx << " name:" << feed_vec[slot_idx];
+    } else if (_param_config->slot_alias_to_table[feed_vec[slot_idx]] != table_id) {
+      LOG(ERROR) << "ERROR continue";
       continue;
     }
     Variable* g_var = thread_scope_->FindVar(_param_config->gradient_var[table_id][slot_idx - 1]);
+    CHECK(g_var != nullptr) << "var[" << _param_config->gradient_var[table_id][slot_idx - 1] << "] not found";
     LoDTensor* g_tensor = g_var->GetMutable<LoDTensor>();
     if (g_tensor == NULL) {
       LOG(ERROR) << "var[" << _param_config->gradient_var[table_id][slot_idx - 1] << "] not found";
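Note: because slot_alias_to_table is a std::map, indexing a feed name that was never registered with operator[] would silently default-insert an entry with table id 0; the new code therefore probes with find() first and logs missing aliases before the table-id comparison. A simplified, self-contained sketch of that find-before-compare guard (illustrative names only; unlike this sketch, the commit only logs the missing-alias case and does not skip it):

#include <cstdint>
#include <iostream>
#include <map>
#include <string>
#include <vector>

int main() {
  // Hypothetical alias -> table id registry and feed slot list.
  std::map<std::string, uint64_t> slot_alias_to_table{{"6048", 0}, {"6050", 1}};
  std::vector<std::string> feed_vec{"label", "6048", "6050", "missing_slot"};
  const uint64_t table_id = 0;

  // slot_idx = 0 is the label; sparse slots start at 1.
  for (size_t slot_idx = 1; slot_idx < feed_vec.size(); ++slot_idx) {
    auto it = slot_alias_to_table.find(feed_vec[slot_idx]);
    if (it == slot_alias_to_table.end()) {
      // Probe first so operator[] never inserts a bogus {alias, 0} entry.
      std::cerr << "ERROR slot_idx:" << slot_idx
                << " name:" << feed_vec[slot_idx] << std::endl;
      continue;
    }
    if (it->second != table_id) {
      continue;  // slot belongs to a different table
    }
    std::cout << feed_vec[slot_idx] << " -> table " << it->second << std::endl;
  }
  return 0;
}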
@@ -598,13 +604,16 @@ void AsyncExecutorThreadWorker::PushSparse(int table_id) {
     float* g = g_tensor->data<float>();
     Variable* var = thread_scope_->FindVar(feed_vec[slot_idx]);
+    CHECK(var != nullptr) << "var[" << feed_vec[slot_idx] << "] not found";
     LoDTensor* tensor = var->GetMutable<LoDTensor>();
     if (tensor == NULL) {
       LOG(ERROR) << "var[" << feed_vec[slot_idx] << "] not found";
       exit(-1);
     }
-    int len = tensor->lod()[0].back();
-    assert(slot_dim * len == g_tensor->numel());
+    //int len = tensor->lod()[0].back();
+    int len = tensor->numel();
+    CHECK(slot_dim * len == g_tensor->numel()) << "len:" << len << " g_numel:" << g_tensor->numel();
+    CHECK(len == tensor->numel()) << "len:" << len << "t_numel:" << tensor->numel();
     int64_t* ids = tensor->data<int64_t>();
     for (auto id_idx = 0u; id_idx < len; ++id_idx){
       if (ids[id_idx] == 0) {
@@ -613,12 +622,13 @@ void AsyncExecutorThreadWorker::PushSparse(int table_id) {
       }
       memcpy(push_g[fea_idx].data() + offset, g, sizeof(float) * slot_dim);
       push_g[fea_idx][0] = 1.0f;
+      CHECK(fea_idx < fea_info.size()) << "fea_idx:" << fea_idx << " size:" << fea_info.size();
       push_g[fea_idx][1] = static_cast<float>(fea_info[fea_idx].label);
       g += slot_dim;
       fea_idx++;
     }
   }
-  assert(fea_idx == features.size());
+  CHECK(fea_idx == features.size()) << "fea_idx:" << fea_idx << " features size:" << features.size();
   CHECK(features.size() > 0);
   std::vector<float*> push_g_vec;
......
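Note: besides the extra CHECKs, the hunk above changes the per-slot length source: len is now tensor->numel() instead of tensor->lod()[0].back(), and the assert calls become CHECK(...) << ..., which (unlike assert) is not compiled out in release builds and prints the mismatching sizes on failure. A minimal sketch of that pattern, assuming glog-style CHECK/LOG macros; plain vectors stand in for the real LoDTensors:

#include <cstdint>
#include <vector>
#include <glog/logging.h>

int main(int argc, char* argv[]) {
  google::InitGoogleLogging(argv[0]);

  const int slot_dim = 9;                       // assumed per-slot gradient width
  std::vector<int64_t> ids(4, 0);               // stand-in for tensor->data<int64_t>()
  std::vector<float> g(ids.size() * slot_dim);  // stand-in for g_tensor

  // Length taken from the element count, as in the new code.
  const int len = static_cast<int>(ids.size());

  // Unlike assert, CHECK stays active under NDEBUG, and the streamed
  // message records both sizes when the invariant breaks.
  CHECK(slot_dim * len == static_cast<int>(g.size()))
      << "len:" << len << " g_numel:" << g.size();
  return 0;
}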
@@ -49,7 +49,7 @@ struct AsyncWorkerParamConfig {
   std::vector<int> sparse_table_id;
   std::map<uint64_t, std::vector<std::string>> slot_input_vec; //6048slot 6050slot //name
   std::map<uint64_t, std::vector<std::string>> gradient_var; //6048slot_embed
-  std::unordered_map<std::string, uint64_t> slot_alias_to_table; //TODO done
+  std::map<std::string, uint64_t> slot_alias_to_table; //TODO done
 };
 struct DensePullThreadParam {
......
@@ -153,7 +153,7 @@ class AsyncExecutor(object):
                                      data_feed.desc(), filelist, thread_num,
                                      fetch_var_names, mode, debug)
-    def download_data(self, afs_path, local_path, fs_default_name, ugi, hadoop_home="$HADOOP_HOME", process_num=12):
+    def download_data(self, afs_path, local_path, fs_default_name, ugi, file_cnt, hadoop_home="$HADOOP_HOME", process_num=12):
         if self.instance is None:
             raise ValueError('instance is None, please run config_distributed_nodes init instance')
@@ -169,6 +169,7 @@ class AsyncExecutor(object):
             local_path,
             self.instance.get_worker_index(),
             self.instance.get_node_cnt() / 2,
+            file_cnt,
             multi_processes=process_num)
         #self.instance.barrier_all() #wait for download_data #TODO only barriere worker
         self.instance.barrier_worker() #wait for download_data #TODO only barriere worker
......
@@ -427,6 +427,7 @@ def multi_download(client,
                    local_path,
                    trainer_id,
                    trainers,
+                   file_cnt,
                    multi_processes=5):
     """
     multi_download
@@ -435,6 +436,7 @@ def multi_download(client,
     :param local_path: path on local
     :param trainer_id: current trainer id
     :param trainers: all trainers number
+    :param file_cnt: all file number
     :param multi_processes: the download data process at the same time, default=5
     :return: None
     """
@@ -450,7 +452,7 @@ def multi_download(client,
     client.make_local_dirs(local_path)
     _logger.info("Make local dir {} successfully".format(local_path))
-    all_need_download = client.lsr(hdfs_path, sort=True)
+    all_need_download = client.lsr(hdfs_path, sort=True)[:file_cnt]
     need_download = all_need_download[trainer_id::trainers]
     _logger.info("Get {} files From all {} files need to be download from {}".
                  format(len(need_download), len(all_need_download), hdfs_path))
@@ -501,6 +503,7 @@ if __name__ == "__main__":
                    "/home/xx/data1",
                    1,
                    5,
+                   100,
                    multi_processes=5)
     multi_upload(client, "/user/com/train-25/model", "/home/xx/data1")