From 069ff14756f798f52f4af746588014b93d01f839 Mon Sep 17 00:00:00 2001
From: typhoonzero
Date: Mon, 13 Aug 2018 15:52:16 +0800
Subject: [PATCH] polish dist unittests

---
 .../fluid/tests/unittests/dist_mnist.py       | 103 +++++++++
 .../fluid/tests/unittests/dist_se_resnext.py  | 186 ++++-------------
 .../fluid/tests/unittests/dist_word2vec.py    | 119 +++++++++++
 .../fluid/tests/unittests/test_dist_base.py   | 113 +++++++++-
 .../fluid/tests/unittests/test_dist_mnist.py  | 195 +-----------------
 .../tests/unittests/test_dist_se_resnext.py   |   2 +-
 .../tests/unittests/test_dist_word2vec.py     | 187 +---------------
 7 files changed, 377 insertions(+), 528 deletions(-)
 create mode 100644 python/paddle/fluid/tests/unittests/dist_mnist.py
 create mode 100644 python/paddle/fluid/tests/unittests/dist_word2vec.py

diff --git a/python/paddle/fluid/tests/unittests/dist_mnist.py b/python/paddle/fluid/tests/unittests/dist_mnist.py
new file mode 100644
index 00000000000..8f5ba33f7cb
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/dist_mnist.py
@@ -0,0 +1,103 @@
+# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import numpy as np
+import argparse
+import time
+import math
+
+import paddle
+import paddle.fluid as fluid
+import paddle.fluid.profiler as profiler
+from paddle.fluid import core
+import unittest
+from multiprocessing import Process
+import os
+import signal
+from functools import reduce
+from test_dist_base import TestDistRunnerBase, runtime_main
+
+DTYPE = "float32"
+paddle.dataset.mnist.fetch()
+
+# Fix seed for test
+fluid.default_startup_program().random_seed = 1
+fluid.default_main_program().random_seed = 1
+
+
+def cnn_model(data):
+    conv_pool_1 = fluid.nets.simple_img_conv_pool(
+        input=data,
+        filter_size=5,
+        num_filters=20,
+        pool_size=2,
+        pool_stride=2,
+        act="relu",
+        param_attr=fluid.ParamAttr(initializer=fluid.initializer.Constant()))
+    conv_pool_2 = fluid.nets.simple_img_conv_pool(
+        input=conv_pool_1,
+        filter_size=5,
+        num_filters=50,
+        pool_size=2,
+        pool_stride=2,
+        act="relu",
+        param_attr=fluid.ParamAttr(initializer=fluid.initializer.Constant()))
+
+    SIZE = 10
+    input_shape = conv_pool_2.shape
+    param_shape = [reduce(lambda a, b: a * b, input_shape[1:], 1)] + [SIZE]
+    scale = (2.0 / (param_shape[0]**2 * SIZE))**0.5
+
+    predict = fluid.layers.fc(
+        input=conv_pool_2,
+        size=SIZE,
+        act="softmax",
+        param_attr=fluid.param_attr.ParamAttr(
+            initializer=fluid.initializer.NormalInitializer(
+                loc=0.0, scale=scale, seed=1)))
+    return predict
+
+
+class TestDistMnist2x2(TestDistRunnerBase):
+    def get_model(self, batch_size=2):
+        # Input data
+        images = fluid.layers.data(name='pixel', shape=[1, 28, 28], dtype=DTYPE)
+        label = fluid.layers.data(name='label', shape=[1], dtype='int64')
+
+        # Train program
+        predict = cnn_model(images)
+        cost = fluid.layers.cross_entropy(input=predict, label=label)
+        avg_cost = fluid.layers.mean(x=cost)
+
+        # Evaluator
+        batch_size_tensor = fluid.layers.create_tensor(dtype='int64')
+        batch_acc = fluid.layers.accuracy(
+            input=predict, label=label, total=batch_size_tensor)
+
+        inference_program = fluid.default_main_program().clone()
+        # Optimization
+        opt = fluid.optimizer.AdamOptimizer(
+            learning_rate=0.001, beta1=0.9, beta2=0.999)
+
+        # Reader
+        train_reader = paddle.batch(
+            paddle.dataset.mnist.train(), batch_size=batch_size)
+        test_reader = paddle.batch(
+            paddle.dataset.mnist.test(), batch_size=batch_size)
+        opt.minimize(avg_cost)
+        return inference_program, avg_cost, train_reader, test_reader, batch_acc, predict
+
+
+if __name__ == "__main__":
+    runtime_main(TestDistMnist2x2)
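For reference, the new runtime_main() entry point (added to test_dist_base.py below) dispatches on exactly six positional arguments, so a model script like dist_mnist.py can also be launched by hand. A rough sketch of a 1-pserver, 1-trainer run; the endpoint, port, and backgrounding are illustrative, and TestDistBase normally spawns these processes itself:

    # hypothetical manual invocation
    # args: [pserver/trainer] [endpoints] [trainer_id] [current_endpoint] [trainers] [is_dist]
    python dist_mnist.py pserver 127.0.0.1:9123 0 127.0.0.1:9123 1 TRUE &
    python dist_mnist.py trainer 127.0.0.1:9123 0 127.0.0.1:9123 1 TRUE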
diff --git a/python/paddle/fluid/tests/unittests/dist_se_resnext.py b/python/paddle/fluid/tests/unittests/dist_se_resnext.py
index 62ca67ae4c0..d576a173ce2 100644
--- a/python/paddle/fluid/tests/unittests/dist_se_resnext.py
+++ b/python/paddle/fluid/tests/unittests/dist_se_resnext.py
@@ -27,6 +27,7 @@ from multiprocessing import Process
 import os
 import sys
 import signal
+from test_dist_base import TestDistRunnerBase, runtime_main
 
 # Fix seed for test
 fluid.default_startup_program().random_seed = 1
@@ -196,161 +197,52 @@ class SE_ResNeXt():
         return scale
 
 
-def get_model(batch_size):
-    # Input data
-    image = fluid.layers.data(name="data", shape=[3, 224, 224], dtype='float32')
-    label = fluid.layers.data(name="int64", shape=[1], dtype='int64')
+class DistSeResneXt2x2(TestDistRunnerBase):
+    def get_model(self, batch_size=2):
+        # Input data
+        image = fluid.layers.data(
+            name="data", shape=[3, 224, 224], dtype='float32')
+        label = fluid.layers.data(name="label", shape=[1], dtype='int64')
 
-    # Train program
-    model = SE_ResNeXt(layers=50)
-    out = model.net(input=image, class_dim=102)
-    cost = fluid.layers.cross_entropy(input=out, label=label)
+        # Train program
+        model = SE_ResNeXt(layers=50)
+        out = model.net(input=image, class_dim=102)
+        cost = fluid.layers.cross_entropy(input=out, label=label)
 
-    avg_cost = fluid.layers.mean(x=cost)
-    acc_top1 = fluid.layers.accuracy(input=out, label=label, k=1)
-    acc_top5 = fluid.layers.accuracy(input=out, label=label, k=5)
+        avg_cost = fluid.layers.mean(x=cost)
+        acc_top1 = fluid.layers.accuracy(input=out, label=label, k=1)
+        acc_top5 = fluid.layers.accuracy(input=out, label=label, k=5)
 
-    # Evaluator
-    test_program = fluid.default_main_program().clone(for_test=True)
+        # Evaluator
+        test_program = fluid.default_main_program().clone(for_test=True)
 
-    # Optimization
-    total_images = 6149  # flowers
-    epochs = [30, 60, 90]
-    step = int(total_images / batch_size + 1)
+        # Optimization
+        total_images = 6149  # flowers
+        epochs = [30, 60, 90]
+        step = int(total_images / batch_size + 1)
 
-    bd = [step * e for e in epochs]
-    base_lr = 0.1
-    lr = []
-    lr = [base_lr * (0.1**i) for i in range(len(bd) + 1)]
+        bd = [step * e for e in epochs]
+        base_lr = 0.1
+        lr = []
+        lr = [base_lr * (0.1**i) for i in range(len(bd) + 1)]
 
-    optimizer = fluid.optimizer.Momentum(
-        # FIXME(typhoonzero): add back LR decay once ParallelExecutor fixed.
-        #learning_rate=fluid.layers.piecewise_decay(
-        #    boundaries=bd, values=lr),
-        learning_rate=base_lr,
-        momentum=0.9,
-        regularization=fluid.regularizer.L2Decay(1e-4))
-    optimizer.minimize(avg_cost)
+        optimizer = fluid.optimizer.Momentum(
+            # FIXME(typhoonzero): add back LR decay once ParallelExecutor fixed.
+            #learning_rate=fluid.layers.piecewise_decay(
+            #    boundaries=bd, values=lr),
+            learning_rate=base_lr,
+            momentum=0.9,
+            regularization=fluid.regularizer.L2Decay(1e-4))
+        optimizer.minimize(avg_cost)
 
-    # Reader
-    train_reader = paddle.batch(
-        paddle.dataset.flowers.train(), batch_size=batch_size)
-    test_reader = paddle.batch(
-        paddle.dataset.flowers.test(use_xmap=False), batch_size=batch_size)
+        # Reader
+        train_reader = paddle.batch(
+            paddle.dataset.flowers.train(), batch_size=batch_size)
+        test_reader = paddle.batch(
+            paddle.dataset.flowers.test(use_xmap=False), batch_size=batch_size)
 
-    return test_program, avg_cost, train_reader, test_reader, acc_top1, out
-
-
-def get_transpiler(trainer_id, main_program, pserver_endpoints, trainers):
-    t = fluid.DistributeTranspiler()
-    t.transpile(
-        trainer_id=trainer_id,
-        program=main_program,
-        pservers=pserver_endpoints,
-        trainers=trainers)
-    return t
-
-
-class DistSeResneXt2x2:
-    def run_pserver(self, pserver_endpoints, trainers, current_endpoint,
-                    trainer_id):
-        get_model(batch_size=2)
-        t = get_transpiler(trainer_id,
-                           fluid.default_main_program(), pserver_endpoints,
-                           trainers)
-        pserver_prog = t.get_pserver_program(current_endpoint)
-        startup_prog = t.get_startup_program(current_endpoint, pserver_prog)
-        place = fluid.CPUPlace()
-        exe = fluid.Executor(place)
-        exe.run(startup_prog)
-        exe.run(pserver_prog)
-
-    def _wait_ps_ready(self, pid):
-        retry_times = 20
-        while True:
-            assert retry_times >= 0, "wait ps ready failed"
-            time.sleep(3)
-            print("waiting ps ready: ", pid)
-            try:
-                # the listen_and_serv_op would touch a file which contains the listen port
-                # on the /tmp directory until it was ready to process all the RPC call.
-                os.stat("/tmp/paddle.%d.port" % pid)
-                return
-            except os.error:
-                retry_times -= 1
-
-    def run_trainer(self, place, endpoints, trainer_id, trainers, is_dist=True):
-        test_program, avg_cost, train_reader, test_reader, batch_acc, predict = get_model(
-            batch_size=2)
-        if is_dist:
-            t = get_transpiler(trainer_id,
-                               fluid.default_main_program(), endpoints,
-                               trainers)
-            trainer_prog = t.get_trainer_program()
-        else:
-            trainer_prog = fluid.default_main_program()
-
-        startup_exe = fluid.Executor(place)
-        startup_exe.run(fluid.default_startup_program())
-
-        strategy = fluid.ExecutionStrategy()
-        strategy.num_threads = 1
-        strategy.allow_op_delay = False
-        exe = fluid.ParallelExecutor(
-            True, loss_name=avg_cost.name, exec_strategy=strategy)
-
-        feed_var_list = [
-            var for var in trainer_prog.global_block().vars.values()
-            if var.is_data
-        ]
-
-        feeder = fluid.DataFeeder(feed_var_list, place)
-        reader_generator = test_reader()
-
-        data = next(reader_generator)
-        first_loss, = exe.run(fetch_list=[avg_cost.name],
-                              feed=feeder.feed(data))
-        print(first_loss)
-
-        for i in six.moves.xrange(5):
-            data = next(reader_generator)
-            loss, = exe.run(fetch_list=[avg_cost.name], feed=feeder.feed(data))
-
-        data = next(reader_generator)
-        last_loss, = exe.run(fetch_list=[avg_cost.name], feed=feeder.feed(data))
-        print(last_loss)
-
-
-def main(role="pserver",
-         endpoints="127.0.0.1:9123",
-         trainer_id=0,
-         current_endpoint="127.0.0.1:9123",
-         trainers=1,
-         is_dist=True):
-    model = DistSeResneXt2x2()
-    if role == "pserver":
-        model.run_pserver(endpoints, trainers, current_endpoint, trainer_id)
-    else:
-        p = fluid.CUDAPlace(0) if core.is_compiled_with_cuda(
-        ) else fluid.CPUPlace()
-        model.run_trainer(p, endpoints, trainer_id, trainers, is_dist)
+        return test_program, avg_cost, train_reader, test_reader, acc_top1, out
 
 
 if __name__ == "__main__":
-    if len(sys.argv) != 7:
-        print(
-            "Usage: python dist_se_resnext.py [pserver/trainer] [endpoints] [trainer_id] [current_endpoint] [trainers] [is_dist]"
-        )
-    role = sys.argv[1]
-    endpoints = sys.argv[2]
-    trainer_id = int(sys.argv[3])
-    current_endpoint = sys.argv[4]
-    trainers = int(sys.argv[5])
-    is_dist = True if sys.argv[6] == "TRUE" else False
-    main(
-        role=role,
-        endpoints=endpoints,
-        trainer_id=trainer_id,
-        current_endpoint=current_endpoint,
-        trainers=trainers,
-        is_dist=is_dist)
+    runtime_main(DistSeResneXt2x2)
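The FIXME above pins a constant learning rate; for reference, the piecewise schedule it comments out would step the rate down 10x at each epoch boundary. A sketch of the same arithmetic in plain Python, using the constants from the code above (results are approximate floats):

    total_images = 6149                                  # flowers training set
    step = int(total_images / 2 + 1)                     # 3075 steps/epoch at batch_size=2
    bd = [step * e for e in [30, 60, 90]]                # [92250, 184500, 276750]
    lr = [0.1 * (0.1**i) for i in range(len(bd) + 1)]    # ~[0.1, 0.01, 0.001, 0.0001]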
"__main__": - if len(sys.argv) != 7: - print( - "Usage: python dist_se_resnext.py [pserver/trainer] [endpoints] [trainer_id] [current_endpoint] [trainers] [is_dist]" - ) - role = sys.argv[1] - endpoints = sys.argv[2] - trainer_id = int(sys.argv[3]) - current_endpoint = sys.argv[4] - trainers = int(sys.argv[5]) - is_dist = True if sys.argv[6] == "TRUE" else False - main( - role=role, - endpoints=endpoints, - trainer_id=trainer_id, - current_endpoint=current_endpoint, - trainers=trainers, - is_dist=is_dist) + runtime_main(DistSeResneXt2x2) diff --git a/python/paddle/fluid/tests/unittests/dist_word2vec.py b/python/paddle/fluid/tests/unittests/dist_word2vec.py new file mode 100644 index 00000000000..54a70f4adb4 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/dist_word2vec.py @@ -0,0 +1,119 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import numpy as np +import argparse +import time +import math +import paddle +import paddle.fluid as fluid +import paddle.fluid.profiler as profiler +from paddle.fluid import core +import unittest +from multiprocessing import Process +import os +import signal +from test_dist_base import TestDistRunnerBase, runtime_main + +IS_SPARSE = True +EMBED_SIZE = 32 +HIDDEN_SIZE = 256 +N = 5 + +# Fix seed for test +fluid.default_startup_program().random_seed = 1 +fluid.default_main_program().random_seed = 1 + + +class TestDistWord2vec2x2(TestDistRunnerBase): + def get_model(self, batch_size=2): + BATCH_SIZE = batch_size + + def __network__(words): + embed_first = fluid.layers.embedding( + input=words[0], + size=[dict_size, EMBED_SIZE], + dtype='float32', + is_sparse=IS_SPARSE, + param_attr=fluid.ParamAttr( + name='shared_w', initializer=fluid.initializer.Constant())) + embed_second = fluid.layers.embedding( + input=words[1], + size=[dict_size, EMBED_SIZE], + dtype='float32', + is_sparse=IS_SPARSE, + param_attr=fluid.ParamAttr( + name='shared_w', initializer=fluid.initializer.Constant())) + embed_third = fluid.layers.embedding( + input=words[2], + size=[dict_size, EMBED_SIZE], + dtype='float32', + is_sparse=IS_SPARSE, + param_attr=fluid.ParamAttr( + name='shared_w', initializer=fluid.initializer.Constant())) + embed_forth = fluid.layers.embedding( + input=words[3], + size=[dict_size, EMBED_SIZE], + dtype='float32', + is_sparse=IS_SPARSE, + param_attr=fluid.ParamAttr( + name='shared_w', initializer=fluid.initializer.Constant())) + + concat_embed = fluid.layers.concat( + input=[embed_first, embed_second, embed_third, embed_forth], + axis=1) + hidden1 = fluid.layers.fc( + input=concat_embed, + size=HIDDEN_SIZE, + act='sigmoid', + param_attr=fluid.ParamAttr( + initializer=fluid.initializer.Constant())) + predict_word = fluid.layers.fc( + input=hidden1, + size=dict_size, + act='softmax', + param_attr=fluid.ParamAttr( + initializer=fluid.initializer.Constant())) + cost = fluid.layers.cross_entropy( + input=predict_word, label=words[4]) + avg_cost = fluid.layers.mean(cost) + return avg_cost, predict_word + 
diff --git a/python/paddle/fluid/tests/unittests/test_dist_base.py b/python/paddle/fluid/tests/unittests/test_dist_base.py
index 58cfd4e1fd9..1deccbe4af5 100644
--- a/python/paddle/fluid/tests/unittests/test_dist_base.py
+++ b/python/paddle/fluid/tests/unittests/test_dist_base.py
@@ -18,6 +18,109 @@ import os
 import sys
 import signal
 import subprocess
+import six
+
+
+class TestDistRunnerBase(object):
+    def get_model(self, batch_size=2):
+        raise NotImplementedError(
+            "get_model should be implemented by child classes.")
+
+    def get_transpiler(self, trainer_id, main_program, pserver_endpoints,
+                       trainers):
+        # NOTE: import fluid at runtime, or else forking processes will cause errors.
+        import paddle
+        import paddle.fluid as fluid
+        t = fluid.DistributeTranspiler()
+        t.transpile(
+            trainer_id=trainer_id,
+            program=main_program,
+            pservers=pserver_endpoints,
+            trainers=trainers)
+        return t
+
+    def run_pserver(self, pserver_endpoints, trainers, current_endpoint,
+                    trainer_id):
+        import paddle
+        import paddle.fluid as fluid
+        self.get_model(batch_size=2)
+        t = self.get_transpiler(trainer_id,
+                                fluid.default_main_program(),
+                                pserver_endpoints, trainers)
+        pserver_prog = t.get_pserver_program(current_endpoint)
+        startup_prog = t.get_startup_program(current_endpoint, pserver_prog)
+        place = fluid.CPUPlace()
+        exe = fluid.Executor(place)
+        exe.run(startup_prog)
+        exe.run(pserver_prog)
+
+    def run_trainer(self, place, endpoints, trainer_id, trainers, is_dist=True):
+        import paddle
+        import paddle.fluid as fluid
+        test_program, avg_cost, train_reader, test_reader, batch_acc, predict = \
+            self.get_model(batch_size=2)
+        if is_dist:
+            t = self.get_transpiler(trainer_id,
+                                    fluid.default_main_program(), endpoints,
+                                    trainers)
+            trainer_prog = t.get_trainer_program()
+        else:
+            trainer_prog = fluid.default_main_program()
+
+        startup_exe = fluid.Executor(place)
+        startup_exe.run(fluid.default_startup_program())
+
+        strategy = fluid.ExecutionStrategy()
+        strategy.num_threads = 1
+        strategy.allow_op_delay = False
+        exe = fluid.ParallelExecutor(
+            use_cuda=True, loss_name=avg_cost.name, exec_strategy=strategy)
+
+        feed_var_list = [
+            var for var in trainer_prog.global_block().vars.values()
+            if var.is_data
+        ]
+
+        feeder = fluid.DataFeeder(feed_var_list, place)
+        reader_generator = test_reader()
+
+        data = next(reader_generator)
+        first_loss, = exe.run(fetch_list=[avg_cost.name],
+                              feed=feeder.feed(data))
+        print(first_loss)
+
+        for i in six.moves.xrange(5):
+            data = next(reader_generator)
+            loss, = exe.run(fetch_list=[avg_cost.name], feed=feeder.feed(data))
+
+        data = next(reader_generator)
+        last_loss, = exe.run(fetch_list=[avg_cost.name], feed=feeder.feed(data))
+        print(last_loss)
+
+
+def runtime_main(test_class):
+    import paddle
+    import paddle.fluid as fluid
+    import paddle.fluid.core as core
+
+    if len(sys.argv) != 7:
+        print(
+            "Usage: python %s [pserver/trainer] [endpoints] [trainer_id] [current_endpoint] [trainers] [is_dist]"
+            % sys.argv[0])
+    role = sys.argv[1]
+    endpoints = sys.argv[2]
+    trainer_id = int(sys.argv[3])
+    current_endpoint = sys.argv[4]
+    trainers = int(sys.argv[5])
+    is_dist = True if sys.argv[6] == "TRUE" else False
+
+    model = test_class()
+    if role == "pserver":
+        model.run_pserver(endpoints, trainers, current_endpoint, trainer_id)
+    else:
+        p = fluid.CUDAPlace(0) if core.is_compiled_with_cuda(
+        ) else fluid.CPUPlace()
+        model.run_trainer(p, endpoints, trainer_id, trainers, is_dist)
 
 
 class TestDistBase(unittest.TestCase):
@@ -127,12 +230,10 @@ class TestDistBase(unittest.TestCase):
         local_first_loss = eval(local_lines[0])[0]
         local_last_loss = eval(local_lines[1])[0]
 
-        self.assertAlmostEqual(local_first_loss, dist_first_loss, delta=delta)
-        self.assertAlmostEqual(local_last_loss, dist_last_loss, delta=delta)
-
-        # check tr0_out
-        # FIXME: ensure the server process is killed
-        # replace with ps0.terminate()
+        # FIXME: use terminate() instead of sigkill.
         os.kill(ps0.pid, signal.SIGKILL)
         os.kill(ps1.pid, signal.SIGKILL)
         FNULL.close()
+
+        self.assertAlmostEqual(local_first_loss, dist_first_loss, delta=delta)
+        self.assertAlmostEqual(local_last_loss, dist_last_loss, delta=delta)
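The harness learns that a pserver is up the same way the removed per-test copies below did: listen_and_serv_op touches a port file once it is ready to serve RPCs. A minimal sketch of that handshake, assuming the conventional /tmp/paddle.<pid>.port path described in the comments in this patch:

    import os
    import time

    def wait_ps_ready(pid, retry_times=20):
        # poll for the port file that listen_and_serv_op writes when ready
        while retry_times >= 0:
            if os.path.exists("/tmp/paddle.%d.port" % pid):
                return
            time.sleep(1)
            retry_times -= 1
        raise AssertionError("wait ps ready failed")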
diff --git a/python/paddle/fluid/tests/unittests/test_dist_mnist.py b/python/paddle/fluid/tests/unittests/test_dist_mnist.py
index a6fcbd977f1..b3ccec9a7d6 100644
--- a/python/paddle/fluid/tests/unittests/test_dist_mnist.py
+++ b/python/paddle/fluid/tests/unittests/test_dist_mnist.py
@@ -11,200 +11,13 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-import numpy as np
-import argparse
-import time
-import math
-
-import paddle
-import paddle.fluid as fluid
-import paddle.fluid.profiler as profiler
-from paddle.fluid import core
 import unittest
-from multiprocessing import Process
-import os
-import signal
-from functools import reduce
-
-SEED = 1
-DTYPE = "float32"
-paddle.dataset.mnist.fetch()
-
-
-# random seed must set before configuring the network.
-# fluid.default_startup_program().random_seed = SEED
-def cnn_model(data):
-    conv_pool_1 = fluid.nets.simple_img_conv_pool(
-        input=data,
-        filter_size=5,
-        num_filters=20,
-        pool_size=2,
-        pool_stride=2,
-        act="relu")
-    conv_pool_2 = fluid.nets.simple_img_conv_pool(
-        input=conv_pool_1,
-        filter_size=5,
-        num_filters=50,
-        pool_size=2,
-        pool_stride=2,
-        act="relu")
-
-    # TODO(dzhwinter) : refine the initializer and random seed settting
-    SIZE = 10
-    input_shape = conv_pool_2.shape
-    param_shape = [reduce(lambda a, b: a * b, input_shape[1:], 1)] + [SIZE]
-    scale = (2.0 / (param_shape[0]**2 * SIZE))**0.5
-
-    predict = fluid.layers.fc(
-        input=conv_pool_2,
-        size=SIZE,
-        act="softmax",
-        param_attr=fluid.param_attr.ParamAttr(
-            initializer=fluid.initializer.NormalInitializer(
-                loc=0.0, scale=scale)))
-    return predict
-
-
-def get_model(batch_size):
-    # Input data
-    images = fluid.layers.data(name='pixel', shape=[1, 28, 28], dtype=DTYPE)
-    label = fluid.layers.data(name='label', shape=[1], dtype='int64')
-
-    # Train program
-    predict = cnn_model(images)
-    cost = fluid.layers.cross_entropy(input=predict, label=label)
-    avg_cost = fluid.layers.mean(x=cost)
-
-    # Evaluator
-    batch_size_tensor = fluid.layers.create_tensor(dtype='int64')
-    batch_acc = fluid.layers.accuracy(
-        input=predict, label=label, total=batch_size_tensor)
-
-    inference_program = fluid.default_main_program().clone()
-    # Optimization
-    opt = fluid.optimizer.AdamOptimizer(
-        learning_rate=0.001, beta1=0.9, beta2=0.999)
-
-    # Reader
-    train_reader = paddle.batch(
-        paddle.dataset.mnist.train(), batch_size=batch_size)
-    test_reader = paddle.batch(
-        paddle.dataset.mnist.test(), batch_size=batch_size)
-    opt.minimize(avg_cost)
-    return inference_program, avg_cost, train_reader, test_reader, batch_acc, predict
-
-
-def get_transpiler(trainer_id, main_program, pserver_endpoints, trainers):
-    t = fluid.DistributeTranspiler()
-    t.transpile(
-        trainer_id=trainer_id,
-        program=main_program,
-        pservers=pserver_endpoints,
-        trainers=trainers)
-    return t
-
-
-def run_pserver(pserver_endpoints, trainers, current_endpoint):
-    get_model(batch_size=20)
-    t = get_transpiler(0,
-                       fluid.default_main_program(), pserver_endpoints,
-                       trainers)
-    pserver_prog = t.get_pserver_program(current_endpoint)
-    startup_prog = t.get_startup_program(current_endpoint, pserver_prog)
-
-    place = fluid.CPUPlace()
-    exe = fluid.Executor(place)
-    exe.run(startup_prog)
-
-    exe.run(pserver_prog)
-
-
-class TestDistMnist(unittest.TestCase):
-    def setUp(self):
-        self._trainers = 1
-        self._pservers = 1
-        self._ps_endpoints = "127.0.0.1:9123"
-
-    def start_pserver(self, endpoint):
-        p = Process(
-            target=run_pserver,
-            args=(self._ps_endpoints, self._trainers, endpoint))
-        p.start()
-        return p.pid
-
-    def _wait_ps_ready(self, pid):
-        retry_times = 5
-        while True:
-            assert retry_times >= 0, "wait ps ready failed"
-            time.sleep(1)
-            try:
-                # the listen_and_serv_op would touch a file which contains the listen port
-                # on the /tmp directory until it was ready to process all the RPC call.
-                os.stat("/tmp/paddle.%d.port" % pid)
-                return
-            except os.error:
-                retry_times -= 1
-
-    def stop_pserver(self, pid):
-        os.kill(pid, signal.SIGTERM)
-
-    def test_with_place(self):
-        p = fluid.CUDAPlace(0) if core.is_compiled_with_cuda(
-        ) else fluid.CPUPlace()
-
-        pserver_pid = self.start_pserver(self._ps_endpoints)
-        self._wait_ps_ready(pserver_pid)
-
-        self.run_trainer(p, 0)
-
-        self.stop_pserver(pserver_pid)
-
-    def run_trainer(self, place, trainer_id):
-        test_program, avg_cost, train_reader, test_reader, batch_acc, predict = get_model(
-            batch_size=20)
-        t = get_transpiler(trainer_id,
-                           fluid.default_main_program(), self._ps_endpoints,
-                           self._trainers)
-
-        trainer_prog = t.get_trainer_program()
-
-        exe = fluid.Executor(place)
-        exe.run(fluid.default_startup_program())
-
-        feed_var_list = [
-            var for var in trainer_prog.global_block().vars.values()
-            if var.is_data
-        ]
-
-        feeder = fluid.DataFeeder(feed_var_list, place)
-        for pass_id in range(10):
-            for batch_id, data in enumerate(train_reader()):
-                exe.run(trainer_prog, feed=feeder.feed(data))
-
-                if (batch_id + 1) % 10 == 0:
-                    acc_set = []
-                    avg_loss_set = []
-                    for test_data in test_reader():
-                        acc_np, avg_loss_np = exe.run(
-                            program=test_program,
-                            feed=feeder.feed(test_data),
-                            fetch_list=[batch_acc, avg_cost])
-                        acc_set.append(float(acc_np))
-                        avg_loss_set.append(float(avg_loss_np))
-                    # get test acc and loss
-                    acc_val = np.array(acc_set).mean()
-                    avg_loss_val = np.array(avg_loss_set).mean()
-                    if float(acc_val
-                             ) > 0.8:  # Smaller value to increase CI speed
-                        return
-                    else:
-                        print(
-                            'PassID {0:1}, BatchID {1:04}, Test Loss {2:2.2}, Acc {3:2.2}'.
-                            format(pass_id, batch_id + 1,
-                                   float(avg_loss_val), float(acc_val)))
-                        if math.isnan(float(avg_loss_val)):
-                            assert ("got Nan loss, training failed.")
+from test_dist_base import TestDistBase
+
+
+class TestDistMnist2x2(TestDistBase):
+    def test_dist_mnist(self):
+        self.check_with_place("dist_mnist.py", delta=1e-7)
 
 
 if __name__ == "__main__":
diff --git a/python/paddle/fluid/tests/unittests/test_dist_se_resnext.py b/python/paddle/fluid/tests/unittests/test_dist_se_resnext.py
index f3a5fd6985b..a33a338fc11 100644
--- a/python/paddle/fluid/tests/unittests/test_dist_se_resnext.py
+++ b/python/paddle/fluid/tests/unittests/test_dist_se_resnext.py
@@ -17,7 +17,7 @@ from test_dist_base import TestDistBase
 
 class TestDistSeResneXt2x2(TestDistBase):
     def test_se_resnext(self):
-        self.check_with_place("dist_se_resnext.py")
+        self.check_with_place("dist_se_resnext.py", delta=1e-7)
 
 
 if __name__ == "__main__":
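With the shared base classes in place, adding a distributed test is just a model script plus a small subclass. The pattern, with hypothetical names:

    from test_dist_base import TestDistBase

    class TestDistMyModel2x2(TestDistBase):
        def test_my_model(self):
            # runs dist_my_model.py locally and distributed (2 trainers x
            # 2 pservers), then compares first/last loss within delta
            self.check_with_place("dist_my_model.py", delta=1e-7)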
diff --git a/python/paddle/fluid/tests/unittests/test_dist_word2vec.py b/python/paddle/fluid/tests/unittests/test_dist_word2vec.py
index 4bb3998f891..543d0f9dc2c 100644
--- a/python/paddle/fluid/tests/unittests/test_dist_word2vec.py
+++ b/python/paddle/fluid/tests/unittests/test_dist_word2vec.py
@@ -11,192 +11,13 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-import numpy as np
-import argparse
-import time
-import math
-import paddle
-import paddle.fluid as fluid
-import paddle.fluid.profiler as profiler
-from paddle.fluid import core
 import unittest
-from multiprocessing import Process
-import os
-import signal
-
-IS_SPARSE = True
-EMBED_SIZE = 32
-HIDDEN_SIZE = 256
-N = 5
-BATCH_SIZE = 32
-ExecutionStrategy = core.ParallelExecutor.ExecutionStrategy
-
-
-def get_model():
-    def __network__(words):
-        embed_first = fluid.layers.embedding(
-            input=words[0],
-            size=[dict_size, EMBED_SIZE],
-            dtype='float32',
-            is_sparse=IS_SPARSE,
-            param_attr='shared_w')
-        embed_second = fluid.layers.embedding(
-            input=words[1],
-            size=[dict_size, EMBED_SIZE],
-            dtype='float32',
-            is_sparse=IS_SPARSE,
-            param_attr='shared_w')
-        embed_third = fluid.layers.embedding(
-            input=words[2],
-            size=[dict_size, EMBED_SIZE],
-            dtype='float32',
-            is_sparse=IS_SPARSE,
-            param_attr='shared_w')
-        embed_forth = fluid.layers.embedding(
-            input=words[3],
-            size=[dict_size, EMBED_SIZE],
-            dtype='float32',
-            is_sparse=IS_SPARSE,
-            param_attr='shared_w')
-
-        concat_embed = fluid.layers.concat(
-            input=[embed_first, embed_second, embed_third, embed_forth], axis=1)
-        hidden1 = fluid.layers.fc(input=concat_embed,
-                                  size=HIDDEN_SIZE,
-                                  act='sigmoid')
-        predict_word = fluid.layers.fc(input=hidden1,
-                                       size=dict_size,
-                                       act='softmax')
-        cost = fluid.layers.cross_entropy(input=predict_word, label=words[4])
-        avg_cost = fluid.layers.mean(cost)
-        return avg_cost, predict_word
-
-    word_dict = paddle.dataset.imikolov.build_dict()
-    dict_size = len(word_dict)
-
-    first_word = fluid.layers.data(name='firstw', shape=[1], dtype='int64')
-    second_word = fluid.layers.data(name='secondw', shape=[1], dtype='int64')
-    third_word = fluid.layers.data(name='thirdw', shape=[1], dtype='int64')
-    forth_word = fluid.layers.data(name='forthw', shape=[1], dtype='int64')
-    next_word = fluid.layers.data(name='nextw', shape=[1], dtype='int64')
-    avg_cost, predict_word = __network__(
-        [first_word, second_word, third_word, forth_word, next_word])
-
-    inference_program = paddle.fluid.default_main_program().clone()
-
-    sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.001)
-    sgd_optimizer.minimize(avg_cost)
-
-    train_reader = paddle.batch(
-        paddle.dataset.imikolov.train(word_dict, N), BATCH_SIZE)
-    test_reader = paddle.batch(
-        paddle.dataset.imikolov.test(word_dict, N), BATCH_SIZE)
-
-    return inference_program, avg_cost, train_reader, test_reader, predict_word
-
-
-def get_transpiler(trainer_id, main_program, pserver_endpoints, trainers):
-    t = fluid.DistributeTranspiler()
-    t.transpile(
-        trainer_id=trainer_id,
-        program=main_program,
-        pservers=pserver_endpoints,
-        trainers=trainers)
-    return t
-
-
-def run_pserver(pserver_endpoints, trainers, current_endpoint):
-    get_model()
-    t = get_transpiler(0,
-                       fluid.default_main_program(), pserver_endpoints,
-                       trainers)
-    pserver_prog = t.get_pserver_program(current_endpoint)
-    startup_prog = t.get_startup_program(current_endpoint, pserver_prog)
-
-    place = fluid.CPUPlace()
-    exe = fluid.Executor(place)
-    exe.run(startup_prog)
-
-    exe.run(pserver_prog)
-
-
-class TestDistMnist(unittest.TestCase):
-    def setUp(self):
-        self._trainers = 1
-        self._pservers = 1
-        self._ps_endpoints = "127.0.0.1:9123"
-
-    def start_pserver(self, endpoint):
-        p = Process(
-            target=run_pserver,
-            args=(self._ps_endpoints, self._trainers, endpoint))
-        p.start()
-        return p.pid
-
-    def _wait_ps_ready(self, pid):
-        retry_times = 5
-        while True:
-            assert retry_times >= 0, "wait ps ready failed"
-            time.sleep(1)
-            try:
-                # the listen_and_serv_op would touch a file which contains the listen port
-                # on the /tmp directory until it was ready to process all the RPC call.
-                os.stat("/tmp/paddle.%d.port" % pid)
-                return
-            except os.error:
-                retry_times -= 1
-
-    def stop_pserver(self, pid):
-        os.kill(pid, signal.SIGKILL)
-
-    def test_with_place(self):
-        p = fluid.CUDAPlace(0) if core.is_compiled_with_cuda(
-        ) else fluid.CPUPlace()
-
-        pserver_pid = self.start_pserver(self._ps_endpoints)
-        self._wait_ps_ready(pserver_pid)
-
-        self.run_trainer(p, 0)
-
-        self.stop_pserver(pserver_pid)
-
-    def run_trainer(self, place, trainer_id):
-        test_program, avg_cost, train_reader, test_reader, predict = get_model()
-        t = get_transpiler(trainer_id,
-                           fluid.default_main_program(), self._ps_endpoints,
-                           self._trainers)
-
-        trainer_prog = t.get_trainer_program()
-
-        exe = fluid.Executor(place)
-        exe.run(fluid.default_startup_program())
-
-        use_gpu = True if core.is_compiled_with_cuda() else False
-
-        exec_strategy = ExecutionStrategy()
-        exec_strategy.use_cuda = use_gpu
-        train_exe = fluid.ParallelExecutor(
-            use_cuda=use_gpu,
-            main_program=trainer_prog,
-            loss_name=avg_cost.name,
-            exec_strategy=exec_strategy)
+from test_dist_base import TestDistBase
 
-        feed_var_list = [
-            var for var in trainer_prog.global_block().vars.values()
-            if var.is_data
-        ]
-        feeder = fluid.DataFeeder(feed_var_list, place)
-        for pass_id in range(10):
-            for batch_id, data in enumerate(train_reader()):
-                avg_loss_np = train_exe.run(feed=feeder.feed(data),
-                                            fetch_list=[avg_cost.name])
-                loss = np.array(avg_loss_np).mean()
-                if float(loss) < 5.0:
-                    return
-                if math.isnan(loss):
-                    assert ("Got Nan loss, training failed")
+
+class TestDistWord2vec2x2(TestDistBase):
+    def test_dist_word2vec(self):
+        self.check_with_place("dist_word2vec.py", delta=1e-7)
 
 
 if __name__ == "__main__":
-- 
GitLab