From 502faf62a991d0421969de114393b95f73f0bcae Mon Sep 17 00:00:00 2001 From: sneaxiy Date: Mon, 25 Jun 2018 11:07:06 +0000 Subject: [PATCH] complete_py_reader_cpp --- benchmark/fluid/args.py | 10 -- benchmark/fluid/fluid_benchmark.py | 86 +++----------- benchmark/fluid/models/machine_translation.py | 2 +- benchmark/fluid/models/mnist.py | 29 +---- benchmark/fluid/models/resnet.py | 20 +--- .../fluid/models/stacked_dynamic_lstm.py | 2 +- benchmark/fluid/models/vgg.py | 29 ++--- paddle/fluid/operators/reader/CMakeLists.txt | 2 +- .../fluid/operators/reader/blocking_queue.h | 17 ++- .../operators/reader/create_py_reader_op.cc | 81 +++++++++++++ .../reader/lod_tensor_blocking_queue.h | 107 ++++++++++++++++++ paddle/fluid/pybind/pybind.cc | 62 +++++----- python/paddle/fluid/layers/io.py | 57 +--------- 13 files changed, 264 insertions(+), 240 deletions(-) create mode 100644 paddle/fluid/operators/reader/create_py_reader_op.cc create mode 100644 paddle/fluid/operators/reader/lod_tensor_blocking_queue.h diff --git a/benchmark/fluid/args.py b/benchmark/fluid/args.py index dcd4ee2324..68a3d42d7a 100644 --- a/benchmark/fluid/args.py +++ b/benchmark/fluid/args.py @@ -122,15 +122,5 @@ def parse_args(): type=str, default="", help='Directory that contains all the training recordio files.') - parser.add_argument( - '--use_py_reader_op', - action='store_true', - help='Whether to use Python reader op, omitted when use_reader_op is true' - ) - parser.add_argument( - '--feed_queue_capacity', - type=int, - default=64, - help='Capacity of feed queue when use_py_reader_op is true') args = parser.parse_args() return args diff --git a/benchmark/fluid/fluid_benchmark.py b/benchmark/fluid/fluid_benchmark.py index b5acb6549f..ece1102dce 100644 --- a/benchmark/fluid/fluid_benchmark.py +++ b/benchmark/fluid/fluid_benchmark.py @@ -25,9 +25,6 @@ import paddle.fluid.profiler as profiler import paddle.fluid.transpiler.distribute_transpiler as distribute_transpiler from args import * -import threading - -feed_queue = None def append_nccl2_prepare(trainer_id): @@ -134,7 +131,7 @@ def train(avg_loss, infer_prog, optimizer, train_reader, test_reader, batch_acc, exe = fluid.Executor(place) exe.run(startup_prog) - if not args.use_reader_op and not args.use_py_reader_op: + if not args.use_reader_op: feed_var_list = [ var for var in train_prog.global_block().vars.itervalues() if var.is_data @@ -144,12 +141,12 @@ def train(avg_loss, infer_prog, optimizer, train_reader, test_reader, batch_acc, iters, num_samples, start_time = 0, 0, time.time() for pass_id in range(args.pass_num): train_losses = [] - if not args.use_reader_op and not args.use_py_reader_op: + if not args.use_reader_op: reader_generator = train_reader() batch_id = 0 data = None while True: - if not args.use_reader_op and not args.use_py_reader_op: + if not args.use_reader_op: data = next(reader_generator, None) if data == None: break @@ -159,7 +156,7 @@ def train(avg_loss, infer_prog, optimizer, train_reader, test_reader, batch_acc, start_time = time.time() num_samples = 0 - if args.use_reader_op or args.use_py_reader_op: + if args.use_reader_op: try: loss = exe.run(train_prog, fetch_list=[avg_loss]) except fluid.core.EnforceNotMet as ex: @@ -173,7 +170,7 @@ def train(avg_loss, infer_prog, optimizer, train_reader, test_reader, batch_acc, # FIXME(wuyi): For use_reader_op, if the current # pass is not the last, the last batch of this pass # is also equal to args.batch_size. 
- if args.use_reader_op or args.use_py_reader_op: + if args.use_reader_op: num_samples += args.batch_size * args.gpus else: num_samples += len(data) @@ -183,13 +180,12 @@ def train(avg_loss, infer_prog, optimizer, train_reader, test_reader, batch_acc, print_train_time(start_time, time.time(), num_samples) print("Pass: %d, Loss: %f" % (pass_id, np.mean(train_losses))), # evaluation - if not args.no_test and batch_acc and not args.use_reader_op and not args.use_py_reader_op: + if not args.no_test and batch_acc and not args.use_reader_op: pass_test_acc = test(exe, infer_prog, test_reader, feeder, batch_acc) print(", Test Accuracy: %f" % pass_test_acc) print("\n") # TODO(wuyi): add warmup passes to get better perf data. - close_feed_queue() exit(0) @@ -199,7 +195,7 @@ def train_parallel(avg_loss, infer_prog, optimizer, train_reader, test_reader, batch_acc, args, train_prog, startup_prog, nccl_id_var, num_trainers, trainer_id): place = core.CPUPlace() if args.device == 'CPU' else core.CUDAPlace(0) - if not args.use_reader_op and not args.use_py_reader_op: + if not args.use_reader_op: feed_var_list = [ var for var in train_prog.global_block().vars.itervalues() if var.is_data @@ -242,12 +238,12 @@ def train_parallel(avg_loss, infer_prog, optimizer, train_reader, test_reader, num_samples = 0 iters = 0 start_time = time.time() - if not args.use_reader_op and not args.use_py_reader_op: + if not args.use_reader_op: reader_generator = train_reader() batch_id = 0 data = None while True: - if not args.use_reader_op and not args.use_py_reader_op: + if not args.use_reader_op: data = next(reader_generator, None) if data == None: break @@ -261,14 +257,14 @@ def train_parallel(avg_loss, infer_prog, optimizer, train_reader, test_reader, if iters == args.skip_batch_num: start_time = time.time() num_samples = 0 - if args.use_fake_data or args.use_reader_op or args.use_py_reader_op: + if args.use_fake_data or args.use_reader_op: try: loss, = exe.run([avg_loss.name]) except fluid.core.EnforceNotMet as ex: break else: loss, = exe.run([avg_loss.name], feed=feeder.feed(data)) - if args.use_reader_op or args.use_py_reader_op: + if args.use_reader_op: num_samples += args.batch_size * args.gpus else: num_samples += len(data) @@ -279,7 +275,7 @@ def train_parallel(avg_loss, infer_prog, optimizer, train_reader, test_reader, batch_id += 1 print_train_time(start_time, time.time(), num_samples) - if not args.no_test and batch_acc and not args.use_reader_op and not args.use_py_reader_op: + if not args.no_test and batch_acc and not args.use_reader_op: # we have not implement record io for test # skip test when use args.use_reader_op test_acc = test(startup_exe, infer_prog, test_reader, feeder, @@ -311,46 +307,7 @@ def print_paddle_envs(): print('------------------------------------------------') -def feed_data(feed_queue, train_reader, test_reader, dshapes, args): - train_cnt = 0 - test_cnt = 0 - print_per_train_batch = 1 - train_data_generator = train_reader() - start = time.time() - while True: - next_data = next(train_data_generator, None) - if next_data is None: - break - - next_data = list(next_data) - for i in range(len(next_data)): - if not isinstance(next_data[i], np.ndarray): - next_data[i] = np.array(next_data[i]) - next_data[i] = next_data[i].reshape([-1] + dshapes[i]) - - if not feed_queue.enqueue(next_data): - break - - train_cnt += 1 - ''' - if train_cnt % print_per_train_batch == 0: - end = time.time() - print('Feed queue size: %d, capacity: %d, speed: %.5fsec/batch' - % (feed_queue.size(), 
feed_queue.capacity(), (end-start)/print_per_train_batch)) - start = end - ''' - feed_queue.close() - - -def close_feed_queue(): - global feed_queue - if feed_queue is not None: - feed_queue.close() - - def main(): - global feed_queue - args = parse_args() print_arguments(args) print_paddle_envs() @@ -364,23 +321,8 @@ def main(): pr = cProfile.Profile() pr.enable() model_def = __import__("models.%s" % args.model, fromlist=["models"]) - model = model_def.get_model(args) - - if not args.use_reader_op and args.use_py_reader_op: - feed_queue = model[-4] - train_reader = model[-3] - test_reader = model[-2] - dshapes = model[-1] - feed_thread = threading.Thread( - target=feed_data, - args=(feed_queue, train_reader, test_reader, dshapes, args)) - #feed_thread.setDaemon(True) - feed_thread.start() - model = model[:-4] - - train_args = list(model) + train_args = list(model_def.get_model(args)) train_args.append(args) - # Run optimizer.minimize(avg_loss) train_args[2].minimize(train_args[0]) if args.memory_optimize: @@ -396,7 +338,6 @@ def main(): train_args.extend([nccl_id_var, num_trainers, trainer_id]) train_parallel(*train_args) train(*train_args) - close_feed_queue() exit(0) # for other update methods, use default programs @@ -421,4 +362,3 @@ def main(): if __name__ == "__main__": main() - close_feed_queue() diff --git a/benchmark/fluid/models/machine_translation.py b/benchmark/fluid/models/machine_translation.py index 43f0368cd4..17f6b03826 100644 --- a/benchmark/fluid/models/machine_translation.py +++ b/benchmark/fluid/models/machine_translation.py @@ -182,7 +182,7 @@ def lodtensor_to_ndarray(lod_tensor): def get_model(args): - if args.use_reader_op or args.use_py_reader_op: + if args.use_reader_op: raise Exception("machine_translation do not support reader op for now.") embedding_dim = 512 encoder_size = 512 diff --git a/benchmark/fluid/models/mnist.py b/benchmark/fluid/models/mnist.py index fa5e1b6d64..8e740dc689 100644 --- a/benchmark/fluid/models/mnist.py +++ b/benchmark/fluid/models/mnist.py @@ -66,14 +66,13 @@ def cnn_model(data): def get_model(args): - dshape = [1, 28, 28] if args.use_reader_op: filelist = [ os.path.join(args.data_path, f) for f in os.listdir(args.data_path) ] data_file = fluid.layers.open_files( filenames=filelist, - shapes=[[-1] + dshape, (-1, 1)], + shapes=[[-1, 1, 28, 28], (-1, 1)], lod_levels=[0, 0], dtypes=["float32", "int64"], thread_num=args.gpus, @@ -82,18 +81,8 @@ def get_model(args): fluid.layers.batch( data_file, batch_size=args.batch_size)) images, label = fluid.layers.read_file(data_file) - elif args.use_py_reader_op: - data_file, feed_queue = fluid.layers.py_array_reader( - capacity=args.feed_queue_capacity, - shapes=[[-1] + dshape, [-1, 1]], - lod_levels=[0, 0], - dtypes=['float32', 'int64']) - data_file = fluid.layers.double_buffer( - fluid.layers.batch( - data_file, batch_size=args.batch_size)) - images, label = fluid.layers.read_file(data_file) else: - images = fluid.layers.data(name='pixel', shape=dshape, dtype=DTYPE) + images = fluid.layers.data(name='pixel', shape=[1, 28, 28], dtype=DTYPE) label = fluid.layers.data(name='label', shape=[1], dtype='int64') if args.device == 'CPU' and args.cpus > 1: @@ -129,16 +118,8 @@ def get_model(args): learning_rate=0.001, beta1=0.9, beta2=0.999) # Reader - underlying_train_reader = paddle.dataset.mnist.train() - underlying_test_reader = paddle.dataset.mnist.test() train_reader = paddle.batch( - underlying_train_reader, batch_size=args.batch_size * args.gpus) + paddle.dataset.mnist.train(), 
batch_size=args.batch_size * args.gpus) test_reader = paddle.batch( - underlying_test_reader, batch_size=args.batch_size) - - if not args.use_reader_op and args.use_py_reader_op: - return avg_cost, inference_program, opt, train_reader, test_reader, batch_acc, \ - feed_queue, underlying_train_reader, underlying_test_reader, \ - (dshape, [1]) - else: - return avg_cost, inference_program, opt, train_reader, test_reader, batch_acc + paddle.dataset.mnist.test(), batch_size=args.batch_size) + return avg_cost, inference_program, opt, train_reader, test_reader, batch_acc diff --git a/benchmark/fluid/models/resnet.py b/benchmark/fluid/models/resnet.py index 7fb81b04fb..9ed1093c54 100644 --- a/benchmark/fluid/models/resnet.py +++ b/benchmark/fluid/models/resnet.py @@ -163,16 +163,6 @@ def get_model(args): fluid.layers.batch( data_file, batch_size=args.batch_size)) input, label = fluid.layers.read_file(data_file) - elif args.use_py_reader_op: - data_file, feed_queue = fluid.layers.py_array_reader( - capacity=args.feed_queue_capacity, - shapes=[[-1] + dshape, [-1, 1]], - lod_levels=[0, 0], - dtypes=['float32', 'int64']) - data_file = fluid.layers.double_buffer( - fluid.layers.batch( - data_file, batch_size=args.batch_size)) - input, label = fluid.layers.read_file(data_file) else: input = fluid.layers.data(name='data', shape=dshape, dtype='float32') label = fluid.layers.data(name='label', shape=[1], dtype='int64') @@ -214,11 +204,5 @@ def get_model(args): batched_test_reader = paddle.batch( train_reader, batch_size=args.batch_size, drop_last=True) - if not args.use_reader_op and args.use_py_reader_op: - return avg_cost, inference_program, optimizer, batched_train_reader,\ - batched_test_reader, batch_acc, \ - feed_queue, train_reader, test_reader, \ - (dshape, [1]) - else: - return avg_cost, inference_program, optimizer, batched_train_reader,\ - batched_test_reader, batch_acc + return avg_cost, inference_program, optimizer, batched_train_reader,\ + batched_test_reader, batch_acc diff --git a/benchmark/fluid/models/stacked_dynamic_lstm.py b/benchmark/fluid/models/stacked_dynamic_lstm.py index 64c8cde15f..3231542a17 100644 --- a/benchmark/fluid/models/stacked_dynamic_lstm.py +++ b/benchmark/fluid/models/stacked_dynamic_lstm.py @@ -44,7 +44,7 @@ def crop_sentence(reader, crop_size): def get_model(args): - if args.use_reader_op or args.use_py_reader_op: + if args.use_reader_op: raise Exception( "stacked_dynamic_lstm do not support reader op for now.") lstm_size = 512 diff --git a/benchmark/fluid/models/vgg.py b/benchmark/fluid/models/vgg.py index 739681d4b6..932601302d 100644 --- a/benchmark/fluid/models/vgg.py +++ b/benchmark/fluid/models/vgg.py @@ -54,16 +54,12 @@ def vgg16_bn_drop(input): def get_model(args): if args.data_set == "cifar10": - underlying_train_reader = paddle.dataset.cifar.train10() - underlying_test_reader = paddle.dataset.cifar.test10() classdim = 10 if args.data_format == 'NCHW': data_shape = [3, 32, 32] else: data_shape = [32, 32, 3] else: - underlying_train_reader = paddle.dataset.flowers.train() - underlying_test_reader = paddle.dataset.flowers.test() classdim = 102 if args.data_format == 'NCHW': data_shape = [3, 224, 224] @@ -85,16 +81,6 @@ def get_model(args): fluid.layers.batch( data_file, batch_size=args.batch_size)) images, label = fluid.layers.read_file(data_file) - elif args.use_py_reader_op: - data_file, feed_queue = fluid.layers.py_array_reader( - capacity=args.feed_queue_capacity, - shapes=[[-1] + data_shape, [-1, 1]], - lod_levels=[0, 0], - dtypes=["float32", "int64"]) - 
- data_file = fluid.layers.double_buffer(
- fluid.layers.batch(
- data_file, batch_size=args.batch_size))
- images, label = fluid.layers.read_file(data_file)
else:
images = fluid.layers.data(
name='data', shape=data_shape, dtype='float32')
@@ -123,14 +109,13 @@ def get_model(args):
# data reader
train_reader = paddle.batch(
paddle.reader.shuffle(
- underlying_train_reader, buf_size=5120),
+ paddle.dataset.cifar.train10()
+ if args.data_set == 'cifar10' else paddle.dataset.flowers.train(),
+ buf_size=5120),
batch_size=args.batch_size * args.gpus)
test_reader = paddle.batch(
- underlying_test_reader, batch_size=args.batch_size)
+ paddle.dataset.cifar.test10()
+ if args.data_set == 'cifar10' else paddle.dataset.flowers.test(),
+ batch_size=args.batch_size)
- if not args.use_reader_op and args.use_py_reader_op:
- return avg_cost, inference_program, optimizer, train_reader, test_reader, batch_acc, \
- feed_queue, underlying_train_reader, underlying_test_reader, \
- (data_shape, [1])
- else:
- return avg_cost, inference_program, optimizer, train_reader, test_reader, batch_acc
+ return avg_cost, inference_program, optimizer, train_reader, test_reader, batch_acc
diff --git a/paddle/fluid/operators/reader/CMakeLists.txt b/paddle/fluid/operators/reader/CMakeLists.txt
index b6016e1d20..a39c8a0053 100644
--- a/paddle/fluid/operators/reader/CMakeLists.txt
+++ b/paddle/fluid/operators/reader/CMakeLists.txt
@@ -24,7 +24,7 @@ reader_library(create_double_buffer_reader_op SRCS create_double_buffer_reader_o
reader_library(create_multi_pass_reader_op SRCS create_multi_pass_reader_op.cc)
reader_library(create_threaded_reader_op SRCS create_threaded_reader_op.cc)
reader_library(create_custom_reader_op SRCS create_custom_reader_op.cc)
-reader_library(create_py_array_reader_op SRCS create_py_array_reader_op.cc)
+reader_library(create_py_reader_op SRCS create_py_reader_op.cc)
cc_test(reader_blocking_queue_test SRCS reader_blocking_queue_test.cc)
# Export local libraries to parent
diff --git a/paddle/fluid/operators/reader/blocking_queue.h b/paddle/fluid/operators/reader/blocking_queue.h
index 71684b1417..6befc868a7 100644
--- a/paddle/fluid/operators/reader/blocking_queue.h
+++ b/paddle/fluid/operators/reader/blocking_queue.h
@@ -38,6 +38,8 @@ class BlockingQueue {
"The capacity of a reader::BlockingQueue must be greater than 0.");
}
+ ~BlockingQueue() { Close(); }
+
bool Send(const T& elem) {
std::unique_lock<std::mutex> lock(mutex_);
send_cv_.wait(lock, [&] { return queue_.size() < capacity_ || closed_; });
@@ -88,24 +90,29 @@ class BlockingQueue {
receive_cv_.notify_all();
}
- bool IsClosed() {
+ bool IsClosed() const {
std::lock_guard<std::mutex> lock(mutex_);
return closed_;
}
- size_t Cap() {
+ size_t Cap() const {
std::lock_guard<std::mutex> lock(mutex_);
return capacity_;
}
+ size_t Size() const {
+ std::lock_guard<std::mutex> lock(mutex_);
+ return queue_.size();
+ }
+
private:
size_t capacity_;
bool closed_;
std::deque<T> queue_;
- std::mutex mutex_;
- std::condition_variable receive_cv_;
- std::condition_variable send_cv_;
+ mutable std::mutex mutex_;
+ mutable std::condition_variable receive_cv_;
+ mutable std::condition_variable send_cv_;
};
} // namespace reader
} // namespace operators
diff --git a/paddle/fluid/operators/reader/create_py_reader_op.cc b/paddle/fluid/operators/reader/create_py_reader_op.cc
new file mode 100644
index 0000000000..aac81d1813
--- /dev/null
+++ b/paddle/fluid/operators/reader/create_py_reader_op.cc
@@ -0,0 +1,81 @@
+// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "paddle/fluid/operators/reader/lod_tensor_blocking_queue.h"
+#include "paddle/fluid/operators/reader/reader_op_registry.h"
+
+namespace paddle {
+namespace operators {
+namespace reader {
+
+class PyReader : public framework::ReaderBase {
+ public:
+ explicit PyReader(const std::shared_ptr<LoDTensorBlockingQueue>& queue) {
+ PADDLE_ENFORCE(queue != nullptr, "LoDTensorBlockingQueue must not be null");
+ queue_ = queue;
+ }
+
+ void ReadNext(std::vector<framework::LoDTensor>* out) override {
+ bool success;
+ *out = queue_->Dequeue(&success);
+ if (!success) out->clear();
+ }
+
+ void ReInit() override {}
+
+ private:
+ std::shared_ptr<LoDTensorBlockingQueue> queue_;
+};
+
+class CreatePyReaderOp : public framework::OperatorBase {
+ public:
+ using framework::OperatorBase::OperatorBase;
+
+ private:
+ void RunImpl(const framework::Scope& scope,
+ const platform::Place& dev_place) const override {
+ const std::string& queue_name = Input("blocking_queue");
+ auto* queue_holder_var = scope.FindVar(queue_name);
+ PADDLE_ENFORCE(
+ queue_holder_var != nullptr,
+ "No LoDTensorBlockingQueueHolder variable with name %s found",
+ queue_name);
+ auto* queue_holder =
+ queue_holder_var->template GetMutable<LoDTensorBlockingQueueHolder>();
+ auto* out = scope.FindVar(Output("Out"))
+ ->template GetMutable<framework::ReaderHolder>();
+ out->Reset(new PyReader(queue_holder->GetQueue()));
+ }
+};
+
+class CreatePyReaderOpMaker : public FileReaderMakerBase {
+ protected:
+ void Apply() override {
+ AddInput("blocking_queue",
+ "Name of the `LoDTensorBlockingQueueHolder` variable");
+
+ AddComment(R"DOC(
+ Create PyReader to support LoDTensor data feeding in Python side.
+ )DOC");
+ }
+};
+
+} // namespace reader
+} // namespace operators
+} // namespace paddle
+
+namespace reader = ::paddle::operators::reader;
+
+REGISTER_FILE_READER_OPERATOR(create_py_reader, reader::CreatePyReaderOp,
+ reader::CreatePyReaderOpMaker);
diff --git a/paddle/fluid/operators/reader/lod_tensor_blocking_queue.h b/paddle/fluid/operators/reader/lod_tensor_blocking_queue.h
new file mode 100644
index 0000000000..a2129f6af4
--- /dev/null
+++ b/paddle/fluid/operators/reader/lod_tensor_blocking_queue.h
@@ -0,0 +1,107 @@
+// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <memory>
+#include <vector>
+
+#include "paddle/fluid/framework/ddim.h"
+#include "paddle/fluid/framework/lod_tensor.h"
+#include "paddle/fluid/operators/reader/blocking_queue.h"
+#include "paddle/fluid/platform/place.h"
+
+namespace paddle {
+namespace operators {
+namespace reader {
+
+class LoDTensorBlockingQueueHolder;
+
+class LoDTensorBlockingQueue {
+ friend class LoDTensorBlockingQueueHolder;
+
+ private:
+ LoDTensorBlockingQueue(size_t capacity,
+ const std::vector<framework::DDim>& dims)
+ : dims_(dims) {
+ queue_.reset(
+ new BlockingQueue<std::vector<framework::LoDTensor>>(capacity));
+ }
+
+ public:
+ bool Enqueue(const std::vector<framework::LoDTensor>& lod_tensor_vec) {
+ CheckDims(lod_tensor_vec);
+ return queue_->Send(lod_tensor_vec);
+ }
+
+ bool Enqueue(std::vector<framework::LoDTensor>&& lod_tensor_vec) {
+ CheckDims(lod_tensor_vec);
+ return queue_->Send(std::move(lod_tensor_vec));
+ }
+
+ std::vector<framework::LoDTensor> Dequeue(bool* ok = nullptr) {
+ std::vector<framework::LoDTensor> lod_tensor_vec;
+ bool success = queue_->Receive(&lod_tensor_vec);
+ if (ok != nullptr) *ok = success;
+ return lod_tensor_vec;
+ }
+
+ inline size_t Cap() const { return queue_->Cap(); }
+
+ inline size_t Size() const { return queue_->Size(); }
+
+ inline void Close() { return queue_->Close(); }
+
+ inline bool IsClosed() const { return queue_->IsClosed(); }
+
+ private:
+ void CheckDims(const std::vector<framework::LoDTensor>& lod_tensor_vec) {
+ PADDLE_ENFORCE(dims_.size() == lod_tensor_vec.size(),
+ "Expect input size is %d but found %s", dims_.size(),
+ lod_tensor_vec.size());
+ for (size_t i = 0; i < dims_.size(); ++i) {
+ const auto& in_dims = lod_tensor_vec[i].dims();
+ const auto& expect_dims =
+ framework::slice_ddim(dims_[i], 1, dims_[i].size());
+ PADDLE_ENFORCE(in_dims == expect_dims,
+ "Dims of the %d-th input tensor does not match", i);
+ }
+ }
+
+ std::unique_ptr<BlockingQueue<std::vector<framework::LoDTensor>>> queue_;
+ std::vector<framework::DDim> dims_;
+};
+
+class LoDTensorBlockingQueueHolder {
+ public:
+ void InitOnce(size_t capacity, const std::vector<framework::DDim>& dims) {
+ PADDLE_ENFORCE(
+ queue_ == nullptr,
+ "LoDTensorBlockingQueueHolder::InitOnce() can only be called once");
+ queue_.reset(new LoDTensorBlockingQueue(capacity, dims));
+ }
+
+ inline std::shared_ptr<LoDTensorBlockingQueue> GetQueue() { return queue_; }
+
+ inline const std::shared_ptr<LoDTensorBlockingQueue>& GetQueue() const {
+ return queue_;
+ }
+
+ private:
+ std::shared_ptr<LoDTensorBlockingQueue> queue_;
+};
+
+} // namespace reader
+} // namespace operators
+} // namespace paddle
diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc
index 472595f6a8..6963a0c101 100644
--- a/paddle/fluid/pybind/pybind.cc
+++ b/paddle/fluid/pybind/pybind.cc
@@ -34,7 +34,7 @@ limitations under the License. */
#include "paddle/fluid/framework/reader.h"
#include "paddle/fluid/framework/selected_rows.h"
#include "paddle/fluid/operators/activation_op.h"
-#include "paddle/fluid/operators/reader/py_array_feed_queue.h"
+#include "paddle/fluid/operators/reader/lod_tensor_blocking_queue.h"
#include "paddle/fluid/platform/enforce.h"
#include "paddle/fluid/platform/place.h"
#include "paddle/fluid/platform/profiler.h"
@@ -298,40 +298,38 @@ All parameter, weight, gradient are variables in Paddle.
py::class_(m, "Reader", "") .def("reset", &framework::ReaderHolder::ReInit); - using PyArrayFeedQueue = ::paddle::operators::reader::PyArrayFeedQueue; - using PyArrayFeedQueueHolder = - ::paddle::operators::reader::PyArrayFeedQueueHolder; - using PyArray = ::paddle::operators::reader::PyArray; - py::class_(m, "PyArrayFeedQueue", "") - .def( - "enqueue", - [](PyArrayFeedQueue &self, const std::vector &py_array_vec) { - return self.Enqueue(py_array_vec); - }) + using LoDTensorBlockingQueue = + ::paddle::operators::reader::LoDTensorBlockingQueue; + using LoDTensorBlockingQueueHolder = + ::paddle::operators::reader::LoDTensorBlockingQueueHolder; + py::class_(m, "LoDTensorBlockingQueue", "") .def("enqueue", - [](PyArrayFeedQueue &self, + [](LoDTensorBlockingQueue &self, const std::vector &lod_tensor_vec) { + pybind11::gil_scoped_release release; return self.Enqueue(lod_tensor_vec); }) - .def("size", [](const PyArrayFeedQueue &self) { return self.Size(); }) - .def("capacity", [](const PyArrayFeedQueue &self) { return self.Cap(); }) - .def("close", [](PyArrayFeedQueue &self) { return self.Close(); }) + .def("size", + [](const LoDTensorBlockingQueue &self) { return self.Size(); }) + .def("capacity", + [](const LoDTensorBlockingQueue &self) { return self.Cap(); }) + .def("close", [](LoDTensorBlockingQueue &self) { return self.Close(); }) .def("is_closed", - [](const PyArrayFeedQueue &self) { return self.IsClosed(); }); + [](const LoDTensorBlockingQueue &self) { return self.IsClosed(); }); - m.def("init_py_array_feed_queue", + m.def("init_lod_tensor_blocking_queue", [](Variable &var, size_t capacity, - const std::vector> &shapes, - const ::paddle::platform::Place &place) -> PyArrayFeedQueue * { - std::vector dims(shapes.size()); - std::transform(shapes.begin(), shapes.end(), dims.begin(), - [](const std::vector &shape) { - return make_ddim(shape); - }); - auto *holder = var.GetMutable(); - holder->InitOnce(capacity, dims, place); - return holder->GetFeeder().get(); - }, + const std::vector> &shapes) + -> LoDTensorBlockingQueue * { + std::vector dims(shapes.size()); + std::transform(shapes.begin(), shapes.end(), dims.begin(), + [](const std::vector &shape) { + return make_ddim(shape); + }); + auto *holder = var.GetMutable(); + holder->InitOnce(capacity, dims); + return holder->GetQueue().get(); + }, py::return_value_policy::reference); py::class_(m, "Scope", "") @@ -505,6 +503,7 @@ All parameter, weight, gradient are variables in Paddle. pybind11::gil_scoped_release release; self.Run(prog, scope, block_id, create_local_scope, create_vars); }); + m.def("init_gflags", framework::InitGflags); m.def("init_glog", framework::InitGLOG); m.def("init_devices", @@ -669,7 +668,12 @@ All parameter, weight, gradient are variables in Paddle. 
&ParallelExecutor::FeedTensorsIntoLocalScopes) .def("feed_and_split_tensor_into_local_scopes", &ParallelExecutor::FeedAndSplitTensorIntoLocalScopes) - .def("run", &ParallelExecutor::Run); + .def("run", [](ParallelExecutor &self, + const std::vector &fetch_tensors, + const std::string &fetched_var_name) { + pybind11::gil_scoped_release release; + self.Run(fetch_tensors, fetched_var_name); + }); BindRecordIOWriter(&m); return m.ptr(); diff --git a/python/paddle/fluid/layers/io.py b/python/paddle/fluid/layers/io.py index 811471c5fd..f3ab47c96b 100644 --- a/python/paddle/fluid/layers/io.py +++ b/python/paddle/fluid/layers/io.py @@ -24,8 +24,7 @@ from layer_function_generator import generate_layer_fn, templatedoc __all__ = [ 'data', 'BlockGuardServ', 'ListenAndServ', 'Send', 'Recv', 'open_recordio_file', 'open_files', 'read_file', 'shuffle', 'batch', - 'double_buffer', 'random_data_generator', 'py_array_reader', 'Preprocessor', - 'load' + 'double_buffer', 'random_data_generator', 'Preprocessor', 'load' ] @@ -449,60 +448,6 @@ def random_data_generator(low, high, shapes, lod_levels, for_parallel=True): return monkey_patch_reader_methods(main_prog_var) -def py_array_reader(capacity, - shapes, - lod_levels, - dtypes, - place=None, - for_parallel=True): - - if place is None: - place = core.CPUPlace() - - if not isinstance(place, core.Place): - new_place = core.Place() - new_place.set_place(place) - place = new_place - - dtypes = [convert_np_dtype_to_dtype_(dt) for dt in dtypes] - shape_concat = [] - ranks = [] - - for shape in shapes: - shape_concat.extend(shape) - ranks.append(len(shape)) - - feeder_name = unique_name('py_array_feed_queue') - var = global_scope().var(feeder_name) - - #feed_shapes = [shape[1:] for shape in shapes] - feed_queue = core.init_py_array_feed_queue(var, capacity, shapes, place) - - startup_blk = default_startup_program().current_block() - startup_var = startup_blk.create_var( - name=unique_name('create_py_array_reader')) - startup_blk.append_op( - type='create_py_array_reader', - outputs={'Out': [startup_var]}, - attrs={ - 'shape_concat': shape_concat, - 'lod_levels': lod_levels, - 'ranks': ranks, - 'feeder_name': feeder_name - }) - - startup_var.desc.set_dtypes(dtypes) - startup_var.persistable = True - - main_prog_var = _copy_reader_var_(default_main_program().current_block(), - startup_var) - - if for_parallel: - main_prog_var = parallel(reader=main_prog_var) - - return monkey_patch_reader_methods(main_prog_var), feed_queue - - def open_files(filenames, shapes, lod_levels, -- GitLab