Unverified commit 98c7191d, authored by Yuang Liu, committed by GitHub

[hybrid performance] pipeline cache trainer (#33998)

Parent dfff52ea
@@ -581,6 +581,7 @@ class SectionWorker : public DeviceWorker {
   void RunUpdate(
       std::unique_ptr<GarbageCollector>&,
       std::unordered_map<const OperatorBase*, std::vector<std::string>>&);
+  void PrepareUnusedVar();

  protected:
   int section_id_;
@@ -595,6 +596,8 @@ class SectionWorker : public DeviceWorker {
   std::vector<std::unique_ptr<OperatorBase>> ops_;
   std::shared_ptr<framework::ProgramDesc> program_;
+  std::unordered_map<const OperatorBase*, std::vector<std::string>>
+      unused_vars_;
   static uint64_t batch_id_;

   platform::DeviceContext* dev_ctx_ = nullptr;
...
@@ -113,19 +113,28 @@ void PipelineTrainer::InitTrainerEnv(const ProgramDesc& main_program,
   this_worker->SetRootScope(root_scope_);
   this_worker->SetMinibatchScope(minibatch_scope_);
   this_worker->SetMicrobatchScopes(microbatch_scopes_);
+  this_worker->PrepareUnusedVar();
 }

 void PipelineTrainer::Run() {
   VLOG(5) << "Going to run PipelineTrainer::Run()";
-  section_thread_ = std::async(&DeviceWorker::TrainFiles, worker_.get());
-}
-
-void PipelineTrainer::Finalize() {
   try {
-    section_thread_.get();
+    worker_->TrainFiles();
   } catch (platform::EOFException& e) {
     std::rethrow_exception(std::current_exception());
   }
+  for (auto* micro_scop : microbatch_scopes_) {
+    // By default, we should delete all kid scopes after run executor because
+    // some operators may create local scope when running, such as while_op.
+    // But when while_op also create a local executor to run it's sub block,
+    // the sub scopes it created should not be dropped immediately, because
+    // while_grad_op will use some variables created during while_op run, so
+    // we need to keep the kids and wait for the outer executor to drop them.
+    micro_scop->DropKids();
+  }
+}
+
+void PipelineTrainer::Finalize() {
   if (need_dump_field_) {
     FinalizeDumpEnv();
   }
...
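With this change, `PipelineTrainer::Run()` becomes a synchronous per-step call: the section worker's `TrainFiles()` runs inline instead of being launched through `std::async`, the kid scopes of every microbatch scope are dropped after each run, and `Finalize()` keeps only the dump handling. A minimal Python sketch of that lifecycle follows; the classes and method names are hypothetical stand-ins for illustration, not the actual Paddle C++ classes.

```python
# Hypothetical stand-ins mirroring the control flow above; not the real Paddle
# classes, only an illustration of the new per-step trainer lifecycle.
class MicrobatchScope:
    def __init__(self):
        self.kids = []            # local scopes created by ops such as while_op

    def drop_kids(self):
        self.kids.clear()         # dropped by the trainer after every step


class SectionWorkerSketch:
    def prepare_unused_var(self):
        # Stands in for GetUnusedVars(...), now computed once per trainer.
        self.unused_vars = {"some_op": ["tmp_var"]}

    def train_files(self):
        pass                      # forward/backward/update over the microbatches


class PipelineTrainerSketch:
    def __init__(self, worker, microbatch_scopes):
        self.worker = worker
        self.microbatch_scopes = microbatch_scopes
        self.worker.prepare_unused_var()   # done once at init time

    def run(self):
        self.worker.train_files()          # synchronous: no section_thread_
        for scope in self.microbatch_scopes:
            scope.drop_kids()              # per-step cleanup now lives in Run

    def finalize(self):
        pass                               # only dump handling remains here


trainer = PipelineTrainerSketch(SectionWorkerSketch(), [MicrobatchScope()])
for _ in range(3):                         # one trainer object reused per step
    trainer.run()
trainer.finalize()
```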
@@ -96,12 +96,16 @@ void SectionWorker::RunUpdate(
   }
 }

+void SectionWorker::PrepareUnusedVar() {
+  VLOG(5) << "begin prepare the unsed vars";
+  unused_vars_ = GetUnusedVars(program_->Block(0), ops_, skip_vars_);
+}
+
 void SectionWorker::TrainFiles() {
   VLOG(5) << "begin section_worker TrainFiles";

   int64_t max_memory_size = GetEagerDeletionThreshold();
   std::unique_ptr<GarbageCollector> gc;
-  auto unused_vars_ = GetUnusedVars(program_->Block(0), ops_, skip_vars_);
   if (max_memory_size >= 0) {
 #if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
     if (platform::is_gpu_place(place_)) {
...
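The point of `PrepareUnusedVar()` is to hoist the `GetUnusedVars` analysis out of the per-step `TrainFiles()` path: it now runs once when the trainer environment is initialized, and every later step reads the cached `unused_vars_` member. The sketch below (hypothetical names, Python rather than the C++ above) counts calls to a stand-in analysis function to show the before/after difference.

```python
# Hypothetical sketch: count how often the (expensive) unused-variable analysis
# runs before and after the refactor. get_unused_vars stands in for Paddle's
# GetUnusedVars; the numbers are illustrative only.
analysis_calls = 0

def get_unused_vars(block_ops):
    global analysis_calls
    analysis_calls += 1
    return {op: ["tmp"] for op in block_ops}   # pretend this is costly

class WorkerBefore:
    def __init__(self, ops):
        self.ops = ops
    def train_files(self):
        return get_unused_vars(self.ops)       # recomputed on every step

class WorkerAfter:
    def __init__(self, ops):
        self.ops = ops
        self.unused_vars = None
    def prepare_unused_var(self):
        self.unused_vars = get_unused_vars(self.ops)   # computed once
    def train_files(self):
        return self.unused_vars                # cached map reused per step

ops = ["matmul", "relu", "softmax"]

before = WorkerBefore(ops)
for _ in range(5):
    before.train_files()
print("before:", analysis_calls)               # 5 analyses for 5 steps

analysis_calls = 0
after = WorkerAfter(ops)
after.prepare_unused_var()
for _ in range(5):
    after.train_files()
print("after:", analysis_calls)                # 1 analysis for 5 steps
```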
@@ -1638,8 +1638,12 @@ class Executor(object):
         dataset._dynamic_adjust_before_train(trainer.proto_desc.thread_num)

         trainer_desc = trainer._desc()  # slow, cache
-        ctx = [trainer_desc, dataset, scope, real_fetch_list]
+        trainer_instance = self._default_executor.init_for_dataset(
+            program.desc, trainer_desc, scope, dataset.dataset)
+
+        ctx = [scope, real_fetch_list, trainer_instance]
         if use_program_cache: self._add_ctx_cache(cache_key, ctx)
+
         return ctx

     def _run_pipeline(self,
@@ -1654,20 +1658,17 @@ class Executor(object):
                       print_period=100,
                       fetch_handler=None,
                       use_program_cache=False):
-        trainer_desc, dataset, scope, real_fetch_list = \
+        scope, real_fetch_list, trainer_instance = \
             self._prepare_pipeline_ctx(program, dataset, scope, thread,
                                        is_infer, debug, fetch_list, fetch_info,
                                        print_period, fetch_handler,
                                        use_program_cache)

-        trainer_instance = self._default_executor.init_for_dataset(
-            program.desc, trainer_desc, scope, dataset.dataset)
         self._default_executor.run_from_dataset(trainer_instance)
-        self._default_executor.release_trainer(trainer_instance)
-        dataset._dynamic_adjust_after_train()
-        dataset._finish_to_run()
+
+        if not use_program_cache:
+            self._default_executor.release_trainer(trainer_instance)
+
         if real_fetch_list:
             arr = scope.find_var('fetch').get_fetch_list()
             tensors = arr._move_to_list()
...
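On the Python side, the expensive `init_for_dataset` call now happens inside `_prepare_pipeline_ctx`, its result (`trainer_instance`) is stored in the cached ctx, and `release_trainer` is skipped when `use_program_cache` is set, so repeated runs of the same pipeline program reuse one trainer. A simplified sketch of that caching pattern follows; `FakeExecutor` and `PipelineRunnerSketch` are hypothetical stand-ins for `_default_executor` and the executor internals, not Paddle's API.

```python
# Hypothetical sketch of the ctx-caching pattern used above. FakeExecutor only
# counts init/release calls; it is not the real Paddle core executor.
class FakeExecutor:
    def __init__(self):
        self.inits = 0
        self.releases = 0
    def init_for_dataset(self):
        self.inits += 1
        return object()                # stands in for trainer_instance
    def run_from_dataset(self, trainer_instance):
        pass
    def release_trainer(self, trainer_instance):
        self.releases += 1

class PipelineRunnerSketch:
    def __init__(self):
        self.executor = FakeExecutor()
        self.ctx_cache = {}            # mirrors _add_ctx_cache / _get_ctx_cache

    def prepare_ctx(self, cache_key, use_program_cache):
        ctx = self.ctx_cache.get(cache_key) if use_program_cache else None
        if ctx is not None:
            return ctx                 # cached trainer_instance is reused
        trainer_instance = self.executor.init_for_dataset()
        ctx = [trainer_instance]
        if use_program_cache:
            self.ctx_cache[cache_key] = ctx
        return ctx

    def run_pipeline(self, cache_key, use_program_cache=False):
        (trainer_instance,) = self.prepare_ctx(cache_key, use_program_cache)
        self.executor.run_from_dataset(trainer_instance)
        if not use_program_cache:      # keep the trainer alive when caching
            self.executor.release_trainer(trainer_instance)

runner = PipelineRunnerSketch()
for _ in range(10):
    runner.run_pipeline("prog-0", use_program_cache=True)
print(runner.executor.inits, runner.executor.releases)   # 1 init, 0 releases
```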