diff --git a/paddle/fluid/framework/details/CMakeLists.txt b/paddle/fluid/framework/details/CMakeLists.txt
index 5f4f31abea48912707743b0ebcaa389f27e9c5a5..c662ae8d85d280114e9bc1c19544789bd6508675 100644
--- a/paddle/fluid/framework/details/CMakeLists.txt
+++ b/paddle/fluid/framework/details/CMakeLists.txt
@@ -64,7 +64,7 @@ cc_library(gather_op_handle SRCS gather_op_handle.cc DEPS op_handle_base scope d
 cc_library(eager_deletion_op_handle SRCS eager_deletion_op_handle.cc DEPS lod_tensor selected_rows reference_count_pass_helper)

-set(SSA_GRAPH_EXECUTOR_DEPS graph framework_proto sequential_execution_pass modify_op_lock_and_record_event_pass all_reduce_deps_pass reference_count_pass eager_deletion_pass buffer_shared_inplace_op_pass buffer_shared_cross_op_memory_reuse_pass)
+set(SSA_GRAPH_EXECUTOR_DEPS graph framework_proto sequential_execution_pass modify_op_lock_and_record_event_pass all_reduce_deps_pass reference_count_pass eager_deletion_pass buffer_shared_inplace_op_pass buffer_shared_cross_op_memory_reuse_pass set_reader_device_count_pass)
 cc_library(ssa_graph_executor SRCS ssa_graph_executor.cc DEPS ${SSA_GRAPH_EXECUTOR_DEPS})

 cc_library(threaded_ssa_graph_executor SRCS threaded_ssa_graph_executor.cc DEPS fetch_op_handle ssa_graph_executor scope
diff --git a/paddle/fluid/framework/details/build_strategy.cc b/paddle/fluid/framework/details/build_strategy.cc
index ca6871cc3ee35de62878265b291dcd8e6c181573..d7e066357f2cb99256db223350b4134b2bbc697d 100644
--- a/paddle/fluid/framework/details/build_strategy.cc
+++ b/paddle/fluid/framework/details/build_strategy.cc
@@ -66,6 +66,7 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder {
     AppendPrintGraphPass("graph_viz_pass", "_fused_graph");

     AppendMultiDevPass();
+    AppendSetReaderDeviceCountPass();
     AppendMultiGraphOptPasses();

     AppendPassToSetMkldnnAttr("mkldnn_placement_pass");
@@ -221,6 +222,10 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder {
                                                             &strategy_);
   }

+  void AppendSetReaderDeviceCountPass() {
+    AppendPass("set_reader_device_count_pass");
+  }
+
   void AppendPrintGraphPass(const std::string &pass_name,
                             const std::string &debug_file_suffix) {
     if (!strategy_.debug_graphviz_path_.empty()) {
@@ -385,6 +390,8 @@ ir::Graph *BuildStrategy::Apply(ir::Graph *graph,
                      "GPU, skipped.";
         continue;
       }
+    } else if (pass->Type() == "set_reader_device_count_pass") {
+      pass->SetNotOwned<const std::vector<platform::Place>>(kPlaces, &places);
     }
     VLOG(1) << "Start Apply Pass " << pass->Type();
     graph = pass->Apply(graph);
@@ -421,6 +428,7 @@ USE_PASS(fuse_sgd_op_pass);
 USE_PASS(fuse_momentum_op_pass);
 USE_PASS(fuse_all_reduce_op_pass);
 USE_PASS(runtime_context_cache_pass);
+USE_PASS(set_reader_device_count_pass);
 #ifdef PADDLE_WITH_MKLDNN
 USE_PASS(mkldnn_placement_pass);
 #endif
diff --git a/paddle/fluid/framework/ir/multi_devices_graph_pass/CMakeLists.txt b/paddle/fluid/framework/ir/multi_devices_graph_pass/CMakeLists.txt
index 4cdb6a7d30882d095a2666ccc45ed7716954c37c..85a7f44c4807221b4489db3ddb923425ee405332 100644
--- a/paddle/fluid/framework/ir/multi_devices_graph_pass/CMakeLists.txt
+++ b/paddle/fluid/framework/ir/multi_devices_graph_pass/CMakeLists.txt
@@ -11,6 +11,7 @@ endif()
 cc_library(multi_devices_graph_pass SRCS multi_devices_graph_pass.cc DEPS multi_devices_helper computation_op_handle scale_loss_grad_op_handle rpc_op_handle fetch_barrier_op_handle ${ALL_REDUCE_OP_HANDLES} reduce_op_handle broadcast_op_handle fused_broadcast_op_handle)
 cc_library(sequential_execution_pass SRCS sequential_execution_pass.cc DEPS graph graph_helper pass)
+cc_library(set_reader_device_count_pass SRCS set_reader_device_count_pass.cc DEPS graph graph_helper pass multi_devices_graph_pass)

 cc_library(fuse_all_reduce_op_pass SRCS fuse_all_reduce_op_pass.cc DEPS graph graph_helper fused_all_reduce_op_handle)
 cc_library(all_reduce_deps_pass SRCS all_reduce_deps_pass.cc DEPS all_reduce_op_handle graph graph_helper pass)
diff --git a/paddle/fluid/framework/ir/multi_devices_graph_pass/set_reader_device_count_pass.cc b/paddle/fluid/framework/ir/multi_devices_graph_pass/set_reader_device_count_pass.cc
new file mode 100644
index 0000000000000000000000000000000000000000..ecb40171ee574294720193cf08ee4409f50c17c8
--- /dev/null
+++ b/paddle/fluid/framework/ir/multi_devices_graph_pass/set_reader_device_count_pass.cc
@@ -0,0 +1,78 @@
+// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "paddle/fluid/framework/details/computation_op_handle.h"
+#include "paddle/fluid/framework/details/multi_devices_helper.h"
+#include "paddle/fluid/framework/ir/graph.h"
+#include "paddle/fluid/framework/ir/pass.h"
+
+namespace paddle {
+namespace framework {
+namespace ir {
+
+class SetReaderDeviceCountPass : public Pass {
+ protected:
+  void ApplyImpl(Graph *graph) const override;
+
+ private:
+  int GetDeviceCount() const;
+
+  std::unordered_set<std::string> ReaderOpSet() const;
+};
+
+int SetReaderDeviceCountPass::GetDeviceCount() const {
+  return static_cast<int>(
+      Get<const std::vector<platform::Place>>(details::kPlaces).size());
+}
+
+std::unordered_set<std::string> SetReaderDeviceCountPass::ReaderOpSet() const {
+  return {"create_py_reader"};
+}
+
+void SetReaderDeviceCountPass::ApplyImpl(Graph *graph) const {
+  auto dev_cnt = GetDeviceCount();
+  auto reader_ops = ReaderOpSet();
+  size_t found_op_num = 0;
+
+  for (auto &node : graph->Nodes()) {
+    if (node->IsOp() && node->Op() &&
+        reader_ops.count(node->Op()->Type()) != 0) {
+      auto &op_handle = dynamic_cast<details::ComputationOpHandle &>(
+          node->Wrapper<details::OpHandleBase>());
+      auto *op_desc = node->Op();
+      auto &op_base_attrs =
+          const_cast<framework::AttributeMap &>(op_handle.GetOp()->Attrs());
+      int dev_idx = static_cast<int>(op_handle.GetScopeIdx());
+
+      op_desc->SetAttr("device_index", dev_idx);
+      op_desc->SetAttr("device_count", dev_cnt);
+
+      op_base_attrs["device_index"] = dev_idx;
+      op_base_attrs["device_count"] = dev_cnt;
+
+      ++found_op_num;
+      VLOG(10) << "Found op " << op_desc->Type() << " on device " << dev_idx;
+    }
+  }
+
+  VLOG(10) << "Found op number " << found_op_num;
+}
+
+}  // namespace ir
+}  // namespace framework
+}  // namespace paddle
+
+REGISTER_PASS(set_reader_device_count_pass,
+              paddle::framework::ir::SetReaderDeviceCountPass)
+    .RequirePassAttr(paddle::framework::details::kPlaces);
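Note: the pass itself only tags every `create_py_reader` op with its scope index (`device_index`) and the total number of places (`device_count`); the ordering guarantee comes from the ordered queue introduced further below, which hands batch i to sub-queue i % device_count. A toy sketch of that mapping (plain Python, not part of the patch):

# Toy illustration of the round-robin routing the ordered queue performs:
# batch i always lands on device i % device_count, so the assignment is
# deterministic across runs.
def route_batches(num_batches, device_count):
    per_device = {idx: [] for idx in range(device_count)}
    for i in range(num_batches):
        per_device[i % device_count].append(i)
    return per_device

assert route_batches(4, 2) == {0: [0, 2], 1: [1, 3]}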
diff --git a/paddle/fluid/framework/var_type_traits.h b/paddle/fluid/framework/var_type_traits.h
index 4ab01b9068e546022ec53325686b2349cd45f482..15b0f16b70d12489a4a94e4a1b451f86a9d495f0 100644
--- a/paddle/fluid/framework/var_type_traits.h
+++ b/paddle/fluid/framework/var_type_traits.h
@@ -56,6 +56,7 @@ class CudnnRNNCache;

 namespace reader {
 class LoDTensorBlockingQueueHolder;
+class OrderedMultiDeviceLoDTensorBlockingQueueHolder;
 }  // namespace reader
 }  // namespace operators

@@ -139,6 +140,7 @@ using VarTypeRegistry = detail::VarTypeRegistryImpl<
     Tensor, LoDTensor, SelectedRows, std::vector<Scope *>, LoDRankTable,
     LoDTensorArray, platform::PlaceList, ReaderHolder, std::string, Scope *,
     operators::reader::LoDTensorBlockingQueueHolder,
+    operators::reader::OrderedMultiDeviceLoDTensorBlockingQueueHolder,
 #ifdef PADDLE_WITH_CUDA
 #ifndef _WIN32
     ncclUniqueId, platform::Communicator, platform::NCCLCommunicator,
diff --git a/paddle/fluid/operators/reader/create_py_reader_op.cc b/paddle/fluid/operators/reader/create_py_reader_op.cc
index 4dbee98956fcba3ecfe34d546749d0bea883c7a4..f1a7110ba4b2650e83f943e8ad7aa81f913a0853 100644
--- a/paddle/fluid/operators/reader/create_py_reader_op.cc
+++ b/paddle/fluid/operators/reader/create_py_reader_op.cc
@@ -38,8 +38,21 @@ class CreatePyReaderOp : public framework::OperatorBase {
         queue_holder_var,
         "No LoDTensorBlockingQueueHolder variable with name %s found",
         queue_name);
-    auto* queue_holder =
-        queue_holder_var->template GetMutable<LoDTensorBlockingQueueHolder>();
+    std::shared_ptr<LoDTensorBlockingQueue> queue;
+    std::shared_ptr<OrderedMultiDeviceLoDTensorBlockingQueue> ordered_queue;
+    if (queue_holder_var->IsType<LoDTensorBlockingQueueHolder>()) {
+      queue = queue_holder_var->Get<LoDTensorBlockingQueueHolder>().GetQueue();
+    } else if (queue_holder_var
+                   ->IsType<OrderedMultiDeviceLoDTensorBlockingQueueHolder>()) {
+      auto* queue_holder =
+          queue_holder_var
+              ->GetMutable<OrderedMultiDeviceLoDTensorBlockingQueueHolder>();
+      auto dev_cnt = Attr<int>("device_count");
+      auto dev_idx = static_cast<size_t>(Attr<int>("device_index"));
+      ordered_queue = queue_holder->GetQueue();
+      ordered_queue->InitOnce(dev_cnt);
+      queue = ordered_queue->GetQueue(dev_idx);
+    }

     /* Coverting shape_concat and ranks into DDim of each data.
      shape_concat and ranks are shapes and shape ranks of each data.E.g.
@@ -71,8 +84,20 @@ class CreatePyReaderOp : public framework::OperatorBase {
     for (size_t i = 0; i < need_check_feed_int.size(); ++i) {
       need_check_feed.push_back(static_cast<bool>(need_check_feed_int[i]));
     }
-    out->Reset(std::make_shared<PyReader>(queue_holder->GetQueue(), dims,
-                                          var_types, need_check_feed));
+    auto py_reader =
+        std::make_shared<PyReader>(queue, dims, var_types, need_check_feed);
+    if (ordered_queue) {
+      ordered_queue->AddResetMethod([py_reader] {
+        auto end_readers = py_reader->GetEndPoints();
+        for (auto* reader : end_readers) {
+          reader->Shutdown();
+        }
+        for (auto* reader : end_readers) {
+          reader->Start();
+        }
+      });
+    }
+    out->Reset(py_reader);
   }
 };

@@ -82,6 +107,12 @@ class CreatePyReaderOpMaker : public FileReaderMakerBase {
     AddInput("blocking_queue",
              "Name of the `LoDTensorBlockingQueueHolder` variable");

+    AddAttr<int>("device_index",
+                 "The index of the device this reader feeds data to")
+        .SetDefault(0);
+    AddAttr<int>("device_count",
+                 "The total number of devices the reader feeds data to")
+        .SetDefault(1);
+
     AddComment(R"DOC(
       Create PyReader to support LoDTensor data feeding in Python side.
     )DOC");
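Note: until `set_reader_device_count_pass` has run (it is only appended when the graph is built through ParallelExecutor/CompiledProgram), the two attributes keep their defaults of 0 and 1. A quick, hypothetical way to inspect them on a program built with keep_order=True:

import paddle.fluid as fluid

# Assumes the default main program already contains a create_py_reader op,
# e.g. one created by fluid.io.DataLoader.from_generator(..., keep_order=True).
for op in fluid.default_main_program().global_block().ops:
    if op.type == 'create_py_reader':
        # Prints 0 and 1 until the pass has rewritten the attributes.
        print(op.attr('device_index'), op.attr('device_count'))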
)DOC"); diff --git a/paddle/fluid/operators/reader/lod_tensor_blocking_queue.h b/paddle/fluid/operators/reader/lod_tensor_blocking_queue.h index cd295552cb431b68489f5e1f06b5d8c3d826c121..e218fc7a0991c9bdbb58c5b75286631ad8d1e222 100644 --- a/paddle/fluid/operators/reader/lod_tensor_blocking_queue.h +++ b/paddle/fluid/operators/reader/lod_tensor_blocking_queue.h @@ -27,16 +27,11 @@ namespace paddle { namespace operators { namespace reader { -class LoDTensorBlockingQueueHolder; - class LoDTensorBlockingQueue { - friend class LoDTensorBlockingQueueHolder; - - private: + public: explicit LoDTensorBlockingQueue(size_t capacity, bool speed_test_mode = false) : queue_(capacity, speed_test_mode) {} - public: bool Push(const std::vector& lod_tensor_vec) { return queue_.Send(lod_tensor_vec); } @@ -67,10 +62,145 @@ class LoDTensorBlockingQueue { inline void Kill() { queue_.Kill(); } + inline bool WaitForInited() { return true; } + private: BlockingQueue> queue_; }; +class OrderedMultiDeviceLoDTensorBlockingQueue { + public: + OrderedMultiDeviceLoDTensorBlockingQueue(size_t capacity, + bool speed_test_mode = false) + : capacity_(capacity), speed_test_mode_(speed_test_mode) {} + + inline bool WaitForInited() { + std::unique_lock lock(init_mutex_); + cv_.wait(lock, [this] { return queues_ != nullptr || is_closing_; }); + is_closing_ = false; + return queues_ != nullptr; + } + + inline void InitOnce(size_t dev_cnt) { + PADDLE_ENFORCE_GE(dev_cnt, 1, platform::errors::InvalidArgument( + "Device count to init " + "OrderedMultiDeviceLoDTensorBlockingQueue" + " must be larger than 1")); + VLOG(3) << "Ordered queue init start"; + { + std::lock_guard lock(init_mutex_); + if (queues_) { + PADDLE_ENFORCE_EQ(queues_->size(), dev_cnt, + platform::errors::InvalidArgument( + "Device count to init queue must be equal")); + } else { + queues_.reset( + new std::vector>(dev_cnt)); + for (auto& item : *queues_) { + auto cap = (capacity_ + dev_cnt - 1) / dev_cnt; + item.reset(new LoDTensorBlockingQueue(cap, speed_test_mode_)); + } + } + } + VLOG(3) << "Ordered queue init finish"; + cv_.notify_all(); + } + + const std::shared_ptr& GetQueue(size_t idx) const { + std::lock_guard lock(init_mutex_); + PADDLE_ENFORCE_NOT_NULL(queues_, + platform::errors::NotFound( + "Queues must be inited first before getting")); + PADDLE_ENFORCE_LT( + idx, queues_->size(), + platform::errors::OutOfRange("The queue index is out of range")); + return (*queues_)[idx]; + } + + bool Push(const std::vector& lod_tensor_vec) { + return CurQueue()->Push(lod_tensor_vec); + } + + bool Push(std::vector&& lod_tensor_vec) { + return CurQueue()->Push(std::move(lod_tensor_vec)); + } + + inline size_t Cap() const { return capacity_; } + + inline size_t Size() const { + size_t size = 0; + if (queues_) { + for (auto& item : *queues_) { + size += item->Size(); + } + } + return size; + } + + inline void ReOpen() { + if (queues_) { + for (auto& item : *queues_) { + item->ReOpen(); + } + } + data_index_ = 0; + } + + inline void Close() { + { + std::lock_guard lock(init_mutex_); + if (queues_ == nullptr) { + is_closing_ = true; + } + } + cv_.notify_all(); + if (queues_) { + for (auto& item : *queues_) { + item->Close(); + } + } + } + + inline void Kill() { + if (queues_) { + for (auto& item : *queues_) { + item->Kill(); + } + } + } + + inline void Reset() { + std::lock_guard reset_lock(reset_mutex_); + for (auto& method : reset_methods_) { + method(); + } + data_index_ = 0; + } + + inline void AddResetMethod(const std::function& reset_method) { + std::lock_guard 
+    reset_methods_.emplace_back(reset_method);
+  }
+
+ private:
+  const std::shared_ptr<LoDTensorBlockingQueue>& CurQueue() {
+    return (*queues_)[data_index_.fetch_add(1) % queues_->size()];
+  }
+
+ private:
+  std::unique_ptr<std::vector<std::shared_ptr<LoDTensorBlockingQueue>>> queues_;
+  mutable std::atomic<uint64_t> data_index_{0};
+
+  const size_t capacity_;
+  const bool speed_test_mode_;
+
+  std::vector<std::function<void()>> reset_methods_;
+  mutable std::mutex reset_mutex_;
+
+  bool is_closing_{false};
+  mutable std::mutex init_mutex_;
+  mutable std::condition_variable cv_;
+};
+
 class LoDTensorBlockingQueueHolder {
  public:
   void InitOnce(size_t capacity, bool speed_test_mode = false) {
@@ -88,6 +218,26 @@ class LoDTensorBlockingQueueHolder {
   std::shared_ptr<LoDTensorBlockingQueue> queue_;
 };

+class OrderedMultiDeviceLoDTensorBlockingQueueHolder {
+ public:
+  void InitOnce(size_t capacity, bool speed_test_mode = false) {
+    PADDLE_ENFORCE_EQ(queue_, nullptr,
+                      platform::errors::AlreadyExists(
+                          "OrderedMultiDeviceLoDTensorBlockingQueueHolder::"
+                          "InitOnce() can only be called once"));
+    queue_.reset(new OrderedMultiDeviceLoDTensorBlockingQueue(capacity,
+                                                              speed_test_mode));
+  }
+
+  inline const std::shared_ptr<OrderedMultiDeviceLoDTensorBlockingQueue>&
+  GetQueue() const {
+    return queue_;
+  }
+
+ private:
+  std::shared_ptr<OrderedMultiDeviceLoDTensorBlockingQueue> queue_;
+};
+
 }  // namespace reader
 }  // namespace operators
 }  // namespace paddle
diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc
index 837088b4f8cb7067dfebcb496c7f1402182ee255..07eb3ee44decff599a1efe7d55ef4261743c77d8 100644
--- a/paddle/fluid/pybind/pybind.cc
+++ b/paddle/fluid/pybind/pybind.cc
@@ -48,7 +48,6 @@ limitations under the License. */
 #include "paddle/fluid/memory/allocation/allocator_strategy.h"
 #include "paddle/fluid/operators/activation_op.h"
 #include "paddle/fluid/operators/py_func_op.h"
-#include "paddle/fluid/operators/reader/lod_tensor_blocking_queue.h"
 #include "paddle/fluid/platform/cpu_helper.h"
 #include "paddle/fluid/platform/cpu_info.h"
 #include "paddle/fluid/platform/dynload/dynamic_loader.h"
@@ -91,9 +90,6 @@ limitations under the License. */
 #include "pybind11/stl.h"

-DEFINE_bool(reader_queue_speed_test_mode, false,
-            "If set true, the queue.pop will only get data from queue but not "
-            "remove the data from queue for speed testing");
 DECLARE_bool(use_mkldnn);
 #ifdef PADDLE_WITH_NGRAPH
 DECLARE_bool(use_ngraph);
@@ -942,35 +938,6 @@ All parameter, weight, gradient are variables in Paddle.

   BindReader(&m);

-  using LoDTensorBlockingQueue =
-      ::paddle::operators::reader::LoDTensorBlockingQueue;
-  using LoDTensorBlockingQueueHolder =
-      ::paddle::operators::reader::LoDTensorBlockingQueueHolder;
-
-  py::class_<LoDTensorBlockingQueue, std::shared_ptr<LoDTensorBlockingQueue>>(
-      m, "LoDTensorBlockingQueue", "")
-      .def("push",
-           [](LoDTensorBlockingQueue &self,
-              const std::vector<framework::LoDTensor> &lod_tensor_vec) {
-             pybind11::gil_scoped_release release;
-             return self.Push(lod_tensor_vec);
-           })
-      .def("size", &LoDTensorBlockingQueue::Size)
-      .def("capacity", &LoDTensorBlockingQueue::Cap)
-      .def("close", &LoDTensorBlockingQueue::Close)
-      .def("kill", &LoDTensorBlockingQueue::Kill)
-      .def("is_closed", &LoDTensorBlockingQueue::IsClosed);
-
-  m.def("init_lod_tensor_blocking_queue",
-        [](Variable &var,
-           size_t capacity) -> std::shared_ptr<LoDTensorBlockingQueue> {
-          VLOG(1) << "init_lod_tensor_blocking_queue";
-          auto *holder = var.GetMutable<LoDTensorBlockingQueueHolder>();
-          holder->InitOnce(capacity, FLAGS_reader_queue_speed_test_mode);
-          return holder->GetQueue();
-        },
-        py::return_value_policy::copy);
-
   py::class_<Scope>(m, "_Scope", R"DOC(
     Scope is an association of a name to Variable. All variables belong to Scope.
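Note: pybind.cc now only drops the old queue bindings and the FLAGS_reader_queue_speed_test_mode definition; both reappear in reader_py.cc below, so the Python-visible surface stays roughly the same. A minimal sketch of driving the plain queue by hand, assuming a build that contains this patch:

import numpy as np
import paddle.fluid as fluid
from paddle.fluid import core

# The new third argument selects the plain (False) or ordered multi-device
# (True) queue variant.
queue = core.init_lod_tensor_blocking_queue(core.Variable(), 8, False)

t = core.LoDTensor()
t.set(np.ones([2, 3], dtype='float32'), fluid.CPUPlace())
queue.push([t])      # the binding releases the GIL while it may block
print(queue.size())  # 1
queue.close()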
diff --git a/paddle/fluid/pybind/reader_py.cc b/paddle/fluid/pybind/reader_py.cc
index 2d39af2faa5679fb2b1a7f76ce1119ac5784cd0e..db2e0f9a98495597587a9a1698ebc1bd177bd193 100644
--- a/paddle/fluid/pybind/reader_py.cc
+++ b/paddle/fluid/pybind/reader_py.cc
@@ -20,20 +20,40 @@
 #include <utility>
 #include <vector>
 #include "Python.h"
+#include "gflags/gflags.h"
 #include "paddle/fluid/framework/ddim.h"
 #include "paddle/fluid/framework/reader.h"
 #include "paddle/fluid/imperative/layer.h"
 #include "paddle/fluid/imperative/tracer.h"
 #include "paddle/fluid/operators/reader/buffered_reader.h"
+#include "paddle/fluid/operators/reader/lod_tensor_blocking_queue.h"
 #include "paddle/fluid/operators/reader/py_reader.h"
 #include "paddle/fluid/platform/place.h"
 #include "pybind11/stl.h"

+DEFINE_bool(reader_queue_speed_test_mode, false,
+            "If set true, the queue.pop will only get data from queue but not "
+            "remove the data from queue for speed testing");
+
 namespace paddle {
 namespace pybind {

 namespace py = pybind11;
+namespace reader = operators::reader;
+
+static const std::shared_ptr<reader::LoDTensorBlockingQueue> &GetQueue(
+    const std::shared_ptr<reader::LoDTensorBlockingQueue> &queue, size_t idx) {
+  return queue;
+}
+
+static const std::shared_ptr<reader::LoDTensorBlockingQueue> &GetQueue(
+    const std::shared_ptr<reader::OrderedMultiDeviceLoDTensorBlockingQueue>
+        &queue,
+    size_t idx) {
+  return queue->GetQueue(idx);
+}

+template <typename QUEUE>
 class MultiDeviceFeedReader {
  public:
   using ResultDictList =
@@ -41,7 +61,7 @@ class MultiDeviceFeedReader {
   using ResultList = std::vector<std::vector<framework::LoDTensor>>;

   MultiDeviceFeedReader(
-      const std::shared_ptr<operators::reader::LoDTensorBlockingQueue> &queue,
+      const std::shared_ptr<QUEUE> &queue,
       const std::vector<std::string> &names,
       const std::vector<std::vector<int>> &shapes,
       const std::vector<framework::proto::VarType::Type> &dtypes,
@@ -54,12 +74,25 @@ class MultiDeviceFeedReader {
     for (auto &shape : shapes) {
       dims.push_back(framework::make_ddim(shape));
     }
-    std::shared_ptr<framework::ReaderBase> reader(
-        new operators::reader::PyReader(queue, dims, dtypes, need_check_feed));
+
+    auto first_reader = std::make_shared<reader::PyReader>(
+        GetQueue(queue, 0), dims, dtypes, need_check_feed);
+
+    auto create_or_get_reader = [&](size_t idx) {
+      if (idx == 0 ||
+          std::is_same<QUEUE, reader::LoDTensorBlockingQueue>::value) {
+        return first_reader;
+      } else {
+        return std::make_shared<reader::PyReader>(GetQueue(queue, idx), dims,
+                                                  dtypes, need_check_feed);
+      }
+    };

     readers_.reserve(dst_places.size());
-    for (auto &p : dst_places) {
+    for (size_t i = 0; i < dst_places.size(); ++i) {
+      auto &p = dst_places[i];
       auto *holder = new framework::ReaderHolder();
+      auto reader = create_or_get_reader(i);
       if (use_double_buffer) {
         holder->Reset(
             framework::MakeDecoratedReader<operators::reader::BufferedReader>(
                 reader, p, 2));
@@ -183,7 +216,7 @@ class MultiDeviceFeedReader {
     PADDLE_ENFORCE_EQ(status, Status::kSuccess);
   }

-  std::shared_ptr<operators::reader::LoDTensorBlockingQueue> queue_;
+  std::shared_ptr<QUEUE> queue_;
   std::vector<std::string> names_;
   std::unique_ptr<::ThreadPool> pool_;

@@ -195,22 +228,18 @@ class MultiDeviceFeedReader {
   std::vector<std::vector<framework::LoDTensor>> ret_;
 };

-void BindReader(py::module *module) {
+template <typename QUEUE>
+void BindMultiDeviceReader(py::module *module, const char *reader_name) {
   auto &m = *module;

-  namespace reader = ::paddle::operators::reader;
-
-  py::class_<framework::ReaderHolder>(m, "Reader", "")
-      .def("start", &framework::ReaderHolder::Start)
-      .def("reset", &framework::ReaderHolder::ResetAll);
-
-  py::class_<MultiDeviceFeedReader>(m, "MultiDeviceFeedReader", "")
-      .def("read_next", &MultiDeviceFeedReader::ReadNext,
+  using ReaderType = MultiDeviceFeedReader<QUEUE>;
+  py::class_<ReaderType>(m, reader_name, "")
+      .def("read_next", &ReaderType::ReadNext,
           py::call_guard<py::gil_scoped_release>())
-      .def("read_next_list", &MultiDeviceFeedReader::ReadNextList,
+      .def("read_next_list", &ReaderType::ReadNextList,
           py::call_guard<py::gil_scoped_release>())
       .def("read_next_var_list",
-           [](MultiDeviceFeedReader &self) {
+           [](ReaderType &self) {
             auto result_list = self.ReadNextList();
             auto &tensor_list = result_list[0];
             std::vector<std::shared_ptr<imperative::VarBase>> var_list;
@@ -234,23 +263,105 @@ void BindReader(py::module *module) {
             return var_list;
           },
           py::call_guard<py::gil_scoped_release>())
-      .def("reset", &MultiDeviceFeedReader::Reset,
+      .def("reset", &ReaderType::Reset,
           py::call_guard<py::gil_scoped_release>());
+}
+
+void BindReader(py::module *module) {
+  auto &m = *module;
+
+  m.def("init_lod_tensor_blocking_queue",
+        [](framework::Variable &var, size_t capacity,
+           bool is_ordered) -> py::object {
+          VLOG(1) << "init_lod_tensor_blocking_queue";
+          if (is_ordered) {
+            auto *holder = var.GetMutable<
+                reader::OrderedMultiDeviceLoDTensorBlockingQueueHolder>();
+            holder->InitOnce(capacity, FLAGS_reader_queue_speed_test_mode);
+            return py::cast(holder->GetQueue());
+          } else {
+            auto *holder =
+                var.GetMutable<reader::LoDTensorBlockingQueueHolder>();
+            holder->InitOnce(capacity, FLAGS_reader_queue_speed_test_mode);
+            return py::cast(holder->GetQueue());
+          }
+        },
+        py::return_value_policy::copy);
+
+  py::class_<framework::ReaderHolder>(m, "Reader", "")
+      .def("start", &framework::ReaderHolder::Start)
+      .def("reset", &framework::ReaderHolder::ResetAll);
+
+  py::class_<reader::LoDTensorBlockingQueue,
+             std::shared_ptr<reader::LoDTensorBlockingQueue>>(
+      m, "LoDTensorBlockingQueue", "")
+      .def("push",
+           [](reader::LoDTensorBlockingQueue &self,
+              const std::vector<framework::LoDTensor> &lod_tensor_vec) {
+             return self.Push(lod_tensor_vec);
+           },
+           py::call_guard<py::gil_scoped_release>())
+      .def("size", &reader::LoDTensorBlockingQueue::Size)
+      .def("capacity", &reader::LoDTensorBlockingQueue::Cap)
+      .def("close", &reader::LoDTensorBlockingQueue::Close)
+      .def("kill", &reader::LoDTensorBlockingQueue::Kill)
+      .def("wait_for_inited", &reader::LoDTensorBlockingQueue::WaitForInited,
+           py::call_guard<py::gil_scoped_release>());
+
+  py::class_<reader::OrderedMultiDeviceLoDTensorBlockingQueue,
+             std::shared_ptr<reader::OrderedMultiDeviceLoDTensorBlockingQueue>>(
+      m, "OrderedMultiDeviceLoDTensorBlockingQueue", "")
+      .def("push",
+           [](reader::OrderedMultiDeviceLoDTensorBlockingQueue &self,
+              const std::vector<framework::LoDTensor> &lod_tensor_vec) {
+             return self.Push(lod_tensor_vec);
+           },
+           py::call_guard<py::gil_scoped_release>())
+      .def("size", &reader::OrderedMultiDeviceLoDTensorBlockingQueue::Size)
+      .def("capacity", &reader::OrderedMultiDeviceLoDTensorBlockingQueue::Cap)
+      .def("close", &reader::OrderedMultiDeviceLoDTensorBlockingQueue::Close)
+      .def("kill", &reader::OrderedMultiDeviceLoDTensorBlockingQueue::Kill)
+      .def("wait_for_inited",
+           &reader::OrderedMultiDeviceLoDTensorBlockingQueue::WaitForInited,
+           py::call_guard<py::gil_scoped_release>())
+      .def("reset", &reader::OrderedMultiDeviceLoDTensorBlockingQueue::Reset);
+
+  BindMultiDeviceReader<reader::LoDTensorBlockingQueue>(
+      module, "MultiDeviceFeedReader");
+  BindMultiDeviceReader<reader::OrderedMultiDeviceLoDTensorBlockingQueue>(
+      module, "OrderedMultiDeviceFeedReader");

   m.def("create_py_reader",
-        [](const std::shared_ptr<operators::reader::LoDTensorBlockingQueue>
-               &queue,
+        [](const std::shared_ptr<reader::LoDTensorBlockingQueue> &queue,
           const std::vector<std::string> &names,
           const std::vector<std::vector<int>> &shapes,
           const std::vector<framework::proto::VarType::Type> &dtypes,
           const std::vector<bool> &need_check_feed,
           const std::vector<platform::Place> &dst_places,
           bool use_double_buffer) {
-          return new MultiDeviceFeedReader(queue, names, shapes, dtypes,
-                                           need_check_feed, dst_places,
-                                           use_double_buffer);
+          return new MultiDeviceFeedReader<reader::LoDTensorBlockingQueue>(
+              queue, names, shapes, dtypes, need_check_feed, dst_places,
+              use_double_buffer);
         },
         py::return_value_policy::take_ownership);
+
+  m.def(
+      "create_py_reader",
+      [](const std::shared_ptr<
+             reader::OrderedMultiDeviceLoDTensorBlockingQueue> &queue,
+         const std::vector<std::string> &names,
+         const std::vector<std::vector<int>> &shapes,
+         const std::vector<framework::proto::VarType::Type> &dtypes,
+         const std::vector<bool> &need_check_feed,
+         const std::vector<platform::Place> &dst_places,
+         bool use_double_buffer) {
+        queue->InitOnce(dst_places.size());
+        return new MultiDeviceFeedReader<
+            reader::OrderedMultiDeviceLoDTensorBlockingQueue>(
+            queue, names, shapes, dtypes, need_check_feed, dst_places,
+            use_double_buffer);
+      },
+      py::return_value_policy::take_ownership);
 }

 }  // namespace pybind
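Note: for the ordered queue, the per-device sub-queues are created by InitOnce() — either in the create_py_reader operator above or in the ordered create_py_reader overload here — and the Python feeding thread blocks in wait_for_inited() until that has happened. A sketch of the producer side of that handshake, mirroring GeneratorLoader._start further down:

# Rough shape of the feeding thread (cf. __thread_main__ in reader.py below).
# wait_for_inited() returns immediately for the plain queue; for the ordered
# queue it blocks until InitOnce() has run, and returns False if the queue was
# closed before initialization.
def feed_thread(queue, batch_generator):
    if not queue.wait_for_inited():
        return
    for tensor_array in batch_generator():
        if not queue.push(tensor_array):
            break
    queue.close()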
diff --git a/python/paddle/fluid/layers/io.py b/python/paddle/fluid/layers/io.py
index 7c6e5aa1859e762cbada4392b24545ebcacb6c4e..9120a5a14e0c5369d05c1a31f939165ef645d63b 100644
--- a/python/paddle/fluid/layers/io.py
+++ b/python/paddle/fluid/layers/io.py
@@ -429,7 +429,7 @@ def _py_reader(capacity,
     double_buffer_name = "_".join([name, "double_buffer"])

     var = global_scope().var(queue_name)
-    feed_queue = core.init_lod_tensor_blocking_queue(var, capacity)
+    feed_queue = core.init_lod_tensor_blocking_queue(var, capacity, False)

     startup_blk = default_startup_program().current_block()
     startup_var = startup_blk.create_var(name=reader_name)
diff --git a/python/paddle/fluid/reader.py b/python/paddle/fluid/reader.py
index 6e81e2bf83e9b0782be83435031b77648264b6de..44fdad8110609dc9e7a9169f0767ec0f1b6e6627 100644
--- a/python/paddle/fluid/reader.py
+++ b/python/paddle/fluid/reader.py
@@ -87,7 +87,8 @@ class DataLoader(object):
                        use_double_buffer=True,
                        iterable=True,
                        return_list=False,
-                       use_multiprocess=False):
+                       use_multiprocess=False,
+                       keep_order=False):
        """
        Create a DataLoader object for loading data from Python generator. 
        Data would be prefetched using Python thread and be pushed
@@ -133,6 +134,15 @@ class DataLoader(object):
                can be used in the dygraph mode. In the static graph mode,
                whether this parameter is set or not has no effect. The Default
                value is False.
+            keep_order (bool): whether to assign batches to the CPU cores or
+                GPU cards in a deterministic order. Suppose there are 2 batches
+                and 2 GPU cards are used to run the network. If keep_order=True,
+                GPU 0 is guaranteed to get batch 0 and GPU 1 to get batch 1.
+                If keep_order=False, either card may get either batch, so the
+                assignment is uncertain. Keeping the reading order may require
+                extra synchronization and can therefore be slower. The default
+                value is False.

        Returns:
            loader (DataLoader): the created DataLoader object.
@@ -271,12 +281,15 @@ class DataLoader(object):
                assert relu.shape == [BATCH_SIZE, 784]
        """
        if in_dygraph_mode():
+            # Dygraph only supports multiprocess training when multiple GPUs
+            # are used. Each process then trains the network on a single GPU
+            # card, so `keep_order` is effectively always True there.
            return DygraphGeneratorLoader(feed_list, capacity,
                                          use_double_buffer, iterable,
                                          return_list, use_multiprocess)
        else:
            return GeneratorLoader(feed_list, capacity, use_double_buffer,
-                                   iterable, return_list)
+                                   iterable, return_list, keep_order)

    @staticmethod
    def from_dataset(dataset, places, drop_last=True):
@@ -334,6 +347,7 @@ class DygraphGeneratorLoader(DataLoaderBase):
        self._batch_reader = None
        self._places = None
        self._feed_list = feed_list
+        self._keep_order = True

        if not capacity:
            raise ValueError("Please give value to capacity.")
@@ -406,7 +420,7 @@ class DygraphGeneratorLoader(DataLoaderBase):
        self._dtypes = []
        self._need_check_feed = []
        self._blocking_queue = core.init_lod_tensor_blocking_queue(
-            core.Variable(), self._capacity)
+            core.Variable(), self._capacity, self._keep_order)
        self._reader = core.create_py_reader(
            self.queue, self._var_names, self._shapes, self._dtypes,
            self._need_check_feed, self._places, self._use_double_buffer)
@@ -614,7 +628,8 @@ class GeneratorLoader(DataLoaderBase):
                 capacity=None,
                 use_double_buffer=True,
                 iterable=True,
-                 return_list=False):
+                 return_list=False,
+                 keep_order=False):
        self._tensor_reader = None
        self._places = None
        self._thread = None
@@ -628,6 +643,7 @@ class GeneratorLoader(DataLoaderBase):
            raise Exception("Feed list must be given under static mode.")
        self._use_double_buffer = use_double_buffer
        self._capacity = capacity
+        self._keep_order = keep_order

        if not self._iterable:
            self._init_non_iterable()
@@ -647,8 +663,8 @@ class GeneratorLoader(DataLoaderBase):
        self._need_check_feed = [
            v.desc.need_check_feed() for v in self._feed_list
        ]
-        self._queue = core.init_lod_tensor_blocking_queue(core.Variable(),
-                                                          self._capacity)
+        self._queue = core.init_lod_tensor_blocking_queue(
+            core.Variable(), self._capacity, self._keep_order)
        self._reader = core.create_py_reader(
            self.queue, self._var_names, self._shapes, self._dtypes,
            self._need_check_feed, self._places, self._use_double_buffer)
@@ -675,16 +691,21 @@ class GeneratorLoader(DataLoaderBase):
        double_buffer_name = data_loader_unique_name_generator('double_buffer')

        var = global_scope().var(queue_name)
-        self._queue = core.init_lod_tensor_blocking_queue(var, self._capacity)
+        self._queue = core.init_lod_tensor_blocking_queue(var, self._capacity,
+                                                          self._keep_order)
+
+        if self._keep_order:
+            block = default_main_program().current_block()
+        else:
+            block = default_startup_program().current_block()

-        startup_blk = default_startup_program().current_block()
-        startup_var = startup_blk.create_var(name=reader_name)
+        reader_var = block.create_var(name=reader_name)

        dtype_int = [int(t) for t in dtypes]
-        startup_blk.append_op(
+        block.append_op(
            type='create_py_reader',
            inputs={'blocking_queue': [queue_name]},
-            outputs={'Out': [startup_var]},
+            outputs={'Out': [reader_var]},
            attrs={
                'shape_concat': shape_concat,
                'lod_levels': lod_levels,
@@ -693,16 +714,23 @@ class GeneratorLoader(DataLoaderBase):
                'ranks': ranks
            })

-        startup_var.desc.set_dtypes(dtypes)
-        startup_var.persistable = True
+        reader_var.desc.set_dtypes(dtypes)
+        reader_var.persistable = True
+        reader_var.stop_gradient = True

-        main_prog_var = _copy_reader_var_(
-            default_main_program().current_block(), startup_var)
+        if self._keep_order:
+            main_prog_var = reader_var
+            reader = main_prog_var
+            reader.reset = self._queue.reset
+        else:
+            main_prog_var = _copy_reader_var_(
+                default_main_program().current_block(), reader_var)

-        main_prog_var.stop_gradient = True
-        main_prog_var.persistable = True
+            main_prog_var.stop_gradient = True
+            main_prog_var.persistable = True
+
+            reader = monkey_patch_reader_methods(main_prog_var)

-        reader = monkey_patch_reader_methods(main_prog_var)
        if self._use_double_buffer:
            double_buffer_reader = double_buffer(
                reader, name=double_buffer_name)
@@ -765,14 +793,19 @@ class GeneratorLoader(DataLoaderBase):
                        " to locate the data causes this issue.\n\t* Please consider using "
                        "'fluid.create_lod_tensor' to convert it to a LoD-Tensor."))

+        return arr
+
    def _start(self):
        def __thread_main__():
            try:
+                if not self._queue.wait_for_inited():
+                    return
+
                for tensors in self._tensor_reader():
                    array = core.LoDTensorArray()
                    for item in tensors:
                        if not isinstance(item, core.LoDTensor):
-                            self._check_input_array(item)
+                            item = self._check_input_array(item)
                            tmp = core.LoDTensor()
                            tmp.set(item, core.CPUPlace())
                            item = tmp
diff --git a/python/paddle/fluid/tests/unittests/test_dataloader_keep_order.py b/python/paddle/fluid/tests/unittests/test_dataloader_keep_order.py
new file mode 100644
index 0000000000000000000000000000000000000000..c96adc56b95007e471ee48b51cc1b9ec5d8fbbdd
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/test_dataloader_keep_order.py
@@ -0,0 +1,185 @@
+# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import paddle.fluid as fluid
+import unittest
+import numpy as np
+import os
+import six
+
+
+def create_reader(shape, batch_number):
+    def __impl__():
+        idx = 0
+        for _ in six.moves.range(batch_number):
+            yield np.ones(shape).astype('float32') * idx,
+            idx += 1
+
+    return __impl__
+
+
+class DataLoaderKeepOrderTestBase(unittest.TestCase):
+    def initParameters(self):
+        self.iterable = False
+        self.break_num = 10000
+
+    def setUp(self):
+        self.epoch_num = 3
+        self.batch_num = 40
+        self.shape = [3, 4, 5]
+        self.initParameters()
+
+    def build_network(self, places):
+        input_data = fluid.data(
+            shape=self.shape, dtype='float32', name="input")
+        loader = fluid.io.DataLoader.from_generator(
+            capacity=16,
+            feed_list=[input_data],
+            keep_order=True,
+            iterable=self.iterable)
+
+        fc = fluid.layers.fc(input_data, size=10)
+        loss = fluid.layers.reduce_mean(fc)
+
+        loader.set_batch_generator(
+            create_reader(self.shape, self.batch_num),
+            places=places if loader.iterable else None)
+
+        return input_data, loss, loader
+
+    def assertInputData(self, batch_id, input_data, dev_cnt):
+        if isinstance(input_data, list):
+            self.assertEqual(len(input_data), dev_cnt)
+            start_val = dev_cnt * batch_id
+            for each_input_dict in input_data:
+                input_tensor = np.array(each_input_dict["input"])
+                self.assertEqual(self.shape, list(input_tensor.shape))
+                self.assertTrue((input_tensor == start_val).all())
+                start_val += 1
+        else:
+            self.assertEqual(
+                list(input_data.shape),
+                [self.shape[0] * dev_cnt] + self.shape[1:])
+            start_val = dev_cnt * batch_id
+            for idx in six.moves.range(dev_cnt):
+                data_part = input_data[idx * self.shape[0]:(idx + 1) *
+                                       self.shape[0], :]
+                self.assertTrue((data_part == start_val).all())
+                start_val += 1
+
+    def get_places(self):
+        place_list = [fluid.cpu_places(1), fluid.cpu_places(4)]
+        if fluid.is_compiled_with_cuda():
+            place_list.extend(
+                [fluid.cuda_places(0), fluid.cuda_places([0, 1])])
+        return place_list
+
+    def test_main(self):
+        for p in self.get_places():
+            use_compiled_program_list = [True] if len(p) > 1 else [False, True]
+            for use_compiled_program in use_compiled_program_list:
+                self.run_main_with_place(p, use_compiled_program)
+
+    def run_main_with_place(self, places, use_compiled_program=True):
+        with fluid.scope_guard(fluid.Scope()):
+            with fluid.program_guard(fluid.Program(), fluid.Program()):
+                input_data, loss, loader = self.build_network(places)
+                fetch_list = [input_data]
+
+                exe = fluid.Executor(places[0])
+                exe.run(fluid.default_startup_program())
+
+                dev_cnt = len(places)
+                if dev_cnt > 1:
+                    self.assertTrue(use_compiled_program)
+
+                main_program = fluid.default_main_program()
+                if use_compiled_program:
+                    main_program = fluid.CompiledProgram(
+                        main_program).with_data_parallel(
+                            loss_name=loss.name, places=places)
+
+                max_batch_num = min(self.break_num,
+                                    int(self.batch_num / dev_cnt))
+
+                if loader.iterable:
+                    early_break = False
+                    for epoch_id in six.moves.range(self.epoch_num):
+                        early_break = False
+                        batch_id = 0
+                        for data in loader():
+                            if batch_id >= self.break_num:
+                                early_break = True
+                                break
+                            self.assertInputData(batch_id, data, dev_cnt)
+                            fetch_val, = exe.run(program=main_program,
+                                                 feed=data,
+                                                 fetch_list=fetch_list)
+                            self.assertInputData(batch_id, fetch_val, dev_cnt)
+                            batch_id += 1
+
+                        self.assertEqual(batch_id, max_batch_num)
+
+                    if early_break:
+                        loader._reset()
+                else:
+                    for epoch_id in six.moves.range(self.epoch_num):
+                        batch_id = 0
+                        loader.start()
+                        try:
+                            while True:
+                                if batch_id >= self.break_num:
+                                    loader.reset()
+                                    break
+                                fetch_val, = exe.run(program=main_program,
+                                                     fetch_list=fetch_list)
+                                self.assertInputData(batch_id, fetch_val,
+                                                     dev_cnt)
+                                batch_id += 1
+                        except fluid.core.EOFException:
+                            loader.reset()
+
+                        self.assertEqual(batch_id, max_batch_num)
+
+
+class IterableDataLoaderKeepOrderTest2(DataLoaderKeepOrderTestBase):
+    def initParameters(self):
+        self.iterable = True
+        self.break_num = 10000
+
+
+class IterableDataLoaderKeepOrderTest3(DataLoaderKeepOrderTestBase):
+    def initParameters(self):
+        self.iterable = False
+        self.break_num = 2
+
+
+class IterableDataLoaderKeepOrderTest4(DataLoaderKeepOrderTestBase):
+    def initParameters(self):
+        self.iterable = True
+        self.break_num = 2
+
+
+class IterableDataLoaderKeepOrderTest5(DataLoaderKeepOrderTestBase):
+    def initParameters(self):
+        self.iterable = False
+        self.break_num = 0
+
+
+class IterableDataLoaderKeepOrderTest6(DataLoaderKeepOrderTestBase):
+    def initParameters(self):
+        self.iterable = True
+        self.break_num = 0
+
+
+if __name__ == '__main__':
+    unittest.main()
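Note: for reviewers who want to exercise the feature end to end, a minimal static-graph usage distilled from the keep_order docstring and the unit test above (a sketch, assuming a build that contains this patch):

import numpy as np
import paddle.fluid as fluid

image = fluid.data(name='image', shape=[4, 8], dtype='float32')
loader = fluid.io.DataLoader.from_generator(
    feed_list=[image], capacity=16, iterable=True, keep_order=True)
loss = fluid.layers.reduce_mean(fluid.layers.fc(image, size=10))


def batch_generator():
    for i in range(8):
        yield np.full([4, 8], i, dtype='float32'),


places = fluid.cpu_places(2)
loader.set_batch_generator(batch_generator, places=places)

exe = fluid.Executor(places[0])
exe.run(fluid.default_startup_program())
prog = fluid.CompiledProgram(fluid.default_main_program()).with_data_parallel(
    loss_name=loss.name, places=places)

for data in loader():
    # With keep_order=True, batch 2*k is always fed to places[0] and batch
    # 2*k+1 to places[1]; with keep_order=False the assignment is unspecified.
    loss_val, = exe.run(prog, feed=data, fetch_list=[loss])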