diff --git a/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc b/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc
index 788f7582aebdce65f56d5270943e6e62f720b744..3f0f4492488f4741547a7ac346cc362e4e9fe859 100644
--- a/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc
+++ b/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc
@@ -20,8 +20,8 @@ namespace paddle {
 namespace operators {
 namespace reader {
 
-static constexpr size_t kChannelSize = 2;
-static constexpr size_t kCacheSize = 4;  // kChannelSize + 2
+static constexpr size_t kCacheSize = 2;
+static constexpr size_t kChannelSize = 0;  // kCacheSize - 2
 
 class DoubleBufferReader : public framework::DecoratedReader {
  public:
@@ -36,7 +36,7 @@ class DoubleBufferReader : public framework::DecoratedReader {
       ReaderBase* reader, platform::Place target_place = platform::CPUPlace())
       : DecoratedReader(reader), place_(target_place) {
 #ifdef PADDLE_WITH_CUDA
-    for (size_t i = 0; i < kChannelSize + 2; ++i) {
+    for (size_t i = 0; i < kCacheSize; ++i) {
       if (platform::is_gpu_place(place_)) {
         ctxs_.emplace_back(new platform::CUDADeviceContext(
             boost::get<platform::CUDAPlace>(place_)));
@@ -51,17 +51,17 @@ class DoubleBufferReader : public framework::DecoratedReader {
   void ReInit() override;
 
   void StartPrefetcher() {
-    buffer_ = framework::MakeChannel<Item>(kChannelSize);
+    channel_ = framework::MakeChannel<Item>(kChannelSize);
     prefetcher_ = std::thread([this] { PrefetchThreadFunc(); });
   }
 
   void EndPrefetcher() {
-    buffer_->Close();
-    if (prefecther_.joinable()) {
+    channel_->Close();
+    if (prefetcher_.joinable()) {
       prefetcher_.join();
     }
-    delete buffer_;
-    buffer_ = nullptr;
+    delete channel_;
+    channel_ = nullptr;
   }
 
   ~DoubleBufferReader() { EndPrefetcher(); }
@@ -70,7 +70,7 @@
   void PrefetchThreadFunc();
 
   std::thread prefetcher_;
-  framework::Channel<Item>* buffer_;
+  framework::Channel<Item>* channel_;
   platform::Place place_;
   std::vector<std::unique_ptr<platform::DeviceContext>> ctxs_;
 };
@@ -127,9 +127,9 @@ class CreateDoubleBufferReaderOpMaker : public DecoratedReaderMakerBase {
 };
 
 bool DoubleBufferReader::HasNext() const {
-  while (!buffer_->IsClosed() && !buffer_->CanReceive()) {
+  while (!channel_->IsClosed() && !channel_->CanReceive()) {
   }
-  return buffer_->CanReceive()
+  return channel_->CanReceive();
 }
 
 void DoubleBufferReader::ReadNext(std::vector<framework::LoDTensor>* out) {
@@ -138,8 +138,8 @@ void DoubleBufferReader::ReadNext(std::vector<framework::LoDTensor>* out) {
   }
 
   Item batch;
-  buffer_->Receive(&batch);
-  *out = batch.payload_;
+  channel_->Receive(&batch);
+  *out = batch.payloads_;
   if (batch.ctx_) {
     batch.ctx_->Wait();
   }
@@ -167,26 +167,26 @@ void DoubleBufferReader::PrefetchThreadFunc() {
       gpu_batch.resize(cpu_batch.size());
       for (size_t i = 0; i < cpu_batch.size(); ++i) {
         framework::TensorCopy(cpu_batch[i], place_, *gpu_ctx, &gpu_batch[i]);
-        gpu_batch[i].set_lod(batch.payloads_[i].lod());
+        gpu_batch[i].set_lod(cpu_batch[i].lod());
       }
-      batch.payload_ = gpu_batch;
+      batch.payloads_ = gpu_batch;
       batch.ctx_ = gpu_ctx;
     } else {
       // CPUPlace
-      batch.payload_ = cpu_batch;
+      batch.payloads_ = cpu_batch;
     }
     ++cached_tensor_id;
     cached_tensor_id %= kCacheSize;
 
     try {
-      buffer_->Send(&batch);
+      channel_->Send(&batch);
     } catch (paddle::platform::EnforceNotMet e) {
       VLOG(5) << "WARNING: The double buffer channel has been closed. The "
                  "prefetch thread will terminate.";
       break;
     }
   }
-  buffer_->Close();
+  channel_->Close();
   VLOG(5) << "Prefetch thread terminates.";
 }
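
Note on the constants above: kChannelSize drops from 2 to 0, so the channel between the prefetch thread and ReadNext() becomes unbuffered (a Send() only completes once a matching Receive() runs), while kCacheSize = 2 keeps two cached batches (and, on GPU, two device contexts) alive so the batch the consumer is still reading is never overwritten by the producer. The sketch below is a minimal, self-contained illustration of this producer/consumer pattern, not Paddle code: BoundedChannel is a hypothetical stand-in for framework::Channel<T>, and it uses a small positive capacity instead of a true rendezvous channel to keep the example short.

// Minimal sketch of the double-buffering pattern this commit refines:
// a prefetch thread Sends batches into a bounded channel, the reader
// Receives them, and Close() lets either side shut down cleanly.
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

template <typename T>
class BoundedChannel {  // hypothetical stand-in for framework::Channel<T>
 public:
  explicit BoundedChannel(size_t cap) : cap_(cap) {}

  // Blocks while the channel is full; returns false once the channel closes.
  bool Send(T v) {
    std::unique_lock<std::mutex> lk(mu_);
    not_full_.wait(lk, [&] { return closed_ || q_.size() < cap_; });
    if (closed_) return false;
    q_.push(std::move(v));
    not_empty_.notify_one();
    return true;
  }

  // Blocks while the channel is empty; returns false once closed and drained.
  bool Receive(T* out) {
    std::unique_lock<std::mutex> lk(mu_);
    not_empty_.wait(lk, [&] { return closed_ || !q_.empty(); });
    if (q_.empty()) return false;
    *out = std::move(q_.front());
    q_.pop();
    not_full_.notify_one();
    return true;
  }

  // Wakes every blocked Send/Receive, mirroring channel_->Close() above.
  void Close() {
    std::lock_guard<std::mutex> lk(mu_);
    closed_ = true;
    not_full_.notify_all();
    not_empty_.notify_all();
  }

 private:
  std::mutex mu_;
  std::condition_variable not_full_, not_empty_;
  std::queue<T> q_;
  size_t cap_;
  bool closed_ = false;
};

int main() {
  BoundedChannel<std::vector<float>> channel(2);

  // Producer: the role PrefetchThreadFunc() plays.
  std::thread prefetcher([&] {
    for (int batch = 0; batch < 5; ++batch) {
      if (!channel.Send(std::vector<float>(4, static_cast<float>(batch)))) {
        break;  // channel closed under us; terminate like the VLOG'd path
      }
    }
    channel.Close();  // signal end-of-data once the source is exhausted
  });

  // Consumer: the role ReadNext() plays.
  std::vector<float> out;
  while (channel.Receive(&out)) {
    std::cout << "got a batch of " << out.size() << " elements\n";
  }
  prefetcher.join();
}

Closing the channel before joining, as EndPrefetcher() does in the patch, is what makes shutdown safe: a producer blocked in Send() is woken by Close() and exits, so the join cannot deadlock.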