// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #include "paddle/fluid/operators/reader/lod_tensor_blocking_queue.h" #include "paddle/fluid/operators/reader/reader_op_registry.h" namespace paddle { namespace operators { namespace reader { class PyReader : public framework::FileReader { public: explicit PyReader(const std::shared_ptr& queue) : framework::FileReader() { PADDLE_ENFORCE(queue != nullptr, "LoDTensorBlockingQueue must not be null"); queue_ = queue; } void ReadNext(std::vector* out) override { bool success; *out = queue_->Pop(&success); if (!success) out->clear(); } ~PyReader() { VLOG(1) << "~PyReader"; queue_->Close(); } void Shutdown() override { VLOG(1) << "PyReader shutdown!"; queue_->Close(); } void Start() override { queue_->ReOpen(); } private: std::shared_ptr queue_; }; class CreatePyReaderOp : public framework::OperatorBase { public: using framework::OperatorBase::OperatorBase; private: void RunImpl(const framework::Scope& scope, const platform::Place& dev_place) const override { auto* out = scope.FindVar(Output("Out")) ->template GetMutable(); if (out->Get() != nullptr) return; const std::string& queue_name = Input("blocking_queue"); auto* queue_holder_var = scope.FindVar(queue_name); PADDLE_ENFORCE_NOT_NULL( queue_holder_var, "No LoDTensorBlockingQueueHolder variable with name %s found", queue_name); auto* queue_holder = queue_holder_var->template GetMutable(); out->Reset(std::make_shared(queue_holder->GetQueue())); } }; class CreatePyReaderOpMaker : public FileReaderMakerBase { protected: void Apply() override { AddInput("blocking_queue", "Name of the `LoDTensorBlockingQueueHolder` variable"); AddComment(R"DOC( Create PyReader to support LoDTensor data feeding in Python side. )DOC"); } }; } // namespace reader } // namespace operators } // namespace paddle namespace reader = ::paddle::operators::reader; REGISTER_FILE_READER_OPERATOR(create_py_reader, reader::CreatePyReaderOp, reader::CreatePyReaderOpMaker);