diff --git a/paddle/fluid/operators/reader/buffered_reader.cc b/paddle/fluid/operators/reader/buffered_reader.cc index a020e0c68f6decd0234350117e9b9c06a658f192..b086bf6e72dfc146736936d26823f77d3b978b9c 100644 --- a/paddle/fluid/operators/reader/buffered_reader.cc +++ b/paddle/fluid/operators/reader/buffered_reader.cc @@ -18,10 +18,7 @@ namespace paddle { namespace operators { namespace reader { -BufferedReader::~BufferedReader() { - reader_->Shutdown(); - buffer_.clear(); -} +BufferedReader::~BufferedReader() { reader_->Shutdown(); } BufferedReader::BufferedReader( const std::shared_ptr &reader, const platform::Place &place, size_t buffer_size) @@ -29,43 +26,60 @@ BufferedReader::BufferedReader( thread_pool_(1), place_(place), buffer_size_(buffer_size) { + cpu_buffer_.resize(buffer_size); + gpu_buffer_.resize(buffer_size); AppendFutureToBatchSize(); } void BufferedReader::AppendFutureToBatchSize() { - while (buffer_.size() < buffer_size_) { - AppendFuture(); + PADDLE_ENFORCE_EQ(position_.size(), 0U); + for (size_t i = 0; i < buffer_size_; ++i) { + AppendFuture(i); } } -void BufferedReader::AppendFuture() { - buffer_.emplace_back(thread_pool_.enqueue([this] { - TensorVec cpu_buffer; - reader_->ReadNext(&cpu_buffer); - if (platform::is_gpu_place(place_)) { - TensorVec gpu_buffer; +void BufferedReader::AppendFuture(size_t i) { + position_.emplace(thread_pool_.enqueue([this, i]() -> size_t { + TensorVec &cpu = cpu_buffer_[i]; + reader_->ReadNext(&cpu); - for (size_t i = 0; i < cpu_buffer.size(); ++i) { - gpu_buffer.emplace_back(); - framework::TensorCopySync(cpu_buffer[i], place_, &gpu_buffer.back()); - } + if (cpu.empty()) { + return -1UL; + } - cpu_buffer = gpu_buffer; + if (platform::is_gpu_place(place_)) { + TensorVec &gpu = gpu_buffer_[i]; + gpu.resize(cpu.size()); + for (size_t i = 0; i < cpu.size(); ++i) { + framework::TensorCopySync(cpu[i], place_, &gpu[i]); + } } - return cpu_buffer; + return i; })); } void BufferedReader::ShutdownImpl() { reader_->Shutdown(); - buffer_.clear(); + while (!position_.empty()) { + position_.pop(); + } } void BufferedReader::StartImpl() { reader_->Start(); AppendFutureToBatchSize(); } void BufferedReader::ReadNextImpl(std::vector *out) { - PADDLE_ENFORCE_EQ(buffer_.size(), buffer_size_); - *out = buffer_.front().get(); - buffer_.pop_front(); - AppendFuture(); + if (position_.empty()) { + out->clear(); + return; + } + size_t i = position_.front().get(); + position_.pop(); + + if (i == -1UL) { + ReadNextImpl(out); + return; + } + + *out = platform::is_gpu_place(place_) ? gpu_buffer_[i] : cpu_buffer_[i]; + AppendFuture(i); } } // namespace reader diff --git a/paddle/fluid/operators/reader/buffered_reader.h b/paddle/fluid/operators/reader/buffered_reader.h index eb702a2322af220852091954a5c80d4794077f76..58999beebb8c2582ef90a35bd671ba51f34ac782 100644 --- a/paddle/fluid/operators/reader/buffered_reader.h +++ b/paddle/fluid/operators/reader/buffered_reader.h @@ -15,6 +15,7 @@ #pragma once #include +#include #include #include "ThreadPool.h" #include "paddle/fluid/framework/reader.h" @@ -36,7 +37,7 @@ class BufferedReader : public framework::DecoratedReader { private: void AppendFutureToBatchSize(); - void AppendFuture(); + void AppendFuture(size_t i); protected: void ShutdownImpl() override; @@ -47,7 +48,10 @@ class BufferedReader : public framework::DecoratedReader { ThreadPool thread_pool_; platform::Place place_; const size_t buffer_size_; - std::list buffer_; + + std::queue> position_; + std::vector cpu_buffer_; + std::vector gpu_buffer_; }; } // namespace reader