diff --git a/paddle/fluid/framework/async_executor.cc b/paddle/fluid/framework/async_executor.cc
index f96ff436da997e2035f02b4982c5acd770b024e3..45a914b70eaea45a394e654794ea1a12c48dbe69 100644
--- a/paddle/fluid/framework/async_executor.cc
+++ b/paddle/fluid/framework/async_executor.cc
@@ -83,6 +83,10 @@ uint64_t AsyncExecutor::StartServer() {
   return _pslib_ptr->run_server();
 }
 
+void AsyncExecutor::StopServer() {
+  _pslib_ptr->stop_server();
+}
+
 void AsyncExecutor::GatherServers(std::vector<uint64_t>& host_sign_list, int node_num) {
   _pslib_ptr->gather_servers(host_sign_list.data(), node_num);
 }
diff --git a/paddle/fluid/framework/async_executor.h b/paddle/fluid/framework/async_executor.h
index 90d6b46b2f9b2a9fbee2ea3c2cf26599eb3b973c..4b461262173fb8828a4bbf7ebad34715a2f4fa96 100644
--- a/paddle/fluid/framework/async_executor.h
+++ b/paddle/fluid/framework/async_executor.h
@@ -67,6 +67,7 @@ class AsyncExecutor {
   void InitWorker(const std::string& dist_desc, std::vector<uint64_t>& host_sign_list, int node_num, int index);
   //void ConfigWorker() {}
   uint64_t StartServer();
+  void StopServer();
   void GatherServers(std::vector<uint64_t>& host_sign_list, int node_num);
   void InitModel();
   void SaveModel(const std::string& path);
diff --git a/paddle/fluid/framework/executor_thread_worker.cc b/paddle/fluid/framework/executor_thread_worker.cc
index d8320b422b800d48c4dd562b438ef81b297b4ec5..a0455b26efd0371f1d3a1be39c986613ac25c182 100644
--- a/paddle/fluid/framework/executor_thread_worker.cc
+++ b/paddle/fluid/framework/executor_thread_worker.cc
@@ -569,7 +569,7 @@ void AsyncExecutorThreadWorker::FillSparse(int table_id) {
   }
 }
 
 void AsyncExecutorThreadWorker::PushSparse(int table_id) {
-  auto slot_dim = _param_config->slot_dim; //TODO
+  auto slot_dim = _param_config->slot_dim;
   auto fea_dim = _param_config->fea_dim;//_current_train_job.fea_dim();TODO
   auto& features = _features[table_id];
@@ -592,19 +592,20 @@ void AsyncExecutorThreadWorker::PushSparse(int table_id) {
     }
 
     Variable* g_var = thread_scope_->FindVar(_param_config->gradient_var[table_id][slot_idx - 1]);
+    if (g_var == NULL) {
+      LOG(ERROR) << "var[" << _param_config->gradient_var[table_id][slot_idx - 1] << "] not found";
+      exit(-1);
+    }
     LoDTensor* g_tensor = g_var->GetMutable<LoDTensor>();
-    //int count = g_tensor->numel();
     float* g = g_tensor->data<float>();
-    /*
-    if (FLAGS_scale_sparse_gradient_with_batch_size) {
-      Eigen::Map<Eigen::MatrixXf> g_mat(g, 1, tensor->numel());
-      g_mat *= _batch_size;
-    }
-    */
     Variable* var = thread_scope_->FindVar(feed_vec[slot_idx]);
+    if (var == NULL) {
+      LOG(ERROR) << "var[" << feed_vec[slot_idx] << "] not found";
+      exit(-1);
+    }
     LoDTensor* tensor = var->GetMutable<LoDTensor>();
     int len = tensor->lod()[0].back();
-    //assert(slot_dim * len == count);
+    assert(slot_dim * len == g_tensor->numel());
     int64_t* ids = tensor->data<int64_t>();
     for (auto id_idx = 0u; id_idx < len; ++id_idx){
       if (ids[id_idx] == 0) {
diff --git a/paddle/fluid/pybind/async_executor_py.cc b/paddle/fluid/pybind/async_executor_py.cc
index eca46fbad55970d8b9034ac5ae82859c7223d5f6..8dfba0d2694f222ccbf61c056bd416ad7114a256 100644
--- a/paddle/fluid/pybind/async_executor_py.cc
+++ b/paddle/fluid/pybind/async_executor_py.cc
@@ -51,6 +51,7 @@ void BindAsyncExecutor(py::module* m) {
       .def("init_server", &framework::AsyncExecutor::InitServer)
       .def("init_worker", &framework::AsyncExecutor::InitWorker)
       .def("start_server", &framework::AsyncExecutor::StartServer)
+      .def("stop_server", &framework::AsyncExecutor::StopServer)
       .def("gather_servers", &framework::AsyncExecutor::GatherServers)
       .def("init_model", &framework::AsyncExecutor::InitModel)
.def("save_model", &framework::AsyncExecutor::SaveModel); diff --git a/python/paddle/fluid/async_executor.py b/python/paddle/fluid/async_executor.py index 3451d1edb5426fa092784f616eb6b5cb1392026b..76fdb5b0e263de4517a5ef4b6634dbe0fcee31b3 100644 --- a/python/paddle/fluid/async_executor.py +++ b/python/paddle/fluid/async_executor.py @@ -151,7 +151,10 @@ class AsyncExecutor(object): self.executor.run_from_files(program_desc, data_feed.desc(), filelist, thread_num, fetch_var_names, debug) - self.instance.barrier_all() + self.instance.barrier_all() #worker do all things + if self.instance.is_first_worker(): + self.executor.stop_server() + self.instance.barrier_all() #sync def config_distributed_nodes(self, dist_opt): @@ -164,6 +167,9 @@ class AsyncExecutor(object): def get_instance(self): return self.instance + #def stop_server(self): + # self.executor.stop_server() + def init_server(self, dist_desc): self.executor.init_server(dist_desc, self.instance._rankid) ip = self.executor.start_server() @@ -174,6 +180,7 @@ class AsyncExecutor(object): self.instance.barrier_all() #wait all worker start self.instance.barrier_all() #wait init model self.instance.barrier_all() #wait worker do all things + self.instance.barrier_all() #sync def init_worker(self, dist_desc): self.instance.barrier_all() #wait all server start