diff --git a/doc/design/cpp_data_feeding.md b/doc/design/cpp_data_feeding.md index 40205350f99722f0b71bfa6f390fe9d01d831966..22c2a925eb8c5e1dd8451e1d3cba261ce471ec51 100644 --- a/doc/design/cpp_data_feeding.md +++ b/doc/design/cpp_data_feeding.md @@ -20,9 +20,8 @@ class ReaderBase { PADDLE_ENFORCE(!shapes_.empty()); } // Read the next batch of data. (A 'batch' can be only one instance) + // If the next batch doesn't exist, the '*out' will be an empty std::vector. virtual void ReadNext(std::vector* out) = 0; - // Show whether the next bacth exists. - virtual bool HasNext() const = 0; // Reinitialize the reader and read the file from the begin. virtual void ReInit() = 0; diff --git a/paddle/fluid/framework/reader.h b/paddle/fluid/framework/reader.h index 27ab6e750c2e665fa5055a3ecfb2f315cb4000c0..1be3f4ef1f46bd8d72a285afa69b52d6f519ccf5 100644 --- a/paddle/fluid/framework/reader.h +++ b/paddle/fluid/framework/reader.h @@ -26,7 +26,6 @@ class ReaderBase { PADDLE_ENFORCE(!shapes_.empty()); } virtual void ReadNext(std::vector* out) = 0; - virtual bool HasNext() const = 0; virtual void ReInit() = 0; @@ -52,8 +51,6 @@ class DecoratedReader : public ReaderBase { PADDLE_ENFORCE_NOT_NULL(reader_); } - bool HasNext() const override { return reader_->HasNext(); } - void ReInit() override { reader_->ReInit(); } protected: @@ -69,7 +66,6 @@ class ReaderHolder { ReaderBase* Get() const { return reader_.get(); } void ReadNext(std::vector* out) { reader_->ReadNext(out); } - bool HasNext() const { return reader_->HasNext(); } void ReInit() { reader_->ReInit(); } DDim shape(size_t idx) const { return reader_->shape(idx); } diff --git a/paddle/fluid/operators/read_op.cc b/paddle/fluid/operators/read_op.cc index 62beab82d4f2b0b795d5d32f50352172de6870cc..2a5605e0d378a184ae132e657b2872279784855d 100644 --- a/paddle/fluid/operators/read_op.cc +++ b/paddle/fluid/operators/read_op.cc @@ -60,15 +60,16 @@ class ReadOp : public framework::OperatorBase { const platform::Place& dev_place) const override { framework::ReaderHolder* reader = scope.FindVar(Input("Reader"))->GetMutable(); - if (!reader->HasNext()) { + std::vector out_arg_names = Outputs("Out"); + std::vector ins; + reader->ReadNext(&ins); + if (ins.empty()) { reader->ReInit(); + reader->ReadNext(&ins); PADDLE_ENFORCE( - reader->HasNext(), + !ins.empty(), "Reader can not read the next data even it has been re-initialized."); } - std::vector out_arg_names = Outputs("Out"); - std::vector ins; - reader->ReadNext(&ins); PADDLE_ENFORCE_EQ(ins.size(), out_arg_names.size()); for (size_t i = 0; i < ins.size(); ++i) { auto* out = diff --git a/paddle/fluid/operators/reader/CMakeLists.txt b/paddle/fluid/operators/reader/CMakeLists.txt index 06489f32d64d69030c084a038acb78ac2bac6200..335c5b26a864381bf87a2824b78f521cdce063e4 100644 --- a/paddle/fluid/operators/reader/CMakeLists.txt +++ b/paddle/fluid/operators/reader/CMakeLists.txt @@ -2,4 +2,5 @@ cc_library(reader_op_registry SRCS reader_op_registry.cc DEPS operator op_regist op_library(create_random_data_generator_op SRCS create_random_data_generator_op.cc DEPS reader_op_registry) op_library(create_shuffle_reader_op SRCS create_shuffle_reader_op.cc DEPS reader_op_registry) op_library(create_batch_reader_op SRCS create_batch_reader_op.cc DEPS reader_op_registry) -set(READER_LIBRARY create_random_data_generator_op create_shuffle_reader_op create_batch_reader_op PARENT_SCOPE) +op_library(create_double_buffer_reader_op SRCS create_double_buffer_reader_op.cc DEPS reader_op_registry) +set(READER_LIBRARY create_random_data_generator_op create_shuffle_reader_op create_batch_reader_op create_double_buffer_reader_op PARENT_SCOPE) diff --git a/paddle/fluid/operators/reader/create_batch_reader_op.cc b/paddle/fluid/operators/reader/create_batch_reader_op.cc index bac043a5529d877dba79c03f07b9d43c9b71d7aa..277f2856c07b3fec2113486539aec1d9139fae92 100644 --- a/paddle/fluid/operators/reader/create_batch_reader_op.cc +++ b/paddle/fluid/operators/reader/create_batch_reader_op.cc @@ -68,10 +68,10 @@ void BatchReader::ReadNext(std::vector* out) { buffer_.clear(); buffer_.reserve(batch_size_); for (int i = 0; i < batch_size_; ++i) { - if (reader_->HasNext()) { - buffer_.push_back(std::vector()); - reader_->ReadNext(&buffer_.back()); - } else { + buffer_.push_back(std::vector()); + reader_->ReadNext(&buffer_.back()); + if (buffer_.back().empty()) { + buffer_.pop_back(); break; } } diff --git a/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc b/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc new file mode 100644 index 0000000000000000000000000000000000000000..b6a0609a1e23195ececee0f16a69daa1c1c46ed8 --- /dev/null +++ b/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc @@ -0,0 +1,116 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include "paddle/fluid/framework/channel.h" +#include "paddle/fluid/operators/reader/reader_op_registry.h" + +namespace paddle { +namespace operators { +namespace reader { + +static constexpr size_t kDoubleBufferSize = 2; + +class DoubleBufferReader : public framework::DecoratedReader { + public: + explicit DoubleBufferReader(ReaderBase* reader) + : DecoratedReader(reader), + buffer_(framework::MakeChannel>( + kDoubleBufferSize)) { + std::thread prefetch(&DoubleBufferReader::PrefetchThreadFunc, this); + prefetch.detach(); + } + + void ReadNext(std::vector* out) override; + void ReInit() override; + + ~DoubleBufferReader() { buffer_->Close(); } + + private: + void PrefetchThreadFunc(); + + framework::Channel>* buffer_; +}; + +class CreateDoubleBufferReaderOp : public framework::OperatorBase { + public: + using framework::OperatorBase::OperatorBase; + + private: + void RunImpl(const framework::Scope& scope, + const platform::Place& dev_place) const override { + const auto& underlying_reader = scope.FindVar(Input("UnderlyingReader")) + ->Get(); + auto* out = scope.FindVar(Output("Out")) + ->template GetMutable(); + out->Reset(new DoubleBufferReader(underlying_reader.Get())); + } +}; + +class CreateDoubleBufferReaderOpMaker : public DecoratedReaderMakerBase { + public: + CreateDoubleBufferReaderOpMaker(OpProto* op_proto, OpAttrChecker* op_checker) + : DecoratedReaderMakerBase(op_proto, op_checker) { + AddComment(R"DOC( + CreateDoubleBufferReader Operator + + A double buffer reader takes another reader as its 'underlying reader'. + It launches another thread to execute the 'underlying reader' asynchronously, + which prevents reading process from blocking subsequent training. + )DOC"); + } +}; + +void DoubleBufferReader::ReadNext(std::vector* out) { + out->clear(); + buffer_->Receive(out); +} + +void DoubleBufferReader::ReInit() { + reader_->ReInit(); + buffer_->Close(); + // The existing prefetch thread will terminate for the buffer_ is closed. + buffer_ = framework::MakeChannel>( + kDoubleBufferSize); + std::thread prefetch(&DoubleBufferReader::PrefetchThreadFunc, this); + prefetch.detach(); +} + +void DoubleBufferReader::PrefetchThreadFunc() { + VLOG(5) << "A new prefetch thread starts."; + while (true) { + std::vector batch; + reader_->ReadNext(&batch); + if (batch.empty()) { + // EOF + buffer_->Close(); + VLOG(5) << "Reached the end of the file. The prefetch thread terminates."; + break; + } + if (!buffer_->Send(&batch)) { + VLOG(5) << "WARNING: The double buffer channel has been closed. The " + "prefetch thread terminates."; + break; + } + } +} + +} // namespace reader +} // namespace operators +} // namespace paddle + +namespace ops = paddle::operators::reader; +REGISTER_DECORATED_READER_OPERATOR(create_double_buffer_reader, + ops::CreateDoubleBufferReaderOp, + ops::CreateDoubleBufferReaderOpMaker); diff --git a/paddle/fluid/operators/reader/create_random_data_generator_op.cc b/paddle/fluid/operators/reader/create_random_data_generator_op.cc index f77ab8ab196dae4cf9351cee9bc5566ec2c04e4b..73c39b5da4484b27f75aeba3c8171c5ffed2398f 100644 --- a/paddle/fluid/operators/reader/create_random_data_generator_op.cc +++ b/paddle/fluid/operators/reader/create_random_data_generator_op.cc @@ -50,8 +50,6 @@ class RandomDataGenerator : public framework::FileReader { } } - bool HasNext() const override { return true; } - void ReInit() override { return; } private: diff --git a/paddle/fluid/operators/reader/create_shuffle_reader_op.cc b/paddle/fluid/operators/reader/create_shuffle_reader_op.cc index 3e8b463efc99e4a962e5ae14ab133cf634548756..4dac3831109beeed660d32f08fb27c7adf62ac2b 100644 --- a/paddle/fluid/operators/reader/create_shuffle_reader_op.cc +++ b/paddle/fluid/operators/reader/create_shuffle_reader_op.cc @@ -39,10 +39,10 @@ void ShuffleReader::ReadNext(std::vector* out) { buffer_.clear(); buffer_.reserve(buffer_size_); for (int i = 0; i < buffer_size_; ++i) { - if (reader_->HasNext()) { - buffer_.push_back(std::vector()); - reader_->ReadNext(&buffer_.back()); - } else { + buffer_.push_back(std::vector()); + reader_->ReadNext(&buffer_.back()); + if (buffer_.back().empty()) { + buffer_.pop_back(); break; } } diff --git a/python/paddle/fluid/tests/test_cpp_reader.py b/python/paddle/fluid/tests/test_cpp_reader.py index b65592057817cef83bf2157c55bacea5bbe34ea1..4b0d039b7e05a55980946a8949e32802e9e57c20 100644 --- a/python/paddle/fluid/tests/test_cpp_reader.py +++ b/python/paddle/fluid/tests/test_cpp_reader.py @@ -15,16 +15,30 @@ import paddle.v2 as paddle import paddle.fluid as fluid import numpy as np +import sys -prog = fluid.framework.Program() -block = prog.current_block() +startup_prog = fluid.framework.Program() +startup_block = startup_prog.current_block() -random_reader = block.create_var( +random_reader = startup_block.create_var( type=fluid.core.VarDesc.VarType.READER, name="RandomDataGenerator") random_reader.desc.set_dtypes( [fluid.core.VarDesc.VarType.FP32, fluid.core.VarDesc.VarType.FP32]) +random_reader.persistable = True +shuffle_reader = startup_block.create_var( + type=fluid.core.VarDesc.VarType.READER, name="ShuffleReader") +shuffle_reader.persistable = True +batch_reader = startup_block.create_var( + type=fluid.core.VarDesc.VarType.READER, name="BatchReader") +batch_reader.persistable = True +double_buffer = startup_block.create_var( + type=fluid.core.VarDesc.VarType.READER, name="DoubleBuffer") +double_buffer.persistable = True + +main_prog = startup_prog.clone() +main_block = main_prog.current_block() -create_random_data_generator_op = block.append_op( +create_random_data_generator_op = startup_block.append_op( type="create_random_data_generator", outputs={"Out": random_reader}, attrs={ @@ -34,37 +48,45 @@ create_random_data_generator_op = block.append_op( "max": 1.0, 'lod_levels': [0, 0] }) -shuffle_reader = block.create_var( - type=fluid.core.VarDesc.VarType.READER, name="ShuffleReader") -create_shuffle_reader_op = block.append_op( +create_shuffle_reader_op = startup_block.append_op( type="create_shuffle_reader", inputs={"UnderlyingReader": random_reader}, outputs={"Out": shuffle_reader}, attrs={"buffer_size": 7}) -batch_reader = block.create_var( - type=fluid.core.VarDesc.VarType.READER, name="BatchReader") - -create_batch_reader_op = block.append_op( +create_batch_reader_op = startup_block.append_op( type="create_batch_reader", inputs={"UnderlyingReader": shuffle_reader}, outputs={"Out": batch_reader}, attrs={"batch_size": 10}) -out1 = block.create_var(type=fluid.core.VarDesc.VarType.LOD_TENSOR, name="Out1") -out2 = block.create_var(type=fluid.core.VarDesc.VarType.LOD_TENSOR, name="Out2") +create_double_buffer_reader_op = startup_block.append_op( + type="create_double_buffer_reader", + inputs={"UnderlyingReader": batch_reader}, + outputs={"Out": double_buffer}) + +out1 = main_block.create_var( + type=fluid.core.VarDesc.VarType.LOD_TENSOR, name="Out1") +out2 = main_block.create_var( + type=fluid.core.VarDesc.VarType.LOD_TENSOR, name="Out2") -read_op = block.append_op( - type="read", inputs={"Reader": batch_reader}, +main_block.var("DoubleBuffer").desc.set_shapes(double_buffer.desc.shapes()) +main_block.var("DoubleBuffer").desc.set_dtypes(double_buffer.desc.dtypes()) +main_block.var("DoubleBuffer").desc.set_lod_levels( + double_buffer.desc.lod_levels()) + +read_op = main_block.append_op( + type="read", + inputs={"Reader": double_buffer}, outputs={"Out": [out1, out2]}) place = fluid.CPUPlace() exe = fluid.Executor(place) -[res1, res2] = exe.run(prog, fetch_list=[out1, out2]) - -if not (res1.shape == (10, 2) and res2.shape == (10, 1)): - exit(1) +exe.run(startup_prog) -exit(0) +for i in range(1, 100): + [res1, res2] = exe.run(main_prog, fetch_list=[out1, out2]) + if not (res1.shape == (10, 2) and res2.shape == (10, 1)): + exit(1)