From adfaf9a6657b677fab796ea223640e1375150fde Mon Sep 17 00:00:00 2001 From: Wu Yi Date: Mon, 2 Jul 2018 17:04:29 +0800 Subject: [PATCH] make transpiler test reliable (#11848) * make transpiler test reliable * add more * follow comments --- .../tests/unittests/test_dist_transpiler.py | 247 ++++++++++++++++-- .../unittests/test_simple_dist_transpiler.py | 80 ------ .../fluid/tests/unittests/transpiler_test.py | 73 ------ .../fluid/transpiler/distribute_transpiler.py | 19 +- 4 files changed, 235 insertions(+), 184 deletions(-) delete mode 100644 python/paddle/fluid/tests/unittests/test_simple_dist_transpiler.py delete mode 100644 python/paddle/fluid/tests/unittests/transpiler_test.py diff --git a/python/paddle/fluid/tests/unittests/test_dist_transpiler.py b/python/paddle/fluid/tests/unittests/test_dist_transpiler.py index b4379ad447e..75b4b4e50da 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_transpiler.py +++ b/python/paddle/fluid/tests/unittests/test_dist_transpiler.py @@ -15,51 +15,248 @@ import unittest import paddle.fluid as fluid from paddle.fluid.transpiler.distribute_transpiler import delete_ops +import traceback -from transpiler_test import TranspilerTest - -class TestDistTranspiler(TranspilerTest): +class TranspilerTest(unittest.TestCase): def setUp(self): - self.current_pserver_ep = "127.0.0.1:6174" + self.trainer_id = 0 + self.trainers = 2 + self.pservers = 2 + # NOTE: we do not actually bind this port + self.pserver_eps = "127.0.0.1:6174,127.0.0.1:6175" + self.pserver1_ep = "127.0.0.1:6174" + self.pserver2_ep = "127.0.0.1:6175" + self.slice_var_up = True + self.sync_mode = True + self.transpiler = None + + def net_conf(self): + x = fluid.layers.data(name='x', shape=[1000], dtype='float32') + y_predict = fluid.layers.fc(input=x, + size=1000, + act=None, + param_attr=fluid.ParamAttr(name='fc_w'), + bias_attr=fluid.ParamAttr(name='fc_b')) + y = fluid.layers.data(name='y', shape=[1], dtype='float32') + cost = fluid.layers.square_error_cost(input=y_predict, label=y) + avg_cost = fluid.layers.mean(cost) + sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.1) + sgd_optimizer.minimize(avg_cost) + return + + def get_main_program(self): + main = fluid.Program() + with fluid.program_guard(main): + self.net_conf() + self.origin_prog = main.clone() + return main + + def get_trainer(self): + t = self._transpiler_instance() + return t.get_trainer_program() + + def get_pserver(self, ep): + t = self._transpiler_instance() + pserver = t.get_pserver_program(ep) + startup = t.get_startup_program(ep, pserver) + return pserver, startup + + def _transpiler_instance(self): + if not self.transpiler: + main = self.get_main_program() + self.transpiler = fluid.DistributeTranspiler() + self.transpiler.transpile( + self.trainer_id, + program=main, + pservers=self.pserver_eps, + trainers=self.trainers, + slice_var_up=self.slice_var_up, + sync_mode=self.sync_mode) + return self.transpiler + +class TestBasicModel(TranspilerTest): def test_transpiler(self): + pserver, startup = self.get_pserver(self.pserver1_ep) + pserver2, startup2 = self.get_pserver(self.pserver2_ep) + trainer = self.get_trainer() - pserver, startup = self.get_pserver(self.current_pserver_ep) - self.assertEqual([op.type for op in trainer.global_block().ops], - self.get_expect_trainer_ops()) + + self.assertEqual([op.type for op in trainer.global_block().ops], [ + 'mul', 'elementwise_add', 'elementwise_sub', 'square', 'mean', + 'fill_constant', 'mean_grad', 'square_grad', 'elementwise_sub_grad', + 'elementwise_add_grad', 'send', 'mul_grad', 'split_byref', 'send', + 'send_barrier', 'recv', 'recv', 'fetch_barrier', 'concat' + ]) self.assertEqual(len(pserver.blocks), 3) # block0: listen_and_serv self.assertEqual([op.type for op in pserver.blocks[0].ops], ["listen_and_serv"]) - # block2: optimize pass + # block1~2: optimize pass self.assertEqual([op.type for op in pserver.blocks[1].ops], ["sum", "scale", "sgd"]) - # confirm startup program - - self.assertEqual([op.type for op in startup.global_block().ops], [ - "fill_constant", "fill_constant", "uniform_random", "uniform_random" - ]) - + self.assertEqual([op.type for op in startup.global_block().ops], + ["fill_constant", "fill_constant", "uniform_random"]) # the variable #fc_w will be split into two blocks fc_w_var = startup.global_block().var("fc_w.block1") self.assertEqual(fc_w_var.shape, (500, 1000)) + # all parameters should be optimized on pserver + + pserver_params = [] + for prog in [pserver, pserver2]: + for blk in prog.blocks: + for op in blk.ops: + if "Param" in op.input_names: + param_name = op.input("Param")[0] + is_block_idx = param_name.find(".block") + if is_block_idx != -1: + origin_param_name = param_name[:is_block_idx] + else: + origin_param_name = param_name + pserver_params.append(origin_param_name) + trainer_params = [] + for op in self.origin_prog.global_block().ops: + if "Param" in op.input_names: + trainer_params.append(op.input("Param")[0]) + self.assertEqual(set(pserver_params), set(trainer_params)) + + +class TestNoSliceVar(TranspilerTest): + def setUp(self): + super(TestNoSliceVar, self).setUp() + self.slice_var_up = False + + def test_transpiler(self): + _, startup = self.get_pserver(self.pserver1_ep) + _, startup2 = self.get_pserver(self.pserver2_ep) + + if startup.global_block().vars.has_key("fc_w"): + fc_w_var = startup.global_block().vars["fc_w"] + elif startup2.global_block().vars.has_key("fc_w"): + fc_w_var = startup2.global_block().vars["fc_w"] + + self.assertEqual(fc_w_var.shape, (1000, 1000)) - def get_expect_trainer_ops(self): - trainer = fluid.Program() - with fluid.program_guard(trainer): - optimize_ops, params_grads = self.net_conf() +class TestLRDecay(TranspilerTest): + def net_conf(self): + x = fluid.layers.data(name='x', shape=[1000], dtype='float32') + y_predict = fluid.layers.fc(input=x, + size=1000, + act=None, + param_attr=fluid.ParamAttr(name='fc_w'), + bias_attr=fluid.ParamAttr(name='fc_b')) + y = fluid.layers.data(name='y', shape=[1], dtype='float32') + cost = fluid.layers.square_error_cost(input=y_predict, label=y) + avg_cost = fluid.layers.mean(cost) + sgd_optimizer = fluid.optimizer.SGD( + learning_rate=fluid.layers.exponential_decay( + learning_rate=1.0, + decay_steps=2100, + decay_rate=0.1, + staircase=True)) + sgd_optimizer.minimize(avg_cost) + return + + def test_transpiler(self): + pserver, startup = self.get_pserver(self.pserver1_ep) + trainer = self.get_trainer() + + self.assertEqual(len(pserver.blocks), 4) + lr_decay_ops = [op.type for op in pserver.blocks[1].ops] + self.assertEqual(lr_decay_ops, [ + "increment", "cast", "fill_constant", "elementwise_div", "floor", + "fill_constant", "elementwise_pow", "fill_constant", + "elementwise_mul" + ]) + + +class TestLRDecayConditional(TranspilerTest): + def net_conf(self): + x = fluid.layers.data(name='x', shape=[1000], dtype='float32') + y_predict = fluid.layers.fc(input=x, + size=1000, + act=None, + param_attr=fluid.ParamAttr(name='fc_w'), + bias_attr=fluid.ParamAttr(name='fc_b')) + y = fluid.layers.data(name='y', shape=[1], dtype='float32') + cost = fluid.layers.square_error_cost(input=y_predict, label=y) + avg_cost = fluid.layers.mean(cost) + sgd_optimizer = fluid.optimizer.SGD( + learning_rate=fluid.layers.piecewise_decay([10000, 20000], + [1.0, 0.5, 1.0])) + sgd_optimizer.minimize(avg_cost) + return + + def test_transpiler(self): + pserver, startup = self.get_pserver(self.pserver1_ep) + trainer = self.get_trainer() + + serv_op = pserver.blocks[0].ops[0] + sub_blocks = [] + optimize_blocks = [] + for b in serv_op.attrs["optimize_blocks"]: + optimize_blocks.append(b.idx) + for b in pserver.blocks: + if b.idx not in optimize_blocks: + sub_blocks.append(b.idx) + + self.assertEqual(len(pserver.blocks), 7) + lr_decay_ops = [op.type for op in pserver.blocks[1].ops] + self.assertEqual(lr_decay_ops, [ + "increment", "cast", "fill_constant", "fill_constant", "less_than", + "logical_not", "conditional_block", "fill_constant", + "fill_constant", "less_than", "logical_not", "logical_and", + "logical_and", "conditional_block", "fill_constant", + "conditional_block" + ]) + # test the condition blocks + for b in sub_blocks: + if b == 0: + continue + block = pserver.blocks[b] + self.assertEqual([op.type for op in block.ops], ["assign"]) + + +class TestL2Decay(TranspilerTest): + def net_conf(self): + x = fluid.layers.data(name='x', shape=[1000], dtype='float32') + y_predict = fluid.layers.fc( + input=x, + size=1000, + act=None, + param_attr=fluid.ParamAttr( + name='fc_w', + regularizer=fluid.regularizer.L2Decay(), + gradient_clip=fluid.clip.GradientClipByValue(0.1)), + bias_attr=fluid.ParamAttr(name='fc_b')) + y = fluid.layers.data(name='y', shape=[1], dtype='float32') + cost = fluid.layers.square_error_cost(input=y_predict, label=y) + avg_cost = fluid.layers.mean(cost) + sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.1) + sgd_optimizer.minimize(avg_cost) + return + + def test_transpiler(self): + pserver, startup = self.get_pserver(self.pserver1_ep) + trainer = self.get_trainer() + + self.assertEqual(len(pserver.blocks), 3) + self.assertEqual([op.type for op in pserver.blocks[1].ops], + ["sum", "scale", "clip", "sgd"]) + self.assertEqual( + [op.type for op in pserver.blocks[2].ops], + ["sum", "scale", "clip", "scale", "elementwise_add", "sgd"]) + # TODO(typhoonzero): test clipping and L2Decay ops are removed from trainer + - delete_ops(trainer.global_block(), optimize_ops) - ops = [op.type for op in trainer.global_block().ops] + [ - "split_byref", "send", "send_barrier", "recv", "recv", - "fetch_barrier", "concat" - ] - ops.insert(ops.index("elementwise_add_grad") + 1, "send") - return ops + # FIXME(typhoonzero): need to add test for async case: + # see https://github.com/PaddlePaddle/Paddle/issues/11691 +class TestAsyncSGD(TranspilerTest): + pass if __name__ == "__main__": diff --git a/python/paddle/fluid/tests/unittests/test_simple_dist_transpiler.py b/python/paddle/fluid/tests/unittests/test_simple_dist_transpiler.py deleted file mode 100644 index f4aa7426bc3..00000000000 --- a/python/paddle/fluid/tests/unittests/test_simple_dist_transpiler.py +++ /dev/null @@ -1,80 +0,0 @@ -# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import numpy as np - -import paddle.fluid as fluid -from paddle.fluid.transpiler.distribute_transpiler import delete_ops - -from transpiler_test import TranspilerTest - - -class TestSimpleDistTranspiler(TranspilerTest): - def setUp(self): - self.current_pserver_ep = "127.0.0.1:6175" - - def test_simple_transpiler(self): - np.random.seed(1) - - trainer = self.get_trainer() - pserver, startup = self.get_pserver(self.current_pserver_ep) - self.assertEqual([op.type for op in trainer.global_block().ops], - self.get_expect_trainer_ops()) - - self.assertEqual(len(pserver.blocks), 2) - # block0: listen_and_serv - self.assertEqual([op.type for op in pserver.blocks[0].ops], - ["listen_and_serv"]) - # block1: optimize pass - self.assertEqual([op.type for op in pserver.blocks[1].ops], - ["sum", "scale", "sgd"]) - - # confirm startup program - self.assertEqual([op.type for op in startup.global_block().ops], - ["fill_constant", "uniform_random", "uniform_random"]) - - # the variable #fc_w will NOT be splited - fc_w_var = startup.global_block().var("fc_w@GRAD") - self.assertEqual(fc_w_var.shape, (1000, 1000)) - - fc_w_var = startup.global_block().var("fc_w@GRAD.trainer_0") - self.assertEqual(fc_w_var.shape, (1000, 1000)) - - def get_expect_trainer_ops(self): - trainer = fluid.Program() - - with fluid.program_guard(trainer): - optimize_ops, params_grads = self.net_conf() - - delete_ops(trainer.global_block(), optimize_ops) - ops = [op.type for op in trainer.global_block().ops] + [ - "send", "send_barrier", "recv", "recv", "fetch_barrier" - ] - ops.insert(ops.index("elementwise_add_grad") + 1, "send") - return ops - - def _transpiler_instance(self): - main = self.get_main_program() - t = fluid.DistributeTranspiler() - t.transpile( - self.trainer_id, - program=main, - pservers=self.pserver_eps, - trainers=self.trainers, - slice_var_up=False) - return t - - -if __name__ == "__main__": - unittest.main() diff --git a/python/paddle/fluid/tests/unittests/transpiler_test.py b/python/paddle/fluid/tests/unittests/transpiler_test.py deleted file mode 100644 index d84c5d9c41c..00000000000 --- a/python/paddle/fluid/tests/unittests/transpiler_test.py +++ /dev/null @@ -1,73 +0,0 @@ -# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import unittest -import numpy as np - -import paddle.fluid as fluid -import paddle.fluid.core as core -import paddle.fluid.layers as layers - - -class TranspilerTest(unittest.TestCase): - @classmethod - def setUpClass(self): - self.trainer_id = 0 - self.trainers = 2 - self.pservers = 2 - self.pserver_eps = "127.0.0.1:6174,127.0.0.1:6175" - - def net_conf(self): - x = fluid.layers.data(name='x', shape=[1000], dtype='float32') - - y_predict = fluid.layers.fc(input=x, - size=1000, - act=None, - param_attr=fluid.ParamAttr(name='fc_w')) - - y = fluid.layers.data(name='y', shape=[1], dtype='float32') - - cost = fluid.layers.square_error_cost(input=y_predict, label=y) - avg_cost = fluid.layers.mean(cost) - sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.1) - - optimize_ops, params_grads = sgd_optimizer.minimize(avg_cost) - return optimize_ops, params_grads - - def get_main_program(self): - main = fluid.Program() - - with fluid.program_guard(main): - self.net_conf() - - return main - - def get_trainer(self): - return self._transpiler_instance().get_trainer_program() - - def get_pserver(self, ep): - t = self._transpiler_instance() - pserver = t.get_pserver_program(ep) - startup = t.get_startup_program(ep, pserver) - return pserver, startup - - def _transpiler_instance(self): - main = self.get_main_program() - t = fluid.DistributeTranspiler() - t.transpile( - self.trainer_id, - program=main, - pservers=self.pserver_eps, - trainers=self.trainers) - return t diff --git a/python/paddle/fluid/transpiler/distribute_transpiler.py b/python/paddle/fluid/transpiler/distribute_transpiler.py index 343901cda3f..05fed72ee64 100644 --- a/python/paddle/fluid/transpiler/distribute_transpiler.py +++ b/python/paddle/fluid/transpiler/distribute_transpiler.py @@ -455,6 +455,8 @@ class DistributeTranspiler(object): __append_optimize_op__(op, per_opt_block, grad_to_block_id, merged_var, lr_ops) + # dedup grad to ids list + grad_to_block_id = list(set(grad_to_block_id)) # append global ops if global_ops: opt_state_block = pserver_program.create_block( @@ -960,8 +962,6 @@ class DistributeTranspiler(object): if not block_map.has_key(varname): block_map[varname] = [] block_map[varname].append((long(offset), long(size))) - # Do not remove this important debug message: - print("block map: %s" % block_map) for varname, splited in block_map.iteritems(): orig_var = program.global_block().var(varname) @@ -1401,6 +1401,16 @@ class DistributeTranspiler(object): break return lr_ops + def _is_opt_role_op(self, op): + # NOTE: depend on oprole to find out whether this op is for + # optimize + op_maker = core.op_proto_and_checker_maker + optimize_role = core.op_proto_and_checker_maker.OpRole.Optimize + if op_maker.kOpRoleAttrName() in op.attrs and \ + int(op.attrs[op_maker.kOpRoleAttrName()]) == int(optimize_role): + return True + return False + def _get_optimize_pass(self): """ Get optimizer operators, paramters and gradients from origin_program @@ -1413,10 +1423,7 @@ class DistributeTranspiler(object): params_grads = [] origin_var_dict = self.origin_program.global_block().vars for op in block.ops: - # NOTE(Yancey1989): we can not use op role to distinguish an optimizer op - # or not, because all ops in optimizer sub-graph would - # sign the optimizer op role - if self._is_optimizer_op(op): + if self._is_opt_role_op(op): opt_ops.append(op) # HACK(wuyi): if we find grad vars from input of optimize # ops, we may get the output of clip op. Use syntax "@GRAD" -- GitLab