diff --git a/paddle/fluid/operators/reader/py_array_feed_queue.h b/paddle/fluid/operators/reader/py_array_feed_queue.h
deleted file mode 100644
index f9552f73a66e4d75b9ae60c23efe5d4cb93fa62b..0000000000000000000000000000000000000000
--- a/paddle/fluid/operators/reader/py_array_feed_queue.h
+++ /dev/null
@@ -1,207 +0,0 @@
-// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-#pragma once
-
-#include
-#include  //NOLINT
-#include
-#include  // NOLINT
-#include
-#include "glog/logging.h"
-#include "paddle/fluid/framework/lod_tensor.h"
-#include "paddle/fluid/operators/reader/py_blocking_queue.h"
-#include "paddle/fluid/operators/reader/reader_op_registry.h"
-#include "paddle/fluid/pybind/tensor_py.h"
-
-namespace paddle {
-namespace operators {
-namespace reader {
-
-using PyTuple = ::pybind11::tuple;
-using PyArray = ::pybind11::array;
-
-template <typename T>
-using PyArrayT = ::pybind11::array_t<T>;
-
-class PyArrayToTensorVisitor : public boost::static_visitor<void> {
- public:
-#define PY_ARRAY_TO_TENSOR_WITH_TYPE(dtype, func_name)                       \
-  pybind::func_name(tensor_, static_cast<const PyArrayT<dtype>&>(py_array_), \
-                    place)
-
-#define PY_ARRAY_TO_TENSOR(func_name)                  \
-  if (IsType<size_t>()) {                              \
-    PY_ARRAY_TO_TENSOR_WITH_TYPE(size_t, func_name);   \
-  } else if (IsType<int64_t>()) {                      \
-    PY_ARRAY_TO_TENSOR_WITH_TYPE(int64_t, func_name);  \
-  } else if (IsType<int32_t>()) {                      \
-    PY_ARRAY_TO_TENSOR_WITH_TYPE(int32_t, func_name);  \
-  } else if (IsType<int16_t>()) {                      \
-    PY_ARRAY_TO_TENSOR_WITH_TYPE(int16_t, func_name);  \
-  } else if (IsType<uint8_t>()) {                      \
-    PY_ARRAY_TO_TENSOR_WITH_TYPE(uint8_t, func_name);  \
-  } else if (IsType<float>()) {                        \
-    PY_ARRAY_TO_TENSOR_WITH_TYPE(float, func_name);    \
-  } else if (IsType<double>()) {                       \
-    PY_ARRAY_TO_TENSOR_WITH_TYPE(double, func_name);   \
-  } else {                                             \
-    PADDLE_THROW("unsupported dtype of python array"); \
-  }
-
-  PyArrayToTensorVisitor(const PyArray& py_array, framework::Tensor* tensor)
-      : py_array_(py_array), tensor_(tensor) {}
-
-  void operator()(const platform::CPUPlace& place) {
-    PY_ARRAY_TO_TENSOR(PyCPUTensorSetFromArray);
-  }
-
-  void operator()(const platform::CUDAPlace& place) {
-#ifdef PADDLE_WITH_CUDA
-    PY_ARRAY_TO_TENSOR(PyCUDATensorSetFromArray);
-#else
-    PADDLE_THROW("CUDAPlace is not supported in CPU only version");
-#endif
-  }
-
-  void operator()(const platform::CUDAPinnedPlace& place) {
-#ifdef PADDLE_WITH_CUDA
-    PY_ARRAY_TO_TENSOR(PyCUDAPinnedTensorSetFromArray);
-#else
-    PADDLE_THROW("CUDAPinnedPlace is not supported in CPU only version");
-#endif
-  }
-
-#undef PY_ARRAY_TO_TENSOR
-#undef PY_ARRAY_TO_TENSOR_WITH_TYPE
-
- private:
-  template <typename T>
-  inline bool IsType() const {
-    return ::pybind11::isinstance<PyArrayT<T>>(py_array_);
-  }
-
- private:
-  const PyArray& py_array_;
-  framework::Tensor* tensor_;
-};
-
-class PyArrayFeedQueueHolder;
-
-// PyArrayFeedQueue must be thread-safe
-class PyArrayFeedQueue {
-  friend class PyArrayFeedQueueHolder;
-
- private:
-  PyArrayFeedQueue(size_t capacity, const std::vector<framework::DDim>& dims,
-                   const platform::Place& place)
-      : dims_(dims), place_(place) {
-    queue_.reset(
-        new PyBlockingQueue<std::vector<framework::LoDTensor>>(capacity));
-  }
-
- public:
-  ~PyArrayFeedQueue() { Close(); }
-
-  bool Enqueue(const std::vector<PyArray>& py_array_vec) {
-    auto lod_tensor_vec = PyArrayVecToLoDTensorVec(py_array_vec);
-    VLOG(5) << "Enqueue at address " << reinterpret_cast<uintptr_t>(this);
-    return queue_->Send(std::move(lod_tensor_vec));
-  }
-
-  bool Enqueue(const std::vector<framework::LoDTensor>& tensor_vec) {
-    VLOG(5) << "Enqueue at address " << reinterpret_cast<uintptr_t>(this);
-    return queue_->Send(tensor_vec);
-  }
-
-  std::vector<framework::LoDTensor> Dequeue() {
-    VLOG(5) << "Dequeue at address " << reinterpret_cast<uintptr_t>(this);
-    std::vector<framework::LoDTensor> ret;
-    return queue_->Receive(&ret) ? ret : std::vector<framework::LoDTensor>();
-  }
-
-  inline size_t Size() const { return queue_->Size(); }
-
-  inline size_t Cap() const { return queue_->Cap(); }
-
-  inline bool IsClosed() const { return queue_->IsClosed(); }
-
-  inline void Close() { queue_->Close(); }
-
- private:
-  std::vector<framework::LoDTensor> PyArrayVecToLoDTensorVec(
-      const std::vector<PyArray>& py_array_vec) {
-    PADDLE_ENFORCE(dims_.size() == py_array_vec.size(),
-                   "expected input tensor number %d but found %d",
-                   dims_.size(), py_array_vec.size());
-
-    size_t i = 0;
-    if (py_array_vec.size() > 1) {
-      size_t dim0 = py_array_vec[0].shape()[0];
-      for (size_t j = 1; j < py_array_vec.size(); ++j) {
-        PADDLE_ENFORCE(dim0 == py_array_vec[j].shape()[0],
-                       "0-dim of the %d-th input tensor is %d, but 0-dim of "
-                       "the 0-th input tensor is %d",
-                       j, py_array_vec[j].shape()[0], dim0);
-      }
-    }
-
-    std::vector<framework::LoDTensor> lod_tensor_vec;
-    lod_tensor_vec.reserve(py_array_vec.size());
-
-    std::for_each(
-        py_array_vec.begin(), py_array_vec.end(),
-        [&](const PyArray& py_array) {
-          for (int64_t j = 1; j < dims_[i].size(); ++j) {
-            PADDLE_ENFORCE(
-                dims_[i][j] == static_cast<int64_t>(py_array.shape()[j]),
-                "expected %d-dim of %d-th input tensor is %d but found %d", j,
-                i, dims_[i][j], py_array.shape()[j]);
-          }
-
-          lod_tensor_vec.emplace_back(framework::LoDTensor());
-          PyArrayToTensorVisitor visitor(py_array, &(lod_tensor_vec.back()));
-          boost::apply_visitor(visitor, place_);
-          ++i;
-        });
-    return lod_tensor_vec;
-  }
-
-  std::unique_ptr<PyBlockingQueue<std::vector<framework::LoDTensor>>> queue_;
-  std::vector<framework::DDim> dims_;
-  platform::Place place_;
-};
-
-class PyArrayFeedQueueHolder {
- public:
-  PyArrayFeedQueueHolder() {}
-
-  void InitOnce(size_t capacity, const std::vector<framework::DDim>& dims,
-                const platform::Place& place) {
-    PADDLE_ENFORCE(
-        feeder_ == nullptr,
-        "PyArrayFeedQueueHolder::InitOnce() can only be called once");
-    feeder_.reset(new PyArrayFeedQueue(capacity, dims, place));
-  }
-
-  std::shared_ptr<PyArrayFeedQueue> GetFeeder() { return feeder_; }
-  const std::shared_ptr<PyArrayFeedQueue>& GetFeeder() const { return feeder_; }
-
- private:
-  std::shared_ptr<PyArrayFeedQueue> feeder_;
-};
-
-}  // namespace reader
-}  // namespace operators
-}  // namespace paddle
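For context on the interface being removed: `PyArrayFeedQueue` converts a vector of Python arrays into `framework::LoDTensor`s and forwards them to a capacity-bounded `PyBlockingQueue`, relying only on its `Send`/`Receive`/`Close`/`Size`/`Cap`/`IsClosed` contract. The snippet below is a minimal, self-contained sketch of that producer/consumer contract using only standard C++; `BoundedBlockingQueue` and the `std::vector<int>` payload are illustrative stand-ins, not the real `PyBlockingQueue` from `py_blocking_queue.h` or real `LoDTensor` batches.

```cpp
// Sketch of the bounded blocking-queue contract PyArrayFeedQueue delegates to.
// Illustrative only; not the actual PyBlockingQueue implementation.
#include <condition_variable>
#include <deque>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

template <typename T>
class BoundedBlockingQueue {
 public:
  explicit BoundedBlockingQueue(size_t capacity) : cap_(capacity) {}

  // Blocks while the queue is full; returns false once the queue is closed.
  bool Send(T item) {
    std::unique_lock<std::mutex> lock(mu_);
    not_full_.wait(lock, [&] { return closed_ || queue_.size() < cap_; });
    if (closed_) return false;
    queue_.push_back(std::move(item));
    not_empty_.notify_one();
    return true;
  }

  // Blocks while the queue is empty; returns false once closed and drained.
  bool Receive(T* out) {
    std::unique_lock<std::mutex> lock(mu_);
    not_empty_.wait(lock, [&] { return closed_ || !queue_.empty(); });
    if (queue_.empty()) return false;  // closed and nothing left to consume
    *out = std::move(queue_.front());
    queue_.pop_front();
    not_full_.notify_one();
    return true;
  }

  void Close() {
    std::lock_guard<std::mutex> lock(mu_);
    closed_ = true;
    not_full_.notify_all();
    not_empty_.notify_all();
  }

  size_t Size() const {
    std::lock_guard<std::mutex> lock(mu_);
    return queue_.size();
  }
  size_t Cap() const { return cap_; }
  bool IsClosed() const {
    std::lock_guard<std::mutex> lock(mu_);
    return closed_;
  }

 private:
  const size_t cap_;
  mutable std::mutex mu_;
  std::condition_variable not_full_, not_empty_;
  std::deque<T> queue_;
  bool closed_ = false;
};

int main() {
  // One "batch" is a vector of ints here, standing in for the
  // std::vector<framework::LoDTensor> used by PyArrayFeedQueue.
  BoundedBlockingQueue<std::vector<int>> queue(2);

  std::thread producer([&] {
    for (int i = 0; i < 5; ++i) {
      queue.Send(std::vector<int>{i, i * 10});  // like Enqueue()
    }
    queue.Close();  // like ~PyArrayFeedQueue(), which calls Close()
  });

  std::vector<int> batch;
  while (queue.Receive(&batch)) {  // like Dequeue()
    std::cout << "got batch of size " << batch.size() << "\n";
  }
  producer.join();
  return 0;
}
```

Having `Send`/`Receive` return `false` after `Close()` is what lets the deleted `PyArrayFeedQueue::Dequeue()` signal end-of-data by returning an empty vector instead of blocking forever.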