Unverified · Commit 01fbcb0b authored by sneaxiy, committed by GitHub

Merge pull request #11695 from sneaxiy/complete_py_reader_cpp

Add Python Reader Op (CPP side)
paddle/fluid/operators/reader/CMakeLists.txt:

@@ -24,6 +24,7 @@ reader_library(create_double_buffer_reader_op SRCS create_double_buffer_reader_o
 reader_library(create_multi_pass_reader_op SRCS create_multi_pass_reader_op.cc)
 reader_library(create_threaded_reader_op SRCS create_threaded_reader_op.cc)
 reader_library(create_custom_reader_op SRCS create_custom_reader_op.cc)
+reader_library(create_py_reader_op SRCS create_py_reader_op.cc)
 cc_test(reader_blocking_queue_test SRCS reader_blocking_queue_test.cc)

 # Export local libraries to parent
paddle/fluid/operators/reader/blocking_queue.h:

@@ -88,24 +88,29 @@ class BlockingQueue {
     receive_cv_.notify_all();
   }

-  bool IsClosed() {
+  bool IsClosed() const {
     std::lock_guard<std::mutex> lock(mutex_);
     return closed_;
   }

-  size_t Cap() {
+  size_t Cap() const {
     std::lock_guard<std::mutex> lock(mutex_);
     return capacity_;
   }

+  size_t Size() const {
+    std::lock_guard<std::mutex> lock(mutex_);
+    return queue_.size();
+  }
+
  private:
   size_t capacity_;
   bool closed_;
   std::deque<T> queue_;
-  std::mutex mutex_;
-  std::condition_variable receive_cv_;
-  std::condition_variable send_cv_;
+  mutable std::mutex mutex_;
+  mutable std::condition_variable receive_cv_;
+  mutable std::condition_variable send_cv_;
 };

 }  // namespace reader
 }  // namespace operators
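Reviewer note: the `mutable` qualifiers added above exist so that the new const accessors (IsClosed, Cap, Size) can still lock mutex_. Below is a minimal, self-contained sketch of the same pattern; it is simplified from the real BlockingQueue, and Send/Receive semantics are abbreviated:

    #include <condition_variable>
    #include <deque>
    #include <mutex>

    template <typename T>
    class MiniBlockingQueue {
     public:
      explicit MiniBlockingQueue(size_t capacity)
          : capacity_(capacity), closed_(false) {}

      // Blocks while full; returns false if the queue has been closed.
      bool Send(T&& v) {
        std::unique_lock<std::mutex> lock(mutex_);
        send_cv_.wait(lock,
                      [&] { return queue_.size() < capacity_ || closed_; });
        if (closed_) return false;
        queue_.emplace_back(std::move(v));
        receive_cv_.notify_one();
        return true;
      }

      // Blocks while empty; returns false once closed and fully drained.
      bool Receive(T* out) {
        std::unique_lock<std::mutex> lock(mutex_);
        receive_cv_.wait(lock, [&] { return !queue_.empty() || closed_; });
        if (queue_.empty()) return false;  // closed and nothing left
        *out = std::move(queue_.front());
        queue_.pop_front();
        send_cv_.notify_one();
        return true;
      }

      void Close() {
        std::lock_guard<std::mutex> lock(mutex_);
        closed_ = true;
        send_cv_.notify_all();
        receive_cv_.notify_all();
      }

      // Const accessor: locking here is exactly why the mutex is mutable.
      size_t Size() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return queue_.size();
      }

     private:
      size_t capacity_;
      bool closed_;
      std::deque<T> queue_;
      mutable std::mutex mutex_;
      std::condition_variable receive_cv_;
      std::condition_variable send_cv_;
    };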
paddle/fluid/operators/reader/create_py_reader_op.cc (new file):

// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/operators/reader/lod_tensor_blocking_queue.h"
#include "paddle/fluid/operators/reader/reader_op_registry.h"

namespace paddle {
namespace operators {
namespace reader {

class PyReader : public framework::ReaderBase {
 public:
  explicit PyReader(const std::shared_ptr<LoDTensorBlockingQueue>& queue) {
    PADDLE_ENFORCE(queue != nullptr, "LoDTensorBlockingQueue must not be null");
    queue_ = queue;
  }

  void ReadNext(std::vector<framework::LoDTensor>* out) override {
    bool success;
    *out = queue_->Pop(&success);
    if (!success) out->clear();
  }

  void ReInit() override {}

 private:
  std::shared_ptr<LoDTensorBlockingQueue> queue_;
};

class CreatePyReaderOp : public framework::OperatorBase {
 public:
  using framework::OperatorBase::OperatorBase;

 private:
  void RunImpl(const framework::Scope& scope,
               const platform::Place& dev_place) const override {
    auto* out = scope.FindVar(Output("Out"))
                    ->template GetMutable<framework::ReaderHolder>();
    if (out->Get() != nullptr) return;

    const std::string& queue_name = Input("blocking_queue");
    auto* queue_holder_var = scope.FindVar(queue_name);
    PADDLE_ENFORCE(
        queue_holder_var != nullptr,
        "No LoDTensorBlockingQueueHolder variable with name %s found",
        queue_name);
    auto* queue_holder =
        queue_holder_var->template GetMutable<LoDTensorBlockingQueueHolder>();

    out->Reset(new PyReader(queue_holder->GetQueue()));
  }
};

class CreatePyReaderOpMaker : public FileReaderMakerBase {
 protected:
  void Apply() override {
    AddInput("blocking_queue",
             "Name of the `LoDTensorBlockingQueueHolder` variable");

    AddComment(R"DOC(
Create PyReader to support LoDTensor data feeding in Python side.
)DOC");
  }
};

}  // namespace reader
}  // namespace operators
}  // namespace paddle

namespace reader = ::paddle::operators::reader;

REGISTER_FILE_READER_OPERATOR(create_py_reader, reader::CreatePyReaderOp,
                              reader::CreatePyReaderOpMaker);
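Reviewer note on the end-of-data protocol in PyReader::ReadNext above: once the queue is closed and drained, Pop reports failure and ReadNext hands back an empty batch, which consumers treat as EOF. A hedged, self-contained illustration of that contract (FakeReader is a stand-in, not the real PyReader):

    #include <iostream>
    #include <vector>

    struct FakeReader {
      int remaining = 2;  // pretend two batches were pushed before Close()
      void ReadNext(std::vector<int>* out) {
        if (remaining > 0) {
          *out = {remaining--};  // one more batch available
        } else {
          out->clear();  // closed and drained: empty output signals EOF
        }
      }
    };

    int main() {
      FakeReader reader;
      std::vector<int> batch;
      for (reader.ReadNext(&batch); !batch.empty(); reader.ReadNext(&batch)) {
        std::cout << "got batch of size " << batch.size() << "\n";
      }
      std::cout << "queue closed: end of data\n";
      return 0;
    }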
paddle/fluid/operators/reader/lod_tensor_blocking_queue.h (new file):

// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <memory>
#include <vector>

#include "paddle/fluid/framework/ddim.h"
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/operators/reader/blocking_queue.h"
#include "paddle/fluid/platform/place.h"

namespace paddle {
namespace operators {
namespace reader {

class LoDTensorBlockingQueueHolder;

class LoDTensorBlockingQueue {
  friend class LoDTensorBlockingQueueHolder;

 private:
  LoDTensorBlockingQueue(size_t capacity,
                         const std::vector<framework::DDim>& dims)
      : queue_(capacity), dims_(dims) {}

 public:
  bool Push(const std::vector<framework::LoDTensor>& lod_tensor_vec) {
    CheckDims(lod_tensor_vec);
    return queue_.Send(lod_tensor_vec);
  }

  bool Push(std::vector<framework::LoDTensor>&& lod_tensor_vec) {
    CheckDims(lod_tensor_vec);
    return queue_.Send(std::move(lod_tensor_vec));
  }

  std::vector<framework::LoDTensor> Pop(bool* ok = nullptr) {
    std::vector<framework::LoDTensor> lod_tensor_vec;
    bool success = queue_.Receive(&lod_tensor_vec);
    if (ok != nullptr) *ok = success;
    return lod_tensor_vec;
  }

  inline size_t Cap() const { return queue_.Cap(); }

  inline size_t Size() const { return queue_.Size(); }

  inline void Close() { return queue_.Close(); }

  inline bool IsClosed() const { return queue_.IsClosed(); }

 private:
  void CheckDims(const std::vector<framework::LoDTensor>& lod_tensor_vec) {
    PADDLE_ENFORCE(dims_.size() == lod_tensor_vec.size(),
                   "Expect input size is %d but found %s", dims_.size(),
                   lod_tensor_vec.size());
    for (size_t i = 0; i < dims_.size(); ++i) {
      const auto& in_dims = framework::slice_ddim(
          lod_tensor_vec[i].dims(), 1, lod_tensor_vec[i].dims().size());
      const auto& expect_dims =
          framework::slice_ddim(dims_[i], 1, dims_[i].size());
      PADDLE_ENFORCE(in_dims == expect_dims,
                     "Dims of the %d-th input tensor do not match", i);
    }
  }

  BlockingQueue<std::vector<framework::LoDTensor>> queue_;
  std::vector<framework::DDim> dims_;
};

class LoDTensorBlockingQueueHolder {
 public:
  void InitOnce(size_t capacity, const std::vector<framework::DDim>& dims) {
    PADDLE_ENFORCE(
        queue_ == nullptr,
        "LoDTensorBlockingQueueHolder::InitOnce() can only be called once");
    queue_.reset(new LoDTensorBlockingQueue(capacity, dims));
  }

  inline const std::shared_ptr<LoDTensorBlockingQueue>& GetQueue() const {
    return queue_;
  }

 private:
  std::shared_ptr<LoDTensorBlockingQueue> queue_;
};

}  // namespace reader
}  // namespace operators
}  // namespace paddle
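Reviewer note on CheckDims above: both the expected and the incoming dims are passed through slice_ddim(..., 1, size), so the leading (batch) dimension is skipped and only the per-sample shape must match. A self-contained illustration of that rule (TailDims is a hypothetical stand-in for framework::slice_ddim):

    #include <cassert>
    #include <cstdint>
    #include <vector>

    // Stand-in for framework::slice_ddim(d, 1, d.size()): drop the batch dim.
    std::vector<int64_t> TailDims(const std::vector<int64_t>& dims) {
      return std::vector<int64_t>(dims.begin() + 1, dims.end());
    }

    int main() {
      const std::vector<int64_t> expect = {-1, 3, 224, 224};  // -1: any batch
      const std::vector<int64_t> batch32 = {32, 3, 224, 224};
      const std::vector<int64_t> batch8 = {8, 3, 224, 224};
      // Both batch sizes pass: only dims after the first must match.
      assert(TailDims(batch32) == TailDims(expect));
      assert(TailDims(batch8) == TailDims(expect));
      return 0;
    }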
paddle/fluid/pybind/pybind.cc:

@@ -34,6 +34,7 @@ limitations under the License. */
 #include "paddle/fluid/framework/reader.h"
 #include "paddle/fluid/framework/selected_rows.h"
 #include "paddle/fluid/operators/activation_op.h"
+#include "paddle/fluid/operators/reader/lod_tensor_blocking_queue.h"
 #include "paddle/fluid/platform/enforce.h"
 #include "paddle/fluid/platform/place.h"
 #include "paddle/fluid/platform/profiler.h"
@@ -297,6 +298,37 @@ All parameter, weight, gradient are variables in Paddle.
   py::class_<framework::ReaderHolder>(m, "Reader", "")
       .def("reset", &framework::ReaderHolder::ReInit);

+  using LoDTensorBlockingQueue =
+      ::paddle::operators::reader::LoDTensorBlockingQueue;
+  using LoDTensorBlockingQueueHolder =
+      ::paddle::operators::reader::LoDTensorBlockingQueueHolder;
+  py::class_<LoDTensorBlockingQueue>(m, "LoDTensorBlockingQueue", "")
+      .def("push",
+           [](LoDTensorBlockingQueue &self,
+              const std::vector<framework::LoDTensor> &lod_tensor_vec) {
+             pybind11::gil_scoped_release release;
+             return self.Push(lod_tensor_vec);
+           })
+      .def("size", &LoDTensorBlockingQueue::Size)
+      .def("capacity", &LoDTensorBlockingQueue::Cap)
+      .def("close", &LoDTensorBlockingQueue::Close)
+      .def("is_closed", &LoDTensorBlockingQueue::IsClosed);
+
+  m.def("init_lod_tensor_blocking_queue",
+        [](Variable &var, size_t capacity,
+           const std::vector<std::vector<int64_t>> &shapes)
+            -> LoDTensorBlockingQueue * {
+          std::vector<DDim> dims(shapes.size());
+          std::transform(shapes.begin(), shapes.end(), dims.begin(),
+                         [](const std::vector<int64_t> &shape) {
+                           return make_ddim(shape);
+                         });
+          auto *holder = var.GetMutable<LoDTensorBlockingQueueHolder>();
+          holder->InitOnce(capacity, dims);
+          return holder->GetQueue().get();
+        },
+        py::return_value_policy::reference);
+
   py::class_<Scope>(m, "Scope", "")
       .def("var",
            [](Scope &self, const std::string &name) -> Variable * {
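Reviewer note on py::return_value_policy::reference above: init_lod_tensor_blocking_queue returns a raw pointer into a holder that lives inside a Scope variable, so Python must borrow the queue rather than own it. A minimal, self-contained analogue of that binding pattern (Queue and Holder here are hypothetical, not the Paddle types):

    #include <pybind11/pybind11.h>
    #include <memory>

    struct Queue { int capacity; };

    struct Holder {
      void InitOnce(int cap) { q_ = std::make_shared<Queue>(Queue{cap}); }
      Queue* get() const { return q_.get(); }
     private:
      std::shared_ptr<Queue> q_;  // the Holder owns the Queue's lifetime
    };

    PYBIND11_MODULE(example, m) {
      pybind11::class_<Queue>(m, "Queue")
          .def_readonly("capacity", &Queue::capacity);
      pybind11::class_<Holder>(m, "Holder")
          .def(pybind11::init<>())
          .def("init_once", &Holder::InitOnce)
          // reference: pybind11 does not take ownership; Python only
          // borrows the pointer while the Holder keeps the Queue alive.
          .def("get", &Holder::get, pybind11::return_value_policy::reference);
    }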
@@ -463,9 +495,11 @@ All parameter, weight, gradient are variables in Paddle.
 #ifdef PADDLE_WITH_DISTRIBUTE
       .def("complete", &Executor::Complete)
 #endif
-      .def("run",
-           (void (Executor::*)(const ProgramDesc &, Scope *, int, bool, bool)) &
-               Executor::Run);
+      .def("run", [](Executor &self, const ProgramDesc &prog, Scope *scope,
+                     int block_id, bool create_local_scope, bool create_vars) {
+        pybind11::gil_scoped_release release;
+        self.Run(prog, scope, block_id, create_local_scope, create_vars);
+      });

   m.def("init_gflags", framework::InitGflags);
   m.def("init_glog", framework::InitGLOG);
@@ -631,7 +665,12 @@ All parameter, weight, gradient are variables in Paddle.
            &ParallelExecutor::FeedTensorsIntoLocalScopes)
       .def("feed_and_split_tensor_into_local_scopes",
            &ParallelExecutor::FeedAndSplitTensorIntoLocalScopes)
-      .def("run", &ParallelExecutor::Run);
+      .def("run", [](ParallelExecutor &self,
+                     const std::vector<std::string> &fetch_tensors,
+                     const std::string &fetched_var_name) {
+        pybind11::gil_scoped_release release;
+        self.Run(fetch_tensors, fetched_var_name);
+      });

   BindRecordIOWriter(&m);
   return m.ptr();
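Reviewer note on the three gil_scoped_release uses above (push, Executor.run, ParallelExecutor.run): each of these calls can block inside C++ while waiting on or producing data. If they held the GIL, the Python threads feeding the LoDTensorBlockingQueue could never run and the reader would deadlock. A minimal, self-contained analogue of the pattern:

    #include <pybind11/pybind11.h>
    #include <chrono>
    #include <thread>

    PYBIND11_MODULE(gil_demo, m) {
      m.def("blocking_call", []() {
        // Drop the GIL so other Python threads (e.g. queue producers)
        // can make progress while C++ blocks.
        pybind11::gil_scoped_release release;
        std::this_thread::sleep_for(std::chrono::seconds(1));  // stand-in work
      });
    }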
paddle/fluid/pybind/tensor_py.h:

@@ -146,7 +146,7 @@ void PyCPUTensorSetFromArray(
 template <>
 // This following specialization maps uint16_t in the parameter type to
 // platform::float16.
-void PyCPUTensorSetFromArray(
+inline void PyCPUTensorSetFromArray(
     framework::Tensor *self,
     pybind11::array_t<uint16_t,
                       pybind11::array::c_style | pybind11::array::forcecast>

@@ -185,7 +185,7 @@ void PyCUDATensorSetFromArray(
 template <>
 // This following specialization maps uint16_t in the parameter type to
 // platform::float16.
-void PyCUDATensorSetFromArray(
+inline void PyCUDATensorSetFromArray(
     framework::Tensor *self,
     pybind11::array_t<uint16_t,
                       pybind11::array::c_style | pybind11::array::forcecast>

@@ -224,7 +224,7 @@ void PyCUDAPinnedTensorSetFromArray(
 template <>
 // This following specialization maps uint16_t in the parameter type to
 // platform::float16.
-void PyCUDAPinnedTensorSetFromArray(
+inline void PyCUDAPinnedTensorSetFromArray(
     framework::Tensor *self,
     pybind11::array_t<uint16_t,
                       pybind11::array::c_style | pybind11::array::forcecast>
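Reviewer note on the `inline` added above: these are explicit full specializations defined in a header. A full specialization is an ordinary function definition, so without `inline` every translation unit including the header would emit its own copy and the linker would report multiple definitions. A minimal sketch of the rule (Set is a hypothetical name, not the Paddle API):

    // some_header.h (sketch)
    template <typename T>
    void Set(T value);  // primary template: declaration only

    template <>
    inline void Set<int>(int value) {
      // `inline` makes this definition legal in multiple .cc files.
    }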