提交 e90bfd56 编写于 作者: M minqiyang

1. Make a base unittest class for dist transpiler unittest

2. Merge the develop repo
上级 b31647c6
...@@ -12,40 +12,15 @@ ...@@ -12,40 +12,15 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import unittest
import paddle.fluid as fluid
import paddle.fluid.core as core
import paddle.fluid.layers as layers
from paddle.fluid.transpiler.distribute_transpiler import delete_ops from paddle.fluid.transpiler.distribute_transpiler import delete_ops
import numpy
from transpiler_test import TranspilerTest
class TestDistTranspiler(unittest.TestCase): class TestDistTranspiler(TranspilerTest):
def setUp(self): def setUp(self):
self.trainer_id = 0
self.trainers = 2
self.pservers = 2
self.pserver_eps = "127.0.0.1:6174,127.0.0.1:6175"
self.current_pserver_ep = "127.0.0.1:6174" self.current_pserver_ep = "127.0.0.1:6174"
def net_conf(self):
x = fluid.layers.data(name='x', shape=[1000], dtype='float32')
y_predict = fluid.layers.fc(input=x,
size=1000,
act=None,
param_attr=fluid.ParamAttr(name='fc_w'))
y = fluid.layers.data(name='y', shape=[1], dtype='float32')
cost = fluid.layers.square_error_cost(input=y_predict, label=y)
avg_cost = fluid.layers.mean(cost)
sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.1)
optimize_ops, params_grads = sgd_optimizer.minimize(avg_cost)
return optimize_ops, params_grads
def test_transpiler(self): def test_transpiler(self):
trainer = self.get_trainer() trainer = self.get_trainer()
pserver, startup = self.get_pserver(self.current_pserver_ep) pserver, startup = self.get_pserver(self.current_pserver_ep)
...@@ -70,14 +45,6 @@ class TestDistTranspiler(unittest.TestCase): ...@@ -70,14 +45,6 @@ class TestDistTranspiler(unittest.TestCase):
fc_w_var = startup.global_block().var("fc_w.block1") fc_w_var = startup.global_block().var("fc_w.block1")
self.assertEqual(fc_w_var.shape, (500, 1000)) self.assertEqual(fc_w_var.shape, (500, 1000))
def get_main_program(self):
main = fluid.Program()
with fluid.program_guard(main):
self.net_conf()
return main
def get_expect_trainer_ops(self): def get_expect_trainer_ops(self):
trainer = fluid.Program() trainer = fluid.Program()
...@@ -92,25 +59,6 @@ class TestDistTranspiler(unittest.TestCase): ...@@ -92,25 +59,6 @@ class TestDistTranspiler(unittest.TestCase):
ops.insert(ops.index("elementwise_add_grad") + 1, "send_vars") ops.insert(ops.index("elementwise_add_grad") + 1, "send_vars")
return ops return ops
def get_trainer(self):
return self._transpiler_instance().get_trainer_program()
def get_pserver(self, ep):
t = self._transpiler_instance()
pserver = t.get_pserver_program(ep)
startup = t.get_startup_program(ep, pserver)
return pserver, startup
def _transpiler_instance(self):
main = self.get_main_program()
t = fluid.DistributeTranspiler()
t.transpile(
self.trainer_id,
program=main,
pservers=self.pserver_eps,
trainers=self.trainers)
return t
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()
...@@ -12,40 +12,18 @@ ...@@ -12,40 +12,18 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import unittest import numpy as np
import paddle.fluid as fluid import paddle.fluid as fluid
import paddle.fluid.core as core
import paddle.fluid.layers as layers
from paddle.fluid.transpiler.distribute_transpiler import delete_ops from paddle.fluid.transpiler.distribute_transpiler import delete_ops
import numpy as np
from transpiler_test import TranspilerTest
class TestSimpleDistTranspiler(unittest.TestCase):
class TestSimpleDistTranspiler(TranspilerTest):
def setUp(self): def setUp(self):
self.trainer_id = 0
self.trainers = 2
self.pservers = 2
self.pserver_eps = "127.0.0.1:6174,127.0.0.1:6175"
self.current_pserver_ep = "127.0.0.1:6175" self.current_pserver_ep = "127.0.0.1:6175"
def net_conf(self):
x = fluid.layers.data(name='x', shape=[1000], dtype='float32')
y_predict = fluid.layers.fc(input=x,
size=1000,
act=None,
param_attr=fluid.ParamAttr(name='fc_w'))
y = fluid.layers.data(name='y', shape=[1], dtype='float32')
cost = fluid.layers.square_error_cost(input=y_predict, label=y)
avg_cost = fluid.layers.mean(cost)
sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.1)
optimize_ops, params_grads = sgd_optimizer.minimize(avg_cost)
return optimize_ops, params_grads
def test_simple_transpiler(self): def test_simple_transpiler(self):
np.random.seed(1) np.random.seed(1)
...@@ -73,14 +51,6 @@ class TestSimpleDistTranspiler(unittest.TestCase): ...@@ -73,14 +51,6 @@ class TestSimpleDistTranspiler(unittest.TestCase):
fc_w_var = startup.global_block().var("fc_w@GRAD.trainer_0") fc_w_var = startup.global_block().var("fc_w@GRAD.trainer_0")
self.assertEqual(fc_w_var.shape, (1000, 1000)) self.assertEqual(fc_w_var.shape, (1000, 1000))
def get_main_program(self):
main = fluid.Program()
with fluid.program_guard(main):
self.net_conf()
return main
def get_expect_trainer_ops(self): def get_expect_trainer_ops(self):
trainer = fluid.Program() trainer = fluid.Program()
...@@ -94,15 +64,6 @@ class TestSimpleDistTranspiler(unittest.TestCase): ...@@ -94,15 +64,6 @@ class TestSimpleDistTranspiler(unittest.TestCase):
ops.insert(ops.index("elementwise_add_grad") + 1, "send_vars") ops.insert(ops.index("elementwise_add_grad") + 1, "send_vars")
return ops return ops
def get_trainer(self):
return self._transpiler_instance().get_trainer_program()
def get_pserver(self, ep):
t = self._transpiler_instance()
pserver = t.get_pserver_program(ep)
startup = t.get_startup_program(ep, pserver)
return pserver, startup
def _transpiler_instance(self): def _transpiler_instance(self):
main = self.get_main_program() main = self.get_main_program()
t = fluid.DistributeTranspiler() t = fluid.DistributeTranspiler()
......
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import numpy as np
import paddle.fluid as fluid
import paddle.fluid.core as core
import paddle.fluid.layers as layers
class TranspilerTest(unittest.TestCase):
@classmethod
def setUpClass(self):
self.trainer_id = 0
self.trainers = 2
self.pservers = 2
self.pserver_eps = "127.0.0.1:6174,127.0.0.1:6175"
def net_conf(self):
x = fluid.layers.data(name='x', shape=[1000], dtype='float32')
y_predict = fluid.layers.fc(input=x,
size=1000,
act=None,
param_attr=fluid.ParamAttr(name='fc_w'))
y = fluid.layers.data(name='y', shape=[1], dtype='float32')
cost = fluid.layers.square_error_cost(input=y_predict, label=y)
avg_cost = fluid.layers.mean(cost)
sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.1)
optimize_ops, params_grads = sgd_optimizer.minimize(avg_cost)
return optimize_ops, params_grads
def get_main_program(self):
main = fluid.Program()
with fluid.program_guard(main):
self.net_conf()
return main
def get_trainer(self):
return self._transpiler_instance().get_trainer_program()
def get_pserver(self, ep):
t = self._transpiler_instance()
pserver = t.get_pserver_program(ep)
startup = t.get_startup_program(ep, pserver)
return pserver, startup
def _transpiler_instance(self):
main = self.get_main_program()
t = fluid.DistributeTranspiler()
t.transpile(
self.trainer_id,
program=main,
pservers=self.pserver_eps,
trainers=self.trainers)
return t
...@@ -178,7 +178,7 @@ class DistributeTranspiler: ...@@ -178,7 +178,7 @@ class DistributeTranspiler:
for index in range(len(self.pserver_endpoints)) for index in range(len(self.pserver_endpoints))
] ]
def _init_splited_vars(self, split_method): def _init_splited_vars(self, split_method, align_var_to_block=True):
# update these mappings for further transpile: # update these mappings for further transpile:
# 1. param_var_mapping: param var name -> [splited params vars] # 1. param_var_mapping: param var name -> [splited params vars]
# 2. grad_var_mapping: grad var name -> [splited grads vars] # 2. grad_var_mapping: grad var name -> [splited grads vars]
...@@ -198,15 +198,14 @@ class DistributeTranspiler: ...@@ -198,15 +198,14 @@ class DistributeTranspiler:
self.params_grads) self.params_grads)
if align_var_to_block: if align_var_to_block:
grad_blocks = split_dense_variable(grad_list, grad_blocks = split_variable(grad_list, len(self.pserver_endpoints))
len(pserver_endpoints)) param_blocks = split_variable(param_list,
param_blocks = split_dense_variable(param_list, len(self.pserver_endpoints))
len(pserver_endpoints))
else: else:
# when we do NOT align var to block, we will always split params # when we do NOT align var to block, we will always split params
# grads into one block. # grads into one block.
grad_blocks = split_dense_variable(grad_list, 1) grad_blocks = split_variable(grad_list, 1)
param_blocks = split_dense_variable(param_list, 1) param_blocks = split_variable(param_list, 1)
assert (len(grad_blocks) == len(param_blocks)) assert (len(grad_blocks) == len(param_blocks))
# origin_varname -> [splited_var] # origin_varname -> [splited_var]
self.param_var_mapping = self._create_vars_from_blocklist( self.param_var_mapping = self._create_vars_from_blocklist(
...@@ -272,7 +271,7 @@ class DistributeTranspiler: ...@@ -272,7 +271,7 @@ class DistributeTranspiler:
self.has_distributed_lookup_table = self._has_distributed_lookup_table() self.has_distributed_lookup_table = self._has_distributed_lookup_table()
# split and create vars, then put splited vars in dicts for later use. # split and create vars, then put splited vars in dicts for later use.
self._init_splited_vars(split_method) self._init_splited_vars(split_method, align_var_to_block)
# step 3.1: insert send op to send gradient vars to parameter servers # step 3.1: insert send op to send gradient vars to parameter servers
ps_dispatcher.reset() ps_dispatcher.reset()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册