diff --git a/paddle/fluid/framework/device_worker.h b/paddle/fluid/framework/device_worker.h
index cd5de19bdc0887ae8a3819981b119b25c90f0035..d6c422415fceea5dd8d5fbf5901637d04923f6bf 100644
--- a/paddle/fluid/framework/device_worker.h
+++ b/paddle/fluid/framework/device_worker.h
@@ -266,6 +266,9 @@ class HogwildWorker : public CPUWorkerBase {
   HogwildWorkerParameter param_;
   std::vector<std::string> skip_ops_;
   std::map<std::string, int> stat_var_name_map_;
+#ifdef PADDLE_WITH_HETERPS
+  platform::DeviceContext* dev_ctx_ = nullptr;
+#endif
 };
 
 class DownpourWorker : public HogwildWorker {
diff --git a/paddle/fluid/framework/hogwild_worker.cc b/paddle/fluid/framework/hogwild_worker.cc
index 89dc5c7d3ea932388fd8ab220478bb438f6b35f8..b2d170888e28fc4e9918c26f000a5983c09811ee 100644
--- a/paddle/fluid/framework/hogwild_worker.cc
+++ b/paddle/fluid/framework/hogwild_worker.cc
@@ -39,6 +39,9 @@ void HogwildWorker::Initialize(const TrainerDesc &desc) {
   for (int i = 0; i < param_.stat_var_names_size(); ++i) {
     stat_var_name_map_[param_.stat_var_names(i)] = 1;
   }
+#ifdef PADDLE_WITH_HETERPS
+  dev_ctx_ = platform::DeviceContextPool::Instance().Get(place_);
+#endif
 }
 
 void HogwildWorker::CreateThreadOperators(const ProgramDesc &program) {
@@ -150,6 +153,9 @@ void HogwildWorker::TrainFilesWithProfiler() {
       VLOG(3) << "Going to run op " << op_name[i];
       if (!need_skip) {
         ops_[i]->Run(*thread_scope_, place_);
+#ifdef PADDLE_WITH_HETERPS
+        dev_ctx_->Wait();
+#endif
       }
       VLOG(3) << "Op " << op_name[i] << " Finished";
       timeline.Pause();
@@ -167,6 +173,16 @@ void HogwildWorker::TrainFilesWithProfiler() {
     total_inst += cur_batch;
     ++batch_cnt;
     PrintFetchVars();
+#ifdef PADDLE_WITH_HETERPS
+    dev_ctx_->Wait();
+    VLOG(1) << "GpuPs worker " << thread_id_ << " train cost " << total_time
+            << " seconds, ins_num: " << total_inst;
+    for (size_t i = 0; i < op_name.size(); ++i) {
+      VLOG(1) << "card:" << thread_id_ << ", op: " << op_name[i]
+              << ", mean time: " << op_total_time[i] / total_inst
+              << "s, totol time:" << op_total_time[i] << "sec";
+    }
+#else
     if (thread_id_ == 0) {
       if (batch_cnt > 0 && batch_cnt % 100 == 0) {
         for (size_t i = 0; i < ops_.size(); ++i) {
@@ -178,6 +194,7 @@ void HogwildWorker::TrainFilesWithProfiler() {
         fprintf(stderr, "%6.2f instances/s\n", total_inst / total_time);
       }
     }
+#endif
     thread_scope_->DropKids();
     timeline.Start();
   }
@@ -195,7 +212,10 @@ void HogwildWorker::TrainFilesWithProfiler() {
 
 void HogwildWorker::TrainFiles() {
   platform::SetNumThreads(1);
+  platform::Timer timeline;
+  timeline.Start();
 
+  int total_ins_num = 0;
   // how to accumulate fetched values here
   device_reader_->Start();
   int cur_batch;
@@ -213,9 +233,13 @@ void HogwildWorker::TrainFiles() {
       }
     }
 
+    total_ins_num += cur_batch;
     PrintFetchVars();
     thread_scope_->DropKids();
   }
+  timeline.Pause();
+  VLOG(3) << "worker " << thread_id_ << " train cost " << timeline.ElapsedSec()
+          << " seconds, ins_num: " << total_ins_num;
 #if defined PADDLE_WITH_PSCORE
   if (thread_barrier_) {
     paddle::distributed::Communicator::GetInstance()->BarrierTriggerDecrement();
diff --git a/python/paddle/distributed/fleet/dataset/dataset.py b/python/paddle/distributed/fleet/dataset/dataset.py
index 2f428346b9c0c5c64ebcf10647d7e0ef76aeaa68..dc41e3589812f3c6de10e4204cddb4ee723cbf88 100644
--- a/python/paddle/distributed/fleet/dataset/dataset.py
+++ b/python/paddle/distributed/fleet/dataset/dataset.py
@@ -33,6 +33,7 @@ class DatasetBase(object):
         self.dataset = core.Dataset("MultiSlotDataset")
         self.thread_num = 1
         self.filelist = []
+        self.use_ps_gpu = False
 
     def init(self,
              batch_size=1,
@@ -214,6 +215,15 @@ class DatasetBase(object):
         self.dataset.set_data_feed_desc(self._desc())
         self.dataset.create_readers()
 
+    def _set_use_ps_gpu(self, use_ps_gpu):
+        """
+        set use_ps_gpu flag
+
+        Args:
+            use_ps_gpu: bool
+        """
+        self.use_ps_gpu = use_ps_gpu
+
     def _finish_to_run(self):
         self.dataset.destroy_readers()
 
@@ -531,12 +541,18 @@ class InMemoryDataset(DatasetBase):
 
     def _dynamic_adjust_before_train(self, thread_num):
         if not self.is_user_set_queue_num:
-            self.dataset.dynamic_adjust_channel_num(thread_num, False)
+            if self.use_ps_gpu:
+                self.dataset.dynamic_adjust_channel_num(thread_num, True)
+            else:
+                self.dataset.dynamic_adjust_channel_num(thread_num, False)
         self.dataset.dynamic_adjust_readers_num(thread_num)
 
     def _dynamic_adjust_after_train(self):
         if not self.is_user_set_queue_num:
-            self.dataset.dynamic_adjust_channel_num(self.thread_num, False)
+            if self.use_ps_gpu:
+                self.dataset.dynamic_adjust_channel_num(self.thread_num, True)
+            else:
+                self.dataset.dynamic_adjust_channel_num(self.thread_num, False)
         self.dataset.dynamic_adjust_readers_num(self.thread_num)
 
     def _set_queue_num(self, queue_num):
diff --git a/python/paddle/fluid/dataset.py b/python/paddle/fluid/dataset.py
index 86c63ababbbfdbc9b7d07c95e37dda8c67d18d2f..b4cd3326ddec5f75fe93090fd2ef5ce12dc45771 100644
--- a/python/paddle/fluid/dataset.py
+++ b/python/paddle/fluid/dataset.py
@@ -74,6 +74,7 @@ class DatasetBase(object):
         self.dataset = core.Dataset("MultiSlotDataset")
         self.thread_num = 1
         self.filelist = []
+        self.use_ps_gpu = False
 
     def set_pipe_command(self, pipe_command):
         """
@@ -300,6 +301,15 @@ class DatasetBase(object):
         self.dataset.set_data_feed_desc(self.desc())
         self.dataset.create_readers()
 
+    def _set_use_ps_gpu(self, use_ps_gpu):
+        """
+        set use_ps_gpu flag
+
+        Args:
+            use_ps_gpu: bool
+        """
+        self.use_ps_gpu = use_ps_gpu
+
     def _finish_to_run(self):
         self.dataset.destroy_readers()
 
@@ -391,7 +401,10 @@ class InMemoryDataset(DatasetBase):
     )
     def _dynamic_adjust_before_train(self, thread_num):
         if not self.is_user_set_queue_num:
-            self.dataset.dynamic_adjust_channel_num(thread_num, False)
+            if self.use_ps_gpu:
+                self.dataset.dynamic_adjust_channel_num(thread_num, True)
+            else:
+                self.dataset.dynamic_adjust_channel_num(thread_num, False)
         self.dataset.dynamic_adjust_readers_num(thread_num)
 
     @deprecated(
@@ -400,7 +413,10 @@ class InMemoryDataset(DatasetBase):
     )
     def _dynamic_adjust_after_train(self):
         if not self.is_user_set_queue_num:
-            self.dataset.dynamic_adjust_channel_num(self.thread_num, False)
+            if self.use_ps_gpu:
+                self.dataset.dynamic_adjust_channel_num(self.thread_num, True)
+            else:
+                self.dataset.dynamic_adjust_channel_num(self.thread_num, False)
         self.dataset.dynamic_adjust_readers_num(self.thread_num)
 
     @deprecated(
diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py
index 62a9c42ee0a61c0b01d4562daca4b30e83f24792..620729795bc20afe94e7b9973e9d74dd5743d050 100644
--- a/python/paddle/fluid/executor.py
+++ b/python/paddle/fluid/executor.py
@@ -1507,6 +1507,9 @@ class Executor(object):
         trainer._gen_trainer_desc()
 
         self._dump_debug_info(program=program, trainer=trainer)
+        # in case of calling _set_use_ps_gpu explicitly
+        if dataset.use_ps_gpu is False:
+            dataset._set_use_ps_gpu(trainer.proto_desc.use_ps_gpu)
         dataset._dynamic_adjust_before_train(trainer.proto_desc.thread_num)
 
         trainer_instance = self._default_executor.init_for_dataset(
diff --git a/python/paddle/fluid/tests/unittests/test_communicator_ps_gpu.py b/python/paddle/fluid/tests/unittests/test_communicator_ps_gpu.py
index 5de1ebf5813722276584441b0588ac4871302078..0b956d5031fec7c0abdffe220a786b30999ab06f 100644
--- a/python/paddle/fluid/tests/unittests/test_communicator_ps_gpu.py
+++ b/python/paddle/fluid/tests/unittests/test_communicator_ps_gpu.py
@@ -73,6 +73,7 @@ class TestCommunicator(unittest.TestCase):
         dataset.init(
             batch_size=32, thread_num=1, pipe_command="cat", use_var=slots_vars)
         dataset.set_filelist(["test_communicator_ps_gpu.txt"])
+        dataset._set_use_ps_gpu(1)
        dataset.load_into_memory()
 
         os.environ["TEST_MODE"] = "1"
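
Usage note (not part of the patch): below is a minimal sketch of how the new `_set_use_ps_gpu` hook is meant to be driven from a training script, modeled on the unit test above. The slot names, file list, and tiny network are illustrative placeholders; only the `_set_use_ps_gpu` call and the executor fallback come from this patch.

```python
# Illustrative sketch only: slots, file list and the small network are
# hypothetical; the dataset/executor calls follow the existing fluid API.
import paddle
import paddle.fluid as fluid

paddle.enable_static()

slots = ["slot1", "slot2"]  # placeholder sparse slots
slots_vars = [
    fluid.layers.data(name=name, shape=[1], dtype="int64", lod_level=1)
    for name in slots
]
emb = fluid.layers.embedding(
    input=slots_vars[0], size=[100000, 8], is_sparse=True)
cost = fluid.layers.reduce_mean(emb)
fluid.optimizer.SGD(learning_rate=0.01).minimize(cost)

dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
dataset.set_batch_size(32)
dataset.set_thread(1)
dataset.set_pipe_command("cat")
dataset.set_use_var(slots_vars)
dataset.set_filelist(["demo.txt"])  # placeholder file list

# New in this patch: flag the dataset as feeding a GpuPs trainer so that
# _dynamic_adjust_before_train/_dynamic_adjust_after_train pass True instead
# of False to dynamic_adjust_channel_num.
dataset._set_use_ps_gpu(1)
dataset.load_into_memory()

exe = fluid.Executor(fluid.CPUPlace())
exe.run(fluid.default_startup_program())
# If _set_use_ps_gpu is never called, the executor's dataset-training path
# now copies trainer.proto_desc.use_ps_gpu onto the dataset before the
# channel adjustment above.
exe.train_from_dataset(fluid.default_main_program(), dataset)
dataset.release_memory()
```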