提交 b35239df 编写于 作者: T tangwei12 提交者: gongweibao

fix dist ut with place, test=develop (#13647)

上级 d6747a9a
......@@ -247,7 +247,7 @@ class DistSeResneXt2x2(TestDistRunnerBase):
# Reader
train_reader = paddle.batch(
paddle.dataset.flowers.train(), batch_size=batch_size)
paddle.dataset.flowers.test(use_xmap=False), batch_size=batch_size)
test_reader = paddle.batch(
paddle.dataset.flowers.test(use_xmap=False), batch_size=batch_size)
......
......@@ -164,6 +164,17 @@ class TestDistBase(unittest.TestCase):
def _setup_config(self):
raise NotImplementedError("tests should have _setup_config implemented")
def _after_setup_config(self):
if self._enforce_place == "CPU":
self.__use_cuda = False
elif self._enforce_place == "GPU":
self.__use_cuda = True
else:
if fluid.core.is_compiled_with_cuda():
self.__use_cuda = True
else:
self.__use_cuda = False
def setUp(self):
self._trainers = 2
self._pservers = 2
......@@ -171,11 +182,12 @@ class TestDistBase(unittest.TestCase):
self._find_free_port(), self._find_free_port())
self._python_interp = "python"
self._sync_mode = True
self._use_cuda = True
self._enforce_place = None
self._mem_opt = False
self._use_reduce = False
self._use_reader_alloc = True
self._setup_config()
self._after_setup_config()
def _find_free_port(self):
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
......@@ -199,13 +211,10 @@ class TestDistBase(unittest.TestCase):
ps0_cmd += " --mem_opt"
ps1_cmd += " --mem_opt"
ps0_pipe = subprocess.PIPE
ps1_pipe = subprocess.PIPE
if check_error_log:
print(ps0_cmd)
print(ps1_cmd)
ps0_pipe = open("/tmp/ps0_err.log", "wb")
ps1_pipe = open("/tmp/ps1_err.log", "wb")
print(ps0_cmd)
print(ps1_cmd)
ps0_pipe = open("/tmp/ps0_err.log", "wb")
ps1_pipe = open("/tmp/ps1_err.log", "wb")
ps0_proc = subprocess.Popen(
ps0_cmd.strip().split(" "),
......@@ -218,10 +227,7 @@ class TestDistBase(unittest.TestCase):
stderr=ps1_pipe,
env=required_envs)
if not check_error_log:
return ps0_proc, ps1_proc, None, None
else:
return ps0_proc, ps1_proc, ps0_pipe, ps1_pipe
return ps0_proc, ps1_proc, ps0_pipe, ps1_pipe
def _wait_ps_ready(self, pid):
retry_times = 50
......@@ -242,7 +248,7 @@ class TestDistBase(unittest.TestCase):
cmd = "%s %s --role trainer" % (self._python_interp, model)
if self._use_cuda:
if self.__use_cuda:
cmd += " --use_cuda"
env_local = {"CUDA_VISIBLE_DEVICES": "0"}
else:
......@@ -250,7 +256,7 @@ class TestDistBase(unittest.TestCase):
envs.update(env_local)
if not check_error_log:
if check_error_log:
err_log = open("/tmp/trainer.err.log", "wb")
local_proc = subprocess.Popen(
cmd.split(" "),
......@@ -264,7 +270,6 @@ class TestDistBase(unittest.TestCase):
stderr=subprocess.PIPE,
env=envs)
local_proc.wait()
local_out, local_err = local_proc.communicate()
local_ret = cpt.to_text(local_out)
......@@ -305,7 +310,7 @@ class TestDistBase(unittest.TestCase):
if self._use_reader_alloc:
tr0_cmd += " --use_reader_alloc"
tr1_cmd += " --use_reader_alloc"
if self._use_cuda:
if self.__use_cuda:
tr0_cmd += " --use_cuda"
tr1_cmd += " --use_cuda"
env0 = {"CUDA_VISIBLE_DEVICES": "0"}
......@@ -317,15 +322,10 @@ class TestDistBase(unittest.TestCase):
env0.update(envs)
env1.update(envs)
FNULL = open(os.devnull, 'w')
tr0_pipe = subprocess.PIPE
tr1_pipe = subprocess.PIPE
if check_error_log:
print("tr0_cmd:{}, env0: {}".format(tr0_cmd, env0))
print("tr1_cmd:{}, env1: {}".format(tr1_cmd, env1))
tr0_pipe = open("/tmp/tr0_err.log", "wb")
tr1_pipe = open("/tmp/tr1_err.log", "wb")
print("tr0_cmd:{}, env0: {}".format(tr0_cmd, env0))
print("tr1_cmd:{}, env1: {}".format(tr1_cmd, env1))
tr0_pipe = open("/tmp/tr0_err.log", "wb")
tr1_pipe = open("/tmp/tr1_err.log", "wb")
tr0_proc = subprocess.Popen(
tr0_cmd.strip().split(" "),
......@@ -338,29 +338,22 @@ class TestDistBase(unittest.TestCase):
stderr=tr1_pipe,
env=env1)
tr0_proc.wait()
tr1_proc.wait()
tr0_out, tr0_err = tr0_proc.communicate()
tr0_loss_text = cpt.to_text(tr0_out)
tr1_out, tr1_err = tr1_proc.communicate()
tr1_loss_text = cpt.to_text(tr1_out)
# close trainer file
if check_error_log:
tr0_pipe.close()
tr1_pipe.close()
tr0_pipe.close()
tr1_pipe.close()
ps0_pipe.close()
ps1_pipe.close()
ps0_pipe.close()
ps1_pipe.close()
# FIXME: use terminate() instead of sigkill.
os.kill(ps0.pid, signal.SIGKILL)
os.kill(ps1.pid, signal.SIGKILL)
ps0.terminate()
ps1.terminate()
ps0.wait()
ps1.wait()
FNULL.close()
# print log
sys.stderr.write('trainer 0 stdout:\n %s\n' % tr0_loss_text)
......@@ -385,6 +378,7 @@ class TestDistBase(unittest.TestCase):
"LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH", ""),
"FLAGS_fraction_of_gpu_memory_to_use": "0.15",
"FLAGS_cudnn_deterministic": "1",
"http_proxy": ""
}
required_envs.update(need_envs)
......
......@@ -21,10 +21,11 @@ from test_dist_base import TestDistBase
class TestDistCTR2x2(TestDistBase):
def _setup_config(self):
self._sync_mode = True
self._use_cuda = False
self._enforce_place = "CPU"
def test_dist_ctr(self):
self.check_with_place("dist_ctr.py", delta=1e-7, check_error_log=False)
def test_dist_ctr(self):
self.check_with_place("dist_ctr.py", delta=1e-7, check_error_log=False)
if __name__ == "__main__":
......
......@@ -22,7 +22,7 @@ class TestDistSeResneXt2x2(TestDistBase):
self._sync_mode = True
self._use_reader_alloc = False
def no_test_dist_train(self):
def test_dist_train(self):
self.check_with_place("dist_se_resnext.py", delta=100)
......@@ -40,7 +40,7 @@ class TestDistSeResneXt2x2Async(TestDistBase):
self._sync_mode = False
self._use_reader_alloc = False
def no_test_dist_train(self):
def test_dist_train(self):
self.check_with_place("dist_se_resnext.py", delta=100)
......
......@@ -22,7 +22,7 @@ from test_dist_base import TestDistBase
class TestDistSimnetBowDense2x2(TestDistBase):
def _setup_config(self):
self._sync_mode = True
self._use_cuda = False
self._enforce_place = "CPU"
def test_simnet_bow(self):
need_envs = {"IS_DISTRIBUTED": '0', "IS_SPARSE": '0'}
......@@ -36,7 +36,7 @@ class TestDistSimnetBowDense2x2(TestDistBase):
class TestDistSimnetBow2x2DenseAsync(TestDistBase):
def _setup_config(self):
self._sync_mode = False
self._use_cuda = False
self._enforce_place = "CPU"
def test_simnet_bow(self):
need_envs = {"IS_DISTRIBUTED": '0', "IS_SPARSE": '0'}
......@@ -50,7 +50,7 @@ class TestDistSimnetBow2x2DenseAsync(TestDistBase):
class TestDistSimnetBowSparse2x2(TestDistBase):
def _setup_config(self):
self._sync_mode = True
self._use_cuda = False
self._enforce_place = "CPU"
def test_simnet_bow(self):
need_envs = {"IS_DISTRIBUTED": '0', "IS_SPARSE": '1'}
......@@ -64,7 +64,7 @@ class TestDistSimnetBowSparse2x2(TestDistBase):
class TestDistSimnetBow2x2SparseAsync(TestDistBase):
def _setup_config(self):
self._sync_mode = False
self._use_cuda = False
self._enforce_place = "CPU"
def test_simnet_bow(self):
need_envs = {"IS_DISTRIBUTED": '0', "IS_SPARSE": '1'}
......
......@@ -21,7 +21,7 @@ from test_dist_base import TestDistBase
class TestDistTextClassification2x2(TestDistBase):
def _setup_config(self):
self._sync_mode = True
self._use_cuda = False
self._enforce_place = "CPU"
def test_text_classification(self):
self.check_with_place("dist_text_classification.py", delta=1e-6)
......@@ -30,7 +30,7 @@ class TestDistTextClassification2x2(TestDistBase):
class TestDistTextClassification2x2Async(TestDistBase):
def _setup_config(self):
self._sync_mode = False
self._use_cuda = False
self._enforce_place = "CPU"
def test_se_resnext(self):
self.check_with_place("dist_text_classification.py", delta=100)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册