From 060f42179794623d81f0d464d880dc17720988c9 Mon Sep 17 00:00:00 2001 From: fengjiayi Date: Fri, 20 Jul 2018 16:03:38 +0800 Subject: [PATCH] Some enhancement on readers 1. Make the feeding thread of py_reader a daemon thread. 2. Update buffer_reader's destructor, fixing a bug. 3. Make pyreader demo script supporting CPU environment. --- paddle/fluid/operators/reader/buffered_reader.cc | 14 +++++++++++++- python/paddle/fluid/layers/io.py | 9 +++++---- python/paddle/fluid/tests/demo/pyreader.py | 13 ++++++++----- 3 files changed, 26 insertions(+), 10 deletions(-) diff --git a/paddle/fluid/operators/reader/buffered_reader.cc b/paddle/fluid/operators/reader/buffered_reader.cc index ba1b3d3e583..26ff221dfa0 100644 --- a/paddle/fluid/operators/reader/buffered_reader.cc +++ b/paddle/fluid/operators/reader/buffered_reader.cc @@ -18,7 +18,14 @@ namespace paddle { namespace operators { namespace reader { -BufferedReader::~BufferedReader() { reader_->Shutdown(); } +BufferedReader::~BufferedReader() { + reader_->Shutdown(); + while (!position_.empty()) { + position_.front().wait(); + position_.pop(); + } +} + BufferedReader::BufferedReader( const std::shared_ptr &reader, const platform::Place &place, size_t buffer_size) @@ -30,12 +37,14 @@ BufferedReader::BufferedReader( gpu_buffer_.resize(buffer_size); ReadTillBufferFullAsync(); } + void BufferedReader::ReadTillBufferFullAsync() { PADDLE_ENFORCE_EQ(position_.size(), 0U); for (size_t i = 0; i < buffer_size_; ++i) { ReadAsync(i); } } + void BufferedReader::ReadAsync(size_t i) { position_.emplace(thread_pool_.enqueue([this, i]() -> size_t { TensorVec &cpu = cpu_buffer_[i]; @@ -56,6 +65,7 @@ void BufferedReader::ReadAsync(size_t i) { return i; })); } + void BufferedReader::ShutdownImpl() { reader_->Shutdown(); while (!position_.empty()) { @@ -63,10 +73,12 @@ void BufferedReader::ShutdownImpl() { } prev_pos_ = -1UL; } + void BufferedReader::StartImpl() { reader_->Start(); ReadTillBufferFullAsync(); } + void BufferedReader::ReadNextImpl(std::vector *out) { if (position_.empty()) { out->clear(); diff --git a/python/paddle/fluid/layers/io.py b/python/paddle/fluid/layers/io.py index 07a7ef15ac5..2e57c9f8943 100644 --- a/python/paddle/fluid/layers/io.py +++ b/python/paddle/fluid/layers/io.py @@ -457,7 +457,7 @@ def py_reader(capacity, use_double_buffer=True): """ Create a reader and blocking queue for data feeding in Python - + This layer returns a Reader Variable and a BlockingQueue. The BlockingQueue provides `push()` method to push a `LoDTensorArray` object into the queue in Python side. In C++ side, the Reader @@ -478,7 +478,7 @@ def py_reader(capacity, Returns: tuple(Variable, BlockingQueue): A Reader Variable from which we can get feeding data. - + A BlockingQueue object for data feeding. Examples: @@ -491,7 +491,7 @@ def py_reader(capacity, dtypes=['float32', 'int64']) # Via the reader, we can use 'read_file' layer to get data: image, label = fluid.layers.read_file(reader) - + # Via the blocking queue, we can feed data using threads def feed_data(queue, feed_images, feed_labels): for feed_image, feed_label in zip(feed_images, feed_labels): @@ -499,7 +499,7 @@ def py_reader(capacity, data.append(feed_image) data.append(feed_label) queue.push(data) - + thread = threading.Thread(target=feed_data, args=(queue, feed_images, feed_labels)) thread.start() """ @@ -579,6 +579,7 @@ def py_reader(capacity, feed_queue.close() reader.thread = threading.Thread(target=__provider_thread__) + reader.thread.daemon = True reader.thread.start() def __set_tensor_provider__(func): diff --git a/python/paddle/fluid/tests/demo/pyreader.py b/python/paddle/fluid/tests/demo/pyreader.py index 3a7dbf81062..82065401935 100644 --- a/python/paddle/fluid/tests/demo/pyreader.py +++ b/python/paddle/fluid/tests/demo/pyreader.py @@ -25,7 +25,8 @@ def network(is_train): capacity=10, shapes=((-1, 784), (-1, 1)), dtypes=('float32', 'int64'), - name="train_reader" if is_train else "test_reader") + name="train_reader" if is_train else "test_reader", + use_double_buffer=True) img, label = fluid.layers.read_file(reader) hidden = img @@ -56,14 +57,16 @@ def main(): with fluid.unique_name.guard(): test_loss, test_reader = network(False) - fluid.Executor(fluid.CUDAPlace(0)).run(startup_prog) - fluid.Executor(fluid.CUDAPlace(0)).run(test_startup) + use_cuda = fluid.core.is_compiled_with_cuda() + place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() + fluid.Executor(place).run(startup_prog) + fluid.Executor(place).run(test_startup) trainer = fluid.ParallelExecutor( - use_cuda=True, loss_name=loss.name, main_program=train_prog) + use_cuda=use_cuda, loss_name=loss.name, main_program=train_prog) tester = fluid.ParallelExecutor( - use_cuda=True, share_vars_from=trainer, main_program=test_prog) + use_cuda=use_cuda, share_vars_from=trainer, main_program=test_prog) train_reader.decorate_paddle_reader( paddle.v2.reader.shuffle( -- GitLab