diff --git a/paddle/fluid/operators/reader/buffered_reader.cc b/paddle/fluid/operators/reader/buffered_reader.cc index b086bf6e72dfc146736936d26823f77d3b978b9c..ba1b3d3e583868c5c360d4766e14c15538b22b33 100644 --- a/paddle/fluid/operators/reader/buffered_reader.cc +++ b/paddle/fluid/operators/reader/buffered_reader.cc @@ -28,15 +28,15 @@ BufferedReader::BufferedReader( buffer_size_(buffer_size) { cpu_buffer_.resize(buffer_size); gpu_buffer_.resize(buffer_size); - AppendFutureToBatchSize(); + ReadTillBufferFullAsync(); } -void BufferedReader::AppendFutureToBatchSize() { +void BufferedReader::ReadTillBufferFullAsync() { PADDLE_ENFORCE_EQ(position_.size(), 0U); for (size_t i = 0; i < buffer_size_; ++i) { - AppendFuture(i); + ReadAsync(i); } } -void BufferedReader::AppendFuture(size_t i) { +void BufferedReader::ReadAsync(size_t i) { position_.emplace(thread_pool_.enqueue([this, i]() -> size_t { TensorVec &cpu = cpu_buffer_[i]; reader_->ReadNext(&cpu); @@ -50,6 +50,7 @@ void BufferedReader::AppendFuture(size_t i) { gpu.resize(cpu.size()); for (size_t i = 0; i < cpu.size(); ++i) { framework::TensorCopySync(cpu[i], place_, &gpu[i]); + gpu[i].set_lod(cpu[i].lod()); } } return i; @@ -60,10 +61,11 @@ void BufferedReader::ShutdownImpl() { while (!position_.empty()) { position_.pop(); } + prev_pos_ = -1UL; } void BufferedReader::StartImpl() { reader_->Start(); - AppendFutureToBatchSize(); + ReadTillBufferFullAsync(); } void BufferedReader::ReadNextImpl(std::vector<framework::LoDTensor> *out) { if (position_.empty()) { @@ -79,7 +81,14 @@ void BufferedReader::ReadNextImpl(std::vector<framework::LoDTensor> *out) { } *out = platform::is_gpu_place(place_) ? gpu_buffer_[i] : cpu_buffer_[i]; - AppendFuture(i); + + // Do not push current position into ReadAsync. Push the previous position + // Since all computation in fluid are async, change the data of + // current position may cause data error.
+ if (prev_pos_ != -1UL) { + ReadAsync(prev_pos_); + } + prev_pos_ = i; } } // namespace reader diff --git a/paddle/fluid/operators/reader/buffered_reader.h b/paddle/fluid/operators/reader/buffered_reader.h index 58999beebb8c2582ef90a35bd671ba51f34ac782..7f5d3d9fff8f28c8533587f326796f846987412e 100644 --- a/paddle/fluid/operators/reader/buffered_reader.h +++ b/paddle/fluid/operators/reader/buffered_reader.h @@ -35,9 +35,9 @@ class BufferedReader : public framework::DecoratedReader { ~BufferedReader() override; private: - void AppendFutureToBatchSize(); + void ReadTillBufferFullAsync(); - void AppendFuture(size_t i); + void ReadAsync(size_t i); protected: void ShutdownImpl() override; @@ -50,8 +50,15 @@ class BufferedReader : public framework::DecoratedReader { const size_t buffer_size_; std::queue<std::future<size_t>> position_; + + // The buffer for reading data. + // NOTE: the simplest way to implement buffered reader is do not use any + // buffer, just async read and create futures as buffer size. However, to + // malloc Tensor every time is extremely slow. Here we store all data in + // buffers and prevent alloc every time.
std::vector<TensorVec> cpu_buffer_; std::vector<TensorVec> gpu_buffer_; + size_t prev_pos_{-1UL}; }; } // namespace reader diff --git a/python/paddle/fluid/tests/unittests/test_py_reader_push_pop.py b/python/paddle/fluid/tests/unittests/test_py_reader_push_pop.py index 05715464848d835684dd3cf0e99e5d4dd482e0b6..91b1fd2af7d8aaf85d17965f8b02c35ee3990291 100644 --- a/python/paddle/fluid/tests/unittests/test_py_reader_push_pop.py +++ b/python/paddle/fluid/tests/unittests/test_py_reader_push_pop.py @@ -45,12 +45,12 @@ class TestPyReader(unittest.TestCase): ) else fluid.CPUPlace() executor = fluid.Executor(place) - data_file, feed_queue = fluid.layers.py_reader( + data_file = fluid.layers.py_reader( capacity=self.capacity, dtypes=self.dtypes, lod_levels=self.lod_levels, shapes=self.shapes) - + feed_queue = data_file.queue read_out_data = fluid.layers.read_file(data_file) self.inputs = [] diff --git a/python/paddle/fluid/tests/unittests/test_py_reader_using_executor.py b/python/paddle/fluid/tests/unittests/test_py_reader_using_executor.py index 9a5b69eea46e74deeba87aefae4afac84c7745f1..82f39d392500150f07b7074da432bc220a553424 100644 --- a/python/paddle/fluid/tests/unittests/test_py_reader_using_executor.py +++ b/python/paddle/fluid/tests/unittests/test_py_reader_using_executor.py @@ -52,11 +52,12 @@ def simple_fc_net(in_size, batch_size, queue_capacity, use_double_buffer=False): - reader, feed_queue = fluid.layers.py_reader( + reader = fluid.layers.py_reader( capacity=queue_capacity, shapes=[[-1, in_size], [-1, 1]], lod_levels=[0, 0], dtypes=['float32', 'int64']) + feed_queue = reader.queue reader = fluid.layers.batch(reader, batch_size=batch_size) if use_double_buffer: reader = fluid.layers.double_buffer(reader)