diff --git a/paddle/fluid/framework/device_worker.h b/paddle/fluid/framework/device_worker.h index a49e492e48028b15d724cbdc7c1b5efbc809ddcf..d33809a0a2b7ce435115d4ca90925e62b5dd7259 100644 --- a/paddle/fluid/framework/device_worker.h +++ b/paddle/fluid/framework/device_worker.h @@ -266,6 +266,9 @@ class HogwildWorker : public CPUWorkerBase { HogwildWorkerParameter param_; std::vector skip_ops_; std::map stat_var_name_map_; +#ifdef PADDLE_WITH_HETERPS + platform::DeviceContext* dev_ctx_ = nullptr; +#endif }; class DownpourWorker : public HogwildWorker { diff --git a/paddle/fluid/framework/hogwild_worker.cc b/paddle/fluid/framework/hogwild_worker.cc index 89dc5c7d3ea932388fd8ab220478bb438f6b35f8..b2d170888e28fc4e9918c26f000a5983c09811ee 100644 --- a/paddle/fluid/framework/hogwild_worker.cc +++ b/paddle/fluid/framework/hogwild_worker.cc @@ -39,6 +39,9 @@ void HogwildWorker::Initialize(const TrainerDesc &desc) { for (int i = 0; i < param_.stat_var_names_size(); ++i) { stat_var_name_map_[param_.stat_var_names(i)] = 1; } +#ifdef PADDLE_WITH_HETERPS + dev_ctx_ = platform::DeviceContextPool::Instance().Get(place_); +#endif } void HogwildWorker::CreateThreadOperators(const ProgramDesc &program) { @@ -150,6 +153,9 @@ void HogwildWorker::TrainFilesWithProfiler() { VLOG(3) << "Going to run op " << op_name[i]; if (!need_skip) { ops_[i]->Run(*thread_scope_, place_); +#ifdef PADDLE_WITH_HETERPS + dev_ctx_->Wait(); +#endif } VLOG(3) << "Op " << op_name[i] << " Finished"; timeline.Pause(); @@ -167,6 +173,16 @@ void HogwildWorker::TrainFilesWithProfiler() { total_inst += cur_batch; ++batch_cnt; PrintFetchVars(); +#ifdef PADDLE_WITH_HETERPS + dev_ctx_->Wait(); + VLOG(1) << "GpuPs worker " << thread_id_ << " train cost " << total_time + << " seconds, ins_num: " << total_inst; + for (size_t i = 0; i < op_name.size(); ++i) { + VLOG(1) << "card:" << thread_id_ << ", op: " << op_name[i] + << ", mean time: " << op_total_time[i] / total_inst + << "s, total time:" << op_total_time[i] << 
"sec"; + } +#else if (thread_id_ == 0) { if (batch_cnt > 0 && batch_cnt % 100 == 0) { for (size_t i = 0; i < ops_.size(); ++i) { @@ -178,6 +194,7 @@ void HogwildWorker::TrainFilesWithProfiler() { fprintf(stderr, "%6.2f instances/s\n", total_inst / total_time); } } +#endif thread_scope_->DropKids(); timeline.Start(); } @@ -195,7 +212,10 @@ void HogwildWorker::TrainFilesWithProfiler() { void HogwildWorker::TrainFiles() { platform::SetNumThreads(1); + platform::Timer timeline; + timeline.Start(); + int total_ins_num = 0; // how to accumulate fetched values here device_reader_->Start(); int cur_batch; @@ -213,9 +233,13 @@ void HogwildWorker::TrainFiles() { } } + total_ins_num += cur_batch; PrintFetchVars(); thread_scope_->DropKids(); } + timeline.Pause(); + VLOG(3) << "worker " << thread_id_ << " train cost " << timeline.ElapsedSec() + << " seconds, ins_num: " << total_ins_num; #if defined PADDLE_WITH_PSCORE if (thread_barrier_) { paddle::distributed::Communicator::GetInstance()->BarrierTriggerDecrement(); diff --git a/python/paddle/distributed/fleet/dataset/dataset.py b/python/paddle/distributed/fleet/dataset/dataset.py index 10c27ea91d249428e889701104f2fc8d837bbac9..e63369903190ae4f02275fcc5245127b317b253c 100644 --- a/python/paddle/distributed/fleet/dataset/dataset.py +++ b/python/paddle/distributed/fleet/dataset/dataset.py @@ -31,6 +31,7 @@ class DatasetBase(object): self.dataset = core.Dataset("MultiSlotDataset") self.thread_num = 1 self.filelist = [] + self.use_ps_gpu = False def init(self, batch_size=1, @@ -212,6 +213,14 @@ class DatasetBase(object): self.dataset.set_data_feed_desc(self._desc()) self.dataset.create_readers() + def _set_use_ps_gpu(self, use_ps_gpu): + """ + set use_ps_gpu flag + Args: + use_ps_gpu: bool + """ + self.use_ps_gpu = use_ps_gpu + def _finish_to_run(self): self.dataset.destroy_readers() @@ -529,12 +538,18 @@ class InMemoryDataset(DatasetBase): def _dynamic_adjust_before_train(self, thread_num): if not self.is_user_set_queue_num: - 
self.dataset.dynamic_adjust_channel_num(thread_num, False) + if self.use_ps_gpu: + self.dataset.dynamic_adjust_channel_num(thread_num, True) + else: + self.dataset.dynamic_adjust_channel_num(thread_num, False) self.dataset.dynamic_adjust_readers_num(thread_num) def _dynamic_adjust_after_train(self): if not self.is_user_set_queue_num: - self.dataset.dynamic_adjust_channel_num(self.thread_num, False) + if self.use_ps_gpu: + self.dataset.dynamic_adjust_channel_num(self.thread_num, True) + else: + self.dataset.dynamic_adjust_channel_num(self.thread_num, False) self.dataset.dynamic_adjust_readers_num(self.thread_num) def _set_queue_num(self, queue_num): diff --git a/python/paddle/fluid/dataset.py b/python/paddle/fluid/dataset.py index 86c63ababbbfdbc9b7d07c95e37dda8c67d18d2f..db51cb549ad36d61692825a7d76bafcda146a7e2 100644 --- a/python/paddle/fluid/dataset.py +++ b/python/paddle/fluid/dataset.py @@ -74,6 +74,7 @@ class DatasetBase(object): self.dataset = core.Dataset("MultiSlotDataset") self.thread_num = 1 self.filelist = [] + self.use_ps_gpu = False def set_pipe_command(self, pipe_command): """ @@ -300,6 +301,14 @@ class DatasetBase(object): self.dataset.set_data_feed_desc(self.desc()) self.dataset.create_readers() + def _set_use_ps_gpu(self, use_ps_gpu): + """ + set use_ps_gpu flag + Args: + use_ps_gpu: bool + """ + self.use_ps_gpu = use_ps_gpu + def _finish_to_run(self): self.dataset.destroy_readers() @@ -391,7 +400,10 @@ class InMemoryDataset(DatasetBase): ) def _dynamic_adjust_before_train(self, thread_num): if not self.is_user_set_queue_num: - self.dataset.dynamic_adjust_channel_num(thread_num, False) + if self.use_ps_gpu: + self.dataset.dynamic_adjust_channel_num(thread_num, True) + else: + self.dataset.dynamic_adjust_channel_num(thread_num, False) self.dataset.dynamic_adjust_readers_num(thread_num) @deprecated( @@ -400,7 +412,10 @@ class InMemoryDataset(DatasetBase): ) def _dynamic_adjust_after_train(self): if not self.is_user_set_queue_num: - 
self.dataset.dynamic_adjust_channel_num(self.thread_num, False) + if self.use_ps_gpu: + self.dataset.dynamic_adjust_channel_num(self.thread_num, True) + else: + self.dataset.dynamic_adjust_channel_num(self.thread_num, False) self.dataset.dynamic_adjust_readers_num(self.thread_num) @deprecated( diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py index 62a9c42ee0a61c0b01d4562daca4b30e83f24792..620729795bc20afe94e7b9973e9d74dd5743d050 100644 --- a/python/paddle/fluid/executor.py +++ b/python/paddle/fluid/executor.py @@ -1507,6 +1507,9 @@ class Executor(object): trainer._gen_trainer_desc() self._dump_debug_info(program=program, trainer=trainer) + # in case of calling _set_use_ps_gpu explicitly + if dataset.use_ps_gpu is False: + dataset._set_use_ps_gpu(trainer.proto_desc.use_ps_gpu) dataset._dynamic_adjust_before_train(trainer.proto_desc.thread_num) trainer_instance = self._default_executor.init_for_dataset( diff --git a/python/paddle/fluid/tests/unittests/test_communicator_ps_gpu.py b/python/paddle/fluid/tests/unittests/test_communicator_ps_gpu.py index 5de1ebf5813722276584441b0588ac4871302078..0b956d5031fec7c0abdffe220a786b30999ab06f 100644 --- a/python/paddle/fluid/tests/unittests/test_communicator_ps_gpu.py +++ b/python/paddle/fluid/tests/unittests/test_communicator_ps_gpu.py @@ -73,6 +73,7 @@ class TestCommunicator(unittest.TestCase): dataset.init( batch_size=32, thread_num=1, pipe_command="cat", use_var=slots_vars) dataset.set_filelist(["test_communicator_ps_gpu.txt"]) + dataset._set_use_ps_gpu(1) dataset.load_into_memory() os.environ["TEST_MODE"] = "1"