diff --git a/paddle/fluid/API.spec b/paddle/fluid/API.spec index 2544b7308c20daedd63e5b8866f3ee4fb0b71f36..946e264f055a8a45dce82c4a7a7a8006470f8c18 100644 --- a/paddle/fluid/API.spec +++ b/paddle/fluid/API.spec @@ -10,6 +10,9 @@ paddle.fluid.default_startup_program ArgSpec(args=[], varargs=None, keywords=Non paddle.fluid.default_main_program ArgSpec(args=[], varargs=None, keywords=None, defaults=None) paddle.fluid.program_guard ArgSpec(args=['main_program', 'startup_program'], varargs=None, keywords=None, defaults=(None,)) paddle.fluid.name_scope ArgSpec(args=['prefix'], varargs=None, keywords=None, defaults=(None,)) +paddle.fluid.cuda_places ArgSpec(args=['device_ids'], varargs=None, keywords=None, defaults=(None,)) +paddle.fluid.cpu_places ArgSpec(args=['device_count'], varargs=None, keywords=None, defaults=(None,)) +paddle.fluid.cuda_pinned_places ArgSpec(args=['device_count'], varargs=None, keywords=None, defaults=(None,)) paddle.fluid.Executor.__init__ ArgSpec(args=['self', 'place'], varargs=None, keywords=None, defaults=None) paddle.fluid.Executor.close ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None) paddle.fluid.Executor.run ArgSpec(args=['self', 'program', 'feed', 'fetch_list', 'feed_var_name', 'fetch_var_name', 'scope', 'return_numpy', 'use_program_cache'], varargs=None, keywords=None, defaults=(None, None, None, 'feed', 'fetch', None, True, False)) @@ -44,7 +47,7 @@ paddle.fluid.AsyncExecutor.run ArgSpec(args=['self', 'program', 'data_feed', 'fi paddle.fluid.AsyncExecutor.save_model ArgSpec(args=['self', 'save_path'], varargs=None, keywords=None, defaults=None) paddle.fluid.AsyncExecutor.stop ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None) paddle.fluid.CompiledProgram.__init__ ArgSpec(args=['self', 'program'], varargs=None, keywords=None, defaults=None) -paddle.fluid.CompiledProgram.with_data_parallel ArgSpec(args=['self', 'loss_name', 'build_strategy', 'exec_strategy', 'share_vars_from'], varargs=None, keywords=None, defaults=(None, None, None, None)) +paddle.fluid.CompiledProgram.with_data_parallel ArgSpec(args=['self', 'loss_name', 'build_strategy', 'exec_strategy', 'share_vars_from', 'places'], varargs=None, keywords=None, defaults=(None, None, None, None, None)) paddle.fluid.CompiledProgram.with_inference_optimize ArgSpec(args=['self', 'config'], varargs=None, keywords=None, defaults=None) paddle.fluid.ExecutionStrategy.__init__ __init__(self: paddle.fluid.core.ParallelExecutor.ExecutionStrategy) -> None paddle.fluid.BuildStrategy.GradientScaleStrategy.__init__ __init__(self: paddle.fluid.core.ParallelExecutor.BuildStrategy.GradientScaleStrategy, arg0: int) -> None @@ -58,6 +61,11 @@ paddle.fluid.io.load_params ArgSpec(args=['executor', 'dirname', 'main_program', paddle.fluid.io.load_persistables ArgSpec(args=['executor', 'dirname', 'main_program', 'filename'], varargs=None, keywords=None, defaults=(None, None)) paddle.fluid.io.save_inference_model ArgSpec(args=['dirname', 'feeded_var_names', 'target_vars', 'executor', 'main_program', 'model_filename', 'params_filename', 'export_for_deployment'], varargs=None, keywords=None, defaults=(None, None, None, True)) paddle.fluid.io.load_inference_model ArgSpec(args=['dirname', 'executor', 'model_filename', 'params_filename', 'pserver_endpoints'], varargs=None, keywords=None, defaults=(None, None, None)) +paddle.fluid.io.PyReader.__init__ ArgSpec(args=['self', 'feed_list', 'capacity', 'use_double_buffer', 'iterable'], varargs=None, keywords=None, defaults=(True, True)) +paddle.fluid.io.PyReader.decorate_paddle_reader ArgSpec(args=['self', 'reader', 'places'], varargs=None, keywords=None, defaults=(None,)) +paddle.fluid.io.PyReader.decorate_tensor_provider ArgSpec(args=['self', 'reader', 'places'], varargs=None, keywords=None, defaults=(None,)) +paddle.fluid.io.PyReader.reset ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None) +paddle.fluid.io.PyReader.start ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None) paddle.fluid.initializer.ConstantInitializer.__init__ ArgSpec(args=['self', 'value', 'force_cpu'], varargs=None, keywords=None, defaults=(0.0, False)) paddle.fluid.initializer.UniformInitializer.__init__ ArgSpec(args=['self', 'low', 'high', 'seed'], varargs=None, keywords=None, defaults=(-1.0, 1.0, 0)) paddle.fluid.initializer.NormalInitializer.__init__ ArgSpec(args=['self', 'loc', 'scale', 'seed'], varargs=None, keywords=None, defaults=(0.0, 1.0, 0)) diff --git a/paddle/fluid/framework/reader.h b/paddle/fluid/framework/reader.h index 61120dcf12620f5c53cfc09a5a277f6bc840f799..82562bf883d88787858912f7039cf8fef003eccf 100644 --- a/paddle/fluid/framework/reader.h +++ b/paddle/fluid/framework/reader.h @@ -54,7 +54,6 @@ class ReaderBase { private: friend class DecoratedReader; - friend class MultiDecoratedReader; // These methods can be only invoked inside DecoratedReader to record the // decorating chain. void InsertDecoratedReader( @@ -63,20 +62,15 @@ class ReaderBase { std::vector> decorated_readers_; }; -class DecoratedReaderBase : public ReaderBase { - public: - virtual void RegisterDecorateChain() = 0; -}; - -class DecoratedReader : public DecoratedReaderBase, +class DecoratedReader : public ReaderBase, public std::enable_shared_from_this { public: explicit DecoratedReader(const std::shared_ptr& reader) - : DecoratedReaderBase(), reader_(reader) { + : ReaderBase(), reader_(reader) { PADDLE_ENFORCE_NOT_NULL(reader_); } - void RegisterDecorateChain() final { + void RegisterDecorateChain() { reader_->InsertDecoratedReader(shared_from_this()); } @@ -90,41 +84,6 @@ class DecoratedReader : public DecoratedReaderBase, std::shared_ptr reader_; }; -class MultiDecoratedReader - : public DecoratedReaderBase, - public std::enable_shared_from_this { - public: - explicit MultiDecoratedReader( - const std::vector>& readers) - : readers_(readers) { - PADDLE_ENFORCE(!readers_.empty()); - for (auto& r : readers_) { - PADDLE_ENFORCE_NOT_NULL(r); - } - } - - void RegisterDecorateChain() final { - for (auto& r : readers_) { - r->InsertDecoratedReader(shared_from_this()); - } - } - - protected: - void ShutdownImpl() override { - for (auto& r : readers_) { - r->Shutdown(); - } - } - - void StartImpl() override { - for (auto& r : readers_) { - r->Start(); - } - } - - std::vector> readers_; -}; - // FileReader is just a conceptual class. class FileReader : public ReaderBase {}; @@ -173,10 +132,8 @@ class ReaderHolder { }; template -inline std::shared_ptr MakeDecoratedReader( - ARGS&&... args) { - std::shared_ptr reader( - new T(std::forward(args)...)); +inline std::shared_ptr MakeDecoratedReader(ARGS&&... args) { + std::shared_ptr reader(new T(std::forward(args)...)); reader->RegisterDecorateChain(); return reader; } diff --git a/paddle/fluid/operators/reader/CMakeLists.txt b/paddle/fluid/operators/reader/CMakeLists.txt index 2701e10b30313f48895d24a1fd96eee8155401a5..5ee1206175600cd668ccbbf5b98053708a4406d3 100644 --- a/paddle/fluid/operators/reader/CMakeLists.txt +++ b/paddle/fluid/operators/reader/CMakeLists.txt @@ -18,7 +18,6 @@ function(reader_library TARGET_NAME) endfunction() cc_library(py_reader SRCS py_reader.cc DEPS reader) -cc_library(compose_reader SRCS compose_reader.cc DEPS reader) cc_library(buffered_reader SRCS buffered_reader.cc DEPS reader simple_threadpool) reader_library(open_files_op SRCS open_files_op.cc DEPS buffered_reader) @@ -41,7 +40,7 @@ cc_test(reader_blocking_queue_test SRCS reader_blocking_queue_test.cc) # Export local libraries to parent # set(READER_LIBRARY ${LOCAL_READER_LIBS} PARENT_SCOPE) -op_library(read_op DEPS py_reader compose_reader buffered_reader) +op_library(read_op DEPS py_reader buffered_reader) foreach(src ${LOCAL_READER_LIBS}) set(OP_LIBRARY ${src} ${OP_LIBRARY} CACHE INTERNAL "op libs") diff --git a/paddle/fluid/operators/reader/blocking_queue.h b/paddle/fluid/operators/reader/blocking_queue.h index b76f482c5755cc16e7226d074ccb404be5e0c7db..7962c0332dbf53964090177b5db0f186d63406f2 100644 --- a/paddle/fluid/operators/reader/blocking_queue.h +++ b/paddle/fluid/operators/reader/blocking_queue.h @@ -114,11 +114,6 @@ class BlockingQueue { return queue_.size(); } - void Clear() { - std::lock_guard lock(mutex_); - queue_.clear(); - } - private: size_t capacity_; bool speed_test_mode_; diff --git a/paddle/fluid/operators/reader/compose_reader.cc b/paddle/fluid/operators/reader/compose_reader.cc deleted file mode 100644 index 4b88b9331ce4387bece0406ec41c97425cb27586..0000000000000000000000000000000000000000 --- a/paddle/fluid/operators/reader/compose_reader.cc +++ /dev/null @@ -1,39 +0,0 @@ -// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "paddle/fluid/operators/reader/compose_reader.h" - -namespace paddle { -namespace operators { -namespace reader { - -ComposeReader::ComposeReader( - const std::vector> &readers) - : framework::MultiDecoratedReader(readers) {} - -void ComposeReader::ReadNext(std::vector *out) { - out->clear(); - std::vector each_ret; - for (auto &r : readers_) { - r->ReadNext(&each_ret); - out->reserve(out->size() + each_ret.size()); - for (auto &data : each_ret) { - out->emplace_back(std::move(data)); - } - } -} - -} // namespace reader -} // namespace operators -} // namespace paddle diff --git a/paddle/fluid/operators/reader/compose_reader.h b/paddle/fluid/operators/reader/compose_reader.h deleted file mode 100644 index c9e2a2d72f660ec684100aa7cd3de2b3a80142d8..0000000000000000000000000000000000000000 --- a/paddle/fluid/operators/reader/compose_reader.h +++ /dev/null @@ -1,34 +0,0 @@ -// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#pragma once - -#include -#include "paddle/fluid/framework/reader.h" - -namespace paddle { -namespace operators { -namespace reader { - -class ComposeReader : public framework::MultiDecoratedReader { - public: - explicit ComposeReader( - const std::vector> &readers); - - void ReadNext(std::vector *out) override; -}; - -} // namespace reader -} // namespace operators -} // namespace paddle diff --git a/paddle/fluid/operators/reader/py_reader.cc b/paddle/fluid/operators/reader/py_reader.cc index dc84faa9742aeb3a2f03e7599721d3e84c06bf60..f2c28c1df89bcb8109fea7859278799ad3a0e2ed 100644 --- a/paddle/fluid/operators/reader/py_reader.cc +++ b/paddle/fluid/operators/reader/py_reader.cc @@ -36,43 +36,6 @@ void PyReader::Shutdown() { queue_->Close(); } void PyReader::Start() { queue_->ReOpen(); } -MultiQueuePyReader::MultiQueuePyReader( - const std::vector>& queues) - : queues_(queues) { - PADDLE_ENFORCE(!queues_.empty()); - for (auto& q : queues_) { - PADDLE_ENFORCE_NOT_NULL(q); - } -} - -void MultiQueuePyReader::ReadNext(std::vector* out) { - auto idx = read_out_idx_.fetch_add(1) % queues_.size(); - for (size_t i = 0; i < queues_.size(); ++i) { - *out = queues_[idx]->Pop(); - if (!out->empty()) return; - idx = (idx + 1) % queues_.size(); - } -} - -MultiQueuePyReader::~MultiQueuePyReader() { - for (auto& q : queues_) { - q->Close(); - } -} - -void MultiQueuePyReader::Shutdown() { - for (auto& q : queues_) { - q->Close(); - } - read_out_idx_.store(0, std::memory_order::memory_order_seq_cst); -} - -void MultiQueuePyReader::Start() { - for (auto& q : queues_) { - q->ReOpen(); - } -} - } // namespace reader } // namespace operators } // namespace paddle diff --git a/paddle/fluid/operators/reader/py_reader.h b/paddle/fluid/operators/reader/py_reader.h index 146a2351e5acf799dbe7d2ad45a4eeee91a3e0b6..7d760eca64fb3122989acdf710feb4c1af42fb06 100644 --- a/paddle/fluid/operators/reader/py_reader.h +++ b/paddle/fluid/operators/reader/py_reader.h @@ -39,24 +39,6 @@ class PyReader : public framework::FileReader { std::shared_ptr queue_; }; -class MultiQueuePyReader : public framework::FileReader { - public: - explicit MultiQueuePyReader( - const std::vector>& queues); - - void ReadNext(std::vector* out) override; - - ~MultiQueuePyReader(); - - void Shutdown() override; - - void Start() override; - - private: - std::vector> queues_; - std::atomic read_out_idx_{0}; -}; - } // namespace reader } // namespace operators } // namespace paddle diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc index 1b53410d16fa443235a83cd0f98e2f66d5a58830..2acedca245f4181c4c67ccb7442cf61a35d744f7 100644 --- a/paddle/fluid/pybind/pybind.cc +++ b/paddle/fluid/pybind/pybind.cc @@ -547,11 +547,6 @@ All parameter, weight, gradient are variables in Paddle. using LoDTensorBlockingQueueHolder = ::paddle::operators::reader::LoDTensorBlockingQueueHolder; - using LockFreeLoDTensorBlockingQueue = - ::paddle::operators::reader::LockFreeLoDTensorBlockingQueue; - using LockFreeLoDTensorBlockingQueueHolder = - ::paddle::operators::reader::LockFreeLoDTensorBlockingQueueHolder; - py::class_>( m, "LoDTensorBlockingQueue", "") .def("push", @@ -565,20 +560,6 @@ All parameter, weight, gradient are variables in Paddle. .def("close", &LoDTensorBlockingQueue::Close) .def("is_closed", &LoDTensorBlockingQueue::IsClosed); - py::class_>( - m, "LockFreeLoDTensorBlockingQueue", "") - .def("push", - [](LockFreeLoDTensorBlockingQueue &self, - std::vector &lod_tensor_vec) { - pybind11::gil_scoped_release release; - return self.Push(std::move(lod_tensor_vec)); - }) - .def("size", &LockFreeLoDTensorBlockingQueue::Size) - .def("capacity", &LockFreeLoDTensorBlockingQueue::Cap) - .def("close", &LockFreeLoDTensorBlockingQueue::Close) - .def("is_closed", &LockFreeLoDTensorBlockingQueue::IsClosed); - m.def("init_lod_tensor_blocking_queue", [](Variable &var, size_t capacity) -> std::shared_ptr { @@ -588,15 +569,6 @@ All parameter, weight, gradient are variables in Paddle. }, py::return_value_policy::copy); - m.def("init_lock_free_lod_tensor_blocking_queue", - [](Variable &var, - size_t capacity) -> std::shared_ptr { - auto *holder = var.GetMutable(); - holder->InitOnce(capacity); - return holder->GetQueue(); - }, - py::return_value_policy::copy); - py::class_(m, "_Scope", R"DOC( Scope is an association of a name to Variable. All variables belong to Scope. @@ -777,8 +749,6 @@ All parameter, weight, gradient are variables in Paddle. .def("_equals", &IsSamePlace) .def("_equals", &IsSamePlace) - .def("gpu_device_id", - [](platform::CUDAPlace &self) { return self.device; }) .def("__str__", string::to_string); py::class_(m, "CPUPlace") diff --git a/paddle/fluid/pybind/reader_py.cc b/paddle/fluid/pybind/reader_py.cc index 22f67b38bbed12488b26719000e1a2d58f2ba331..8af049031046c365cadd4e62305414a87a7a5ec7 100644 --- a/paddle/fluid/pybind/reader_py.cc +++ b/paddle/fluid/pybind/reader_py.cc @@ -17,7 +17,6 @@ #include #include "paddle/fluid/framework/reader.h" #include "paddle/fluid/operators/reader/buffered_reader.h" -#include "paddle/fluid/operators/reader/compose_reader.h" #include "paddle/fluid/operators/reader/py_reader.h" #include "paddle/fluid/platform/place.h" #include "pybind11/stl.h" @@ -82,7 +81,6 @@ class MultiDeviceFeedReader { void Reset() { Shutdown(); Start(); - ReadAsync(); } @@ -117,14 +115,14 @@ class MultiDeviceFeedReader { } } + std::shared_ptr queue_; std::vector names_; std::unique_ptr<::ThreadPool> pool_; - std::shared_ptr queue_; std::vector> readers_; + std::vector> futures_; std::vector> ret_; - bool drop_last_; }; namespace py = pybind11; @@ -150,7 +148,7 @@ void BindReader(py::module *module) { const std::vector &names, const std::vector &dst_places, bool use_double_buffer) { - return new MultiDeviceFeedReader(queues, names, dst_places, + return new MultiDeviceFeedReader(queue, names, dst_places, use_double_buffer); }, py::return_value_policy::take_ownership); diff --git a/python/paddle/fluid/layers/io.py b/python/paddle/fluid/layers/io.py index 639be053b00ef1b27169e0c9e96171930f82b02d..6b9e00035888cc3e4ea7421b82a7fa06807fa0a1 100644 --- a/python/paddle/fluid/layers/io.py +++ b/python/paddle/fluid/layers/io.py @@ -486,8 +486,7 @@ def _py_reader(capacity, lod_levels=None, name=None, use_double_buffer=True, - feed_list=None, - lock_free=False): + feed_list=None): if feed_list is not None: if not isinstance(feed_list, list): @@ -527,11 +526,7 @@ def _py_reader(capacity, double_buffer_name = "_".join([name, "double_buffer"]) var = global_scope().var(queue_name) - if not lock_free: - feed_queue = core.init_lod_tensor_blocking_queue(var, capacity) - else: - feed_queue = core.init_lock_free_lod_tensor_blocking_queue(var, - capacity) + feed_queue = core.init_lod_tensor_blocking_queue(var, capacity) startup_blk = default_startup_program().current_block() startup_var = startup_blk.create_var(name=reader_name) @@ -644,8 +639,7 @@ def py_reader(capacity, dtypes, lod_levels=None, name=None, - use_double_buffer=True, - lock_free=False): + use_double_buffer=True): """ Create a Python reader for data feeding in Python @@ -770,8 +764,7 @@ def py_reader(capacity, dtypes=dtypes, lod_levels=lod_levels, name=name, - use_double_buffer=use_double_buffer, - lock_free=lock_free) + use_double_buffer=use_double_buffer) def create_py_reader_by_data(capacity, diff --git a/python/paddle/fluid/reader.py b/python/paddle/fluid/reader.py index 7c95ea20e3f65aa337f215e9993b619a949e029f..f29231589ebb20da54b3d88b8a8f68b0fb42fe63 100644 --- a/python/paddle/fluid/reader.py +++ b/python/paddle/fluid/reader.py @@ -15,9 +15,11 @@ import core import six import threading -from .framework import Program, Variable, program_guard +from .framework import Program, Variable, program_guard, default_main_program, default_startup_program +from .executor import global_scope from .data_feeder import DataFeeder -import paddle.reader.decorator as decorator +from .layers.io import monkey_patch_reader_methods, _copy_reader_var_, double_buffer +import unique_name __all__ = ['PyReader'] @@ -37,30 +39,101 @@ def _convert_places(places): return ret -class PyReader(Reader): - def __init__(self, feed_list, places, capacity): +class PyReader(object): + unique_name_generator = unique_name.UniqueNameGenerator() + + def __init__(self, + feed_list, + capacity, + use_double_buffer=True, + iterable=True): self._tensor_reader = None self._thread = None - - # TODO(zjl): to support drop_last = False - self._drop_last = True - + self._iterable = iterable + self._use_double_buffer = use_double_buffer + self._capacity = capacity self._feed_list = feed_list - self._var_names = [v.name for v in feed_list] - - self._queues = [] + self._scope = global_scope() + if not self._iterable: + self._init_non_iterable() + def _init_iterable(self, places): + self._var_names = [v.name for v in self._feed_list] self._places = _convert_places(places) - - self._queue_capacity = capacity - - self.queue = core.init_lod_tensor_blocking_queue(core.Variable(), - self._queue_capacity) - - self._reader = core.create_py_reader(self._queue, self._var_names, - self._places, self._drop_last) + self._queue = core.init_lod_tensor_blocking_queue(core.Variable(), + self._capacity) + self._reader = core.create_py_reader( + self.queue, self._var_names, self._places, self._use_double_buffer) + + def _init_non_iterable(self): + lod_levels = [] + dtypes = [] + shape_concat = [] + ranks = [] + shapes = [] + + for feed_data in self._feed_list: + dtypes.append(feed_data.dtype) + shape_concat.extend(feed_data.shape) + ranks.append(len(feed_data.shape)) + shapes.append(feed_data.shape) + lod_levels.append(feed_data.lod_level) + + queue_name = PyReader.unique_name_generator('lod_tensor_blocking_queue') + reader_name = PyReader.unique_name_generator('create_py_reader') + double_buffer_name = PyReader.unique_name_generator('double_buffer') + + var = self._scope.var(queue_name) + self._queue = core.init_lod_tensor_blocking_queue(var, self._capacity) + + startup_blk = default_startup_program().current_block() + startup_var = startup_blk.create_var(name=reader_name) + + startup_blk.append_op( + type='create_py_reader', + inputs={'blocking_queue': [queue_name]}, + outputs={'Out': [startup_var]}, + attrs={ + 'shape_concat': shape_concat, + 'lod_levels': lod_levels, + 'ranks': ranks + }) + + startup_var.desc.set_dtypes(dtypes) + startup_var.persistable = True + + main_prog_var = _copy_reader_var_( + default_main_program().current_block(), startup_var) + + main_prog_var.stop_gradient = True + main_prog_var.persistable = True + + reader = monkey_patch_reader_methods(main_prog_var) + if self._use_double_buffer: + double_buffer_reader = double_buffer( + reader, name=double_buffer_name) + # we return a double buffer reader. However, the reset method comes from + # py_reader. + double_buffer_reader.reset = reader.reset + reader = double_buffer_reader + + self._reader = reader + + default_main_program().current_block().append_op( + type='read', + inputs={'Reader': [self._reader]}, + outputs={'Out': self._feed_list}) + + @property + def queue(self): + return self._queue + + @property + def iterable(self): + return self._iterable def __call__(self): + assert self.iterable, "PyReader is not iterable" assert self._tensor_reader is not None, \ "Data source of PyReader has not set yet" @@ -80,13 +153,22 @@ class PyReader(Reader): self._reset() raise StopIteration + self._start() return Iterator(self) def _reset(self): - if self._thread: - self._reader.reset() - self._thread.join() + self._reader.reset() + self._thread.join() + + def start(self): + assert not self._iterable, "start() cannot be called when PyReader is iterable" + self._start() + def reset(self): + assert not self._iterable, "reset() cannot be called when PyReader is iterable" + self._reset() + + def _start(self): def __thread_main__(): for tensors in self._tensor_reader(): array = core.LoDTensorArray() @@ -98,16 +180,16 @@ class PyReader(Reader): array.append(item) - if not self.queue.push(array): + if not self._queue.push(array): break - self.queue.close() + self._queue.close() self._thread = threading.Thread(target=__thread_main__) self._thread.daemon = True self._thread.start() - def set_numpy_reader(self, reader): + def decorate_paddle_reader(self, reader, places=None): assert self._tensor_reader is None, \ "Cannot reset the data source of PyReader" with program_guard(Program(), Program()): @@ -119,10 +201,12 @@ class PyReader(Reader): for slots in paddle_reader(): yield [slots[var.name] for var in self._feed_list] - self.set_tensor_reader(__tensor_reader_impl__) + self.decorate_tensor_provider(__tensor_reader_impl__, places) - def set_tensor_reader(self, reader): + def decorate_tensor_provider(self, reader, places=None): assert self._tensor_reader is None, \ "Cannot reset the data source of PyReader" self._tensor_reader = reader - self._reset() + if self._iterable: + assert places is not None, "Places cannot be None when py_reader is iterable" + self._init_iterable(places) diff --git a/python/paddle/fluid/tests/unittests/test_decoupled_py_reader.py b/python/paddle/fluid/tests/unittests/test_decoupled_py_reader.py index dd64f10395dc6c165e92375c2806db4543f4f5b2..96a11edd496661148e064f41e36024cdb7539bba 100644 --- a/python/paddle/fluid/tests/unittests/test_decoupled_py_reader.py +++ b/python/paddle/fluid/tests/unittests/test_decoupled_py_reader.py @@ -31,35 +31,22 @@ def random_reader(): yield image, label -def simple_fc_net(places, use_legacy_py_reader, lock_free=False): +def simple_fc_net(places, use_legacy_py_reader, use_double_buffer): startup_prog = fluid.Program() main_prog = fluid.Program() startup_prog.random_seed = 1 main_prog.random_seed = 1 - reader = paddle.batch(random_reader, batch_size=BATCH_SIZE) with fluid.unique_name.guard(): with fluid.program_guard(main_prog, startup_prog): - if not use_legacy_py_reader: - image = fluid.layers.data( - name='image', shape=[784], dtype='float32') - label = fluid.layers.data( - name='label', shape=[1], dtype='int64') - py_reader = fluid.io.PyReader( - feed_list=[image, label], - places=places, - capacity=4, - multi_queue=False) - py_reader.set_numpy_reader(reader) - else: - py_reader = fluid.layers.py_reader( - capacity=4, - shapes=[(-1, 784), (-1, 1)], - dtypes=['float32', 'int64'], - lock_free=lock_free) - image, label = fluid.layers.read_file(py_reader) - py_reader.decorate_paddle_reader(reader) - + image = fluid.layers.data( + name='image', shape=[784], dtype='float32') + label = fluid.layers.data(name='label', shape=[1], dtype='int64') + py_reader = fluid.io.PyReader( + feed_list=[image, label], + capacity=4, + iterable=not use_legacy_py_reader, + use_double_buffer=use_double_buffer) hidden = image for hidden_size in [10, 20, 30]: hidden = fluid.layers.fc( @@ -82,11 +69,19 @@ def simple_fc_net(places, use_legacy_py_reader, lock_free=False): class TestBase(unittest.TestCase): - def run_main(self, use_legacy_py_reader, with_data_parallel, places): + def run_main(self, use_legacy_py_reader, with_data_parallel, places, + use_double_buffer): scope = fluid.Scope() with fluid.scope_guard(scope): startup_prog, main_prog, py_reader, loss = simple_fc_net( - places, use_legacy_py_reader) + places, use_legacy_py_reader, use_double_buffer) + + reader = paddle.batch(random_reader, batch_size=BATCH_SIZE) + + ps = places if use_double_buffer else fluid.cpu_places(len(places)) + py_reader.decorate_paddle_reader( + reader, places=ps if py_reader.iterable else None) + exe = fluid.Executor(place=places[0]) exe.run(startup_prog) @@ -98,7 +93,7 @@ class TestBase(unittest.TestCase): step = 0 step_list = [] start_t = time.time() - if use_legacy_py_reader: + if not py_reader.iterable: for _ in six.moves.range(EPOCH_NUM): step = 0 py_reader.start() @@ -107,12 +102,9 @@ class TestBase(unittest.TestCase): L, = exe.run(program=prog, fetch_list=[loss], use_program_cache=True) - # print('runned', step, py_reader.queue.is_closed(), py_reader.queue.size()) step += 1 except fluid.core.EOFException: - # print('try to reset') py_reader.reset() - # print('reseted') break step_list.append(step) else: @@ -125,8 +117,8 @@ class TestBase(unittest.TestCase): label = item['label'] assert image.shape() == [BATCH_SIZE, 784] assert label.shape() == [BATCH_SIZE, 1] - assert image._place()._equals(places[i]) - assert label._place()._equals(places[i]) + assert image._place()._equals(ps[i]) + assert label._place()._equals(ps[i]) L, = exe.run(program=prog, feed=d, fetch_list=[loss], @@ -138,7 +130,7 @@ class TestBase(unittest.TestCase): scope._remove_from_pool() return ret - def prepare_places(self, with_data_parallel, with_cpu=False, with_gpu=True): + def prepare_places(self, with_data_parallel, with_cpu=True, with_gpu=True): places = [] if with_cpu: places.append([fluid.CPUPlace()]) @@ -156,21 +148,13 @@ class TestBase(unittest.TestCase): def test_main(self): for with_data_parallel in [True, False]: for p in self.prepare_places(with_data_parallel): - t = [] - for use_legacy_py_reader in [ - False - ]: #[True, False]: #[False, True]: - print(p, use_legacy_py_reader) - ret = self.run_main( - use_legacy_py_reader=use_legacy_py_reader, - with_data_parallel=with_data_parallel, - places=p) - ret['legacy'] = use_legacy_py_reader - ret['data_parallel'] = with_data_parallel - ret['places'] = p - t.append([ret['step'], ]) #, ret['places']]) - - print(t) + for use_double_buffer in [False, True]: + for use_legacy_py_reader in [False, True]: + ret = self.run_main( + use_legacy_py_reader=use_legacy_py_reader, + with_data_parallel=with_data_parallel, + places=p, + use_double_buffer=use_double_buffer) if __name__ == '__main__':