未验证 提交 ab1a4df9 编写于 作者: D danleifeng 提交者: GitHub

【cherrypick】support cuda11 for heterps; add profiler in oneps (#32957)

* cherrypick for #32640 :add profile and fix dataset hang in heterps;test=develop

* cherrypick for #32640 :add profile and fix dataset hang in heterps;test=develop

* cherrypick for #32640 :add profile and fix dataset hang in heterps;test=develop
上级 4639f5de
......@@ -266,6 +266,9 @@ class HogwildWorker : public CPUWorkerBase {
HogwildWorkerParameter param_;
std::vector<std::string> skip_ops_;
std::map<std::string, int> stat_var_name_map_;
#ifdef PADDLE_WITH_HETERPS
platform::DeviceContext* dev_ctx_ = nullptr;
#endif
};
class DownpourWorker : public HogwildWorker {
......
......@@ -39,6 +39,9 @@ void HogwildWorker::Initialize(const TrainerDesc &desc) {
for (int i = 0; i < param_.stat_var_names_size(); ++i) {
stat_var_name_map_[param_.stat_var_names(i)] = 1;
}
#ifdef PADDLE_WITH_HETERPS
dev_ctx_ = platform::DeviceContextPool::Instance().Get(place_);
#endif
}
void HogwildWorker::CreateThreadOperators(const ProgramDesc &program) {
......@@ -150,6 +153,9 @@ void HogwildWorker::TrainFilesWithProfiler() {
VLOG(3) << "Going to run op " << op_name[i];
if (!need_skip) {
ops_[i]->Run(*thread_scope_, place_);
#ifdef PADDLE_WITH_HETERPS
dev_ctx_->Wait();
#endif
}
VLOG(3) << "Op " << op_name[i] << " Finished";
timeline.Pause();
......@@ -167,6 +173,16 @@ void HogwildWorker::TrainFilesWithProfiler() {
total_inst += cur_batch;
++batch_cnt;
PrintFetchVars();
#ifdef PADDLE_WITH_HETERPS
dev_ctx_->Wait();
VLOG(1) << "GpuPs worker " << thread_id_ << " train cost " << total_time
<< " seconds, ins_num: " << total_inst;
for (size_t i = 0; i < op_name.size(); ++i) {
VLOG(1) << "card:" << thread_id_ << ", op: " << op_name[i]
<< ", mean time: " << op_total_time[i] / total_inst
<< "s, totol time:" << op_total_time[i] << "sec";
}
#else
if (thread_id_ == 0) {
if (batch_cnt > 0 && batch_cnt % 100 == 0) {
for (size_t i = 0; i < ops_.size(); ++i) {
......@@ -178,6 +194,7 @@ void HogwildWorker::TrainFilesWithProfiler() {
fprintf(stderr, "%6.2f instances/s\n", total_inst / total_time);
}
}
#endif
thread_scope_->DropKids();
timeline.Start();
}
......@@ -195,7 +212,10 @@ void HogwildWorker::TrainFilesWithProfiler() {
void HogwildWorker::TrainFiles() {
platform::SetNumThreads(1);
platform::Timer timeline;
timeline.Start();
int total_ins_num = 0;
// how to accumulate fetched values here
device_reader_->Start();
int cur_batch;
......@@ -213,9 +233,13 @@ void HogwildWorker::TrainFiles() {
}
}
total_ins_num += cur_batch;
PrintFetchVars();
thread_scope_->DropKids();
}
timeline.Pause();
VLOG(3) << "worker " << thread_id_ << " train cost " << timeline.ElapsedSec()
<< " seconds, ins_num: " << total_ins_num;
#if defined PADDLE_WITH_PSCORE
if (thread_barrier_) {
paddle::distributed::Communicator::GetInstance()->BarrierTriggerDecrement();
......
......@@ -31,6 +31,7 @@ class DatasetBase(object):
self.dataset = core.Dataset("MultiSlotDataset")
self.thread_num = 1
self.filelist = []
self.use_ps_gpu = False
def init(self,
batch_size=1,
......@@ -212,6 +213,14 @@ class DatasetBase(object):
self.dataset.set_data_feed_desc(self._desc())
self.dataset.create_readers()
def _set_use_ps_gpu(self, use_ps_gpu):
"""
set use_ps_gpu flag
Args:
use_ps_gpu: bool
"""
self.use_ps_gpu = use_ps_gpu
def _finish_to_run(self):
self.dataset.destroy_readers()
......@@ -529,11 +538,17 @@ class InMemoryDataset(DatasetBase):
def _dynamic_adjust_before_train(self, thread_num):
if not self.is_user_set_queue_num:
if self.use_ps_gpu:
self.dataset.dynamic_adjust_channel_num(thread_num, True)
else:
self.dataset.dynamic_adjust_channel_num(thread_num, False)
self.dataset.dynamic_adjust_readers_num(thread_num)
def _dynamic_adjust_after_train(self):
if not self.is_user_set_queue_num:
if self.use_ps_gpu:
self.dataset.dynamic_adjust_channel_num(self.thread_num, True)
else:
self.dataset.dynamic_adjust_channel_num(self.thread_num, False)
self.dataset.dynamic_adjust_readers_num(self.thread_num)
......
......@@ -74,6 +74,7 @@ class DatasetBase(object):
self.dataset = core.Dataset("MultiSlotDataset")
self.thread_num = 1
self.filelist = []
self.use_ps_gpu = False
def set_pipe_command(self, pipe_command):
"""
......@@ -300,6 +301,14 @@ class DatasetBase(object):
self.dataset.set_data_feed_desc(self.desc())
self.dataset.create_readers()
def _set_use_ps_gpu(self, use_ps_gpu):
"""
set use_ps_gpu flag
Args:
use_ps_gpu: bool
"""
self.use_ps_gpu = use_ps_gpu
def _finish_to_run(self):
self.dataset.destroy_readers()
......@@ -391,6 +400,9 @@ class InMemoryDataset(DatasetBase):
)
def _dynamic_adjust_before_train(self, thread_num):
if not self.is_user_set_queue_num:
if self.use_ps_gpu:
self.dataset.dynamic_adjust_channel_num(thread_num, True)
else:
self.dataset.dynamic_adjust_channel_num(thread_num, False)
self.dataset.dynamic_adjust_readers_num(thread_num)
......@@ -400,6 +412,9 @@ class InMemoryDataset(DatasetBase):
)
def _dynamic_adjust_after_train(self):
if not self.is_user_set_queue_num:
if self.use_ps_gpu:
self.dataset.dynamic_adjust_channel_num(self.thread_num, True)
else:
self.dataset.dynamic_adjust_channel_num(self.thread_num, False)
self.dataset.dynamic_adjust_readers_num(self.thread_num)
......
......@@ -1507,6 +1507,9 @@ class Executor(object):
trainer._gen_trainer_desc()
self._dump_debug_info(program=program, trainer=trainer)
# in case of calling _set_use_ps_gpu explicitly
if dataset.use_ps_gpu is False:
dataset._set_use_ps_gpu(trainer.proto_desc.use_ps_gpu)
dataset._dynamic_adjust_before_train(trainer.proto_desc.thread_num)
trainer_instance = self._default_executor.init_for_dataset(
......
......@@ -73,6 +73,7 @@ class TestCommunicator(unittest.TestCase):
dataset.init(
batch_size=32, thread_num=1, pipe_command="cat", use_var=slots_vars)
dataset.set_filelist(["test_communicator_ps_gpu.txt"])
dataset._set_use_ps_gpu(1)
dataset.load_into_memory()
os.environ["TEST_MODE"] = "1"
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册