diff --git a/python/paddle/fluid/io.py b/python/paddle/fluid/io.py
index e1d26474e63c8da174bebe3b639f356c2ef655b4..1ec670de07062057ba09e15ac1e4da026d035a53 100644
--- a/python/paddle/fluid/io.py
+++ b/python/paddle/fluid/io.py
@@ -790,101 +790,3 @@ def get_parameter_value_by_name(name, executor, program=None):
         program = default_main_program()
     var = program.global_block().var(name)
     return get_parameter_value(var, executor)
-
-
-def get_test_program(filelist, program=None, startup_program=None):
-    """
-    Transpile current train program to a program to read test dataset
-    if the program is using reader ops like "open_files_op".
-    """
-
-    def _copy_reader_var_(block, var, new_name=None):
-        if new_name == None:
-            new_name = var.name
-        new_var = block.create_var(
-            name=str(new_name), type=core.VarDesc.VarType.READER)
-        new_var.desc.set_shapes(var.desc.shapes())
-        new_var.desc.set_dtypes(var.desc.dtypes())
-        new_var.persistable = True
-        return new_var
-
-    def _get_test_reader_name(train_reader_name):
-        return train_reader_name + "_test"
-
-    def _is_reader_op(op):
-        block = op.block
-        if "Out" in op.output_names:
-            reader_out = block.vars[op.output("Out")[0]]
-            if reader_out.type == core.VarDesc.VarType.READER:
-                return True
-        return False
-
-    if program == None:
-        program = default_main_program()
-    if startup_program == None:
-        startup_program = default_startup_program()
-    startup_block = startup_program.global_block()
-
-    # 1. find out the orignal reader var name
-    startup_reader_op_list = []
-
-    for op in startup_block.ops:
-        if _is_reader_op(op):
-            startup_reader_op_list.append(op)
-
-    if len(startup_reader_op_list) == 0:
-        return program
-
-    root_reader_op = startup_reader_op_list[0]
-    train_test_reader_map = {}
-    # 2. add operators to startup to read open and read test data files
-    for op in startup_reader_op_list:
-        assert (len(op.output("Out")) == 1)
-        train_reader_name = op.output("Out")[0]
-        train_reader = startup_block.vars[train_reader_name]
-        test_reader = _copy_reader_var_(
-            startup_block,
-            train_reader,
-            new_name=_get_test_reader_name(train_reader_name))
-        train_test_reader_map[train_reader.name] = test_reader
-
-        test_op_inputs = {}
-        for name in op.input_names:
-            train_arg_names = op.input(name)
-            test_arg_vars = []
-            for arg_name in train_arg_names:
-                arg_var = train_test_reader_map[
-                    arg_name] if name == "UnderlyingReader" else startup_block.vars[
-                        arg_name]
-                test_arg_vars.append(arg_var)
-            test_op_inputs[name] = test_arg_vars
-
-        test_op = startup_block.append_op(
-            type=op.type,
-            inputs=test_op_inputs,
-            outputs={'Out': [test_reader]},
-            attrs=op.attrs)
-        # root reader op's filelist attr for read test files
-        if op.type == root_reader_op.type:
-            test_op.set_attr("file_names", filelist)
-        if op.type == "create_multi_pass_reader":
-            test_op.set_attr("pass_num", 1)
-
-    # 3. rename reader vars in inference program to different name
-    # to avoid read from train data.
-    main_block = program.global_block()
-    for var in main_block.vars.values():
-        if var.type == core.VarDesc.VarType.READER:
-            main_block._rename_var(
-                str(var.name), str(_get_test_reader_name(var.name)))
-
-    for op in main_block.ops:
-        if op.type == root_reader_op.type:
-            test_op.set_attr("file_names", filelist)
-        if op.type == "create_multi_pass_reader":
-            test_op.set_attr("pass_num", 1)
-
-    startup_program._sync_with_cpp()
-    program._sync_with_cpp()
-
-    return program
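The deleted helper patched test readers into an existing train program by cloning reader variables and rewriting reader ops in place. The pattern that replaces it, the one the new `file_reader/train.py` demo below follows, is to build separate train and test programs against a shared startup program. A minimal sketch of that pattern, with a placeholder network standing in for a real model:

```python
import paddle.fluid as fluid


def build_program(is_train):
    # placeholder network; unique_name.guard keeps parameter names identical
    # across the two programs so the test program reuses the trained weights
    with fluid.unique_name.guard():
        data = fluid.layers.data(name='x', shape=[13], dtype='float32')
        loss = fluid.layers.mean(x=fluid.layers.fc(input=data, size=1))
        if is_train:
            fluid.optimizer.SGD(learning_rate=0.01).minimize(loss)
    return loss


train_prog, test_prog, startup = fluid.Program(), fluid.Program(), fluid.Program()
with fluid.program_guard(train_prog, startup):
    train_loss = build_program(is_train=True)
with fluid.program_guard(test_prog, startup):
    test_loss = build_program(is_train=False)
```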
diff --git a/python/paddle/fluid/tests/demo/text_classification/.gitignore b/python/paddle/fluid/tests/demo/file_reader/.gitignore
similarity index 100%
rename from python/paddle/fluid/tests/demo/text_classification/.gitignore
rename to python/paddle/fluid/tests/demo/file_reader/.gitignore
diff --git a/python/paddle/fluid/tests/demo/text_classification/convert_data_to_recordio.py b/python/paddle/fluid/tests/demo/file_reader/convert_data_to_recordio.py
similarity index 94%
rename from python/paddle/fluid/tests/demo/text_classification/convert_data_to_recordio.py
rename to python/paddle/fluid/tests/demo/file_reader/convert_data_to_recordio.py
index 8244617711138d590193b2898de5d2f3aeb1e11e..b839e14889884bca8d27586aa8c1d76fba3458c1 100644
--- a/python/paddle/fluid/tests/demo/text_classification/convert_data_to_recordio.py
+++ b/python/paddle/fluid/tests/demo/file_reader/convert_data_to_recordio.py
@@ -35,7 +35,7 @@ if len(sys.argv) == 1:
     word_dict = paddle.dataset.imdb.word_dict()
 else:
     word_dict = load_vocab(sys.argv[1])
-word_dict["<unk>"] = len(word_dict)
+    word_dict["<unk>"] = len(word_dict)
 
 print "Dict dim = ", len(word_dict)
 # input text data
@@ -50,7 +50,7 @@ feeder = fluid.DataFeeder(feed_list=[data, label], place=fluid.CPUPlace())
 BATCH_SIZE = 128
 train_reader = paddle.batch(
     paddle.reader.shuffle(
-        paddle.dataset.imdb.train(word_dict), buf_size=10000),
+        paddle.dataset.imdb.train(word_dict), buf_size=25000),
     batch_size=BATCH_SIZE)
 
 test_reader = paddle.batch(
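The renamed script feeds IMDB batches through a `DataFeeder` and serializes them to the `train.recordio` / `test.recordio` files the new demo reads. A condensed sketch of that conversion, assuming the era's `fluid.recordio_writer.convert_reader_to_recordio_file` helper:

```python
import paddle
import paddle.fluid as fluid

# the layout must match what open_files later expects: LoD word ids + a label
data = fluid.layers.data(name="words", shape=[1], dtype="int64", lod_level=1)
label = fluid.layers.data(name="label", shape=[1], dtype="int64")
feeder = fluid.DataFeeder(feed_list=[data, label], place=fluid.CPUPlace())

word_dict = paddle.dataset.imdb.word_dict()
train_reader = paddle.batch(
    paddle.reader.shuffle(paddle.dataset.imdb.train(word_dict), buf_size=25000),
    batch_size=128)

# serialize every batch into a RecordIO file for the reader ops to consume
fluid.recordio_writer.convert_reader_to_recordio_file(
    "train.recordio", reader_creator=train_reader, feeder=feeder)
```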
diff --git a/python/paddle/fluid/tests/demo/file_reader/train.py b/python/paddle/fluid/tests/demo/file_reader/train.py
new file mode 100644
index 0000000000000000000000000000000000000000..bc3a6dc81d24afec66ed1489aead1cff79a59bca
--- /dev/null
+++ b/python/paddle/fluid/tests/demo/file_reader/train.py
@@ -0,0 +1,138 @@
+# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import paddle.fluid as fluid
+import numpy
+import sys
+
+TRAIN_FILES = ['train.recordio']
+TEST_FILES = ['test.recordio']
+
+DICT_DIM = 5147
+
+# embedding dim
+emb_dim = 128
+
+# hidden dim
+hid_dim = 128
+
+# class num
+class_dim = 2
+
+# epoch num
+epoch_num = 10
+
+
+def build_program(is_train):
+    file_obj_handle = fluid.layers.io.open_files(
+        filenames=TRAIN_FILES if is_train else TEST_FILES,
+        shapes=[[-1, 1], [-1, 1]],
+        lod_levels=[1, 0],
+        dtypes=['int64', 'int64'])
+
+    file_obj = fluid.layers.io.double_buffer(file_obj_handle)
+
+    with fluid.unique_name.guard():
+
+        data, label = fluid.layers.read_file(file_obj)
+
+        emb = fluid.layers.embedding(input=data, size=[DICT_DIM, emb_dim])
+
+        conv_3 = fluid.nets.sequence_conv_pool(
+            input=emb,
+            num_filters=hid_dim,
+            filter_size=3,
+            act="tanh",
+            pool_type="sqrt")
+
+        conv_4 = fluid.nets.sequence_conv_pool(
+            input=emb,
+            num_filters=hid_dim,
+            filter_size=4,
+            act="tanh",
+            pool_type="sqrt")
+
+        prediction = fluid.layers.fc(input=[conv_3, conv_4],
+                                     size=class_dim,
+                                     act="softmax")
+
+        # cross entropy loss
+        cost = fluid.layers.cross_entropy(input=prediction, label=label)
+
+        # mean loss
+        avg_cost = fluid.layers.mean(x=cost)
+        acc = fluid.layers.accuracy(input=prediction, label=label)
+
+        if is_train:
+            # Adagrad optimizer
+            sgd_optimizer = fluid.optimizer.Adagrad(learning_rate=0.001)
+            sgd_optimizer.minimize(avg_cost)
+
+    return {'loss': avg_cost, 'log': [avg_cost, acc], 'file': file_obj_handle}
+
+
+def main():
+    train = fluid.Program()
+    startup = fluid.Program()
+    test = fluid.Program()
+
+    with fluid.program_guard(train, startup):
+        train_args = build_program(is_train=True)
+
+    with fluid.program_guard(test, startup):
+        test_args = build_program(is_train=False)
+
+    use_cuda = fluid.core.is_compiled_with_cuda()
+    # startup
+    place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
+    exe = fluid.Executor(place=place)
+    exe.run(startup)
+
+    train_exe = fluid.ParallelExecutor(
+        use_cuda=use_cuda,
+        loss_name=train_args['loss'].name,
+        main_program=train)
+    test_exe = fluid.ParallelExecutor(
+        use_cuda=use_cuda, main_program=test, share_vars_from=train_exe)
+
+    fetch_var_list = [var.name for var in train_args['log']]
+    for epoch_id in range(epoch_num):
+        # train
+        try:
+            batch_id = 0
+            while True:
+                loss, acc = map(numpy.array,
+                                train_exe.run(fetch_list=fetch_var_list))
+                print 'Train epoch', epoch_id, 'batch', batch_id, 'loss:', loss, 'acc:', acc
+                batch_id += 1
+        except fluid.core.EOFException:
+            print 'End of epoch', epoch_id
+            train_args['file'].reset()
+
+        # test
+        loss = []
+        acc = []
+        try:
+            while True:
+                loss_np, acc_np = map(numpy.array,
+                                      test_exe.run(fetch_list=fetch_var_list))
+                loss.append(loss_np[0])
+                acc.append(acc_np[0])
+        except fluid.core.EOFException:
+            test_args['file'].reset()
+        print 'Test loss:', numpy.mean(loss), 'acc:', numpy.mean(acc)
+
+
+if __name__ == '__main__':
+    main()
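The new demo drives epochs from Python: each pass over a file reader ends with `fluid.core.EOFException`, and `reset()` re-arms the reader for the next pass. That loop, distilled into a standalone helper (`run_until_eof` is a name invented here, not part of the demo):

```python
import paddle.fluid as fluid


def run_until_eof(pexe, fetch_list, reader_handle):
    """Drain one full pass of a file reader, collecting fetched results."""
    results = []
    try:
        while True:  # open_files raises EOFException once the pass is done
            results.append(pexe.run(fetch_list=fetch_list))
    except fluid.core.EOFException:
        reader_handle.reset()  # re-arm the reader for the next epoch
    return results
```

To try the demo, the RecordIO files presumably need to exist first: run `convert_data_to_recordio.py` to produce `train.recordio` and `test.recordio`, then start `train.py`.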
diff --git a/python/paddle/fluid/tests/demo/text_classification/train.py b/python/paddle/fluid/tests/demo/text_classification/train.py
deleted file mode 100644
index 281c2869d642c7fe41a386c42208ca2da1dc2891..0000000000000000000000000000000000000000
--- a/python/paddle/fluid/tests/demo/text_classification/train.py
+++ /dev/null
@@ -1,146 +0,0 @@
-# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import paddle.fluid as fluid
-import numpy
-import sys
-
-TRAIN_FILES = ['train.recordio']
-TEST_FILES = ['test.recordio']
-
-DICT_DIM = 89528
-
-# embedding dim
-emb_dim = 128
-
-# hidden dim
-hid_dim = 128
-
-# hidden dim2
-hid_dim2 = 96
-
-# class num
-class_dim = 2
-
-
-def network_cfg(is_train, pass_num=100):
-    with fluid.unique_name.guard():
-        train_file_obj = fluid.layers.open_files(
-            filenames=TRAIN_FILES,
-            pass_num=pass_num,
-            shapes=[[-1, 1], [-1, 1]],
-            lod_levels=[1, 0],
-            dtypes=['int64', 'int64'])
-
-        test_file_obj = fluid.layers.open_files(
-            filenames=TEST_FILES,
-            pass_num=1,
-            shapes=[[-1, 1], [-1, 1]],
-            lod_levels=[1, 0],
-            dtypes=['int64', 'int64'])
-
-        if is_train:
-            file_obj = fluid.layers.shuffle(train_file_obj, buffer_size=1000)
-        else:
-            file_obj = test_file_obj
-
-        file_obj = fluid.layers.double_buffer(
-            file_obj,
-            name="train_double_buffer" if is_train else 'test_double_buffer')
-
-        data, label = fluid.layers.read_file(file_obj)
-
-        emb = fluid.layers.embedding(input=data, size=[DICT_DIM, emb_dim])
-
-        # sequence conv with window size = 3
-        win_size = 3
-        conv_3 = fluid.nets.sequence_conv_pool(
-            input=emb,
-            num_filters=hid_dim,
-            filter_size=win_size,
-            act="tanh",
-            pool_type="max")
-
-        # fc layer after conv
-        fc_1 = fluid.layers.fc(input=[conv_3], size=hid_dim2)
-
-        # probability of each class
-        prediction = fluid.layers.fc(input=[fc_1],
-                                     size=class_dim,
-                                     act="softmax")
-        # cross entropy loss
-        cost = fluid.layers.cross_entropy(input=prediction, label=label)
-
-        # mean loss
-        avg_cost = fluid.layers.mean(x=cost)
-        acc = fluid.layers.accuracy(input=prediction, label=label)
-
-        if is_train:
-            # SGD optimizer
-            sgd_optimizer = fluid.optimizer.Adagrad(learning_rate=0.01)
-            sgd_optimizer.minimize(avg_cost)
-
-        return {
-            'loss': avg_cost,
-            'log': [avg_cost, acc],
-            'file': train_file_obj if is_train else test_file_obj
-        }
-
-
-def main():
-    train = fluid.Program()
-    startup = fluid.Program()
-
-    with fluid.program_guard(train, startup):
-        train_args = network_cfg(is_train=True)
-
-    test = fluid.Program()
-
-    with fluid.program_guard(test, fluid.Program()):
-        test_args = network_cfg(is_train=False)
-
-    # startup
-    place = fluid.CUDAPlace(0)
-    exe = fluid.Executor(place=place)
-    exe.run(startup)
-
-    train_exe = fluid.ParallelExecutor(
-        use_cuda=True, loss_name=train_args['loss'].name, main_program=train)
-
-    fetch_var_list = [var.name for var in train_args['log']]
-    for i in xrange(sys.maxint):
-        result = map(numpy.array,
-                     train_exe.run(fetch_list=fetch_var_list
-                                   if i % 1000 == 0 else []))
-        if len(result) != 0:
-            print 'Train: ', result
-
-        if i % 1000 == 0:
-            test_exe = fluid.ParallelExecutor(
-                use_cuda=True, main_program=test, share_vars_from=train_exe)
-            loss = []
-            acc = []
-            try:
-                while True:
-                    loss_np, acc_np = map(
-                        numpy.array, test_exe.run(fetch_list=fetch_var_list))
-                    loss.append(loss_np[0])
-                    acc.append(acc_np[0])
-            except:
-                test_args['file'].reset()
-            print 'TEST: ', numpy.mean(loss), numpy.mean(acc)
-
-
-if __name__ == '__main__':
-    main()
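The deleted demo baked its epoch count into the reader graph and polled results in an unbounded `xrange(sys.maxint)` loop; its replacement makes one pass per epoch and lets Python drive the loop. The difference in reader setup, condensed from the two files in this diff:

```python
import paddle.fluid as fluid

# old demo (deleted above): the reader itself replays the file pass_num
# times, so training only stops when the multi-pass reader is exhausted
old_reader = fluid.layers.open_files(
    filenames=['train.recordio'],
    pass_num=100,
    shapes=[[-1, 1], [-1, 1]],
    lod_levels=[1, 0],
    dtypes=['int64', 'int64'])

# new demo (added earlier in this diff): a single-pass reader; each epoch
# ends with EOFException and reset() starts the next pass
new_reader = fluid.layers.io.open_files(
    filenames=['train.recordio'],
    shapes=[[-1, 1], [-1, 1]],
    lod_levels=[1, 0],
    dtypes=['int64', 'int64'])
```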
diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor_mnist.py b/python/paddle/fluid/tests/unittests/test_parallel_executor_mnist.py
index b21e16439a5070e5f6d763e1617d4cfffe8bd618..76389d916fc39f470a22aed4792bf7b754600436 100644
--- a/python/paddle/fluid/tests/unittests/test_parallel_executor_mnist.py
+++ b/python/paddle/fluid/tests/unittests/test_parallel_executor_mnist.py
@@ -107,44 +107,24 @@ class TestMNIST(TestParallelExecutorBase):
         label = np.ones(shape=[32, 1], dtype='int64')
         return img, label
 
-    # simple_fc
-    def check_simple_fc_convergence(self, use_cuda, use_reduce=False):
+    def _compare_reduce_and_allreduce(self, model, use_cuda, random_data=True):
         if use_cuda and not core.is_compiled_with_cuda():
             return
-        self.check_network_convergence(simple_fc_net, use_cuda=use_cuda)
         self.check_network_convergence(
-            simple_fc_net, use_cuda=use_cuda, allow_op_delay=True)
-
-        img, label = self._init_data()
-
+            model, use_cuda=use_cuda, use_reduce=True)
         self.check_network_convergence(
-            simple_fc_net,
-            feed_dict={"image": img,
-                       "label": label},
-            use_cuda=use_cuda,
-            use_reduce=use_reduce)
+            model, use_cuda=use_cuda, allow_op_delay=True, use_reduce=True)
 
-    def check_simple_fc_convergence_with_Reduce(self, use_cuda):
-        if use_cuda and not core.is_compiled_with_cuda():
-            return
-        self.check_network_convergence(
-            simple_fc_net, use_cuda=use_cuda, use_reduce=True)
-        self.check_network_convergence(
-            simple_fc_net,
-            use_cuda=use_cuda,
-            allow_op_delay=True,
-            use_reduce=True)
-
-        img, label = self._init_data()
+        img, label = self._init_data(random_data)
 
         all_reduce_first_loss, all_reduce_last_loss = self.check_network_convergence(
-            simple_fc_net,
+            model,
             feed_dict={"image": img,
                        "label": label},
             use_cuda=use_cuda,
             use_reduce=False)
         reduce_first_loss, reduce_last_loss = self.check_network_convergence(
-            simple_fc_net,
+            model,
             feed_dict={"image": img,
                        "label": label},
             use_cuda=use_cuda,
@@ -153,7 +133,24 @@ class TestMNIST(TestParallelExecutorBase):
         for loss in zip(all_reduce_first_loss, reduce_first_loss):
             self.assertAlmostEquals(loss[0], loss[1], delta=1e-6)
         for loss in zip(all_reduce_last_loss, reduce_last_loss):
-            self.assertAlmostEquals(loss[0], loss[1], delta=1e-6)
+            self.assertAlmostEquals(loss[0], loss[1], delta=1e-4)
+
+    # simple_fc
+    def check_simple_fc_convergence(self, use_cuda, use_reduce=False):
+        if use_cuda and not core.is_compiled_with_cuda():
+            return
+        self.check_network_convergence(simple_fc_net, use_cuda=use_cuda)
+        self.check_network_convergence(
+            simple_fc_net, use_cuda=use_cuda, allow_op_delay=True)
+
+        img, label = self._init_data()
+
+        self.check_network_convergence(
+            simple_fc_net,
+            feed_dict={"image": img,
+                       "label": label},
+            use_cuda=use_cuda,
+            use_reduce=use_reduce)
 
     def test_simple_fc(self):
         # use_cuda
@@ -162,8 +159,8 @@ class TestMNIST(TestParallelExecutorBase):
     def test_simple_fc_with_new_strategy(self):
         # use_cuda, use_reduce
-        self.check_simple_fc_convergence_with_Reduce(True)
-        self.check_simple_fc_convergence_with_Reduce(False)
+        self._compare_reduce_and_allreduce(simple_fc_net, True)
+        self._compare_reduce_and_allreduce(simple_fc_net, False)
 
     def check_simple_fc_parallel_accuracy(self, use_cuda):
         if use_cuda and not core.is_compiled_with_cuda():
             return
@@ -209,39 +206,13 @@ class TestMNIST(TestParallelExecutorBase):
                        "label": label},
             use_cuda=use_cuda)
 
-    def check_batchnorm_fc_convergence_use_reduce(self, use_cuda):
-        if use_cuda and not core.is_compiled_with_cuda():
-            return
-        self.check_network_convergence(
-            fc_with_batchnorm, use_cuda=use_cuda, use_reduce=True)
-
-        img, label = self._init_data()
-
-        all_reduce_first_loss, all_reduce_last_loss = self.check_network_convergence(
-            fc_with_batchnorm,
-            feed_dict={"image": img,
-                       "label": label},
-            use_cuda=use_cuda,
-            use_reduce=False)
-        reduce_first_loss, reduce_last_loss = self.check_network_convergence(
-            fc_with_batchnorm,
-            feed_dict={"image": img,
-                       "label": label},
-            use_cuda=use_cuda,
-            use_reduce=True)
-
-        for loss in zip(all_reduce_first_loss, reduce_first_loss):
-            self.assertAlmostEquals(loss[0], loss[1], delta=1e-6)
-        for loss in zip(all_reduce_last_loss, reduce_last_loss):
-            self.assertAlmostEquals(loss[0], loss[1], delta=1e-4)
-
     def test_batchnorm_fc(self):
         self.check_batchnorm_fc_convergence(True)
         self.check_batchnorm_fc_convergence(False)
 
     def test_batchnorm_fc_with_new_strategy(self):
-        self.check_batchnorm_fc_convergence_use_reduce(True)
-        self.check_batchnorm_fc_convergence_use_reduce(False)
+        self._compare_reduce_and_allreduce(fc_with_batchnorm, True)
+        self._compare_reduce_and_allreduce(fc_with_batchnorm, False)
 
 
 if __name__ == '__main__':
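The `use_reduce` flag that `_compare_reduce_and_allreduce` toggles selects `ParallelExecutor`'s gradient aggregation strategy in the test base of this era. A hedged sketch of that mapping, assuming the `fluid.BuildStrategy` API (the actual plumbing lives in `parallel_executor_test_base.py`, which is not part of this diff):

```python
import paddle.fluid as fluid


def build_strategy_for(use_reduce):
    # assumed mapping: use_reduce=True picks Reduce (each gradient is reduced
    # to one device and broadcast back); use_reduce=False picks AllReduce
    strategy = fluid.BuildStrategy()
    strategy.reduce_strategy = (
        fluid.BuildStrategy.ReduceStrategy.Reduce
        if use_reduce else fluid.BuildStrategy.ReduceStrategy.AllReduce)
    return strategy


# usage sketch, given a built network with loss:
# pexe = fluid.ParallelExecutor(
#     use_cuda=True, loss_name=loss.name,
#     build_strategy=build_strategy_for(use_reduce=True))
```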
diff --git a/python/paddle/fluid/transpiler/distribute_transpiler.py b/python/paddle/fluid/transpiler/distribute_transpiler.py
index fc58703eca73addca109506aa60c0099ff31e1b5..e7698d8c52b411fd0afe919625034107081726b5 100644
--- a/python/paddle/fluid/transpiler/distribute_transpiler.py
+++ b/python/paddle/fluid/transpiler/distribute_transpiler.py
@@ -887,7 +887,8 @@ class DistributeTranspiler(object):
         # create table optimize block in pserver program
         table_opt_op = [
             op for op in self.optimize_ops
-            if op.input("Param")[0] == self.table_name
+            if 'Param' in op.input_names and op.input("Param")[0] ==
+            self.table_name
         ][0]
         table_opt_block = pserver_program.create_block(pre_block_idx)
         # only support sgd now
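The transpiler fix guards the `Param` lookup: `self.optimize_ops` can contain ops that carry no `Param` input (for example, ops appended by a learning-rate scheduler), and calling `op.input("Param")` on those would fail before the comparison ever ran. The guarded pattern in isolation, with a hypothetical helper name:

```python
def find_table_opt_op(optimize_ops, table_name):
    # check membership first: not every optimize op has a "Param" input
    candidates = [
        op for op in optimize_ops
        if 'Param' in op.input_names and op.input("Param")[0] == table_name
    ]
    return candidates[0] if candidates else None
```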