diff --git a/paddle/fluid/framework/dist_multi_trainer.cc b/paddle/fluid/framework/dist_multi_trainer.cc
index 60b8930bbb6efb5e9adb8064bb3a94b7f9501169..0c42f5bf69e5d132a5ce09ed6105877cbf3e0cd5 100644
--- a/paddle/fluid/framework/dist_multi_trainer.cc
+++ b/paddle/fluid/framework/dist_multi_trainer.cc
@@ -53,6 +53,18 @@ void DistMultiTrainer::InitOtherEnv(const ProgramDesc& main_program) {
   VLOG(3) << "init other env done.";
 }
 
+void DistMultiTrainer::Run() {
+  for (int thidx = 0; thidx < thread_num_; ++thidx) {
+    if (!debug_) {
+      threads_.push_back(
+          std::thread(&DeviceWorker::TrainFiles, workers_[thidx].get()));
+    } else {
+      threads_.push_back(std::thread(&DeviceWorker::TrainFilesWithProfiler,
+                                     workers_[thidx].get()));
+    }
+  }
+}
+
 void DistMultiTrainer::Finalize() {
   for (auto& th : threads_) {
     th.join();
diff --git a/paddle/fluid/framework/downpour_worker.cc b/paddle/fluid/framework/downpour_worker.cc
index 475574f251541991e5dccbf585536df9f3bf8792..b7f666cb3663411a8956d2e6924effc8cec6738c 100644
--- a/paddle/fluid/framework/downpour_worker.cc
+++ b/paddle/fluid/framework/downpour_worker.cc
@@ -82,14 +82,10 @@ void DownpourWorker::CollectLabelInfo(size_t table_idx) {
   auto& feature = features_[table_id];
   auto& feature_label = feature_labels_[table_id];
   feature_label.resize(feature.size());
-  VLOG(3) << "going to get label_var_name " << label_var_name_[table_id];
   Variable* var = thread_scope_->FindVar(label_var_name_[table_id]);
-  VLOG(3) << "going to get tensor";
   LoDTensor* tensor = var->GetMutable<LoDTensor>();
-  VLOG(3) << "going to get ptr";
   int64_t* label_ptr = tensor->data<int64_t>();
-  VLOG(3) << "lele";
   int global_index = 0;
   for (size_t i = 0; i < sparse_key_names_[table_id].size(); ++i) {
     VLOG(3) << "sparse_key_names_[" << i
@@ -98,7 +94,6 @@ void DownpourWorker::CollectLabelInfo(size_t table_idx) {
     LoDTensor* tensor = fea_var->GetMutable<LoDTensor>();
     int64_t* ids = tensor->data<int64_t>();
     int fea_idx = 0;
-    VLOG(3) << "Haha";
     // tensor->lod()[0].size() == batch_size + 1
     for (auto lod_idx = 1u; lod_idx < tensor->lod()[0].size(); ++lod_idx) {
       for (; fea_idx < tensor->lod()[0][lod_idx]; ++fea_idx) {
@@ -110,7 +105,6 @@ void DownpourWorker::CollectLabelInfo(size_t table_idx) {
             static_cast<float>(label_ptr[lod_idx - 1]);
       }
     }
-    VLOG(3) << "EE";
   }
   CHECK(global_index == feature.size())
       << "expect fea info size:" << feature.size() << " real:" << global_index;
@@ -163,6 +157,174 @@ void DownpourWorker::FillSparseValue(size_t table_idx) {
 void DownpourWorker::TrainFilesWithProfiler() {
   VLOG(3) << "Begin to train files with profiler";
   platform::SetNumThreads(1);
+  device_reader_->Start();
+  std::vector<double> op_total_time;
+  std::vector<std::string> op_name;
+  for (auto& op : ops_) {
+    bool need_skip = false;
+    for (auto t = 0u; t < skip_ops_.size(); ++t) {
+      if (op->Type().find(skip_ops_[t]) != std::string::npos) {
+        need_skip = true;
+        break;
+      }
+    }
+    if (!need_skip) {
+      op_name.push_back(op->Type());
+    }
+  }
+
+  VLOG(3) << "op name size: " << op_name.size();
+  op_total_time.resize(op_name.size());
+  for (size_t i = 0; i < op_total_time.size(); ++i) {
+    op_total_time[i] = 0.0;
+  }
+  platform::Timer timeline;
+  double total_time = 0.0;
+  double read_time = 0.0;
+  double pull_sparse_time = 0.0;
+  double collect_label_time = 0.0;
+  double fill_sparse_time = 0.0;
+  double push_sparse_time = 0.0;
+  double push_dense_time = 0.0;
+  int cur_batch;
+  int batch_cnt = 0;
+  timeline.Start();
+  while ((cur_batch = device_reader_->Next()) > 0) {
+    timeline.Pause();
+    read_time += timeline.ElapsedSec();
+    total_time += timeline.ElapsedSec();
+    VLOG(3) << "program config size: " << param_.program_config_size();
+    for (size_t i = 0; i < param_.program_config(0).pull_sparse_table_id_size();
+         ++i) {
+      uint64_t tid = static_cast<uint64_t>(
+          param_.program_config(0).pull_sparse_table_id(i));
+      TableParameter table;
+      for (auto i : param_.sparse_table()) {
+        if (i.table_id() == tid) {
+          table = i;
+          break;
+        }
+      }
+      timeline.Start();
+      fleet_ptr_->PullSparseVarsSync(*thread_scope_, tid,
+                                     sparse_key_names_[tid], &features_[tid],
+                                     &feature_values_[tid], table.fea_dim());
+      timeline.Pause();
+      pull_sparse_time += timeline.ElapsedSec();
+      CollectLabelInfo(i);
+      timeline.Pause();
+      collect_label_time += timeline.ElapsedSec();
+      timeline.Start();
+      FillSparseValue(i);
+      timeline.Pause();
+      fill_sparse_time += timeline.ElapsedSec();
+    }
+    VLOG(3) << "Fill sparse value for all sparse table done.";
+
+    int run_op_idx = 0;
+    for (auto& op : ops_) {
+      bool need_skip = false;
+      for (auto t = 0u; t < skip_ops_.size(); ++t) {
+        if (op->Type().find(skip_ops_[t]) != std::string::npos) {
+          need_skip = true;
+          break;
+        }
+      }
+      if (!need_skip) {
+        timeline.Start();
+        op->Run(*thread_scope_, place_);
+        timeline.Pause();
+        op_total_time[run_op_idx++] += timeline.ElapsedSec();
+        total_time += timeline.ElapsedSec();
+      }
+    }
+
+    for (size_t i = 0; i < param_.program_config(0).push_sparse_table_id_size();
+         ++i) {
+      uint64_t tid = static_cast<uint64_t>(
+          param_.program_config(0).push_sparse_table_id(i));
+      TableParameter table;
+      for (auto i : param_.sparse_table()) {
+        if (i.table_id() == tid) {
+          table = i;
+          break;
+        }
+      }
+      timeline.Start();
+      fleet_ptr_->PushSparseVarsWithLabelAsync(
+          *thread_scope_, tid, features_[tid], feature_labels_[tid],
+          sparse_key_names_[tid], sparse_grad_names_[tid], table.emb_dim(),
+          &feature_grads_[tid], &push_sparse_status_);
+      timeline.Pause();
+      push_sparse_time += timeline.ElapsedSec();
+    }
+
+    timeline.Start();
+    for (size_t i = 0; i < param_.program_config(0).push_dense_table_id_size();
+         ++i) {
+      uint64_t tid = static_cast<uint64_t>(
+          param_.program_config(0).push_dense_table_id(i));
+      fleet_ptr_->PushDenseVarsAsync(
+          *thread_scope_, tid, dense_grad_names_[tid], &push_sparse_status_);
+    }
+    timeline.Pause();
+    push_dense_time += timeline.ElapsedSec();
+
+    VLOG(3) << "push sparse and dense gradient done.";
+    int32_t tmp_push_dense_wait_times = -1;
+    int32_t tmp_push_sparse_wait_times = -1;
+    static uint32_t push_dense_wait_times =
+        static_cast<uint32_t>(tmp_push_dense_wait_times);
+    static uint32_t push_sparse_wait_times =
+        static_cast<uint32_t>(tmp_push_sparse_wait_times);
+    if (push_dense_status_.size() >= push_dense_wait_times) {
+      for (auto& t : push_dense_status_) {
+        t.wait();
+      }
+      push_dense_status_.resize(0);
+    }
+
+    if (tmp_push_dense_wait_times == -1) {
+      push_dense_status_.resize(0);
+    }
+
+    if (push_sparse_status_.size() >= push_sparse_wait_times) {
+      for (auto& t : push_sparse_status_) {
+        t.wait();
+      }
+      push_sparse_status_.resize(0);
+    }
+
+    if (tmp_push_sparse_wait_times == -1) {
+      push_sparse_status_.resize(0);
+    }
+    VLOG(3) << "going to increase thread version";
+
+    VLOG(3) << "push dense table id size: "
+            << param_.program_config(0).push_dense_table_id_size();
+
+    for (size_t i = 0; i < param_.program_config(0).push_dense_table_id_size();
+         ++i) {
+      uint64_t tid = static_cast<uint64_t>(
+          param_.program_config(0).push_dense_table_id(i));
+      pull_dense_worker_->IncreaseThreadVersion(thread_id_, tid);
+    }
+
+    thread_scope_->DropKids();
+    ++batch_cnt;
+
+    if (thread_id_ == 0) {
+      // should be configured here
+      if (batch_cnt > 0 && batch_cnt % 100 == 0) {
+        for (size_t i = 0; i < op_total_time.size(); ++i) {
+          fprintf(stderr, "op_name:[%zu][%s], op_mean_time:[%fs]\n", i,
+                  op_name[i].c_str(), op_total_time[i] / batch_cnt);
+        }
+        fprintf(stderr, "mean read time: %fs\n", read_time / batch_cnt);
+        fprintf(stderr, "IO percent: %f\n", read_time / total_time * 100);
+      }
+    }
+  }
 }
 
 void DownpourWorker::TrainFiles() {
diff --git a/paddle/fluid/framework/hogwild_worker.cc b/paddle/fluid/framework/hogwild_worker.cc
index 0bc65f484dad3320bb95e0e9986629495bbc5368..148893fafc18953327c09511ac7d9fbab1265bd0 100644
--- a/paddle/fluid/framework/hogwild_worker.cc
+++ b/paddle/fluid/framework/hogwild_worker.cc
@@ -90,7 +90,7 @@ void HogwildWorker::TrainFilesWithProfiler() {
   int batch_cnt = 0;
   timeline.Start();
   while ((cur_batch = device_reader_->Next()) > 0) {
-    LOG(WARNING) << "read a batch in thread " << thread_id_;
+    VLOG(3) << "read a batch in thread " << thread_id_;
     timeline.Pause();
     read_time += timeline.ElapsedSec();
     total_time += timeline.ElapsedSec();
diff --git a/paddle/fluid/framework/trainer.h b/paddle/fluid/framework/trainer.h
index e57e04068b387262e479ac2328f81969f4e6f7d9..6d99a1ba87dd8782d6694c02e5384ce6af6c8543 100644
--- a/paddle/fluid/framework/trainer.h
+++ b/paddle/fluid/framework/trainer.h
@@ -83,6 +83,7 @@ class DistMultiTrainer : public MultiTrainer {
   virtual ~DistMultiTrainer() {}
   virtual void Initialize(const TrainerDesc& trainer_desc, Dataset* data_set);
   virtual void InitOtherEnv(const ProgramDesc& main_program);
+  virtual void Run();
   virtual void Finalize();
 
  protected:
diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py
index 0f364e77c7ce6ac14b23f72c37739cf5c16d74d4..1314a324063e8fcb93e3f24846b9bed86445a7a9 100644
--- a/python/paddle/fluid/executor.py
+++ b/python/paddle/fluid/executor.py
@@ -627,7 +627,7 @@ class Executor(object):
                          fetch_list=None,
                          scope=None,
                          thread=0,
-                         opt_info=None):
+                         debug=False):
         if scope is None:
             scope = global_scope()
         if fetch_list is None:
@@ -636,6 +636,8 @@ class Executor(object):
         if not compiled:
             trainer = TrainerFactory().create_trainer(program._fleet_opt)
             trainer.set_program(program)
+            with open("fleet_desc.prototxt", "w") as fout:
+                fout.write(str(program._fleet_opt["fleet_desc"]))
         else:
             trainer = TrainerFactory().create_trainer(
                 program.program._fleet_opt)
@@ -644,8 +646,11 @@ class Executor(object):
             trainer.set_thread(dataset.thread_num)
         else:
             trainer.set_thread(thread)
+        trainer.set_debug(debug)
         trainer.gen_trainer_desc()
         dataset._prepare_to_run()
+        with open("trainer_desc.prototxt", "w") as fout:
+            fout.write(trainer._desc())
         self._default_executor.run_from_dataset(program.desc, scope,
                                                 dataset.dataset,
                                                 trainer._desc())
diff --git a/python/paddle/fluid/incubate/fleet/base/role_maker.py b/python/paddle/fluid/incubate/fleet/base/role_maker.py
index 06b2c2b28d5668ea5d664ee15d9d69d1fc6b0329..fc4f53ff1dd46221ae3f065ad5d3414df9318d86 100644
--- a/python/paddle/fluid/incubate/fleet/base/role_maker.py
+++ b/python/paddle/fluid/incubate/fleet/base/role_maker.py
@@ -101,10 +101,10 @@ class MPISymetricRoleMaker(MPIRoleMaker):
         return self.get_size()
 
     def worker_index(self):
-        return self.rank / self.proc_per_node_
+        return self.rank_ / self.proc_per_node_
 
     def server_index(self):
-        return self.rank / self.proc_per_node_
+        return self.rank_ / self.proc_per_node_
 
     def barrier_worker(self):
         if self.is_worker():
diff --git a/python/paddle/fluid/trainer_desc.py b/python/paddle/fluid/trainer_desc.py
index c6f26340f9be78b0e6ec40c9253753523af96ffc..8bc739707b35b7a6d238ffe085456d3487ff279d 100644
--- a/python/paddle/fluid/trainer_desc.py
+++ b/python/paddle/fluid/trainer_desc.py
@@ -36,6 +36,9 @@ class TrainerDesc(object):
         self.device_worker_ = None
         self.program_ = None
 
+    def set_debug(self, debug):
+        self.proto_desc.debug = debug
+
     def set_thread(self, thread_num):
         self.proto_desc.thread_num = thread_num
 
@@ -60,6 +63,10 @@ class MultiTrainer(TrainerDesc):
         super(MultiTrainer, self).__init__()
         pass
 
+    def set_program(self, program):
+        super(MultiTrainer, self).set_program(program)
+        self.program_ = program
+
     def gen_trainer_desc(self):
         super(MultiTrainer, self).gen_trainer_desc()
         self.proto_desc.class_name = "MultiTrainer"
@@ -71,8 +78,14 @@ class DistMultiTrainer(TrainerDesc):
         super(DistMultiTrainer, self).__init__()
         pass
 
+    def set_program(self, program):
+        super(DistMultiTrainer, self).set_program(program)
+        self.program_ = program
+
     def gen_trainer_desc(self):
         super(DistMultiTrainer, self).gen_trainer_desc()
         self.proto_desc.class_name = "DistMultiTrainer"
+        if self.program_ == None:
+            print("None program")
         self.device_worker_.set_program(self.program_)
         self.device_worker_.gen_worker_desc(self.proto_desc)
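
Usage sketch (illustrative, not part of the patch): a minimal example of how the new debug flag on Executor.run_from_dataset could be exercised. Only the debug=... keyword comes from this diff; main_program and dataset are assumed to be prepared by the surrounding fleet training script (a program whose _fleet_opt was filled in by the distributed optimizer, and a fluid dataset whose feed list matches it), so treat those names as placeholders.

    # Sketch only: `main_program` and `dataset` are hypothetical objects built
    # elsewhere in a fleet training script, not defined in this patch.
    import paddle.fluid as fluid

    exe = fluid.Executor(fluid.CPUPlace())
    exe.run(fluid.default_startup_program())

    # debug=True is forwarded to TrainerDesc.set_debug(); DistMultiTrainer::Run()
    # then starts each worker thread with TrainFilesWithProfiler, which reports
    # per-op mean time, mean read time, and IO percent every 100 batches on thread 0.
    exe.run_from_dataset(program=main_program,
                         dataset=dataset,
                         debug=True)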