提交 da4129f2 编写于 作者: T tangwei12

fix dist ut with place, test=develop (#13647)

上级 e9adfc41
...@@ -247,7 +247,7 @@ class DistSeResneXt2x2(TestDistRunnerBase): ...@@ -247,7 +247,7 @@ class DistSeResneXt2x2(TestDistRunnerBase):
# Reader # Reader
train_reader = paddle.batch( train_reader = paddle.batch(
paddle.dataset.flowers.train(), batch_size=batch_size) paddle.dataset.flowers.test(use_xmap=False), batch_size=batch_size)
test_reader = paddle.batch( test_reader = paddle.batch(
paddle.dataset.flowers.test(use_xmap=False), batch_size=batch_size) paddle.dataset.flowers.test(use_xmap=False), batch_size=batch_size)
......
...@@ -166,6 +166,17 @@ class TestDistBase(unittest.TestCase): ...@@ -166,6 +166,17 @@ class TestDistBase(unittest.TestCase):
def _setup_config(self): def _setup_config(self):
raise NotImplementedError("tests should have _setup_config implemented") raise NotImplementedError("tests should have _setup_config implemented")
def _after_setup_config(self):
if self._enforce_place == "CPU":
self.__use_cuda = False
elif self._enforce_place == "GPU":
self.__use_cuda = True
else:
if fluid.core.is_compiled_with_cuda():
self.__use_cuda = True
else:
self.__use_cuda = False
def setUp(self): def setUp(self):
self._trainers = 2 self._trainers = 2
self._pservers = 2 self._pservers = 2
...@@ -173,11 +184,12 @@ class TestDistBase(unittest.TestCase): ...@@ -173,11 +184,12 @@ class TestDistBase(unittest.TestCase):
self._find_free_port(), self._find_free_port()) self._find_free_port(), self._find_free_port())
self._python_interp = "python" self._python_interp = "python"
self._sync_mode = True self._sync_mode = True
self._use_cuda = True self._enforce_place = None
self._mem_opt = False self._mem_opt = False
self._use_reduce = False self._use_reduce = False
self._use_reader_alloc = True self._use_reader_alloc = True
self._setup_config() self._setup_config()
self._after_setup_config()
def _find_free_port(self): def _find_free_port(self):
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s: with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
...@@ -201,13 +213,10 @@ class TestDistBase(unittest.TestCase): ...@@ -201,13 +213,10 @@ class TestDistBase(unittest.TestCase):
ps0_cmd += " --mem_opt" ps0_cmd += " --mem_opt"
ps1_cmd += " --mem_opt" ps1_cmd += " --mem_opt"
ps0_pipe = subprocess.PIPE print(ps0_cmd)
ps1_pipe = subprocess.PIPE print(ps1_cmd)
if check_error_log: ps0_pipe = open("/tmp/ps0_err.log", "wb")
print(ps0_cmd) ps1_pipe = open("/tmp/ps1_err.log", "wb")
print(ps1_cmd)
ps0_pipe = open("/tmp/ps0_err.log", "wb")
ps1_pipe = open("/tmp/ps1_err.log", "wb")
ps0_proc = subprocess.Popen( ps0_proc = subprocess.Popen(
ps0_cmd.strip().split(" "), ps0_cmd.strip().split(" "),
...@@ -220,10 +229,7 @@ class TestDistBase(unittest.TestCase): ...@@ -220,10 +229,7 @@ class TestDistBase(unittest.TestCase):
stderr=ps1_pipe, stderr=ps1_pipe,
env=required_envs) env=required_envs)
if not check_error_log: return ps0_proc, ps1_proc, ps0_pipe, ps1_pipe
return ps0_proc, ps1_proc, None, None
else:
return ps0_proc, ps1_proc, ps0_pipe, ps1_pipe
def _wait_ps_ready(self, pid): def _wait_ps_ready(self, pid):
retry_times = 50 retry_times = 50
...@@ -244,7 +250,7 @@ class TestDistBase(unittest.TestCase): ...@@ -244,7 +250,7 @@ class TestDistBase(unittest.TestCase):
cmd = "%s %s --role trainer" % (self._python_interp, model) cmd = "%s %s --role trainer" % (self._python_interp, model)
if self._use_cuda: if self.__use_cuda:
cmd += " --use_cuda" cmd += " --use_cuda"
env_local = {"CUDA_VISIBLE_DEVICES": "0"} env_local = {"CUDA_VISIBLE_DEVICES": "0"}
else: else:
...@@ -252,7 +258,7 @@ class TestDistBase(unittest.TestCase): ...@@ -252,7 +258,7 @@ class TestDistBase(unittest.TestCase):
envs.update(env_local) envs.update(env_local)
if not check_error_log: if check_error_log:
err_log = open("/tmp/trainer.err.log", "wb") err_log = open("/tmp/trainer.err.log", "wb")
local_proc = subprocess.Popen( local_proc = subprocess.Popen(
cmd.split(" "), cmd.split(" "),
...@@ -266,7 +272,6 @@ class TestDistBase(unittest.TestCase): ...@@ -266,7 +272,6 @@ class TestDistBase(unittest.TestCase):
stderr=subprocess.PIPE, stderr=subprocess.PIPE,
env=envs) env=envs)
local_proc.wait()
local_out, local_err = local_proc.communicate() local_out, local_err = local_proc.communicate()
local_ret = cpt.to_text(local_out) local_ret = cpt.to_text(local_out)
...@@ -307,7 +312,7 @@ class TestDistBase(unittest.TestCase): ...@@ -307,7 +312,7 @@ class TestDistBase(unittest.TestCase):
if self._use_reader_alloc: if self._use_reader_alloc:
tr0_cmd += " --use_reader_alloc" tr0_cmd += " --use_reader_alloc"
tr1_cmd += " --use_reader_alloc" tr1_cmd += " --use_reader_alloc"
if self._use_cuda: if self.__use_cuda:
tr0_cmd += " --use_cuda" tr0_cmd += " --use_cuda"
tr1_cmd += " --use_cuda" tr1_cmd += " --use_cuda"
env0 = {"CUDA_VISIBLE_DEVICES": "0"} env0 = {"CUDA_VISIBLE_DEVICES": "0"}
...@@ -319,15 +324,10 @@ class TestDistBase(unittest.TestCase): ...@@ -319,15 +324,10 @@ class TestDistBase(unittest.TestCase):
env0.update(envs) env0.update(envs)
env1.update(envs) env1.update(envs)
FNULL = open(os.devnull, 'w') print("tr0_cmd:{}, env0: {}".format(tr0_cmd, env0))
print("tr1_cmd:{}, env1: {}".format(tr1_cmd, env1))
tr0_pipe = subprocess.PIPE tr0_pipe = open("/tmp/tr0_err.log", "wb")
tr1_pipe = subprocess.PIPE tr1_pipe = open("/tmp/tr1_err.log", "wb")
if check_error_log:
print("tr0_cmd:{}, env0: {}".format(tr0_cmd, env0))
print("tr1_cmd:{}, env1: {}".format(tr1_cmd, env1))
tr0_pipe = open("/tmp/tr0_err.log", "wb")
tr1_pipe = open("/tmp/tr1_err.log", "wb")
tr0_proc = subprocess.Popen( tr0_proc = subprocess.Popen(
tr0_cmd.strip().split(" "), tr0_cmd.strip().split(" "),
...@@ -340,29 +340,22 @@ class TestDistBase(unittest.TestCase): ...@@ -340,29 +340,22 @@ class TestDistBase(unittest.TestCase):
stderr=tr1_pipe, stderr=tr1_pipe,
env=env1) env=env1)
tr0_proc.wait()
tr1_proc.wait()
tr0_out, tr0_err = tr0_proc.communicate() tr0_out, tr0_err = tr0_proc.communicate()
tr0_loss_text = cpt.to_text(tr0_out) tr0_loss_text = cpt.to_text(tr0_out)
tr1_out, tr1_err = tr1_proc.communicate() tr1_out, tr1_err = tr1_proc.communicate()
tr1_loss_text = cpt.to_text(tr1_out) tr1_loss_text = cpt.to_text(tr1_out)
# close trainer file # close trainer file
if check_error_log: tr0_pipe.close()
tr0_pipe.close() tr1_pipe.close()
tr1_pipe.close()
ps0_pipe.close() ps0_pipe.close()
ps1_pipe.close() ps1_pipe.close()
# FIXME: use terminate() instead of sigkill. # FIXME: use terminate() instead of sigkill.
os.kill(ps0.pid, signal.SIGKILL) os.kill(ps0.pid, signal.SIGKILL)
os.kill(ps1.pid, signal.SIGKILL) os.kill(ps1.pid, signal.SIGKILL)
ps0.terminate() ps0.terminate()
ps1.terminate() ps1.terminate()
ps0.wait()
ps1.wait()
FNULL.close()
# print log # print log
sys.stderr.write('trainer 0 stdout:\n %s\n' % tr0_loss_text) sys.stderr.write('trainer 0 stdout:\n %s\n' % tr0_loss_text)
...@@ -387,6 +380,7 @@ class TestDistBase(unittest.TestCase): ...@@ -387,6 +380,7 @@ class TestDistBase(unittest.TestCase):
"LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH", ""), "LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH", ""),
"FLAGS_fraction_of_gpu_memory_to_use": "0.15", "FLAGS_fraction_of_gpu_memory_to_use": "0.15",
"FLAGS_cudnn_deterministic": "1", "FLAGS_cudnn_deterministic": "1",
"http_proxy": ""
} }
required_envs.update(need_envs) required_envs.update(need_envs)
......
...@@ -21,10 +21,11 @@ from test_dist_base import TestDistBase ...@@ -21,10 +21,11 @@ from test_dist_base import TestDistBase
class TestDistCTR2x2(TestDistBase): class TestDistCTR2x2(TestDistBase):
def _setup_config(self): def _setup_config(self):
self._sync_mode = True self._sync_mode = True
self._use_cuda = False self._enforce_place = "CPU"
def test_dist_ctr(self):
self.check_with_place("dist_ctr.py", delta=1e-7, check_error_log=False) def test_dist_ctr(self):
self.check_with_place("dist_ctr.py", delta=1e-7, check_error_log=False)
if __name__ == "__main__": if __name__ == "__main__":
......
...@@ -22,7 +22,7 @@ from test_dist_base import TestDistBase ...@@ -22,7 +22,7 @@ from test_dist_base import TestDistBase
class TestDistSimnetBowDense2x2(TestDistBase): class TestDistSimnetBowDense2x2(TestDistBase):
def _setup_config(self): def _setup_config(self):
self._sync_mode = True self._sync_mode = True
self._use_cuda = False self._enforce_place = "CPU"
def test_simnet_bow(self): def test_simnet_bow(self):
need_envs = {"IS_DISTRIBUTED": '0', "IS_SPARSE": '0'} need_envs = {"IS_DISTRIBUTED": '0', "IS_SPARSE": '0'}
...@@ -36,7 +36,7 @@ class TestDistSimnetBowDense2x2(TestDistBase): ...@@ -36,7 +36,7 @@ class TestDistSimnetBowDense2x2(TestDistBase):
class TestDistSimnetBow2x2DenseAsync(TestDistBase): class TestDistSimnetBow2x2DenseAsync(TestDistBase):
def _setup_config(self): def _setup_config(self):
self._sync_mode = False self._sync_mode = False
self._use_cuda = False self._enforce_place = "CPU"
def test_simnet_bow(self): def test_simnet_bow(self):
need_envs = {"IS_DISTRIBUTED": '0', "IS_SPARSE": '0'} need_envs = {"IS_DISTRIBUTED": '0', "IS_SPARSE": '0'}
...@@ -50,7 +50,7 @@ class TestDistSimnetBow2x2DenseAsync(TestDistBase): ...@@ -50,7 +50,7 @@ class TestDistSimnetBow2x2DenseAsync(TestDistBase):
class TestDistSimnetBowSparse2x2(TestDistBase): class TestDistSimnetBowSparse2x2(TestDistBase):
def _setup_config(self): def _setup_config(self):
self._sync_mode = True self._sync_mode = True
self._use_cuda = False self._enforce_place = "CPU"
def test_simnet_bow(self): def test_simnet_bow(self):
need_envs = {"IS_DISTRIBUTED": '0', "IS_SPARSE": '1'} need_envs = {"IS_DISTRIBUTED": '0', "IS_SPARSE": '1'}
...@@ -64,7 +64,7 @@ class TestDistSimnetBowSparse2x2(TestDistBase): ...@@ -64,7 +64,7 @@ class TestDistSimnetBowSparse2x2(TestDistBase):
class TestDistSimnetBow2x2SparseAsync(TestDistBase): class TestDistSimnetBow2x2SparseAsync(TestDistBase):
def _setup_config(self): def _setup_config(self):
self._sync_mode = False self._sync_mode = False
self._use_cuda = False self._enforce_place = "CPU"
def test_simnet_bow(self): def test_simnet_bow(self):
need_envs = {"IS_DISTRIBUTED": '0', "IS_SPARSE": '1'} need_envs = {"IS_DISTRIBUTED": '0', "IS_SPARSE": '1'}
......
...@@ -21,7 +21,7 @@ from test_dist_base import TestDistBase ...@@ -21,7 +21,7 @@ from test_dist_base import TestDistBase
class TestDistTextClassification2x2(TestDistBase): class TestDistTextClassification2x2(TestDistBase):
def _setup_config(self): def _setup_config(self):
self._sync_mode = True self._sync_mode = True
self._use_cuda = False self._enforce_place = "CPU"
def test_text_classification(self): def test_text_classification(self):
self.check_with_place("dist_text_classification.py", delta=1e-6) self.check_with_place("dist_text_classification.py", delta=1e-6)
...@@ -30,7 +30,7 @@ class TestDistTextClassification2x2(TestDistBase): ...@@ -30,7 +30,7 @@ class TestDistTextClassification2x2(TestDistBase):
class TestDistTextClassification2x2Async(TestDistBase): class TestDistTextClassification2x2Async(TestDistBase):
def _setup_config(self): def _setup_config(self):
self._sync_mode = False self._sync_mode = False
self._use_cuda = False self._enforce_place = "CPU"
def test_se_resnext(self): def test_se_resnext(self):
self.check_with_place("dist_text_classification.py", delta=100) self.check_with_place("dist_text_classification.py", delta=100)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册