diff --git a/paddle/fluid/framework/device_worker.h b/paddle/fluid/framework/device_worker.h
index 28fc6f0611d78d98b62e525840e5ac334a55b1d9..310a6e2beb52a7a89f33f2f3fc73bd671b3a448d 100644
--- a/paddle/fluid/framework/device_worker.h
+++ b/paddle/fluid/framework/device_worker.h
@@ -155,6 +155,7 @@ class DownpourWorker : public HogwildWorker {
   virtual ~DownpourWorker() {}
   virtual void Initialize(const TrainerDesc& desc);
   virtual void TrainFiles();
+  virtual void TrainFilesWithProfiler();
 
  protected:
   std::shared_ptr<FleetWrapper> fleet_ptr_;
diff --git a/paddle/fluid/framework/dist_multi_trainer.cc b/paddle/fluid/framework/dist_multi_trainer.cc
index 4f8d15adc38c5d7ac11130f5275446a3cd78b714..60b8930bbb6efb5e9adb8064bb3a94b7f9501169 100644
--- a/paddle/fluid/framework/dist_multi_trainer.cc
+++ b/paddle/fluid/framework/dist_multi_trainer.cc
@@ -44,6 +44,7 @@ void DistMultiTrainer::Initialize(const TrainerDesc& trainer_desc,
   pull_dense_worker_ = PullDenseWorker::GetInstance();
   pull_dense_worker_->Initialize(trainer_desc);
   VLOG(3) << "initialize pull dense worker";
+  SetDebug(trainer_desc.debug());
 }
 
 void DistMultiTrainer::InitOtherEnv(const ProgramDesc& main_program) {
diff --git a/paddle/fluid/framework/downpour_worker.cc b/paddle/fluid/framework/downpour_worker.cc
index 966588c262f3a2dc05f64438d53dee6d3cdf54e9..475574f251541991e5dccbf585536df9f3bf8792 100644
--- a/paddle/fluid/framework/downpour_worker.cc
+++ b/paddle/fluid/framework/downpour_worker.cc
@@ -70,7 +70,7 @@ void DownpourWorker::Initialize(const TrainerDesc& desc) {
 
 void DownpourWorker::CollectLabelInfo(size_t table_idx) {
   uint64_t table_id = static_cast<uint64_t>(
-      param_.program_config(0).pull_sparse_table_id(table_idx));
+      param_.program_config(0).pull_sparse_table_id(table_idx));
 
   TableParameter table;
   for (auto i : param_.sparse_table()) {
@@ -82,16 +82,23 @@ void DownpourWorker::CollectLabelInfo(size_t table_idx) {
   auto& feature = features_[table_id];
   auto& feature_label = feature_labels_[table_id];
   feature_label.resize(feature.size());
+  VLOG(3) << "going to get label_var_name " << label_var_name_[table_id];
   Variable* var = thread_scope_->FindVar(label_var_name_[table_id]);
+  VLOG(3) << "going to get tensor";
   LoDTensor* tensor = var->GetMutable<LoDTensor>();
+  VLOG(3) << "going to get ptr";
   int64_t* label_ptr = tensor->data<int64_t>();
+  VLOG(3) << "lele";
   int global_index = 0;
   for (size_t i = 0; i < sparse_key_names_[table_id].size(); ++i) {
+    VLOG(3) << "sparse_key_names_[" << i
+            << "]: " << sparse_key_names_[table_id][i];
     Variable* fea_var = thread_scope_->FindVar(sparse_key_names_[table_id][i]);
     LoDTensor* tensor = fea_var->GetMutable<LoDTensor>();
     int64_t* ids = tensor->data<int64_t>();
     int fea_idx = 0;
+    VLOG(3) << "Haha";
     // tensor->lod()[0].size() == batch_size + 1
     for (auto lod_idx = 1u; lod_idx < tensor->lod()[0].size(); ++lod_idx) {
       for (; fea_idx < tensor->lod()[0][lod_idx]; ++fea_idx) {
@@ -103,6 +110,7 @@ void DownpourWorker::CollectLabelInfo(size_t table_idx) {
             static_cast<float>(label_ptr[lod_idx - 1]);
       }
     }
+    VLOG(3) << "EE";
   }
   CHECK(global_index == feature.size())
       << "expect fea info size:" << feature.size() << " real:" << global_index;
@@ -110,7 +118,7 @@ void DownpourWorker::FillSparseValue(size_t table_idx) {
   uint64_t table_id = static_cast<uint64_t>(
-      param_.program_config(0).pull_sparse_table_id(table_idx));
+      param_.program_config(0).pull_sparse_table_id(table_idx));
 
   TableParameter table;
   for (auto i : param_.sparse_table()) {
@@ -152,6 +160,11 @@ void DownpourWorker::FillSparseValue(size_t table_idx) {
   }
 }
 
+void DownpourWorker::TrainFilesWithProfiler() {
+  VLOG(3) << "Begin to train files with profiler";
+  platform::SetNumThreads(1);
+}
+
 void DownpourWorker::TrainFiles() {
   VLOG(3) << "Begin to train files";
   platform::SetNumThreads(1);
diff --git a/paddle/fluid/framework/multi_trainer.cc b/paddle/fluid/framework/multi_trainer.cc
index a5edbe5fb3bc7519de3f85986f4825af5eed1418..30d6311728b4f677bbf64edf9a10ef224302f23a 100644
--- a/paddle/fluid/framework/multi_trainer.cc
+++ b/paddle/fluid/framework/multi_trainer.cc
@@ -41,6 +41,7 @@ void MultiTrainer::Initialize(const TrainerDesc& trainer_desc,
   }
 
   // set debug here
+  SetDebug(trainer_desc.debug());
 }
 
 // call only after all resources are set in current trainer
@@ -57,8 +58,13 @@ void MultiTrainer::InitTrainerEnv(const ProgramDesc& main_program,
 void MultiTrainer::Run() {
   VLOG(3) << "Going to run";
   for (int thidx = 0; thidx < thread_num_; ++thidx) {
-    threads_.push_back(
-        std::thread(&DeviceWorker::TrainFiles, workers_[thidx].get()));
+    if (!debug_) {
+      threads_.push_back(
+          std::thread(&DeviceWorker::TrainFiles, workers_[thidx].get()));
+    } else {
+      threads_.push_back(std::thread(&DeviceWorker::TrainFilesWithProfiler,
+                                     workers_[thidx].get()));
+    }
   }
 }
diff --git a/paddle/fluid/framework/trainer_desc.proto b/paddle/fluid/framework/trainer_desc.proto
index 2a40f777446d99a830391cf28ef43d20c3e898e8..f422d226ca525f7b50e00f85928b212384aa152e 100644
--- a/paddle/fluid/framework/trainer_desc.proto
+++ b/paddle/fluid/framework/trainer_desc.proto
@@ -30,6 +30,7 @@ message TrainerDesc {
   repeated string filelist = 5;
   repeated string fetch_var_names = 6;
   optional int32 batch_per_print = 7 [ default = 100 ];
+  optional bool debug = 8 [ default = false ];
 
   // device worker parameters
   optional HogwildWorkerParameter hogwild_param = 101;
diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/__init__.py b/python/paddle/fluid/incubate/fleet/parameter_server/__init__.py
index 7c61a5f1a3236e4bd81d56d4b6fd3bfa7b3d341e..e2e0f5ff10d3b0000956bb9fe95a90fb4628936e 100644
--- a/python/paddle/fluid/incubate/fleet/parameter_server/__init__.py
+++ b/python/paddle/fluid/incubate/fleet/parameter_server/__init__.py
@@ -28,6 +28,7 @@ class Fleet(object):
     def __init__(self):
         self._opt_info = None  # for fleet only
         self.role_maker_ = None
+        self.local_ip_ = 0
 
     def init(self):
         # TODO(guru4elephant)
@@ -57,9 +58,12 @@ class Fleet(object):
             self._fleet_ptr.init_server(self._dist_desc_str,
                                         self.role_maker_.get_rank())
             self.local_ip_ = self._fleet_ptr.run_server()
+            self.role_maker_.barrier_all()
             self.all_ips_ = self.role_maker_.all_gather(self.local_ip_)
+            self._fleet_ptr.gather_servers(self.all_ips_, self.role_maker_.get_size())
+            # wait all workers start
             self.role_maker_.barrier_all()
         else:
             print("You should run DistributedOptimizer.minimize() first")
@@ -74,10 +78,12 @@ class Fleet(object):
             else:
                 print("You should run DistributedOptimizer.minimize() first")
                 sys.exit(-1)
-            self.role_maker_.barrier_all()
-            self._fleet_ptr.init_worker(self._dist_desc_str, [0],
+            self.role_maker_.barrier_all()  # wait for server starts
+            self.all_ips_ = self.role_maker_.all_gather(self.local_ip_)
+            self._fleet_ptr.init_worker(self._dist_desc_str, self.all_ips_,
                                         self.role_maker_.get_size(),
                                         self.role_maker_.get_rank())
+            self.role_maker_.barrier_all()
             self.role_maker_.barrier_worker()
         else:
             print("You should run DistributedOptimizer.minimize() first")
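
Usage sketch (not part of the patch above): the new `debug` field on TrainerDesc is what the trainers read via SetDebug(trainer_desc.debug()) to decide between TrainFiles and TrainFilesWithProfiler in MultiTrainer::Run. The snippet below is only a minimal illustration of toggling that field from Python; the import path of the generated proto module is an assumption and may differ in the actual source tree.

    # Hedged example: flip on the debug/profiler path added by this patch.
    # Assumption: the module generated from trainer_desc.proto is importable
    # as below; adjust the path to match the real build layout.
    from google.protobuf import text_format
    from paddle.fluid.proto import trainer_desc_pb2

    desc = trainer_desc_pb2.TrainerDesc()
    # Default is false; setting it true makes MultiTrainer launch worker
    # threads with DeviceWorker::TrainFilesWithProfiler instead of TrainFiles.
    desc.debug = True

    # The C++ side receives this TrainerDesc and calls
    # SetDebug(trainer_desc.debug()) during Initialize().
    print(text_format.MessageToString(desc))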