提交 502faf62 编写于 作者: S sneaxiy

complete_py_reader_cpp

上级 7b2339d7
...@@ -122,15 +122,5 @@ def parse_args(): ...@@ -122,15 +122,5 @@ def parse_args():
type=str, type=str,
default="", default="",
help='Directory that contains all the training recordio files.') help='Directory that contains all the training recordio files.')
parser.add_argument(
'--use_py_reader_op',
action='store_true',
help='Whether to use Python reader op, omitted when use_reader_op is true'
)
parser.add_argument(
'--feed_queue_capacity',
type=int,
default=64,
help='Capacity of feed queue when use_py_reader_op is true')
args = parser.parse_args() args = parser.parse_args()
return args return args
...@@ -25,9 +25,6 @@ import paddle.fluid.profiler as profiler ...@@ -25,9 +25,6 @@ import paddle.fluid.profiler as profiler
import paddle.fluid.transpiler.distribute_transpiler as distribute_transpiler import paddle.fluid.transpiler.distribute_transpiler as distribute_transpiler
from args import * from args import *
import threading
feed_queue = None
def append_nccl2_prepare(trainer_id): def append_nccl2_prepare(trainer_id):
...@@ -134,7 +131,7 @@ def train(avg_loss, infer_prog, optimizer, train_reader, test_reader, batch_acc, ...@@ -134,7 +131,7 @@ def train(avg_loss, infer_prog, optimizer, train_reader, test_reader, batch_acc,
exe = fluid.Executor(place) exe = fluid.Executor(place)
exe.run(startup_prog) exe.run(startup_prog)
if not args.use_reader_op and not args.use_py_reader_op: if not args.use_reader_op:
feed_var_list = [ feed_var_list = [
var for var in train_prog.global_block().vars.itervalues() var for var in train_prog.global_block().vars.itervalues()
if var.is_data if var.is_data
...@@ -144,12 +141,12 @@ def train(avg_loss, infer_prog, optimizer, train_reader, test_reader, batch_acc, ...@@ -144,12 +141,12 @@ def train(avg_loss, infer_prog, optimizer, train_reader, test_reader, batch_acc,
iters, num_samples, start_time = 0, 0, time.time() iters, num_samples, start_time = 0, 0, time.time()
for pass_id in range(args.pass_num): for pass_id in range(args.pass_num):
train_losses = [] train_losses = []
if not args.use_reader_op and not args.use_py_reader_op: if not args.use_reader_op:
reader_generator = train_reader() reader_generator = train_reader()
batch_id = 0 batch_id = 0
data = None data = None
while True: while True:
if not args.use_reader_op and not args.use_py_reader_op: if not args.use_reader_op:
data = next(reader_generator, None) data = next(reader_generator, None)
if data == None: if data == None:
break break
...@@ -159,7 +156,7 @@ def train(avg_loss, infer_prog, optimizer, train_reader, test_reader, batch_acc, ...@@ -159,7 +156,7 @@ def train(avg_loss, infer_prog, optimizer, train_reader, test_reader, batch_acc,
start_time = time.time() start_time = time.time()
num_samples = 0 num_samples = 0
if args.use_reader_op or args.use_py_reader_op: if args.use_reader_op:
try: try:
loss = exe.run(train_prog, fetch_list=[avg_loss]) loss = exe.run(train_prog, fetch_list=[avg_loss])
except fluid.core.EnforceNotMet as ex: except fluid.core.EnforceNotMet as ex:
...@@ -173,7 +170,7 @@ def train(avg_loss, infer_prog, optimizer, train_reader, test_reader, batch_acc, ...@@ -173,7 +170,7 @@ def train(avg_loss, infer_prog, optimizer, train_reader, test_reader, batch_acc,
# FIXME(wuyi): For use_reader_op, if the current # FIXME(wuyi): For use_reader_op, if the current
# pass is not the last, the last batch of this pass # pass is not the last, the last batch of this pass
# is also equal to args.batch_size. # is also equal to args.batch_size.
if args.use_reader_op or args.use_py_reader_op: if args.use_reader_op:
num_samples += args.batch_size * args.gpus num_samples += args.batch_size * args.gpus
else: else:
num_samples += len(data) num_samples += len(data)
...@@ -183,13 +180,12 @@ def train(avg_loss, infer_prog, optimizer, train_reader, test_reader, batch_acc, ...@@ -183,13 +180,12 @@ def train(avg_loss, infer_prog, optimizer, train_reader, test_reader, batch_acc,
print_train_time(start_time, time.time(), num_samples) print_train_time(start_time, time.time(), num_samples)
print("Pass: %d, Loss: %f" % (pass_id, np.mean(train_losses))), print("Pass: %d, Loss: %f" % (pass_id, np.mean(train_losses))),
# evaluation # evaluation
if not args.no_test and batch_acc and not args.use_reader_op and not args.use_py_reader_op: if not args.no_test and batch_acc and not args.use_reader_op:
pass_test_acc = test(exe, infer_prog, test_reader, feeder, pass_test_acc = test(exe, infer_prog, test_reader, feeder,
batch_acc) batch_acc)
print(", Test Accuracy: %f" % pass_test_acc) print(", Test Accuracy: %f" % pass_test_acc)
print("\n") print("\n")
# TODO(wuyi): add warmup passes to get better perf data. # TODO(wuyi): add warmup passes to get better perf data.
close_feed_queue()
exit(0) exit(0)
...@@ -199,7 +195,7 @@ def train_parallel(avg_loss, infer_prog, optimizer, train_reader, test_reader, ...@@ -199,7 +195,7 @@ def train_parallel(avg_loss, infer_prog, optimizer, train_reader, test_reader,
batch_acc, args, train_prog, startup_prog, nccl_id_var, batch_acc, args, train_prog, startup_prog, nccl_id_var,
num_trainers, trainer_id): num_trainers, trainer_id):
place = core.CPUPlace() if args.device == 'CPU' else core.CUDAPlace(0) place = core.CPUPlace() if args.device == 'CPU' else core.CUDAPlace(0)
if not args.use_reader_op and not args.use_py_reader_op: if not args.use_reader_op:
feed_var_list = [ feed_var_list = [
var for var in train_prog.global_block().vars.itervalues() var for var in train_prog.global_block().vars.itervalues()
if var.is_data if var.is_data
...@@ -242,12 +238,12 @@ def train_parallel(avg_loss, infer_prog, optimizer, train_reader, test_reader, ...@@ -242,12 +238,12 @@ def train_parallel(avg_loss, infer_prog, optimizer, train_reader, test_reader,
num_samples = 0 num_samples = 0
iters = 0 iters = 0
start_time = time.time() start_time = time.time()
if not args.use_reader_op and not args.use_py_reader_op: if not args.use_reader_op:
reader_generator = train_reader() reader_generator = train_reader()
batch_id = 0 batch_id = 0
data = None data = None
while True: while True:
if not args.use_reader_op and not args.use_py_reader_op: if not args.use_reader_op:
data = next(reader_generator, None) data = next(reader_generator, None)
if data == None: if data == None:
break break
...@@ -261,14 +257,14 @@ def train_parallel(avg_loss, infer_prog, optimizer, train_reader, test_reader, ...@@ -261,14 +257,14 @@ def train_parallel(avg_loss, infer_prog, optimizer, train_reader, test_reader,
if iters == args.skip_batch_num: if iters == args.skip_batch_num:
start_time = time.time() start_time = time.time()
num_samples = 0 num_samples = 0
if args.use_fake_data or args.use_reader_op or args.use_py_reader_op: if args.use_fake_data or args.use_reader_op:
try: try:
loss, = exe.run([avg_loss.name]) loss, = exe.run([avg_loss.name])
except fluid.core.EnforceNotMet as ex: except fluid.core.EnforceNotMet as ex:
break break
else: else:
loss, = exe.run([avg_loss.name], feed=feeder.feed(data)) loss, = exe.run([avg_loss.name], feed=feeder.feed(data))
if args.use_reader_op or args.use_py_reader_op: if args.use_reader_op:
num_samples += args.batch_size * args.gpus num_samples += args.batch_size * args.gpus
else: else:
num_samples += len(data) num_samples += len(data)
...@@ -279,7 +275,7 @@ def train_parallel(avg_loss, infer_prog, optimizer, train_reader, test_reader, ...@@ -279,7 +275,7 @@ def train_parallel(avg_loss, infer_prog, optimizer, train_reader, test_reader,
batch_id += 1 batch_id += 1
print_train_time(start_time, time.time(), num_samples) print_train_time(start_time, time.time(), num_samples)
if not args.no_test and batch_acc and not args.use_reader_op and not args.use_py_reader_op: if not args.no_test and batch_acc and not args.use_reader_op:
# we have not implement record io for test # we have not implement record io for test
# skip test when use args.use_reader_op # skip test when use args.use_reader_op
test_acc = test(startup_exe, infer_prog, test_reader, feeder, test_acc = test(startup_exe, infer_prog, test_reader, feeder,
...@@ -311,46 +307,7 @@ def print_paddle_envs(): ...@@ -311,46 +307,7 @@ def print_paddle_envs():
print('------------------------------------------------') print('------------------------------------------------')
def feed_data(feed_queue, train_reader, test_reader, dshapes, args):
train_cnt = 0
test_cnt = 0
print_per_train_batch = 1
train_data_generator = train_reader()
start = time.time()
while True:
next_data = next(train_data_generator, None)
if next_data is None:
break
next_data = list(next_data)
for i in range(len(next_data)):
if not isinstance(next_data[i], np.ndarray):
next_data[i] = np.array(next_data[i])
next_data[i] = next_data[i].reshape([-1] + dshapes[i])
if not feed_queue.enqueue(next_data):
break
train_cnt += 1
'''
if train_cnt % print_per_train_batch == 0:
end = time.time()
print('Feed queue size: %d, capacity: %d, speed: %.5fsec/batch'
% (feed_queue.size(), feed_queue.capacity(), (end-start)/print_per_train_batch))
start = end
'''
feed_queue.close()
def close_feed_queue():
global feed_queue
if feed_queue is not None:
feed_queue.close()
def main(): def main():
global feed_queue
args = parse_args() args = parse_args()
print_arguments(args) print_arguments(args)
print_paddle_envs() print_paddle_envs()
...@@ -364,23 +321,8 @@ def main(): ...@@ -364,23 +321,8 @@ def main():
pr = cProfile.Profile() pr = cProfile.Profile()
pr.enable() pr.enable()
model_def = __import__("models.%s" % args.model, fromlist=["models"]) model_def = __import__("models.%s" % args.model, fromlist=["models"])
model = model_def.get_model(args) train_args = list(model_def.get_model(args))
if not args.use_reader_op and args.use_py_reader_op:
feed_queue = model[-4]
train_reader = model[-3]
test_reader = model[-2]
dshapes = model[-1]
feed_thread = threading.Thread(
target=feed_data,
args=(feed_queue, train_reader, test_reader, dshapes, args))
#feed_thread.setDaemon(True)
feed_thread.start()
model = model[:-4]
train_args = list(model)
train_args.append(args) train_args.append(args)
# Run optimizer.minimize(avg_loss) # Run optimizer.minimize(avg_loss)
train_args[2].minimize(train_args[0]) train_args[2].minimize(train_args[0])
if args.memory_optimize: if args.memory_optimize:
...@@ -396,7 +338,6 @@ def main(): ...@@ -396,7 +338,6 @@ def main():
train_args.extend([nccl_id_var, num_trainers, trainer_id]) train_args.extend([nccl_id_var, num_trainers, trainer_id])
train_parallel(*train_args) train_parallel(*train_args)
train(*train_args) train(*train_args)
close_feed_queue()
exit(0) exit(0)
# for other update methods, use default programs # for other update methods, use default programs
...@@ -421,4 +362,3 @@ def main(): ...@@ -421,4 +362,3 @@ def main():
if __name__ == "__main__": if __name__ == "__main__":
main() main()
close_feed_queue()
...@@ -182,7 +182,7 @@ def lodtensor_to_ndarray(lod_tensor): ...@@ -182,7 +182,7 @@ def lodtensor_to_ndarray(lod_tensor):
def get_model(args): def get_model(args):
if args.use_reader_op or args.use_py_reader_op: if args.use_reader_op:
raise Exception("machine_translation do not support reader op for now.") raise Exception("machine_translation do not support reader op for now.")
embedding_dim = 512 embedding_dim = 512
encoder_size = 512 encoder_size = 512
......
...@@ -66,14 +66,13 @@ def cnn_model(data): ...@@ -66,14 +66,13 @@ def cnn_model(data):
def get_model(args): def get_model(args):
dshape = [1, 28, 28]
if args.use_reader_op: if args.use_reader_op:
filelist = [ filelist = [
os.path.join(args.data_path, f) for f in os.listdir(args.data_path) os.path.join(args.data_path, f) for f in os.listdir(args.data_path)
] ]
data_file = fluid.layers.open_files( data_file = fluid.layers.open_files(
filenames=filelist, filenames=filelist,
shapes=[[-1] + dshape, (-1, 1)], shapes=[[-1, 1, 28, 28], (-1, 1)],
lod_levels=[0, 0], lod_levels=[0, 0],
dtypes=["float32", "int64"], dtypes=["float32", "int64"],
thread_num=args.gpus, thread_num=args.gpus,
...@@ -82,18 +81,8 @@ def get_model(args): ...@@ -82,18 +81,8 @@ def get_model(args):
fluid.layers.batch( fluid.layers.batch(
data_file, batch_size=args.batch_size)) data_file, batch_size=args.batch_size))
images, label = fluid.layers.read_file(data_file) images, label = fluid.layers.read_file(data_file)
elif args.use_py_reader_op:
data_file, feed_queue = fluid.layers.py_array_reader(
capacity=args.feed_queue_capacity,
shapes=[[-1] + dshape, [-1, 1]],
lod_levels=[0, 0],
dtypes=['float32', 'int64'])
data_file = fluid.layers.double_buffer(
fluid.layers.batch(
data_file, batch_size=args.batch_size))
images, label = fluid.layers.read_file(data_file)
else: else:
images = fluid.layers.data(name='pixel', shape=dshape, dtype=DTYPE) images = fluid.layers.data(name='pixel', shape=[1, 28, 28], dtype=DTYPE)
label = fluid.layers.data(name='label', shape=[1], dtype='int64') label = fluid.layers.data(name='label', shape=[1], dtype='int64')
if args.device == 'CPU' and args.cpus > 1: if args.device == 'CPU' and args.cpus > 1:
...@@ -129,16 +118,8 @@ def get_model(args): ...@@ -129,16 +118,8 @@ def get_model(args):
learning_rate=0.001, beta1=0.9, beta2=0.999) learning_rate=0.001, beta1=0.9, beta2=0.999)
# Reader # Reader
underlying_train_reader = paddle.dataset.mnist.train()
underlying_test_reader = paddle.dataset.mnist.test()
train_reader = paddle.batch( train_reader = paddle.batch(
underlying_train_reader, batch_size=args.batch_size * args.gpus) paddle.dataset.mnist.train(), batch_size=args.batch_size * args.gpus)
test_reader = paddle.batch( test_reader = paddle.batch(
underlying_test_reader, batch_size=args.batch_size) paddle.dataset.mnist.test(), batch_size=args.batch_size)
return avg_cost, inference_program, opt, train_reader, test_reader, batch_acc
if not args.use_reader_op and args.use_py_reader_op:
return avg_cost, inference_program, opt, train_reader, test_reader, batch_acc, \
feed_queue, underlying_train_reader, underlying_test_reader, \
(dshape, [1])
else:
return avg_cost, inference_program, opt, train_reader, test_reader, batch_acc
...@@ -163,16 +163,6 @@ def get_model(args): ...@@ -163,16 +163,6 @@ def get_model(args):
fluid.layers.batch( fluid.layers.batch(
data_file, batch_size=args.batch_size)) data_file, batch_size=args.batch_size))
input, label = fluid.layers.read_file(data_file) input, label = fluid.layers.read_file(data_file)
elif args.use_py_reader_op:
data_file, feed_queue = fluid.layers.py_array_reader(
capacity=args.feed_queue_capacity,
shapes=[[-1] + dshape, [-1, 1]],
lod_levels=[0, 0],
dtypes=['float32', 'int64'])
data_file = fluid.layers.double_buffer(
fluid.layers.batch(
data_file, batch_size=args.batch_size))
input, label = fluid.layers.read_file(data_file)
else: else:
input = fluid.layers.data(name='data', shape=dshape, dtype='float32') input = fluid.layers.data(name='data', shape=dshape, dtype='float32')
label = fluid.layers.data(name='label', shape=[1], dtype='int64') label = fluid.layers.data(name='label', shape=[1], dtype='int64')
...@@ -214,11 +204,5 @@ def get_model(args): ...@@ -214,11 +204,5 @@ def get_model(args):
batched_test_reader = paddle.batch( batched_test_reader = paddle.batch(
train_reader, batch_size=args.batch_size, drop_last=True) train_reader, batch_size=args.batch_size, drop_last=True)
if not args.use_reader_op and args.use_py_reader_op: return avg_cost, inference_program, optimizer, batched_train_reader,\
return avg_cost, inference_program, optimizer, batched_train_reader,\ batched_test_reader, batch_acc
batched_test_reader, batch_acc, \
feed_queue, train_reader, test_reader, \
(dshape, [1])
else:
return avg_cost, inference_program, optimizer, batched_train_reader,\
batched_test_reader, batch_acc
...@@ -44,7 +44,7 @@ def crop_sentence(reader, crop_size): ...@@ -44,7 +44,7 @@ def crop_sentence(reader, crop_size):
def get_model(args): def get_model(args):
if args.use_reader_op or args.use_py_reader_op: if args.use_reader_op:
raise Exception( raise Exception(
"stacked_dynamic_lstm do not support reader op for now.") "stacked_dynamic_lstm do not support reader op for now.")
lstm_size = 512 lstm_size = 512
......
...@@ -54,16 +54,12 @@ def vgg16_bn_drop(input): ...@@ -54,16 +54,12 @@ def vgg16_bn_drop(input):
def get_model(args): def get_model(args):
if args.data_set == "cifar10": if args.data_set == "cifar10":
underlying_train_reader = paddle.dataset.cifar.train10()
underlying_test_reader = paddle.dataset.cifar.test10()
classdim = 10 classdim = 10
if args.data_format == 'NCHW': if args.data_format == 'NCHW':
data_shape = [3, 32, 32] data_shape = [3, 32, 32]
else: else:
data_shape = [32, 32, 3] data_shape = [32, 32, 3]
else: else:
underlying_train_reader = paddle.dataset.flowers.train()
underlying_test_reader = paddle.dataset.flowers.test()
classdim = 102 classdim = 102
if args.data_format == 'NCHW': if args.data_format == 'NCHW':
data_shape = [3, 224, 224] data_shape = [3, 224, 224]
...@@ -85,16 +81,6 @@ def get_model(args): ...@@ -85,16 +81,6 @@ def get_model(args):
fluid.layers.batch( fluid.layers.batch(
data_file, batch_size=args.batch_size)) data_file, batch_size=args.batch_size))
images, label = fluid.layers.read_file(data_file) images, label = fluid.layers.read_file(data_file)
elif args.use_py_reader_op:
data_file, feed_queue = fluid.layers.py_array_reader(
capacity=args.feed_queue_capacity,
shapes=[[-1] + data_shape, [-1, 1]],
lod_levels=[0, 0],
dtypes=["float32", "int64"])
data_file = fluid.layers.double_buffer(
fluid.layers.batch(
data_file, batch_size=args.batch_size))
images, label = fluid.layers.read_file(data_file)
else: else:
images = fluid.layers.data( images = fluid.layers.data(
name='data', shape=data_shape, dtype='float32') name='data', shape=data_shape, dtype='float32')
...@@ -123,14 +109,13 @@ def get_model(args): ...@@ -123,14 +109,13 @@ def get_model(args):
# data reader # data reader
train_reader = paddle.batch( train_reader = paddle.batch(
paddle.reader.shuffle( paddle.reader.shuffle(
underlying_train_reader, buf_size=5120), paddle.dataset.cifar.train10()
if args.data_set == 'cifar10' else paddle.dataset.flowers.train(),
buf_size=5120),
batch_size=args.batch_size * args.gpus) batch_size=args.batch_size * args.gpus)
test_reader = paddle.batch( test_reader = paddle.batch(
underlying_test_reader, batch_size=args.batch_size) paddle.dataset.cifar.test10()
if args.data_set == 'cifar10' else paddle.dataset.flowers.test(),
batch_size=args.batch_size)
if not args.use_reader_op and args.use_py_reader_op: return avg_cost, inference_program, optimizer, train_reader, test_reader, batch_acc
return avg_cost, inference_program, optimizer, train_reader, test_reader, batch_acc, \
feed_queue, underlying_train_reader, underlying_test_reader, \
(data_shape, [1])
else:
return avg_cost, inference_program, optimizer, train_reader, test_reader, batch_acc
...@@ -24,7 +24,7 @@ reader_library(create_double_buffer_reader_op SRCS create_double_buffer_reader_o ...@@ -24,7 +24,7 @@ reader_library(create_double_buffer_reader_op SRCS create_double_buffer_reader_o
reader_library(create_multi_pass_reader_op SRCS create_multi_pass_reader_op.cc) reader_library(create_multi_pass_reader_op SRCS create_multi_pass_reader_op.cc)
reader_library(create_threaded_reader_op SRCS create_threaded_reader_op.cc) reader_library(create_threaded_reader_op SRCS create_threaded_reader_op.cc)
reader_library(create_custom_reader_op SRCS create_custom_reader_op.cc) reader_library(create_custom_reader_op SRCS create_custom_reader_op.cc)
reader_library(create_py_array_reader_op SRCS create_py_array_reader_op.cc) reader_library(create_py_reader_op SRCS create_py_reader_op.cc)
cc_test(reader_blocking_queue_test SRCS reader_blocking_queue_test.cc) cc_test(reader_blocking_queue_test SRCS reader_blocking_queue_test.cc)
# Export local libraries to parent # Export local libraries to parent
......
...@@ -38,6 +38,8 @@ class BlockingQueue { ...@@ -38,6 +38,8 @@ class BlockingQueue {
"The capacity of a reader::BlockingQueue must be greater than 0."); "The capacity of a reader::BlockingQueue must be greater than 0.");
} }
~BlockingQueue() { Close(); }
bool Send(const T& elem) { bool Send(const T& elem) {
std::unique_lock<std::mutex> lock(mutex_); std::unique_lock<std::mutex> lock(mutex_);
send_cv_.wait(lock, [&] { return queue_.size() < capacity_ || closed_; }); send_cv_.wait(lock, [&] { return queue_.size() < capacity_ || closed_; });
...@@ -88,24 +90,29 @@ class BlockingQueue { ...@@ -88,24 +90,29 @@ class BlockingQueue {
receive_cv_.notify_all(); receive_cv_.notify_all();
} }
bool IsClosed() { bool IsClosed() const {
std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_);
return closed_; return closed_;
} }
size_t Cap() { size_t Cap() const {
std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_);
return capacity_; return capacity_;
} }
size_t Size() const {
std::lock_guard<std::mutex> lock(mutex_);
return queue_.size();
}
private: private:
size_t capacity_; size_t capacity_;
bool closed_; bool closed_;
std::deque<T> queue_; std::deque<T> queue_;
std::mutex mutex_; mutable std::mutex mutex_;
std::condition_variable receive_cv_; mutable std::condition_variable receive_cv_;
std::condition_variable send_cv_; mutable std::condition_variable send_cv_;
}; };
} // namespace reader } // namespace reader
} // namespace operators } // namespace operators
......
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/operators/reader/lod_tensor_blocking_queue.h"
#include "paddle/fluid/operators/reader/reader_op_registry.h"
namespace paddle {
namespace operators {
namespace reader {
class PyReader : public framework::ReaderBase {
public:
explicit PyReader(const std::shared_ptr<LoDTensorBlockingQueue>& queue) {
PADDLE_ENFORCE(queue != nullptr, "LoDTensorBlockingQueue must not be null");
queue_ = queue;
}
void ReadNext(std::vector<framework::LoDTensor>* out) override {
bool success;
*out = queue_->Dequeue(&success);
if (!success) out->clear();
}
void ReInit() override {}
private:
std::shared_ptr<LoDTensorBlockingQueue> queue_;
};
class CreatePyReaderOp : public framework::OperatorBase {
public:
using framework::OperatorBase::OperatorBase;
private:
void RunImpl(const framework::Scope& scope,
const platform::Place& dev_place) const override {
const std::string& queue_name = Input("blocking_queue");
auto* queue_holder_var = scope.FindVar(queue_name);
PADDLE_ENFORCE(
queue_holder_var != nullptr,
"No LoDTensorBlockingQueueHolder variable with name %s found",
queue_name);
auto* queue_holder =
queue_holder_var->template GetMutable<LoDTensorBlockingQueueHolder>();
auto* out = scope.FindVar(Output("Out"))
->template GetMutable<framework::ReaderHolder>();
out->Reset(new PyReader(queue_holder->GetQueue()));
}
};
class CreatePyReaderOpMaker : public FileReaderMakerBase {
protected:
void Apply() override {
AddInput("blocking_queue",
"Name of the `LoDTensorBlockingQueueHolder` variable");
AddComment(R"DOC(
Create PyReader to support LoDTensor data feeding in Python side.
)DOC");
}
};
} // namespace reader
} // namespace operators
} // namespace paddle
namespace reader = ::paddle::operators::reader;
REGISTER_FILE_READER_OPERATOR(create_py_reader, reader::CreatePyReaderOp,
reader::CreatePyReaderOpMaker);
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <memory>
#include <vector>
#include "paddle/fluid/framework/ddim.h"
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/operators/reader/blocking_queue.h"
#include "paddle/fluid/platform/place.h"
namespace paddle {
namespace operators {
namespace reader {
class LoDTensorBlockingQueueHolder;
class LoDTensorBlockingQueue {
friend class LoDTensorBlockingQueueHolder;
private:
LoDTensorBlockingQueue(size_t capacity,
const std::vector<framework::DDim>& dims)
: dims_(dims) {
queue_.reset(
new BlockingQueue<std::vector<framework::LoDTensor>>(capacity));
}
public:
bool Enqueue(const std::vector<framework::LoDTensor>& lod_tensor_vec) {
CheckDims(lod_tensor_vec);
return queue_->Send(lod_tensor_vec);
}
bool Enqueue(std::vector<framework::LoDTensor>&& lod_tensor_vec) {
CheckDims(lod_tensor_vec);
return queue_->Send(std::move(lod_tensor_vec));
}
std::vector<framework::LoDTensor> Dequeue(bool* ok = nullptr) {
std::vector<framework::LoDTensor> lod_tensor_vec;
bool success = queue_->Receive(&lod_tensor_vec);
if (ok != nullptr) *ok = success;
return lod_tensor_vec;
}
inline size_t Cap() const { return queue_->Cap(); }
inline size_t Size() const { return queue_->Size(); }
inline void Close() { return queue_->Close(); }
inline bool IsClosed() const { return queue_->IsClosed(); }
private:
void CheckDims(const std::vector<framework::LoDTensor>& lod_tensor_vec) {
PADDLE_ENFORCE(dims_.size() == lod_tensor_vec.size(),
"Expect input size is %d but found %s", dims_.size(),
lod_tensor_vec.size());
for (size_t i = 0; i < dims_.size(); ++i) {
const auto& in_dims = lod_tensor_vec[i].dims();
const auto& expect_dims =
framework::slice_ddim(dims_[i], 1, dims_[i].size());
PADDLE_ENFORCE(in_dims == expect_dims,
"Dims of the %d-th input tensor does not match", i);
}
}
std::unique_ptr<BlockingQueue<std::vector<framework::LoDTensor>>> queue_;
std::vector<framework::DDim> dims_;
};
class LoDTensorBlockingQueueHolder {
public:
void InitOnce(size_t capacity, const std::vector<framework::DDim>& dims) {
PADDLE_ENFORCE(
queue_ == nullptr,
"LoDTensorBlockingQueueHolder::InitOnce() can only be called once");
queue_.reset(new LoDTensorBlockingQueue(capacity, dims));
}
inline std::shared_ptr<LoDTensorBlockingQueue> GetQueue() { return queue_; }
inline const std::shared_ptr<LoDTensorBlockingQueue>& GetQueue() const {
return queue_;
}
private:
std::shared_ptr<LoDTensorBlockingQueue> queue_;
};
} // namespace reader
} // namespace operators
} // namespace paddle
...@@ -34,7 +34,7 @@ limitations under the License. */ ...@@ -34,7 +34,7 @@ limitations under the License. */
#include "paddle/fluid/framework/reader.h" #include "paddle/fluid/framework/reader.h"
#include "paddle/fluid/framework/selected_rows.h" #include "paddle/fluid/framework/selected_rows.h"
#include "paddle/fluid/operators/activation_op.h" #include "paddle/fluid/operators/activation_op.h"
#include "paddle/fluid/operators/reader/py_array_feed_queue.h" #include "paddle/fluid/operators/reader/lod_tensor_blocking_queue.h"
#include "paddle/fluid/platform/enforce.h" #include "paddle/fluid/platform/enforce.h"
#include "paddle/fluid/platform/place.h" #include "paddle/fluid/platform/place.h"
#include "paddle/fluid/platform/profiler.h" #include "paddle/fluid/platform/profiler.h"
...@@ -298,40 +298,38 @@ All parameter, weight, gradient are variables in Paddle. ...@@ -298,40 +298,38 @@ All parameter, weight, gradient are variables in Paddle.
py::class_<framework::ReaderHolder>(m, "Reader", "") py::class_<framework::ReaderHolder>(m, "Reader", "")
.def("reset", &framework::ReaderHolder::ReInit); .def("reset", &framework::ReaderHolder::ReInit);
using PyArrayFeedQueue = ::paddle::operators::reader::PyArrayFeedQueue; using LoDTensorBlockingQueue =
using PyArrayFeedQueueHolder = ::paddle::operators::reader::LoDTensorBlockingQueue;
::paddle::operators::reader::PyArrayFeedQueueHolder; using LoDTensorBlockingQueueHolder =
using PyArray = ::paddle::operators::reader::PyArray; ::paddle::operators::reader::LoDTensorBlockingQueueHolder;
py::class_<PyArrayFeedQueue>(m, "PyArrayFeedQueue", "") py::class_<LoDTensorBlockingQueue>(m, "LoDTensorBlockingQueue", "")
.def(
"enqueue",
[](PyArrayFeedQueue &self, const std::vector<PyArray> &py_array_vec) {
return self.Enqueue(py_array_vec);
})
.def("enqueue", .def("enqueue",
[](PyArrayFeedQueue &self, [](LoDTensorBlockingQueue &self,
const std::vector<framework::LoDTensor> &lod_tensor_vec) { const std::vector<framework::LoDTensor> &lod_tensor_vec) {
pybind11::gil_scoped_release release;
return self.Enqueue(lod_tensor_vec); return self.Enqueue(lod_tensor_vec);
}) })
.def("size", [](const PyArrayFeedQueue &self) { return self.Size(); }) .def("size",
.def("capacity", [](const PyArrayFeedQueue &self) { return self.Cap(); }) [](const LoDTensorBlockingQueue &self) { return self.Size(); })
.def("close", [](PyArrayFeedQueue &self) { return self.Close(); }) .def("capacity",
[](const LoDTensorBlockingQueue &self) { return self.Cap(); })
.def("close", [](LoDTensorBlockingQueue &self) { return self.Close(); })
.def("is_closed", .def("is_closed",
[](const PyArrayFeedQueue &self) { return self.IsClosed(); }); [](const LoDTensorBlockingQueue &self) { return self.IsClosed(); });
m.def("init_py_array_feed_queue", m.def("init_lod_tensor_blocking_queue",
[](Variable &var, size_t capacity, [](Variable &var, size_t capacity,
const std::vector<std::vector<int64_t>> &shapes, const std::vector<std::vector<int64_t>> &shapes)
const ::paddle::platform::Place &place) -> PyArrayFeedQueue * { -> LoDTensorBlockingQueue * {
std::vector<DDim> dims(shapes.size()); std::vector<DDim> dims(shapes.size());
std::transform(shapes.begin(), shapes.end(), dims.begin(), std::transform(shapes.begin(), shapes.end(), dims.begin(),
[](const std::vector<int64_t> &shape) { [](const std::vector<int64_t> &shape) {
return make_ddim(shape); return make_ddim(shape);
}); });
auto *holder = var.GetMutable<PyArrayFeedQueueHolder>(); auto *holder = var.GetMutable<LoDTensorBlockingQueueHolder>();
holder->InitOnce(capacity, dims, place); holder->InitOnce(capacity, dims);
return holder->GetFeeder().get(); return holder->GetQueue().get();
}, },
py::return_value_policy::reference); py::return_value_policy::reference);
py::class_<Scope>(m, "Scope", "") py::class_<Scope>(m, "Scope", "")
...@@ -505,6 +503,7 @@ All parameter, weight, gradient are variables in Paddle. ...@@ -505,6 +503,7 @@ All parameter, weight, gradient are variables in Paddle.
pybind11::gil_scoped_release release; pybind11::gil_scoped_release release;
self.Run(prog, scope, block_id, create_local_scope, create_vars); self.Run(prog, scope, block_id, create_local_scope, create_vars);
}); });
m.def("init_gflags", framework::InitGflags); m.def("init_gflags", framework::InitGflags);
m.def("init_glog", framework::InitGLOG); m.def("init_glog", framework::InitGLOG);
m.def("init_devices", m.def("init_devices",
...@@ -669,7 +668,12 @@ All parameter, weight, gradient are variables in Paddle. ...@@ -669,7 +668,12 @@ All parameter, weight, gradient are variables in Paddle.
&ParallelExecutor::FeedTensorsIntoLocalScopes) &ParallelExecutor::FeedTensorsIntoLocalScopes)
.def("feed_and_split_tensor_into_local_scopes", .def("feed_and_split_tensor_into_local_scopes",
&ParallelExecutor::FeedAndSplitTensorIntoLocalScopes) &ParallelExecutor::FeedAndSplitTensorIntoLocalScopes)
.def("run", &ParallelExecutor::Run); .def("run", [](ParallelExecutor &self,
const std::vector<std::string> &fetch_tensors,
const std::string &fetched_var_name) {
pybind11::gil_scoped_release release;
self.Run(fetch_tensors, fetched_var_name);
});
BindRecordIOWriter(&m); BindRecordIOWriter(&m);
return m.ptr(); return m.ptr();
......
...@@ -24,8 +24,7 @@ from layer_function_generator import generate_layer_fn, templatedoc ...@@ -24,8 +24,7 @@ from layer_function_generator import generate_layer_fn, templatedoc
__all__ = [ __all__ = [
'data', 'BlockGuardServ', 'ListenAndServ', 'Send', 'Recv', 'data', 'BlockGuardServ', 'ListenAndServ', 'Send', 'Recv',
'open_recordio_file', 'open_files', 'read_file', 'shuffle', 'batch', 'open_recordio_file', 'open_files', 'read_file', 'shuffle', 'batch',
'double_buffer', 'random_data_generator', 'py_array_reader', 'Preprocessor', 'double_buffer', 'random_data_generator', 'Preprocessor', 'load'
'load'
] ]
...@@ -449,60 +448,6 @@ def random_data_generator(low, high, shapes, lod_levels, for_parallel=True): ...@@ -449,60 +448,6 @@ def random_data_generator(low, high, shapes, lod_levels, for_parallel=True):
return monkey_patch_reader_methods(main_prog_var) return monkey_patch_reader_methods(main_prog_var)
def py_array_reader(capacity,
shapes,
lod_levels,
dtypes,
place=None,
for_parallel=True):
if place is None:
place = core.CPUPlace()
if not isinstance(place, core.Place):
new_place = core.Place()
new_place.set_place(place)
place = new_place
dtypes = [convert_np_dtype_to_dtype_(dt) for dt in dtypes]
shape_concat = []
ranks = []
for shape in shapes:
shape_concat.extend(shape)
ranks.append(len(shape))
feeder_name = unique_name('py_array_feed_queue')
var = global_scope().var(feeder_name)
#feed_shapes = [shape[1:] for shape in shapes]
feed_queue = core.init_py_array_feed_queue(var, capacity, shapes, place)
startup_blk = default_startup_program().current_block()
startup_var = startup_blk.create_var(
name=unique_name('create_py_array_reader'))
startup_blk.append_op(
type='create_py_array_reader',
outputs={'Out': [startup_var]},
attrs={
'shape_concat': shape_concat,
'lod_levels': lod_levels,
'ranks': ranks,
'feeder_name': feeder_name
})
startup_var.desc.set_dtypes(dtypes)
startup_var.persistable = True
main_prog_var = _copy_reader_var_(default_main_program().current_block(),
startup_var)
if for_parallel:
main_prog_var = parallel(reader=main_prog_var)
return monkey_patch_reader_methods(main_prog_var), feed_queue
def open_files(filenames, def open_files(filenames,
shapes, shapes,
lod_levels, lod_levels,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册