From d4d946db5a58d1439d71eb17e69fc79a1d869b32 Mon Sep 17 00:00:00 2001 From: sneaxiy Date: Mon, 25 Jun 2018 11:18:22 +0000 Subject: [PATCH] update blocking queue --- .../operators/reader/create_py_reader_op.cc | 9 +++-- .../reader/lod_tensor_blocking_queue.h | 34 ++++++++----------- paddle/fluid/pybind/pybind.cc | 15 ++++---- 3 files changed, 27 insertions(+), 31 deletions(-) diff --git a/paddle/fluid/operators/reader/create_py_reader_op.cc b/paddle/fluid/operators/reader/create_py_reader_op.cc index aac81d1813..36587360f7 100644 --- a/paddle/fluid/operators/reader/create_py_reader_op.cc +++ b/paddle/fluid/operators/reader/create_py_reader_op.cc @@ -28,7 +28,7 @@ class PyReader : public framework::ReaderBase { void ReadNext(std::vector* out) override { bool success; - *out = queue_->Dequeue(&success); + *out = queue_->Pop(&success); if (!success) out->clear(); } @@ -45,6 +45,10 @@ class CreatePyReaderOp : public framework::OperatorBase { private: void RunImpl(const framework::Scope& scope, const platform::Place& dev_place) const override { + auto* out = scope.FindVar(Output("Out")) + ->template GetMutable(); + if (out->Get() != nullptr) return; + const std::string& queue_name = Input("blocking_queue"); auto* queue_holder_var = scope.FindVar(queue_name); PADDLE_ENFORCE( @@ -53,8 +57,7 @@ class CreatePyReaderOp : public framework::OperatorBase { queue_name); auto* queue_holder = queue_holder_var->template GetMutable(); - auto* out = scope.FindVar(Output("Out")) - ->template GetMutable(); + out->Reset(new PyReader(queue_holder->GetQueue())); } }; diff --git a/paddle/fluid/operators/reader/lod_tensor_blocking_queue.h b/paddle/fluid/operators/reader/lod_tensor_blocking_queue.h index a2129f6af4..30d962ba10 100644 --- a/paddle/fluid/operators/reader/lod_tensor_blocking_queue.h +++ b/paddle/fluid/operators/reader/lod_tensor_blocking_queue.h @@ -34,36 +34,33 @@ class LoDTensorBlockingQueue { private: LoDTensorBlockingQueue(size_t capacity, const std::vector& dims) - : dims_(dims) { - queue_.reset( - new BlockingQueue>(capacity)); - } + : queue_(capacity), dims_(dims) {} public: - bool Enqueue(const std::vector& lod_tensor_vec) { + bool Push(const std::vector& lod_tensor_vec) { CheckDims(lod_tensor_vec); - return queue_->Send(lod_tensor_vec); + return queue_.Send(lod_tensor_vec); } - bool Enqueue(std::vector&& lod_tensor_vec) { + bool Push(std::vector&& lod_tensor_vec) { CheckDims(lod_tensor_vec); - return queue_->Send(std::move(lod_tensor_vec)); + return queue_.Send(std::move(lod_tensor_vec)); } - std::vector Dequeue(bool* ok = nullptr) { + std::vector Pop(bool* ok = nullptr) { std::vector lod_tensor_vec; - bool success = queue_->Receive(&lod_tensor_vec); + bool success = queue_.Receive(&lod_tensor_vec); if (ok != nullptr) *ok = success; return lod_tensor_vec; } - inline size_t Cap() const { return queue_->Cap(); } + inline size_t Cap() const { return queue_.Cap(); } - inline size_t Size() const { return queue_->Size(); } + inline size_t Size() const { return queue_.Size(); } - inline void Close() { return queue_->Close(); } + inline void Close() { return queue_.Close(); } - inline bool IsClosed() const { return queue_->IsClosed(); } + inline bool IsClosed() const { return queue_.IsClosed(); } private: void CheckDims(const std::vector& lod_tensor_vec) { @@ -71,15 +68,16 @@ class LoDTensorBlockingQueue { "Expect input size is %d but found %s", dims_.size(), lod_tensor_vec.size()); for (size_t i = 0; i < dims_.size(); ++i) { - const auto& in_dims = lod_tensor_vec[i].dims(); + const auto& in_dims = framework::slice_ddim( + lod_tensor_vec[i].dims(), 1, lod_tensor_vec[i].dims().size()); const auto& expect_dims = framework::slice_ddim(dims_[i], 1, dims_[i].size()); PADDLE_ENFORCE(in_dims == expect_dims, - "Dims of the %d-th input tensor does not match", i); + "Dims of the %d-th input tensor do not match", i); } } - std::unique_ptr>> queue_; + BlockingQueue> queue_; std::vector dims_; }; @@ -92,8 +90,6 @@ class LoDTensorBlockingQueueHolder { queue_.reset(new LoDTensorBlockingQueue(capacity, dims)); } - inline std::shared_ptr GetQueue() { return queue_; } - inline const std::shared_ptr& GetQueue() const { return queue_; } diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc index 6963a0c101..36d0809968 100644 --- a/paddle/fluid/pybind/pybind.cc +++ b/paddle/fluid/pybind/pybind.cc @@ -303,19 +303,16 @@ All parameter, weight, gradient are variables in Paddle. using LoDTensorBlockingQueueHolder = ::paddle::operators::reader::LoDTensorBlockingQueueHolder; py::class_(m, "LoDTensorBlockingQueue", "") - .def("enqueue", + .def("push", [](LoDTensorBlockingQueue &self, const std::vector &lod_tensor_vec) { pybind11::gil_scoped_release release; - return self.Enqueue(lod_tensor_vec); + return self.Push(lod_tensor_vec); }) - .def("size", - [](const LoDTensorBlockingQueue &self) { return self.Size(); }) - .def("capacity", - [](const LoDTensorBlockingQueue &self) { return self.Cap(); }) - .def("close", [](LoDTensorBlockingQueue &self) { return self.Close(); }) - .def("is_closed", - [](const LoDTensorBlockingQueue &self) { return self.IsClosed(); }); + .def("size", &LoDTensorBlockingQueue::Size) + .def("capacity", &LoDTensorBlockingQueue::Cap) + .def("close", &LoDTensorBlockingQueue::Close) + .def("is_closed", &LoDTensorBlockingQueue::IsClosed); m.def("init_lod_tensor_blocking_queue", [](Variable &var, size_t capacity, -- GitLab