Unverified commit a615ad46, authored by Wu Yi, committed by GitHub

Add test for dist and memopt (#13049)

* add test for dist and memopt

* update transformer too
Parent 515a756a
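In outline, this change moves the distributed test runners from positional sys.argv parsing to argparse and threads a new --mem_opt flag through them: when set, fluid.memory_optimize is applied to the default main program before the DistributeTranspiler runs, so the pserver and trainer programs are derived from the memory-optimized graph. A minimal sketch of that ordering, assuming Paddle Fluid is importable (the surrounding test plumbing is elided):

    import paddle.fluid as fluid

    def maybe_memory_optimize(mem_opt):
        # Mirrors the new --mem_opt handling: rewrite the default main program
        # in place to reuse variable memory, then let the transpiler derive the
        # pserver/trainer programs from the optimized graph.
        if mem_opt:
            fluid.memory_optimize(fluid.default_main_program())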
@@ -1667,16 +1667,6 @@ def get_model(is_dist, is_async):
return sum_cost, avg_cost, predict, token_num, local_lr_scheduler
def get_transpiler(trainer_id, main_program, pserver_endpoints, trainers):
t = fluid.DistributeTranspiler()
t.transpile(
trainer_id=trainer_id,
program=main_program,
pservers=pserver_endpoints,
trainers=trainers)
return t
def update_args():
src_dict = DataReader.load_dict(TrainTaskConfig.src_vocab_fpath)
trg_dict = DataReader.load_dict(TrainTaskConfig.trg_vocab_fpath)
@@ -1691,69 +1681,46 @@ def update_args():
class DistTransformer2x2(TestDistRunnerBase):
def run_pserver(self, pserver_endpoints, trainers, current_endpoint,
trainer_id, sync_mode):
get_model(True, not sync_mode)
t = get_transpiler(trainer_id,
fluid.default_main_program(), pserver_endpoints,
trainers)
pserver_prog = t.get_pserver_program(current_endpoint)
startup_prog = t.get_startup_program(current_endpoint, pserver_prog)
def run_pserver(self, args):
get_model(True, not args.sync_mode)
t = self.get_transpiler(args.trainer_id,
fluid.default_main_program(), args.endpoints,
args.trainers, args.sync_mode)
pserver_prog = t.get_pserver_program(args.current_endpoint)
startup_prog = t.get_startup_program(args.current_endpoint,
pserver_prog)
place = fluid.CPUPlace()
exe = fluid.Executor(place)
exe.run(startup_prog)
exe.run(pserver_prog)
def _wait_ps_ready(self, pid):
retry_times = 20
while True:
assert retry_times >= 0, "wait ps ready failed"
time.sleep(3)
try:
# the listen_and_serv_op would touch a file which contains the listen port
# on the /tmp directory until it was ready to process all the RPC call.
os.stat("/tmp/paddle.%d.port" % pid)
return
except os.error:
retry_times -= 1
def run_trainer(self,
place,
endpoints,
trainer_id,
trainers,
is_dist=True,
sync_mode=True):
def run_trainer(self, place, args):
sum_cost, avg_cost, predict, token_num, local_lr_scheduler = get_model(
is_dist, not sync_mode)
args.is_dist, not args.sync_mode)
if is_dist:
t = get_transpiler(trainer_id,
fluid.default_main_program(), endpoints,
trainers)
if args.is_dist:
t = self.get_transpiler(args.trainer_id,
fluid.default_main_program(),
args.endpoints, args.trainers,
args.sync_mode)
trainer_prog = t.get_trainer_program()
TrainTaskConfig.batch_size = 10
TrainTaskConfig.train_file_pattern = TrainTaskConfig.data_path + "train.tok.clean.bpe.32000.en-de.train_{}".format(
trainer_id)
args.trainer_id)
else:
TrainTaskConfig.batch_size = 20
trainer_prog = fluid.default_main_program()
startup_exe = fluid.Executor(place)
TrainTaskConfig.local = not is_dist
TrainTaskConfig.local = not args.is_dist
train_loop(startup_exe, trainer_prog, 1, sum_cost, avg_cost,
local_lr_scheduler, token_num, predict)
if __name__ == "__main__":
if len(sys.argv) != 8:
print(
"Usage: python dist_transformer.py [pserver/trainer] [endpoints] [trainer_id] [current_endpoint] [trainers] [is_dist] [sync_mode]"
)
update_args()
runtime_main(DistTransformer2x2)
@@ -21,7 +21,7 @@ import sys
import six
import signal
import subprocess
import six
import argparse
class TestDistRunnerBase(object):
@@ -43,40 +43,35 @@ class TestDistRunnerBase(object):
sync_mode=sync_mode)
return t
def run_pserver(self,
pserver_endpoints,
trainers,
current_endpoint,
trainer_id,
sync_mode=True):
def run_pserver(self, args):
import paddle
import paddle.fluid as fluid
self.get_model(batch_size=2)
t = self.get_transpiler(trainer_id,
fluid.default_main_program(), pserver_endpoints,
trainers, sync_mode)
pserver_prog = t.get_pserver_program(current_endpoint)
startup_prog = t.get_startup_program(current_endpoint, pserver_prog)
if args.mem_opt:
fluid.memory_optimize(fluid.default_main_program())
t = self.get_transpiler(args.trainer_id,
fluid.default_main_program(), args.endpoints,
args.trainers, args.sync_mode)
pserver_prog = t.get_pserver_program(args.current_endpoint)
startup_prog = t.get_startup_program(args.current_endpoint,
pserver_prog)
place = fluid.CPUPlace()
exe = fluid.Executor(place)
exe.run(startup_prog)
exe.run(pserver_prog)
def run_trainer(self,
place,
endpoints,
trainer_id,
trainers,
is_dist=True,
sync_mode=True):
def run_trainer(self, place, args):
import paddle
import paddle.fluid as fluid
test_program, avg_cost, train_reader, test_reader, batch_acc, predict = \
self.get_model(batch_size=2)
if is_dist:
t = self.get_transpiler(trainer_id,
fluid.default_main_program(), endpoints,
trainers, sync_mode)
self.get_model(batch_size=2)
if args.mem_opt:
fluid.memory_optimize(fluid.default_main_program())
if args.is_dist:
t = self.get_transpiler(args.trainer_id,
fluid.default_main_program(),
args.endpoints, args.trainers,
args.sync_mode)
trainer_prog = t.get_trainer_program()
else:
trainer_prog = fluid.default_main_program()
@@ -117,27 +112,27 @@ def runtime_main(test_class):
import paddle.fluid as fluid
import paddle.fluid.core as core
if len(sys.argv) != 8:
print(
"Usage: python dist_se_resnext.py [pserver/trainer] [endpoints] [trainer_id] [current_endpoint] [trainers] [is_dist] [sync_mode]"
)
role = sys.argv[1]
endpoints = sys.argv[2]
trainer_id = int(sys.argv[3])
current_endpoint = sys.argv[4]
trainers = int(sys.argv[5])
is_dist = True if sys.argv[6] == "TRUE" else False
sync_mode = True if sys.argv[7] == "TRUE" else False
parser = argparse.ArgumentParser(description='Run dist test.')
parser.add_argument(
'--role', type=str, required=True, choices=['pserver', 'trainer'])
parser.add_argument('--endpoints', type=str, required=False, default="")
parser.add_argument('--is_dist', action='store_true')
parser.add_argument('--trainer_id', type=int, required=False, default=0)
parser.add_argument('--trainers', type=int, required=False, default=1)
parser.add_argument(
'--current_endpoint', type=str, required=False, default="")
parser.add_argument('--sync_mode', action='store_true')
parser.add_argument('--mem_opt', action='store_true')
args = parser.parse_args()
model = test_class()
if role == "pserver":
model.run_pserver(endpoints, trainers, current_endpoint, trainer_id,
sync_mode)
if args.role == "pserver" and args.is_dist:
model.run_pserver(args)
else:
p = fluid.CUDAPlace(0) if core.is_compiled_with_cuda(
) else fluid.CPUPlace()
model.run_trainer(p, endpoints, trainer_id, trainers, is_dist,
sync_mode)
model.run_trainer(p, args)
import paddle.compat as cpt
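For reference, a usage sketch of the new argparse interface; flag names mirror runtime_main above, the endpoints are the defaults used by TestDistBase, and the trainer count is a placeholder:

    import argparse

    parser = argparse.ArgumentParser(description='Run dist test.')
    parser.add_argument('--role', type=str, required=True, choices=['pserver', 'trainer'])
    parser.add_argument('--endpoints', type=str, default="")
    parser.add_argument('--is_dist', action='store_true')
    parser.add_argument('--trainer_id', type=int, default=0)
    parser.add_argument('--trainers', type=int, default=1)
    parser.add_argument('--current_endpoint', type=str, default="")
    parser.add_argument('--sync_mode', action='store_true')
    parser.add_argument('--mem_opt', action='store_true')

    # A distributed trainer invocation as the test harness would build it:
    args = parser.parse_args([
        '--role', 'trainer',
        '--endpoints', '127.0.0.1:9123,127.0.0.1:9124',
        '--trainer_id', '0',
        '--current_endpoint', '127.0.0.1:9123',
        '--trainers', '2',
        '--is_dist', '--sync_mode', '--mem_opt',
    ])
    assert args.role == 'trainer' and args.is_dist and args.mem_opt

    # The local baseline run passes only the role:
    local_args = parser.parse_args(['--role', 'trainer'])
    assert not local_args.is_dist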
@@ -153,30 +148,34 @@ class TestDistBase(unittest.TestCase):
self._ps_endpoints = "127.0.0.1:9123,127.0.0.1:9124"
self._python_interp = "python"
self._sync_mode = True
self._mem_opt = False
self._setup_config()
def start_pserver(self, model_file, check_error_log):
sync_mode_str = "TRUE" if self._sync_mode else "FALSE"
ps0_ep, ps1_ep = self._ps_endpoints.split(",")
ps0_cmd = "%s %s pserver %s 0 %s %d TRUE %s" % \
ps_cmd = "%s %s --role pserver --endpoints %s --trainer_id 0 --current_endpoint %s --trainers %d --is_dist %s %s"
sync_mode_str = "--sync_mode" if self._sync_mode else ""
mem_opt_str = "--mem_opt" if self._mem_opt else ""
ps0_cmd = ps_cmd % \
(self._python_interp, model_file, self._ps_endpoints, ps0_ep,
self._trainers, sync_mode_str)
ps1_cmd = "%s %s pserver %s 0 %s %d TRUE %s" % \
self._trainers, sync_mode_str, mem_opt_str)
ps1_cmd = ps_cmd % \
(self._python_interp, model_file, self._ps_endpoints, ps1_ep,
self._trainers, sync_mode_str)
self._trainers, sync_mode_str, mem_opt_str)
ps0_pipe = subprocess.PIPE
ps1_pipe = subprocess.PIPE
if check_error_log:
print("ps0_cmd:", ps0_cmd)
print("ps1_cmd:", ps1_cmd)
print(ps0_cmd)
print(ps1_cmd)
ps0_pipe = open("/tmp/ps0_err.log", "wb")
ps1_pipe = open("/tmp/ps1_err.log", "wb")
ps0_proc = subprocess.Popen(
ps0_cmd.split(" "), stdout=subprocess.PIPE, stderr=ps0_pipe)
ps0_cmd.strip().split(" "), stdout=subprocess.PIPE, stderr=ps0_pipe)
ps1_proc = subprocess.Popen(
ps1_cmd.split(" "), stdout=subprocess.PIPE, stderr=ps1_pipe)
ps1_cmd.strip().split(" "), stdout=subprocess.PIPE, stderr=ps1_pipe)
if not check_error_log:
return ps0_proc, ps1_proc, None, None
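To illustrate, the rendered pserver command for the first endpoint looks roughly as follows; interpreter, model file, and trainer count are placeholder values taken from the surrounding tests:

    # Same format string as ps_cmd above, filled with placeholder values.
    ps_cmd = ("%s %s --role pserver --endpoints %s --trainer_id 0 "
              "--current_endpoint %s --trainers %d --is_dist %s %s")
    print(ps_cmd % ("python", "dist_mnist.py",
                    "127.0.0.1:9123,127.0.0.1:9124", "127.0.0.1:9123",
                    2, "--sync_mode", "--mem_opt"))
    # -> python dist_mnist.py --role pserver --endpoints 127.0.0.1:9123,127.0.0.1:9124 \
    #        --trainer_id 0 --current_endpoint 127.0.0.1:9123 --trainers 2 --is_dist --sync_mode --mem_opt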
@@ -199,7 +198,7 @@ class TestDistBase(unittest.TestCase):
retry_times -= 1
def check_with_place(self, model_file, delta=1e-3, check_error_log=False):
# *ATTENTION* THIS TEST NEEDS AT LEAST 2GPUS TO RUN
# TODO(typhoonzero): should auto adapt GPU count on the machine.
required_envs = {
"PATH": os.getenv("PATH"),
"PYTHONPATH": os.getenv("PYTHONPATH"),
@@ -215,10 +214,7 @@ class TestDistBase(unittest.TestCase):
# Run local to get a base line
env_local = {"CUDA_VISIBLE_DEVICES": "0"}
env_local.update(required_envs)
sync_mode_str = "TRUE" if self._sync_mode else "FALSE"
local_cmd = "%s %s trainer %s 0 %s %d FLASE %s" % \
(self._python_interp, model_file,
"127.0.0.1:1234", "127.0.0.1:1234", 1, sync_mode_str)
local_cmd = "%s %s --role trainer" % (self._python_interp, model_file)
if not check_error_log:
local_proc = subprocess.Popen(
local_cmd.split(" "),
@@ -226,7 +222,6 @@ class TestDistBase(unittest.TestCase):
stderr=subprocess.PIPE,
env=env_local)
else:
print("trainer cmd:", local_cmd)
err_log = open("/tmp/trainer.err.log", "wb")
local_proc = subprocess.Popen(
local_cmd.split(" "),
@@ -247,12 +242,17 @@ class TestDistBase(unittest.TestCase):
self._wait_ps_ready(ps1.pid)
ps0_ep, ps1_ep = self._ps_endpoints.split(",")
tr0_cmd = "%s %s trainer %s 0 %s %d TRUE %s" % \
(self._python_interp, model_file, self._ps_endpoints, ps0_ep,
self._trainers, sync_mode_str)
tr1_cmd = "%s %s trainer %s 1 %s %d TRUE %s" % \
(self._python_interp, model_file, self._ps_endpoints, ps1_ep,
self._trainers, sync_mode_str)
tr_cmd = "%s %s --role trainer --endpoints %s --trainer_id %d --current_endpoint %s --trainers %d --is_dist %s %s"
sync_mode_str = "--sync_mode" if self._sync_mode else ""
mem_opt_str = "--mem_opt" if self._mem_opt else ""
tr0_cmd = tr_cmd % \
(self._python_interp, model_file, self._ps_endpoints,
0, ps0_ep,
self._trainers, sync_mode_str, mem_opt_str)
tr1_cmd = tr_cmd % \
(self._python_interp, model_file, self._ps_endpoints,
1, ps1_ep,
self._trainers, sync_mode_str, mem_opt_str)
env0 = {"CUDA_VISIBLE_DEVICES": "0"}
env1 = {"CUDA_VISIBLE_DEVICES": "1"}
@@ -269,12 +269,12 @@ class TestDistBase(unittest.TestCase):
tr1_pipe = open("/tmp/tr1_err.log", "wb")
tr0_proc = subprocess.Popen(
tr0_cmd.split(" "),
tr0_cmd.strip().split(" "),
stdout=subprocess.PIPE,
stderr=tr0_pipe,
env=env0)
tr1_proc = subprocess.Popen(
tr1_cmd.split(" "),
tr1_cmd.strip().split(" "),
stdout=subprocess.PIPE,
stderr=tr1_pipe,
env=env1)
@@ -303,6 +303,8 @@ class TestDistBase(unittest.TestCase):
# FIXME: use terminate() instead of sigkill.
os.kill(ps0.pid, signal.SIGKILL)
os.kill(ps1.pid, signal.SIGKILL)
ps0.wait()
ps1.wait()
FNULL.close()
self.assertAlmostEqual(local_first_loss, dist_first_loss, delta=delta)
@@ -25,6 +25,15 @@ class TestDistMnist2x2(TestDistBase):
self.check_with_place("dist_mnist.py", delta=1e-7)
class TestDistMnist2x2WithMemopt(TestDistBase):
def _setup_config(self):
self._sync_mode = True
self._mem_opt = True
def test_se_resnext(self):
self.check_with_place("dist_mnist.py", delta=1e-7)
class TestDistMnistAsync(TestDistBase):
def _setup_config(self):
self._sync_mode = False