Unverified commit 0941e3e0, authored by guru4elephant, committed by GitHub

add class name and timeline for test_dist_base.py (#18122)

* add class name and timeline for test_dist_base.py
Parent 90897741
@@ -24,7 +24,7 @@ import six
import argparse
import pickle
import numpy as np
import time
import paddle.fluid as fluid
from paddle.fluid import compiler
import paddle.fluid.dygraph as dygraph
@@ -35,11 +35,13 @@ RUN_STEP = 5
DEFAULT_BATCH_SIZE = 2
def my_print(log_str):
def my_print(class_name, log_str):
localtime = time.asctime(time.localtime(time.time()))
print_str = localtime + "\t" + class_name + "\t" + log_str
if six.PY2:
sys.stderr.write(pickle.dumps(log_str))
sys.stderr.write(pickle.dumps(print_str))
else:
sys.stderr.buffer.write(pickle.dumps(log_str))
sys.stderr.buffer.write(pickle.dumps(print_str))
class TestDistRunnerBase(object):
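The hunk above is the core of the change: `my_print` now takes the caller's class name and prefixes each message with a timestamp before writing the pickled string to stderr (the surrounding harness redirects each child process's stderr to a /tmp/*_err.log file, so these lines end up in the per-process logs). A minimal standalone sketch of the new helper, covering only the Python 3 branch; `DemoRunner` is a made-up class used purely to illustrate the `type(self).__name__` call sites seen throughout the diff:

```python
import pickle
import sys
import time


def my_print(class_name, log_str):
    # Prefix the message with a human-readable timestamp and the class name,
    # then write the pickled string to stderr's binary buffer.
    localtime = time.asctime(time.localtime(time.time()))
    print_str = localtime + "\t" + class_name + "\t" + log_str
    sys.stderr.buffer.write(pickle.dumps(print_str))


class DemoRunner(object):
    def run(self):
        # Matches the call pattern used throughout the diff.
        my_print(type(self).__name__, "begin to train on trainer")


if __name__ == "__main__":
    DemoRunner().run()
```

Each log line then carries roughly `Tue Jun 18 10:00:00 2019\tDemoRunner\tbegin to train on trainer`, which makes it possible to attribute interleaved output to a specific test class and to see when each phase started.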
@@ -90,9 +92,9 @@ class TestDistRunnerBase(object):
place = fluid.CPUPlace()
exe = fluid.Executor(place)
exe.run(startup_prog)
my_print("run pserver startup program done.")
my_print(type(self).__name__, "run pserver startup program done.")
exe.run(pserver_prog)
my_print("run pserver main program done.")
my_print(type(self).__name__, "run pserver main program done.")
def run_trainer(self, args):
self.lr = args.lr
@@ -107,23 +109,29 @@ class TestDistRunnerBase(object):
self.get_model(batch_size=args.batch_size)
if args.mem_opt:
my_print("begin to run memory optimize")
my_print(type(self).__name__, "begin to run memory optimize")
fluid.memory_optimize(fluid.default_main_program(), skip_grads=True)
my_print("trainer run memory optimize done.")
my_print(type(self).__name__, "trainer run memory optimize done.")
if args.update_method == "pserver":
my_print("begin to run transpile on trainer with pserver mode")
my_print(
type(self).__name__,
"begin to run transpile on trainer with pserver mode")
t = self.get_transpiler(args.trainer_id,
fluid.default_main_program(),
args.endpoints, args.trainers,
args.sync_mode, args.dc_asgd)
trainer_prog = t.get_trainer_program()
my_print("get trainer program done with pserver mode.")
my_print(
type(self).__name__,
"get trainer program done with pserver mode.")
elif args.update_method == "nccl2" or args.update_method == "nccl2_reduce_layer":
# transpile for nccl2
config = fluid.DistributeTranspilerConfig()
config.mode = "nccl2"
config.nccl_comm_num = args.nccl_comm_num
my_print("begin to run transpile on trainer with nccl2 mode")
my_print(
type(self).__name__,
"begin to run transpile on trainer with nccl2 mode")
nccl2_t = fluid.DistributeTranspiler(config=config)
nccl2_t.transpile(
args.trainer_id,
@@ -131,7 +139,9 @@ class TestDistRunnerBase(object):
startup_program=fluid.default_startup_program(),
trainers=args.endpoints,
current_endpoint=args.current_endpoint)
my_print("get trainer program done. with nccl2 mode")
my_print(
type(self).__name__,
"get trainer program done. with nccl2 mode")
trainer_prog = fluid.default_main_program()
else:
trainer_prog = fluid.default_main_program()
@@ -144,7 +154,7 @@ class TestDistRunnerBase(object):
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())
my_print("run worker startup program done.")
my_print(type(self).__name__, "run worker startup program done.")
exec_strategy = fluid.ExecutionStrategy()
exec_strategy.num_threads = 1
@@ -177,12 +187,12 @@ class TestDistRunnerBase(object):
build_stra.num_trainers = 1
build_stra.trainer_id = 0
my_print("begin to compile with data parallel")
my_print(type(self).__name__, "begin to compile with data parallel")
binary = compiler.CompiledProgram(trainer_prog).with_data_parallel(
loss_name=avg_cost.name,
build_strategy=build_stra,
exec_strategy=exec_strategy)
my_print("program compiled with data parallel")
my_print(type(self).__name__, "program compiled with data parallel")
if args.use_cuda and args.update_method == "nccl2":
# it just for test share_vars_from feature.
@@ -212,7 +222,7 @@ class TestDistRunnerBase(object):
else:
return origin_batch
my_print("begin to train on trainer")
my_print(type(self).__name__, "begin to train on trainer")
out_losses = []
for _ in six.moves.xrange(RUN_STEP):
loss, = exe.run(binary,
@@ -265,19 +275,23 @@ class TestParallelDyGraphRunnerBase(object):
strategy.local_rank = args.trainer_id
strategy.trainer_endpoints = args.endpoints.split(",")
strategy.current_endpoint = args.current_endpoint
my_print("begin to prepare context in dygraph with nccl2")
my_print(
type(self).__name__,
"begin to prepare context in dygraph with nccl2")
dygraph.parallel.prepare_context(strategy)
model = dygraph.parallel.DataParallel(model, strategy)
my_print("model built in dygraph")
my_print(type(self).__name__, "model built in dygraph")
out_losses = []
my_print("begin to run dygraph training")
my_print(type(self).__name__, "begin to run dygraph training")
for step_id, data in enumerate(train_reader()):
data = _get_data(data)
if step_id == RUN_STEP:
break
loss = self.run_one_loop(model, opt, data)
if step_id % 10 == 0:
my_print("loss at step %d: %f" % (step_id, loss))
my_print(
type(self).__name__,
"loss at step %d: %f" % (step_id, loss))
out_losses.append(loss.numpy())
# FIXME(Yancey1989): scale the loss inplace
@@ -290,7 +304,7 @@ class TestParallelDyGraphRunnerBase(object):
opt.minimize(loss)
model.clear_gradients()
my_print(pickle.dumps(out_losses))
my_print(type(self).__name__, pickle.dumps(out_losses))
def runtime_main(test_class):
@@ -395,7 +409,8 @@ class TestDistBase(unittest.TestCase):
with closing(socket.socket(socket.AF_INET,
socket.SOCK_STREAM)) as s:
s.bind(('', 0))
my_print("socket name: %s" % s.getsockname()[1])
my_print(
type(self).__name__, "socket name: %s" % s.getsockname()[1])
return s.getsockname()[1]
while True:
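This hunk only adds the class name to the log call inside the port-picking helper. The helper relies on the standard trick of binding to port 0 so the OS hands back an unused port; below is a self-contained sketch of that pattern (the function name `find_free_port` is assumed here, since the visible diff does not show the enclosing definition):

```python
import socket
from contextlib import closing


def find_free_port():
    # Bind to port 0: the OS picks an unused ephemeral port, and
    # getsockname() reports which one it chose.
    with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
        s.bind(('', 0))
        return s.getsockname()[1]


if __name__ == "__main__":
    print("free port:", find_free_port())
```

Note the inherent race: the socket is closed before the port is reused, so a port obtained this way is very likely, but not guaranteed, to still be free when the test process actually binds to it.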
@@ -426,13 +441,13 @@ class TestDistBase(unittest.TestCase):
ps0_pipe = open("/tmp/ps0_err.log", "wb")
ps1_pipe = open("/tmp/ps1_err.log", "wb")
my_print("going to start pserver process 0")
my_print(type(self).__name__, "going to start pserver process 0")
ps0_proc = subprocess.Popen(
ps0_cmd.strip().split(" "),
stdout=subprocess.PIPE,
stderr=ps0_pipe,
env=required_envs)
my_print("going to start pserver process 1")
my_print(type(self).__name__, "going to start pserver process 1")
ps1_proc = subprocess.Popen(
ps1_cmd.strip().split(" "),
stdout=subprocess.PIPE,
@@ -538,13 +553,13 @@ class TestDistBase(unittest.TestCase):
tr0_pipe = open("/tmp/tr0_err.log", "wb")
tr1_pipe = open("/tmp/tr1_err.log", "wb")
my_print("going to start trainer process 0")
my_print(type(self).__name__, "going to start trainer process 0")
tr0_proc = subprocess.Popen(
tr0_cmd.strip().split(" "),
stdout=subprocess.PIPE,
stderr=tr0_pipe,
env=env0)
my_print("going to start trainer process 1")
my_print(type(self).__name__, "going to start trainer process 1")
tr1_proc = subprocess.Popen(
tr1_cmd.strip().split(" "),
stdout=subprocess.PIPE,
@@ -662,13 +677,13 @@ class TestDistBase(unittest.TestCase):
tr0_pipe = open("/tmp/tr0_err.log", "wb")
tr1_pipe = open("/tmp/tr1_err.log", "wb")
my_print("going to start process 0 with nccl2")
my_print(type(self).__name__, "going to start process 0 with nccl2")
tr0_proc = subprocess.Popen(
tr0_cmd.strip().split(" "),
stdout=subprocess.PIPE,
stderr=tr0_pipe,
env=env0)
my_print("going to start process 1 with nccl2")
my_print(type(self).__name__, "going to start process 1 with nccl2")
tr1_proc = subprocess.Popen(
tr1_cmd.strip().split(" "),
stdout=subprocess.PIPE,
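The last three hunks tag the process-launch log lines with the test class name. The launch pattern itself, visible in the context lines, sends each child's stderr to a per-process log file under /tmp and captures stdout through a pipe. A runnable sketch of that pattern with a placeholder command and environment (the real test builds the command from the trainer id, endpoints, and update method):

```python
import os
import subprocess
import sys

# Placeholder command and environment; purely illustrative.
tr0_cmd = sys.executable + " -c print('trainer-0-running')"
env0 = dict(os.environ, TRAINER_ID="0")

# stderr goes to a per-process log file so a failed run can be inspected
# afterwards; stdout is captured through a pipe.
tr0_pipe = open("/tmp/tr0_err.log", "wb")
tr0_proc = subprocess.Popen(
    tr0_cmd.strip().split(" "),
    stdout=subprocess.PIPE,
    stderr=tr0_pipe,
    env=env0)

out, _ = tr0_proc.communicate()
tr0_pipe.close()
print("trainer 0 exit code:", tr0_proc.returncode, "stdout:", out)
```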