From 79082c94594adaf4765e950151da51c84ec137b8 Mon Sep 17 00:00:00 2001 From: Yancey1989 Date: Mon, 10 Dec 2018 17:31:52 +0800 Subject: [PATCH] fix pyreader failed --- .../scope_buffered_ssa_graph_executor.cc | 27 +++++++++---------- .../scope_buffered_ssa_graph_executor.h | 5 ++-- .../details/threaded_ssa_graph_executor.cc | 1 - paddle/fluid/framework/parallel_executor.cc | 22 +++++++++++---- .../fluid/operators/reader/buffered_reader.cc | 2 -- .../operators/reader/create_py_reader_op.cc | 2 -- .../fluid/operators/reader/open_files_op.cc | 2 -- 7 files changed, 31 insertions(+), 30 deletions(-) diff --git a/paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.cc b/paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.cc index abc6b9f559..85898af417 100644 --- a/paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.cc +++ b/paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.cc @@ -27,34 +27,31 @@ namespace framework { namespace details { ScopeBufferedSSAGraphExecutor::ScopeBufferedSSAGraphExecutor( ExecutionStrategy strategy, std::vector local_scopes, - std::vector> var_infos_list, - std::vector places, + std::vector var_infos, std::vector places, std::unique_ptr &&underlying_executor) : strategy_(std::move(strategy)), underlying_executor_(std::move(underlying_executor)), local_scopes_(std::move(local_scopes)), - var_infos_list_(std::move(var_infos_list)), + var_infos_(std::move(var_infos)), places_(std::move(places)) {} FeedFetchList ScopeBufferedSSAGraphExecutor::Run( const std::vector &fetch_tensors) { if (drop_scope_counter_ == 0) { // Create local scopes. - for (size_t i = 0; i < local_scopes_.size(); ++i) { - auto &scope = local_scopes_[i]; + for (auto it = local_scopes_.rbegin(); it != local_scopes_.rend(); ++it) { + auto &scope = *it; Scope &local_scope = scope->NewScope(); *scope->Var(details::kLocalExecScopeName)->GetMutable() = &local_scope; - for (auto &var_infos : var_infos_list_) { - for (auto &info : var_infos) { - if (scope->FindVar(info.name_) != nullptr) { - continue; - } - if (info.persistable_) { // Persistable - InitializeVariable(scope->Var(info.name_), info.type_); - } else { - InitializeVariable(local_scope.Var(info.name_), info.type_); - } + for (auto &info : var_infos_) { + if (scope->FindVar(info.name_) != nullptr) { + continue; + } + if (info.persistable_) { // Persistable + InitializeVariable(scope->Var(info.name_), info.type_); + } else { + InitializeVariable(local_scope.Var(info.name_), info.type_); } } } diff --git a/paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.h b/paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.h index 51230d4a42..5e87e0bf50 100644 --- a/paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.h +++ b/paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.h @@ -38,8 +38,7 @@ class ScopeBufferedSSAGraphExecutor : public SSAGraphExecutor { public: ScopeBufferedSSAGraphExecutor( ExecutionStrategy strategy, std::vector local_scopes, - std::vector> var_info_list, - std::vector places, + std::vector var_infos, std::vector places, std::unique_ptr&& underlying_executor); const ir::Graph& Graph() const override { @@ -54,7 +53,7 @@ class ScopeBufferedSSAGraphExecutor : public SSAGraphExecutor { ExecutionStrategy strategy_; std::unique_ptr underlying_executor_; std::vector local_scopes_; - std::vector> var_infos_list_; + std::vector var_infos_; std::vector places_; }; } // namespace details diff --git a/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc b/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc index 677a293794..cebf63364d 100644 --- a/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc +++ b/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc @@ -216,7 +216,6 @@ void ThreadedSSAGraphExecutor::RunOp( if (LIKELY(!strategy_.dry_run_)) { op->Run(strategy_.use_cuda_); } - VLOG(10) << op << " " << op->Name() << " Done "; running_ops_--; ready_var_q->Extend(op->Outputs()); VLOG(10) << op << " " << op->Name() << "Signal posted"; diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc index 186f0cb803..2a9ca3e815 100644 --- a/paddle/fluid/framework/parallel_executor.cc +++ b/paddle/fluid/framework/parallel_executor.cc @@ -141,7 +141,6 @@ ParallelExecutor::ParallelExecutor( std::vector> graphs; #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) if (exec_strategy.type_ == ExecutionStrategy::kParallelGraph) { - VLOG(1) << "kParallelGraph mode!!"; for (size_t i = 0; i < member_->places_.size(); ++i) { std::unique_ptr graph = build_strategy.Apply( main_program, {member_->places_[i]}, loss_var_name, params, @@ -178,8 +177,8 @@ ParallelExecutor::ParallelExecutor( ref_cnt_pass->SetNotOwned(details::kGlobalReferenceCount, &ref_cnts_); ref_cnt_pass->SetNotOwned(details::kCurReferenceCount, &cur_ref_cnts_); ref_cnt_pass->SetNotOwned(details::kGarbageCollector, &gcs_); - graphs[0] = ref_cnt_pass->Apply(std::move(graphs[i])); - graphs[0]->SetNotOwned("garbage_collector", &gcs_); + graphs[i] = ref_cnt_pass->Apply(std::move(graphs[i])); + graphs[i]->SetNotOwned("garbage_collector", &gcs_); } } } @@ -192,6 +191,18 @@ ParallelExecutor::ParallelExecutor( // Step 3. Create vars in each scope. Passes may also create new vars. // skip control vars and empty vars + std::vector var_infos; + for (auto &graph : graphs) { + for (auto &node : graph->Nodes()) { + if (node->IsVar() && !node->IsCtrlVar() && node->Var()) { + var_infos.emplace_back(); + var_infos.back().name_ = node->Var()->Name(); + var_infos.back().type_ = node->Var()->GetType(); + var_infos.back().persistable_ = node->Var()->Persistable(); + } + } + } + /** std::vector> var_infos_list; for (size_t i = 0; i < graphs.size(); ++i) { std::vector var_infos; @@ -203,8 +214,9 @@ ParallelExecutor::ParallelExecutor( var_infos.back().persistable_ = node->Var()->Persistable(); } } - var_infos_list.emplace_back(std::move(var_infos)); + var_infos_list.push_back(std::move(var_infos)); } + **/ // If the loss_var_name is given, the number of graph should be only one. if (loss_var_name.size()) { @@ -236,7 +248,7 @@ ParallelExecutor::ParallelExecutor( } member_->executor_.reset(new details::ScopeBufferedSSAGraphExecutor( - exec_strategy, member_->local_scopes_, std::move(var_infos_list), + exec_strategy, member_->local_scopes_, std::move(var_infos), member_->places_, std::move(member_->executor_))); } diff --git a/paddle/fluid/operators/reader/buffered_reader.cc b/paddle/fluid/operators/reader/buffered_reader.cc index cfa192f8e1..26ff221dfa 100644 --- a/paddle/fluid/operators/reader/buffered_reader.cc +++ b/paddle/fluid/operators/reader/buffered_reader.cc @@ -58,9 +58,7 @@ void BufferedReader::ReadAsync(size_t i) { TensorVec &gpu = gpu_buffer_[i]; gpu.resize(cpu.size()); for (size_t i = 0; i < cpu.size(); ++i) { - VLOG(1) << "launch tensor copy from cpu to cpu, idx: " << i; framework::TensorCopySync(cpu[i], place_, &gpu[i]); - VLOG(1) << "done " << i; gpu[i].set_lod(cpu[i].lod()); } } diff --git a/paddle/fluid/operators/reader/create_py_reader_op.cc b/paddle/fluid/operators/reader/create_py_reader_op.cc index 093b0e56b3..901a92ab5b 100644 --- a/paddle/fluid/operators/reader/create_py_reader_op.cc +++ b/paddle/fluid/operators/reader/create_py_reader_op.cc @@ -28,10 +28,8 @@ class PyReader : public framework::FileReader { } void ReadNext(std::vector* out) override { - VLOG(1) << "come in PyReader::ReadNext function, out: " << out; bool success; *out = queue_->Pop(&success); - VLOG(1) << "call PyReader::ReadNext " << success; if (!success) out->clear(); } diff --git a/paddle/fluid/operators/reader/open_files_op.cc b/paddle/fluid/operators/reader/open_files_op.cc index ae37a18725..38223e0699 100644 --- a/paddle/fluid/operators/reader/open_files_op.cc +++ b/paddle/fluid/operators/reader/open_files_op.cc @@ -115,12 +115,10 @@ class PreemptiveReaderContainer : public IReaderContainer { } void ReadNext(std::vector* out) override { - VLOG(1) << "flag"; if (!pending_.empty()) { auto future_it = complete_queue_.Pop(); FutureItem item = future_it->get(); if (item.exception_) { - VLOG(1) << "item has exception!!!"; for (auto it = futures_.begin(); it != futures_.end(); ++it) { if (it != future_it) { it->wait(); // Wait all other threads complete. -- GitLab