diff --git a/benchmark/fluid/args.py b/benchmark/fluid/args.py index 68a3d42d7a8a8082730f4cae3b5d4ea33819ca2f..dcd4ee2324a30f7e170a871baccef945fa8133ff 100644 --- a/benchmark/fluid/args.py +++ b/benchmark/fluid/args.py @@ -122,5 +122,15 @@ def parse_args(): type=str, default="", help='Directory that contains all the training recordio files.') + parser.add_argument( + '--use_py_reader_op', + action='store_true', + help='Whether to use Python reader op, omitted when use_reader_op is true' + ) + parser.add_argument( + '--feed_queue_capacity', + type=int, + default=64, + help='Capacity of feed queue when use_py_reader_op is true') args = parser.parse_args() return args diff --git a/benchmark/fluid/fluid_benchmark.py b/benchmark/fluid/fluid_benchmark.py index ece1102dce987cda994ff086b07f756498ce26e6..b5acb6549f4336101cd4a27eb795980dfedd5f1b 100644 --- a/benchmark/fluid/fluid_benchmark.py +++ b/benchmark/fluid/fluid_benchmark.py @@ -25,6 +25,9 @@ import paddle.fluid.profiler as profiler import paddle.fluid.transpiler.distribute_transpiler as distribute_transpiler from args import * +import threading + +feed_queue = None def append_nccl2_prepare(trainer_id): @@ -131,7 +134,7 @@ def train(avg_loss, infer_prog, optimizer, train_reader, test_reader, batch_acc, exe = fluid.Executor(place) exe.run(startup_prog) - if not args.use_reader_op: + if not args.use_reader_op and not args.use_py_reader_op: feed_var_list = [ var for var in train_prog.global_block().vars.itervalues() if var.is_data @@ -141,12 +144,12 @@ def train(avg_loss, infer_prog, optimizer, train_reader, test_reader, batch_acc, iters, num_samples, start_time = 0, 0, time.time() for pass_id in range(args.pass_num): train_losses = [] - if not args.use_reader_op: + if not args.use_reader_op and not args.use_py_reader_op: reader_generator = train_reader() batch_id = 0 data = None while True: - if not args.use_reader_op: + if not args.use_reader_op and not args.use_py_reader_op: data = next(reader_generator, None) if data == None: break @@ -156,7 +159,7 @@ def train(avg_loss, infer_prog, optimizer, train_reader, test_reader, batch_acc, start_time = time.time() num_samples = 0 - if args.use_reader_op: + if args.use_reader_op or args.use_py_reader_op: try: loss = exe.run(train_prog, fetch_list=[avg_loss]) except fluid.core.EnforceNotMet as ex: @@ -170,7 +173,7 @@ def train(avg_loss, infer_prog, optimizer, train_reader, test_reader, batch_acc, # FIXME(wuyi): For use_reader_op, if the current # pass is not the last, the last batch of this pass # is also equal to args.batch_size. - if args.use_reader_op: + if args.use_reader_op or args.use_py_reader_op: num_samples += args.batch_size * args.gpus else: num_samples += len(data) @@ -180,12 +183,13 @@ def train(avg_loss, infer_prog, optimizer, train_reader, test_reader, batch_acc, print_train_time(start_time, time.time(), num_samples) print("Pass: %d, Loss: %f" % (pass_id, np.mean(train_losses))), # evaluation - if not args.no_test and batch_acc and not args.use_reader_op: + if not args.no_test and batch_acc and not args.use_reader_op and not args.use_py_reader_op: pass_test_acc = test(exe, infer_prog, test_reader, feeder, batch_acc) print(", Test Accuracy: %f" % pass_test_acc) print("\n") # TODO(wuyi): add warmup passes to get better perf data. 
+ close_feed_queue() exit(0) @@ -195,7 +199,7 @@ def train_parallel(avg_loss, infer_prog, optimizer, train_reader, test_reader, batch_acc, args, train_prog, startup_prog, nccl_id_var, num_trainers, trainer_id): place = core.CPUPlace() if args.device == 'CPU' else core.CUDAPlace(0) - if not args.use_reader_op: + if not args.use_reader_op and not args.use_py_reader_op: feed_var_list = [ var for var in train_prog.global_block().vars.itervalues() if var.is_data @@ -238,12 +242,12 @@ def train_parallel(avg_loss, infer_prog, optimizer, train_reader, test_reader, num_samples = 0 iters = 0 start_time = time.time() - if not args.use_reader_op: + if not args.use_reader_op and not args.use_py_reader_op: reader_generator = train_reader() batch_id = 0 data = None while True: - if not args.use_reader_op: + if not args.use_reader_op and not args.use_py_reader_op: data = next(reader_generator, None) if data == None: break @@ -257,14 +261,14 @@ def train_parallel(avg_loss, infer_prog, optimizer, train_reader, test_reader, if iters == args.skip_batch_num: start_time = time.time() num_samples = 0 - if args.use_fake_data or args.use_reader_op: + if args.use_fake_data or args.use_reader_op or args.use_py_reader_op: try: loss, = exe.run([avg_loss.name]) except fluid.core.EnforceNotMet as ex: break else: loss, = exe.run([avg_loss.name], feed=feeder.feed(data)) - if args.use_reader_op: + if args.use_reader_op or args.use_py_reader_op: num_samples += args.batch_size * args.gpus else: num_samples += len(data) @@ -275,7 +279,7 @@ def train_parallel(avg_loss, infer_prog, optimizer, train_reader, test_reader, batch_id += 1 print_train_time(start_time, time.time(), num_samples) - if not args.no_test and batch_acc and not args.use_reader_op: + if not args.no_test and batch_acc and not args.use_reader_op and not args.use_py_reader_op: # we have not implement record io for test # skip test when use args.use_reader_op test_acc = test(startup_exe, infer_prog, test_reader, feeder, @@ -307,7 +311,46 @@ def print_paddle_envs(): print('------------------------------------------------') +def feed_data(feed_queue, train_reader, test_reader, dshapes, args): + train_cnt = 0 + test_cnt = 0 + print_per_train_batch = 1 + train_data_generator = train_reader() + start = time.time() + while True: + next_data = next(train_data_generator, None) + if next_data is None: + break + + next_data = list(next_data) + for i in range(len(next_data)): + if not isinstance(next_data[i], np.ndarray): + next_data[i] = np.array(next_data[i]) + next_data[i] = next_data[i].reshape([-1] + dshapes[i]) + + if not feed_queue.enqueue(next_data): + break + + train_cnt += 1 + ''' + if train_cnt % print_per_train_batch == 0: + end = time.time() + print('Feed queue size: %d, capacity: %d, speed: %.5fsec/batch' + % (feed_queue.size(), feed_queue.capacity(), (end-start)/print_per_train_batch)) + start = end + ''' + feed_queue.close() + + +def close_feed_queue(): + global feed_queue + if feed_queue is not None: + feed_queue.close() + + def main(): + global feed_queue + args = parse_args() print_arguments(args) print_paddle_envs() @@ -321,8 +364,23 @@ def main(): pr = cProfile.Profile() pr.enable() model_def = __import__("models.%s" % args.model, fromlist=["models"]) - train_args = list(model_def.get_model(args)) + model = model_def.get_model(args) + + if not args.use_reader_op and args.use_py_reader_op: + feed_queue = model[-4] + train_reader = model[-3] + test_reader = model[-2] + dshapes = model[-1] + feed_thread = threading.Thread( + target=feed_data, + 
args=(feed_queue, train_reader, test_reader, dshapes, args)) + #feed_thread.setDaemon(True) + feed_thread.start() + model = model[:-4] + + train_args = list(model) train_args.append(args) + # Run optimizer.minimize(avg_loss) train_args[2].minimize(train_args[0]) if args.memory_optimize: @@ -338,6 +396,7 @@ def main(): train_args.extend([nccl_id_var, num_trainers, trainer_id]) train_parallel(*train_args) train(*train_args) + close_feed_queue() exit(0) # for other update methods, use default programs @@ -362,3 +421,4 @@ def main(): if __name__ == "__main__": main() + close_feed_queue() diff --git a/benchmark/fluid/models/machine_translation.py b/benchmark/fluid/models/machine_translation.py index 17f6b03826ae818a3671ea7f9355a8e8c04b50be..43f0368cd4ee70a3b274ff1473e1efe7d2be95a7 100644 --- a/benchmark/fluid/models/machine_translation.py +++ b/benchmark/fluid/models/machine_translation.py @@ -182,7 +182,7 @@ def lodtensor_to_ndarray(lod_tensor): def get_model(args): - if args.use_reader_op: + if args.use_reader_op or args.use_py_reader_op: raise Exception("machine_translation do not support reader op for now.") embedding_dim = 512 encoder_size = 512 diff --git a/benchmark/fluid/models/mnist.py b/benchmark/fluid/models/mnist.py index 8e740dc6896b7eeeb82170aa13d32987c4df5c48..fa5e1b6d6418ef7788502da5ce9aa38c498e7b61 100644 --- a/benchmark/fluid/models/mnist.py +++ b/benchmark/fluid/models/mnist.py @@ -66,13 +66,14 @@ def cnn_model(data): def get_model(args): + dshape = [1, 28, 28] if args.use_reader_op: filelist = [ os.path.join(args.data_path, f) for f in os.listdir(args.data_path) ] data_file = fluid.layers.open_files( filenames=filelist, - shapes=[[-1, 1, 28, 28], (-1, 1)], + shapes=[[-1] + dshape, (-1, 1)], lod_levels=[0, 0], dtypes=["float32", "int64"], thread_num=args.gpus, @@ -81,8 +82,18 @@ def get_model(args): fluid.layers.batch( data_file, batch_size=args.batch_size)) images, label = fluid.layers.read_file(data_file) + elif args.use_py_reader_op: + data_file, feed_queue = fluid.layers.py_array_reader( + capacity=args.feed_queue_capacity, + shapes=[[-1] + dshape, [-1, 1]], + lod_levels=[0, 0], + dtypes=['float32', 'int64']) + data_file = fluid.layers.double_buffer( + fluid.layers.batch( + data_file, batch_size=args.batch_size)) + images, label = fluid.layers.read_file(data_file) else: - images = fluid.layers.data(name='pixel', shape=[1, 28, 28], dtype=DTYPE) + images = fluid.layers.data(name='pixel', shape=dshape, dtype=DTYPE) label = fluid.layers.data(name='label', shape=[1], dtype='int64') if args.device == 'CPU' and args.cpus > 1: @@ -118,8 +129,16 @@ def get_model(args): learning_rate=0.001, beta1=0.9, beta2=0.999) # Reader + underlying_train_reader = paddle.dataset.mnist.train() + underlying_test_reader = paddle.dataset.mnist.test() train_reader = paddle.batch( - paddle.dataset.mnist.train(), batch_size=args.batch_size * args.gpus) + underlying_train_reader, batch_size=args.batch_size * args.gpus) test_reader = paddle.batch( - paddle.dataset.mnist.test(), batch_size=args.batch_size) - return avg_cost, inference_program, opt, train_reader, test_reader, batch_acc + underlying_test_reader, batch_size=args.batch_size) + + if not args.use_reader_op and args.use_py_reader_op: + return avg_cost, inference_program, opt, train_reader, test_reader, batch_acc, \ + feed_queue, underlying_train_reader, underlying_test_reader, \ + (dshape, [1]) + else: + return avg_cost, inference_program, opt, train_reader, test_reader, batch_acc diff --git a/benchmark/fluid/models/resnet.py 
b/benchmark/fluid/models/resnet.py index 9ed1093c54a501cc93dbbf9c3651fe70914ce26b..7fb81b04fbd8e90964125c63f13570c1edc1bde4 100644 --- a/benchmark/fluid/models/resnet.py +++ b/benchmark/fluid/models/resnet.py @@ -163,6 +163,16 @@ def get_model(args): fluid.layers.batch( data_file, batch_size=args.batch_size)) input, label = fluid.layers.read_file(data_file) + elif args.use_py_reader_op: + data_file, feed_queue = fluid.layers.py_array_reader( + capacity=args.feed_queue_capacity, + shapes=[[-1] + dshape, [-1, 1]], + lod_levels=[0, 0], + dtypes=['float32', 'int64']) + data_file = fluid.layers.double_buffer( + fluid.layers.batch( + data_file, batch_size=args.batch_size)) + input, label = fluid.layers.read_file(data_file) else: input = fluid.layers.data(name='data', shape=dshape, dtype='float32') label = fluid.layers.data(name='label', shape=[1], dtype='int64') @@ -204,5 +214,11 @@ def get_model(args): batched_test_reader = paddle.batch( train_reader, batch_size=args.batch_size, drop_last=True) - return avg_cost, inference_program, optimizer, batched_train_reader,\ - batched_test_reader, batch_acc + if not args.use_reader_op and args.use_py_reader_op: + return avg_cost, inference_program, optimizer, batched_train_reader,\ + batched_test_reader, batch_acc, \ + feed_queue, train_reader, test_reader, \ + (dshape, [1]) + else: + return avg_cost, inference_program, optimizer, batched_train_reader,\ + batched_test_reader, batch_acc diff --git a/benchmark/fluid/models/stacked_dynamic_lstm.py b/benchmark/fluid/models/stacked_dynamic_lstm.py index 3231542a17ace99a17c9f9b9bdb3c2527637d9ef..64c8cde15f0ec476fd124207d9fa854eeff9009e 100644 --- a/benchmark/fluid/models/stacked_dynamic_lstm.py +++ b/benchmark/fluid/models/stacked_dynamic_lstm.py @@ -44,7 +44,7 @@ def crop_sentence(reader, crop_size): def get_model(args): - if args.use_reader_op: + if args.use_reader_op or args.use_py_reader_op: raise Exception( "stacked_dynamic_lstm do not support reader op for now.") lstm_size = 512 diff --git a/benchmark/fluid/models/vgg.py b/benchmark/fluid/models/vgg.py index 932601302d2f5d56b53e3462af886429034d8989..739681d4b66f0128bea11d5b88909e1bb2ff54e3 100644 --- a/benchmark/fluid/models/vgg.py +++ b/benchmark/fluid/models/vgg.py @@ -54,12 +54,16 @@ def vgg16_bn_drop(input): def get_model(args): if args.data_set == "cifar10": + underlying_train_reader = paddle.dataset.cifar.train10() + underlying_test_reader = paddle.dataset.cifar.test10() classdim = 10 if args.data_format == 'NCHW': data_shape = [3, 32, 32] else: data_shape = [32, 32, 3] else: + underlying_train_reader = paddle.dataset.flowers.train() + underlying_test_reader = paddle.dataset.flowers.test() classdim = 102 if args.data_format == 'NCHW': data_shape = [3, 224, 224] @@ -81,6 +85,16 @@ def get_model(args): fluid.layers.batch( data_file, batch_size=args.batch_size)) images, label = fluid.layers.read_file(data_file) + elif args.use_py_reader_op: + data_file, feed_queue = fluid.layers.py_array_reader( + capacity=args.feed_queue_capacity, + shapes=[[-1] + data_shape, [-1, 1]], + lod_levels=[0, 0], + dtypes=["float32", "int64"]) + data_file = fluid.layers.double_buffer( + fluid.layers.batch( + data_file, batch_size=args.batch_size)) + images, label = fluid.layers.read_file(data_file) else: images = fluid.layers.data( name='data', shape=data_shape, dtype='float32') @@ -109,13 +123,14 @@ def get_model(args): # data reader train_reader = paddle.batch( paddle.reader.shuffle( - paddle.dataset.cifar.train10() - if args.data_set == 'cifar10' else 
paddle.dataset.flowers.train(), - buf_size=5120), + underlying_train_reader, buf_size=5120), batch_size=args.batch_size * args.gpus) test_reader = paddle.batch( - paddle.dataset.cifar.test10() - if args.data_set == 'cifar10' else paddle.dataset.flowers.test(), - batch_size=args.batch_size) + underlying_test_reader, batch_size=args.batch_size) - return avg_cost, inference_program, optimizer, train_reader, test_reader, batch_acc + if not args.use_reader_op and args.use_py_reader_op: + return avg_cost, inference_program, optimizer, train_reader, test_reader, batch_acc, \ + feed_queue, underlying_train_reader, underlying_test_reader, \ + (data_shape, [1]) + else: + return avg_cost, inference_program, optimizer, train_reader, test_reader, batch_acc diff --git a/paddle/fluid/operators/reader/CMakeLists.txt b/paddle/fluid/operators/reader/CMakeLists.txt index 62532036f86bfb82465ccd9e0ec526299489932a..b6016e1d2017809223d15b335f2c40ff8271ea98 100644 --- a/paddle/fluid/operators/reader/CMakeLists.txt +++ b/paddle/fluid/operators/reader/CMakeLists.txt @@ -24,6 +24,7 @@ reader_library(create_double_buffer_reader_op SRCS create_double_buffer_reader_o reader_library(create_multi_pass_reader_op SRCS create_multi_pass_reader_op.cc) reader_library(create_threaded_reader_op SRCS create_threaded_reader_op.cc) reader_library(create_custom_reader_op SRCS create_custom_reader_op.cc) +reader_library(create_py_array_reader_op SRCS create_py_array_reader_op.cc) cc_test(reader_blocking_queue_test SRCS reader_blocking_queue_test.cc) # Export local libraries to parent diff --git a/paddle/fluid/operators/reader/create_py_array_reader_op.cc b/paddle/fluid/operators/reader/create_py_array_reader_op.cc new file mode 100644 index 0000000000000000000000000000000000000000..ed7ef4affb608f0febf128a7162596d883cbb75e --- /dev/null +++ b/paddle/fluid/operators/reader/create_py_array_reader_op.cc @@ -0,0 +1,80 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include "paddle/fluid/operators/reader/py_array_feed_queue.h" + +namespace paddle { +namespace operators { +namespace reader { + +class PyArrayReader : public framework::ReaderBase { + public: + explicit PyArrayReader(const std::shared_ptr& queue) { + PADDLE_ENFORCE(queue != nullptr, "PyArrayFeedQueue must not be null"); + queue_ = queue; + } + + void ReadNext(std::vector* out) override { + *out = queue_->Dequeue(); + } + + void ReInit() override { + // PADDLE_THROW("PyArrayReader does not support ReInit()"); + } + + private: + std::shared_ptr queue_; +}; + +class CreatePyArrayReaderOp : public framework::OperatorBase { + public: + using framework::OperatorBase::OperatorBase; + + private: + void RunImpl(const framework::Scope& scope, + const platform::Place& dev_place) const override { + const std::string& feeder_name = Attr("feeder_name"); + auto* feeder_holder_var = scope.FindVar(feeder_name); + PADDLE_ENFORCE(feeder_holder_var != nullptr, + "No PyArrayFeedQueue variable with name %s found", + feeder_name); + auto* feeder_holder = + feeder_holder_var->template GetMutable(); + auto* out = scope.FindVar(Output("Out")) + ->template GetMutable(); + out->Reset(new PyArrayReader(feeder_holder->GetFeeder())); + } +}; + +class CreatePyArrayReaderOpMaker : public FileReaderMakerBase { + protected: + void Apply() override { + AddAttr("feeder_name", + "Name of the `PyArrayFeedQueueHolder` variable"); + + AddComment(R"DOC( + Create PyArrayReader to accept Python data feeding. + )DOC"); + } +}; + +} // namespace reader +} // namespace operators +} // namespace paddle + +namespace reader = ::paddle::operators::reader; + +REGISTER_FILE_READER_OPERATOR(create_py_array_reader, + reader::CreatePyArrayReaderOp, + reader::CreatePyArrayReaderOpMaker); diff --git a/paddle/fluid/operators/reader/py_array_feed_queue.h b/paddle/fluid/operators/reader/py_array_feed_queue.h new file mode 100644 index 0000000000000000000000000000000000000000..f9552f73a66e4d75b9ae60c23efe5d4cb93fa62b --- /dev/null +++ b/paddle/fluid/operators/reader/py_array_feed_queue.h @@ -0,0 +1,207 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+#pragma once
+
+#include <memory>
+#include <mutex>  //NOLINT
+#include <utility>
+#include <thread>  // NOLINT
+#include <vector>
+#include "glog/logging.h"
+#include "paddle/fluid/framework/lod_tensor.h"
+#include "paddle/fluid/operators/reader/py_blocking_queue.h"
+#include "paddle/fluid/operators/reader/reader_op_registry.h"
+#include "paddle/fluid/pybind/tensor_py.h"
+
+namespace paddle {
+namespace operators {
+namespace reader {
+
+using PyTuple = ::pybind11::tuple;
+using PyArray = ::pybind11::array;
+
+template <typename T>
+using PyArrayT = ::pybind11::array_t<T>;
+
+class PyArrayToTensorVisitor : public boost::static_visitor<void> {
+ public:
+#define PY_ARRAY_TO_TENSOR_WITH_TYPE(dtype, func_name)                       \
+  pybind::func_name(tensor_, static_cast<const PyArrayT<dtype>&>(py_array_), \
+                    place)
+
+#define PY_ARRAY_TO_TENSOR(func_name)                  \
+  if (IsType<size_t>()) {                              \
+    PY_ARRAY_TO_TENSOR_WITH_TYPE(size_t, func_name);   \
+  } else if (IsType<int64_t>()) {                      \
+    PY_ARRAY_TO_TENSOR_WITH_TYPE(int64_t, func_name);  \
+  } else if (IsType<int32_t>()) {                      \
+    PY_ARRAY_TO_TENSOR_WITH_TYPE(int32_t, func_name);  \
+  } else if (IsType<int16_t>()) {                      \
+    PY_ARRAY_TO_TENSOR_WITH_TYPE(int16_t, func_name);  \
+  } else if (IsType<uint8_t>()) {                      \
+    PY_ARRAY_TO_TENSOR_WITH_TYPE(uint8_t, func_name);  \
+  } else if (IsType<float>()) {                        \
+    PY_ARRAY_TO_TENSOR_WITH_TYPE(float, func_name);    \
+  } else if (IsType<double>()) {                       \
+    PY_ARRAY_TO_TENSOR_WITH_TYPE(double, func_name);   \
+  } else {                                             \
+    PADDLE_THROW("unsupported dtype of python array"); \
+  }
+
+  PyArrayToTensorVisitor(const PyArray& py_array, framework::Tensor* tensor)
+      : py_array_(py_array), tensor_(tensor) {}
+
+  void operator()(const platform::CPUPlace& place) {
+    PY_ARRAY_TO_TENSOR(PyCPUTensorSetFromArray);
+  }
+
+  void operator()(const platform::CUDAPlace& place) {
+#ifdef PADDLE_WITH_CUDA
+    PY_ARRAY_TO_TENSOR(PyCUDATensorSetFromArray);
+#else
+    PADDLE_THROW("CUDAPlace is not supported in CPU only version");
+#endif
+  }
+
+  void operator()(const platform::CUDAPinnedPlace& place) {
+#ifdef PADDLE_WITH_CUDA
+    PY_ARRAY_TO_TENSOR(PyCUDAPinnedTensorSetFromArray);
+#else
+    PADDLE_THROW("CUDAPinnedPlace is not supported in CPU only version");
+#endif
+  }
+
+#undef PY_ARRAY_TO_TENSOR
+#undef PY_ARRAY_TO_TENSOR_WITH_TYPE
+
+ private:
+  template <typename T>
+  inline bool IsType() const {
+    return ::pybind11::isinstance<PyArrayT<T>>(py_array_);
+  }
+
+ private:
+  const PyArray& py_array_;
+  framework::Tensor* tensor_;
+};
+
+class PyArrayFeedQueueHolder;
+
+// PyArrayFeedQueue must be thread-safe
+class PyArrayFeedQueue {
+  friend class PyArrayFeedQueueHolder;
+
+ private:
+  PyArrayFeedQueue(size_t capacity, const std::vector<framework::DDim>& dims,
+                   const platform::Place& place)
+      : dims_(dims), place_(place) {
+    queue_.reset(
+        new PyBlockingQueue<std::vector<framework::LoDTensor>>(capacity));
+  }
+
+ public:
+  ~PyArrayFeedQueue() { Close(); }
+
+  bool Enqueue(const std::vector<PyArray>& py_array_vec) {
+    auto lod_tensor_vec = PyArrayVecToLoDTensorVec(py_array_vec);
+    VLOG(5) << "Enqueue at address " << reinterpret_cast<size_t>(this);
+    return queue_->Send(std::move(lod_tensor_vec));
+  }
+
+  bool Enqueue(const std::vector<framework::LoDTensor>& tensor_vec) {
+    VLOG(5) << "Enqueue at address " << reinterpret_cast<size_t>(this);
+    return queue_->Send(tensor_vec);
+  }
+
+  std::vector<framework::LoDTensor> Dequeue() {
+    VLOG(5) << "Dequeue at address " << reinterpret_cast<size_t>(this);
+    std::vector<framework::LoDTensor> ret;
+    return queue_->Receive(&ret) ? ret : std::vector<framework::LoDTensor>();
+  }
+
+  inline size_t Size() const { return queue_->Size(); }
+
+  inline size_t Cap() const { return queue_->Cap(); }
+
+  inline bool IsClosed() const { return queue_->IsClosed(); }
+
+  inline void Close() { queue_->Close(); }
+
+ private:
+  std::vector<framework::LoDTensor> PyArrayVecToLoDTensorVec(
+      const std::vector<PyArray>& py_array_vec) {
+    PADDLE_ENFORCE(dims_.size() == py_array_vec.size(),
+                   "expected input tensor number %d but found %d",
+                   dims_.size(), py_array_vec.size());
+
+    size_t i = 0;
+    if (py_array_vec.size() > 1) {
+      size_t dim0 = py_array_vec[0].shape()[0];
+      for (size_t j = 1; j < py_array_vec.size(); ++j) {
+        PADDLE_ENFORCE(dim0 == py_array_vec[j].shape()[0],
+                       "0-dim of the %d-th input tensor is %d, but 0-dim of "
+                       "the 0-th input tensor is %d",
+                       j, py_array_vec[j].shape()[0], dim0);
+      }
+    }
+
+    std::vector<framework::LoDTensor> lod_tensor_vec;
+    lod_tensor_vec.reserve(py_array_vec.size());
+
+    std::for_each(
+        py_array_vec.begin(), py_array_vec.end(), [&](const PyArray& py_array) {
+          for (int64_t j = 1; j < dims_[i].size(); ++j) {
+            PADDLE_ENFORCE(
+                dims_[i][j] == static_cast<int64_t>(py_array.shape()[j]),
+                "expected %d-dim of %d-th input tensor is %d but found %d", j,
+                i, dims_[i][j], py_array.shape()[j]);
+          }
+
+          lod_tensor_vec.emplace_back(framework::LoDTensor());
+          PyArrayToTensorVisitor visitor(py_array, &(lod_tensor_vec.back()));
+          boost::apply_visitor(visitor, place_);
+          ++i;
+        });
+    return lod_tensor_vec;
+  }
+
+  std::unique_ptr<PyBlockingQueue<std::vector<framework::LoDTensor>>> queue_;
+  std::vector<framework::DDim> dims_;
+  platform::Place place_;
+};
+
+class PyArrayFeedQueueHolder {
+ public:
+  PyArrayFeedQueueHolder() {}
+
+  void InitOnce(size_t capacity, const std::vector<framework::DDim>& dims,
+                const platform::Place& place) {
+    PADDLE_ENFORCE(
+        feeder_ == nullptr,
+        "PyArrayFeedQueueHolder::InitOnce() can only be called once");
+    feeder_.reset(new PyArrayFeedQueue(capacity, dims, place));
+  }
+
+  std::shared_ptr<PyArrayFeedQueue> GetFeeder() { return feeder_; }
+  const std::shared_ptr<PyArrayFeedQueue>& GetFeeder() const { return feeder_; }
+
+ private:
+  std::shared_ptr<PyArrayFeedQueue> feeder_;
+};
+
+}  // namespace reader
+}  // namespace operators
+}  // namespace paddle
diff --git a/paddle/fluid/operators/reader/py_blocking_queue.h b/paddle/fluid/operators/reader/py_blocking_queue.h
new file mode 100644
index 0000000000000000000000000000000000000000..721767102bda83e92073ca03147f4c2925d9dcc8
--- /dev/null
+++ b/paddle/fluid/operators/reader/py_blocking_queue.h
@@ -0,0 +1,125 @@
+// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <condition_variable>  // NOLINT
+#include <deque>
+
+#include "Python.h"
+#include "paddle/fluid/platform/enforce.h"
+#include "pybind11/pybind11.h"
+
+namespace paddle {
+namespace operators {
+namespace reader {
+
+// PyBlockingQueue is designed for PyArrayFeedQueue.
+// PyBlockingQueue releases the Python GIL while waiting
+// when the queue is full, to avoid deadlock.
+template <typename T>
+class PyBlockingQueue {
+ public:
+  explicit PyBlockingQueue(size_t capacity)
+      : capacity_(capacity), closed_(false) {
+    PADDLE_ENFORCE_GT(
+        capacity_, 0,
+        "The capacity of a reader::PyBlockingQueue must be greater than 0.");
+  }
+
+  ~PyBlockingQueue() { Close(); }
+
+  bool Send(const T& elem) {
+    std::unique_lock<std::mutex> lock(mutex_);
+    receive_cv_.notify_one();
+    if (queue_.size() >= capacity_ && (!closed_)) {
+      pybind11::gil_scoped_release release;
+      send_cv_.wait(lock, [&] { return queue_.size() < capacity_ || closed_; });
+    }
+    if (closed_) {
+      VLOG(5)
+          << "WARNING: Sending an element to a closed reader::PyBlockingQueue.";
+      return false;
+    }
+    PADDLE_ENFORCE_LT(queue_.size(), capacity_);
+    queue_.push_back(elem);
+    return true;
+  }
+
+  bool Send(T&& elem) {
+    std::unique_lock<std::mutex> lock(mutex_);
+    receive_cv_.notify_one();
+    if (queue_.size() >= capacity_ && (!closed_)) {
+      pybind11::gil_scoped_release release;
+      send_cv_.wait(lock, [&] { return queue_.size() < capacity_ || closed_; });
+    }
+    if (closed_) {
+      VLOG(5)
+          << "WARNING: Sending an element to a closed reader::PyBlockingQueue.";
+      return false;
+    }
+    PADDLE_ENFORCE_LT(queue_.size(), capacity_);
+    queue_.emplace_back(std::move(elem));
+    return true;
+  }
+
+  bool Receive(T* elem) {
+    std::unique_lock<std::mutex> lock(mutex_);
+    send_cv_.notify_one();
+    receive_cv_.wait(lock, [&] { return !queue_.empty() || closed_; });
+    if (!queue_.empty()) {
+      PADDLE_ENFORCE_NOT_NULL(elem);
+      *elem = queue_.front();
+      queue_.pop_front();
+      return true;
+    } else {
+      PADDLE_ENFORCE(closed_);
+      return false;
+    }
+  }
+
+  void Close() {
+    std::lock_guard<std::mutex> lock(mutex_);
+    closed_ = true;
+    send_cv_.notify_all();
+    receive_cv_.notify_all();
+  }
+
+  bool IsClosed() const {
+    std::lock_guard<std::mutex> lock(mutex_);
+    return closed_;
+  }
+
+  size_t Cap() const {
+    std::lock_guard<std::mutex> lock(mutex_);
+    return capacity_;
+  }
+
+  size_t Size() const {
+    std::lock_guard<std::mutex> lock(mutex_);
+    return queue_.size();
+  }
+
+ private:
+  size_t capacity_;
+  bool closed_;
+  std::deque<T> queue_;
+
+  mutable std::mutex mutex_;
+  mutable std::condition_variable receive_cv_;
+  mutable std::condition_variable send_cv_;
+};
+}  // namespace reader
+}  // namespace operators
+}  // namespace paddle
diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc
index 5a45e431df993febab676f22da7116d84e441548..472595f6a81bb8138e6aecb7c09451094f9b7012 100644
--- a/paddle/fluid/pybind/pybind.cc
+++ b/paddle/fluid/pybind/pybind.cc
@@ -34,6 +34,7 @@ limitations under the License. */
 #include "paddle/fluid/framework/reader.h"
 #include "paddle/fluid/framework/selected_rows.h"
 #include "paddle/fluid/operators/activation_op.h"
+#include "paddle/fluid/operators/reader/py_array_feed_queue.h"
 #include "paddle/fluid/platform/enforce.h"
 #include "paddle/fluid/platform/place.h"
 #include "paddle/fluid/platform/profiler.h"
@@ -297,6 +298,42 @@ All parameter, weight, gradient are variables in Paddle.
py::class_(m, "Reader", "") .def("reset", &framework::ReaderHolder::ReInit); + using PyArrayFeedQueue = ::paddle::operators::reader::PyArrayFeedQueue; + using PyArrayFeedQueueHolder = + ::paddle::operators::reader::PyArrayFeedQueueHolder; + using PyArray = ::paddle::operators::reader::PyArray; + py::class_(m, "PyArrayFeedQueue", "") + .def( + "enqueue", + [](PyArrayFeedQueue &self, const std::vector &py_array_vec) { + return self.Enqueue(py_array_vec); + }) + .def("enqueue", + [](PyArrayFeedQueue &self, + const std::vector &lod_tensor_vec) { + return self.Enqueue(lod_tensor_vec); + }) + .def("size", [](const PyArrayFeedQueue &self) { return self.Size(); }) + .def("capacity", [](const PyArrayFeedQueue &self) { return self.Cap(); }) + .def("close", [](PyArrayFeedQueue &self) { return self.Close(); }) + .def("is_closed", + [](const PyArrayFeedQueue &self) { return self.IsClosed(); }); + + m.def("init_py_array_feed_queue", + [](Variable &var, size_t capacity, + const std::vector> &shapes, + const ::paddle::platform::Place &place) -> PyArrayFeedQueue * { + std::vector dims(shapes.size()); + std::transform(shapes.begin(), shapes.end(), dims.begin(), + [](const std::vector &shape) { + return make_ddim(shape); + }); + auto *holder = var.GetMutable(); + holder->InitOnce(capacity, dims, place); + return holder->GetFeeder().get(); + }, + py::return_value_policy::reference); + py::class_(m, "Scope", "") .def("var", [](Scope &self, const std::string &name) -> Variable * { @@ -463,10 +500,11 @@ All parameter, weight, gradient are variables in Paddle. #ifdef PADDLE_WITH_DISTRIBUTE .def("complete", &Executor::Complete) #endif - .def("run", - (void (Executor::*)(const ProgramDesc &, Scope *, int, bool, bool)) & - Executor::Run); - + .def("run", [](Executor &self, const ProgramDesc &prog, Scope *scope, + int block_id, bool create_local_scope, bool create_vars) { + pybind11::gil_scoped_release release; + self.Run(prog, scope, block_id, create_local_scope, create_vars); + }); m.def("init_gflags", framework::InitGflags); m.def("init_glog", framework::InitGLOG); m.def("init_devices", diff --git a/paddle/fluid/pybind/tensor_py.h b/paddle/fluid/pybind/tensor_py.h index 6da3846ac69980daac4f0fb7401b2573c21c89bf..3e2ea1ef88b03f5b2576c1cee2b5d26a439943da 100644 --- a/paddle/fluid/pybind/tensor_py.h +++ b/paddle/fluid/pybind/tensor_py.h @@ -146,7 +146,7 @@ void PyCPUTensorSetFromArray( template <> // This following specialization maps uint16_t in the parameter type to // platform::float16. -void PyCPUTensorSetFromArray( +inline void PyCPUTensorSetFromArray( framework::Tensor *self, pybind11::array_t @@ -185,7 +185,7 @@ void PyCUDATensorSetFromArray( template <> // This following specialization maps uint16_t in the parameter type to // platform::float16. -void PyCUDATensorSetFromArray( +inline void PyCUDATensorSetFromArray( framework::Tensor *self, pybind11::array_t @@ -224,7 +224,7 @@ void PyCUDAPinnedTensorSetFromArray( template <> // This following specialization maps uint16_t in the parameter type to // platform::float16. 
-void PyCUDAPinnedTensorSetFromArray( +inline void PyCUDAPinnedTensorSetFromArray( framework::Tensor *self, pybind11::array_t diff --git a/python/paddle/fluid/layers/io.py b/python/paddle/fluid/layers/io.py index f3ab47c96b1caa2facfd6d191af014b4c7380cbc..3773653bcdd7a0d79261b077e44783d7e4633e79 100644 --- a/python/paddle/fluid/layers/io.py +++ b/python/paddle/fluid/layers/io.py @@ -24,7 +24,8 @@ from layer_function_generator import generate_layer_fn, templatedoc __all__ = [ 'data', 'BlockGuardServ', 'ListenAndServ', 'Send', 'Recv', 'open_recordio_file', 'open_files', 'read_file', 'shuffle', 'batch', - 'double_buffer', 'random_data_generator', 'Preprocessor', 'load' + 'double_buffer', 'random_data_generator', 'py_array_reader', 'Preprocessor', + 'load' ] @@ -448,6 +449,61 @@ def random_data_generator(low, high, shapes, lod_levels, for_parallel=True): return monkey_patch_reader_methods(main_prog_var) +# UNCHECK(zengjinle) +def py_array_reader(capacity, + shapes, + lod_levels, + dtypes, + place=None, + for_parallel=True): + + if place is None: + place = core.CPUPlace() + + if not isinstance(place, core.Place): + new_place = core.Place() + new_place.set_place(place) + place = new_place + + dtypes = [convert_np_dtype_to_dtype_(dt) for dt in dtypes] + shape_concat = [] + ranks = [] + + for shape in shapes: + shape_concat.extend(shape) + ranks.append(len(shape)) + + feeder_name = unique_name('py_array_feed_queue') + var = global_scope().var(feeder_name) + + #feed_shapes = [shape[1:] for shape in shapes] + feed_queue = core.init_py_array_feed_queue(var, capacity, shapes, place) + + startup_blk = default_startup_program().current_block() + startup_var = startup_blk.create_var( + name=unique_name('create_py_array_reader')) + startup_blk.append_op( + type='create_py_array_reader', + outputs={'Out': [startup_var]}, + attrs={ + 'shape_concat': shape_concat, + 'lod_levels': lod_levels, + 'ranks': ranks, + 'feeder_name': feeder_name + }) + + startup_var.desc.set_dtypes(dtypes) + startup_var.persistable = True + + main_prog_var = _copy_reader_var_(default_main_program().current_block(), + startup_var) + + if for_parallel: + main_prog_var = parallel(reader=main_prog_var) + + return monkey_patch_reader_methods(main_prog_var), feed_queue + + def open_files(filenames, shapes, lod_levels,