未验证 提交 d36e13ef 编写于 作者: Y yuyang18

Merge branch 'feature/add_pyreader_demo' into feature/combine_open_files_and_double_buffer

...@@ -168,7 +168,13 @@ void ThreadedSSAGraphExecutor::InsertFetchOps( ...@@ -168,7 +168,13 @@ void ThreadedSSAGraphExecutor::InsertFetchOps(
for (size_t i = 0; i < fetch_tensors.size(); ++i) { for (size_t i = 0; i < fetch_tensors.size(); ++i) {
auto &var_name = fetch_tensors[i]; auto &var_name = fetch_tensors[i];
auto &vars = fetched_vars.at(var_name);
auto fetched_var_it = fetched_vars.find(var_name);
PADDLE_ENFORCE(fetched_var_it != fetched_vars.end(),
"Cannot find fetched variable.(Perhaps the main_program "
"is not set to ParallelExecutor)");
auto &vars = fetched_var_it->second;
auto *op = new FetchOpHandle(fetch_data, i, &local_scopes_); auto *op = new FetchOpHandle(fetch_data, i, &local_scopes_);
fetch_ops->emplace_back(op); fetch_ops->emplace_back(op);
......
...@@ -48,9 +48,9 @@ class ShuffleReader : public framework::DecoratedReader { ...@@ -48,9 +48,9 @@ class ShuffleReader : public framework::DecoratedReader {
private: private:
void ShutdownImpl() override { void ShutdownImpl() override {
reader_->Shutdown();
buffer_.clear(); buffer_.clear();
iteration_pos_ = 0; iteration_pos_ = 0;
reader_->Shutdown();
} }
void StartImpl() override { void StartImpl() override {
......
...@@ -18,7 +18,6 @@ ...@@ -18,7 +18,6 @@
#include "ThreadPool.h" #include "ThreadPool.h"
#include "paddle/fluid/framework/blocking_queue.h" #include "paddle/fluid/framework/blocking_queue.h"
#include "paddle/fluid/operators/reader/blocking_queue.h" #include "paddle/fluid/operators/reader/blocking_queue.h"
#include "paddle/fluid/operators/reader/buffered_reader.h"
#include "paddle/fluid/operators/reader/reader_op_registry.h" #include "paddle/fluid/operators/reader/reader_op_registry.h"
namespace paddle { namespace paddle {
...@@ -233,17 +232,12 @@ class OpenFilesOp : public framework::OperatorBase { ...@@ -233,17 +232,12 @@ class OpenFilesOp : public framework::OperatorBase {
container.reset(new OrderedReaderContainer()); container.reset(new OrderedReaderContainer());
} else { } else {
container.reset(new PreemptiveReaderContainer( container.reset(new PreemptiveReaderContainer(
static_cast<size_t>(Attr<int>("thread_num")))); std::min(file_names.size(),
static_cast<size_t>(std::thread::hardware_concurrency()))));
} }
auto reader = out->Reset(
std::make_shared<MultiFileReader>(file_names, std::move(container)); std::make_shared<MultiFileReader>(file_names, std::move(container)));
auto buffer_size = Attr<int>("buffer_size");
if (buffer_size > 1) {
reader = framework::MakeDecoratedReader<BufferedReader>(
reader, platform::CPUPlace(), buffer_size);
}
out->Reset(reader);
} }
}; };
...@@ -259,8 +253,6 @@ class OpenFilesOpMaker : public FileReaderMakerBase { ...@@ -259,8 +253,6 @@ class OpenFilesOpMaker : public FileReaderMakerBase {
An OpenFilesOp creates a MultiFileReader, which is able to An OpenFilesOp creates a MultiFileReader, which is able to
read data multi-threaded from multiple files. read data multi-threaded from multiple files.
)DOC"); )DOC");
AddAttr<int>("thread_num", "Number of thread to read files.");
AddAttr<int>("buffer_size", "The reading buffer of these files.");
} }
}; };
......
...@@ -12,16 +12,16 @@ ...@@ -12,16 +12,16 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import contextlib import contextlib
import multiprocessing
from .. import core
from ..framework import convert_np_dtype_to_dtype_, default_main_program, default_startup_program, Program
from ..unique_name import generate as unique_name
from control_flow import BlockGuard from control_flow import BlockGuard
from ..layer_helper import LayerHelper from layer_function_generator import templatedoc
from .. import core
from ..executor import global_scope from ..executor import global_scope
from layer_function_generator import generate_layer_fn, templatedoc from ..framework import convert_np_dtype_to_dtype_, default_main_program, \
import sys default_startup_program
import multiprocessing from ..layer_helper import LayerHelper
from ..unique_name import generate as unique_name
__all__ = [ __all__ = [
'data', 'BlockGuardServ', 'ListenAndServ', 'Send', 'Recv', 'data', 'BlockGuardServ', 'ListenAndServ', 'Send', 'Recv',
...@@ -448,7 +448,12 @@ def random_data_generator(low, high, shapes, lod_levels, for_parallel=True): ...@@ -448,7 +448,12 @@ def random_data_generator(low, high, shapes, lod_levels, for_parallel=True):
return monkey_patch_reader_methods(main_prog_var) return monkey_patch_reader_methods(main_prog_var)
def py_reader(capacity, shapes, dtypes, lod_levels=None): def py_reader(capacity,
shapes,
dtypes,
lod_levels=None,
name=None,
use_double_buffer=True):
""" """
Create a reader and blocking queue for data feeding in Python Create a reader and blocking queue for data feeding in Python
...@@ -461,10 +466,13 @@ def py_reader(capacity, shapes, dtypes, lod_levels=None): ...@@ -461,10 +466,13 @@ def py_reader(capacity, shapes, dtypes, lod_levels=None):
using `close()` method when unused. using `close()` method when unused.
Args: Args:
use_double_buffer(bool): Whether use double buffer or not.
capacity(int): The maximum capacity of the BlockingQueue. capacity(int): The maximum capacity of the BlockingQueue.
shapes(list): List of tuples which declaring data shapes. shapes(list|tuple): List of tuples which declaring data shapes.
dtypes(list): List of strs which declaring data type. dtypes(list|tuple): List of strs which declaring data type.
lod_levels(list): List of ints which declaring data lod_level. lod_levels(list|tuple): List of ints which declaring data lod_level.
name(basestring): The prefix Python queue name and Reader name. None will
be generated automatically.
Returns: Returns:
tuple(Variable, BlockingQueue): tuple(Variable, BlockingQueue):
...@@ -505,15 +513,23 @@ def py_reader(capacity, shapes, dtypes, lod_levels=None): ...@@ -505,15 +513,23 @@ def py_reader(capacity, shapes, dtypes, lod_levels=None):
if lod_levels is None: if lod_levels is None:
lod_levels = [0] * len(shapes) lod_levels = [0] * len(shapes)
queue_name = unique_name('lod_tensor_blocking_queue') if name is None:
queue_name = unique_name('lod_tensor_blocking_queue')
reader_name = unique_name('create_py_reader')
double_buffer_name = unique_name('double_buffer')
else:
queue_name = "_".join([name, "queue"])
reader_name = "_".join([name, "reader"])
double_buffer_name = "_".join([name, "double_buffer"])
var = global_scope().var(queue_name) var = global_scope().var(queue_name)
feed_queue = core.init_lod_tensor_blocking_queue(var, capacity, shapes) feed_queue = core.init_lod_tensor_blocking_queue(var, capacity, shapes)
startup_blk = default_startup_program().current_block() startup_blk = default_startup_program().current_block()
startup_var = startup_blk.create_var(name=unique_name('create_py_reader')) startup_var = startup_blk.create_var(name=reader_name)
startup_blk.append_op( startup_blk.append_op(
type='create_py_reader', type='create_py_reader',
inputs={'blocking_queue': queue_name}, inputs={'blocking_queue': [queue_name]},
outputs={'Out': [startup_var]}, outputs={'Out': [startup_var]},
attrs={ attrs={
'shape_concat': shape_concat, 'shape_concat': shape_concat,
...@@ -527,7 +543,10 @@ def py_reader(capacity, shapes, dtypes, lod_levels=None): ...@@ -527,7 +543,10 @@ def py_reader(capacity, shapes, dtypes, lod_levels=None):
main_prog_var = _copy_reader_var_(default_main_program().current_block(), main_prog_var = _copy_reader_var_(default_main_program().current_block(),
startup_var) startup_var)
return monkey_patch_reader_methods(main_prog_var), feed_queue reader = monkey_patch_reader_methods(main_prog_var)
if use_double_buffer:
reader = double_buffer(reader, name=double_buffer_name)
return reader, feed_queue
def open_files(filenames, def open_files(filenames,
......
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle.fluid as fluid
import paddle.dataset.mnist as mnist
import paddle
import paddle.v2
import threading
import numpy
def network(is_train):
    """Build a small MLP classifier fed by a ``py_reader``.

    Args:
        is_train(bool): selects the reader name and enables dropout
            (dropout is a no-op when ``is_test`` is True).

    Returns:
        tuple: (mean cross-entropy loss, feeding queue, reader variable).
    """
    reader_prefix = "train_reader" if is_train else "test_reader"
    reader, queue = fluid.layers.py_reader(
        capacity=10,
        shapes=((-1, 784), (-1, 1)),
        dtypes=('float32', 'int64'),
        name=reader_prefix)
    img, label = fluid.layers.read_file(reader)

    # Two fully-connected tanh layers, each followed by dropout.
    hidden = img
    for _ in xrange(2):
        fc_out = fluid.layers.fc(input=hidden, size=100, act='tanh')
        hidden = fluid.layers.dropout(
            fc_out, dropout_prob=0.5, is_test=not is_train)

    prediction = fluid.layers.fc(input=hidden, size=10, act='softmax')
    xent = fluid.layers.cross_entropy(input=prediction, label=label)
    return fluid.layers.mean(xent), queue, reader
def pipe_reader_to_queue(reader_creator, queue):
    """Start a daemonless background thread that feeds ``queue`` from a reader.

    Args:
        reader_creator: a paddle batched reader creator (callable returning
            an iterator of samples).
        queue: the LoDTensorBlockingQueue returned by ``py_reader``.

    Returns:
        threading.Thread: the started feeding thread; caller must ``join()`` it.
    """
    # Build the DataFeeder inside throwaway programs so the img/label data
    # layers do not pollute the caller's default main/startup programs.
    with fluid.program_guard(fluid.Program(), fluid.Program()):
        feeder = fluid.DataFeeder(
            feed_list=[
                fluid.layers.data(
                    name='img', dtype='float32', shape=[784]),
                fluid.layers.data(
                    name='label', dtype='int64', shape=[1])
            ],
            place=fluid.CPUPlace())

    def __thread_main__():
        # Convert each feed dict to a LoDTensorArray (img first, then label —
        # order must match the shapes/dtypes passed to py_reader) and push it.
        for data in feeder.decorate_reader(
                reader_creator, multi_devices=False)():
            tmp = fluid.core.LoDTensorArray()
            tmp.append(data['img'])
            tmp.append(data['label'])
            queue.push(tmp)
        # Closing the queue signals EOF; consumers will see EOFException.
        queue.close()

    th = threading.Thread(target=__thread_main__)
    th.start()
    return th
def main():
    """Train and test the MLP on MNIST using py_reader-fed ParallelExecutors."""
    # Build the training program/graph under its own unique-name guard so the
    # reader/queue names ("train_reader_*") stay distinct from the test ones.
    train_prog = fluid.Program()
    startup_prog = fluid.Program()

    with fluid.program_guard(train_prog, startup_prog):
        with fluid.unique_name.guard():
            loss, train_queue, train_reader = network(True)
            adam = fluid.optimizer.Adam(learning_rate=0.01)
            adam.minimize(loss)

    # Separate test program sharing no ops with the training one.
    test_prog = fluid.Program()
    test_startup = fluid.Program()
    with fluid.program_guard(test_prog, test_startup):
        with fluid.unique_name.guard():
            test_loss, test_queue, test_reader = network(False)

    fluid.Executor(fluid.CUDAPlace(0)).run(startup_prog)
    fluid.Executor(fluid.CUDAPlace(0)).run(test_startup)

    trainer = fluid.ParallelExecutor(
        use_cuda=True, loss_name=loss.name, main_program=train_prog)

    # share_vars_from=trainer lets the tester reuse the trained parameters.
    tester = fluid.ParallelExecutor(
        use_cuda=True, share_vars_from=trainer, main_program=test_prog)

    # NOTE(review): the unconditional `break` at the bottom means only one of
    # the 10 epochs ever runs — presumably intentional to keep the demo short,
    # but confirm.
    for epoch_id in xrange(10):
        train_data_thread = pipe_reader_to_queue(
            paddle.batch(paddle.v2.reader.firstn(mnist.train(), 32), 64),
            train_queue)
        try:
            # Run until the feeding thread closes the queue and the reader
            # raises EOFException.
            while True:
                print 'train_loss', numpy.array(
                    trainer.run(fetch_list=[loss.name]))
        except fluid.core.EOFException:
            print 'End of epoch', epoch_id
            train_reader.reset()
        train_data_thread.join()

        test_data_thread = pipe_reader_to_queue(
            paddle.batch(mnist.test(), 32), test_queue)
        try:
            while True:
                print 'test loss', numpy.array(
                    tester.run(fetch_list=[test_loss.name]))
        except fluid.core.EOFException:
            print 'End of testing'
            test_reader.reset()

        test_data_thread.join()
        break

    # Destroy the executors explicitly before interpreter teardown.
    del trainer
    del tester
# Run the demo only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册