diff --git a/paddle/fluid/operators/reader/create_custom_reader_op.cc b/paddle/fluid/operators/reader/create_custom_reader_op.cc
index 659f7d595cebcb5b53b0f24c936a06cbd1acbc52..74e6b79a2a3ddd7562d190dcab9dc42324be3cf9 100644
--- a/paddle/fluid/operators/reader/create_custom_reader_op.cc
+++ b/paddle/fluid/operators/reader/create_custom_reader_op.cc
@@ -22,12 +22,11 @@ namespace reader {
 class CustomReader : public framework::DecoratedReader {
  public:
   CustomReader(ReaderBase* reader, const framework::BlockDesc* sub_block,
-               const framework::Scope* scope, const platform::Place& dev_place,
+               const platform::Place& dev_place,
                const std::vector<std::string>& source_var_names,
                const std::vector<std::string>& sink_var_names)
       : DecoratedReader(reader),
         sub_block_(sub_block),
-        scope_(scope),
         exe_(framework::Executor(dev_place)),
         source_var_names_(source_var_names),
         sink_var_names_(sink_var_names) {}
@@ -37,12 +36,10 @@ class CustomReader : public framework::DecoratedReader {
   void UpdateBlockAndScope(const framework::BlockDesc* sub_block,
                            const framework::Scope* scope) {
     sub_block_ = sub_block;
-    scope_ = scope;
   }
 
  private:
   const framework::BlockDesc* sub_block_;
-  const framework::Scope* scope_;
   framework::Executor exe_;
 
   std::vector<std::string> source_var_names_;
@@ -67,7 +64,7 @@ class CreateCustomReaderOp : public framework::OperatorBase {
     const auto& underlying_reader = scope.FindVar(Input("UnderlyingReader"))
                                         ->Get<framework::ReaderHolder>();
     out->Reset(
-        new CustomReader(underlying_reader.Get(), sub_block, &scope, dev_place,
+        new CustomReader(underlying_reader.Get(), sub_block, dev_place,
                          Attr<std::vector<std::string>>("source_var_names"),
                          Attr<std::vector<std::string>>("sink_var_names")));
   }
@@ -150,27 +147,29 @@ void CustomReader::ReadNext(std::vector<framework::LoDTensor>* out) {
       "the size of underlying_outs(%d) are not consistent. Each feeding "
       "element must have its own source and sink variable.",
       source_var_names_.size(), sink_var_names_.size(), underlying_outs.size());
-
-  framework::Scope* exe_scope = &scope_->NewScope();
+  // The scope for CustomReader's sub-block should be independent and shouldn't
+  // be any other computation scope's child. Otherwise, data preprocessing and
+  // computation cannot be concurrent.
+  auto* scope = new framework::Scope();
   // 1. Copy LoDTensors from underlying reader's output to source variables.
   for (size_t i = 0; i < source_var_names_.size(); ++i) {
-    framework::Variable* var = exe_scope->Var(source_var_names_[i]);
+    framework::Variable* var = scope->Var(source_var_names_[i]);
     framework::LoDTensor* tensor = var->GetMutable<framework::LoDTensor>();
     tensor->ShareDataWith(underlying_outs[i]);
     tensor->set_lod(underlying_outs[i].lod());
   }
   // 2. Run the sub-block.
   framework::ProgramDesc* program = sub_block_->Program();
-  exe_.Run(*program, exe_scope, sub_block_->ID(), false, true);
+  exe_.Run(*program, scope, sub_block_->ID(), false, true);
   // 3. Copy LoDTensors from sink variables to out.
   out->resize(sink_var_names_.size());
   for (size_t i = 0; i < sink_var_names_.size(); ++i) {
-    framework::Variable* var = exe_scope->FindVar(sink_var_names_[i]);
+    framework::Variable* var = scope->FindVar(sink_var_names_[i]);
     PADDLE_ENFORCE_NOT_NULL(var);
     const framework::LoDTensor& tensor = var->Get<framework::LoDTensor>();
     framework::TensorCopySync(tensor, platform::CPUPlace(), &(*out)[i]);
   }
-  scope_->DeleteScope(exe_scope);
+  delete scope;
 }
 
 }  // namespace reader
diff --git a/python/paddle/fluid/tests/unittests/test_preprocessor.py b/python/paddle/fluid/tests/unittests/test_preprocessor.py
index 37dd366f3c88eb3aeb4e67f48fd225e1162ff38b..cbf1a7e0c50a87cd43507ffdb94109873cf4e5d9 100644
--- a/python/paddle/fluid/tests/unittests/test_preprocessor.py
+++ b/python/paddle/fluid/tests/unittests/test_preprocessor.py
@@ -74,7 +74,8 @@ class TestPreprocessor(unittest.TestCase):
                 lbl_out = lbl + 1
                 preprocessor.outputs(img_out, lbl_out)
 
-            img, lbl = fluid.layers.io.read_file(preprocessor())
+            data_file = fluid.layers.io.double_buffer(preprocessor())
+            img, lbl = fluid.layers.io.read_file(data_file)
 
             if fluid.core.is_compiled_with_cuda():
                 place = fluid.CUDAPlace(0)
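For context on the test change above, here is a minimal sketch of how a Preprocessor is wired together with double_buffer and read_file, assuming the fluid 1.x layers.io API that test_preprocessor.py exercises (Preprocessor with its block()/inputs()/outputs() helpers, open_recordio_file). The recordio path, shapes, and dtypes below are illustrative placeholders, not values taken from the patch.

    import paddle.fluid as fluid

    main_prog, startup_prog = fluid.Program(), fluid.Program()
    with fluid.program_guard(main_prog, startup_prog):
        # Hypothetical underlying file reader; path, shapes and dtypes are placeholders.
        reader = fluid.layers.io.open_recordio_file(
            './data.recordio',
            shapes=[[-1, 784], [-1, 1]],
            lod_levels=[0, 0],
            dtypes=['float32', 'int64'])

        # Build the preprocessing sub-block that CustomReader runs in its own,
        # independent scope (see the C++ change above).
        preprocessor = fluid.layers.io.Preprocessor(reader=reader)
        with preprocessor.block():
            img, lbl = preprocessor.inputs()
            img_out = img / 127.5 - 1.0
            lbl_out = lbl + 1
            preprocessor.outputs(img_out, lbl_out)

        # As in the patched test: decorate the custom reader with double_buffer
        # so preprocessing can overlap with the main computation, then read.
        data_file = fluid.layers.io.double_buffer(preprocessor())
        img, lbl = fluid.layers.io.read_file(data_file)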