diff --git a/python/paddle/fluid/tests/unittests/test_collective_api_base.py b/python/paddle/fluid/tests/unittests/test_collective_api_base.py
index f4eb31032da252a2b6973f70d4573880def490fb..500191d666b5ed0a86ff94386067ea721e1520ed 100644
--- a/python/paddle/fluid/tests/unittests/test_collective_api_base.py
+++ b/python/paddle/fluid/tests/unittests/test_collective_api_base.py
@@ -32,6 +32,7 @@ from paddle.fluid import core


 class TestCollectiveAPIRunnerBase(object):
+
     def get_model(self, train_prog, startup_prog, rank, indata=None):
         raise NotImplementedError(
             "get model should be implemented by child class.")
@@ -91,6 +92,7 @@ from contextlib import closing


 class TestDistBase(unittest.TestCase):
+
     def setUp(self):
         self._port_set = set()
         self._trainers = 2
@@ -104,6 +106,7 @@ class TestDistBase(unittest.TestCase):
         self.temp_dir.cleanup()

     def _find_free_port(self):
+
         def __free_port():
             with closing(socket.socket(socket.AF_INET,
                                        socket.SOCK_STREAM)) as s:
@@ -168,17 +171,15 @@ class TestDistBase(unittest.TestCase):
         tr0_pipe = open(path0, "w")
         tr1_pipe = open(path1, "w")
         #print(tr0_cmd)
-        tr0_proc = subprocess.Popen(
-            tr0_cmd.strip().split(),
-            stdout=subprocess.PIPE,
-            stderr=tr0_pipe,
-            env=env0)
-
-        tr1_proc = subprocess.Popen(
-            tr0_cmd.strip().split(),
-            stdout=subprocess.PIPE,
-            stderr=tr1_pipe,
-            env=env1)
+        tr0_proc = subprocess.Popen(tr0_cmd.strip().split(),
+                                    stdout=subprocess.PIPE,
+                                    stderr=tr0_pipe,
+                                    env=env0)
+
+        tr1_proc = subprocess.Popen(tr0_cmd.strip().split(),
+                                    stdout=subprocess.PIPE,
+                                    stderr=tr1_pipe,
+                                    env=env1)

         tr0_out, tr0_err = tr0_proc.communicate()
         tr1_out, tr1_err = tr1_proc.communicate()
@@ -220,8 +221,14 @@ class TestDistBase(unittest.TestCase):
         required_envs["GLOG_v"] = "3"
         required_envs["GLOG_logtostderr"] = "1"
         required_envs["GLOO_LOG_LEVEL"] = "TRACE"
-        tr0_out, tr1_out, pid0, pid1 = self._run_cluster(model_file,
-                                                         required_envs)
+
+        if os.getenv('NVIDIA_TF32_OVERRIDE', '') is not None:
+            required_envs['NVIDIA_TF32_OVERRIDE'] = os.getenv(
+                'NVIDIA_TF32_OVERRIDE', '')
+
+        tr0_out, tr1_out, pid0, pid1 = self._run_cluster(
+            model_file, required_envs)
+
         np.random.seed(pid0)
         input1 = np.random.random((10, 1000))
         np.random.seed(pid1)
@@ -248,11 +255,9 @@ class TestDistBase(unittest.TestCase):
         elif col_type == "allreduce":
             need_result = input1 + input2
             self.assertTrue(
-                np.allclose(
-                    tr0_out, need_result, rtol=1e-05, atol=1e-05))
+                np.allclose(tr0_out, need_result, rtol=1e-05, atol=1e-05))
             self.assertTrue(
-                np.allclose(
-                    tr1_out, need_result, rtol=1e-05, atol=1e-05))
+                np.allclose(tr1_out, need_result, rtol=1e-05, atol=1e-05))
         elif col_type == "parallel_embedding":
             result_data = tr0_out[0]
             np.random.seed(2020)
@@ -260,24 +265,23 @@ class TestDistBase(unittest.TestCase):
             for i in range(result_data.shape[0]):
                 for j in range(result_data.shape[1]):
                     data = result_data[i][j]
-                    assert np.allclose(
-                        tr0_out[1][i][j], need_result[data], atol=1e-08)
+                    assert np.allclose(tr0_out[1][i][j],
+                                       need_result[data],
+                                       atol=1e-08)
         elif col_type == "row_parallel_linear":
             result_data = tr0_out[0]
             np.random.seed(2020)
             weight = np.random.rand(1000, 16)
             need_result = np.matmul(input1, weight)
             self.assertTrue(
-                np.allclose(
-                    result_data, need_result, rtol=1e-05, atol=1e-05))
+                np.allclose(result_data, need_result, rtol=1e-05, atol=1e-05))
         elif col_type == "column_parallel_linear":
             result_data = tr0_out[0]
             np.random.seed(2020)
             weight = np.random.rand(1000, 16)
             need_result = np.matmul(input1, weight)
             self.assertTrue(
-                np.allclose(
-                    result_data, need_result, rtol=1e-05, atol=1e-05))
+                np.allclose(result_data, need_result, rtol=1e-05, atol=1e-05))
         elif col_type == "alltoall":
             need_result1 = np.vstack((input1[0:input1.shape[0] // 2, :],
                                       input2[0:input2.shape[0] // 2, :]))
@@ -286,16 +290,13 @@ class TestDistBase(unittest.TestCase):
             tr0_out = np.vstack(tr0_out)
             tr1_out = np.vstack(tr1_out)
             self.assertTrue(
-                np.allclose(
-                    tr0_out, need_result1, rtol=1e-05, atol=1e-05))
+                np.allclose(tr0_out, need_result1, rtol=1e-05, atol=1e-05))
             self.assertTrue(
-                np.allclose(
-                    tr1_out, need_result2, rtol=1e-05, atol=1e-05))
+                np.allclose(tr1_out, need_result2, rtol=1e-05, atol=1e-05))
         elif col_type == "sendrecv":
             result_data = tr1_out[0]
             self.assertTrue(
-                np.allclose(
-                    input1, result_data, rtol=1e-05, atol=1e-05))
+                np.allclose(input1, result_data, rtol=1e-05, atol=1e-05))
         elif col_type == "global_gather":
             in_feat = 2
             n_expert = 2
@@ -372,15 +373,13 @@ class TestDistBase(unittest.TestCase):
             if result1 == []:
                 output1 = np.array([])
             else:
-                output1 = np.concatenate(
-                    result1, axis=0).reshape(
-                        sum(local_expert_count1), in_feat)
+                output1 = np.concatenate(result1, axis=0).reshape(
+                    sum(local_expert_count1), in_feat)
             if result2 == []:
                 output2 = np.array([])
             else:
-                output2 = np.concatenate(
-                    result2, axis=0).reshape(
-                        sum(local_expert_count2), in_feat)
+                output2 = np.concatenate(result2, axis=0).reshape(
+                    sum(local_expert_count2), in_feat)

             if tr0_out[0] is None or tr0_out[0].shape[0] == 0:
                 tr0_out[0] = np.array([])
@@ -389,24 +388,20 @@ class TestDistBase(unittest.TestCase):
                 tr1_out[0] = np.array([])

             self.assertTrue(
-                np.allclose(
-                    tr0_out[0], output1, rtol=1e-05, atol=1e-05))
+                np.allclose(tr0_out[0], output1, rtol=1e-05, atol=1e-05))
             self.assertTrue(
-                np.allclose(
-                    tr1_out[0], output2, rtol=1e-05, atol=1e-05))
+                np.allclose(tr1_out[0], output2, rtol=1e-05, atol=1e-05))
             if static_mode == 0:
                 self.assertTrue(
-                    np.allclose(
-                        tr0_out[1],
-                        2 * local_input_buf1,
-                        rtol=1e-05,
-                        atol=1e-05))
+                    np.allclose(tr0_out[1],
+                                2 * local_input_buf1,
+                                rtol=1e-05,
+                                atol=1e-05))
                 self.assertTrue(
-                    np.allclose(
-                        tr1_out[1],
-                        2 * local_input_buf2,
-                        rtol=1e-05,
-                        atol=1e-05))
+                    np.allclose(tr1_out[1],
+                                2 * local_input_buf2,
+                                rtol=1e-05,
+                                atol=1e-05))

         elif col_type == "global_scatter":
             np.random.seed(pid0)
@@ -460,23 +455,19 @@ class TestDistBase(unittest.TestCase):
                 tr1_out[0] = np.array([])

             self.assertTrue(
-                np.allclose(
-                    tr0_out[0], output1, rtol=1e-05, atol=1e-05))
+                np.allclose(tr0_out[0], output1, rtol=1e-05, atol=1e-05))
             self.assertTrue(
-                np.allclose(
-                    tr1_out[0], output2, rtol=1e-05, atol=1e-05))
+                np.allclose(tr1_out[0], output2, rtol=1e-05, atol=1e-05))
             if static_mode == 0:
                 self.assertTrue(
-                    np.allclose(
-                        tr0_out[1],
-                        2 * local_input_buf1,
-                        rtol=1e-05,
-                        atol=1e-05))
+                    np.allclose(tr0_out[1],
+                                2 * local_input_buf1,
+                                rtol=1e-05,
+                                atol=1e-05))
                 self.assertTrue(
-                    np.allclose(
-                        tr1_out[1],
-                        2 * local_input_buf2,
-                        rtol=1e-05,
-                        atol=1e-05))
+                    np.allclose(tr1_out[1],
+                                2 * local_input_buf2,
+                                rtol=1e-05,
+                                atol=1e-05))
         else:
             pass
diff --git a/python/paddle/fluid/tests/unittests/test_dist_base.py b/python/paddle/fluid/tests/unittests/test_dist_base.py
index 131b47bc57a8b1ebace2d195f21bd7f18d950e39..63bb8f558552c6bc4d4b19a1ab4adf93577f00d1 100755
--- a/python/paddle/fluid/tests/unittests/test_dist_base.py
+++ b/python/paddle/fluid/tests/unittests/test_dist_base.py
@@ -59,6 +59,7 @@ def eprint(*args, **kwargs):


 class TestDistRunnerBase(object):
+
     def get_model(self,
                   batch_size=DEFAULT_BATCH_SIZE,
                   lr=0.1,
@@ -88,13 +89,12 @@ class TestDistRunnerBase(object):
         config.nccl_comm_num = nccl_comm_num
         # config.runtime_split_send_recv = True
         t = fluid.DistributeTranspiler(config=config)
-        t.transpile(
-            trainer_id=trainer_id,
-            program=main_program,
-            pservers=pserver_endpoints,
-            trainers=trainers,
-            sync_mode=sync_mode,
-            current_endpoint=current_endpoint)
+        t.transpile(trainer_id=trainer_id,
+                    program=main_program,
+                    pservers=pserver_endpoints,
+                    trainers=trainers,
+                    sync_mode=sync_mode,
+                    current_endpoint=current_endpoint)
         return t

     @staticmethod
@@ -111,14 +111,13 @@ class TestDistRunnerBase(object):
         self.get_model(batch_size=args.batch_size)

         # NOTE: pserver should not call memory optimize
-        t = self.get_transpiler(
-            trainer_id=args.trainer_id,
-            main_program=fluid.default_main_program(),
-            pserver_endpoints=args.endpoints,
-            trainers=args.trainers,
-            sync_mode=args.sync_mode,
-            dc_asgd=args.dc_asgd,
-            hogwild_mode=args.hogwild)
+        t = self.get_transpiler(trainer_id=args.trainer_id,
+                                main_program=fluid.default_main_program(),
+                                pserver_endpoints=args.endpoints,
+                                trainers=args.trainers,
+                                sync_mode=args.sync_mode,
+                                dc_asgd=args.dc_asgd,
+                                hogwild_mode=args.hogwild)
         pserver_prog = t.get_pserver_program(args.current_endpoint)
         startup_prog = t.get_startup_program(args.current_endpoint,
                                              pserver_prog)
@@ -195,8 +194,8 @@ class TestDistRunnerBase(object):
         eprint(type(self).__name__, "run worker startup program done.")

         feed_var_list = [
-            var
-            for var in fluid.default_main_program().global_block().vars.values()
+            var for var in
+            fluid.default_main_program().global_block().vars.values()
             if var.is_data
         ]

@@ -366,14 +365,13 @@ class TestDistRunnerBase(object):
             print_to_err(
                 type(self).__name__,
                 "begin to run transpile on trainer with pserver mode")
-            t = self.get_transpiler(
-                trainer_id=args.trainer_id,
-                main_program=fluid.default_main_program(),
-                pserver_endpoints=args.endpoints,
-                trainers=args.trainers,
-                sync_mode=args.sync_mode,
-                dc_asgd=args.dc_asgd,
-                hogwild_mode=args.hogwild)
+            t = self.get_transpiler(trainer_id=args.trainer_id,
+                                    main_program=fluid.default_main_program(),
+                                    pserver_endpoints=args.endpoints,
+                                    trainers=args.trainers,
+                                    sync_mode=args.sync_mode,
+                                    dc_asgd=args.dc_asgd,
+                                    hogwild_mode=args.hogwild)

             trainer_prog = t.get_trainer_program()
             print_to_err(
@@ -391,12 +389,11 @@ class TestDistRunnerBase(object):
                 type(self).__name__,
                 "begin to run transpile on trainer with nccl2 mode")
             nccl2_t = fluid.DistributeTranspiler(config=config)
-            nccl2_t.transpile(
-                args.trainer_id,
-                program=fluid.default_main_program(),
-                startup_program=fluid.default_startup_program(),
-                trainers=args.endpoints,
-                current_endpoint=args.current_endpoint)
+            nccl2_t.transpile(args.trainer_id,
+                              program=fluid.default_main_program(),
+                              startup_program=fluid.default_startup_program(),
+                              trainers=args.endpoints,
+                              current_endpoint=args.current_endpoint)
             print_to_err(
                 type(self).__name__,
                 "get trainer program done. with nccl2 mode")
@@ -502,6 +499,7 @@ class TestDistRunnerBase(object):


 class TestParallelDyGraphRunnerBase(object):
+
     def get_model(self):
         raise NotImplementedError(
             "get_model should be implemented by child classes.")
@@ -517,9 +515,9 @@ class TestParallelDyGraphRunnerBase(object):
         elif args.update_method != "local":
             new_batch = []

-            # NOTE(@xiongkun03) args.diff_batch means batch length is different:
-            # such as : batch = [2,3,4,5], then the first rank will get [2] and
-            # the second rank will get [3,4,5].
+            # NOTE(@xiongkun03) args.diff_batch means batch length is different:
+            # such as : batch = [2,3,4,5], then the first rank will get [2] and
+            # the second rank will get [3,4,5].
             # this function is for test sparse_embedding_differ_length
             if hasattr(args, "diff_batch") and args.diff_batch:
                 assert len(
@@ -700,17 +698,18 @@ class TestParallelDyGraphRunnerBase(object):

 def runtime_main(test_class):
     parser = argparse.ArgumentParser(description='Run dist test.')
-    parser.add_argument(
-        '--role', type=str, required=True, choices=['pserver', 'trainer'])
+    parser.add_argument('--role',
+                        type=str,
+                        required=True,
+                        choices=['pserver', 'trainer'])
     parser.add_argument('--endpoints', type=str, required=False, default="")
-    parser.add_argument(
-        '--update_method',
-        type=str,
-        default="local",
-        choices=[
-            "pserver", "nccl2", "bkcl", "local", "nccl2_reduce_layer", "gloo",
-            "hccl"
-        ])
+    parser.add_argument('--update_method',
+                        type=str,
+                        default="local",
+                        choices=[
+                            "pserver", "nccl2", "bkcl", "local",
+                            "nccl2_reduce_layer", "gloo", "hccl"
+                        ])
     parser.add_argument('--trainer_id', type=int, required=False, default=0)
     parser.add_argument('--trainers', type=int, required=False, default=1)
     parser.add_argument('--nccl_comm_num', type=int, required=False, default=1)
@@ -722,10 +721,14 @@ def runtime_main(test_class):
     parser.add_argument('--use_local_sgd', action='store_true')
     parser.add_argument('--diff_batch', action='store_true')
     parser.add_argument('--ut4grad_allreduce', action='store_true')
-    parser.add_argument(
-        '--hallreduce_inter_nranks', type=int, required=False, default=2)
-    parser.add_argument(
-        '--current_endpoint', type=str, required=False, default="")
+    parser.add_argument('--hallreduce_inter_nranks',
+                        type=int,
+                        required=False,
+                        default=2)
+    parser.add_argument('--current_endpoint',
+                        type=str,
+                        required=False,
+                        default="")
     parser.add_argument('--sync_mode', action='store_true')
     parser.add_argument('--use_cuda', action='store_true')
     parser.add_argument('--use_cpu', action='store_true')
@@ -738,23 +741,24 @@ def runtime_main(test_class):
     parser.add_argument('--dc_asgd', action='store_true')
     parser.add_argument('--hogwild', action='store_true')
     parser.add_argument('--save_model', action='store_true')
-    parser.add_argument(
-        '--use_reader_alloc', action='store_true', required=False)
+    parser.add_argument('--use_reader_alloc',
+                        action='store_true',
+                        required=False)
     parser.add_argument('--batch_size', required=False, type=int, default=2)
     parser.add_argument('--lr', required=False, type=float, default=0.001)
-    parser.add_argument(
-        '--batch_merge_repeat', required=False, type=int, default=1)
-    parser.add_argument(
-        '--nccl2_reduce_layer_local_run',
-        required=False,
-        type=bool,
-        default=False)
+    parser.add_argument('--batch_merge_repeat',
+                        required=False,
+                        type=int,
+                        default=1)
+    parser.add_argument('--nccl2_reduce_layer_local_run',
+                        required=False,
+                        type=bool,
+                        default=False)
     parser.add_argument('--sync_batch_norm', action='store_true')
-    parser.add_argument(
-        '--fuse_all_reduce',
-        required=False,
-        type=ast.literal_eval,
-        default=None)
+    parser.add_argument('--fuse_all_reduce',
+                        required=False,
+                        type=ast.literal_eval,
+                        default=None)

     args = parser.parse_args()

@@ -780,6 +784,7 @@ from contextlib import closing


 class TestDistBase(unittest.TestCase):
+
     def _setup_config(self):
         raise NotImplementedError("tests should have _setup_config implemented")

@@ -873,6 +878,7 @@ class TestDistBase(unittest.TestCase):
         self.temp_dir.cleanup()

     def _find_free_port(self):
+
         def __free_port():
             with closing(socket.socket(socket.AF_INET,
                                        socket.SOCK_STREAM)) as s:
@@ -920,17 +926,15 @@ class TestDistBase(unittest.TestCase):
         ps1_pipe = open(path1, "wb")

         print_to_err(type(self).__name__, "going to start pserver process 0")
-        ps0_proc = subprocess.Popen(
-            ps0_cmd.strip().split(" "),
-            stdout=subprocess.PIPE,
-            stderr=ps0_pipe,
-            env=required_envs)
+        ps0_proc = subprocess.Popen(ps0_cmd.strip().split(" "),
+                                    stdout=subprocess.PIPE,
+                                    stderr=ps0_pipe,
+                                    env=required_envs)
         print_to_err(type(self).__name__, "going to start pserver process 1")
-        ps1_proc = subprocess.Popen(
-            ps1_cmd.strip().split(" "),
-            stdout=subprocess.PIPE,
-            stderr=ps1_pipe,
-            env=required_envs)
+        ps1_proc = subprocess.Popen(ps1_cmd.strip().split(" "),
+                                    stdout=subprocess.PIPE,
+                                    stderr=ps1_pipe,
+                                    env=required_envs)

         return ps0_proc, ps1_proc, ps0_pipe, ps1_pipe

@@ -999,17 +1003,15 @@ class TestDistBase(unittest.TestCase):
         if check_error_log:
             path = os.path.join(self.temp_dir.name, log_name + "_local.log")
             err_log = open(path, "wb")
-            local_proc = subprocess.Popen(
-                cmd.split(" "),
-                stdout=subprocess.PIPE,
-                stderr=err_log,
-                env=env_local)
+            local_proc = subprocess.Popen(cmd.split(" "),
+                                          stdout=subprocess.PIPE,
+                                          stderr=err_log,
+                                          env=env_local)
         else:
-            local_proc = subprocess.Popen(
-                cmd.split(" "),
-                stdout=subprocess.PIPE,
-                stderr=subprocess.PIPE,
-                env=env_local)
+            local_proc = subprocess.Popen(cmd.split(" "),
+                                          stdout=subprocess.PIPE,
+                                          stderr=subprocess.PIPE,
+                                          env=env_local)

         local_out, local_err = local_proc.communicate()

@@ -1038,8 +1040,10 @@ class TestDistBase(unittest.TestCase):

     def _run_cluster(self, model, envs, check_error_log, log_name):
         # Run dist train to compare with local results
-        ps0, ps1, ps0_pipe, ps1_pipe = self.start_pserver(
-            model, check_error_log, envs, log_name=log_name)
+        ps0, ps1, ps0_pipe, ps1_pipe = self.start_pserver(model,
+                                                          check_error_log,
+                                                          envs,
+                                                          log_name=log_name)

         ps0_ep, ps1_ep = self._ps_endpoints.split(",")

@@ -1091,17 +1095,15 @@ class TestDistBase(unittest.TestCase):
         tr1_pipe = open(path1, "wb")

         print_to_err(type(self).__name__, "going to start trainer process 0")
-        tr0_proc = subprocess.Popen(
-            tr0_cmd.strip().split(" "),
-            stdout=subprocess.PIPE,
-            stderr=tr0_pipe,
-            env=env0)
+        tr0_proc = subprocess.Popen(tr0_cmd.strip().split(" "),
+                                    stdout=subprocess.PIPE,
+                                    stderr=tr0_pipe,
+                                    env=env0)
         print_to_err(type(self).__name__, "going to start trainer process 1")
-        tr1_proc = subprocess.Popen(
-            tr1_cmd.strip().split(" "),
-            stdout=subprocess.PIPE,
-            stderr=tr1_pipe,
-            env=env1)
+        tr1_proc = subprocess.Popen(tr1_cmd.strip().split(" "),
+                                    stdout=subprocess.PIPE,
+                                    stderr=tr1_pipe,
+                                    env=env1)

         # Wait until trainer process terminate
         while True:
@@ -1296,8 +1298,10 @@ class TestDistBase(unittest.TestCase):
         procs = []
         pipes = []
         for i in range(0, trainer_num):
-            tr_cmd, tr_env = self._get_gloo_trainer_cmd(
-                model, worker_endpoints[i], update_method, i, trainer_num)
+            tr_cmd, tr_env = self._get_gloo_trainer_cmd(model,
+                                                        worker_endpoints[i],
+                                                        update_method, i,
+                                                        trainer_num)
             tr_env.update(envs)
             tr_env["GLOG_vmodule"] = 'gloo_context=4'
             tr_env["GLOG_v"] = '3'
@@ -1311,11 +1315,10 @@ class TestDistBase(unittest.TestCase):
             print_to_err(
                 type(self).__name__,
                 "going to start process {} with nccl2".format(i))
-            tr_proc = subprocess.Popen(
-                tr_cmd.strip().split(" "),
-                stdout=subprocess.PIPE,
-                stderr=tr_pipe,
-                env=tr_env)
+            tr_proc = subprocess.Popen(tr_cmd.strip().split(" "),
+                                       stdout=subprocess.PIPE,
+                                       stderr=tr_pipe,
+                                       env=tr_env)

             procs.append(tr_proc)
             pipes.append(tr_pipe)
@@ -1375,11 +1378,10 @@ class TestDistBase(unittest.TestCase):
             print_to_err(
                 type(self).__name__,
                 "going to start process {} with nccl2".format(i))
-            tr_proc = subprocess.Popen(
-                tr_cmd.strip().split(" "),
-                stdout=subprocess.PIPE,
-                stderr=tr_pipe,
-                env=tr_env)
+            tr_proc = subprocess.Popen(tr_cmd.strip().split(" "),
+                                       stdout=subprocess.PIPE,
+                                       stderr=tr_pipe,
+                                       env=tr_env)

             procs.append(tr_proc)
             pipes.append(tr_pipe)
@@ -1422,11 +1424,10 @@ class TestDistBase(unittest.TestCase):
             print_to_err(
                 type(self).__name__,
                 "going to start process {} with nccl2".format(i))
-            tr_proc = subprocess.Popen(
-                tr_cmd.strip().split(" "),
-                stdout=subprocess.PIPE,
-                stderr=tr_pipe,
-                env=tr_env)
+            tr_proc = subprocess.Popen(tr_cmd.strip().split(" "),
+                                       stdout=subprocess.PIPE,
+                                       stderr=tr_pipe,
+                                       env=tr_env)

             procs.append(tr_proc)
             pipes.append(tr_pipe)
@@ -1467,6 +1468,10 @@ class TestDistBase(unittest.TestCase):
                 "grpc_server=10,request_handler_impl=10,section_worker=10"
             required_envs["GLOG_logtostderr"] = "1"

+        if os.getenv('NVIDIA_TF32_OVERRIDE', '') is not None:
+            required_envs['NVIDIA_TF32_OVERRIDE'] = os.getenv(
+                'NVIDIA_TF32_OVERRIDE', '')
+
         required_envs.update(need_envs)
         return required_envs

@@ -1478,25 +1483,22 @@ class TestDistBase(unittest.TestCase):
                          log_name=""):
         if self._dygraph and (self._gloo_mode or self._nccl2_mode):
             with _test_eager_guard():
-                self.check_with_place_func(
-                    model_file=model_file,
-                    delta=delta,
-                    check_error_log=check_error_log,
-                    need_envs=need_envs,
-                    log_name=log_name)
-            self.check_with_place_func(
-                model_file=model_file,
-                delta=delta,
-                check_error_log=check_error_log,
-                need_envs=need_envs,
-                log_name=log_name)
+                self.check_with_place_func(model_file=model_file,
+                                           delta=delta,
+                                           check_error_log=check_error_log,
+                                           need_envs=need_envs,
+                                           log_name=log_name)
+            self.check_with_place_func(model_file=model_file,
+                                       delta=delta,
+                                       check_error_log=check_error_log,
+                                       need_envs=need_envs,
+                                       log_name=log_name)
         else:
-            self.check_with_place_func(
-                model_file=model_file,
-                delta=delta,
-                check_error_log=check_error_log,
-                need_envs=need_envs,
-                log_name=log_name)
+            self.check_with_place_func(model_file=model_file,
+                                       delta=delta,
+                                       check_error_log=check_error_log,
+                                       need_envs=need_envs,
+                                       log_name=log_name)

     def check_with_place_func(self,
                               model_file,
@@ -1554,11 +1556,15 @@ class TestDistBase(unittest.TestCase):
                 log_name=log_name)

         elif self._pipeline_mode:
-            tr0_losses, tr1_losses = self._run_pipeline(
-                model_file, required_envs, check_error_log, log_name=log_name)
+            tr0_losses, tr1_losses = self._run_pipeline(model_file,
+                                                        required_envs,
+                                                        check_error_log,
+                                                        log_name=log_name)
         else:
-            tr0_losses, tr1_losses = self._run_cluster(
-                model_file, required_envs, check_error_log, log_name=log_name)
+            tr0_losses, tr1_losses = self._run_cluster(model_file,
+                                                       required_envs,
+                                                       check_error_log,
+                                                       log_name=log_name)

         for step_id in range(RUN_STEP):
             local_loss = local_losses[step_id]
@@ -1584,20 +1590,19 @@ class TestDistBase(unittest.TestCase):
         required_envs = self._get_required_envs(check_error_log, need_envs)

         if self._use_dgc:
-            multi_cards_losses = self._run_local(
-                model_file,
-                required_envs,
-                check_error_log,
-                log_name=log_name + "_dgc_2cards",
-                devices="0,1")
+            multi_cards_losses = self._run_local(model_file,
+                                                 required_envs,
+                                                 check_error_log,
+                                                 log_name=log_name +
+                                                 "_dgc_2cards",
+                                                 devices="0,1")

             self._use_dgc = False

-            base_losses = self._run_local(
-                model_file,
-                required_envs,
-                check_error_log,
-                log_name=log_name + "_base_2cards",
-                devices="0,1")
+            base_losses = self._run_local(model_file,
+                                          required_envs,
+                                          check_error_log,
+                                          log_name=log_name + "_base_2cards",
+                                          devices="0,1")

             self._use_dgc = True
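For reference, the NVIDIA_TF32_OVERRIDE forwarding added to _get_required_envs and check_with_place above can be exercised on its own. The sketch below is a minimal standalone example and not part of the patch; build_required_envs and the echo command are hypothetical names used only for illustration. It copies the variable into the environment passed to subprocess.Popen, the same pattern the start_pserver and _run_cluster helpers use. Note that it tests for the variable with os.getenv(...) is not None and no default, whereas the patch passes a default of '', which makes its is not None check always true and the copy effectively unconditional.

# Minimal standalone sketch (assumed names, not part of the patch):
# forward NVIDIA_TF32_OVERRIDE from the parent test process to a child
# process through the env= argument of subprocess.Popen.
import os
import subprocess
import sys


def build_required_envs(base_envs=None):
    # Copy the TF32 switch into the child's environment only when the
    # parent process actually has it set.
    envs = dict(base_envs or {})
    if os.getenv('NVIDIA_TF32_OVERRIDE') is not None:
        envs['NVIDIA_TF32_OVERRIDE'] = os.getenv('NVIDIA_TF32_OVERRIDE')
    return envs


if __name__ == '__main__':
    required_envs = build_required_envs({'GLOG_v': '3'})
    # The child simply echoes the variable so the forwarding is observable.
    cmd = [
        sys.executable, '-c',
        "import os; print(os.getenv('NVIDIA_TF32_OVERRIDE'))"
    ]
    child = subprocess.Popen(cmd,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=dict(os.environ, **required_envs))
    out, _ = child.communicate()
    print(out.decode().strip())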