Unverified commit aebc175c, authored by Wu Yi, committed by GitHub

add nccl2 dist tests (#14755)

* add nccl2 dist tests test=develop
* fix dist_base test=develop
* fix tests test=develop
* fix test on mac test=develop

Parent: 95886483
@@ -102,7 +102,7 @@ class TestDistSaveLoad2x2(TestDistSimnetBow2x2):
         if args.mem_opt:
             fluid.memory_optimize(fluid.default_main_program(), skip_grads=True)
-        if args.is_dist:
+        if args.update_method == "pserver":
             t = self.get_transpiler(args.trainer_id,
                                     fluid.default_main_program(),
                                     args.endpoints, args.trainers,
@@ -147,7 +147,7 @@ class TestDistSaveLoad2x2(TestDistSimnetBow2x2):
         def get_data():
             origin_batch = next(reader_generator)
-            if args.is_dist and args.use_reader_alloc:
+            if args.update_method == "pserver" and args.use_reader_alloc:
                 new_batch = []
                 for offset, item in enumerate(origin_batch):
                     if offset % 2 == args.trainer_id:
...
@@ -76,12 +76,24 @@ class TestDistRunnerBase(object):
         if args.mem_opt:
             fluid.memory_optimize(fluid.default_main_program(), skip_grads=True)
-        if args.is_dist:
+        if args.update_method == "pserver":
             t = self.get_transpiler(args.trainer_id,
                                     fluid.default_main_program(),
                                     args.endpoints, args.trainers,
                                     args.sync_mode, args.dc_asgd)
             trainer_prog = t.get_trainer_program()
+        elif args.update_method == "nccl2":
+            # transpile for nccl2
+            config = fluid.DistributeTranspilerConfig()
+            config.mode = "nccl2"
+            nccl2_t = fluid.DistributeTranspiler(config=config)
+            nccl2_t.transpile(
+                args.trainer_id,
+                program=fluid.default_main_program(),
+                startup_program=fluid.default_startup_program(),
+                trainers=args.endpoints,
+                current_endpoint=args.current_endpoint)
+            trainer_prog = fluid.default_main_program()
         else:
             trainer_prog = fluid.default_main_program()
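
For reference, the nccl2 branch above boils down to the following minimal sketch; the endpoint strings are illustrative placeholders, in the test they come from --endpoints and --current_endpoint:

import paddle.fluid as fluid

# Placeholder endpoints for a two-trainer job on one machine.
trainer_endpoints = "127.0.0.1:6170,127.0.0.1:6171"
current_endpoint = "127.0.0.1:6170"

config = fluid.DistributeTranspilerConfig()
config.mode = "nccl2"

t = fluid.DistributeTranspiler(config=config)
t.transpile(
    0,  # this worker's trainer_id
    program=fluid.default_main_program(),
    startup_program=fluid.default_startup_program(),
    trainers=trainer_endpoints,
    current_endpoint=current_endpoint)

# In nccl2 mode the trainer keeps running the default main program.
trainer_prog = fluid.default_main_program()

Every trainer runs the same transpile call; per the transpiler code further down, trainer 0 sends the NCCL ID to the other workers, which receive it during startup.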
@@ -110,11 +122,20 @@ class TestDistRunnerBase(object):
                 len(pass_builder.all_passes()) - 2, "multi_batch_merge_pass")
             mypass.set_int("num_repeats", args.batch_merge_repeat)

+        if args.update_method == "nccl2":
+            num_trainers = len(args.endpoints.split(","))
+            trainer_id = args.trainer_id
+        else:
+            num_trainers = 1
+            trainer_id = 0
+
         exe = fluid.ParallelExecutor(
             args.use_cuda,
             loss_name=avg_cost.name,
             exec_strategy=strategy,
-            build_strategy=build_stra)
+            build_strategy=build_stra,
+            num_trainers=num_trainers,
+            trainer_id=trainer_id)

         feed_var_list = [
             var for var in trainer_prog.global_block().vars.values()
@@ -126,7 +147,7 @@ class TestDistRunnerBase(object):
         def get_data():
             origin_batch = next(reader_generator)
-            if args.is_dist and args.use_reader_alloc:
+            if args.update_method != "local" and args.use_reader_alloc:
                 new_batch = []
                 for offset, item in enumerate(origin_batch):
                     if offset % 2 == args.trainer_id:
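
The ParallelExecutor change above is what actually wires the workers together: in nccl2 mode each process passes its rank and the total worker count so the NCCL communicator can be formed. A small sketch under those assumptions (the helper name is ours, not part of the test code):

import paddle.fluid as fluid

def build_parallel_executor(loss, update_method, endpoints, trainer_id,
                            use_cuda=True):
    # Sketch only: nccl2 mode passes rank/size to ParallelExecutor so it can
    # set up the NCCL communicator; other modes keep the single-trainer defaults.
    if update_method == "nccl2":
        num_trainers = len(endpoints.split(","))
    else:
        num_trainers = 1
        trainer_id = 0
    return fluid.ParallelExecutor(
        use_cuda,
        loss_name=loss.name,
        num_trainers=num_trainers,
        trainer_id=trainer_id)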
@@ -151,7 +172,11 @@ def runtime_main(test_class):
     parser.add_argument(
         '--role', type=str, required=True, choices=['pserver', 'trainer'])
     parser.add_argument('--endpoints', type=str, required=False, default="")
-    parser.add_argument('--is_dist', action='store_true')
+    parser.add_argument(
+        '--update_method',
+        type=str,
+        default="local",
+        choices=["pserver", "nccl2", "local"])
     parser.add_argument('--trainer_id', type=int, required=False, default=0)
     parser.add_argument('--trainers', type=int, required=False, default=1)
     parser.add_argument(
@@ -170,7 +195,7 @@ def runtime_main(test_class):
     args = parser.parse_args()

     model = test_class()
-    if args.role == "pserver" and args.is_dist:
+    if args.role == "pserver" and args.update_method == "pserver":
         model.run_pserver(args)
     else:
         model.run_trainer(args)
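
The boolean --is_dist switch becomes a three-way --update_method choice, and only a pserver-mode job still spawns a parameter-server process; local and nccl2 runs go straight to run_trainer. A self-contained illustration of the new argument, using nothing but argparse:

import argparse

parser = argparse.ArgumentParser()
parser.add_argument(
    '--update_method',
    type=str,
    default="local",
    choices=["pserver", "nccl2", "local"])

# e.g. a trainer launched for NCCL2 all-reduce training:
args = parser.parse_args(["--update_method", "nccl2"])
assert args.update_method == "nccl2"

# with no flag at all, the model trains locally:
assert parser.parse_args([]).update_method == "local"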
@@ -208,6 +233,7 @@ class TestDistBase(unittest.TestCase):
         self._use_reduce = False
         self._dc_asgd = False  # must use with async mode
         self._use_reader_alloc = True
+        self._nccl2_mode = False
         self._setup_config()
         self._after_setup_config()
@@ -218,7 +244,7 @@ class TestDistBase(unittest.TestCase):
     def start_pserver(self, model_file, check_error_log, required_envs):
         ps0_ep, ps1_ep = self._ps_endpoints.split(",")
-        ps_cmd = "%s %s --role pserver --endpoints %s --trainer_id 0 --current_endpoint %s --trainers %d --is_dist"
+        ps_cmd = "%s %s --role pserver --endpoints %s --trainer_id 0 --current_endpoint %s --trainers %d --update_method pserver"
         ps0_cmd = ps_cmd % \
             (self._python_interp, model_file, self._ps_endpoints, ps0_ep,
              self._trainers)
@@ -270,7 +296,8 @@ class TestDistBase(unittest.TestCase):
         else:
             env_local = {'CPU_NUM': '1'}

-        envs.update(env_local)
+        env_local.update(envs)
+        print("local_cmd: {}, env: {}".format(cmd, env_local))

         if check_error_log:
             err_log = open("/tmp/trainer.err.log", "wb")
@@ -278,13 +305,13 @@ class TestDistBase(unittest.TestCase):
                 cmd.split(" "),
                 stdout=subprocess.PIPE,
                 stderr=err_log,
-                env=envs)
+                env=env_local)
         else:
             local_proc = subprocess.Popen(
                 cmd.split(" "),
                 stdout=subprocess.PIPE,
                 stderr=subprocess.PIPE,
-                env=envs)
+                env=env_local)

         local_out, local_err = local_proc.communicate()
@@ -303,7 +330,7 @@ class TestDistBase(unittest.TestCase):
         ps0_ep, ps1_ep = self._ps_endpoints.split(",")

-        tr_cmd = "%s %s --role trainer --endpoints %s --trainer_id %d --current_endpoint %s --trainers %d --is_dist"
+        tr_cmd = "%s %s --role trainer --endpoints %s --trainer_id %d --current_endpoint %s --trainers %d --update_method pserver"
         tr0_cmd = tr_cmd % \
             (self._python_interp, model, self._ps_endpoints,
              0, ps0_ep, self._trainers)
@@ -335,8 +362,8 @@ class TestDistBase(unittest.TestCase):
         env0.update(envs)
         env1.update(envs)

-        print("tr0_cmd:{}".format(tr0_cmd))
-        print("tr1_cmd:{}".format(tr1_cmd))
+        print("tr0_cmd: {}, env: {}".format(tr0_cmd, env0))
+        print("tr1_cmd: {}, env: {}".format(tr1_cmd, env1))

         tr0_pipe = open("/tmp/tr0_err.log", "wb")
         tr1_pipe = open("/tmp/tr1_err.log", "wb")
@@ -357,12 +384,9 @@ class TestDistBase(unittest.TestCase):
         # close trainer file
         tr0_pipe.close()
         tr1_pipe.close()

         ps0_pipe.close()
         ps1_pipe.close()

-        # FIXME: use terminate() instead of sigkill.
-        os.kill(ps0.pid, signal.SIGKILL)
-        os.kill(ps1.pid, signal.SIGKILL)
         ps0.terminate()
         ps1.terminate()
@@ -372,7 +396,71 @@ class TestDistBase(unittest.TestCase):
         sys.stderr.write('trainer 1 stdout: %s\n' % pickle.loads(tr1_out))
         sys.stderr.write('trainer 1 stderr: %s\n' % tr1_err)

-        # return tr0_losses, tr1_losses
         return pickle.loads(tr0_out), pickle.loads(tr1_out)

+    def _run_cluster_nccl2(self, model, envs, check_error_log):
+        # NOTE: we reuse ps_endpoints as nccl2 worker endpoints
+        worker_endpoints = self._ps_endpoints.split(",")
+        w0_ep, w1_ep = worker_endpoints
+
+        tr_cmd = "%s %s --role trainer --endpoints %s --trainer_id %d --current_endpoint %s --update_method nccl2"
+        tr0_cmd = tr_cmd % \
+            (self._python_interp, model, self._ps_endpoints,
+             0, w0_ep)
+        tr1_cmd = tr_cmd % \
+            (self._python_interp, model, self._ps_endpoints,
+             1, w1_ep)
+
+        if self._mem_opt:
+            tr0_cmd += " --mem_opt"
+            tr1_cmd += " --mem_opt"
+        if self._use_reduce:
+            tr0_cmd += " --use_reduce"
+            tr1_cmd += " --use_reduce"
+        if self._use_reader_alloc:
+            tr0_cmd += " --use_reader_alloc"
+            tr1_cmd += " --use_reader_alloc"
+        if self.__use_cuda:
+            tr0_cmd += " --use_cuda"
+            tr1_cmd += " --use_cuda"
+            env0 = {"CUDA_VISIBLE_DEVICES": "0"}
+            env1 = {"CUDA_VISIBLE_DEVICES": "1"}
+        else:
+            env0 = {'CPU_NUM': '1'}
+            env1 = {'CPU_NUM': '1'}
+
+        env0.update(envs)
+        env1.update(envs)
+
+        print("tr0_cmd:{}, env: {}".format(tr0_cmd, env0))
+        print("tr1_cmd:{}, env: {}".format(tr1_cmd, env1))
+        tr0_pipe = open("/tmp/tr0_err.log", "wb")
+        tr1_pipe = open("/tmp/tr1_err.log", "wb")
+
+        tr0_proc = subprocess.Popen(
+            tr0_cmd.strip().split(" "),
+            stdout=subprocess.PIPE,
+            stderr=tr0_pipe,
+            env=env0)
+        tr1_proc = subprocess.Popen(
+            tr1_cmd.strip().split(" "),
+            stdout=subprocess.PIPE,
+            stderr=tr1_pipe,
+            env=env1)
+
+        tr0_out, tr0_err = tr0_proc.communicate()
+        tr1_out, tr1_err = tr1_proc.communicate()
+
+        # close trainer file
+        tr0_pipe.close()
+        tr1_pipe.close()
+
+        # print log
+        sys.stderr.write('trainer 0 stderr: %s\n' % tr0_err)
+        sys.stderr.write('trainer 1 stderr: %s\n' % tr1_err)
+        sys.stderr.write('trainer 0 stdout: %s\n' % tr0_out)
+        sys.stderr.write('trainer 1 stdout: %s\n' % tr1_out)
+
+        return pickle.loads(tr0_out), pickle.loads(tr1_out)
+
     def check_with_place(self,
@@ -387,20 +475,25 @@ class TestDistBase(unittest.TestCase):
             "LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH", ""),
             "FLAGS_fraction_of_gpu_memory_to_use": "0.15",
             "FLAGS_cudnn_deterministic": "1",
-            "http_proxy": ""
+            "http_proxy": "",
+            "NCCL_P2P_DISABLE": "1"
         }

         required_envs.update(need_envs)

         if check_error_log:
-            required_envs["GLOG_v"] = "7"
+            required_envs["GLOG_v"] = "3"
             required_envs["GLOG_logtostderr"] = "1"

         local_losses\
             = self._run_local(model_file, required_envs,
                               check_error_log)
-        tr0_losses, tr1_losses = self._run_cluster(model_file, required_envs,
-                                                   check_error_log)
+        if self._nccl2_mode:
+            tr0_losses, tr1_losses = self._run_cluster_nccl2(
+                model_file, required_envs, check_error_log)
+        else:
+            tr0_losses, tr1_losses = self._run_cluster(
+                model_file, required_envs, check_error_log)

         for step_id in range(RUN_STEP):
             local_loss = local_losses[step_id]
...
@@ -26,6 +26,19 @@ class TestDistMnist2x2(TestDistBase):
         self.check_with_place("dist_mnist.py", delta=1e-5)


+class TestDistMnistNCCL2(TestDistBase):
+    def _setup_config(self):
+        self._sync_mode = True
+        self._use_reduce = False
+        self._use_reader_alloc = False
+        self._nccl2_mode = True
+
+    def test_dist_train(self):
+        import paddle.fluid as fluid
+        if fluid.core.is_compiled_with_cuda():
+            self.check_with_place("dist_mnist.py", delta=1)
+
+
 class TestDistMnist2x2Lars(TestDistBase):
     def _setup_config(self):
         self._sync_mode = True
...
...@@ -44,7 +44,7 @@ class TestDistSaveLoadDense2x2(TestDistBase): ...@@ -44,7 +44,7 @@ class TestDistSaveLoadDense2x2(TestDistBase):
required_envs.update(need_envs) required_envs.update(need_envs)
if check_error_log: if check_error_log:
required_envs["GLOG_v"] = "7" required_envs["GLOG_v"] = "3"
required_envs["GLOG_logtostderr"] = "1" required_envs["GLOG_logtostderr"] = "1"
model_dir = tempfile.mkdtemp() model_dir = tempfile.mkdtemp()
......
@@ -769,6 +769,7 @@ class TestNCCL2Transpile(TranspilerTest):
             config = fluid.DistributeTranspilerConfig()
             config.mode = "nccl2"
+            config.wait_port = False
             t = fluid.DistributeTranspiler(config=config)
             t.transpile(
                 0,
...
@@ -142,6 +142,7 @@ class DistributeTranspilerConfig(object):
     # supported modes: pserver, nccl2
     mode = "pserver"
     print_log = False
+    wait_port = True


 class DistributeTranspiler(object):
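
The new wait_port option defaults to True, so in nccl2 mode trainer 0 blocks until its peers' ports are reachable before generating the NCCL ID; the single-process transpiler test above turns it off. A hedged usage sketch (the endpoints are placeholders):

import paddle.fluid as fluid

config = fluid.DistributeTranspilerConfig()
config.mode = "nccl2"
config.wait_port = False  # skip waiting for peer endpoints, e.g. in a unit test

t = fluid.DistributeTranspiler(config=config)
t.transpile(
    0,
    trainers="127.0.0.1:6174,127.0.0.1:6175",  # illustrative endpoint list
    current_endpoint="127.0.0.1:6174")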
@@ -171,7 +172,6 @@ class DistributeTranspiler(object):
             trainer_id = 0
             trainers = 4
             role = os.getenv("PADDLE_TRAINING_ROLE")
-
             t = fluid.DistributeTranspiler()
             t.transpile(
                 trainer_id, pservers=pserver_endpoints, trainers=trainers)
@@ -214,13 +214,16 @@ class DistributeTranspiler(object):
                           trainer_id,
                           trainers,
                           current_endpoint,
-                          startup_program=None):
+                          startup_program=None,
+                          wait_port=True):
         if not startup_program:
             startup_program = default_startup_program()
         if trainer_id >= 0:
             worker_endpoints = trainers.split(",")
             # send NCCL_ID to others or recv from trainer 0
             worker_endpoints.remove(current_endpoint)
+            if trainer_id == 0 and wait_port:
+                wait_server_ready(worker_endpoints)
             nccl_id_var = startup_program.global_block().create_var(
                 name="NCCLID", persistable=True, type=core.VarDesc.VarType.RAW)
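
wait_server_ready blocks until each listed endpoint accepts a TCP connection. A rough, self-contained approximation of that behaviour (for illustration only; this is not the actual Paddle helper):

import socket
import time

def wait_for_endpoints(endpoints, timeout_s=120):
    """Block until every "ip:port" in endpoints accepts a TCP connection."""
    deadline = time.time() + timeout_s
    pending = list(endpoints)
    while pending:
        if time.time() > deadline:
            raise RuntimeError("endpoints not ready: %s" % pending)
        ip, port = pending[0].split(":")
        try:
            sock = socket.create_connection((ip, int(port)), timeout=3)
            sock.close()
            pending.pop(0)  # this peer is listening, move on to the next one
        except (socket.error, socket.timeout):
            time.sleep(1)   # not up yet, retry

# e.g. trainer 0 would wait on its peer before broadcasting the NCCL ID:
# wait_for_endpoints(["127.0.0.1:6171"])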
@@ -306,7 +309,8 @@ class DistributeTranspiler(object):
                 trainer_id,
                 trainers,
                 current_endpoint,
-                startup_program=startup_program)
+                startup_program=startup_program,
+                wait_port=self.config.wait_port)
             return

         self.trainer_num = trainers
@@ -652,9 +656,6 @@ class DistributeTranspiler(object):
         # NOTE: assume blocks of the same variable is not distributed
         # on the same pserver, only change param/grad varnames for
         # trainers to fetch.
-        sys.stderr.write("get_pserver_program() is deprecated, call \
-get_pserver_programs() to get pserver main and startup \
-in a single call.")
         # step1
         pserver_program = Program()
         pserver_program.random_seed = self.origin_program.random_seed
@@ -922,18 +923,6 @@ in a single call.")
         Returns:
             Program: parameter server side startup program.
         """
-        sys.stderr.write("get_startup_program() is deprecated, call \
-get_pserver_programs() to get pserver main and startup \
-in a single call.")
-        if pserver_program != None:
-            sys.stderr.write("passing pserver_program to get_startup_program() \
-is deprecated, you can use new API get_pserver_programs() to \
-get both pserver main program and startup program.")
-        if startup_program != None:
-            sys.stderr.write("passing startup_program to get_startup_program() \
-is deprecated, use fluid.program_guard() or pass this argument \
-to transpile() call.")
-
         s_prog = Program()
         orig_s_prog = self.startup_program
         s_prog.random_seed = orig_s_prog.random_seed
...