未验证 提交 beab9563 编写于 作者: D danleifeng 提交者: GitHub

【heterps】support cuda11 for heterps; add profiler in oneps (#32640)

* add trainprofiler for heterps in oneps; test=develop

* add set_use_ps_gpu; test=develop
上级 c8affff0
...@@ -266,6 +266,9 @@ class HogwildWorker : public CPUWorkerBase { ...@@ -266,6 +266,9 @@ class HogwildWorker : public CPUWorkerBase {
HogwildWorkerParameter param_; HogwildWorkerParameter param_;
std::vector<std::string> skip_ops_; std::vector<std::string> skip_ops_;
std::map<std::string, int> stat_var_name_map_; std::map<std::string, int> stat_var_name_map_;
#ifdef PADDLE_WITH_HETERPS
platform::DeviceContext* dev_ctx_ = nullptr;
#endif
}; };
class DownpourWorker : public HogwildWorker { class DownpourWorker : public HogwildWorker {
......
...@@ -39,6 +39,9 @@ void HogwildWorker::Initialize(const TrainerDesc &desc) { ...@@ -39,6 +39,9 @@ void HogwildWorker::Initialize(const TrainerDesc &desc) {
for (int i = 0; i < param_.stat_var_names_size(); ++i) { for (int i = 0; i < param_.stat_var_names_size(); ++i) {
stat_var_name_map_[param_.stat_var_names(i)] = 1; stat_var_name_map_[param_.stat_var_names(i)] = 1;
} }
#ifdef PADDLE_WITH_HETERPS
dev_ctx_ = platform::DeviceContextPool::Instance().Get(place_);
#endif
} }
void HogwildWorker::CreateThreadOperators(const ProgramDesc &program) { void HogwildWorker::CreateThreadOperators(const ProgramDesc &program) {
...@@ -150,6 +153,9 @@ void HogwildWorker::TrainFilesWithProfiler() { ...@@ -150,6 +153,9 @@ void HogwildWorker::TrainFilesWithProfiler() {
VLOG(3) << "Going to run op " << op_name[i]; VLOG(3) << "Going to run op " << op_name[i];
if (!need_skip) { if (!need_skip) {
ops_[i]->Run(*thread_scope_, place_); ops_[i]->Run(*thread_scope_, place_);
#ifdef PADDLE_WITH_HETERPS
dev_ctx_->Wait();
#endif
} }
VLOG(3) << "Op " << op_name[i] << " Finished"; VLOG(3) << "Op " << op_name[i] << " Finished";
timeline.Pause(); timeline.Pause();
...@@ -167,6 +173,16 @@ void HogwildWorker::TrainFilesWithProfiler() { ...@@ -167,6 +173,16 @@ void HogwildWorker::TrainFilesWithProfiler() {
total_inst += cur_batch; total_inst += cur_batch;
++batch_cnt; ++batch_cnt;
PrintFetchVars(); PrintFetchVars();
#ifdef PADDLE_WITH_HETERPS
dev_ctx_->Wait();
VLOG(1) << "GpuPs worker " << thread_id_ << " train cost " << total_time
<< " seconds, ins_num: " << total_inst;
for (size_t i = 0; i < op_name.size(); ++i) {
VLOG(1) << "card:" << thread_id_ << ", op: " << op_name[i]
<< ", mean time: " << op_total_time[i] / total_inst
<< "s, totol time:" << op_total_time[i] << "sec";
}
#else
if (thread_id_ == 0) { if (thread_id_ == 0) {
if (batch_cnt > 0 && batch_cnt % 100 == 0) { if (batch_cnt > 0 && batch_cnt % 100 == 0) {
for (size_t i = 0; i < ops_.size(); ++i) { for (size_t i = 0; i < ops_.size(); ++i) {
...@@ -178,6 +194,7 @@ void HogwildWorker::TrainFilesWithProfiler() { ...@@ -178,6 +194,7 @@ void HogwildWorker::TrainFilesWithProfiler() {
fprintf(stderr, "%6.2f instances/s\n", total_inst / total_time); fprintf(stderr, "%6.2f instances/s\n", total_inst / total_time);
} }
} }
#endif
thread_scope_->DropKids(); thread_scope_->DropKids();
timeline.Start(); timeline.Start();
} }
...@@ -195,7 +212,10 @@ void HogwildWorker::TrainFilesWithProfiler() { ...@@ -195,7 +212,10 @@ void HogwildWorker::TrainFilesWithProfiler() {
void HogwildWorker::TrainFiles() { void HogwildWorker::TrainFiles() {
platform::SetNumThreads(1); platform::SetNumThreads(1);
platform::Timer timeline;
timeline.Start();
int total_ins_num = 0;
// how to accumulate fetched values here // how to accumulate fetched values here
device_reader_->Start(); device_reader_->Start();
int cur_batch; int cur_batch;
...@@ -213,9 +233,13 @@ void HogwildWorker::TrainFiles() { ...@@ -213,9 +233,13 @@ void HogwildWorker::TrainFiles() {
} }
} }
total_ins_num += cur_batch;
PrintFetchVars(); PrintFetchVars();
thread_scope_->DropKids(); thread_scope_->DropKids();
} }
timeline.Pause();
VLOG(3) << "worker " << thread_id_ << " train cost " << timeline.ElapsedSec()
<< " seconds, ins_num: " << total_ins_num;
#if defined PADDLE_WITH_PSCORE #if defined PADDLE_WITH_PSCORE
if (thread_barrier_) { if (thread_barrier_) {
paddle::distributed::Communicator::GetInstance()->BarrierTriggerDecrement(); paddle::distributed::Communicator::GetInstance()->BarrierTriggerDecrement();
......
...@@ -33,6 +33,7 @@ class DatasetBase(object): ...@@ -33,6 +33,7 @@ class DatasetBase(object):
self.dataset = core.Dataset("MultiSlotDataset") self.dataset = core.Dataset("MultiSlotDataset")
self.thread_num = 1 self.thread_num = 1
self.filelist = [] self.filelist = []
self.use_ps_gpu = False
def init(self, def init(self,
batch_size=1, batch_size=1,
...@@ -214,6 +215,15 @@ class DatasetBase(object): ...@@ -214,6 +215,15 @@ class DatasetBase(object):
self.dataset.set_data_feed_desc(self._desc()) self.dataset.set_data_feed_desc(self._desc())
self.dataset.create_readers() self.dataset.create_readers()
def _set_use_ps_gpu(self, use_ps_gpu):
"""
set use_ps_gpu flag
Args:
use_ps_gpu: bool
"""
self.use_ps_gpu = use_ps_gpu
def _finish_to_run(self): def _finish_to_run(self):
self.dataset.destroy_readers() self.dataset.destroy_readers()
...@@ -531,11 +541,17 @@ class InMemoryDataset(DatasetBase): ...@@ -531,11 +541,17 @@ class InMemoryDataset(DatasetBase):
def _dynamic_adjust_before_train(self, thread_num): def _dynamic_adjust_before_train(self, thread_num):
if not self.is_user_set_queue_num: if not self.is_user_set_queue_num:
if self.use_ps_gpu:
self.dataset.dynamic_adjust_channel_num(thread_num, True)
else:
self.dataset.dynamic_adjust_channel_num(thread_num, False) self.dataset.dynamic_adjust_channel_num(thread_num, False)
self.dataset.dynamic_adjust_readers_num(thread_num) self.dataset.dynamic_adjust_readers_num(thread_num)
def _dynamic_adjust_after_train(self): def _dynamic_adjust_after_train(self):
if not self.is_user_set_queue_num: if not self.is_user_set_queue_num:
if self.use_ps_gpu:
self.dataset.dynamic_adjust_channel_num(self.thread_num, True)
else:
self.dataset.dynamic_adjust_channel_num(self.thread_num, False) self.dataset.dynamic_adjust_channel_num(self.thread_num, False)
self.dataset.dynamic_adjust_readers_num(self.thread_num) self.dataset.dynamic_adjust_readers_num(self.thread_num)
......
...@@ -74,6 +74,7 @@ class DatasetBase(object): ...@@ -74,6 +74,7 @@ class DatasetBase(object):
self.dataset = core.Dataset("MultiSlotDataset") self.dataset = core.Dataset("MultiSlotDataset")
self.thread_num = 1 self.thread_num = 1
self.filelist = [] self.filelist = []
self.use_ps_gpu = False
def set_pipe_command(self, pipe_command): def set_pipe_command(self, pipe_command):
""" """
...@@ -300,6 +301,15 @@ class DatasetBase(object): ...@@ -300,6 +301,15 @@ class DatasetBase(object):
self.dataset.set_data_feed_desc(self.desc()) self.dataset.set_data_feed_desc(self.desc())
self.dataset.create_readers() self.dataset.create_readers()
def _set_use_ps_gpu(self, use_ps_gpu):
"""
set use_ps_gpu flag
Args:
use_ps_gpu: bool
"""
self.use_ps_gpu = use_ps_gpu
def _finish_to_run(self): def _finish_to_run(self):
self.dataset.destroy_readers() self.dataset.destroy_readers()
...@@ -391,6 +401,9 @@ class InMemoryDataset(DatasetBase): ...@@ -391,6 +401,9 @@ class InMemoryDataset(DatasetBase):
) )
def _dynamic_adjust_before_train(self, thread_num): def _dynamic_adjust_before_train(self, thread_num):
if not self.is_user_set_queue_num: if not self.is_user_set_queue_num:
if self.use_ps_gpu:
self.dataset.dynamic_adjust_channel_num(thread_num, True)
else:
self.dataset.dynamic_adjust_channel_num(thread_num, False) self.dataset.dynamic_adjust_channel_num(thread_num, False)
self.dataset.dynamic_adjust_readers_num(thread_num) self.dataset.dynamic_adjust_readers_num(thread_num)
...@@ -400,6 +413,9 @@ class InMemoryDataset(DatasetBase): ...@@ -400,6 +413,9 @@ class InMemoryDataset(DatasetBase):
) )
def _dynamic_adjust_after_train(self): def _dynamic_adjust_after_train(self):
if not self.is_user_set_queue_num: if not self.is_user_set_queue_num:
if self.use_ps_gpu:
self.dataset.dynamic_adjust_channel_num(self.thread_num, True)
else:
self.dataset.dynamic_adjust_channel_num(self.thread_num, False) self.dataset.dynamic_adjust_channel_num(self.thread_num, False)
self.dataset.dynamic_adjust_readers_num(self.thread_num) self.dataset.dynamic_adjust_readers_num(self.thread_num)
......
...@@ -1507,6 +1507,9 @@ class Executor(object): ...@@ -1507,6 +1507,9 @@ class Executor(object):
trainer._gen_trainer_desc() trainer._gen_trainer_desc()
self._dump_debug_info(program=program, trainer=trainer) self._dump_debug_info(program=program, trainer=trainer)
# in case of calling _set_use_ps_gpu explicitly
if dataset.use_ps_gpu is False:
dataset._set_use_ps_gpu(trainer.proto_desc.use_ps_gpu)
dataset._dynamic_adjust_before_train(trainer.proto_desc.thread_num) dataset._dynamic_adjust_before_train(trainer.proto_desc.thread_num)
trainer_instance = self._default_executor.init_for_dataset( trainer_instance = self._default_executor.init_for_dataset(
......
...@@ -73,6 +73,7 @@ class TestCommunicator(unittest.TestCase): ...@@ -73,6 +73,7 @@ class TestCommunicator(unittest.TestCase):
dataset.init( dataset.init(
batch_size=32, thread_num=1, pipe_command="cat", use_var=slots_vars) batch_size=32, thread_num=1, pipe_command="cat", use_var=slots_vars)
dataset.set_filelist(["test_communicator_ps_gpu.txt"]) dataset.set_filelist(["test_communicator_ps_gpu.txt"])
dataset._set_use_ps_gpu(1)
dataset.load_into_memory() dataset.load_into_memory()
os.environ["TEST_MODE"] = "1" os.environ["TEST_MODE"] = "1"
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册