Unverified commit 7309f8ab, authored by TaoTao Li, committed by GitHub

Fix cuda12 timeout (#54540)

* fix a100 cuda12 timeout

* fix cuda12 pickle loads problem

* fix dist_sharding_save ut
Parent: 3b4bdc08
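The second bullet above ("fix cuda12 pickle loads problem") is addressed by changing how trainer results travel back to the parent test process: instead of writing pickle.dumps(out_losses) to stdout and recovering it with pickle.loads on the captured output, every call site now pickles the result into a file whose path is passed down through a DUMP_FILE environment variable, and the parent reads it back (and deletes it) with a new load_and_remove helper. Below is a minimal, self-contained sketch of that round trip; DUMP_FILE, dump_output and load_and_remove come from the diff that follows, while the child script, its values, and the temporary paths are illustrative assumptions, not code from the PR.

import os
import pickle
import subprocess
import sys
import tempfile

CHILD_SCRIPT = """
import os, pickle
out_losses = [0.5, 0.25]                        # stand-in for real trainer losses
with open(os.environ["DUMP_FILE"], "wb") as f:  # what dump_output() does
    pickle.dump(out_losses, f)
print("arbitrary log noise")                    # stdout no longer carries the result
"""


def load_and_remove(path):
    # Mirrors the helper added to test_dist_base.py in the diff below.
    with open(path, 'rb') as f:
        out = pickle.load(f)
        os.remove(path)
    return out


with tempfile.TemporaryDirectory() as tmp:
    child_py = os.path.join(tmp, "child.py")
    with open(child_py, "w") as f:
        f.write(CHILD_SCRIPT)
    dump_file = os.path.join(tmp, f"out_data_local_{os.getpid()}.pickled")
    env = dict(os.environ, DUMP_FILE=dump_file)  # parent publishes the path
    subprocess.run([sys.executable, child_py], env=env, check=True)
    print(load_and_remove(dump_file))            # -> [0.5, 0.25]

One consequence visible in the sketch: whatever the trainer prints to stdout no longer has to be valid pickle data, since the result travels through its own file.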
@@ -24,7 +24,7 @@ if(WITH_DISTRIBUTE AND WITH_GPU)
   py_test_modules(test_optimization_tuner_api MODULES
                   test_optimization_tuner_api)
   set_tests_properties(test_optimization_tuner_api
-                       PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE" TIMEOUT 80)
+                       PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE" TIMEOUT 100)
   py_test_modules(test_converter MODULES test_converter)
   set_tests_properties(test_converter PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE"
                                                  TIMEOUT 50)
...

@@ -1305,4 +1305,4 @@ set_tests_properties(test_reduce_op_static_build PROPERTIES TIMEOUT 500)
 set_tests_properties(test_sync_batch_norm_op_static_build
                      PROPERTIES LABELS "RUN_TYPE=DIST")
 set_tests_properties(test_sync_batch_norm_op_static_build PROPERTIES TIMEOUT
-                                                           120)
+                                                           160)

@@ -13,10 +13,9 @@
 # limitations under the License.

 import os
-import pickle
-import sys

 from dist_mnist import cnn_model  # noqa: F401
+from test_dist_base import dump_output

 import paddle
 from paddle import fluid
@@ -83,7 +82,7 @@ def runtime_main():
     )

     out_losses = []
-    sys.stdout.buffer.write(pickle.dumps(out_losses))
+    dump_output(out_losses)


 if __name__ == "__main__":
...

@@ -48,6 +48,19 @@ def print_to_out(out_losses):
     sys.stdout.buffer.write(pickle.dumps(out_losses))


+def dump_output(x):
+    path = os.environ['DUMP_FILE']
+    with open(path, 'wb') as f:
+        pickle.dump(x, f)
+
+
+def load_and_remove(path):
+    with open(path, 'rb') as f:
+        out = pickle.load(f)
+        os.remove(path)
+    return out
+
+
 def print_to_err(class_name, log_str):
     localtime = time.asctime(time.localtime(time.time()))
     print_str = localtime + "\t" + class_name + "\t" + log_str
@@ -210,7 +223,7 @@ class TestDistRunnerBase:
             data_loader.reset()
         print_to_err(type(self).__name__, "trainer run finished")

-        sys.stdout.buffer.write(pickle.dumps(out_losses))
+        dump_output(out_losses)

     def run_use_fleet_api_20_trainer(self, args):
         """
@@ -291,7 +304,7 @@ class TestDistRunnerBase:
         print_to_err(type(self).__name__, "trainer run finished")
         print_to_err(type(self).__name__, f"dist losses: {out_losses}")

-        sys.stdout.buffer.write(pickle.dumps(out_losses))
+        dump_output(out_losses)

     def run_use_fleet_api_trainer(self, args):
         assert args.update_method == "nccl2" or "bkcl"
@@ -386,7 +399,7 @@ class TestDistRunnerBase:
             print_to_err(type(self).__name__, "run step %d finished" % i)
         print_to_err(type(self).__name__, "trainer run finished")

-        sys.stdout.buffer.write(pickle.dumps(out_losses))
+        dump_output(out_losses)

         if args.save_model:
             model_save_dir = "/tmp"
@@ -628,7 +641,7 @@ class TestDistRunnerBase:
         # print_to_err(type(self).__name__, "out_losses")

         sys.stdout = old_stdout
-        print_to_out(out_losses)
+        dump_output(out_losses)


 class TestParallelDyGraphRunnerBase:
@@ -751,7 +764,7 @@ class TestParallelDyGraphRunnerBase:
                 opt.minimize(loss)
                 if not args.accumulate_gradient:
                     model.clear_gradients()
-        print_to_out(out_losses)
+        dump_output(out_losses)

     def run_trainer_with_spawn(self, args):
         # 1. enable dygraph
@@ -836,7 +849,7 @@ class TestParallelDyGraphRunnerBase:
                 opt.step()
                 if not args.accumulate_gradient:
                     opt.clear_grad()
-        print_to_out(out_losses)
+        dump_output(out_losses)


 def runtime_main(test_class):
@@ -1139,6 +1152,9 @@ class TestDistBase(unittest.TestCase):
             cmd += " --find_unused_parameters"

         env_local.update(envs)
+        cur_pid = os.getpid()
+        dump_file = f"out_data_local_{cur_pid}.pickled"
+        env_local["DUMP_FILE"] = dump_file
         print(f"local_cmd: {cmd}, env: {env_local}")

         if check_error_log:
@@ -1164,9 +1180,8 @@
             err_log.close()

         sys.stderr.write('local_stderr: %s\n' % local_err)
-        sys.stderr.write('local_stdout: %s\n' % pickle.loads(local_out))

-        return pickle.loads(local_out)
+        return load_and_remove(dump_file)

     def _run_local_gloo(
         self,
@@ -1245,6 +1260,14 @@
         env0.update(envs)
         env1.update(envs)

+        cur_pid = os.getpid()
+        dump_files = [
+            f'./out_data_0_{cur_pid}.pickled',
+            f'./out_data_1_{cur_pid}.pickled',
+        ]
+        env0["DUMP_FILE"] = dump_files[0]
+        env1["DUMP_FILE"] = dump_files[1]
+
         print(f"tr0_cmd: {tr0_cmd}, env: {env0}")
         print(f"tr1_cmd: {tr1_cmd}, env: {env1}")
@@ -1292,7 +1315,7 @@
         ps0.terminate()
         ps1.terminate()

-        return pickle.loads(tr0_out), pickle.loads(tr1_out)
+        return load_and_remove(dump_files[0]), load_and_remove(dump_files[1])

     def _get_gloo_trainer_cmd(
         self, model, ep, update_method, trainer_id, trainer_num
@@ -1479,6 +1502,8 @@
         procs = []
         pipes = []

+        dump_files = []
+        cur_pid = os.getpid()
         for i in range(0, trainer_num):
             tr_cmd, tr_env = self._get_gloo_trainer_cmd(
                 model, worker_endpoints[i], update_method, i, trainer_num
@@ -1486,6 +1511,10 @@
             tr_env.update(envs)
             tr_env["GLOG_vmodule"] = 'gloo_context=4'
             tr_env["GLOG_v"] = '3'
+            dump_file = f'./out_data_{i}_{cur_pid}.pickled'
+            dump_files.append(dump_file)
+            tr_env["DUMP_FILE"] = dump_file
+
             print(
                 "use_hallreduce:{} tr_cmd:{}, env: {}".format(
                     self._use_hallreduce, tr_cmd, tr_env
@@ -1521,13 +1550,15 @@
         if trainer_num == 1:
             if check_error_log:
                 print("outs[0]:", outs[0])
-            return pickle.loads(outs[0])
+            return load_and_remove(dump_files[0])
         else:
             if check_error_log:
                 print("outs[0]:", outs[0])
                 print("outs[1]:", outs[1])
-            return pickle.loads(outs[0]), pickle.loads(outs[1])
+            return load_and_remove(dump_files[0]), load_and_remove(
+                dump_files[1]
+            )

     def _run_cluster_nccl2(
         self, model, envs, update_method, check_error_log, log_name
@@ -1555,11 +1586,16 @@
         procs = []
         pipes = []

+        cur_pid = os.getpid()
+        dump_files = []
         for i in range(0, trainer_num):
             tr_cmd, tr_env = self._get_nccl2_trainer_cmd(
                 model, worker_endpoints[i], update_method, i, trainer_num
             )
             tr_env.update(envs)
+            dump_file = f'./out_data_{i}_{cur_pid}.pickled'
+            dump_files.append(dump_file)
+            tr_env["DUMP_FILE"] = dump_file
             print(
                 "use_hallreduce:{} tr_cmd:{}, env: {}".format(
                     self._use_hallreduce, tr_cmd, tr_env
@@ -1596,7 +1632,7 @@
             print("outs[0]:", outs[0])
             print("outs[1]:", outs[1])

-        return pickle.loads(outs[0]), pickle.loads(outs[1])
+        return load_and_remove(dump_files[0]), load_and_remove(dump_files[1])

     def _run_pipeline(self, model, envs, check_error_log, log_name):
         # NOTE: we reuse ps_endpoints as nccl2 worker endpoints
@@ -1607,6 +1643,8 @@
         procs = []
         pipes = []

+        cur_pid = os.getpid()
+        dump_files = []
         for i in range(0, trainer_num):
             tr_cmd, tr_env = self._get_nccl2_trainer_cmd(
                 model, worker_endpoints[i], update_method, i, trainer_num
@@ -1616,6 +1654,10 @@
             tr_env['NCCL_SHM_DISABLE'] = '1'
             tr_env['FLAGS_selected_gpus'] = str(i)
             tr_env['FLAGS_cudnn_deterministic'] = '0'
+            dump_file = f'./out_data_{i}_{cur_pid}.pickled'
+            dump_files.append(dump_file)
+            tr_env["DUMP_FILE"] = dump_file
+
             print(f"tr_cmd:{tr_cmd}, env: {tr_env}")

             path = os.path.join(self.temp_dir.name + f"tr{i}_err.log")
@@ -1645,7 +1687,7 @@
         if check_error_log:
             print("outs[0]:", outs[0])
             print("outs[1]:", outs[1])

-        return pickle.loads(outs[0]), pickle.loads(outs[1])
+        return load_and_remove(dump_files[0]), load_and_remove(dump_files[1])

     def _get_required_envs(self, check_error_log=False, need_envs={}):
         # TODO(typhoonzero): should auto adapt GPU count on the machine.
...
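A small design note on the hunks above: each dump file name embeds the parent test process's PID and, for multi-trainer runs, the trainer index, and load_and_remove deletes the file right after reading it, so concurrent or repeated test runs in the same working directory neither collide nor pick up stale results. An illustrative reconstruction of that naming (hypothetical, not code from the PR):

import os

# One file per trainer rank, disambiguated by the parent test process's PID.
cur_pid = os.getpid()
dump_files = [f'./out_data_{i}_{cur_pid}.pickled' for i in range(2)]
print(dump_files)  # e.g. ['./out_data_0_12345.pickled', './out_data_1_12345.pickled']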