// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #pragma once #include #include #include #include "paddle/fluid/framework/ddim.h" #include "paddle/fluid/framework/lod_tensor_array.h" #include "paddle/fluid/platform/place.h" namespace paddle { namespace framework { class ReaderBase { public: virtual void ReadNext(std::vector* out); virtual void Shutdown(); virtual void Start(); // Return the readers which are the end of decorating chain. Basically // they are readers just before read op. std::unordered_set GetEndPoints(); virtual ~ReaderBase(); protected: virtual void ReadNextImpl(std::vector* out) {} virtual void ShutdownImpl() {} virtual void StartImpl() {} enum ReaderStatus { kRunning, kStopped }; ReaderStatus status_{kRunning}; mutable std::mutex mu_; private: friend class DecoratedReader; friend class MultiDecoratedReader; // These methods can be only invoked inside DecoratedReader to record the // decorating chain. void InsertDecoratedReader( const std::shared_ptr& decorated_reader); // A set of which readers that decorated this reader. std::vector> decorated_readers_; }; class DecoratedReaderBase : public ReaderBase { public: virtual void RegisterDecorateChain() = 0; }; class DecoratedReader : public DecoratedReaderBase, public std::enable_shared_from_this { public: explicit DecoratedReader(const std::shared_ptr& reader) : DecoratedReaderBase(), reader_(reader) { PADDLE_ENFORCE_NOT_NULL(reader_); } void RegisterDecorateChain() final { reader_->InsertDecoratedReader(shared_from_this()); } ~DecoratedReader(); protected: void ShutdownImpl() override { reader_->Shutdown(); } void StartImpl() override { reader_->Start(); } std::shared_ptr reader_; }; class MultiDecoratedReader : public DecoratedReaderBase, public std::enable_shared_from_this { public: explicit MultiDecoratedReader( const std::vector>& readers) : readers_(readers) { PADDLE_ENFORCE(!readers_.empty()); for (auto& r : readers_) { PADDLE_ENFORCE_NOT_NULL(r); } } void RegisterDecorateChain() final { for (auto& r : readers_) { r->InsertDecoratedReader(shared_from_this()); } } protected: void ShutdownImpl() override { for (auto& r : readers_) { r->Shutdown(); } } void StartImpl() override { for (auto& r : readers_) { r->Start(); } } std::vector> readers_; }; // FileReader is just a conceptual class. class FileReader : public ReaderBase {}; // The ReaderHolder is used as reader' unified wrapper, // making it easier to access different type reader in Variables. class ReaderHolder { public: template void Reset(const std::shared_ptr& reader) { auto reader_base = std::dynamic_pointer_cast(reader); PADDLE_ENFORCE_NOT_NULL(reader_base); reader_ = reader_base; } const std::shared_ptr& Get() const { return reader_; } void ReadNext(std::vector* out) { PADDLE_ENFORCE_NOT_NULL(reader_); reader_->ReadNext(out); } void ResetAll() { auto end_readers = reader_->GetEndPoints(); for (auto* reader : end_readers) { reader->Shutdown(); } for (auto* reader : end_readers) { reader->Start(); } } void Shutdown() { PADDLE_ENFORCE_NOT_NULL(reader_); reader_->Shutdown(); } void Start() { PADDLE_ENFORCE_NOT_NULL(reader_); reader_->Start(); } operator const std::shared_ptr&() const { return this->reader_; } private: std::shared_ptr reader_; }; template inline std::shared_ptr MakeDecoratedReader( ARGS&&... args) { std::shared_ptr reader( new T(std::forward(args)...)); reader->RegisterDecorateChain(); return reader; } } // namespace framework } // namespace paddle