From aebc175cd466e4e256e19d16dd3d2ad6f37dcdd7 Mon Sep 17 00:00:00 2001 From: Wu Yi Date: Fri, 7 Dec 2018 09:01:14 +0800 Subject: [PATCH] add nccl2 dist tests (#14755) * add nccl2 dist tests test=develop * fix dist_base test=develop * fix tests test=develop * fix test on mac test=develop --- .../fluid/tests/unittests/dist_save_load.py | 4 +- .../fluid/tests/unittests/test_dist_base.py | 135 +++++++++++++++--- .../fluid/tests/unittests/test_dist_mnist.py | 13 ++ .../tests/unittests/test_dist_save_load.py | 2 +- .../tests/unittests/test_dist_transpiler.py | 1 + .../fluid/transpiler/distribute_transpiler.py | 25 +--- 6 files changed, 138 insertions(+), 42 deletions(-) diff --git a/python/paddle/fluid/tests/unittests/dist_save_load.py b/python/paddle/fluid/tests/unittests/dist_save_load.py index cf62817956..faec535042 100644 --- a/python/paddle/fluid/tests/unittests/dist_save_load.py +++ b/python/paddle/fluid/tests/unittests/dist_save_load.py @@ -102,7 +102,7 @@ class TestDistSaveLoad2x2(TestDistSimnetBow2x2): if args.mem_opt: fluid.memory_optimize(fluid.default_main_program(), skip_grads=True) - if args.is_dist: + if args.update_method == "pserver": t = self.get_transpiler(args.trainer_id, fluid.default_main_program(), args.endpoints, args.trainers, @@ -147,7 +147,7 @@ class TestDistSaveLoad2x2(TestDistSimnetBow2x2): def get_data(): origin_batch = next(reader_generator) - if args.is_dist and args.use_reader_alloc: + if args.update_method == "pserver" and args.use_reader_alloc: new_batch = [] for offset, item in enumerate(origin_batch): if offset % 2 == args.trainer_id: diff --git a/python/paddle/fluid/tests/unittests/test_dist_base.py b/python/paddle/fluid/tests/unittests/test_dist_base.py index 160969c63f..0a43f53658 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_base.py +++ b/python/paddle/fluid/tests/unittests/test_dist_base.py @@ -76,12 +76,24 @@ class TestDistRunnerBase(object): if args.mem_opt: fluid.memory_optimize(fluid.default_main_program(), skip_grads=True) - if args.is_dist: + if args.update_method == "pserver": t = self.get_transpiler(args.trainer_id, fluid.default_main_program(), args.endpoints, args.trainers, args.sync_mode, args.dc_asgd) trainer_prog = t.get_trainer_program() + elif args.update_method == "nccl2": + # transpile for nccl2 + config = fluid.DistributeTranspilerConfig() + config.mode = "nccl2" + nccl2_t = fluid.DistributeTranspiler(config=config) + nccl2_t.transpile( + args.trainer_id, + program=fluid.default_main_program(), + startup_program=fluid.default_startup_program(), + trainers=args.endpoints, + current_endpoint=args.current_endpoint) + trainer_prog = fluid.default_main_program() else: trainer_prog = fluid.default_main_program() @@ -110,11 +122,20 @@ class TestDistRunnerBase(object): len(pass_builder.all_passes()) - 2, "multi_batch_merge_pass") mypass.set_int("num_repeats", args.batch_merge_repeat) + if args.update_method == "nccl2": + num_trainers = len(args.endpoints.split(",")) + trainer_id = args.trainer_id + else: + num_trainers = 1 + trainer_id = 0 + exe = fluid.ParallelExecutor( args.use_cuda, loss_name=avg_cost.name, exec_strategy=strategy, - build_strategy=build_stra) + build_strategy=build_stra, + num_trainers=num_trainers, + trainer_id=trainer_id) feed_var_list = [ var for var in trainer_prog.global_block().vars.values() @@ -126,7 +147,7 @@ class TestDistRunnerBase(object): def get_data(): origin_batch = next(reader_generator) - if args.is_dist and args.use_reader_alloc: + if args.update_method != "local" and args.use_reader_alloc: new_batch = [] for offset, item in enumerate(origin_batch): if offset % 2 == args.trainer_id: @@ -151,7 +172,11 @@ def runtime_main(test_class): parser.add_argument( '--role', type=str, required=True, choices=['pserver', 'trainer']) parser.add_argument('--endpoints', type=str, required=False, default="") - parser.add_argument('--is_dist', action='store_true') + parser.add_argument( + '--update_method', + type=str, + default="local", + choices=["pserver", "nccl2", "local"]) parser.add_argument('--trainer_id', type=int, required=False, default=0) parser.add_argument('--trainers', type=int, required=False, default=1) parser.add_argument( @@ -170,7 +195,7 @@ def runtime_main(test_class): args = parser.parse_args() model = test_class() - if args.role == "pserver" and args.is_dist: + if args.role == "pserver" and args.update_method == "pserver": model.run_pserver(args) else: model.run_trainer(args) @@ -208,6 +233,7 @@ class TestDistBase(unittest.TestCase): self._use_reduce = False self._dc_asgd = False # must use with async mode self._use_reader_alloc = True + self._nccl2_mode = False self._setup_config() self._after_setup_config() @@ -218,7 +244,7 @@ class TestDistBase(unittest.TestCase): def start_pserver(self, model_file, check_error_log, required_envs): ps0_ep, ps1_ep = self._ps_endpoints.split(",") - ps_cmd = "%s %s --role pserver --endpoints %s --trainer_id 0 --current_endpoint %s --trainers %d --is_dist" + ps_cmd = "%s %s --role pserver --endpoints %s --trainer_id 0 --current_endpoint %s --trainers %d --update_method pserver" ps0_cmd = ps_cmd % \ (self._python_interp, model_file, self._ps_endpoints, ps0_ep, self._trainers) @@ -270,7 +296,8 @@ class TestDistBase(unittest.TestCase): else: env_local = {'CPU_NUM': '1'} - envs.update(env_local) + env_local.update(envs) + print("local_cmd: {}, env: {}".format(cmd, env_local)) if check_error_log: err_log = open("/tmp/trainer.err.log", "wb") @@ -278,13 +305,13 @@ class TestDistBase(unittest.TestCase): cmd.split(" "), stdout=subprocess.PIPE, stderr=err_log, - env=envs) + env=env_local) else: local_proc = subprocess.Popen( cmd.split(" "), stdout=subprocess.PIPE, stderr=subprocess.PIPE, - env=envs) + env=env_local) local_out, local_err = local_proc.communicate() @@ -303,7 +330,7 @@ class TestDistBase(unittest.TestCase): ps0_ep, ps1_ep = self._ps_endpoints.split(",") - tr_cmd = "%s %s --role trainer --endpoints %s --trainer_id %d --current_endpoint %s --trainers %d --is_dist" + tr_cmd = "%s %s --role trainer --endpoints %s --trainer_id %d --current_endpoint %s --trainers %d --update_method pserver" tr0_cmd = tr_cmd % \ (self._python_interp, model, self._ps_endpoints, 0, ps0_ep, self._trainers) @@ -335,8 +362,8 @@ class TestDistBase(unittest.TestCase): env0.update(envs) env1.update(envs) - print("tr0_cmd:{}".format(tr0_cmd)) - print("tr1_cmd:{}".format(tr1_cmd)) + print("tr0_cmd: {}, env: {}".format(tr0_cmd, env0)) + print("tr1_cmd: {}, env: {}".format(tr1_cmd, env1)) tr0_pipe = open("/tmp/tr0_err.log", "wb") tr1_pipe = open("/tmp/tr1_err.log", "wb") @@ -357,12 +384,9 @@ class TestDistBase(unittest.TestCase): # close trainer file tr0_pipe.close() tr1_pipe.close() - ps0_pipe.close() ps1_pipe.close() - # FIXME: use terminate() instead of sigkill. - os.kill(ps0.pid, signal.SIGKILL) - os.kill(ps1.pid, signal.SIGKILL) + ps0.terminate() ps1.terminate() @@ -372,7 +396,71 @@ class TestDistBase(unittest.TestCase): sys.stderr.write('trainer 1 stdout: %s\n' % pickle.loads(tr1_out)) sys.stderr.write('trainer 1 stderr: %s\n' % tr1_err) - # return tr0_losses, tr1_losses + return pickle.loads(tr0_out), pickle.loads(tr1_out) + + def _run_cluster_nccl2(self, model, envs, check_error_log): + # NOTE: we reuse ps_endpoints as nccl2 worker endpoints + worker_endpoints = self._ps_endpoints.split(",") + w0_ep, w1_ep = worker_endpoints + + tr_cmd = "%s %s --role trainer --endpoints %s --trainer_id %d --current_endpoint %s --update_method nccl2" + tr0_cmd = tr_cmd % \ + (self._python_interp, model, self._ps_endpoints, + 0, w0_ep) + tr1_cmd = tr_cmd % \ + (self._python_interp, model, self._ps_endpoints, + 1, w1_ep) + + if self._mem_opt: + tr0_cmd += " --mem_opt" + tr1_cmd += " --mem_opt" + if self._use_reduce: + tr0_cmd += " --use_reduce" + tr1_cmd += " --use_reduce" + if self._use_reader_alloc: + tr0_cmd += " --use_reader_alloc" + tr1_cmd += " --use_reader_alloc" + if self.__use_cuda: + tr0_cmd += " --use_cuda" + tr1_cmd += " --use_cuda" + env0 = {"CUDA_VISIBLE_DEVICES": "0"} + env1 = {"CUDA_VISIBLE_DEVICES": "1"} + else: + env0 = {'CPU_NUM': '1'} + env1 = {'CPU_NUM': '1'} + + env0.update(envs) + env1.update(envs) + + print("tr0_cmd:{}, env: {}".format(tr0_cmd, env0)) + print("tr1_cmd:{}, env: {}".format(tr1_cmd, env1)) + tr0_pipe = open("/tmp/tr0_err.log", "wb") + tr1_pipe = open("/tmp/tr1_err.log", "wb") + + tr0_proc = subprocess.Popen( + tr0_cmd.strip().split(" "), + stdout=subprocess.PIPE, + stderr=tr0_pipe, + env=env0) + tr1_proc = subprocess.Popen( + tr1_cmd.strip().split(" "), + stdout=subprocess.PIPE, + stderr=tr1_pipe, + env=env1) + + tr0_out, tr0_err = tr0_proc.communicate() + tr1_out, tr1_err = tr1_proc.communicate() + + # close trainer file + tr0_pipe.close() + tr1_pipe.close() + + # print log + sys.stderr.write('trainer 0 stderr: %s\n' % tr0_err) + sys.stderr.write('trainer 1 stderr: %s\n' % tr1_err) + sys.stderr.write('trainer 0 stdout: %s\n' % tr0_out) + sys.stderr.write('trainer 1 stdout: %s\n' % tr1_out) + return pickle.loads(tr0_out), pickle.loads(tr1_out) def check_with_place(self, @@ -387,20 +475,25 @@ class TestDistBase(unittest.TestCase): "LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH", ""), "FLAGS_fraction_of_gpu_memory_to_use": "0.15", "FLAGS_cudnn_deterministic": "1", - "http_proxy": "" + "http_proxy": "", + "NCCL_P2P_DISABLE": "1" } required_envs.update(need_envs) if check_error_log: - required_envs["GLOG_v"] = "7" + required_envs["GLOG_v"] = "3" required_envs["GLOG_logtostderr"] = "1" local_losses\ = self._run_local(model_file, required_envs, check_error_log) - tr0_losses, tr1_losses = self._run_cluster(model_file, required_envs, - check_error_log) + if self._nccl2_mode: + tr0_losses, tr1_losses = self._run_cluster_nccl2( + model_file, required_envs, check_error_log) + else: + tr0_losses, tr1_losses = self._run_cluster( + model_file, required_envs, check_error_log) for step_id in range(RUN_STEP): local_loss = local_losses[step_id] diff --git a/python/paddle/fluid/tests/unittests/test_dist_mnist.py b/python/paddle/fluid/tests/unittests/test_dist_mnist.py index 81eb651878..630bed198f 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_mnist.py +++ b/python/paddle/fluid/tests/unittests/test_dist_mnist.py @@ -26,6 +26,19 @@ class TestDistMnist2x2(TestDistBase): self.check_with_place("dist_mnist.py", delta=1e-5) +class TestDistMnistNCCL2(TestDistBase): + def _setup_config(self): + self._sync_mode = True + self._use_reduce = False + self._use_reader_alloc = False + self._nccl2_mode = True + + def test_dist_train(self): + import paddle.fluid as fluid + if fluid.core.is_compiled_with_cuda(): + self.check_with_place("dist_mnist.py", delta=1) + + class TestDistMnist2x2Lars(TestDistBase): def _setup_config(self): self._sync_mode = True diff --git a/python/paddle/fluid/tests/unittests/test_dist_save_load.py b/python/paddle/fluid/tests/unittests/test_dist_save_load.py index ea2b554dac..4588ca7c17 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_save_load.py +++ b/python/paddle/fluid/tests/unittests/test_dist_save_load.py @@ -44,7 +44,7 @@ class TestDistSaveLoadDense2x2(TestDistBase): required_envs.update(need_envs) if check_error_log: - required_envs["GLOG_v"] = "7" + required_envs["GLOG_v"] = "3" required_envs["GLOG_logtostderr"] = "1" model_dir = tempfile.mkdtemp() diff --git a/python/paddle/fluid/tests/unittests/test_dist_transpiler.py b/python/paddle/fluid/tests/unittests/test_dist_transpiler.py index 194387bc98..d9ad4e2e2c 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_transpiler.py +++ b/python/paddle/fluid/tests/unittests/test_dist_transpiler.py @@ -769,6 +769,7 @@ class TestNCCL2Transpile(TranspilerTest): config = fluid.DistributeTranspilerConfig() config.mode = "nccl2" + config.wait_port = False t = fluid.DistributeTranspiler(config=config) t.transpile( 0, diff --git a/python/paddle/fluid/transpiler/distribute_transpiler.py b/python/paddle/fluid/transpiler/distribute_transpiler.py index 1d867d9194..3b898af706 100644 --- a/python/paddle/fluid/transpiler/distribute_transpiler.py +++ b/python/paddle/fluid/transpiler/distribute_transpiler.py @@ -142,6 +142,7 @@ class DistributeTranspilerConfig(object): # supported modes: pserver, nccl2 mode = "pserver" print_log = False + wait_port = True class DistributeTranspiler(object): @@ -171,7 +172,6 @@ class DistributeTranspiler(object): trainer_id = 0 trainers = 4 role = os.getenv("PADDLE_TRAINING_ROLE") - t = fluid.DistributeTranspiler() t.transpile( trainer_id, pservers=pserver_endpoints, trainers=trainers) @@ -214,13 +214,16 @@ class DistributeTranspiler(object): trainer_id, trainers, current_endpoint, - startup_program=None): + startup_program=None, + wait_port=True): if not startup_program: startup_program = default_startup_program() if trainer_id >= 0: worker_endpoints = trainers.split(",") # send NCCL_ID to others or recv from trainer 0 worker_endpoints.remove(current_endpoint) + if trainer_id == 0 and wait_port: + wait_server_ready(worker_endpoints) nccl_id_var = startup_program.global_block().create_var( name="NCCLID", persistable=True, type=core.VarDesc.VarType.RAW) @@ -306,7 +309,8 @@ class DistributeTranspiler(object): trainer_id, trainers, current_endpoint, - startup_program=startup_program) + startup_program=startup_program, + wait_port=self.config.wait_port) return self.trainer_num = trainers @@ -652,9 +656,6 @@ class DistributeTranspiler(object): # NOTE: assume blocks of the same variable is not distributed # on the same pserver, only change param/grad varnames for # trainers to fetch. - sys.stderr.write("get_pserver_program() is deprecated, call \ -get_pserver_programs() to get pserver main and startup \ -in a single call.") # step1 pserver_program = Program() pserver_program.random_seed = self.origin_program.random_seed @@ -922,18 +923,6 @@ in a single call.") Returns: Program: parameter server side startup program. """ - sys.stderr.write("get_startup_program() is deprecated, call \ -get_pserver_programs() to get pserver main and startup \ -in a single call.") - if pserver_program != None: - sys.stderr.write("passing pserver_program to get_startup_program() \ -is deprecated, you can use new API get_pserver_programs() to \ -get both pserver main program and startup program.") - if startup_program != None: - sys.stderr.write("passing startup_program to get_startup_program() \ -is deprecated, use fluid.program_guard() or pass this argument \ -to transpile() call.") - s_prog = Program() orig_s_prog = self.startup_program s_prog.random_seed = orig_s_prog.random_seed -- GitLab