diff --git a/paddle/fluid/operators/reader/create_custom_reader_op.cc b/paddle/fluid/operators/reader/create_custom_reader_op.cc
index e35775ed18b8e304c8f861f4fd28df27d813e8a5..bb4856e86a78bd262350d98aff0359de967e420c 100644
--- a/paddle/fluid/operators/reader/create_custom_reader_op.cc
+++ b/paddle/fluid/operators/reader/create_custom_reader_op.cc
@@ -65,9 +65,8 @@ class CreateCustomReaderOp : public framework::OperatorBase {
 };
 
 class CreateCustomReaderOpMaker : public DecoratedReaderMakerBase {
- public:
-  CreateCustomReaderOpMaker(OpProto* op_proto, OpAttrChecker* op_checker)
-      : DecoratedReaderMakerBase(op_proto, op_checker) {
+ protected:
+  void Apply() override {
     AddAttr<framework::BlockDesc*>("sub_block", "");
     AddAttr<std::vector<std::string>>("source_var_names", "");
     AddAttr<std::vector<std::string>>("sink_var_names", "");
@@ -86,13 +85,14 @@ class CustomReaderInferShape : public framework::InferShapeBase {
                    "compile time.");
     PADDLE_ENFORCE(ctx->HasOutput("Out"),
                    "The output decorated reader should not be null.");
+    const auto* sub_block =
+        ctx->Attrs().Get<framework::BlockDesc*>("sub_block");
     const auto sink_var_names =
         ctx->Attrs().Get<std::vector<std::string>>("sink_var_names");
     std::vector<std::vector<int64_t>> res_dims;
     std::vector<int32_t> res_lod_levels;
     for (const std::string& var_name : sink_var_names) {
-      auto* sink_var =
-          boost::get<framework::VarDesc*>(ctx->GetVarPtr(var_name));
+      auto* sink_var = sub_block->FindVar(var_name);
       PADDLE_ENFORCE_NOT_NULL(sink_var);
       res_dims.emplace_back(sink_var->GetShape());
       res_lod_levels.push_back(sink_var->GetLoDLevel());
@@ -114,9 +114,11 @@ class CustomReaderInferVarType : public framework::VarTypeInference {
 
     auto sink_var_names =
         boost::get<std::vector<std::string>>(op_desc.GetAttr("sink_var_names"));
+    const auto* sub_block =
+        boost::get<framework::BlockDesc*>(op_desc.GetAttr("sub_block"));
     std::vector<framework::proto::VarType::Type> res_data_types;
     for (const std::string& var_name : sink_var_names) {
-      framework::VarDesc* var = block->FindVar(var_name);
+      framework::VarDesc* var = sub_block->FindVar(var_name);
       PADDLE_ENFORCE_NOT_NULL(var);
       res_data_types.emplace_back(var->GetDataType());
     }
@@ -152,8 +154,7 @@ void CustomReader::ReadNext(std::vector<framework::LoDTensor>* out) {
   framework::Executor executor(dev_place_);
   framework::ProgramDesc* program = sub_block_.Program();
   framework::Scope* exe_scope = &scope_.NewScope();
-  executor.Run(*program, exe_scope, sub_block_.ID(),
-               false /*create_local_scope*/, true);
+  executor.Run(*program, exe_scope, sub_block_.ID(), false, true);
   scope_.DeleteScope(exe_scope);
   // 3. Copy LoDTensors from sink variables to out.
   out->resize(sink_var_names_.size());
diff --git a/python/paddle/fluid/layers/io.py b/python/paddle/fluid/layers/io.py
index 4d6ee3c51b7cccdaa3303b5a4cd8e7219b753ccb..b48bfc9ece70ad4af4d6784481fc4a91a09735e1 100644
--- a/python/paddle/fluid/layers/io.py
+++ b/python/paddle/fluid/layers/io.py
@@ -11,6 +11,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import contextlib
 
 from .. import core
 from ..framework import convert_np_dtype_to_dtype_, default_main_program, default_startup_program, Program
@@ -21,7 +22,8 @@ from ..executor import global_scope
 
 __all__ = [
     'data', 'BlockGuardServ', 'ListenAndServ', 'Send', 'open_recordio_file',
-    'open_files', 'read_file', 'shuffle', 'batch', 'double_buffer'
+    'open_files', 'read_file', 'shuffle', 'batch', 'double_buffer',
+    'Preprocessor'
 ]
 
 
@@ -468,8 +470,6 @@ def __create_unshared_decorated_reader__(op_type, reader, attrs, name=None):
         inputs={'UnderlyingReader': reader},
         outputs={'Out': [new_reader]},
         attrs=attrs)
-    new_reader.persistable = True
-    new_reader.stop_gradient = True
     return monkey_patch_reader_methods(new_reader)
 
 
@@ -514,3 +514,81 @@ def read_file(file_obj):
         return out[0]
     else:
         return out
+
+
+class Preprocessor(object):
+    BEFORE_SUB_BLOCK = 0
+    IN_SUB_BLOCK = 1
+    AFTER_SUB_BLOCK = 2
+
+    def __init__(self, reader, name=None):
+        self.underlying_reader = reader
+        new_reader_name = name if name is not None else unique_name(
+            "create_custom_reader")
+        self.main_prog = default_main_program()
+        self.reader = self.main_prog.current_block().create_var(
+            name=new_reader_name)
+        self.sub_block = None
+        self.source_var_names = None
+        self.sink_var_names = None
+        self.status = Preprocessor.BEFORE_SUB_BLOCK
+
+    def is_completed(self):
+        return self.sub_block and self.source_var_names and self.sink_var_names
+
+    @contextlib.contextmanager
+    def block(self):
+        self.status = Preprocessor.IN_SUB_BLOCK
+        self.sub_block = self.main_prog.create_block()
+        yield
+        self.main_prog.rollback()
+        self.status = Preprocessor.AFTER_SUB_BLOCK
+        if not self.is_completed():
+            raise RuntimeError(
+                "The definition of preprocessor is incomplete! "
+                "Please make sure that you have set input and output "
+                "variables by invoking 'inputs' and 'outputs' in "
+                "Preprocessor's sub-block.")
+
+    def inputs(self):
+        if self.status != Preprocessor.IN_SUB_BLOCK:
+            raise RuntimeError(
+                "Preprocessor.inputs() can only be invoked inside the sub-block."
+            )
+
+        source_shapes = self.underlying_reader.desc.shapes()
+        source_dtypes = self.underlying_reader.desc.dtypes()
+        source_lod_levels = self.underlying_reader.desc.lod_levels()
+        self.source_var_names = []
+        source_vars = []
+        for idx in xrange(len(source_shapes)):
+            self.source_var_names.append(unique_name("preprocessor_source"))
+            source_vars.append(self.main_prog.current_block().create_var(
+                name=self.source_var_names[-1],
+                shape=source_shapes[idx],
+                dtype=source_dtypes[idx],
+                lod_level=source_lod_levels[idx]))
+        return source_vars
+
+    def outputs(self, *outs):
+        if self.status != Preprocessor.IN_SUB_BLOCK:
+            raise RuntimeError(
+                "Preprocessor.outputs() can only be invoked inside the sub-block."
+            )
+        self.sink_var_names = [var.name for var in outs]
+
+    def __call__(self, *args, **kwargs):
+        if self.status != Preprocessor.AFTER_SUB_BLOCK:
+            raise RuntimeError(
+                "Preprocessor output can only be retrieved after the sub-block.")
+
+        self.main_prog.current_block().append_op(
+            type="create_custom_reader",
+            inputs={'UnderlyingReader': self.underlying_reader},
+            outputs={'Out': [self.reader]},
+            attrs={
+                "sub_block": self.sub_block,
+                "source_var_names": self.source_var_names,
+                "sink_var_names": self.sink_var_names
+            })
+        return monkey_patch_reader_methods(self.reader)
diff --git a/python/paddle/fluid/tests/unittests/test_preprocessor.py b/python/paddle/fluid/tests/unittests/test_preprocessor.py
new file mode 100644
index 0000000000000000000000000000000000000000..8d6905987084fb0129515d0edeb426b00e73fa59
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/test_preprocessor.py
@@ -0,0 +1,68 @@
+# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+
+import numpy as np
+import paddle.fluid as fluid
+import paddle.v2 as paddle
+import paddle.v2.dataset.mnist as mnist
+
+
+class TestPreprocessor(unittest.TestCase):
+    def setUp(self):
+        with fluid.program_guard(fluid.Program(), fluid.Program()):
+            reader = paddle.batch(mnist.train(), batch_size=32)
+            feeder = fluid.DataFeeder(
+                feed_list=[  # order is image and label
+                    fluid.layers.data(
+                        name='image', shape=[784]),
+                    fluid.layers.data(
+                        name='label', shape=[1], dtype='int64'),
+                ],
+                place=fluid.CPUPlace())
+            self.num_batches = fluid.recordio_writer.convert_reader_to_recordio_file(
+                './mnist_for_preprocessor_test.recordio', reader, feeder)
+
+    def test_main(self):
+        with fluid.program_guard(fluid.Program(), fluid.Program()):
+            data_file = fluid.layers.io.open_recordio_file(
+                './mnist_for_preprocessor_test.recordio',
+                shapes=[[-1, 784], [-1, 1]],
+                lod_levels=[0, 0],
+                dtypes=['float32', 'int64'])
+            preprocessor = fluid.layers.io.Preprocessor(reader=data_file)
+            with preprocessor.block():
+                img, lbl = preprocessor.inputs()
+                img_out = img / 2
+                lbl_out = lbl + 1
+                preprocessor.outputs(img_out, lbl_out)
+
+            img_before, lbl_before = fluid.layers.io.read_file(data_file)
+            img_after, lbl_after = fluid.layers.io.read_file(preprocessor())
+
+            if fluid.core.is_compiled_with_cuda():
+                place = fluid.CUDAPlace(0)
+            else:
+                place = fluid.CPUPlace()
+            exe = fluid.Executor(place)
+            exe.run(fluid.default_startup_program())
+
+            for _ in range(5):
+                img_b, lbl_b, img_a, lbl_a = exe.run(
+                    fetch_list=[img_before, lbl_before, img_after, lbl_after])
+
+                # The custom reader should have halved the images and incremented the labels.
+                self.assertTrue(np.allclose(img_b / 2, img_a))
+                self.assertTrue(np.allclose(lbl_b + 1, lbl_a))
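
Usage note (not part of the patch): the sketch below shows how the new Preprocessor is intended to be wired into a program, following the same pattern as test_preprocessor.py above. The recordio path, tensor shapes, and the normalization applied inside the sub-block are illustrative assumptions; only the API calls themselves (open_recordio_file, Preprocessor, block, inputs, outputs, read_file) come from this diff.

    import paddle.fluid as fluid

    # Underlying file reader; the path and shapes here are assumed for illustration.
    data_file = fluid.layers.io.open_recordio_file(
        './data.recordio',
        shapes=[[-1, 784], [-1, 1]],
        lod_levels=[0, 0],
        dtypes=['float32', 'int64'])

    # Ops appended inside block() are recorded in the custom reader's sub_block and
    # re-run by CustomReader::ReadNext() every time a batch is pulled from the
    # decorated reader.
    preprocessor = fluid.layers.io.Preprocessor(reader=data_file)
    with preprocessor.block():
        img, lbl = preprocessor.inputs()   # source variables fed from data_file
        img_out = img / 255.0              # assumed per-batch normalization
        lbl_out = lbl + 0                  # labels routed through the sub-block unchanged
        preprocessor.outputs(img_out, lbl_out)

    # Calling the Preprocessor appends the create_custom_reader op; the returned
    # reader is consumed with read_file() like any other decorated reader.
    img, lbl = fluid.layers.io.read_file(preprocessor())

As in the unit test, the raw and the preprocessed readers can coexist in one program, which is how the test compares img_before / 2 against img_after across several batches.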