From 7309f8ab4f609f70f25205f1134850a4dc4219ca Mon Sep 17 00:00:00 2001 From: wentao yu Date: Tue, 13 Jun 2023 14:37:12 +0800 Subject: [PATCH] Fix cuda12 timeout (#54540) * fix a100 cuda12 timeout * fix cuda12 pickle loads problem * fix dist_sharding_save ut --- test/auto_parallel/CMakeLists.txt | 2 +- test/legacy_test/CMakeLists.txt | 2 +- test/legacy_test/dist_sharding_save.py | 5 +- test/legacy_test/test_dist_base.py | 68 +++++++++++++++++++++----- 4 files changed, 59 insertions(+), 18 deletions(-) diff --git a/test/auto_parallel/CMakeLists.txt b/test/auto_parallel/CMakeLists.txt index c805071af32..61c1b578030 100644 --- a/test/auto_parallel/CMakeLists.txt +++ b/test/auto_parallel/CMakeLists.txt @@ -24,7 +24,7 @@ if(WITH_DISTRIBUTE AND WITH_GPU) py_test_modules(test_optimization_tuner_api MODULES test_optimization_tuner_api) set_tests_properties(test_optimization_tuner_api - PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE" TIMEOUT 80) + PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE" TIMEOUT 100) py_test_modules(test_converter MODULES test_converter) set_tests_properties(test_converter PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE" TIMEOUT 50) diff --git a/test/legacy_test/CMakeLists.txt b/test/legacy_test/CMakeLists.txt index ed7ba083808..5b7ffd451ec 100644 --- a/test/legacy_test/CMakeLists.txt +++ b/test/legacy_test/CMakeLists.txt @@ -1305,4 +1305,4 @@ set_tests_properties(test_reduce_op_static_build PROPERTIES TIMEOUT 500) set_tests_properties(test_sync_batch_norm_op_static_build PROPERTIES LABELS "RUN_TYPE=DIST") set_tests_properties(test_sync_batch_norm_op_static_build PROPERTIES TIMEOUT - 120) + 160) diff --git a/test/legacy_test/dist_sharding_save.py b/test/legacy_test/dist_sharding_save.py index 9466e325a73..b022fbc4dd3 100755 --- a/test/legacy_test/dist_sharding_save.py +++ b/test/legacy_test/dist_sharding_save.py @@ -13,10 +13,9 @@ # limitations under the License. 
import os -import pickle -import sys from dist_mnist import cnn_model # noqa: F401 +from test_dist_base import dump_output import paddle from paddle import fluid @@ -83,7 +82,7 @@ def runtime_main(): ) out_losses = [] - sys.stdout.buffer.write(pickle.dumps(out_losses)) + dump_output(out_losses) if __name__ == "__main__": diff --git a/test/legacy_test/test_dist_base.py b/test/legacy_test/test_dist_base.py index 94fc67f83f6..5847fe0e799 100755 --- a/test/legacy_test/test_dist_base.py +++ b/test/legacy_test/test_dist_base.py @@ -48,6 +48,19 @@ def print_to_out(out_losses): sys.stdout.buffer.write(pickle.dumps(out_losses)) +def dump_output(x): + path = os.environ['DUMP_FILE'] + with open(path, 'wb') as f: + pickle.dump(x, f) + + +def load_and_remove(path): + with open(path, 'rb') as f: + out = pickle.load(f) + os.remove(path) + return out + + def print_to_err(class_name, log_str): localtime = time.asctime(time.localtime(time.time())) print_str = localtime + "\t" + class_name + "\t" + log_str @@ -210,7 +223,7 @@ class TestDistRunnerBase: data_loader.reset() print_to_err(type(self).__name__, "trainer run finished") - sys.stdout.buffer.write(pickle.dumps(out_losses)) + dump_output(out_losses) def run_use_fleet_api_20_trainer(self, args): """ @@ -291,7 +304,7 @@ class TestDistRunnerBase: print_to_err(type(self).__name__, "trainer run finished") print_to_err(type(self).__name__, f"dist losses: {out_losses}") - sys.stdout.buffer.write(pickle.dumps(out_losses)) + dump_output(out_losses) def run_use_fleet_api_trainer(self, args): assert args.update_method == "nccl2" or "bkcl" @@ -386,7 +399,7 @@ class TestDistRunnerBase: print_to_err(type(self).__name__, "run step %d finished" % i) print_to_err(type(self).__name__, "trainer run finished") - sys.stdout.buffer.write(pickle.dumps(out_losses)) + dump_output(out_losses) if args.save_model: model_save_dir = "/tmp" @@ -628,7 +641,7 @@ class TestDistRunnerBase: # print_to_err(type(self).__name__, "out_losses") sys.stdout = old_stdout 
- print_to_out(out_losses) + dump_output(out_losses) class TestParallelDyGraphRunnerBase: @@ -751,7 +764,7 @@ class TestParallelDyGraphRunnerBase: opt.minimize(loss) if not args.accumulate_gradient: model.clear_gradients() - print_to_out(out_losses) + dump_output(out_losses) def run_trainer_with_spawn(self, args): # 1. enable dygraph @@ -836,7 +849,7 @@ class TestParallelDyGraphRunnerBase: opt.step() if not args.accumulate_gradient: opt.clear_grad() - print_to_out(out_losses) + dump_output(out_losses) def runtime_main(test_class): @@ -1139,6 +1152,9 @@ class TestDistBase(unittest.TestCase): cmd += " --find_unused_parameters" env_local.update(envs) + cur_pid = os.getpid() + dump_file = f"out_data_local_{cur_pid}.pickled" + env_local["DUMP_FILE"] = dump_file print(f"local_cmd: {cmd}, env: {env_local}") if check_error_log: @@ -1164,9 +1180,8 @@ class TestDistBase(unittest.TestCase): err_log.close() sys.stderr.write('local_stderr: %s\n' % local_err) - sys.stderr.write('local_stdout: %s\n' % pickle.loads(local_out)) - return pickle.loads(local_out) + return load_and_remove(dump_file) def _run_local_gloo( self, @@ -1245,6 +1260,14 @@ class TestDistBase(unittest.TestCase): env0.update(envs) env1.update(envs) + cur_pid = os.getpid() + dump_files = [ + f'./out_data_0_{cur_pid}.pickled', + f'./out_data_1_{cur_pid}.pickled', + ] + env0["DUMP_FILE"] = dump_files[0] + env1["DUMP_FILE"] = dump_files[1] + print(f"tr0_cmd: {tr0_cmd}, env: {env0}") print(f"tr1_cmd: {tr1_cmd}, env: {env1}") @@ -1292,7 +1315,7 @@ class TestDistBase(unittest.TestCase): ps0.terminate() ps1.terminate() - return pickle.loads(tr0_out), pickle.loads(tr1_out) + return load_and_remove(dump_files[0]), load_and_remove(dump_files[1]) def _get_gloo_trainer_cmd( self, model, ep, update_method, trainer_id, trainer_num @@ -1479,6 +1502,8 @@ class TestDistBase(unittest.TestCase): procs = [] pipes = [] + dump_files = [] + cur_pid = os.getpid() for i in range(0, trainer_num): tr_cmd, tr_env = 
self._get_gloo_trainer_cmd( model, worker_endpoints[i], update_method, i, trainer_num @@ -1486,6 +1511,10 @@ class TestDistBase(unittest.TestCase): tr_env.update(envs) tr_env["GLOG_vmodule"] = 'gloo_context=4' tr_env["GLOG_v"] = '3' + + dump_file = f'./out_data_{i}_{cur_pid}.pickled' + dump_files.append(dump_file) + tr_env["DUMP_FILE"] = dump_file print( "use_hallreduce:{} tr_cmd:{}, env: {}".format( self._use_hallreduce, tr_cmd, tr_env @@ -1521,13 +1550,15 @@ class TestDistBase(unittest.TestCase): if trainer_num == 1: if check_error_log: print("outs[0]:", outs[0]) - return pickle.loads(outs[0]) + return load_and_remove(dump_files[0]) else: if check_error_log: print("outs[0]:", outs[0]) print("outs[1]:", outs[1]) - return pickle.loads(outs[0]), pickle.loads(outs[1]) + return load_and_remove(dump_files[0]), load_and_remove( + dump_files[1] + ) def _run_cluster_nccl2( self, model, envs, update_method, check_error_log, log_name @@ -1555,11 +1586,16 @@ class TestDistBase(unittest.TestCase): procs = [] pipes = [] + cur_pid = os.getpid() + dump_files = [] for i in range(0, trainer_num): tr_cmd, tr_env = self._get_nccl2_trainer_cmd( model, worker_endpoints[i], update_method, i, trainer_num ) tr_env.update(envs) + dump_file = f'./out_data_{i}_{cur_pid}.pickled' + dump_files.append(dump_file) + tr_env["DUMP_FILE"] = dump_file print( "use_hallreduce:{} tr_cmd:{}, env: {}".format( self._use_hallreduce, tr_cmd, tr_env @@ -1596,7 +1632,7 @@ class TestDistBase(unittest.TestCase): print("outs[0]:", outs[0]) print("outs[1]:", outs[1]) - return pickle.loads(outs[0]), pickle.loads(outs[1]) + return load_and_remove(dump_files[0]), load_and_remove(dump_files[1]) def _run_pipeline(self, model, envs, check_error_log, log_name): # NOTE: we reuse ps_endpoints as nccl2 worker endpoints @@ -1607,6 +1643,8 @@ class TestDistBase(unittest.TestCase): procs = [] pipes = [] + cur_pid = os.getpid() + dump_files = [] for i in range(0, trainer_num): tr_cmd, tr_env = self._get_nccl2_trainer_cmd( 
model, worker_endpoints[i], update_method, i, trainer_num @@ -1616,6 +1654,10 @@ class TestDistBase(unittest.TestCase): tr_env['NCCL_SHM_DISABLE'] = '1' tr_env['FLAGS_selected_gpus'] = str(i) tr_env['FLAGS_cudnn_deterministic'] = '0' + + dump_file = f'./out_data_{i}_{cur_pid}.pickled' + dump_files.append(dump_file) + tr_env["DUMP_FILE"] = dump_file print(f"tr_cmd:{tr_cmd}, env: {tr_env}") path = os.path.join(self.temp_dir.name + f"tr{i}_err.log") @@ -1645,7 +1687,7 @@ class TestDistBase(unittest.TestCase): if check_error_log: print("outs[0]:", outs[0]) print("outs[1]:", outs[1]) - return pickle.loads(outs[0]), pickle.loads(outs[1]) + return load_and_remove(dump_files[0]), load_and_remove(dump_files[1]) def _get_required_envs(self, check_error_log=False, need_envs={}): # TODO(typhoonzero): should auto adapt GPU count on the machine. -- GitLab