diff --git a/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc b/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc index 8960fe5d636c3f8242300da28c10b5317306e57a..bd0bb2ee3b0252f47318c59d9940d8dd478723de 100644 --- a/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc +++ b/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc @@ -48,20 +48,24 @@ class DoubleBufferReader : public framework::DecoratedReader { void start_thread() { buffer_ = framework::MakeChannel(kDoubleBufferSize); - std::thread prefetch([this] { PrefetchThreadFunc(); }); - prefetch.detach(); + prefetcher_ = std::thread([this] { PrefetchThreadFunc(); }); } void ReadNext(std::vector* out) override; void ReInit() override; - ~DoubleBufferReader() { buffer_->Close(); } + ~DoubleBufferReader() { + buffer_->Close(); + prefetcher_.join(); + delete buffer_; + } bool HasNext() const override; private: void PrefetchThreadFunc(); + std::thread prefetcher_; framework::Channel* buffer_; platform::Place place_; std::vector> ctxs_; @@ -134,6 +138,8 @@ void DoubleBufferReader::ReadNext(std::vector* out) { void DoubleBufferReader::ReInit() { reader_->ReInit(); buffer_->Close(); + prefetcher_.join(); + delete buffer_; start_thread(); } @@ -159,8 +165,8 @@ void DoubleBufferReader::PrefetchThreadFunc() { if (!buffer_->Send(&batch)) { VLOG(5) << "WARNING: The double buffer channel has been closed. The " - "prefetch thread terminate."; - return; + "prefetch thread will terminate."; + break; } } buffer_->Close();