未验证 提交 fecbe522 编写于 作者: Y yuyang18

Rewrite open_files

上级 ba997b8c
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
// limitations under the License. // limitations under the License.
#include <cmath> #include <cmath>
#include <stdexcept>
#include <thread> // NOLINT #include <thread> // NOLINT
#include "ThreadPool.h" #include "ThreadPool.h"
#include "paddle/fluid/framework/blocking_queue.h" #include "paddle/fluid/framework/blocking_queue.h"
...@@ -77,6 +78,7 @@ class PreemptiveReaderContainer : public IReaderContainer { ...@@ -77,6 +78,7 @@ class PreemptiveReaderContainer : public IReaderContainer {
struct FutureItem { struct FutureItem {
std::vector<framework::LoDTensor> data_; std::vector<framework::LoDTensor> data_;
ReaderList::iterator reader_it_; ReaderList::iterator reader_it_;
std::exception_ptr exception_;
}; };
using FutureList = std::list<std::future<FutureItem>>; using FutureList = std::list<std::future<FutureItem>>;
...@@ -115,7 +117,15 @@ class PreemptiveReaderContainer : public IReaderContainer { ...@@ -115,7 +117,15 @@ class PreemptiveReaderContainer : public IReaderContainer {
if (!pending_.empty()) { if (!pending_.empty()) {
auto future_it = complete_queue_.Pop(); auto future_it = complete_queue_.Pop();
FutureItem item = future_it->get(); FutureItem item = future_it->get();
if (item.data_.empty()) { // reader done. if (item.exception_) {
for (auto it = futures_.begin(); it != futures_.end(); ++it) {
if (it != future_it) {
it->wait(); // Wait all other threads complete.
}
}
std::rethrow_exception(item.exception_);
} else if (item.data_.empty()) { // reader done.
done_.emplace_back(std::move(*item.reader_it_)); done_.emplace_back(std::move(*item.reader_it_));
pending_.erase(item.reader_it_); pending_.erase(item.reader_it_);
futures_.erase(future_it); futures_.erase(future_it);
...@@ -131,8 +141,8 @@ class PreemptiveReaderContainer : public IReaderContainer { ...@@ -131,8 +141,8 @@ class PreemptiveReaderContainer : public IReaderContainer {
} }
private: private:
void AppendReader(std::unique_ptr<framework::ReaderBase>&& readers) override { void AppendReader(std::unique_ptr<framework::ReaderBase>&& reader) override {
pending_.emplace_back(); pending_.emplace_back(std::move(reader));
auto reader_it = pending_.end(); auto reader_it = pending_.end();
--reader_it; --reader_it;
...@@ -147,15 +157,22 @@ class PreemptiveReaderContainer : public IReaderContainer { ...@@ -147,15 +157,22 @@ class PreemptiveReaderContainer : public IReaderContainer {
FutureList::iterator* future_it_ptr) { FutureList::iterator* future_it_ptr) {
auto& future_it = *future_it_ptr; auto& future_it = *future_it_ptr;
*future_it = pool_.enqueue([reader_it, future_it, this] { *future_it = pool_.enqueue([reader_it, future_it, this] {
FutureItem item; try {
item.reader_it_ = reader_it; FutureItem item;
(*reader_it)->ReadNext(&item.data_); item.reader_it_ = reader_it;
if (item.data_.empty()) { (*reader_it)->ReadNext(&item.data_);
(*reader_it)->Shutdown(); if (item.data_.empty()) {
(*reader_it)->Start(); (*reader_it)->Shutdown();
(*reader_it)->Start();
}
complete_queue_.Push(future_it);
return item;
} catch (...) {
FutureItem item;
item.exception_ = std::current_exception();
complete_queue_.Push(future_it);
return item;
} }
complete_queue_.Push(future_it);
return item;
}); });
} }
......
...@@ -28,6 +28,7 @@ Scanner::Scanner(std::unique_ptr<std::istream> &&stream) ...@@ -28,6 +28,7 @@ Scanner::Scanner(std::unique_ptr<std::istream> &&stream)
Scanner::Scanner(const std::string &filename) Scanner::Scanner(const std::string &filename)
: stream_(new std::ifstream(filename)), parser_(*stream_) { : stream_(new std::ifstream(filename)), parser_(*stream_) {
PADDLE_ENFORCE(static_cast<bool>(*stream_), "Cannot open file %s", filename);
Reset(); Reset();
} }
......
...@@ -20,6 +20,7 @@ from control_flow import BlockGuard ...@@ -20,6 +20,7 @@ from control_flow import BlockGuard
from ..layer_helper import LayerHelper from ..layer_helper import LayerHelper
from ..executor import global_scope from ..executor import global_scope
from layer_function_generator import generate_layer_fn, templatedoc from layer_function_generator import generate_layer_fn, templatedoc
import sys
__all__ = [ __all__ = [
'data', 'BlockGuardServ', 'ListenAndServ', 'Send', 'Recv', 'data', 'BlockGuardServ', 'ListenAndServ', 'Send', 'Recv',
...@@ -532,10 +533,10 @@ def open_files(filenames, ...@@ -532,10 +533,10 @@ def open_files(filenames,
shapes, shapes,
lod_levels, lod_levels,
dtypes, dtypes,
thread_num=1, thread_num=None,
buffer_size=None, buffer_size=None,
pass_num=1, pass_num=1,
for_parallel=True): is_test=None):
""" """
Open files Open files
...@@ -548,14 +549,15 @@ def open_files(filenames, ...@@ -548,14 +549,15 @@ def open_files(filenames,
shapes(list): List of tuples which declaring data shapes. shapes(list): List of tuples which declaring data shapes.
lod_levels(list): List of ints which declaring data lod_level. lod_levels(list): List of ints which declaring data lod_level.
dtypes(list): List of strs which declaring data type. dtypes(list): List of strs which declaring data type.
thread_num(int): The maximal concurrent prefetch thread number. thread_num(None): Deprecated argument. It will be set by open_files
buffer_size(int|None): The size of prefetch buffer. If it is setted None, automatically.
buffer size will be thread_num * 3. buffer_size(None): Deprecated argument. It will be set by open_files
Default: None automatically.
pass_num(int): Number of passes to run. pass_num(int): Number of passes to run.
for_parallel(Bool): Set it as True if you are going to run is_test(bool|None): Whether `open_files` used for testing or not. If it
subsequent operators in parallel. is used for testing, the order of data generated is same as the file
Default: True order. Otherwise, it is not guaranteed the order of data is same
between every epoch. [Default: False].
Returns: Returns:
Variable: A Reader Variable via which we can get file data. Variable: A Reader Variable via which we can get file data.
...@@ -567,15 +569,20 @@ def open_files(filenames, ...@@ -567,15 +569,20 @@ def open_files(filenames,
'./data2.recordio'], './data2.recordio'],
shapes=[(3,224,224), (1)], shapes=[(3,224,224), (1)],
lod_levels=[0, 0], lod_levels=[0, 0],
dtypes=['float32', 'int64'], dtypes=['float32', 'int64'])
thread_num=2,
buffer_size=2)
# Via the reader, we can use 'read_file' layer to get data: # Via the reader, we can use 'read_file' layer to get data:
image, label = fluid.layers.io.read_file(reader) image, label = fluid.layers.io.read_file(reader)
""" """
if buffer_size is None: if thread_num is not None:
buffer_size = thread_num * 3 print >> sys.stderr, "thread_num parameter of open_files is " \
"deprecated. It will be ignored and set " \
"automatically by open_files "
if buffer_size is not None:
print >> sys.stderr, "buffer_size parameter of open_files is " \
"deprecated. It will be ignored and set " \
"automatically by open_files "
if isinstance(filenames, basestring): if isinstance(filenames, basestring):
filenames = [filenames] filenames = [filenames]
dtypes = [convert_np_dtype_to_dtype_(dt) for dt in dtypes] dtypes = [convert_np_dtype_to_dtype_(dt) for dt in dtypes]
...@@ -589,17 +596,16 @@ def open_files(filenames, ...@@ -589,17 +596,16 @@ def open_files(filenames,
multi_file_reader_name = unique_name('multi_file_reader') multi_file_reader_name = unique_name('multi_file_reader')
startup_blk = default_startup_program().current_block() startup_blk = default_startup_program().current_block()
startup_reader = startup_blk.create_var(name=multi_file_reader_name) startup_reader = startup_blk.create_var(name=multi_file_reader_name)
attrs = {
'shape_concat': shape_concat,
'lod_levels': lod_levels,
'ranks': ranks,
'file_names': filenames
}
if is_test is not None:
attrs['is_test'] = is_test
startup_blk.append_op( startup_blk.append_op(
type='open_files', type='open_files', outputs={'Out': [startup_reader]}, attrs=attrs)
outputs={'Out': [startup_reader]},
attrs={
'shape_concat': shape_concat,
'lod_levels': lod_levels,
'ranks': ranks,
'file_names': filenames,
'thread_num': thread_num,
'buffer_size': buffer_size
})
startup_reader.desc.set_dtypes(dtypes) startup_reader.desc.set_dtypes(dtypes)
startup_reader.persistable = True startup_reader.persistable = True
......
...@@ -31,7 +31,10 @@ def load_vocab(filename): ...@@ -31,7 +31,10 @@ def load_vocab(filename):
# load word dict with paddle inner function # load word dict with paddle inner function
word_dict = load_vocab(sys.argv[1]) if len(sys.argv) == 1:
word_dict = paddle.dataset.imdb.word_dict()
else:
word_dict = load_vocab(sys.argv[1])
word_dict["<unk>"] = len(word_dict) word_dict["<unk>"] = len(word_dict)
print "Dict dim = ", len(word_dict) print "Dict dim = ", len(word_dict)
......
...@@ -41,16 +41,14 @@ def network_cfg(is_train, pass_num=100): ...@@ -41,16 +41,14 @@ def network_cfg(is_train, pass_num=100):
pass_num=pass_num, pass_num=pass_num,
shapes=[[-1, 1], [-1, 1]], shapes=[[-1, 1], [-1, 1]],
lod_levels=[1, 0], lod_levels=[1, 0],
dtypes=['int64', 'int64'], dtypes=['int64', 'int64'])
thread_num=1)
test_file_obj = fluid.layers.open_files( test_file_obj = fluid.layers.open_files(
filenames=TEST_FILES, filenames=TEST_FILES,
pass_num=1, pass_num=1,
shapes=[[-1, 1], [-1, 1]], shapes=[[-1, 1], [-1, 1]],
lod_levels=[1, 0], lod_levels=[1, 0],
dtypes=['int64', 'int64'], dtypes=['int64', 'int64'])
thread_num=1)
if is_train: if is_train:
file_obj = fluid.layers.shuffle(train_file_obj, buffer_size=1000) file_obj = fluid.layers.shuffle(train_file_obj, buffer_size=1000)
......
...@@ -39,17 +39,17 @@ class TestMultipleReader(unittest.TestCase): ...@@ -39,17 +39,17 @@ class TestMultipleReader(unittest.TestCase):
copyfile('./mnist_0.recordio', './mnist_1.recordio') copyfile('./mnist_0.recordio', './mnist_1.recordio')
copyfile('./mnist_0.recordio', './mnist_2.recordio') copyfile('./mnist_0.recordio', './mnist_2.recordio')
def main(self, thread_num): def main(self, is_test=False):
file_list = [ file_list = [
'./mnist_0.recordio', './mnist_1.recordio', './mnist_2.recordio' './mnist_0.recordio', './mnist_1.recordio', './mnist_2.recordio'
] ]
with fluid.program_guard(fluid.Program(), fluid.Program()): with fluid.program_guard(fluid.Program(), fluid.Program()):
data_files = fluid.layers.open_files( data_files = fluid.layers.open_files(
filenames=file_list, filenames=file_list,
thread_num=thread_num,
shapes=[(-1, 784), (-1, 1)], shapes=[(-1, 784), (-1, 1)],
lod_levels=[0, 0], lod_levels=[0, 0],
dtypes=['float32', 'int64']) dtypes=['float32', 'int64'],
is_test=is_test)
img, label = fluid.layers.read_file(data_files) img, label = fluid.layers.read_file(data_files)
if fluid.core.is_compiled_with_cuda(): if fluid.core.is_compiled_with_cuda():
...@@ -71,6 +71,9 @@ class TestMultipleReader(unittest.TestCase): ...@@ -71,6 +71,9 @@ class TestMultipleReader(unittest.TestCase):
self.assertEqual(batch_count, self.num_batch * 3) self.assertEqual(batch_count, self.num_batch * 3)
def test_main(self): def test_main(self):
self.main(thread_num=3) # thread number equals to file number self.main(is_test=False)
self.main(thread_num=10) # thread number is larger than file number self.main(is_test=True)
self.main(thread_num=2) # thread number is less than file number
if __name__ == '__main__':
unittest.main()
...@@ -32,9 +32,7 @@ def simple_fc_net(use_feed): ...@@ -32,9 +32,7 @@ def simple_fc_net(use_feed):
filenames=[MNIST_RECORDIO_FILE], filenames=[MNIST_RECORDIO_FILE],
shapes=[[-1, 784], [-1, 1]], shapes=[[-1, 784], [-1, 1]],
lod_levels=[0, 0], lod_levels=[0, 0],
dtypes=['float32', 'int64'], dtypes=['float32', 'int64'])
thread_num=1,
for_parallel=True)
reader = fluid.layers.io.double_buffer(reader) reader = fluid.layers.io.double_buffer(reader)
img, label = fluid.layers.read_file(reader) img, label = fluid.layers.read_file(reader)
hidden = img hidden = img
...@@ -60,9 +58,7 @@ def fc_with_batchnorm(use_feed): ...@@ -60,9 +58,7 @@ def fc_with_batchnorm(use_feed):
filenames=[MNIST_RECORDIO_FILE], filenames=[MNIST_RECORDIO_FILE],
shapes=[[-1, 784], [-1, 1]], shapes=[[-1, 784], [-1, 1]],
lod_levels=[0, 0], lod_levels=[0, 0],
dtypes=['float32', 'int64'], dtypes=['float32', 'int64'])
thread_num=1,
for_parallel=True)
reader = fluid.layers.io.double_buffer(reader) reader = fluid.layers.io.double_buffer(reader)
img, label = fluid.layers.read_file(reader) img, label = fluid.layers.read_file(reader)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册