提交 b17541a9 编写于 作者: S sneaxiy

fix hang bug

上级 1e4c0a6f
...@@ -546,19 +546,39 @@ All parameter, weight, gradient are variables in Paddle. ...@@ -546,19 +546,39 @@ All parameter, weight, gradient are variables in Paddle.
::paddle::operators::reader::LoDTensorBlockingQueue; ::paddle::operators::reader::LoDTensorBlockingQueue;
using LoDTensorBlockingQueueHolder = using LoDTensorBlockingQueueHolder =
::paddle::operators::reader::LoDTensorBlockingQueueHolder; ::paddle::operators::reader::LoDTensorBlockingQueueHolder;
using LockFreeLoDTensorBlockingQueue =
::paddle::operators::reader::LockFreeLoDTensorBlockingQueue;
using LockFreeLoDTensorBlockingQueueHolder =
::paddle::operators::reader::LockFreeLoDTensorBlockingQueueHolder;
py::class_<LoDTensorBlockingQueue, std::shared_ptr<LoDTensorBlockingQueue>>( py::class_<LoDTensorBlockingQueue, std::shared_ptr<LoDTensorBlockingQueue>>(
m, "LoDTensorBlockingQueue", "") m, "LoDTensorBlockingQueue", "")
.def("push", .def("push",
[](LoDTensorBlockingQueue &self, [](LoDTensorBlockingQueue &self,
const std::vector<framework::LoDTensor> &lod_tensor_vec) { std::vector<framework::LoDTensor> &lod_tensor_vec) {
pybind11::gil_scoped_release release; pybind11::gil_scoped_release release;
return self.Push(lod_tensor_vec); return self.Push(std::move(lod_tensor_vec));
}) })
.def("size", &LoDTensorBlockingQueue::Size) .def("size", &LoDTensorBlockingQueue::Size)
.def("capacity", &LoDTensorBlockingQueue::Cap) .def("capacity", &LoDTensorBlockingQueue::Cap)
.def("close", &LoDTensorBlockingQueue::Close) .def("close", &LoDTensorBlockingQueue::Close)
.def("is_closed", &LoDTensorBlockingQueue::IsClosed); .def("is_closed", &LoDTensorBlockingQueue::IsClosed);
py::class_<LockFreeLoDTensorBlockingQueue,
std::shared_ptr<LockFreeLoDTensorBlockingQueue>>(
m, "LockFreeLoDTensorBlockingQueue", "")
.def("push",
[](LockFreeLoDTensorBlockingQueue &self,
std::vector<framework::LoDTensor> &lod_tensor_vec) {
pybind11::gil_scoped_release release;
return self.Push(std::move(lod_tensor_vec));
})
.def("size", &LockFreeLoDTensorBlockingQueue::Size)
.def("capacity", &LockFreeLoDTensorBlockingQueue::Cap)
.def("close", &LockFreeLoDTensorBlockingQueue::Close)
.def("is_closed", &LockFreeLoDTensorBlockingQueue::IsClosed);
m.def("init_lod_tensor_blocking_queue", m.def("init_lod_tensor_blocking_queue",
[](Variable &var, [](Variable &var,
size_t capacity) -> std::shared_ptr<LoDTensorBlockingQueue> { size_t capacity) -> std::shared_ptr<LoDTensorBlockingQueue> {
...@@ -568,6 +588,15 @@ All parameter, weight, gradient are variables in Paddle. ...@@ -568,6 +588,15 @@ All parameter, weight, gradient are variables in Paddle.
}, },
py::return_value_policy::copy); py::return_value_policy::copy);
m.def("init_lock_free_lod_tensor_blocking_queue",
[](Variable &var,
size_t capacity) -> std::shared_ptr<LockFreeLoDTensorBlockingQueue> {
auto *holder = var.GetMutable<LockFreeLoDTensorBlockingQueueHolder>();
holder->InitOnce(capacity);
return holder->GetQueue();
},
py::return_value_policy::copy);
py::class_<Scope>(m, "_Scope", R"DOC( py::class_<Scope>(m, "_Scope", R"DOC(
Scope is an association of a name to Variable. All variables belong to Scope. Scope is an association of a name to Variable. All variables belong to Scope.
......
...@@ -25,77 +25,107 @@ ...@@ -25,77 +25,107 @@
namespace paddle { namespace paddle {
namespace pybind { namespace pybind {
class FeedReader { class MultiDeviceFeedReader {
public:
using ResultDictList = using ResultDictList =
std::vector<std::unordered_map<std::string, framework::LoDTensor>>; std::vector<std::unordered_map<std::string, framework::LoDTensor>>;
public: MultiDeviceFeedReader(
FeedReader(std::unique_ptr<framework::ReaderHolder> reader, const std::shared_ptr<operators::reader::LoDTensorBlockingQueue> &queue,
const std::vector<std::string> &names, size_t num_places, const std::vector<std::string> &names,
bool drop_last = true) const std::vector<platform::Place> &dst_places, bool use_double_buffer)
: reader_(std::move(reader)), : queue_(queue),
names_(names), names_(names),
num_places_(num_places), pool_(new ::ThreadPool(dst_places.size())) {
drop_last_(drop_last) {} std::shared_ptr<framework::ReaderBase> reader(
new operators::reader::PyReader(queue));
ResultDictList ReadNext() { readers_.reserve(dst_places.size());
std::vector<framework::LoDTensor> tensors; for (auto &p : dst_places) {
reader_->ReadNext(&tensors); auto *holder = new framework::ReaderHolder();
if (tensors.empty()) return ResultDictList(); if (use_double_buffer) {
holder->Reset(
framework::MakeDecoratedReader<operators::reader::BufferedReader>(
reader, p, 2));
} else {
if (platform::is_gpu_place(p)) {
PADDLE_THROW(
"Place cannot be CUDAPlace when use_double_buffer is False");
}
holder->Reset(reader);
}
readers_.emplace_back(holder);
}
PADDLE_ENFORCE(tensors.size() % names_.size() == 0, futures_.resize(dst_places.size());
"Tensor size: %d, names size: %d", tensors.size(), ret_.resize(dst_places.size());
names_.size()); ReadAsync();
}
size_t read_place_num = tensors.size() / names_.size(); ResultDictList ReadNext() {
bool success = WaitFutures();
if (drop_last_ && read_place_num != num_places_) { if (!success) {
return ResultDictList(); return {};
} }
ResultDictList ret(read_place_num); ResultDictList result(ret_.size());
for (size_t i = 0; i < tensors.size(); ++i) { for (size_t i = 0; i < ret_.size(); ++i) {
ret[i / names_.size()].emplace(names_[i % names_.size()], for (size_t j = 0; j < names_.size(); ++j) {
std::move(tensors[i])); result[i].emplace(names_[j], std::move(ret_[i][j]));
} }
return ret; }
ReadAsync();
return result;
} }
void Start() { reader_->Start(); } void Reset() {
Shutdown();
Start();
void Reset() { reader_->ResetAll(); } ReadAsync();
}
~MultiDeviceFeedReader() {
queue_->Close();
pool_.reset();
}
private: private:
std::unique_ptr<framework::ReaderHolder> reader_; bool WaitFutures() {
std::vector<std::string> names_; bool success = true;
size_t num_places_; for (auto &f : futures_) {
bool drop_last_; success &= f.get();
}; }
return success;
}
static std::unique_ptr<framework::ReaderHolder> CreatePyReader( void Shutdown() {
const std::vector< for (auto &r : readers_) r->Shutdown();
std::shared_ptr<operators::reader::LoDTensorBlockingQueue>> &queues,
const std::vector<platform::Place> &dst_places) {
std::shared_ptr<framework::ReaderBase> reader;
if (queues.size() == 1) {
reader.reset(new operators::reader::PyReader(queues[0]));
} else {
reader.reset(new operators::reader::MultiQueuePyReader(queues));
} }
std::vector<std::shared_ptr<framework::ReaderBase>> buffered_reader;
buffered_reader.reserve(dst_places.size()); void Start() {
for (auto &p : dst_places) { for (auto &r : readers_) r->Start();
buffered_reader.emplace_back(
framework::MakeDecoratedReader<operators::reader::BufferedReader>(
reader, p, 2));
} }
reader = framework::MakeDecoratedReader<operators::reader::ComposeReader>(
buffered_reader);
auto *holder = new framework::ReaderHolder(); void ReadAsync() {
holder->Reset(reader); for (size_t i = 0; i < readers_.size(); ++i) {
return std::unique_ptr<framework::ReaderHolder>(holder); futures_[i] = pool_->enqueue([this, i] {
} readers_[i]->ReadNext(&ret_[i]);
return !ret_[i].empty();
});
}
}
std::vector<std::string> names_;
std::unique_ptr<::ThreadPool> pool_;
std::shared_ptr<operators::reader::LoDTensorBlockingQueue> queue_;
std::vector<std::unique_ptr<framework::ReaderHolder>> readers_;
std::vector<std::future<bool>> futures_;
std::vector<std::vector<framework::LoDTensor>> ret_;
bool drop_last_;
};
namespace py = pybind11; namespace py = pybind11;
...@@ -108,22 +138,20 @@ void BindReader(py::module *module) { ...@@ -108,22 +138,20 @@ void BindReader(py::module *module) {
.def("start", &framework::ReaderHolder::Start) .def("start", &framework::ReaderHolder::Start)
.def("reset", &framework::ReaderHolder::ResetAll); .def("reset", &framework::ReaderHolder::ResetAll);
py::class_<FeedReader>(m, "FeedReader", "") py::class_<MultiDeviceFeedReader>(m, "MultiDeviceFeedReader", "")
.def("read_next", &FeedReader::ReadNext, .def("read_next", &MultiDeviceFeedReader::ReadNext,
py::call_guard<py::gil_scoped_release>())
.def("start", &FeedReader::Start,
py::call_guard<py::gil_scoped_release>()) py::call_guard<py::gil_scoped_release>())
.def("reset", &FeedReader::Reset, .def("reset", &MultiDeviceFeedReader::Reset,
py::call_guard<py::gil_scoped_release>()); py::call_guard<py::gil_scoped_release>());
m.def("create_py_reader", m.def("create_py_reader",
[](const std::vector< [](const std::shared_ptr<operators::reader::LoDTensorBlockingQueue>
std::shared_ptr<operators::reader::LoDTensorBlockingQueue>> &queue,
queues,
const std::vector<std::string> &names, const std::vector<std::string> &names,
const std::vector<platform::Place> &dst_places, bool drop_last) { const std::vector<platform::Place> &dst_places,
return new FeedReader(CreatePyReader(queues, dst_places), names, bool use_double_buffer) {
dst_places.size(), drop_last); return new MultiDeviceFeedReader(queues, names, dst_places,
use_double_buffer);
}, },
py::return_value_policy::take_ownership); py::return_value_policy::take_ownership);
} }
......
...@@ -486,7 +486,8 @@ def _py_reader(capacity, ...@@ -486,7 +486,8 @@ def _py_reader(capacity,
lod_levels=None, lod_levels=None,
name=None, name=None,
use_double_buffer=True, use_double_buffer=True,
feed_list=None): feed_list=None,
lock_free=False):
if feed_list is not None: if feed_list is not None:
if not isinstance(feed_list, list): if not isinstance(feed_list, list):
...@@ -526,12 +527,17 @@ def _py_reader(capacity, ...@@ -526,12 +527,17 @@ def _py_reader(capacity,
double_buffer_name = "_".join([name, "double_buffer"]) double_buffer_name = "_".join([name, "double_buffer"])
var = global_scope().var(queue_name) var = global_scope().var(queue_name)
if not lock_free:
feed_queue = core.init_lod_tensor_blocking_queue(var, capacity) feed_queue = core.init_lod_tensor_blocking_queue(var, capacity)
else:
feed_queue = core.init_lock_free_lod_tensor_blocking_queue(var,
capacity)
startup_blk = default_startup_program().current_block() startup_blk = default_startup_program().current_block()
startup_var = startup_blk.create_var(name=reader_name) startup_var = startup_blk.create_var(name=reader_name)
startup_blk.append_op( startup_blk.append_op(
type='create_py_reader', type='create_py_reader'
if not lock_free else 'create_lock_free_py_reader',
inputs={'blocking_queue': [queue_name]}, inputs={'blocking_queue': [queue_name]},
outputs={'Out': [startup_var]}, outputs={'Out': [startup_var]},
attrs={ attrs={
...@@ -638,7 +644,8 @@ def py_reader(capacity, ...@@ -638,7 +644,8 @@ def py_reader(capacity,
dtypes, dtypes,
lod_levels=None, lod_levels=None,
name=None, name=None,
use_double_buffer=True): use_double_buffer=True,
lock_free=False):
""" """
Create a Python reader for data feeding in Python Create a Python reader for data feeding in Python
...@@ -763,7 +770,8 @@ def py_reader(capacity, ...@@ -763,7 +770,8 @@ def py_reader(capacity,
dtypes=dtypes, dtypes=dtypes,
lod_levels=lod_levels, lod_levels=lod_levels,
name=name, name=name,
use_double_buffer=use_double_buffer) use_double_buffer=use_double_buffer,
lock_free=lock_free)
def create_py_reader_by_data(capacity, def create_py_reader_by_data(capacity,
......
...@@ -17,6 +17,7 @@ import six ...@@ -17,6 +17,7 @@ import six
import threading import threading
from .framework import Program, Variable, program_guard from .framework import Program, Variable, program_guard
from .data_feeder import DataFeeder from .data_feeder import DataFeeder
import paddle.reader.decorator as decorator
__all__ = ['PyReader'] __all__ = ['PyReader']
...@@ -36,8 +37,8 @@ def _convert_places(places): ...@@ -36,8 +37,8 @@ def _convert_places(places):
return ret return ret
class PyReader(object): class PyReader(Reader):
def __init__(self, feed_list, places, capacity, multi_queue=True): def __init__(self, feed_list, places, capacity):
self._tensor_reader = None self._tensor_reader = None
self._thread = None self._thread = None
...@@ -53,15 +54,11 @@ class PyReader(object): ...@@ -53,15 +54,11 @@ class PyReader(object):
self._queue_capacity = capacity self._queue_capacity = capacity
queue_num = len(self._places) if multi_queue else 1 self.queue = core.init_lod_tensor_blocking_queue(core.Variable(),
for _ in six.moves.range(queue_num): self._queue_capacity)
self._queues.append(
core.init_lod_tensor_blocking_queue(core.Variable(),
self._queue_capacity))
self._reader = core.create_py_reader(self._queues, self._var_names, self._reader = core.create_py_reader(self._queue, self._var_names,
self._places, self._drop_last) self._places, self._drop_last)
self._exited = True
def __call__(self): def __call__(self):
assert self._tensor_reader is not None, \ assert self._tensor_reader is not None, \
...@@ -77,7 +74,7 @@ class PyReader(object): ...@@ -77,7 +74,7 @@ class PyReader(object):
def next(self): def next(self):
ret = self._reader.read_next() ret = self._reader.read_next()
if len(ret): if ret:
return ret return ret
else: else:
self._reset() self._reset()
...@@ -86,18 +83,11 @@ class PyReader(object): ...@@ -86,18 +83,11 @@ class PyReader(object):
return Iterator(self) return Iterator(self)
def _reset(self): def _reset(self):
if not self._exited:
for q in self._queues:
q.close()
if self._thread: if self._thread:
self._thread.join()
self._reader.reset() self._reader.reset()
self._thread.join()
def __thread_main__(): def __thread_main__():
queue_num = len(self._queues)
idx = 0
for tensors in self._tensor_reader(): for tensors in self._tensor_reader():
array = core.LoDTensorArray() array = core.LoDTensorArray()
for item in tensors: for item in tensors:
...@@ -108,19 +98,13 @@ class PyReader(object): ...@@ -108,19 +98,13 @@ class PyReader(object):
array.append(item) array.append(item)
if not self._queues[idx].push(array): if not self.queue.push(array):
break break
idx = (idx + 1) % queue_num self.queue.close()
for q in self._queues:
q.close()
self._exited = True
self._thread = threading.Thread(target=__thread_main__) self._thread = threading.Thread(target=__thread_main__)
self._thread.daemon = True self._thread.daemon = True
self._exited = False
self._thread.start() self._thread.start()
def set_numpy_reader(self, reader): def set_numpy_reader(self, reader):
......
...@@ -31,7 +31,7 @@ def random_reader(): ...@@ -31,7 +31,7 @@ def random_reader():
yield image, label yield image, label
def simple_fc_net(places, use_legacy_py_reader): def simple_fc_net(places, use_legacy_py_reader, lock_free=False):
startup_prog = fluid.Program() startup_prog = fluid.Program()
main_prog = fluid.Program() main_prog = fluid.Program()
startup_prog.random_seed = 1 startup_prog.random_seed = 1
...@@ -55,7 +55,8 @@ def simple_fc_net(places, use_legacy_py_reader): ...@@ -55,7 +55,8 @@ def simple_fc_net(places, use_legacy_py_reader):
py_reader = fluid.layers.py_reader( py_reader = fluid.layers.py_reader(
capacity=4, capacity=4,
shapes=[(-1, 784), (-1, 1)], shapes=[(-1, 784), (-1, 1)],
dtypes=['float32', 'int64']) dtypes=['float32', 'int64'],
lock_free=lock_free)
image, label = fluid.layers.read_file(py_reader) image, label = fluid.layers.read_file(py_reader)
py_reader.decorate_paddle_reader(reader) py_reader.decorate_paddle_reader(reader)
...@@ -82,7 +83,8 @@ def simple_fc_net(places, use_legacy_py_reader): ...@@ -82,7 +83,8 @@ def simple_fc_net(places, use_legacy_py_reader):
class TestBase(unittest.TestCase): class TestBase(unittest.TestCase):
def run_main(self, use_legacy_py_reader, with_data_parallel, places): def run_main(self, use_legacy_py_reader, with_data_parallel, places):
with fluid.scope_guard(fluid.Scope()): scope = fluid.Scope()
with fluid.scope_guard(scope):
startup_prog, main_prog, py_reader, loss = simple_fc_net( startup_prog, main_prog, py_reader, loss = simple_fc_net(
places, use_legacy_py_reader) places, use_legacy_py_reader)
exe = fluid.Executor(place=places[0]) exe = fluid.Executor(place=places[0])
...@@ -94,21 +96,29 @@ class TestBase(unittest.TestCase): ...@@ -94,21 +96,29 @@ class TestBase(unittest.TestCase):
loss_name=loss.name, places=places) loss_name=loss.name, places=places)
step = 0 step = 0
step_list = []
start_t = time.time() start_t = time.time()
if use_legacy_py_reader: if use_legacy_py_reader:
for _ in six.moves.range(EPOCH_NUM): for _ in six.moves.range(EPOCH_NUM):
step = 0
py_reader.start() py_reader.start()
while True: while True:
try: try:
L, = exe.run(program=prog, fetch_list=[loss]) L, = exe.run(program=prog,
fetch_list=[loss],
use_program_cache=True)
# print('runned', step, py_reader.queue.is_closed(), py_reader.queue.size())
step += 1 step += 1
except fluid.core.EOFException: except fluid.core.EOFException:
# print('try to reset')
py_reader.reset() py_reader.reset()
# print('reseted')
break break
step_list.append(step)
else: else:
for _ in six.moves.range(EPOCH_NUM): for _ in six.moves.range(EPOCH_NUM):
step = 0
for d in py_reader(): for d in py_reader():
'''
assert len(d) == len(places) assert len(d) == len(places)
for i, item in enumerate(d): for i, item in enumerate(d):
image = item['image'] image = item['image']
...@@ -117,18 +127,25 @@ class TestBase(unittest.TestCase): ...@@ -117,18 +127,25 @@ class TestBase(unittest.TestCase):
assert label.shape() == [BATCH_SIZE, 1] assert label.shape() == [BATCH_SIZE, 1]
assert image._place()._equals(places[i]) assert image._place()._equals(places[i])
assert label._place()._equals(places[i]) assert label._place()._equals(places[i])
''' L, = exe.run(program=prog,
L, = exe.run(program=prog, feed=d, fetch_list=[loss]) feed=d,
fetch_list=[loss],
use_program_cache=True)
step += 1 step += 1
step_list.append(step)
end_t = time.time() end_t = time.time()
return {"time": end_t - start_t, "step": step} ret = {"time": end_t - start_t, "step": step_list}
scope._remove_from_pool()
def prepare_places(self, with_data_parallel): return ret
places = [[fluid.CPUPlace()], ]
def prepare_places(self, with_data_parallel, with_cpu=False, with_gpu=True):
places = []
if with_cpu:
places.append([fluid.CPUPlace()])
if with_data_parallel: if with_data_parallel:
places.append([fluid.CPUPlace()] * 2) places.append([fluid.CPUPlace()] * 2)
if fluid.core.is_compiled_with_cuda(): if with_gpu and fluid.core.is_compiled_with_cuda():
tmp = fluid.cuda_places() tmp = fluid.cuda_places()
assert len(tmp) > 0, "no gpu detected" assert len(tmp) > 0, "no gpu detected"
if with_data_parallel: if with_data_parallel:
...@@ -140,7 +157,10 @@ class TestBase(unittest.TestCase): ...@@ -140,7 +157,10 @@ class TestBase(unittest.TestCase):
for with_data_parallel in [True, False]: for with_data_parallel in [True, False]:
for p in self.prepare_places(with_data_parallel): for p in self.prepare_places(with_data_parallel):
t = [] t = []
for use_legacy_py_reader in [False, True]: for use_legacy_py_reader in [
False
]: #[True, False]: #[False, True]:
print(p, use_legacy_py_reader)
ret = self.run_main( ret = self.run_main(
use_legacy_py_reader=use_legacy_py_reader, use_legacy_py_reader=use_legacy_py_reader,
with_data_parallel=with_data_parallel, with_data_parallel=with_data_parallel,
...@@ -148,7 +168,7 @@ class TestBase(unittest.TestCase): ...@@ -148,7 +168,7 @@ class TestBase(unittest.TestCase):
ret['legacy'] = use_legacy_py_reader ret['legacy'] = use_legacy_py_reader
ret['data_parallel'] = with_data_parallel ret['data_parallel'] = with_data_parallel
ret['places'] = p ret['places'] = p
t.append(ret) t.append([ret['step'], ]) #, ret['places']])
print(t) print(t)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册