diff --git a/paddle/fluid/framework/reader.cc b/paddle/fluid/framework/reader.cc index 3231b2ab27a779c1b4f644e8d13b85826e9384bd..567b0ee99f67fe39a7ca3fb0f41afcfd3ee179d7 100644 --- a/paddle/fluid/framework/reader.cc +++ b/paddle/fluid/framework/reader.cc @@ -18,11 +18,13 @@ namespace paddle { namespace framework { void ReaderBase::ReadNext(std::vector *out) { + std::lock_guard lock(mu_); PADDLE_ENFORCE_EQ(status_, ReaderStatus::kRunning); ReadNextImpl(out); } void ReaderBase::Shutdown() { + std::lock_guard lock(mu_); if (status_ != ReaderStatus::kStopped) { ShutdownImpl(); status_ = ReaderStatus::kStopped; @@ -30,6 +32,7 @@ void ReaderBase::Shutdown() { } void ReaderBase::Start() { + std::lock_guard lock(mu_); if (status_ != ReaderStatus::kRunning) { StartImpl(); status_ = ReaderStatus::kRunning; diff --git a/paddle/fluid/framework/reader.h b/paddle/fluid/framework/reader.h index 9dc5fce4aa586594bff8730db98bab72cda5a69a..8e7f43cdf97c369fd405c8beed3259cde9af9634 100644 --- a/paddle/fluid/framework/reader.h +++ b/paddle/fluid/framework/reader.h @@ -44,6 +44,8 @@ class ReaderBase { virtual void StartImpl() = 0; ReaderStatus status_{kRunning}; + + mutable std::mutex mu_; }; class DecoratedReader : public ReaderBase { diff --git a/paddle/fluid/operators/reader/CMakeLists.txt b/paddle/fluid/operators/reader/CMakeLists.txt index a39c8a00538875e4e3284898230a6cb0693b7a12..9dbcc35e6f5bb01c159980a49dd4b4c9d37d2aab 100644 --- a/paddle/fluid/operators/reader/CMakeLists.txt +++ b/paddle/fluid/operators/reader/CMakeLists.txt @@ -22,7 +22,6 @@ reader_library(create_batch_reader_op SRCS create_batch_reader_op.cc) reader_library(create_recordio_file_reader_op SRCS create_recordio_file_reader_op.cc) reader_library(create_double_buffer_reader_op SRCS create_double_buffer_reader_op.cc) reader_library(create_multi_pass_reader_op SRCS create_multi_pass_reader_op.cc) -reader_library(create_threaded_reader_op SRCS create_threaded_reader_op.cc) reader_library(create_custom_reader_op SRCS create_custom_reader_op.cc) reader_library(create_py_reader_op SRCS create_py_reader_op.cc) diff --git a/paddle/fluid/operators/reader/create_threaded_reader_op.cc b/paddle/fluid/operators/reader/create_threaded_reader_op.cc deleted file mode 100644 index 88a2bcab8df2c90c33d4abc555aa403ed4c7c4d5..0000000000000000000000000000000000000000 --- a/paddle/fluid/operators/reader/create_threaded_reader_op.cc +++ /dev/null @@ -1,84 +0,0 @@ -// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "paddle/fluid/operators/detail/safe_ref.h" -#include "paddle/fluid/operators/reader/reader_op_registry.h" - -namespace paddle { -namespace operators { -namespace reader { - -class ThreadedReader : public framework::DecoratedReader { - public: - explicit ThreadedReader(const std::shared_ptr& reader) - : DecoratedReader(reader) {} - - void ReadNextImpl(std::vector* out) override { - std::lock_guard lock(mutex_); - reader_->ReadNext(out); - } - - private: - void ShutdownImpl() override { - std::lock_guard lock(mutex_); - reader_->Shutdown(); - } - - void StartImpl() override { - std::lock_guard lock(mutex_); - reader_->Start(); - } - - std::mutex mutex_; -}; - -class CreateThreadedReaderOp : public framework::OperatorBase { - public: - using framework::OperatorBase::OperatorBase; - - private: - void RunImpl(const framework::Scope& scope, - const platform::Place& dev_place) const override { - auto* out = detail::Ref(scope.FindVar(Output("Out"))) - .GetMutable(); - if (out->Get() != nullptr) { - return; - } - const auto& underlying_reader = scope.FindVar(Input("UnderlyingReader")) - ->Get(); - out->Reset(new ThreadedReader(underlying_reader.Get())); - } -}; - -class CreateThreadedReaderOpMaker : public DecoratedReaderMakerBase { - protected: - void Apply() override { - AddComment(R"DOC( - CreateThreadedReader Operator - - This operator creates a threaded reader. A threaded reader's - 'ReadNext()' can be invoked by several threads at the same - time. - )DOC"); - } -}; - -} // namespace reader -} // namespace operators -} // namespace paddle - -namespace reader = paddle::operators::reader; -REGISTER_DECORATED_READER_OPERATOR(create_threaded_reader, - reader::CreateThreadedReaderOp, - reader::CreateThreadedReaderOpMaker); diff --git a/python/paddle/fluid/layers/io.py b/python/paddle/fluid/layers/io.py index f33ae76aea95ceeca73c5bae6e4e490cdff29bf3..2346252658f6888f372b5565d01e18fc89945efd 100644 --- a/python/paddle/fluid/layers/io.py +++ b/python/paddle/fluid/layers/io.py @@ -529,9 +529,6 @@ def open_files(filenames, main_prog_reader = multi_pass( reader=main_prog_reader, pass_num=pass_num) - if for_parallel: - main_prog_reader = parallel(reader=main_prog_reader) - return monkey_patch_reader_methods(main_prog_reader) @@ -647,11 +644,6 @@ def multi_pass(reader, pass_num): 'create_multi_pass_reader', reader, {'pass_num': int(pass_num)}) -def parallel(reader): - return __create_shared_decorated_reader__('create_threaded_reader', reader, - {}) - - def read_file(reader): """ Execute the given reader and get data via it.