diff --git a/paddle/fluid/operators/reader/blocking_queue.h b/paddle/fluid/operators/reader/blocking_queue.h index 5d0c27d30c4becb45c524bccaf9019d994473e1c..71684b14176edc8f71efbefa9a7decffc8f3011e 100644 --- a/paddle/fluid/operators/reader/blocking_queue.h +++ b/paddle/fluid/operators/reader/blocking_queue.h @@ -25,6 +25,11 @@ namespace reader { template class BlockingQueue { + // BlockingQueue is for buffered reading and is supposed to use only the + // reader package. It is true that we could and we should have been using + // framework::Channel, but which has currently a deadlock bug. BlockingQueue + // is a workaround and a simplified version of framework::Channel as it + // doesn't support GPU and it implements on buffered blocking queue. public: explicit BlockingQueue(size_t capacity) : capacity_(capacity), closed_(false) { @@ -37,26 +42,28 @@ class BlockingQueue { std::unique_lock lock(mutex_); send_cv_.wait(lock, [&] { return queue_.size() < capacity_ || closed_; }); if (closed_) { + VLOG(5) + << "WARNING: Sending an element to a closed reader::BlokcingQueue."; return false; - } else { - PADDLE_ENFORCE_LT(queue_.size(), capacity_); - queue_.push_back(elem); - receive_cv_.notify_one(); - return true; } + PADDLE_ENFORCE_LT(queue_.size(), capacity_); + queue_.push_back(elem); + receive_cv_.notify_one(); + return true; } bool Send(T&& elem) { std::unique_lock lock(mutex_); send_cv_.wait(lock, [&] { return queue_.size() < capacity_ || closed_; }); if (closed_) { + VLOG(5) + << "WARNING: Sending an element to a closed reader::BlokcingQueue."; return false; - } else { - PADDLE_ENFORCE_LT(queue_.size(), capacity_); - queue_.emplace_back(std::move(elem)); - receive_cv_.notify_one(); - return true; } + PADDLE_ENFORCE_LT(queue_.size(), capacity_); + queue_.emplace_back(std::move(elem)); + receive_cv_.notify_one(); + return true; } bool Receive(T* elem) { @@ -86,16 +93,6 @@ class BlockingQueue { return closed_; } - bool CanSend() { - std::lock_guard lock(mutex_); - return !closed_ && queue_.size() < capacity_; - } - - bool CanReceive() { - std::lock_guard lock(mutex_); - return !queue_.empty(); - } - size_t Cap() { std::lock_guard lock(mutex_); return capacity_; diff --git a/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc b/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc index 504e069b65def012da72f1547e6bbc2043d3709f..3fdc31dfa5242b6487c308d395d70d7ff348bc73 100644 --- a/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc +++ b/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc @@ -55,8 +55,6 @@ class DoubleBufferReader : public framework::DecoratedReader { ~DoubleBufferReader() { EndPrefetcher(); } private: - bool HasNext() const; - void StartPrefetcher() { channel_ = new reader::BlockingQueue(kChannelSize); prefetcher_ = std::thread([this] { PrefetchThreadFunc(); }); @@ -139,17 +137,16 @@ class CreateDoubleBufferReaderOpMaker : public DecoratedReaderMakerBase { }; void DoubleBufferReader::ReadNext(std::vector* out) { - out->clear(); - if (HasNext()) { - size_t cached_tensor_id; - channel_->Receive(&cached_tensor_id); + size_t cached_tensor_id; + if (channel_->Receive(&cached_tensor_id)) { if (platform::is_gpu_place(place_)) { *out = gpu_tensor_cache_[cached_tensor_id]; - ctxs_[cached_tensor_id]->Wait(); } else { // CPU place *out = cpu_tensor_cache_[cached_tensor_id]; } + } else { + out->clear(); } } @@ -159,12 +156,6 @@ void DoubleBufferReader::ReInit() { StartPrefetcher(); } -bool DoubleBufferReader::HasNext() const { - while (!channel_->IsClosed() && !channel_->CanReceive()) { - } - return channel_->CanReceive(); -} - void DoubleBufferReader::PrefetchThreadFunc() { VLOG(5) << "A new prefetch thread starts."; size_t cached_tensor_id = 0; diff --git a/paddle/fluid/operators/reader/open_files_op.cc b/paddle/fluid/operators/reader/open_files_op.cc index 461b56c7a43ae760e44625166563e805272f88d3..91ad7d56583446ee4686e74187de166f387125df 100644 --- a/paddle/fluid/operators/reader/open_files_op.cc +++ b/paddle/fluid/operators/reader/open_files_op.cc @@ -37,7 +37,6 @@ class MultiFileReader : public framework::ReaderBase { ~MultiFileReader() { EndScheduler(); } private: - bool HasNext(); void StartNewScheduler(); void EndScheduler(); void ScheduleThreadFunc(); @@ -54,9 +53,8 @@ class MultiFileReader : public framework::ReaderBase { }; void MultiFileReader::ReadNext(std::vector* out) { - out->clear(); - if (HasNext()) { - buffer_->Receive(out); + if (!buffer_->Receive(out)) { + out->clear(); } } @@ -65,12 +63,6 @@ void MultiFileReader::ReInit() { StartNewScheduler(); } -bool MultiFileReader::HasNext() { - while (!buffer_->IsClosed() && !buffer_->CanReceive()) { - } - return buffer_->CanReceive(); -} - void MultiFileReader::StartNewScheduler() { size_t thread_num = prefetchers_.size(); waiting_file_idx_ = new reader::BlockingQueue(file_names_.size());