Unverified commit adfaf9a6, authored by Wu Yi, committed via GitHub

make transpiler test reliable (#11848)

* make transpiler test reliable

* add more

* follow comments
Parent commit: 58560622
test_dist_transpiler.py, rewritten to be self-contained:

@@ -15,51 +15,248 @@
import unittest
import paddle.fluid as fluid
from paddle.fluid.transpiler.distribute_transpiler import delete_ops
import traceback


class TranspilerTest(unittest.TestCase):
    def setUp(self):
        self.trainer_id = 0
        self.trainers = 2
        self.pservers = 2
        # NOTE: we do not actually bind this port
        self.pserver_eps = "127.0.0.1:6174,127.0.0.1:6175"
        self.pserver1_ep = "127.0.0.1:6174"
        self.pserver2_ep = "127.0.0.1:6175"
        self.slice_var_up = True
        self.sync_mode = True
        self.transpiler = None

    def net_conf(self):
        x = fluid.layers.data(name='x', shape=[1000], dtype='float32')
        y_predict = fluid.layers.fc(input=x,
                                    size=1000,
                                    act=None,
                                    param_attr=fluid.ParamAttr(name='fc_w'),
                                    bias_attr=fluid.ParamAttr(name='fc_b'))
        y = fluid.layers.data(name='y', shape=[1], dtype='float32')
        cost = fluid.layers.square_error_cost(input=y_predict, label=y)
        avg_cost = fluid.layers.mean(cost)
        sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.1)
        sgd_optimizer.minimize(avg_cost)

    def get_main_program(self):
        main = fluid.Program()
        with fluid.program_guard(main):
            self.net_conf()
        self.origin_prog = main.clone()
        return main

    def get_trainer(self):
        t = self._transpiler_instance()
        return t.get_trainer_program()

    def get_pserver(self, ep):
        t = self._transpiler_instance()
        pserver = t.get_pserver_program(ep)
        startup = t.get_startup_program(ep, pserver)
        return pserver, startup

    def _transpiler_instance(self):
        # Build (and cache) one transpiler per test, so every get_* call
        # sees the same transpiled program.
        if not self.transpiler:
            main = self.get_main_program()
            self.transpiler = fluid.DistributeTranspiler()
            self.transpiler.transpile(
                self.trainer_id,
                program=main,
                pservers=self.pserver_eps,
                trainers=self.trainers,
                slice_var_up=self.slice_var_up,
                sync_mode=self.sync_mode)
        return self.transpiler


class TestBasicModel(TranspilerTest):
    def test_transpiler(self):
        pserver, startup = self.get_pserver(self.pserver1_ep)
        pserver2, startup2 = self.get_pserver(self.pserver2_ep)

        trainer = self.get_trainer()

        self.assertEqual([op.type for op in trainer.global_block().ops], [
            'mul', 'elementwise_add', 'elementwise_sub', 'square', 'mean',
            'fill_constant', 'mean_grad', 'square_grad',
            'elementwise_sub_grad', 'elementwise_add_grad', 'send',
            'mul_grad', 'split_byref', 'send', 'send_barrier', 'recv',
            'recv', 'fetch_barrier', 'concat'
        ])

        self.assertEqual(len(pserver.blocks), 3)
        # block0: listen_and_serv
        self.assertEqual([op.type for op in pserver.blocks[0].ops],
                         ["listen_and_serv"])
        # block1~2: optimize pass
        self.assertEqual([op.type for op in pserver.blocks[1].ops],
                         ["sum", "scale", "sgd"])
        # confirm startup program
        self.assertEqual([op.type for op in startup.global_block().ops],
                         ["fill_constant", "fill_constant", "uniform_random"])
        # the variable fc_w will be split into two blocks
        fc_w_var = startup.global_block().var("fc_w.block1")
        self.assertEqual(fc_w_var.shape, (500, 1000))
        # all parameters should be optimized on pserver
        pserver_params = []
        for prog in [pserver, pserver2]:
            for blk in prog.blocks:
                for op in blk.ops:
                    if "Param" in op.input_names:
                        param_name = op.input("Param")[0]
                        is_block_idx = param_name.find(".block")
                        if is_block_idx != -1:
                            origin_param_name = param_name[:is_block_idx]
                        else:
                            origin_param_name = param_name
                        pserver_params.append(origin_param_name)
        trainer_params = []
        for op in self.origin_prog.global_block().ops:
            if "Param" in op.input_names:
                trainer_params.append(op.input("Param")[0])
        self.assertEqual(set(pserver_params), set(trainer_params))
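
        # Why fc_w.block1 above has shape (500, 1000): with two pservers
        # and slice_var_up=True, the transpiler cuts the (1000, 1000)
        # parameter fc_w row-wise into fc_w.block0 and fc_w.block1, each
        # (500, 1000), and places one block on each pserver endpoint.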


class TestNoSliceVar(TranspilerTest):
    def setUp(self):
        super(TestNoSliceVar, self).setUp()
        self.slice_var_up = False

    def test_transpiler(self):
        _, startup = self.get_pserver(self.pserver1_ep)
        _, startup2 = self.get_pserver(self.pserver2_ep)

        # has_key is Python 2 only; with slicing disabled, the whole
        # fc_w variable lands on exactly one of the two pservers.
        if startup.global_block().vars.has_key("fc_w"):
            fc_w_var = startup.global_block().vars["fc_w"]
        elif startup2.global_block().vars.has_key("fc_w"):
            fc_w_var = startup2.global_block().vars["fc_w"]

        self.assertEqual(fc_w_var.shape, (1000, 1000))


class TestLRDecay(TranspilerTest):
    def net_conf(self):
        x = fluid.layers.data(name='x', shape=[1000], dtype='float32')
        y_predict = fluid.layers.fc(input=x,
                                    size=1000,
                                    act=None,
                                    param_attr=fluid.ParamAttr(name='fc_w'),
                                    bias_attr=fluid.ParamAttr(name='fc_b'))
        y = fluid.layers.data(name='y', shape=[1], dtype='float32')
        cost = fluid.layers.square_error_cost(input=y_predict, label=y)
        avg_cost = fluid.layers.mean(cost)
        sgd_optimizer = fluid.optimizer.SGD(
            learning_rate=fluid.layers.exponential_decay(
                learning_rate=1.0,
                decay_steps=2100,
                decay_rate=0.1,
                staircase=True))
        sgd_optimizer.minimize(avg_cost)

    def test_transpiler(self):
        pserver, startup = self.get_pserver(self.pserver1_ep)
        trainer = self.get_trainer()

        self.assertEqual(len(pserver.blocks), 4)
        lr_decay_ops = [op.type for op in pserver.blocks[1].ops]
        self.assertEqual(lr_decay_ops, [
            "increment", "cast", "fill_constant", "elementwise_div", "floor",
            "fill_constant", "elementwise_pow", "fill_constant",
            "elementwise_mul"
        ])


class TestLRDecayConditional(TranspilerTest):
    def net_conf(self):
        x = fluid.layers.data(name='x', shape=[1000], dtype='float32')
        y_predict = fluid.layers.fc(input=x,
                                    size=1000,
                                    act=None,
                                    param_attr=fluid.ParamAttr(name='fc_w'),
                                    bias_attr=fluid.ParamAttr(name='fc_b'))
        y = fluid.layers.data(name='y', shape=[1], dtype='float32')
        cost = fluid.layers.square_error_cost(input=y_predict, label=y)
        avg_cost = fluid.layers.mean(cost)
        sgd_optimizer = fluid.optimizer.SGD(
            learning_rate=fluid.layers.piecewise_decay([10000, 20000],
                                                       [1.0, 0.5, 1.0]))
        sgd_optimizer.minimize(avg_cost)

    def test_transpiler(self):
        pserver, startup = self.get_pserver(self.pserver1_ep)
        trainer = self.get_trainer()

        serv_op = pserver.blocks[0].ops[0]
        sub_blocks = []
        optimize_blocks = []
        for b in serv_op.attrs["optimize_blocks"]:
            optimize_blocks.append(b.idx)
        for b in pserver.blocks:
            if b.idx not in optimize_blocks:
                sub_blocks.append(b.idx)

        self.assertEqual(len(pserver.blocks), 7)
        lr_decay_ops = [op.type for op in pserver.blocks[1].ops]
        self.assertEqual(lr_decay_ops, [
            "increment", "cast", "fill_constant", "fill_constant", "less_than",
            "logical_not", "conditional_block", "fill_constant",
            "fill_constant", "less_than", "logical_not", "logical_and",
            "logical_and", "conditional_block", "fill_constant",
            "conditional_block"
        ])

        # test the condition blocks
        for b in sub_blocks:
            if b == 0:
                continue
            block = pserver.blocks[b]
            self.assertEqual([op.type for op in block.ops], ["assign"])
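
        # piecewise_decay([10000, 20000], [1.0, 0.5, 1.0]) selects lr = 1.0
        # for step < 10000, 0.5 for 10000 <= step < 20000, and 1.0 after
        # that; each branch is lowered to one of the conditional_block ops
        # asserted above, whose body is the single assign checked here.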


class TestL2Decay(TranspilerTest):
    def net_conf(self):
        x = fluid.layers.data(name='x', shape=[1000], dtype='float32')
        y_predict = fluid.layers.fc(
            input=x,
            size=1000,
            act=None,
            param_attr=fluid.ParamAttr(
                name='fc_w',
                regularizer=fluid.regularizer.L2Decay(),
                gradient_clip=fluid.clip.GradientClipByValue(0.1)),
            bias_attr=fluid.ParamAttr(name='fc_b'))
        y = fluid.layers.data(name='y', shape=[1], dtype='float32')
        cost = fluid.layers.square_error_cost(input=y_predict, label=y)
        avg_cost = fluid.layers.mean(cost)
        sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.1)
        sgd_optimizer.minimize(avg_cost)

    def test_transpiler(self):
        pserver, startup = self.get_pserver(self.pserver1_ep)
        trainer = self.get_trainer()

        self.assertEqual(len(pserver.blocks), 3)
        self.assertEqual([op.type for op in pserver.blocks[1].ops],
                         ["sum", "scale", "clip", "sgd"])
        self.assertEqual(
            [op.type for op in pserver.blocks[2].ops],
            ["sum", "scale", "clip", "scale", "elementwise_add", "sgd"])
        # TODO(typhoonzero): test that clipping and L2Decay ops are removed
        # from the trainer


# FIXME(typhoonzero): need to add a test for the async case, see
# https://github.com/PaddlePaddle/Paddle/issues/11691
class TestAsyncSGD(TranspilerTest):
    pass
if __name__ == "__main__": if __name__ == "__main__":

......

Deleted file: the old standalone TestSimpleDistTranspiler suite (its
slice_var_up=False coverage moves into TestNoSliceVar above).
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest

import numpy as np

import paddle.fluid as fluid
from paddle.fluid.transpiler.distribute_transpiler import delete_ops
from transpiler_test import TranspilerTest


class TestSimpleDistTranspiler(TranspilerTest):
    def setUp(self):
        self.current_pserver_ep = "127.0.0.1:6175"

    def test_simple_transpiler(self):
        np.random.seed(1)

        trainer = self.get_trainer()
        pserver, startup = self.get_pserver(self.current_pserver_ep)
        self.assertEqual([op.type for op in trainer.global_block().ops],
                         self.get_expect_trainer_ops())

        self.assertEqual(len(pserver.blocks), 2)
        # block0: listen_and_serv
        self.assertEqual([op.type for op in pserver.blocks[0].ops],
                         ["listen_and_serv"])
        # block1: optimize pass
        self.assertEqual([op.type for op in pserver.blocks[1].ops],
                         ["sum", "scale", "sgd"])
        # confirm startup program
        self.assertEqual([op.type for op in startup.global_block().ops],
                         ["fill_constant", "uniform_random", "uniform_random"])
        # the variable fc_w will NOT be split
        fc_w_var = startup.global_block().var("fc_w@GRAD")
        self.assertEqual(fc_w_var.shape, (1000, 1000))

        fc_w_var = startup.global_block().var("fc_w@GRAD.trainer_0")
        self.assertEqual(fc_w_var.shape, (1000, 1000))

    def get_expect_trainer_ops(self):
        trainer = fluid.Program()

        with fluid.program_guard(trainer):
            optimize_ops, params_grads = self.net_conf()

        delete_ops(trainer.global_block(), optimize_ops)
        ops = [op.type for op in trainer.global_block().ops] + [
            "send", "send_barrier", "recv", "recv", "fetch_barrier"
        ]
        ops.insert(ops.index("elementwise_add_grad") + 1, "send")
        return ops

    def _transpiler_instance(self):
        main = self.get_main_program()
        t = fluid.DistributeTranspiler()
        t.transpile(
            self.trainer_id,
            program=main,
            pservers=self.pserver_eps,
            trainers=self.trainers,
            slice_var_up=False)
        return t


if __name__ == "__main__":
    unittest.main()

......

Deleted file transpiler_test.py: the shared base class, now folded into the
self-contained TranspilerTest at the top of test_dist_transpiler.py.

# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest

import numpy as np
import paddle.fluid as fluid
import paddle.fluid.core as core
import paddle.fluid.layers as layers


class TranspilerTest(unittest.TestCase):
    @classmethod
    def setUpClass(self):
        self.trainer_id = 0
        self.trainers = 2
        self.pservers = 2
        self.pserver_eps = "127.0.0.1:6174,127.0.0.1:6175"

    def net_conf(self):
        x = fluid.layers.data(name='x', shape=[1000], dtype='float32')
        y_predict = fluid.layers.fc(input=x,
                                    size=1000,
                                    act=None,
                                    param_attr=fluid.ParamAttr(name='fc_w'))
        y = fluid.layers.data(name='y', shape=[1], dtype='float32')
        cost = fluid.layers.square_error_cost(input=y_predict, label=y)
        avg_cost = fluid.layers.mean(cost)
        sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.1)
        optimize_ops, params_grads = sgd_optimizer.minimize(avg_cost)
        return optimize_ops, params_grads

    def get_main_program(self):
        main = fluid.Program()
        with fluid.program_guard(main):
            self.net_conf()
        return main

    def get_trainer(self):
        return self._transpiler_instance().get_trainer_program()

    def get_pserver(self, ep):
        t = self._transpiler_instance()
        pserver = t.get_pserver_program(ep)
        startup = t.get_startup_program(ep, pserver)
        return pserver, startup

    def _transpiler_instance(self):
        main = self.get_main_program()
        t = fluid.DistributeTranspiler()
        t.transpile(
            self.trainer_id,
            program=main,
            pservers=self.pserver_eps,
            trainers=self.trainers)
        return t

......

Changes to paddle/fluid/transpiler/distribute_transpiler.py:

@@ -455,6 +455,8 @@ class DistributeTranspiler(object):
                 __append_optimize_op__(op, per_opt_block, grad_to_block_id,
                                        merged_var, lr_ops)
 
+        # dedup grad to ids list
+        grad_to_block_id = list(set(grad_to_block_id))
         # append global ops
         if global_ops:
             opt_state_block = pserver_program.create_block(

@@ -960,8 +962,6 @@ class DistributeTranspiler(object):
             if not block_map.has_key(varname):
                 block_map[varname] = []
             block_map[varname].append((long(offset), long(size)))
-        # Do not remove this important debug message:
-        print("block map: %s" % block_map)
 
         for varname, splited in block_map.iteritems():
             orig_var = program.global_block().var(varname)

@@ -1401,6 +1401,16 @@ class DistributeTranspiler(object):
                 break
         return lr_ops
 
+    def _is_opt_role_op(self, op):
+        # NOTE: depend on the op role attribute to find out whether this
+        # op is an optimize op
+        op_maker = core.op_proto_and_checker_maker
+        optimize_role = core.op_proto_and_checker_maker.OpRole.Optimize
+        if op_maker.kOpRoleAttrName() in op.attrs and \
+                int(op.attrs[op_maker.kOpRoleAttrName()]) == int(optimize_role):
+            return True
+        return False
+
     def _get_optimize_pass(self):
         """
         Get optimizer operators, parameters and gradients from origin_program
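
The helper added above keys off the op-role attribute that the framework
stamps on ops emitted during the optimize pass. A standalone sketch of the
same filtering (hypothetical function name, using only the calls visible in
the diff):

    import paddle.fluid.core as core

    def opt_role_ops(block):
        # Collect ops whose role attribute marks them as optimize ops,
        # mirroring the check in _is_opt_role_op.
        op_maker = core.op_proto_and_checker_maker
        role_attr = op_maker.kOpRoleAttrName()
        opt_role = int(op_maker.OpRole.Optimize)
        return [op for op in block.ops
                if role_attr in op.attrs and int(op.attrs[role_attr]) == opt_role]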

@@ -1413,10 +1423,7 @@ class DistributeTranspiler(object):
         params_grads = []
         origin_var_dict = self.origin_program.global_block().vars
         for op in block.ops:
-            # NOTE(Yancey1989): we can not use op role to distinguish an optimizer op
-            # or not, because all ops in optimizer sub-graph would
-            # sign the optimizer op role
-            if self._is_optimizer_op(op):
+            if self._is_opt_role_op(op):
                 opt_ops.append(op)
                 # HACK(wuyi): if we find grad vars from input of optimize
                 # ops, we may get the output of clip op. Use syntax "@GRAD"
......
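
The HACK comment is cut off by the fold, but the convention it leans on is
visible: gradient variables are named with an "@GRAD" suffix (see the
fc_w@GRAD checks in the deleted test above), so grad inputs of an optimize
op can be told apart from, say, a clip op's output by name. A hedged sketch
of that check (illustrative helper, not code from this commit):

    def grad_input_names(op):
        # Gradient arguments follow the "@GRAD" naming convention.
        return [name for name in op.input_arg_names if "@GRAD" in name]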