Unverified commit c204f0cc authored by yuyang18

Refine PyReader

Parent 6a46c079
@@ -13,13 +13,15 @@
 # limitations under the License.
 import contextlib
 import multiprocessing
+import threading

+from ..data_feeder import DataFeeder
 from control_flow import BlockGuard
 from layer_function_generator import templatedoc
 from .. import core
 from ..executor import global_scope
 from ..framework import convert_np_dtype_to_dtype_, default_main_program, \
-    default_startup_program
+    default_startup_program, program_guard, Program
 from ..layer_helper import LayerHelper
 from ..unique_name import generate as unique_name
@@ -550,7 +552,71 @@ def py_reader(capacity,
         # py_reader.
         double_buffer_reader.reset = reader.reset
         reader = double_buffer_reader

-    return reader, feed_queue
+
+    # monkey patch py_reader special methods
+    reader.queue = feed_queue
+    current_reset_method = reader.reset
+    reader.thread = None
+    reader.tensor_provider = None
+
+    def start_provide_thread(func):
+        def __provider_thread__():
+            for tensors in func():
+                array = core.LoDTensorArray()
+                for item in tensors:
+                    if not isinstance(item, core.LoDTensor):
+                        tmp = core.LoDTensor()
+                        tmp.set(item, core.CPUPlace())
+                        item = tmp
+
+                    array.append(item)
+
+                feed_queue.push(array)
+
+            feed_queue.close()
+
+        reader.thread = threading.Thread(target=__provider_thread__)
+        reader.thread.start()
+
+    def __set_tensor_provider__(func):
+        reader.tensor_provider = func
+        start_provide_thread(reader.tensor_provider)
+
+    def __set_paddle_reader__(reader_creator):
+        with program_guard(Program(), Program()):
+            feed_list = []
+            counter = 0
+            for dtype, shape, lod_level in zip(dtypes, shapes, lod_levels):
+                name = str(counter)
+                feed_list.append(
+                    data(
+                        name=name,
+                        dtype=dtype,
+                        shape=shape,
+                        lod_level=lod_level))
+                counter += 1
+
+            feeder = DataFeeder(feed_list=feed_list, place=core.CPUPlace())
+            reader_creator = feeder.decorate_reader(
+                reader_creator, multi_devices=False)
+
+        def __tensor_provider__():
+            for slots in reader_creator():
+                yield [slots[str(idx)] for idx in xrange(counter)]
+
+        __set_tensor_provider__(__tensor_provider__)
+
+    def __reset__():
+        current_reset_method()
+        if reader.thread is not None and reader.tensor_provider is not None:
+            reader.thread.join()
+            # restart provider thread.
+            start_provide_thread(reader.tensor_provider)
+
+    reader.reset = __reset__
+    reader.decorate_tensor_provider = __set_tensor_provider__
+    reader.decorate_paddle_reader = __set_paddle_reader__
+
+    return reader


 def open_files(filenames,
...
@@ -12,16 +12,16 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import paddle.fluid as fluid
-import paddle.dataset.mnist as mnist
+import numpy
+
 import paddle
+import paddle.dataset.mnist as mnist
+import paddle.fluid as fluid
 import paddle.v2
-import threading
-import numpy


 def network(is_train):
-    reader, queue = fluid.layers.py_reader(
+    reader = fluid.layers.py_reader(
         capacity=10,
         shapes=((-1, 784), (-1, 1)),
         dtypes=('float32', 'int64'),
@@ -37,32 +37,7 @@ def network(is_train):
     prediction = fluid.layers.fc(input=hidden, size=10, act='softmax')
     loss = fluid.layers.cross_entropy(input=prediction, label=label)
-    return fluid.layers.mean(loss), queue, reader
-
-
-def pipe_reader_to_queue(reader_creator, queue):
-    with fluid.program_guard(fluid.Program(), fluid.Program()):
-        feeder = fluid.DataFeeder(
-            feed_list=[
-                fluid.layers.data(
-                    name='img', dtype='float32', shape=[784]),
-                fluid.layers.data(
-                    name='label', dtype='int64', shape=[1])
-            ],
-            place=fluid.CPUPlace())
-
-    def __thread_main__():
-        for data in feeder.decorate_reader(
-                reader_creator, multi_devices=False)():
-            tmp = fluid.core.LoDTensorArray()
-            tmp.append(data['img'])
-            tmp.append(data['label'])
-            queue.push(tmp)
-
-        queue.close()
-
-    th = threading.Thread(target=__thread_main__)
-    th.start()
-    return th
+    return fluid.layers.mean(loss), reader


 def main():
@@ -71,7 +46,7 @@ def main():
     with fluid.program_guard(train_prog, startup_prog):
         with fluid.unique_name.guard():
-            loss, train_queue, train_reader = network(True)
+            loss, train_reader = network(True)
             adam = fluid.optimizer.Adam(learning_rate=0.01)
             adam.minimize(loss)
@@ -79,7 +54,7 @@ def main():
     test_startup = fluid.Program()
     with fluid.program_guard(test_prog, test_startup):
         with fluid.unique_name.guard():
-            test_loss, test_queue, test_reader = network(False)
+            test_loss, test_reader = network(False)

     fluid.Executor(fluid.CUDAPlace(0)).run(startup_prog)
     fluid.Executor(fluid.CUDAPlace(0)).run(test_startup)
@@ -90,10 +65,13 @@ def main():
     tester = fluid.ParallelExecutor(
         use_cuda=True, share_vars_from=trainer, main_program=test_prog)

+    train_reader.decorate_paddle_reader(
+        paddle.v2.reader.shuffle(
+            paddle.batch(mnist.train(), 256), buf_size=8192))
+
+    test_reader.decorate_paddle_reader(paddle.batch(mnist.test(), 256))
+
     for epoch_id in xrange(10):
-        train_data_thread = pipe_reader_to_queue(
-            paddle.batch(paddle.v2.reader.firstn(mnist.train(), 32), 64),
-            train_queue)
         try:
             while True:
                 print 'train_loss', numpy.array(
@@ -101,10 +79,7 @@ def main():
         except fluid.core.EOFException:
             print 'End of epoch', epoch_id
             train_reader.reset()
-            train_data_thread.join()

-        test_data_thread = pipe_reader_to_queue(
-            paddle.batch(mnist.test(), 32), test_queue)
         try:
             while True:
                 print 'test loss', numpy.array(
@@ -113,11 +88,6 @@ def main():
             print 'End of testing'
             test_reader.reset()
-            test_data_thread.join()
-            break
-
-    del trainer
-    del tester


 if __name__ == '__main__':
     main()
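For illustration: the demo above only exercises decorate_paddle_reader, while the lower-level decorate_tensor_provider hook added in the layers change expects a generator function that yields one list of numpy arrays (or LoDTensors) per batch, matching the reader's declared shapes and dtypes. Below is a minimal sketch, not taken from the commit; the lod_levels, batch size, iteration count, and random data are illustrative assumptions.

import numpy
import paddle.fluid as fluid

# Reader declared with the same shapes/dtypes as the demo; lod_levels of 0
# are assumed here for plain (non-sequence) inputs.
reader = fluid.layers.py_reader(
    capacity=10,
    shapes=((-1, 784), (-1, 1)),
    lod_levels=(0, 0),
    dtypes=('float32', 'int64'))

def fake_provider():
    # Yields one list of numpy arrays per batch; the provider thread converts
    # each item to a LoDTensor on CPUPlace and pushes it into the feed queue.
    for _ in xrange(100):
        img = numpy.random.random(size=(32, 784)).astype('float32')
        label = numpy.random.randint(10, size=(32, 1)).astype('int64')
        yield [img, label]

reader.decorate_tensor_provider(fake_provider)  # starts the provider thread

# A training program then reads from `reader` and drives it with an executor,
# as the demo does; after fluid.core.EOFException, reader.reset() joins the
# provider thread and restarts it for the next pass over the data.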