提交 697ba4b1 编写于 作者: S sneaxiy

Add Python array reader op

上级 2c12af76
......@@ -122,5 +122,15 @@ def parse_args():
type=str,
default="",
help='Directory that contains all the training recordio files.')
parser.add_argument(
'--use_py_reader_op',
action='store_true',
help='Whether to use Python reader op, omitted when use_reader_op is true'
)
parser.add_argument(
'--feed_queue_capacity',
type=int,
default=64,
help='Capacity of feed queue when use_py_reader_op is true')
args = parser.parse_args()
return args
......@@ -25,6 +25,9 @@ import paddle.fluid.profiler as profiler
import paddle.fluid.transpiler.distribute_transpiler as distribute_transpiler
from args import *
import threading
feed_queue = None
def append_nccl2_prepare(trainer_id):
......@@ -131,7 +134,7 @@ def train(avg_loss, infer_prog, optimizer, train_reader, test_reader, batch_acc,
exe = fluid.Executor(place)
exe.run(startup_prog)
if not args.use_reader_op:
if not args.use_reader_op and not args.use_py_reader_op:
feed_var_list = [
var for var in train_prog.global_block().vars.itervalues()
if var.is_data
......@@ -141,12 +144,12 @@ def train(avg_loss, infer_prog, optimizer, train_reader, test_reader, batch_acc,
iters, num_samples, start_time = 0, 0, time.time()
for pass_id in range(args.pass_num):
train_losses = []
if not args.use_reader_op:
if not args.use_reader_op and not args.use_py_reader_op:
reader_generator = train_reader()
batch_id = 0
data = None
while True:
if not args.use_reader_op:
if not args.use_reader_op and not args.use_py_reader_op:
data = next(reader_generator, None)
if data == None:
break
......@@ -156,7 +159,7 @@ def train(avg_loss, infer_prog, optimizer, train_reader, test_reader, batch_acc,
start_time = time.time()
num_samples = 0
if args.use_reader_op:
if args.use_reader_op or args.use_py_reader_op:
try:
loss = exe.run(train_prog, fetch_list=[avg_loss])
except fluid.core.EnforceNotMet as ex:
......@@ -170,7 +173,7 @@ def train(avg_loss, infer_prog, optimizer, train_reader, test_reader, batch_acc,
# FIXME(wuyi): For use_reader_op, if the current
# pass is not the last, the last batch of this pass
# is also equal to args.batch_size.
if args.use_reader_op:
if args.use_reader_op or args.use_py_reader_op:
num_samples += args.batch_size * args.gpus
else:
num_samples += len(data)
......@@ -180,12 +183,13 @@ def train(avg_loss, infer_prog, optimizer, train_reader, test_reader, batch_acc,
print_train_time(start_time, time.time(), num_samples)
print("Pass: %d, Loss: %f" % (pass_id, np.mean(train_losses))),
# evaluation
if not args.no_test and batch_acc and not args.use_reader_op:
if not args.no_test and batch_acc and not args.use_reader_op and not args.use_py_reader_op:
pass_test_acc = test(exe, infer_prog, test_reader, feeder,
batch_acc)
print(", Test Accuracy: %f" % pass_test_acc)
print("\n")
# TODO(wuyi): add warmup passes to get better perf data.
close_feed_queue()
exit(0)
......@@ -195,7 +199,7 @@ def train_parallel(avg_loss, infer_prog, optimizer, train_reader, test_reader,
batch_acc, args, train_prog, startup_prog, nccl_id_var,
num_trainers, trainer_id):
place = core.CPUPlace() if args.device == 'CPU' else core.CUDAPlace(0)
if not args.use_reader_op:
if not args.use_reader_op and not args.use_py_reader_op:
feed_var_list = [
var for var in train_prog.global_block().vars.itervalues()
if var.is_data
......@@ -238,12 +242,12 @@ def train_parallel(avg_loss, infer_prog, optimizer, train_reader, test_reader,
num_samples = 0
iters = 0
start_time = time.time()
if not args.use_reader_op:
if not args.use_reader_op and not args.use_py_reader_op:
reader_generator = train_reader()
batch_id = 0
data = None
while True:
if not args.use_reader_op:
if not args.use_reader_op and not args.use_py_reader_op:
data = next(reader_generator, None)
if data == None:
break
......@@ -257,14 +261,14 @@ def train_parallel(avg_loss, infer_prog, optimizer, train_reader, test_reader,
if iters == args.skip_batch_num:
start_time = time.time()
num_samples = 0
if args.use_fake_data or args.use_reader_op:
if args.use_fake_data or args.use_reader_op or args.use_py_reader_op:
try:
loss, = exe.run([avg_loss.name])
except fluid.core.EnforceNotMet as ex:
break
else:
loss, = exe.run([avg_loss.name], feed=feeder.feed(data))
if args.use_reader_op:
if args.use_reader_op or args.use_py_reader_op:
num_samples += args.batch_size * args.gpus
else:
num_samples += len(data)
......@@ -275,7 +279,7 @@ def train_parallel(avg_loss, infer_prog, optimizer, train_reader, test_reader,
batch_id += 1
print_train_time(start_time, time.time(), num_samples)
if not args.no_test and batch_acc and not args.use_reader_op:
if not args.no_test and batch_acc and not args.use_reader_op and not args.use_py_reader_op:
# we have not implement record io for test
# skip test when use args.use_reader_op
test_acc = test(startup_exe, infer_prog, test_reader, feeder,
......@@ -307,7 +311,46 @@ def print_paddle_envs():
print('------------------------------------------------')
def feed_data(feed_queue, train_reader, test_reader, dshapes, args):
train_cnt = 0
test_cnt = 0
print_per_train_batch = 1
train_data_generator = train_reader()
start = time.time()
while True:
next_data = next(train_data_generator, None)
if next_data is None:
break
next_data = list(next_data)
for i in range(len(next_data)):
if not isinstance(next_data[i], np.ndarray):
next_data[i] = np.array(next_data[i])
next_data[i] = next_data[i].reshape([-1] + dshapes[i])
if not feed_queue.enqueue(next_data):
break
train_cnt += 1
'''
if train_cnt % print_per_train_batch == 0:
end = time.time()
print('Feed queue size: %d, capacity: %d, speed: %.5fsec/batch'
% (feed_queue.size(), feed_queue.capacity(), (end-start)/print_per_train_batch))
start = end
'''
feed_queue.close()
def close_feed_queue():
global feed_queue
if feed_queue is not None:
feed_queue.close()
def main():
global feed_queue
args = parse_args()
print_arguments(args)
print_paddle_envs()
......@@ -321,8 +364,23 @@ def main():
pr = cProfile.Profile()
pr.enable()
model_def = __import__("models.%s" % args.model, fromlist=["models"])
train_args = list(model_def.get_model(args))
model = model_def.get_model(args)
if not args.use_reader_op and args.use_py_reader_op:
feed_queue = model[-4]
train_reader = model[-3]
test_reader = model[-2]
dshapes = model[-1]
feed_thread = threading.Thread(
target=feed_data,
args=(feed_queue, train_reader, test_reader, dshapes, args))
#feed_thread.setDaemon(True)
feed_thread.start()
model = model[:-4]
train_args = list(model)
train_args.append(args)
# Run optimizer.minimize(avg_loss)
train_args[2].minimize(train_args[0])
if args.memory_optimize:
......@@ -338,6 +396,7 @@ def main():
train_args.extend([nccl_id_var, num_trainers, trainer_id])
train_parallel(*train_args)
train(*train_args)
close_feed_queue()
exit(0)
# for other update methods, use default programs
......@@ -362,3 +421,4 @@ def main():
if __name__ == "__main__":
main()
close_feed_queue()
......@@ -182,7 +182,7 @@ def lodtensor_to_ndarray(lod_tensor):
def get_model(args):
if args.use_reader_op:
if args.use_reader_op or args.use_py_reader_op:
raise Exception("machine_translation do not support reader op for now.")
embedding_dim = 512
encoder_size = 512
......
......@@ -66,13 +66,14 @@ def cnn_model(data):
def get_model(args):
dshape = [1, 28, 28]
if args.use_reader_op:
filelist = [
os.path.join(args.data_path, f) for f in os.listdir(args.data_path)
]
data_file = fluid.layers.open_files(
filenames=filelist,
shapes=[[-1, 1, 28, 28], (-1, 1)],
shapes=[[-1] + dshape, (-1, 1)],
lod_levels=[0, 0],
dtypes=["float32", "int64"],
thread_num=args.gpus,
......@@ -81,8 +82,18 @@ def get_model(args):
fluid.layers.batch(
data_file, batch_size=args.batch_size))
images, label = fluid.layers.read_file(data_file)
elif args.use_py_reader_op:
data_file, feed_queue = fluid.layers.py_array_reader(
capacity=args.feed_queue_capacity,
shapes=[[-1] + dshape, [-1, 1]],
lod_levels=[0, 0],
dtypes=['float32', 'int64'])
data_file = fluid.layers.double_buffer(
fluid.layers.batch(
data_file, batch_size=args.batch_size))
images, label = fluid.layers.read_file(data_file)
else:
images = fluid.layers.data(name='pixel', shape=[1, 28, 28], dtype=DTYPE)
images = fluid.layers.data(name='pixel', shape=dshape, dtype=DTYPE)
label = fluid.layers.data(name='label', shape=[1], dtype='int64')
if args.device == 'CPU' and args.cpus > 1:
......@@ -118,8 +129,16 @@ def get_model(args):
learning_rate=0.001, beta1=0.9, beta2=0.999)
# Reader
underlying_train_reader = paddle.dataset.mnist.train()
underlying_test_reader = paddle.dataset.mnist.test()
train_reader = paddle.batch(
paddle.dataset.mnist.train(), batch_size=args.batch_size * args.gpus)
underlying_train_reader, batch_size=args.batch_size * args.gpus)
test_reader = paddle.batch(
paddle.dataset.mnist.test(), batch_size=args.batch_size)
underlying_test_reader, batch_size=args.batch_size)
if not args.use_reader_op and args.use_py_reader_op:
return avg_cost, inference_program, opt, train_reader, test_reader, batch_acc, \
feed_queue, underlying_train_reader, underlying_test_reader, \
(dshape, [1])
else:
return avg_cost, inference_program, opt, train_reader, test_reader, batch_acc
......@@ -163,6 +163,16 @@ def get_model(args):
fluid.layers.batch(
data_file, batch_size=args.batch_size))
input, label = fluid.layers.read_file(data_file)
elif args.use_py_reader_op:
data_file, feed_queue = fluid.layers.py_array_reader(
capacity=args.feed_queue_capacity,
shapes=[[-1] + dshape, [-1, 1]],
lod_levels=[0, 0],
dtypes=['float32', 'int64'])
data_file = fluid.layers.double_buffer(
fluid.layers.batch(
data_file, batch_size=args.batch_size))
input, label = fluid.layers.read_file(data_file)
else:
input = fluid.layers.data(name='data', shape=dshape, dtype='float32')
label = fluid.layers.data(name='label', shape=[1], dtype='int64')
......@@ -204,5 +214,11 @@ def get_model(args):
batched_test_reader = paddle.batch(
train_reader, batch_size=args.batch_size, drop_last=True)
if not args.use_reader_op and args.use_py_reader_op:
return avg_cost, inference_program, optimizer, batched_train_reader,\
batched_test_reader, batch_acc, \
feed_queue, train_reader, test_reader, \
(dshape, [1])
else:
return avg_cost, inference_program, optimizer, batched_train_reader,\
batched_test_reader, batch_acc
......@@ -44,7 +44,7 @@ def crop_sentence(reader, crop_size):
def get_model(args):
if args.use_reader_op:
if args.use_reader_op or args.use_py_reader_op:
raise Exception(
"stacked_dynamic_lstm do not support reader op for now.")
lstm_size = 512
......
......@@ -54,12 +54,16 @@ def vgg16_bn_drop(input):
def get_model(args):
if args.data_set == "cifar10":
underlying_train_reader = paddle.dataset.cifar.train10()
underlying_test_reader = paddle.dataset.cifar.test10()
classdim = 10
if args.data_format == 'NCHW':
data_shape = [3, 32, 32]
else:
data_shape = [32, 32, 3]
else:
underlying_train_reader = paddle.dataset.flowers.train()
underlying_test_reader = paddle.dataset.flowers.test()
classdim = 102
if args.data_format == 'NCHW':
data_shape = [3, 224, 224]
......@@ -81,6 +85,16 @@ def get_model(args):
fluid.layers.batch(
data_file, batch_size=args.batch_size))
images, label = fluid.layers.read_file(data_file)
elif args.use_py_reader_op:
data_file, feed_queue = fluid.layers.py_array_reader(
capacity=args.feed_queue_capacity,
shapes=[[-1] + data_shape, [-1, 1]],
lod_levels=[0, 0],
dtypes=["float32", "int64"])
data_file = fluid.layers.double_buffer(
fluid.layers.batch(
data_file, batch_size=args.batch_size))
images, label = fluid.layers.read_file(data_file)
else:
images = fluid.layers.data(
name='data', shape=data_shape, dtype='float32')
......@@ -109,13 +123,14 @@ def get_model(args):
# data reader
train_reader = paddle.batch(
paddle.reader.shuffle(
paddle.dataset.cifar.train10()
if args.data_set == 'cifar10' else paddle.dataset.flowers.train(),
buf_size=5120),
underlying_train_reader, buf_size=5120),
batch_size=args.batch_size * args.gpus)
test_reader = paddle.batch(
paddle.dataset.cifar.test10()
if args.data_set == 'cifar10' else paddle.dataset.flowers.test(),
batch_size=args.batch_size)
underlying_test_reader, batch_size=args.batch_size)
if not args.use_reader_op and args.use_py_reader_op:
return avg_cost, inference_program, optimizer, train_reader, test_reader, batch_acc, \
feed_queue, underlying_train_reader, underlying_test_reader, \
(data_shape, [1])
else:
return avg_cost, inference_program, optimizer, train_reader, test_reader, batch_acc
......@@ -24,6 +24,7 @@ reader_library(create_double_buffer_reader_op SRCS create_double_buffer_reader_o
reader_library(create_multi_pass_reader_op SRCS create_multi_pass_reader_op.cc)
reader_library(create_threaded_reader_op SRCS create_threaded_reader_op.cc)
reader_library(create_custom_reader_op SRCS create_custom_reader_op.cc)
reader_library(create_py_array_reader_op SRCS create_py_array_reader_op.cc)
cc_test(reader_blocking_queue_test SRCS reader_blocking_queue_test.cc)
# Export local libraries to parent
......
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/operators/reader/py_array_feed_queue.h"
namespace paddle {
namespace operators {
namespace reader {
class PyArrayReader : public framework::ReaderBase {
public:
explicit PyArrayReader(const std::shared_ptr<PyArrayFeedQueue>& queue) {
PADDLE_ENFORCE(queue != nullptr, "PyArrayFeedQueue must not be null");
queue_ = queue;
}
void ReadNext(std::vector<framework::LoDTensor>* out) override {
*out = queue_->Dequeue();
}
void ReInit() override {
// PADDLE_THROW("PyArrayReader does not support ReInit()");
}
private:
std::shared_ptr<PyArrayFeedQueue> queue_;
};
class CreatePyArrayReaderOp : public framework::OperatorBase {
public:
using framework::OperatorBase::OperatorBase;
private:
void RunImpl(const framework::Scope& scope,
const platform::Place& dev_place) const override {
const std::string& feeder_name = Attr<std::string>("feeder_name");
auto* feeder_holder_var = scope.FindVar(feeder_name);
PADDLE_ENFORCE(feeder_holder_var != nullptr,
"No PyArrayFeedQueue variable with name %s found",
feeder_name);
auto* feeder_holder =
feeder_holder_var->template GetMutable<PyArrayFeedQueueHolder>();
auto* out = scope.FindVar(Output("Out"))
->template GetMutable<framework::ReaderHolder>();
out->Reset(new PyArrayReader(feeder_holder->GetFeeder()));
}
};
class CreatePyArrayReaderOpMaker : public FileReaderMakerBase {
protected:
void Apply() override {
AddAttr<std::string>("feeder_name",
"Name of the `PyArrayFeedQueueHolder` variable");
AddComment(R"DOC(
Create PyArrayReader to accept Python data feeding.
)DOC");
}
};
} // namespace reader
} // namespace operators
} // namespace paddle
namespace reader = ::paddle::operators::reader;
REGISTER_FILE_READER_OPERATOR(create_py_array_reader,
reader::CreatePyArrayReaderOp,
reader::CreatePyArrayReaderOpMaker);
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <algorithm>
#include <condition_variable> //NOLINT
#include <memory>
#include <mutex> // NOLINT
#include <vector>
#include "glog/logging.h"
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/operators/reader/py_blocking_queue.h"
#include "paddle/fluid/operators/reader/reader_op_registry.h"
#include "paddle/fluid/pybind/tensor_py.h"
namespace paddle {
namespace operators {
namespace reader {
using PyTuple = ::pybind11::tuple;
using PyArray = ::pybind11::array;
template <typename T>
using PyArrayT = ::pybind11::array_t<T, ::pybind11::array::c_style |
::pybind11::array::forcecast>;
class PyArrayToTensorVisitor : public boost::static_visitor<void> {
public:
#define PY_ARRAY_TO_TENSOR_WITH_TYPE(dtype, func_name) \
pybind::func_name(tensor_, static_cast<const PyArrayT<dtype>&>(py_array_), \
place)
#define PY_ARRAY_TO_TENSOR(func_name) \
if (IsType<size_t>()) { \
PY_ARRAY_TO_TENSOR_WITH_TYPE(size_t, func_name); \
} else if (IsType<int64_t>()) { \
PY_ARRAY_TO_TENSOR_WITH_TYPE(int64_t, func_name); \
} else if (IsType<int32_t>()) { \
PY_ARRAY_TO_TENSOR_WITH_TYPE(int32_t, func_name); \
} else if (IsType<int16_t>()) { \
PY_ARRAY_TO_TENSOR_WITH_TYPE(int16_t, func_name); \
} else if (IsType<uint8_t>()) { \
PY_ARRAY_TO_TENSOR_WITH_TYPE(uint8_t, func_name); \
} else if (IsType<float>()) { \
PY_ARRAY_TO_TENSOR_WITH_TYPE(float, func_name); \
} else if (IsType<double>()) { \
PY_ARRAY_TO_TENSOR_WITH_TYPE(double, func_name); \
} else { \
PADDLE_THROW("unsupported dtype of python array"); \
}
PyArrayToTensorVisitor(const PyArray& py_array, framework::Tensor* tensor)
: py_array_(py_array), tensor_(tensor) {}
void operator()(const platform::CPUPlace& place) {
PY_ARRAY_TO_TENSOR(PyCPUTensorSetFromArray);
}
void operator()(const platform::CUDAPlace& place) {
#ifdef PADDLE_WITH_CUDA
PY_ARRAY_TO_TENSOR(PyCUDATensorSetFromArray);
#else
PADDLE_THROW("CUDAPlace is not supported in CPU only version");
#endif
}
void operator()(const platform::CUDAPinnedPlace& place) {
#ifdef PADDLE_WITH_CUDA
PY_ARRAY_TO_TENSOR(PyCUDAPinnedTensorSetFromArray);
#else
PADDLE_THROW("CUDAPinnedPlace is not supported in CPU only version");
#endif
}
#undef PY_ARRAY_TO_TENSOR
#undef PY_ARRAY_TO_TENSOR_WITH_TYPE
private:
template <typename T>
inline bool IsType() const {
return ::pybind11::isinstance<PyArrayT<T>>(py_array_);
}
private:
const PyArray& py_array_;
framework::Tensor* tensor_;
};
class PyArrayFeedQueueHolder;
// PyArrayFeedQueue must be thread-safe
class PyArrayFeedQueue {
friend class PyArrayFeedQueueHolder;
private:
PyArrayFeedQueue(size_t capacity, const std::vector<framework::DDim>& dims,
const platform::Place& place)
: dims_(dims), place_(place) {
queue_.reset(
new PyBlockingQueue<std::vector<framework::LoDTensor>>(capacity));
}
public:
~PyArrayFeedQueue() { Close(); }
bool Enqueue(const std::vector<PyArray>& py_array_vec) {
auto lod_tensor_vec = PyArrayVecToLoDTensorVec(py_array_vec);
VLOG(5) << "Enqueue at address " << reinterpret_cast<void*>(this);
return queue_->Send(std::move(lod_tensor_vec));
}
bool Enqueue(const std::vector<framework::LoDTensor>& tensor_vec) {
VLOG(5) << "Enqueue at address " << reinterpret_cast<void*>(this);
return queue_->Send(tensor_vec);
}
std::vector<framework::LoDTensor> Dequeue() {
VLOG(5) << "Dequeue at address " << reinterpret_cast<void*>(this);
std::vector<framework::LoDTensor> ret;
return queue_->Receive(&ret) ? ret : std::vector<framework::LoDTensor>();
}
inline size_t Size() const { return queue_->Size(); }
inline size_t Cap() const { return queue_->Cap(); }
inline bool IsClosed() const { return queue_->IsClosed(); }
inline void Close() { queue_->Close(); }
private:
std::vector<framework::LoDTensor> PyArrayVecToLoDTensorVec(
const std::vector<PyArray>& py_array_vec) {
PADDLE_ENFORCE(dims_.size() == py_array_vec.size(),
"expected input tensor number %d but found %d", dims_.size(),
py_array_vec.size());
size_t i = 0;
if (py_array_vec.size() > 1) {
size_t dim0 = py_array_vec[0].shape()[0];
for (size_t j = 1; j < py_array_vec.size(); ++j) {
PADDLE_ENFORCE(dim0 == py_array_vec[j].shape()[0],
"0-dim of the %d-th input tensor is %d, but 0-dim of "
"the 0-th input tensor is %d",
j, py_array_vec[j].shape()[0], dim0);
}
}
std::vector<framework::LoDTensor> lod_tensor_vec;
lod_tensor_vec.reserve(py_array_vec.size());
std::for_each(
py_array_vec.begin(), py_array_vec.end(), [&](const PyArray& py_array) {
for (int64_t j = 1; j < dims_[i].size(); ++j) {
PADDLE_ENFORCE(
dims_[i][j] == static_cast<int64_t>(py_array.shape()[j]),
"expected %d-dim of %d-th input tensor is %d but found %d", j,
i, dims_[i][j], py_array.shape()[j]);
}
lod_tensor_vec.emplace_back(framework::LoDTensor());
PyArrayToTensorVisitor visitor(py_array, &(lod_tensor_vec.back()));
boost::apply_visitor(visitor, place_);
++i;
});
return lod_tensor_vec;
}
std::unique_ptr<PyBlockingQueue<std::vector<framework::LoDTensor>>> queue_;
std::vector<framework::DDim> dims_;
platform::Place place_;
};
class PyArrayFeedQueueHolder {
public:
PyArrayFeedQueueHolder() {}
void InitOnce(size_t capacity, const std::vector<framework::DDim>& dims,
const platform::Place& place) {
PADDLE_ENFORCE(
feeder_ == nullptr,
"PyArrayFeedQueueHolder::InitOnce() can only be called once");
feeder_.reset(new PyArrayFeedQueue(capacity, dims, place));
}
std::shared_ptr<PyArrayFeedQueue> GetFeeder() { return feeder_; }
const std::shared_ptr<PyArrayFeedQueue>& GetFeeder() const { return feeder_; }
private:
std::shared_ptr<PyArrayFeedQueue> feeder_;
};
} // namespace reader
} // namespace operators
} // namespace paddle
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <condition_variable> // NOLINT
#include <deque>
#include "Python.h"
#include "paddle/fluid/platform/enforce.h"
#include "pybind11/pybind11.h"
namespace paddle {
namespace operators {
namespace reader {
// PyBlockingQueue is designed for PyArrayFeedQueue
// PyBlockingQueue would release GIL of Python when
// the queue is full to avoid deadlock.
template <typename T>
class PyBlockingQueue {
public:
explicit PyBlockingQueue(size_t capacity)
: capacity_(capacity), closed_(false) {
PADDLE_ENFORCE_GT(
capacity_, 0,
"The capacity of a reader::PyBlockingQueue must be greater than 0.");
}
~PyBlockingQueue() { Close(); }
bool Send(const T& elem) {
std::unique_lock<std::mutex> lock(mutex_);
receive_cv_.notify_one();
if (queue_.size() >= capacity_ && (!closed_)) {
pybind11::gil_scoped_release release;
send_cv_.wait(lock, [&] { return queue_.size() < capacity_ || closed_; });
}
if (closed_) {
VLOG(5)
<< "WARNING: Sending an element to a closed reader::BlockingQueue.";
return false;
}
PADDLE_ENFORCE_LT(queue_.size(), capacity_);
queue_.push_back(elem);
return true;
}
bool Send(T&& elem) {
std::unique_lock<std::mutex> lock(mutex_);
receive_cv_.notify_one();
if (queue_.size() >= capacity_ && (!closed_)) {
pybind11::gil_scoped_release release;
send_cv_.wait(lock, [&] { return queue_.size() < capacity_ || closed_; });
}
if (closed_) {
VLOG(5)
<< "WARNING: Sending an element to a closed reader::BlokcingQueue.";
return false;
}
PADDLE_ENFORCE_LT(queue_.size(), capacity_);
queue_.emplace_back(std::move(elem));
return true;
}
bool Receive(T* elem) {
std::unique_lock<std::mutex> lock(mutex_);
send_cv_.notify_one();
receive_cv_.wait(lock, [&] { return !queue_.empty() || closed_; });
if (!queue_.empty()) {
PADDLE_ENFORCE_NOT_NULL(elem);
*elem = queue_.front();
queue_.pop_front();
return true;
} else {
PADDLE_ENFORCE(closed_);
return false;
}
}
void Close() {
std::lock_guard<std::mutex> lock(mutex_);
closed_ = true;
send_cv_.notify_all();
receive_cv_.notify_all();
}
bool IsClosed() const {
std::lock_guard<std::mutex> lock(mutex_);
return closed_;
}
size_t Cap() const {
std::lock_guard<std::mutex> lock(mutex_);
return capacity_;
}
size_t Size() const {
std::lock_guard<std::mutex> lock(mutex_);
return queue_.size();
}
private:
size_t capacity_;
bool closed_;
std::deque<T> queue_;
mutable std::mutex mutex_;
mutable std::condition_variable receive_cv_;
mutable std::condition_variable send_cv_;
};
} // namespace reader
} // namespace operators
} // namespace paddle
......@@ -34,6 +34,7 @@ limitations under the License. */
#include "paddle/fluid/framework/reader.h"
#include "paddle/fluid/framework/selected_rows.h"
#include "paddle/fluid/operators/activation_op.h"
#include "paddle/fluid/operators/reader/py_array_feed_queue.h"
#include "paddle/fluid/platform/enforce.h"
#include "paddle/fluid/platform/place.h"
#include "paddle/fluid/platform/profiler.h"
......@@ -297,6 +298,42 @@ All parameter, weight, gradient are variables in Paddle.
py::class_<framework::ReaderHolder>(m, "Reader", "")
.def("reset", &framework::ReaderHolder::ReInit);
using PyArrayFeedQueue = ::paddle::operators::reader::PyArrayFeedQueue;
using PyArrayFeedQueueHolder =
::paddle::operators::reader::PyArrayFeedQueueHolder;
using PyArray = ::paddle::operators::reader::PyArray;
py::class_<PyArrayFeedQueue>(m, "PyArrayFeedQueue", "")
.def(
"enqueue",
[](PyArrayFeedQueue &self, const std::vector<PyArray> &py_array_vec) {
return self.Enqueue(py_array_vec);
})
.def("enqueue",
[](PyArrayFeedQueue &self,
const std::vector<framework::LoDTensor> &lod_tensor_vec) {
return self.Enqueue(lod_tensor_vec);
})
.def("size", [](const PyArrayFeedQueue &self) { return self.Size(); })
.def("capacity", [](const PyArrayFeedQueue &self) { return self.Cap(); })
.def("close", [](PyArrayFeedQueue &self) { return self.Close(); })
.def("is_closed",
[](const PyArrayFeedQueue &self) { return self.IsClosed(); });
m.def("init_py_array_feed_queue",
[](Variable &var, size_t capacity,
const std::vector<std::vector<int64_t>> &shapes,
const ::paddle::platform::Place &place) -> PyArrayFeedQueue * {
std::vector<DDim> dims(shapes.size());
std::transform(shapes.begin(), shapes.end(), dims.begin(),
[](const std::vector<int64_t> &shape) {
return make_ddim(shape);
});
auto *holder = var.GetMutable<PyArrayFeedQueueHolder>();
holder->InitOnce(capacity, dims, place);
return holder->GetFeeder().get();
},
py::return_value_policy::reference);
py::class_<Scope>(m, "Scope", "")
.def("var",
[](Scope &self, const std::string &name) -> Variable * {
......@@ -463,10 +500,11 @@ All parameter, weight, gradient are variables in Paddle.
#ifdef PADDLE_WITH_DISTRIBUTE
.def("complete", &Executor::Complete)
#endif
.def("run",
(void (Executor::*)(const ProgramDesc &, Scope *, int, bool, bool)) &
Executor::Run);
.def("run", [](Executor &self, const ProgramDesc &prog, Scope *scope,
int block_id, bool create_local_scope, bool create_vars) {
pybind11::gil_scoped_release release;
self.Run(prog, scope, block_id, create_local_scope, create_vars);
});
m.def("init_gflags", framework::InitGflags);
m.def("init_glog", framework::InitGLOG);
m.def("init_devices",
......
......@@ -146,7 +146,7 @@ void PyCPUTensorSetFromArray(
template <>
// This following specialization maps uint16_t in the parameter type to
// platform::float16.
void PyCPUTensorSetFromArray(
inline void PyCPUTensorSetFromArray(
framework::Tensor *self,
pybind11::array_t<uint16_t,
pybind11::array::c_style | pybind11::array::forcecast>
......@@ -185,7 +185,7 @@ void PyCUDATensorSetFromArray(
template <>
// This following specialization maps uint16_t in the parameter type to
// platform::float16.
void PyCUDATensorSetFromArray(
inline void PyCUDATensorSetFromArray(
framework::Tensor *self,
pybind11::array_t<uint16_t,
pybind11::array::c_style | pybind11::array::forcecast>
......@@ -224,7 +224,7 @@ void PyCUDAPinnedTensorSetFromArray(
template <>
// This following specialization maps uint16_t in the parameter type to
// platform::float16.
void PyCUDAPinnedTensorSetFromArray(
inline void PyCUDAPinnedTensorSetFromArray(
framework::Tensor *self,
pybind11::array_t<uint16_t,
pybind11::array::c_style | pybind11::array::forcecast>
......
......@@ -24,7 +24,8 @@ from layer_function_generator import generate_layer_fn, templatedoc
__all__ = [
'data', 'BlockGuardServ', 'ListenAndServ', 'Send', 'Recv',
'open_recordio_file', 'open_files', 'read_file', 'shuffle', 'batch',
'double_buffer', 'random_data_generator', 'Preprocessor', 'load'
'double_buffer', 'random_data_generator', 'py_array_reader', 'Preprocessor',
'load'
]
......@@ -448,6 +449,61 @@ def random_data_generator(low, high, shapes, lod_levels, for_parallel=True):
return monkey_patch_reader_methods(main_prog_var)
# UNCHECK(zengjinle)
def py_array_reader(capacity,
shapes,
lod_levels,
dtypes,
place=None,
for_parallel=True):
if place is None:
place = core.CPUPlace()
if not isinstance(place, core.Place):
new_place = core.Place()
new_place.set_place(place)
place = new_place
dtypes = [convert_np_dtype_to_dtype_(dt) for dt in dtypes]
shape_concat = []
ranks = []
for shape in shapes:
shape_concat.extend(shape)
ranks.append(len(shape))
feeder_name = unique_name('py_array_feed_queue')
var = global_scope().var(feeder_name)
#feed_shapes = [shape[1:] for shape in shapes]
feed_queue = core.init_py_array_feed_queue(var, capacity, shapes, place)
startup_blk = default_startup_program().current_block()
startup_var = startup_blk.create_var(
name=unique_name('create_py_array_reader'))
startup_blk.append_op(
type='create_py_array_reader',
outputs={'Out': [startup_var]},
attrs={
'shape_concat': shape_concat,
'lod_levels': lod_levels,
'ranks': ranks,
'feeder_name': feeder_name
})
startup_var.desc.set_dtypes(dtypes)
startup_var.persistable = True
main_prog_var = _copy_reader_var_(default_main_program().current_block(),
startup_var)
if for_parallel:
main_prog_var = parallel(reader=main_prog_var)
return monkey_patch_reader_methods(main_prog_var), feed_queue
def open_files(filenames,
shapes,
lod_levels,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册