diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc index c907cb48b8f53609d719f2de84e269778ba55de8..1b53410d16fa443235a83cd0f98e2f66d5a58830 100644 --- a/paddle/fluid/pybind/pybind.cc +++ b/paddle/fluid/pybind/pybind.cc @@ -546,19 +546,39 @@ All parameter, weight, gradient are variables in Paddle. ::paddle::operators::reader::LoDTensorBlockingQueue; using LoDTensorBlockingQueueHolder = ::paddle::operators::reader::LoDTensorBlockingQueueHolder; + + using LockFreeLoDTensorBlockingQueue = + ::paddle::operators::reader::LockFreeLoDTensorBlockingQueue; + using LockFreeLoDTensorBlockingQueueHolder = + ::paddle::operators::reader::LockFreeLoDTensorBlockingQueueHolder; + py::class_>( m, "LoDTensorBlockingQueue", "") .def("push", [](LoDTensorBlockingQueue &self, - const std::vector &lod_tensor_vec) { + std::vector &lod_tensor_vec) { pybind11::gil_scoped_release release; - return self.Push(lod_tensor_vec); + return self.Push(std::move(lod_tensor_vec)); }) .def("size", &LoDTensorBlockingQueue::Size) .def("capacity", &LoDTensorBlockingQueue::Cap) .def("close", &LoDTensorBlockingQueue::Close) .def("is_closed", &LoDTensorBlockingQueue::IsClosed); + py::class_>( + m, "LockFreeLoDTensorBlockingQueue", "") + .def("push", + [](LockFreeLoDTensorBlockingQueue &self, + std::vector &lod_tensor_vec) { + pybind11::gil_scoped_release release; + return self.Push(std::move(lod_tensor_vec)); + }) + .def("size", &LockFreeLoDTensorBlockingQueue::Size) + .def("capacity", &LockFreeLoDTensorBlockingQueue::Cap) + .def("close", &LockFreeLoDTensorBlockingQueue::Close) + .def("is_closed", &LockFreeLoDTensorBlockingQueue::IsClosed); + m.def("init_lod_tensor_blocking_queue", [](Variable &var, size_t capacity) -> std::shared_ptr { @@ -568,6 +588,15 @@ All parameter, weight, gradient are variables in Paddle. }, py::return_value_policy::copy); + m.def("init_lock_free_lod_tensor_blocking_queue", + [](Variable &var, + size_t capacity) -> std::shared_ptr { + auto *holder = var.GetMutable(); + holder->InitOnce(capacity); + return holder->GetQueue(); + }, + py::return_value_policy::copy); + py::class_(m, "_Scope", R"DOC( Scope is an association of a name to Variable. All variables belong to Scope. diff --git a/paddle/fluid/pybind/reader_py.cc b/paddle/fluid/pybind/reader_py.cc index a09d18656f10b8d35c08f76a9518d6b8ac38b15c..22f67b38bbed12488b26719000e1a2d58f2ba331 100644 --- a/paddle/fluid/pybind/reader_py.cc +++ b/paddle/fluid/pybind/reader_py.cc @@ -25,77 +25,107 @@ namespace paddle { namespace pybind { -class FeedReader { +class MultiDeviceFeedReader { + public: using ResultDictList = std::vector>; - public: - FeedReader(std::unique_ptr reader, - const std::vector &names, size_t num_places, - bool drop_last = true) - : reader_(std::move(reader)), + MultiDeviceFeedReader( + const std::shared_ptr &queue, + const std::vector &names, + const std::vector &dst_places, bool use_double_buffer) + : queue_(queue), names_(names), - num_places_(num_places), - drop_last_(drop_last) {} - - ResultDictList ReadNext() { - std::vector tensors; - reader_->ReadNext(&tensors); - if (tensors.empty()) return ResultDictList(); + pool_(new ::ThreadPool(dst_places.size())) { + std::shared_ptr reader( + new operators::reader::PyReader(queue)); + + readers_.reserve(dst_places.size()); + for (auto &p : dst_places) { + auto *holder = new framework::ReaderHolder(); + if (use_double_buffer) { + holder->Reset( + framework::MakeDecoratedReader( + reader, p, 2)); + } else { + if (platform::is_gpu_place(p)) { + PADDLE_THROW( + "Place cannot be CUDAPlace when use_double_buffer is False"); + } + holder->Reset(reader); + } + readers_.emplace_back(holder); + } - PADDLE_ENFORCE(tensors.size() % names_.size() == 0, - "Tensor size: %d, names size: %d", tensors.size(), - names_.size()); + futures_.resize(dst_places.size()); + ret_.resize(dst_places.size()); + ReadAsync(); + } - size_t read_place_num = tensors.size() / names_.size(); + ResultDictList ReadNext() { + bool success = WaitFutures(); - if (drop_last_ && read_place_num != num_places_) { - return ResultDictList(); + if (!success) { + return {}; } - ResultDictList ret(read_place_num); - for (size_t i = 0; i < tensors.size(); ++i) { - ret[i / names_.size()].emplace(names_[i % names_.size()], - std::move(tensors[i])); + ResultDictList result(ret_.size()); + for (size_t i = 0; i < ret_.size(); ++i) { + for (size_t j = 0; j < names_.size(); ++j) { + result[i].emplace(names_[j], std::move(ret_[i][j])); + } } - return ret; + ReadAsync(); + return result; } - void Start() { reader_->Start(); } + void Reset() { + Shutdown(); + Start(); - void Reset() { reader_->ResetAll(); } + ReadAsync(); + } + + ~MultiDeviceFeedReader() { + queue_->Close(); + pool_.reset(); + } private: - std::unique_ptr reader_; - std::vector names_; - size_t num_places_; - bool drop_last_; -}; + bool WaitFutures() { + bool success = true; + for (auto &f : futures_) { + success &= f.get(); + } + return success; + } -static std::unique_ptr CreatePyReader( - const std::vector< - std::shared_ptr> &queues, - const std::vector &dst_places) { - std::shared_ptr reader; - if (queues.size() == 1) { - reader.reset(new operators::reader::PyReader(queues[0])); - } else { - reader.reset(new operators::reader::MultiQueuePyReader(queues)); + void Shutdown() { + for (auto &r : readers_) r->Shutdown(); } - std::vector> buffered_reader; - buffered_reader.reserve(dst_places.size()); - for (auto &p : dst_places) { - buffered_reader.emplace_back( - framework::MakeDecoratedReader( - reader, p, 2)); + + void Start() { + for (auto &r : readers_) r->Start(); } - reader = framework::MakeDecoratedReader( - buffered_reader); - auto *holder = new framework::ReaderHolder(); - holder->Reset(reader); - return std::unique_ptr(holder); -} + void ReadAsync() { + for (size_t i = 0; i < readers_.size(); ++i) { + futures_[i] = pool_->enqueue([this, i] { + readers_[i]->ReadNext(&ret_[i]); + return !ret_[i].empty(); + }); + } + } + + std::vector names_; + std::unique_ptr<::ThreadPool> pool_; + + std::shared_ptr queue_; + std::vector> readers_; + std::vector> futures_; + std::vector> ret_; + bool drop_last_; +}; namespace py = pybind11; @@ -108,22 +138,20 @@ void BindReader(py::module *module) { .def("start", &framework::ReaderHolder::Start) .def("reset", &framework::ReaderHolder::ResetAll); - py::class_(m, "FeedReader", "") - .def("read_next", &FeedReader::ReadNext, - py::call_guard()) - .def("start", &FeedReader::Start, + py::class_(m, "MultiDeviceFeedReader", "") + .def("read_next", &MultiDeviceFeedReader::ReadNext, py::call_guard()) - .def("reset", &FeedReader::Reset, + .def("reset", &MultiDeviceFeedReader::Reset, py::call_guard()); m.def("create_py_reader", - [](const std::vector< - std::shared_ptr> - queues, + [](const std::shared_ptr + &queue, const std::vector &names, - const std::vector &dst_places, bool drop_last) { - return new FeedReader(CreatePyReader(queues, dst_places), names, - dst_places.size(), drop_last); + const std::vector &dst_places, + bool use_double_buffer) { + return new MultiDeviceFeedReader(queues, names, dst_places, + use_double_buffer); }, py::return_value_policy::take_ownership); } diff --git a/python/paddle/fluid/layers/io.py b/python/paddle/fluid/layers/io.py index a9b391fd53a98dc05ee2d909a38dcf82cd5880ea..639be053b00ef1b27169e0c9e96171930f82b02d 100644 --- a/python/paddle/fluid/layers/io.py +++ b/python/paddle/fluid/layers/io.py @@ -486,7 +486,8 @@ def _py_reader(capacity, lod_levels=None, name=None, use_double_buffer=True, - feed_list=None): + feed_list=None, + lock_free=False): if feed_list is not None: if not isinstance(feed_list, list): @@ -526,12 +527,17 @@ def _py_reader(capacity, double_buffer_name = "_".join([name, "double_buffer"]) var = global_scope().var(queue_name) - feed_queue = core.init_lod_tensor_blocking_queue(var, capacity) + if not lock_free: + feed_queue = core.init_lod_tensor_blocking_queue(var, capacity) + else: + feed_queue = core.init_lock_free_lod_tensor_blocking_queue(var, + capacity) startup_blk = default_startup_program().current_block() startup_var = startup_blk.create_var(name=reader_name) startup_blk.append_op( - type='create_py_reader', + type='create_py_reader' + if not lock_free else 'create_lock_free_py_reader', inputs={'blocking_queue': [queue_name]}, outputs={'Out': [startup_var]}, attrs={ @@ -638,7 +644,8 @@ def py_reader(capacity, dtypes, lod_levels=None, name=None, - use_double_buffer=True): + use_double_buffer=True, + lock_free=False): """ Create a Python reader for data feeding in Python @@ -763,7 +770,8 @@ def py_reader(capacity, dtypes=dtypes, lod_levels=lod_levels, name=name, - use_double_buffer=use_double_buffer) + use_double_buffer=use_double_buffer, + lock_free=lock_free) def create_py_reader_by_data(capacity, diff --git a/python/paddle/fluid/reader.py b/python/paddle/fluid/reader.py index 8352587f23dce75276b317053f35f26bc882666a..7c95ea20e3f65aa337f215e9993b619a949e029f 100644 --- a/python/paddle/fluid/reader.py +++ b/python/paddle/fluid/reader.py @@ -17,6 +17,7 @@ import six import threading from .framework import Program, Variable, program_guard from .data_feeder import DataFeeder +import paddle.reader.decorator as decorator __all__ = ['PyReader'] @@ -36,8 +37,8 @@ def _convert_places(places): return ret -class PyReader(object): - def __init__(self, feed_list, places, capacity, multi_queue=True): +class PyReader(Reader): + def __init__(self, feed_list, places, capacity): self._tensor_reader = None self._thread = None @@ -53,15 +54,11 @@ class PyReader(object): self._queue_capacity = capacity - queue_num = len(self._places) if multi_queue else 1 - for _ in six.moves.range(queue_num): - self._queues.append( - core.init_lod_tensor_blocking_queue(core.Variable(), - self._queue_capacity)) + self.queue = core.init_lod_tensor_blocking_queue(core.Variable(), + self._queue_capacity) - self._reader = core.create_py_reader(self._queues, self._var_names, + self._reader = core.create_py_reader(self._queue, self._var_names, self._places, self._drop_last) - self._exited = True def __call__(self): assert self._tensor_reader is not None, \ @@ -77,7 +74,7 @@ class PyReader(object): def next(self): ret = self._reader.read_next() - if len(ret): + if ret: return ret else: self._reset() @@ -86,18 +83,11 @@ class PyReader(object): return Iterator(self) def _reset(self): - if not self._exited: - for q in self._queues: - q.close() - if self._thread: + self._reader.reset() self._thread.join() - self._reader.reset() - def __thread_main__(): - queue_num = len(self._queues) - idx = 0 for tensors in self._tensor_reader(): array = core.LoDTensorArray() for item in tensors: @@ -108,19 +98,13 @@ class PyReader(object): array.append(item) - if not self._queues[idx].push(array): + if not self.queue.push(array): break - idx = (idx + 1) % queue_num - - for q in self._queues: - q.close() - - self._exited = True + self.queue.close() self._thread = threading.Thread(target=__thread_main__) self._thread.daemon = True - self._exited = False self._thread.start() def set_numpy_reader(self, reader): diff --git a/python/paddle/fluid/tests/unittests/test_decoupled_py_reader.py b/python/paddle/fluid/tests/unittests/test_decoupled_py_reader.py index 807cbaf39d1d2b949a50444c554ffff71a9e9688..dd64f10395dc6c165e92375c2806db4543f4f5b2 100644 --- a/python/paddle/fluid/tests/unittests/test_decoupled_py_reader.py +++ b/python/paddle/fluid/tests/unittests/test_decoupled_py_reader.py @@ -31,7 +31,7 @@ def random_reader(): yield image, label -def simple_fc_net(places, use_legacy_py_reader): +def simple_fc_net(places, use_legacy_py_reader, lock_free=False): startup_prog = fluid.Program() main_prog = fluid.Program() startup_prog.random_seed = 1 @@ -55,7 +55,8 @@ def simple_fc_net(places, use_legacy_py_reader): py_reader = fluid.layers.py_reader( capacity=4, shapes=[(-1, 784), (-1, 1)], - dtypes=['float32', 'int64']) + dtypes=['float32', 'int64'], + lock_free=lock_free) image, label = fluid.layers.read_file(py_reader) py_reader.decorate_paddle_reader(reader) @@ -82,7 +83,8 @@ def simple_fc_net(places, use_legacy_py_reader): class TestBase(unittest.TestCase): def run_main(self, use_legacy_py_reader, with_data_parallel, places): - with fluid.scope_guard(fluid.Scope()): + scope = fluid.Scope() + with fluid.scope_guard(scope): startup_prog, main_prog, py_reader, loss = simple_fc_net( places, use_legacy_py_reader) exe = fluid.Executor(place=places[0]) @@ -94,21 +96,29 @@ class TestBase(unittest.TestCase): loss_name=loss.name, places=places) step = 0 + step_list = [] start_t = time.time() if use_legacy_py_reader: for _ in six.moves.range(EPOCH_NUM): + step = 0 py_reader.start() while True: try: - L, = exe.run(program=prog, fetch_list=[loss]) + L, = exe.run(program=prog, + fetch_list=[loss], + use_program_cache=True) + # print('runned', step, py_reader.queue.is_closed(), py_reader.queue.size()) step += 1 except fluid.core.EOFException: + # print('try to reset') py_reader.reset() + # print('reseted') break + step_list.append(step) else: for _ in six.moves.range(EPOCH_NUM): + step = 0 for d in py_reader(): - ''' assert len(d) == len(places) for i, item in enumerate(d): image = item['image'] @@ -117,18 +127,25 @@ class TestBase(unittest.TestCase): assert label.shape() == [BATCH_SIZE, 1] assert image._place()._equals(places[i]) assert label._place()._equals(places[i]) - ''' - L, = exe.run(program=prog, feed=d, fetch_list=[loss]) + L, = exe.run(program=prog, + feed=d, + fetch_list=[loss], + use_program_cache=True) step += 1 + step_list.append(step) end_t = time.time() - return {"time": end_t - start_t, "step": step} - - def prepare_places(self, with_data_parallel): - places = [[fluid.CPUPlace()], ] - if with_data_parallel: - places.append([fluid.CPUPlace()] * 2) + ret = {"time": end_t - start_t, "step": step_list} + scope._remove_from_pool() + return ret + + def prepare_places(self, with_data_parallel, with_cpu=False, with_gpu=True): + places = [] + if with_cpu: + places.append([fluid.CPUPlace()]) + if with_data_parallel: + places.append([fluid.CPUPlace()] * 2) - if fluid.core.is_compiled_with_cuda(): + if with_gpu and fluid.core.is_compiled_with_cuda(): tmp = fluid.cuda_places() assert len(tmp) > 0, "no gpu detected" if with_data_parallel: @@ -140,7 +157,10 @@ class TestBase(unittest.TestCase): for with_data_parallel in [True, False]: for p in self.prepare_places(with_data_parallel): t = [] - for use_legacy_py_reader in [False, True]: + for use_legacy_py_reader in [ + False + ]: #[True, False]: #[False, True]: + print(p, use_legacy_py_reader) ret = self.run_main( use_legacy_py_reader=use_legacy_py_reader, with_data_parallel=with_data_parallel, @@ -148,7 +168,7 @@ class TestBase(unittest.TestCase): ret['legacy'] = use_legacy_py_reader ret['data_parallel'] = with_data_parallel ret['places'] = p - t.append(ret) + t.append([ret['step'], ]) #, ret['places']]) print(t)