diff --git a/paddle/fluid/framework/device_worker.h b/paddle/fluid/framework/device_worker.h index b40099542cfd5d0f1475e16f3be977b45fbd7144..c44bda490bb6f05ae77001de4748bb2b73a88df8 100644 --- a/paddle/fluid/framework/device_worker.h +++ b/paddle/fluid/framework/device_worker.h @@ -581,6 +581,7 @@ class SectionWorker : public DeviceWorker { void RunUpdate( std::unique_ptr&, std::unordered_map>&); + void PrepareUnusedVar(); protected: int section_id_; @@ -595,6 +596,8 @@ class SectionWorker : public DeviceWorker { std::vector> ops_; std::shared_ptr program_; + std::unordered_map> + unused_vars_; static uint64_t batch_id_; platform::DeviceContext* dev_ctx_ = nullptr; diff --git a/paddle/fluid/framework/pipeline_trainer.cc b/paddle/fluid/framework/pipeline_trainer.cc index 3bd50229b94deb0e09f242d529c3225ec2e4408a..42577972e9b79d2dcfdf692afdec19b3ab576c90 100644 --- a/paddle/fluid/framework/pipeline_trainer.cc +++ b/paddle/fluid/framework/pipeline_trainer.cc @@ -113,19 +113,28 @@ void PipelineTrainer::InitTrainerEnv(const ProgramDesc& main_program, this_worker->SetRootScope(root_scope_); this_worker->SetMinibatchScope(minibatch_scope_); this_worker->SetMicrobatchScopes(microbatch_scopes_); + this_worker->PrepareUnusedVar(); } void PipelineTrainer::Run() { VLOG(5) << "Going to run PipelineTrainer::Run()"; - section_thread_ = std::async(&DeviceWorker::TrainFiles, worker_.get()); -} - -void PipelineTrainer::Finalize() { try { - section_thread_.get(); + worker_->TrainFiles(); } catch (platform::EOFException& e) { std::rethrow_exception(std::current_exception()); } + for (auto* micro_scop : microbatch_scopes_) { + // By default, we should delete all kid scopes after run executor because + // some operators may create local scope when running, such as while_op. + // But when while_op also create a local executor to run it's sub block, + // the sub scopes it created should not be dropped immediately, because + // while_grad_op will use some variables created during while_op run, so + // we need to keep the kids and wait for the outer executor to drop them. + micro_scop->DropKids(); + } +} + +void PipelineTrainer::Finalize() { if (need_dump_field_) { FinalizeDumpEnv(); } diff --git a/paddle/fluid/framework/section_worker.cc b/paddle/fluid/framework/section_worker.cc index 993b9ac52c5b561fa2c0c850845425ac19cc97bf..a7e84b34b2436bf60d1af19f4f128597250d5033 100644 --- a/paddle/fluid/framework/section_worker.cc +++ b/paddle/fluid/framework/section_worker.cc @@ -96,12 +96,16 @@ void SectionWorker::RunUpdate( } } +void SectionWorker::PrepareUnusedVar() { + VLOG(5) << "begin prepare the unsed vars"; + unused_vars_ = GetUnusedVars(program_->Block(0), ops_, skip_vars_); +} + void SectionWorker::TrainFiles() { VLOG(5) << "begin section_worker TrainFiles"; int64_t max_memory_size = GetEagerDeletionThreshold(); std::unique_ptr gc; - auto unused_vars_ = GetUnusedVars(program_->Block(0), ops_, skip_vars_); if (max_memory_size >= 0) { #if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP) if (platform::is_gpu_place(place_)) { diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py index 81f4ae32397b4c678afe8048d8ae684ba138ed50..4f56666a64ba387fc979c93d51a1454a2a599165 100644 --- a/python/paddle/fluid/executor.py +++ b/python/paddle/fluid/executor.py @@ -1638,8 +1638,12 @@ class Executor(object): dataset._dynamic_adjust_before_train(trainer.proto_desc.thread_num) trainer_desc = trainer._desc() # slow, cache - ctx = [trainer_desc, dataset, scope, real_fetch_list] + trainer_instance = self._default_executor.init_for_dataset( + program.desc, trainer_desc, scope, dataset.dataset) + + ctx = [scope, real_fetch_list, trainer_instance] if use_program_cache: self._add_ctx_cache(cache_key, ctx) + return ctx def _run_pipeline(self, @@ -1654,20 +1658,17 @@ class Executor(object): print_period=100, fetch_handler=None, use_program_cache=False): - trainer_desc, dataset, scope, real_fetch_list = \ + scope, real_fetch_list, trainer_instance = \ self._prepare_pipeline_ctx(program, dataset, scope, thread, is_infer, debug, fetch_list, fetch_info, print_period, fetch_handler, use_program_cache) - trainer_instance = self._default_executor.init_for_dataset( - program.desc, trainer_desc, scope, dataset.dataset) - self._default_executor.run_from_dataset(trainer_instance) - self._default_executor.release_trainer(trainer_instance) - dataset._dynamic_adjust_after_train() - dataset._finish_to_run() + if not use_program_cache: + self._default_executor.release_trainer(trainer_instance) + if real_fetch_list: arr = scope.find_var('fetch').get_fetch_list() tensors = arr._move_to_list()