diff --git a/python/paddle/fluid/tests/unittests/dist_simnet_bow.py b/python/paddle/fluid/tests/unittests/dist_simnet_bow.py index d561c4b81ff4c11707276dd6a088eed2ff340ed0..c3fb0df3415815e152716cd862da2d2857f7874b 100644 --- a/python/paddle/fluid/tests/unittests/dist_simnet_bow.py +++ b/python/paddle/fluid/tests/unittests/dist_simnet_bow.py @@ -213,7 +213,7 @@ class TestDistSimnetBow2x2(TestDistRunnerBase): # Reader train_reader, _ = get_train_reader() - return inference_program, avg_cost, train_reader, _, acc, predict + return inference_program, avg_cost, train_reader, train_reader, acc, predict if __name__ == "__main__": diff --git a/python/paddle/fluid/tests/unittests/test_dist_inference_save.py b/python/paddle/fluid/tests/unittests/test_dist_inference_save.py new file mode 100644 index 0000000000000000000000000000000000000000..55fe38dc376cba07f52cb0266edbe89402ef0c05 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_dist_inference_save.py @@ -0,0 +1,146 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function +import unittest +from test_dist_base import TestDistBase + +import os +import sys +import signal +import subprocess +import paddle.compat as cpt + + +class TestDistMnist2x2(TestDistBase): + def _setup_config(self): + self._sync_mode = True + + def check_with_place(self, model_file, delta=1e-3, check_error_log=False): + # *ATTENTION* THIS TEST NEEDS AT LEAST 2GPUS TO RUN + required_envs = { + "PATH": os.getenv("PATH"), + "PYTHONPATH": os.getenv("PYTHONPATH"), + "LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH"), + "FLAGS_fraction_of_gpu_memory_to_use": "0.15", + "FLAGS_cudnn_deterministic": "1" + } + + if check_error_log: + required_envs["GLOG_v"] = "7" + required_envs["GLOG_logtostderr"] = "1" + + # Run local to get a base line + env_local = {"CUDA_VISIBLE_DEVICES": "0"} + env_local.update(required_envs) + sync_mode_str = "TRUE" if self._sync_mode else "FALSE" + local_cmd = "%s %s trainer %s 0 %s %d FLASE %s" % \ + (self._python_interp, model_file, + "127.0.0.1:1234", "127.0.0.1:1234", 1, sync_mode_str) + if not check_error_log: + local_proc = subprocess.Popen( + local_cmd.split(" "), + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + env=env_local) + else: + print("trainer cmd:", local_cmd) + err_log = open("/tmp/trainer.err.log", "wb") + local_proc = subprocess.Popen( + local_cmd.split(" "), + stdout=subprocess.PIPE, + stderr=err_log, + env=env_local) + + local_proc.wait() + out, err = local_proc.communicate() + local_ret = cpt.to_text(out) + sys.stderr.write('local_loss: %s\n' % local_ret) + sys.stderr.write('local_stderr: %s\n' % err) + + # Run dist train to compare with local results + ps0, ps1, ps0_pipe, ps1_pipe = self.start_pserver(model_file, + check_error_log) + self._wait_ps_ready(ps0.pid) + self._wait_ps_ready(ps1.pid) + + ps0_ep, ps1_ep = self._ps_endpoints.split(",") + tr0_cmd = "%s %s trainer %s 0 %s %d TRUE %s" % \ + (self._python_interp, model_file, self._ps_endpoints, ps0_ep, + self._trainers, sync_mode_str) + tr1_cmd = "%s %s trainer %s 1 %s %d TRUE %s" % \ + (self._python_interp, model_file, self._ps_endpoints, ps1_ep, + self._trainers, sync_mode_str) + + env0 = {"CUDA_VISIBLE_DEVICES": "0"} + env1 = {"CUDA_VISIBLE_DEVICES": "1"} + env0.update(required_envs) + env1.update(required_envs) + FNULL = open(os.devnull, 'w') + + tr0_pipe = subprocess.PIPE + tr1_pipe = subprocess.PIPE + if check_error_log: + print("tr0_cmd:", tr0_cmd) + print("tr1_cmd:", tr1_cmd) + tr0_pipe = open("/tmp/tr0_err.log", "wb") + tr1_pipe = open("/tmp/tr1_err.log", "wb") + + tr0_proc = subprocess.Popen( + tr0_cmd.split(" "), + stdout=subprocess.PIPE, + stderr=tr0_pipe, + env=env0) + tr1_proc = subprocess.Popen( + tr1_cmd.split(" "), + stdout=subprocess.PIPE, + stderr=tr1_pipe, + env=env1) + + tr0_proc.wait() + tr1_proc.wait() + out, err = tr0_proc.communicate() + sys.stderr.write('dist_stderr: %s\n' % err) + loss_data0 = cpt.to_text(out) + sys.stderr.write('dist_loss: %s\n' % loss_data0) + lines = loss_data0.split("\n") + dist_first_loss = eval(lines[0].replace(" ", ","))[0] + dist_last_loss = eval(lines[1].replace(" ", ","))[0] + + local_lines = local_ret.split("\n") + local_first_loss = eval(local_lines[0])[0] + local_last_loss = eval(local_lines[1])[0] + + # close trainer file + if check_error_log: + tr0_pipe.close() + tr1_pipe.close() + + ps0_pipe.close() + ps1_pipe.close() + # FIXME: use terminate() instead of sigkill. + os.kill(ps0.pid, signal.SIGKILL) + os.kill(ps1.pid, signal.SIGKILL) + FNULL.close() + + self.assertAlmostEqual(local_first_loss, dist_first_loss, delta=delta) + self.assertAlmostEqual(local_last_loss, dist_last_loss, delta=delta) + + @unittest.skip(reason="Not Ready, Debugging") + def test_dist_save_inference_model(self): + self.check_with_place("dist_simnet_bow.py", delta=1e-7) + + +if __name__ == "__main__": + unittest.main()