Commit 060f4217 authored by F fengjiayi

Some enhancements to readers

1. Make the feeding thread of py_reader a daemon thread.
2. Update buffered_reader's destructor to fix a bug.
3. Make the pyreader demo script support the CPU environment.
Parent 3a6213f4
@@ -18,7 +18,14 @@
 namespace paddle {
 namespace operators {
 namespace reader {
-BufferedReader::~BufferedReader() { reader_->Shutdown(); }
+BufferedReader::~BufferedReader() {
+  reader_->Shutdown();
+  while (!position_.empty()) {
+    position_.front().wait();
+    position_.pop();
+  }
+}
+
 BufferedReader::BufferedReader(
     const std::shared_ptr<framework::ReaderBase> &reader,
     const platform::Place &place, size_t buffer_size)
@@ -30,12 +37,14 @@ BufferedReader::BufferedReader(
   gpu_buffer_.resize(buffer_size);
   ReadTillBufferFullAsync();
 }
+
 void BufferedReader::ReadTillBufferFullAsync() {
   PADDLE_ENFORCE_EQ(position_.size(), 0U);
   for (size_t i = 0; i < buffer_size_; ++i) {
     ReadAsync(i);
   }
 }
+
 void BufferedReader::ReadAsync(size_t i) {
   position_.emplace(thread_pool_.enqueue([this, i]() -> size_t {
     TensorVec &cpu = cpu_buffer_[i];
@@ -56,6 +65,7 @@ void BufferedReader::ReadAsync(size_t i) {
     return i;
   }));
 }
+
 void BufferedReader::ShutdownImpl() {
   reader_->Shutdown();
   while (!position_.empty()) {
@@ -63,10 +73,12 @@ void BufferedReader::ShutdownImpl() {
   }
   prev_pos_ = -1UL;
 }
+
 void BufferedReader::StartImpl() {
   reader_->Start();
   ReadTillBufferFullAsync();
 }
+
 void BufferedReader::ReadNextImpl(std::vector<framework::LoDTensor> *out) {
   if (position_.empty()) {
     out->clear();
...
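The destructor change above makes teardown wait for every read that is still queued on the thread pool, so no asynchronous task can touch the reader's buffers after they are freed. As a rough illustration only, here is a minimal Python analogy built on concurrent.futures; it is not the actual C++ code, and the Prefetcher class and read_fn callback are hypothetical stand-ins.

import queue
from concurrent.futures import ThreadPoolExecutor

class Prefetcher:
    """Hypothetical stand-in for BufferedReader: workers fill self.buffer asynchronously."""

    def __init__(self, read_fn, buffer_size=4):
        self.read_fn = read_fn
        self.pool = ThreadPoolExecutor(max_workers=1)
        self.pending = queue.Queue()           # plays the role of position_
        self.buffer = [None] * buffer_size
        for i in range(buffer_size):
            self.pending.put(self.pool.submit(self._read_into, i))

    def _read_into(self, i):
        self.buffer[i] = self.read_fn()        # worker writes into shared state
        return i

    def close(self):
        # Mirrors the fixed destructor: wait for every queued read to finish
        # before the shared buffers go away, instead of returning immediately.
        while not self.pending.empty():
            self.pending.get().result()
        self.pool.shutdown()

p = Prefetcher(read_fn=lambda: [0.0] * 784)
p.close()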
@@ -457,7 +457,7 @@ def py_reader(capacity,
               use_double_buffer=True):
     """
     Create a reader and blocking queue for data feeding in Python

     This layer returns a Reader Variable and a BlockingQueue.
     The BlockingQueue provides `push()` method to push a `LoDTensorArray`
     object into the queue in Python side. In C++ side, the Reader
@@ -478,7 +478,7 @@ def py_reader(capacity,
     Returns:
        tuple(Variable, BlockingQueue):
        A Reader Variable from which we can get feeding data.

        A BlockingQueue object for data feeding.

     Examples:
@@ -491,7 +491,7 @@ def py_reader(capacity,
                                          dtypes=['float32', 'int64'])
            # Via the reader, we can use 'read_file' layer to get data:
            image, label = fluid.layers.read_file(reader)

            # Via the blocking queue, we can feed data using threads
            def feed_data(queue, feed_images, feed_labels):
                for feed_image, feed_label in zip(feed_images, feed_labels):
@@ -499,7 +499,7 @@ def py_reader(capacity,
                    data.append(feed_image)
                    data.append(feed_label)
                    queue.push(data)

            thread = threading.Thread(target=feed_data, args=(queue, feed_images, feed_labels))
            thread.start()
     """
@@ -579,6 +579,7 @@ def py_reader(capacity,
             feed_queue.close()

     reader.thread = threading.Thread(target=__provider_thread__)
+    reader.thread.daemon = True
     reader.thread.start()

     def __set_tensor_provider__(func):
...
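The new `reader.thread.daemon = True` line means a script that finishes (or raises) while `__provider_thread__` is still blocked pushing into a full queue no longer hangs at interpreter exit. A minimal sketch of that behavior using plain threading only, with no Paddle APIs; `provider` is a hypothetical stand-in for the feeding loop.

import threading
import time

def provider():
    # Stand-in for __provider_thread__: loops forever, as if blocked on a full queue.
    while True:
        time.sleep(0.1)

t = threading.Thread(target=provider)
t.daemon = True   # without this flag the interpreter would wait on provider() forever
t.start()

print("main script done; the daemon feeding thread is dropped on exit")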
@@ -25,7 +25,8 @@ def network(is_train):
         capacity=10,
         shapes=((-1, 784), (-1, 1)),
         dtypes=('float32', 'int64'),
-        name="train_reader" if is_train else "test_reader")
+        name="train_reader" if is_train else "test_reader",
+        use_double_buffer=True)

     img, label = fluid.layers.read_file(reader)
     hidden = img
@@ -56,14 +57,16 @@ def main():
         with fluid.unique_name.guard():
             test_loss, test_reader = network(False)

-    fluid.Executor(fluid.CUDAPlace(0)).run(startup_prog)
-    fluid.Executor(fluid.CUDAPlace(0)).run(test_startup)
+    use_cuda = fluid.core.is_compiled_with_cuda()
+    place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
+    fluid.Executor(place).run(startup_prog)
+    fluid.Executor(place).run(test_startup)

     trainer = fluid.ParallelExecutor(
-        use_cuda=True, loss_name=loss.name, main_program=train_prog)
+        use_cuda=use_cuda, loss_name=loss.name, main_program=train_prog)

     tester = fluid.ParallelExecutor(
-        use_cuda=True, share_vars_from=trainer, main_program=test_prog)
+        use_cuda=use_cuda, share_vars_from=trainer, main_program=test_prog)

     train_reader.decorate_paddle_reader(
         paddle.v2.reader.shuffle(
...
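The demo now derives its place from the build instead of hard-coding CUDA, so the same script runs on CPU-only installs. The pattern in isolation, as a sketch against the fluid API used above; running the startup program via `fluid.default_startup_program()` is an assumption about how the surrounding script is set up.

import paddle.fluid as fluid

# Choose the device based on how Paddle was compiled, then reuse the same
# flag for the executor place and for ParallelExecutor's use_cuda argument.
use_cuda = fluid.core.is_compiled_with_cuda()
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()

exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())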