From 2ea4a5d96c0d134c84651e691510f90c8b19f0fa Mon Sep 17 00:00:00 2001 From: Yu Yang Date: Mon, 12 Mar 2018 15:39:31 +0800 Subject: [PATCH] Polish double buffer reader --- .../reader/create_double_buffer_reader_op.cc | 79 ++++++++++++++----- python/paddle/fluid/layers/io.py | 10 ++- .../tests/unittests/test_recordio_reader.py | 14 +++- 3 files changed, 81 insertions(+), 22 deletions(-) diff --git a/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc b/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc index ba08ea12e2..ca947fff43 100644 --- a/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc +++ b/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc @@ -24,11 +24,16 @@ static constexpr size_t kDoubleBufferSize = 2; class DoubleBufferReader : public framework::DecoratedReader { public: - explicit DoubleBufferReader(ReaderBase* reader) - : DecoratedReader(reader), - buffer_(framework::MakeChannel>( - kDoubleBufferSize)) { - std::thread prefetch(&DoubleBufferReader::PrefetchThreadFunc, this); + explicit DoubleBufferReader( + ReaderBase* reader, platform::Place target_place = platform::CPUPlace()) + : DecoratedReader(reader), place_(target_place) { + start_thread(); + } + + void start_thread() { + buffer_ = framework::MakeChannel>( + kDoubleBufferSize); + std::thread prefetch([this] { PrefetchThreadFunc(); }); prefetch.detach(); } @@ -43,6 +48,8 @@ class DoubleBufferReader : public framework::DecoratedReader { void PrefetchThreadFunc(); framework::Channel>* buffer_; + platform::Place place_; + mutable std::vector local_buffer_; }; class CreateDoubleBufferReaderOp : public framework::OperatorBase { @@ -56,7 +63,20 @@ class CreateDoubleBufferReaderOp : public framework::OperatorBase { ->Get(); auto* out = scope.FindVar(Output("Out")) ->template GetMutable(); - out->Reset(new DoubleBufferReader(underlying_reader.Get())); + + auto place_str = Attr("place"); + platform::Place place; + if (place_str == "CPU") { + place = platform::CPUPlace(); + } else { + std::istringstream sin(place_str); + sin.seekg(std::string("CUDA:").size(), std::ios::beg); + size_t num; + sin >> num; + place = platform::CUDAPlace(static_cast(num)); + } + + out->Reset(new DoubleBufferReader(underlying_reader.Get(), place)); } }; @@ -71,44 +91,65 @@ class CreateDoubleBufferReaderOpMaker : public DecoratedReaderMakerBase { It launches another thread to execute the 'underlying reader' asynchronously, which prevents reading process from blocking subsequent training. )DOC"); + std::unordered_set enum_range; + constexpr size_t kMaxCUDADevs = 128; + for (size_t i = 0; i < kMaxCUDADevs; ++i) { + enum_range.insert(string::Sprintf("CUDA:%d", i)); + } + enum_range.insert("CPU"); + AddAttr("place", "The double buffer place, default is CPU") + .SetDefault("CPU") + .InEnum({enum_range}); } }; void DoubleBufferReader::ReadNext(std::vector* out) { out->clear(); - buffer_->Receive(out); + if (local_buffer_.empty()) { + buffer_->Receive(out); + } else { + *out = local_buffer_; + local_buffer_.clear(); + } } void DoubleBufferReader::ReInit() { reader_->ReInit(); buffer_->Close(); - // The existing prefetch thread will terminate for the buffer_ is closed. - buffer_ = framework::MakeChannel>( - kDoubleBufferSize); - std::thread prefetch(&DoubleBufferReader::PrefetchThreadFunc, this); - prefetch.detach(); + start_thread(); } void DoubleBufferReader::PrefetchThreadFunc() { VLOG(5) << "A new prefetch thread starts."; - while (true) { + while (reader_->HasNext()) { std::vector batch; reader_->ReadNext(&batch); - if (batch.empty()) { - // EOF - buffer_->Close(); - VLOG(5) << "Reached the end of the file. The prefetch thread terminates."; - break; + if (platform::is_gpu_place(place_)) { + std::vector gpu_batch; + gpu_batch.resize(batch.size()); + for (size_t i = 0; i < batch.size(); ++i) { + framework::TensorCopy(batch[i], place_, &gpu_batch[i]); + gpu_batch[i].set_lod(batch[i].lod()); + } } + if (!buffer_->Send(&batch)) { VLOG(5) << "WARNING: The double buffer channel has been closed. The " "prefetch thread terminates."; break; } } + buffer_->Close(); } -bool DoubleBufferReader::HasNext() const { PADDLE_THROW("Not Implemented"); } +bool DoubleBufferReader::HasNext() const { + if (local_buffer_.empty()) { + bool ok = buffer_->Receive(&local_buffer_); + return ok; + } else { + return true; + } +} } // namespace reader } // namespace operators diff --git a/python/paddle/fluid/layers/io.py b/python/paddle/fluid/layers/io.py index 81dd978949..9c91f395e7 100644 --- a/python/paddle/fluid/layers/io.py +++ b/python/paddle/fluid/layers/io.py @@ -21,7 +21,7 @@ from ..executor import global_scope __all__ = [ 'data', 'BlockGuardServ', 'ListenAndServ', 'Send', 'open_recordio_file', - 'read_file', 'create_shuffle_reader' + 'read_file', 'create_shuffle_reader', 'create_double_buffer_reader' ] @@ -306,6 +306,14 @@ def create_shuffle_reader(reader, buffer_size): {'buffer_size': int(buffer_size)}) +def create_double_buffer_reader(reader, place=None): + attrs = dict() + if place is not None: + attrs['place'] = str(place).upper() + return __create_decorated_reader__('create_double_buffer_reader', reader, + attrs) + + def read_file(file_obj): helper = LayerHelper('read_file') out = [ diff --git a/python/paddle/fluid/tests/unittests/test_recordio_reader.py b/python/paddle/fluid/tests/unittests/test_recordio_reader.py index cdebda5b7d..24a0074d9b 100644 --- a/python/paddle/fluid/tests/unittests/test_recordio_reader.py +++ b/python/paddle/fluid/tests/unittests/test_recordio_reader.py @@ -13,9 +13,10 @@ # limitations under the License. import unittest + import paddle.fluid as fluid -import paddle.v2.dataset.mnist as mnist import paddle.v2 as paddle +import paddle.v2.dataset.mnist as mnist class TestRecordIO(unittest.TestCase): @@ -53,7 +54,12 @@ class TestRecordIO(unittest.TestCase): fluid.optimizer.Adam(learning_rate=1e-3).minimize(avg_loss) - exe = fluid.Executor(fluid.CPUPlace()) + if fluid.core.is_compiled_with_cuda(): + place = fluid.CUDAPlace(0) + else: + place = fluid.CPUPlace() + + exe = fluid.Executor(place) exe.run(fluid.default_startup_program()) avg_loss_np = [] @@ -69,3 +75,7 @@ class TestRecordIO(unittest.TestCase): def test_shuffle_reader(self): self.test_main(decorator_callback=lambda reader: fluid.layers.create_shuffle_reader(reader, buffer_size=200)) + + def test_double_buffer_reader(self): + self.test_main(decorator_callback=lambda reader: fluid.layers.create_double_buffer_reader(reader, + place='cuda:0' if fluid.core.is_compiled_with_cuda() else 'cpu')) -- GitLab