未验证 提交 e7547ca7 编写于 作者: Y Yuang Liu 提交者: GitHub

Pass NVIDIA_TF32_OVERRIDE to internal (#43646) (#44796)

Co-authored-by: Ngongweibao <gongweibao@baidu.com>
上级 6de20581
......@@ -32,6 +32,7 @@ from paddle.fluid import core
class TestCollectiveAPIRunnerBase(object):
def get_model(self, train_prog, startup_prog, rank, indata=None):
raise NotImplementedError(
"get model should be implemented by child class.")
......@@ -91,6 +92,7 @@ from contextlib import closing
class TestDistBase(unittest.TestCase):
def setUp(self):
self._port_set = set()
self._trainers = 2
......@@ -104,6 +106,7 @@ class TestDistBase(unittest.TestCase):
self.temp_dir.cleanup()
def _find_free_port(self):
def __free_port():
with closing(socket.socket(socket.AF_INET,
socket.SOCK_STREAM)) as s:
......@@ -168,17 +171,15 @@ class TestDistBase(unittest.TestCase):
tr0_pipe = open(path0, "w")
tr1_pipe = open(path1, "w")
#print(tr0_cmd)
tr0_proc = subprocess.Popen(
tr0_cmd.strip().split(),
stdout=subprocess.PIPE,
stderr=tr0_pipe,
env=env0)
tr1_proc = subprocess.Popen(
tr0_cmd.strip().split(),
stdout=subprocess.PIPE,
stderr=tr1_pipe,
env=env1)
tr0_proc = subprocess.Popen(tr0_cmd.strip().split(),
stdout=subprocess.PIPE,
stderr=tr0_pipe,
env=env0)
tr1_proc = subprocess.Popen(tr0_cmd.strip().split(),
stdout=subprocess.PIPE,
stderr=tr1_pipe,
env=env1)
tr0_out, tr0_err = tr0_proc.communicate()
tr1_out, tr1_err = tr1_proc.communicate()
......@@ -220,8 +221,14 @@ class TestDistBase(unittest.TestCase):
required_envs["GLOG_v"] = "3"
required_envs["GLOG_logtostderr"] = "1"
required_envs["GLOO_LOG_LEVEL"] = "TRACE"
tr0_out, tr1_out, pid0, pid1 = self._run_cluster(model_file,
required_envs)
if os.getenv('NVIDIA_TF32_OVERRIDE', '') is not None:
required_envs['NVIDIA_TF32_OVERRIDE'] = os.getenv(
'NVIDIA_TF32_OVERRIDE', '')
tr0_out, tr1_out, pid0, pid1 = self._run_cluster(
model_file, required_envs)
np.random.seed(pid0)
input1 = np.random.random((10, 1000))
np.random.seed(pid1)
......@@ -248,11 +255,9 @@ class TestDistBase(unittest.TestCase):
elif col_type == "allreduce":
need_result = input1 + input2
self.assertTrue(
np.allclose(
tr0_out, need_result, rtol=1e-05, atol=1e-05))
np.allclose(tr0_out, need_result, rtol=1e-05, atol=1e-05))
self.assertTrue(
np.allclose(
tr1_out, need_result, rtol=1e-05, atol=1e-05))
np.allclose(tr1_out, need_result, rtol=1e-05, atol=1e-05))
elif col_type == "parallel_embedding":
result_data = tr0_out[0]
np.random.seed(2020)
......@@ -260,24 +265,23 @@ class TestDistBase(unittest.TestCase):
for i in range(result_data.shape[0]):
for j in range(result_data.shape[1]):
data = result_data[i][j]
assert np.allclose(
tr0_out[1][i][j], need_result[data], atol=1e-08)
assert np.allclose(tr0_out[1][i][j],
need_result[data],
atol=1e-08)
elif col_type == "row_parallel_linear":
result_data = tr0_out[0]
np.random.seed(2020)
weight = np.random.rand(1000, 16)
need_result = np.matmul(input1, weight)
self.assertTrue(
np.allclose(
result_data, need_result, rtol=1e-05, atol=1e-05))
np.allclose(result_data, need_result, rtol=1e-05, atol=1e-05))
elif col_type == "column_parallel_linear":
result_data = tr0_out[0]
np.random.seed(2020)
weight = np.random.rand(1000, 16)
need_result = np.matmul(input1, weight)
self.assertTrue(
np.allclose(
result_data, need_result, rtol=1e-05, atol=1e-05))
np.allclose(result_data, need_result, rtol=1e-05, atol=1e-05))
elif col_type == "alltoall":
need_result1 = np.vstack((input1[0:input1.shape[0] // 2, :],
input2[0:input2.shape[0] // 2, :]))
......@@ -286,16 +290,13 @@ class TestDistBase(unittest.TestCase):
tr0_out = np.vstack(tr0_out)
tr1_out = np.vstack(tr1_out)
self.assertTrue(
np.allclose(
tr0_out, need_result1, rtol=1e-05, atol=1e-05))
np.allclose(tr0_out, need_result1, rtol=1e-05, atol=1e-05))
self.assertTrue(
np.allclose(
tr1_out, need_result2, rtol=1e-05, atol=1e-05))
np.allclose(tr1_out, need_result2, rtol=1e-05, atol=1e-05))
elif col_type == "sendrecv":
result_data = tr1_out[0]
self.assertTrue(
np.allclose(
input1, result_data, rtol=1e-05, atol=1e-05))
np.allclose(input1, result_data, rtol=1e-05, atol=1e-05))
elif col_type == "global_gather":
in_feat = 2
n_expert = 2
......@@ -372,15 +373,13 @@ class TestDistBase(unittest.TestCase):
if result1 == []:
output1 = np.array([])
else:
output1 = np.concatenate(
result1, axis=0).reshape(
sum(local_expert_count1), in_feat)
output1 = np.concatenate(result1, axis=0).reshape(
sum(local_expert_count1), in_feat)
if result2 == []:
output2 = np.array([])
else:
output2 = np.concatenate(
result2, axis=0).reshape(
sum(local_expert_count2), in_feat)
output2 = np.concatenate(result2, axis=0).reshape(
sum(local_expert_count2), in_feat)
if tr0_out[0] is None or tr0_out[0].shape[0] == 0:
tr0_out[0] = np.array([])
......@@ -389,24 +388,20 @@ class TestDistBase(unittest.TestCase):
tr1_out[0] = np.array([])
self.assertTrue(
np.allclose(
tr0_out[0], output1, rtol=1e-05, atol=1e-05))
np.allclose(tr0_out[0], output1, rtol=1e-05, atol=1e-05))
self.assertTrue(
np.allclose(
tr1_out[0], output2, rtol=1e-05, atol=1e-05))
np.allclose(tr1_out[0], output2, rtol=1e-05, atol=1e-05))
if static_mode == 0:
self.assertTrue(
np.allclose(
tr0_out[1],
2 * local_input_buf1,
rtol=1e-05,
atol=1e-05))
np.allclose(tr0_out[1],
2 * local_input_buf1,
rtol=1e-05,
atol=1e-05))
self.assertTrue(
np.allclose(
tr1_out[1],
2 * local_input_buf2,
rtol=1e-05,
atol=1e-05))
np.allclose(tr1_out[1],
2 * local_input_buf2,
rtol=1e-05,
atol=1e-05))
elif col_type == "global_scatter":
np.random.seed(pid0)
......@@ -460,23 +455,19 @@ class TestDistBase(unittest.TestCase):
tr1_out[0] = np.array([])
self.assertTrue(
np.allclose(
tr0_out[0], output1, rtol=1e-05, atol=1e-05))
np.allclose(tr0_out[0], output1, rtol=1e-05, atol=1e-05))
self.assertTrue(
np.allclose(
tr1_out[0], output2, rtol=1e-05, atol=1e-05))
np.allclose(tr1_out[0], output2, rtol=1e-05, atol=1e-05))
if static_mode == 0:
self.assertTrue(
np.allclose(
tr0_out[1],
2 * local_input_buf1,
rtol=1e-05,
atol=1e-05))
np.allclose(tr0_out[1],
2 * local_input_buf1,
rtol=1e-05,
atol=1e-05))
self.assertTrue(
np.allclose(
tr1_out[1],
2 * local_input_buf2,
rtol=1e-05,
atol=1e-05))
np.allclose(tr1_out[1],
2 * local_input_buf2,
rtol=1e-05,
atol=1e-05))
else:
pass
......@@ -59,6 +59,7 @@ def eprint(*args, **kwargs):
class TestDistRunnerBase(object):
def get_model(self,
batch_size=DEFAULT_BATCH_SIZE,
lr=0.1,
......@@ -88,13 +89,12 @@ class TestDistRunnerBase(object):
config.nccl_comm_num = nccl_comm_num
# config.runtime_split_send_recv = True
t = fluid.DistributeTranspiler(config=config)
t.transpile(
trainer_id=trainer_id,
program=main_program,
pservers=pserver_endpoints,
trainers=trainers,
sync_mode=sync_mode,
current_endpoint=current_endpoint)
t.transpile(trainer_id=trainer_id,
program=main_program,
pservers=pserver_endpoints,
trainers=trainers,
sync_mode=sync_mode,
current_endpoint=current_endpoint)
return t
@staticmethod
......@@ -111,14 +111,13 @@ class TestDistRunnerBase(object):
self.get_model(batch_size=args.batch_size)
# NOTE: pserver should not call memory optimize
t = self.get_transpiler(
trainer_id=args.trainer_id,
main_program=fluid.default_main_program(),
pserver_endpoints=args.endpoints,
trainers=args.trainers,
sync_mode=args.sync_mode,
dc_asgd=args.dc_asgd,
hogwild_mode=args.hogwild)
t = self.get_transpiler(trainer_id=args.trainer_id,
main_program=fluid.default_main_program(),
pserver_endpoints=args.endpoints,
trainers=args.trainers,
sync_mode=args.sync_mode,
dc_asgd=args.dc_asgd,
hogwild_mode=args.hogwild)
pserver_prog = t.get_pserver_program(args.current_endpoint)
startup_prog = t.get_startup_program(args.current_endpoint,
pserver_prog)
......@@ -195,8 +194,8 @@ class TestDistRunnerBase(object):
eprint(type(self).__name__, "run worker startup program done.")
feed_var_list = [
var
for var in fluid.default_main_program().global_block().vars.values()
var for var in
fluid.default_main_program().global_block().vars.values()
if var.is_data
]
......@@ -366,14 +365,13 @@ class TestDistRunnerBase(object):
print_to_err(
type(self).__name__,
"begin to run transpile on trainer with pserver mode")
t = self.get_transpiler(
trainer_id=args.trainer_id,
main_program=fluid.default_main_program(),
pserver_endpoints=args.endpoints,
trainers=args.trainers,
sync_mode=args.sync_mode,
dc_asgd=args.dc_asgd,
hogwild_mode=args.hogwild)
t = self.get_transpiler(trainer_id=args.trainer_id,
main_program=fluid.default_main_program(),
pserver_endpoints=args.endpoints,
trainers=args.trainers,
sync_mode=args.sync_mode,
dc_asgd=args.dc_asgd,
hogwild_mode=args.hogwild)
trainer_prog = t.get_trainer_program()
print_to_err(
......@@ -391,12 +389,11 @@ class TestDistRunnerBase(object):
type(self).__name__,
"begin to run transpile on trainer with nccl2 mode")
nccl2_t = fluid.DistributeTranspiler(config=config)
nccl2_t.transpile(
args.trainer_id,
program=fluid.default_main_program(),
startup_program=fluid.default_startup_program(),
trainers=args.endpoints,
current_endpoint=args.current_endpoint)
nccl2_t.transpile(args.trainer_id,
program=fluid.default_main_program(),
startup_program=fluid.default_startup_program(),
trainers=args.endpoints,
current_endpoint=args.current_endpoint)
print_to_err(
type(self).__name__,
"get trainer program done. with nccl2 mode")
......@@ -502,6 +499,7 @@ class TestDistRunnerBase(object):
class TestParallelDyGraphRunnerBase(object):
def get_model(self):
raise NotImplementedError(
"get_model should be implemented by child classes.")
......@@ -517,9 +515,9 @@ class TestParallelDyGraphRunnerBase(object):
elif args.update_method != "local":
new_batch = []
# NOTE(@xiongkun03) args.diff_batch means batch length is different:
# such as : batch = [2,3,4,5], then the first rank will get [2] and
# the second rank will get [3,4,5].
# NOTE(@xiongkun03) args.diff_batch means batch length is different:
# such as : batch = [2,3,4,5], then the first rank will get [2] and
# the second rank will get [3,4,5].
# this function is for test sparse_embedding_differ_length
if hasattr(args, "diff_batch") and args.diff_batch:
assert len(
......@@ -700,17 +698,18 @@ class TestParallelDyGraphRunnerBase(object):
def runtime_main(test_class):
parser = argparse.ArgumentParser(description='Run dist test.')
parser.add_argument(
'--role', type=str, required=True, choices=['pserver', 'trainer'])
parser.add_argument('--role',
type=str,
required=True,
choices=['pserver', 'trainer'])
parser.add_argument('--endpoints', type=str, required=False, default="")
parser.add_argument(
'--update_method',
type=str,
default="local",
choices=[
"pserver", "nccl2", "bkcl", "local", "nccl2_reduce_layer", "gloo",
"hccl"
])
parser.add_argument('--update_method',
type=str,
default="local",
choices=[
"pserver", "nccl2", "bkcl", "local",
"nccl2_reduce_layer", "gloo", "hccl"
])
parser.add_argument('--trainer_id', type=int, required=False, default=0)
parser.add_argument('--trainers', type=int, required=False, default=1)
parser.add_argument('--nccl_comm_num', type=int, required=False, default=1)
......@@ -722,10 +721,14 @@ def runtime_main(test_class):
parser.add_argument('--use_local_sgd', action='store_true')
parser.add_argument('--diff_batch', action='store_true')
parser.add_argument('--ut4grad_allreduce', action='store_true')
parser.add_argument(
'--hallreduce_inter_nranks', type=int, required=False, default=2)
parser.add_argument(
'--current_endpoint', type=str, required=False, default="")
parser.add_argument('--hallreduce_inter_nranks',
type=int,
required=False,
default=2)
parser.add_argument('--current_endpoint',
type=str,
required=False,
default="")
parser.add_argument('--sync_mode', action='store_true')
parser.add_argument('--use_cuda', action='store_true')
parser.add_argument('--use_cpu', action='store_true')
......@@ -738,23 +741,24 @@ def runtime_main(test_class):
parser.add_argument('--dc_asgd', action='store_true')
parser.add_argument('--hogwild', action='store_true')
parser.add_argument('--save_model', action='store_true')
parser.add_argument(
'--use_reader_alloc', action='store_true', required=False)
parser.add_argument('--use_reader_alloc',
action='store_true',
required=False)
parser.add_argument('--batch_size', required=False, type=int, default=2)
parser.add_argument('--lr', required=False, type=float, default=0.001)
parser.add_argument(
'--batch_merge_repeat', required=False, type=int, default=1)
parser.add_argument(
'--nccl2_reduce_layer_local_run',
required=False,
type=bool,
default=False)
parser.add_argument('--batch_merge_repeat',
required=False,
type=int,
default=1)
parser.add_argument('--nccl2_reduce_layer_local_run',
required=False,
type=bool,
default=False)
parser.add_argument('--sync_batch_norm', action='store_true')
parser.add_argument(
'--fuse_all_reduce',
required=False,
type=ast.literal_eval,
default=None)
parser.add_argument('--fuse_all_reduce',
required=False,
type=ast.literal_eval,
default=None)
args = parser.parse_args()
......@@ -780,6 +784,7 @@ from contextlib import closing
class TestDistBase(unittest.TestCase):
def _setup_config(self):
raise NotImplementedError("tests should have _setup_config implemented")
......@@ -873,6 +878,7 @@ class TestDistBase(unittest.TestCase):
self.temp_dir.cleanup()
def _find_free_port(self):
def __free_port():
with closing(socket.socket(socket.AF_INET,
socket.SOCK_STREAM)) as s:
......@@ -920,17 +926,15 @@ class TestDistBase(unittest.TestCase):
ps1_pipe = open(path1, "wb")
print_to_err(type(self).__name__, "going to start pserver process 0")
ps0_proc = subprocess.Popen(
ps0_cmd.strip().split(" "),
stdout=subprocess.PIPE,
stderr=ps0_pipe,
env=required_envs)
ps0_proc = subprocess.Popen(ps0_cmd.strip().split(" "),
stdout=subprocess.PIPE,
stderr=ps0_pipe,
env=required_envs)
print_to_err(type(self).__name__, "going to start pserver process 1")
ps1_proc = subprocess.Popen(
ps1_cmd.strip().split(" "),
stdout=subprocess.PIPE,
stderr=ps1_pipe,
env=required_envs)
ps1_proc = subprocess.Popen(ps1_cmd.strip().split(" "),
stdout=subprocess.PIPE,
stderr=ps1_pipe,
env=required_envs)
return ps0_proc, ps1_proc, ps0_pipe, ps1_pipe
......@@ -999,17 +1003,15 @@ class TestDistBase(unittest.TestCase):
if check_error_log:
path = os.path.join(self.temp_dir.name, log_name + "_local.log")
err_log = open(path, "wb")
local_proc = subprocess.Popen(
cmd.split(" "),
stdout=subprocess.PIPE,
stderr=err_log,
env=env_local)
local_proc = subprocess.Popen(cmd.split(" "),
stdout=subprocess.PIPE,
stderr=err_log,
env=env_local)
else:
local_proc = subprocess.Popen(
cmd.split(" "),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=env_local)
local_proc = subprocess.Popen(cmd.split(" "),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=env_local)
local_out, local_err = local_proc.communicate()
......@@ -1038,8 +1040,10 @@ class TestDistBase(unittest.TestCase):
def _run_cluster(self, model, envs, check_error_log, log_name):
# Run dist train to compare with local results
ps0, ps1, ps0_pipe, ps1_pipe = self.start_pserver(
model, check_error_log, envs, log_name=log_name)
ps0, ps1, ps0_pipe, ps1_pipe = self.start_pserver(model,
check_error_log,
envs,
log_name=log_name)
ps0_ep, ps1_ep = self._ps_endpoints.split(",")
......@@ -1091,17 +1095,15 @@ class TestDistBase(unittest.TestCase):
tr1_pipe = open(path1, "wb")
print_to_err(type(self).__name__, "going to start trainer process 0")
tr0_proc = subprocess.Popen(
tr0_cmd.strip().split(" "),
stdout=subprocess.PIPE,
stderr=tr0_pipe,
env=env0)
tr0_proc = subprocess.Popen(tr0_cmd.strip().split(" "),
stdout=subprocess.PIPE,
stderr=tr0_pipe,
env=env0)
print_to_err(type(self).__name__, "going to start trainer process 1")
tr1_proc = subprocess.Popen(
tr1_cmd.strip().split(" "),
stdout=subprocess.PIPE,
stderr=tr1_pipe,
env=env1)
tr1_proc = subprocess.Popen(tr1_cmd.strip().split(" "),
stdout=subprocess.PIPE,
stderr=tr1_pipe,
env=env1)
# Wait until trainer process terminate
while True:
......@@ -1296,8 +1298,10 @@ class TestDistBase(unittest.TestCase):
procs = []
pipes = []
for i in range(0, trainer_num):
tr_cmd, tr_env = self._get_gloo_trainer_cmd(
model, worker_endpoints[i], update_method, i, trainer_num)
tr_cmd, tr_env = self._get_gloo_trainer_cmd(model,
worker_endpoints[i],
update_method, i,
trainer_num)
tr_env.update(envs)
tr_env["GLOG_vmodule"] = 'gloo_context=4'
tr_env["GLOG_v"] = '3'
......@@ -1311,11 +1315,10 @@ class TestDistBase(unittest.TestCase):
print_to_err(
type(self).__name__,
"going to start process {} with nccl2".format(i))
tr_proc = subprocess.Popen(
tr_cmd.strip().split(" "),
stdout=subprocess.PIPE,
stderr=tr_pipe,
env=tr_env)
tr_proc = subprocess.Popen(tr_cmd.strip().split(" "),
stdout=subprocess.PIPE,
stderr=tr_pipe,
env=tr_env)
procs.append(tr_proc)
pipes.append(tr_pipe)
......@@ -1375,11 +1378,10 @@ class TestDistBase(unittest.TestCase):
print_to_err(
type(self).__name__,
"going to start process {} with nccl2".format(i))
tr_proc = subprocess.Popen(
tr_cmd.strip().split(" "),
stdout=subprocess.PIPE,
stderr=tr_pipe,
env=tr_env)
tr_proc = subprocess.Popen(tr_cmd.strip().split(" "),
stdout=subprocess.PIPE,
stderr=tr_pipe,
env=tr_env)
procs.append(tr_proc)
pipes.append(tr_pipe)
......@@ -1422,11 +1424,10 @@ class TestDistBase(unittest.TestCase):
print_to_err(
type(self).__name__,
"going to start process {} with nccl2".format(i))
tr_proc = subprocess.Popen(
tr_cmd.strip().split(" "),
stdout=subprocess.PIPE,
stderr=tr_pipe,
env=tr_env)
tr_proc = subprocess.Popen(tr_cmd.strip().split(" "),
stdout=subprocess.PIPE,
stderr=tr_pipe,
env=tr_env)
procs.append(tr_proc)
pipes.append(tr_pipe)
......@@ -1467,6 +1468,10 @@ class TestDistBase(unittest.TestCase):
"grpc_server=10,request_handler_impl=10,section_worker=10"
required_envs["GLOG_logtostderr"] = "1"
if os.getenv('NVIDIA_TF32_OVERRIDE', '') is not None:
required_envs['NVIDIA_TF32_OVERRIDE'] = os.getenv(
'NVIDIA_TF32_OVERRIDE', '')
required_envs.update(need_envs)
return required_envs
......@@ -1478,25 +1483,22 @@ class TestDistBase(unittest.TestCase):
log_name=""):
if self._dygraph and (self._gloo_mode or self._nccl2_mode):
with _test_eager_guard():
self.check_with_place_func(
model_file=model_file,
delta=delta,
check_error_log=check_error_log,
need_envs=need_envs,
log_name=log_name)
self.check_with_place_func(
model_file=model_file,
delta=delta,
check_error_log=check_error_log,
need_envs=need_envs,
log_name=log_name)
self.check_with_place_func(model_file=model_file,
delta=delta,
check_error_log=check_error_log,
need_envs=need_envs,
log_name=log_name)
self.check_with_place_func(model_file=model_file,
delta=delta,
check_error_log=check_error_log,
need_envs=need_envs,
log_name=log_name)
else:
self.check_with_place_func(
model_file=model_file,
delta=delta,
check_error_log=check_error_log,
need_envs=need_envs,
log_name=log_name)
self.check_with_place_func(model_file=model_file,
delta=delta,
check_error_log=check_error_log,
need_envs=need_envs,
log_name=log_name)
def check_with_place_func(self,
model_file,
......@@ -1554,11 +1556,15 @@ class TestDistBase(unittest.TestCase):
log_name=log_name)
elif self._pipeline_mode:
tr0_losses, tr1_losses = self._run_pipeline(
model_file, required_envs, check_error_log, log_name=log_name)
tr0_losses, tr1_losses = self._run_pipeline(model_file,
required_envs,
check_error_log,
log_name=log_name)
else:
tr0_losses, tr1_losses = self._run_cluster(
model_file, required_envs, check_error_log, log_name=log_name)
tr0_losses, tr1_losses = self._run_cluster(model_file,
required_envs,
check_error_log,
log_name=log_name)
for step_id in range(RUN_STEP):
local_loss = local_losses[step_id]
......@@ -1584,20 +1590,19 @@ class TestDistBase(unittest.TestCase):
required_envs = self._get_required_envs(check_error_log, need_envs)
if self._use_dgc:
multi_cards_losses = self._run_local(
model_file,
required_envs,
check_error_log,
log_name=log_name + "_dgc_2cards",
devices="0,1")
multi_cards_losses = self._run_local(model_file,
required_envs,
check_error_log,
log_name=log_name +
"_dgc_2cards",
devices="0,1")
self._use_dgc = False
base_losses = self._run_local(
model_file,
required_envs,
check_error_log,
log_name=log_name + "_base_2cards",
devices="0,1")
base_losses = self._run_local(model_file,
required_envs,
check_error_log,
log_name=log_name + "_base_2cards",
devices="0,1")
self._use_dgc = True
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册