diff --git a/python/paddle/fluid/tests/unittests/dist_transformer.py b/python/paddle/fluid/tests/unittests/dist_transformer.py
index 179c2540f812feac32632540e9070b58451c9a90..7abfa0a4be0dec9fe251704e22dfef1f932e7c5b 100644
--- a/python/paddle/fluid/tests/unittests/dist_transformer.py
+++ b/python/paddle/fluid/tests/unittests/dist_transformer.py
@@ -1667,16 +1667,6 @@ def get_model(is_dist, is_async):
     return sum_cost, avg_cost, predict, token_num, local_lr_scheduler


-def get_transpiler(trainer_id, main_program, pserver_endpoints, trainers):
-    t = fluid.DistributeTranspiler()
-    t.transpile(
-        trainer_id=trainer_id,
-        program=main_program,
-        pservers=pserver_endpoints,
-        trainers=trainers)
-    return t
-
-
 def update_args():
     src_dict = DataReader.load_dict(TrainTaskConfig.src_vocab_fpath)
     trg_dict = DataReader.load_dict(TrainTaskConfig.trg_vocab_fpath)
@@ -1691,69 +1681,46 @@ def update_args():


 class DistTransformer2x2(TestDistRunnerBase):
-    def run_pserver(self, pserver_endpoints, trainers, current_endpoint,
-                    trainer_id, sync_mode):
-        get_model(True, not sync_mode)
-        t = get_transpiler(trainer_id,
-                           fluid.default_main_program(), pserver_endpoints,
-                           trainers)
-        pserver_prog = t.get_pserver_program(current_endpoint)
-        startup_prog = t.get_startup_program(current_endpoint, pserver_prog)
+    def run_pserver(self, args):
+        get_model(True, not args.sync_mode)
+        t = self.get_transpiler(args.trainer_id,
+                                fluid.default_main_program(), args.endpoints,
+                                args.trainers, args.sync_mode)
+        pserver_prog = t.get_pserver_program(args.current_endpoint)
+        startup_prog = t.get_startup_program(args.current_endpoint,
+                                             pserver_prog)
         place = fluid.CPUPlace()
         exe = fluid.Executor(place)
         exe.run(startup_prog)
         exe.run(pserver_prog)

-    def _wait_ps_ready(self, pid):
-        retry_times = 20
-        while True:
-            assert retry_times >= 0, "wait ps ready failed"
-            time.sleep(3)
-            try:
-                # the listen_and_serv_op would touch a file which contains the listen port
-                # on the /tmp directory until it was ready to process all the RPC call.
-                os.stat("/tmp/paddle.%d.port" % pid)
-                return
-            except os.error:
-                retry_times -= 1
-
-    def run_trainer(self,
-                    place,
-                    endpoints,
-                    trainer_id,
-                    trainers,
-                    is_dist=True,
-                    sync_mode=True):
+    def run_trainer(self, place, args):
         sum_cost, avg_cost, predict, token_num, local_lr_scheduler = get_model(
-            is_dist, not sync_mode)
+            args.is_dist, not args.sync_mode)

-        if is_dist:
-            t = get_transpiler(trainer_id,
-                               fluid.default_main_program(), endpoints,
-                               trainers)
+        if args.is_dist:
+            t = self.get_transpiler(args.trainer_id,
+                                    fluid.default_main_program(),
+                                    args.endpoints, args.trainers,
+                                    args.sync_mode)
             trainer_prog = t.get_trainer_program()
             TrainTaskConfig.batch_size = 10
             TrainTaskConfig.train_file_pattern = TrainTaskConfig.data_path + "train.tok.clean.bpe.32000.en-de.train_{}".format(
-                trainer_id)
+                args.trainer_id)
         else:
             TrainTaskConfig.batch_size = 20
             trainer_prog = fluid.default_main_program()

         startup_exe = fluid.Executor(place)

-        TrainTaskConfig.local = not is_dist
+        TrainTaskConfig.local = not args.is_dist

         train_loop(startup_exe, trainer_prog, 1, sum_cost, avg_cost,
                    local_lr_scheduler, token_num, predict)


 if __name__ == "__main__":
-    if len(sys.argv) != 8:
-        print(
-            "Usage: python dist_transformer.py [pserver/trainer] [endpoints] [trainer_id] [current_endpoint] [trainers] [is_dist] [sync_mode]"
-        )
-
     update_args()
     runtime_main(DistTransformer2x2)
diff --git a/python/paddle/fluid/tests/unittests/test_dist_base.py b/python/paddle/fluid/tests/unittests/test_dist_base.py
index 0e815c91446b285ba2c2c5aa9ad18d97f51eae65..b9387ae9d83f36a491414764619b86e39368d266 100644
--- a/python/paddle/fluid/tests/unittests/test_dist_base.py
+++ b/python/paddle/fluid/tests/unittests/test_dist_base.py
@@ -21,7 +21,7 @@ import sys
 import six
 import signal
 import subprocess
-import six
+import argparse


 class TestDistRunnerBase(object):
@@ -43,40 +43,35 @@ class TestDistRunnerBase(object):
             sync_mode=sync_mode)
         return t

-    def run_pserver(self,
-                    pserver_endpoints,
-                    trainers,
-                    current_endpoint,
-                    trainer_id,
-                    sync_mode=True):
+    def run_pserver(self, args):
         import paddle
         import paddle.fluid as fluid
         self.get_model(batch_size=2)
-        t = self.get_transpiler(trainer_id,
-                                fluid.default_main_program(), pserver_endpoints,
-                                trainers, sync_mode)
-        pserver_prog = t.get_pserver_program(current_endpoint)
-        startup_prog = t.get_startup_program(current_endpoint, pserver_prog)
+        if args.mem_opt:
+            fluid.memory_optimize(fluid.default_main_program())
+        t = self.get_transpiler(args.trainer_id,
+                                fluid.default_main_program(), args.endpoints,
+                                args.trainers, args.sync_mode)
+        pserver_prog = t.get_pserver_program(args.current_endpoint)
+        startup_prog = t.get_startup_program(args.current_endpoint,
+                                             pserver_prog)
         place = fluid.CPUPlace()
         exe = fluid.Executor(place)
         exe.run(startup_prog)
         exe.run(pserver_prog)

-    def run_trainer(self,
-                    place,
-                    endpoints,
-                    trainer_id,
-                    trainers,
-                    is_dist=True,
-                    sync_mode=True):
+    def run_trainer(self, place, args):
         import paddle
         import paddle.fluid as fluid
         test_program, avg_cost, train_reader, test_reader, batch_acc, predict = \
-            self.get_model(batch_size=2)
-        if is_dist:
-            t = self.get_transpiler(trainer_id,
-                                    fluid.default_main_program(), endpoints,
-                                    trainers, sync_mode)
+            self.get_model(batch_size=2)
+        if args.mem_opt:
+            fluid.memory_optimize(fluid.default_main_program())
+        if args.is_dist:
+            t = self.get_transpiler(args.trainer_id,
+                                    fluid.default_main_program(),
+                                    args.endpoints, args.trainers,
+                                    args.sync_mode)
             trainer_prog = t.get_trainer_program()
         else:
             trainer_prog = fluid.default_main_program()
@@ -117,27 +112,27 @@ def runtime_main(test_class):
     import paddle.fluid as fluid
     import paddle.fluid.core as core

-    if len(sys.argv) != 8:
-        print(
-            "Usage: python dist_se_resnext.py [pserver/trainer] [endpoints] [trainer_id] [current_endpoint] [trainers] [is_dist] [sync_mode]"
-        )
-    role = sys.argv[1]
-    endpoints = sys.argv[2]
-    trainer_id = int(sys.argv[3])
-    current_endpoint = sys.argv[4]
-    trainers = int(sys.argv[5])
-    is_dist = True if sys.argv[6] == "TRUE" else False
-    sync_mode = True if sys.argv[7] == "TRUE" else False
+    parser = argparse.ArgumentParser(description='Run dist test.')
+    parser.add_argument(
+        '--role', type=str, required=True, choices=['pserver', 'trainer'])
+    parser.add_argument('--endpoints', type=str, required=False, default="")
+    parser.add_argument('--is_dist', action='store_true')
+    parser.add_argument('--trainer_id', type=int, required=False, default=0)
+    parser.add_argument('--trainers', type=int, required=False, default=1)
+    parser.add_argument(
+        '--current_endpoint', type=str, required=False, default="")
+    parser.add_argument('--sync_mode', action='store_true')
+    parser.add_argument('--mem_opt', action='store_true')
+
+    args = parser.parse_args()

     model = test_class()
-    if role == "pserver":
-        model.run_pserver(endpoints, trainers, current_endpoint, trainer_id,
-                          sync_mode)
+    if args.role == "pserver" and args.is_dist:
+        model.run_pserver(args)
     else:
         p = fluid.CUDAPlace(0) if core.is_compiled_with_cuda(
         ) else fluid.CPUPlace()
-        model.run_trainer(p, endpoints, trainer_id, trainers, is_dist,
-                          sync_mode)
+        model.run_trainer(p, args)


 import paddle.compat as cpt
@@ -153,30 +148,34 @@ class TestDistBase(unittest.TestCase):
         self._ps_endpoints = "127.0.0.1:9123,127.0.0.1:9124"
         self._python_interp = "python"
         self._sync_mode = True
+        self._mem_opt = False
         self._setup_config()

     def start_pserver(self, model_file, check_error_log):
-        sync_mode_str = "TRUE" if self._sync_mode else "FALSE"
+
         ps0_ep, ps1_ep = self._ps_endpoints.split(",")
-        ps0_cmd = "%s %s pserver %s 0 %s %d TRUE %s" % \
+        ps_cmd = "%s %s --role pserver --endpoints %s --trainer_id 0 --current_endpoint %s --trainers %d --is_dist %s %s"
+        sync_mode_str = "--sync_mode" if self._sync_mode else ""
+        mem_opt_str = "--mem_opt" if self._mem_opt else ""
+        ps0_cmd = ps_cmd % \
             (self._python_interp, model_file, self._ps_endpoints, ps0_ep,
-             self._trainers, sync_mode_str)
-        ps1_cmd = "%s %s pserver %s 0 %s %d TRUE %s" % \
+             self._trainers, sync_mode_str, mem_opt_str)
+        ps1_cmd = ps_cmd % \
             (self._python_interp, model_file, self._ps_endpoints, ps1_ep,
-             self._trainers, sync_mode_str)
+             self._trainers, sync_mode_str, mem_opt_str)

         ps0_pipe = subprocess.PIPE
         ps1_pipe = subprocess.PIPE
         if check_error_log:
-            print("ps0_cmd:", ps0_cmd)
-            print("ps1_cmd:", ps1_cmd)
+            print(ps0_cmd)
+            print(ps1_cmd)
             ps0_pipe = open("/tmp/ps0_err.log", "wb")
             ps1_pipe = open("/tmp/ps1_err.log", "wb")

         ps0_proc = subprocess.Popen(
-            ps0_cmd.split(" "), stdout=subprocess.PIPE, stderr=ps0_pipe)
+            ps0_cmd.strip().split(" "), stdout=subprocess.PIPE, stderr=ps0_pipe)
         ps1_proc = subprocess.Popen(
-            ps1_cmd.split(" "), stdout=subprocess.PIPE, stderr=ps1_pipe)
+            ps1_cmd.strip().split(" "), stdout=subprocess.PIPE, stderr=ps1_pipe)

         if not check_error_log:
             return ps0_proc, ps1_proc, None, None
@@ -199,7 +198,7 @@ class TestDistBase(unittest.TestCase):
                 retry_times -= 1

     def check_with_place(self, model_file, delta=1e-3, check_error_log=False):
-        # *ATTENTION* THIS TEST NEEDS AT LEAST 2GPUS TO RUN
+        # TODO(typhoonzero): should auto adapt GPU count on the machine.
         required_envs = {
             "PATH": os.getenv("PATH"),
             "PYTHONPATH": os.getenv("PYTHONPATH"),
@@ -215,10 +214,7 @@ class TestDistBase(unittest.TestCase):
         # Run local to get a base line
         env_local = {"CUDA_VISIBLE_DEVICES": "0"}
         env_local.update(required_envs)
-        sync_mode_str = "TRUE" if self._sync_mode else "FALSE"
-        local_cmd = "%s %s trainer %s 0 %s %d FLASE %s" % \
-            (self._python_interp, model_file,
-             "127.0.0.1:1234", "127.0.0.1:1234", 1, sync_mode_str)
+        local_cmd = "%s %s --role trainer" % (self._python_interp, model_file)
         if not check_error_log:
             local_proc = subprocess.Popen(
                 local_cmd.split(" "),
@@ -226,7 +222,6 @@ class TestDistBase(unittest.TestCase):
                 stderr=subprocess.PIPE,
                 env=env_local)
         else:
-            print("trainer cmd:", local_cmd)
             err_log = open("/tmp/trainer.err.log", "wb")
             local_proc = subprocess.Popen(
                 local_cmd.split(" "),
@@ -247,12 +242,17 @@ class TestDistBase(unittest.TestCase):
         self._wait_ps_ready(ps1.pid)

         ps0_ep, ps1_ep = self._ps_endpoints.split(",")
-        tr0_cmd = "%s %s trainer %s 0 %s %d TRUE %s" % \
-            (self._python_interp, model_file, self._ps_endpoints, ps0_ep,
-             self._trainers, sync_mode_str)
-        tr1_cmd = "%s %s trainer %s 1 %s %d TRUE %s" % \
-            (self._python_interp, model_file, self._ps_endpoints, ps1_ep,
-             self._trainers, sync_mode_str)
+        tr_cmd = "%s %s --role trainer --endpoints %s --trainer_id %d --current_endpoint %s --trainers %d --is_dist %s %s"
+        sync_mode_str = "--sync_mode" if self._sync_mode else ""
+        mem_opt_str = "--mem_opt" if self._mem_opt else ""
+        tr0_cmd = tr_cmd % \
+            (self._python_interp, model_file, self._ps_endpoints,
+             0, ps0_ep,
+             self._trainers, sync_mode_str, mem_opt_str)
+        tr1_cmd = tr_cmd % \
+            (self._python_interp, model_file, self._ps_endpoints,
+             1, ps1_ep,
+             self._trainers, sync_mode_str, mem_opt_str)

         env0 = {"CUDA_VISIBLE_DEVICES": "0"}
         env1 = {"CUDA_VISIBLE_DEVICES": "1"}
@@ -269,12 +269,12 @@ class TestDistBase(unittest.TestCase):
             tr1_pipe = open("/tmp/tr1_err.log", "wb")

         tr0_proc = subprocess.Popen(
-            tr0_cmd.split(" "),
+            tr0_cmd.strip().split(" "),
             stdout=subprocess.PIPE,
             stderr=tr0_pipe,
             env=env0)
         tr1_proc = subprocess.Popen(
-            tr1_cmd.split(" "),
+            tr1_cmd.strip().split(" "),
             stdout=subprocess.PIPE,
             stderr=tr1_pipe,
             env=env1)
@@ -303,6 +303,8 @@ class TestDistBase(unittest.TestCase):
         # FIXME: use terminate() instead of sigkill.
         os.kill(ps0.pid, signal.SIGKILL)
         os.kill(ps1.pid, signal.SIGKILL)
+        ps0.wait()
+        ps1.wait()
         FNULL.close()

         self.assertAlmostEqual(local_first_loss, dist_first_loss, delta=delta)
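With the flag-based interface above, the pserver and trainer subprocesses are launched with named options instead of a fixed positional order, and `--sync_mode`/`--mem_opt` are simply appended when the corresponding test switches are on. A rough sketch of what `start_pserver` now builds (the endpoints come from the `TestDistBase` defaults; the model file and trainer count are assumed here for illustration):

```python
# Sketch, not the harness itself: build and launch one pserver process using
# the flag-based CLI parsed by runtime_main().
import subprocess

ps_cmd = ("python dist_mnist.py --role pserver "
          "--endpoints 127.0.0.1:9123,127.0.0.1:9124 "
          "--current_endpoint 127.0.0.1:9123 "
          "--trainer_id 0 --trainers 2 --is_dist --sync_mode")
# When self._mem_opt is set, the harness also appends "--mem_opt".
ps_proc = subprocess.Popen(
    ps_cmd.strip().split(" "), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
```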
diff --git a/python/paddle/fluid/tests/unittests/test_dist_mnist.py b/python/paddle/fluid/tests/unittests/test_dist_mnist.py
index 36bab6f04603b7ad3218603489eead859bfcb5b6..157243df47189bddd494e5d533fdc34a28100c57 100644
--- a/python/paddle/fluid/tests/unittests/test_dist_mnist.py
+++ b/python/paddle/fluid/tests/unittests/test_dist_mnist.py
@@ -25,6 +25,15 @@ class TestDistMnist2x2(TestDistBase):
         self.check_with_place("dist_mnist.py", delta=1e-7)


+class TestDistMnist2x2WithMemopt(TestDistBase):
+    def _setup_config(self):
+        self._sync_mode = True
+        self._mem_opt = True
+
+    def test_se_resnext(self):
+        self.check_with_place("dist_mnist.py", delta=1e-7)
+
+
 class TestDistMnistAsync(TestDistBase):
     def _setup_config(self):
         self._sync_mode = False
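For reference, the new memory-optimization variant can be exercised on its own with the standard unittest loader; a small sketch, assuming the working directory is python/paddle/fluid/tests/unittests:

```python
# Sketch: run only the new memopt test case added above.
import unittest

from test_dist_mnist import TestDistMnist2x2WithMemopt

suite = unittest.TestLoader().loadTestsFromTestCase(TestDistMnist2x2WithMemopt)
unittest.TextTestRunner(verbosity=2).run(suite)
```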