diff --git a/python/paddle/fluid/tests/unittests/test_dist_base.py b/python/paddle/fluid/tests/unittests/test_dist_base.py
index 257bb12dc8d8d8a21dad09b48932a9579e44219c..6daf9f8994d6f25989599587fe093d4b75452473 100644
--- a/python/paddle/fluid/tests/unittests/test_dist_base.py
+++ b/python/paddle/fluid/tests/unittests/test_dist_base.py
@@ -24,7 +24,7 @@ import six
 import argparse
 import pickle
 import numpy as np
-
+import time
 import paddle.fluid as fluid
 from paddle.fluid import compiler
 import paddle.fluid.dygraph as dygraph
@@ -35,11 +35,13 @@ RUN_STEP = 5
 DEFAULT_BATCH_SIZE = 2
 
 
-def my_print(log_str):
+def my_print(class_name, log_str):
+    localtime = time.asctime(time.localtime(time.time()))
+    print_str = localtime + "\t" + class_name + "\t" + log_str
     if six.PY2:
-        sys.stderr.write(pickle.dumps(log_str))
+        sys.stderr.write(pickle.dumps(print_str))
     else:
-        sys.stderr.buffer.write(pickle.dumps(log_str))
+        sys.stderr.buffer.write(pickle.dumps(print_str))
 
 
 class TestDistRunnerBase(object):
@@ -90,9 +92,9 @@ class TestDistRunnerBase(object):
         place = fluid.CPUPlace()
         exe = fluid.Executor(place)
         exe.run(startup_prog)
-        my_print("run pserver startup program done.")
+        my_print(type(self).__name__, "run pserver startup program done.")
         exe.run(pserver_prog)
-        my_print("run pserver main program done.")
+        my_print(type(self).__name__, "run pserver main program done.")
 
     def run_trainer(self, args):
         self.lr = args.lr
@@ -107,23 +109,29 @@ class TestDistRunnerBase(object):
                 self.get_model(batch_size=args.batch_size)
 
         if args.mem_opt:
-            my_print("begin to run memory optimize")
+            my_print(type(self).__name__, "begin to run memory optimize")
             fluid.memory_optimize(fluid.default_main_program(), skip_grads=True)
-            my_print("trainer run memory optimize done.")
+            my_print(type(self).__name__, "trainer run memory optimize done.")
         if args.update_method == "pserver":
-            my_print("begin to run transpile on trainer with pserver mode")
+            my_print(
+                type(self).__name__,
+                "begin to run transpile on trainer with pserver mode")
             t = self.get_transpiler(args.trainer_id,
                                     fluid.default_main_program(),
                                     args.endpoints, args.trainers,
                                     args.sync_mode, args.dc_asgd)
             trainer_prog = t.get_trainer_program()
-            my_print("get trainer program done with pserver mode.")
+            my_print(
+                type(self).__name__,
+                "get trainer program done with pserver mode.")
         elif args.update_method == "nccl2" or args.update_method == "nccl2_reduce_layer":
             # transpile for nccl2
             config = fluid.DistributeTranspilerConfig()
             config.mode = "nccl2"
             config.nccl_comm_num = args.nccl_comm_num
-            my_print("begin to run transpile on trainer with nccl2 mode")
+            my_print(
+                type(self).__name__,
+                "begin to run transpile on trainer with nccl2 mode")
             nccl2_t = fluid.DistributeTranspiler(config=config)
             nccl2_t.transpile(
                 args.trainer_id,
@@ -131,7 +139,9 @@ class TestDistRunnerBase(object):
                 startup_program=fluid.default_startup_program(),
                 trainers=args.endpoints,
                 current_endpoint=args.current_endpoint)
-            my_print("get trainer program done. with nccl2 mode")
+            my_print(
+                type(self).__name__,
+                "get trainer program done. with nccl2 mode")
             trainer_prog = fluid.default_main_program()
         else:
             trainer_prog = fluid.default_main_program()
@@ -144,7 +154,7 @@ class TestDistRunnerBase(object):
 
         exe = fluid.Executor(place)
         exe.run(fluid.default_startup_program())
-        my_print("run worker startup program done.")
+        my_print(type(self).__name__, "run worker startup program done.")
 
         exec_strategy = fluid.ExecutionStrategy()
         exec_strategy.num_threads = 1
@@ -177,12 +187,12 @@ class TestDistRunnerBase(object):
             build_stra.num_trainers = 1
             build_stra.trainer_id = 0
 
-        my_print("begin to compile with data parallel")
+        my_print(type(self).__name__, "begin to compile with data parallel")
         binary = compiler.CompiledProgram(trainer_prog).with_data_parallel(
             loss_name=avg_cost.name,
             build_strategy=build_stra,
             exec_strategy=exec_strategy)
-        my_print("program compiled with data parallel")
+        my_print(type(self).__name__, "program compiled with data parallel")
 
         if args.use_cuda and args.update_method == "nccl2":
             # it just for test share_vars_from feature.
@@ -212,7 +222,7 @@ class TestDistRunnerBase(object):
             else:
                 return origin_batch
 
-        my_print("begin to train on trainer")
+        my_print(type(self).__name__, "begin to train on trainer")
         out_losses = []
         for _ in six.moves.xrange(RUN_STEP):
             loss, = exe.run(binary,
@@ -265,19 +275,23 @@ class TestParallelDyGraphRunnerBase(object):
                 strategy.local_rank = args.trainer_id
                 strategy.trainer_endpoints = args.endpoints.split(",")
                 strategy.current_endpoint = args.current_endpoint
-                my_print("begin to prepare context in dygraph with nccl2")
+                my_print(
+                    type(self).__name__,
+                    "begin to prepare context in dygraph with nccl2")
                 dygraph.parallel.prepare_context(strategy)
                 model = dygraph.parallel.DataParallel(model, strategy)
-                my_print("model built in dygraph")
+                my_print(type(self).__name__, "model built in dygraph")
             out_losses = []
-            my_print("begin to run dygraph training")
+            my_print(type(self).__name__, "begin to run dygraph training")
             for step_id, data in enumerate(train_reader()):
                 data = _get_data(data)
                 if step_id == RUN_STEP:
                     break
                 loss = self.run_one_loop(model, opt, data)
                 if step_id % 10 == 0:
-                    my_print("loss at step %d: %f" % (step_id, loss))
+                    my_print(
+                        type(self).__name__,
+                        "loss at step %d: %f" % (step_id, loss))
                 out_losses.append(loss.numpy())
 
                 # FIXME(Yancey1989): scale the loss inplace
@@ -290,7 +304,7 @@ class TestParallelDyGraphRunnerBase(object):
 
                 opt.minimize(loss)
                 model.clear_gradients()
-        my_print(pickle.dumps(out_losses))
+        my_print(type(self).__name__, pickle.dumps(out_losses))
 
 
 def runtime_main(test_class):
@@ -395,7 +409,8 @@ class TestDistBase(unittest.TestCase):
             with closing(socket.socket(socket.AF_INET,
                                        socket.SOCK_STREAM)) as s:
                 s.bind(('', 0))
-                my_print("socket name: %s" % s.getsockname()[1])
+                my_print(
+                    type(self).__name__, "socket name: %s" % s.getsockname()[1])
                 return s.getsockname()[1]
 
         while True:
@@ -426,13 +441,13 @@ class TestDistBase(unittest.TestCase):
         ps0_pipe = open("/tmp/ps0_err.log", "wb")
         ps1_pipe = open("/tmp/ps1_err.log", "wb")
 
-        my_print("going to start pserver process 0")
+        my_print(type(self).__name__, "going to start pserver process 0")
         ps0_proc = subprocess.Popen(
             ps0_cmd.strip().split(" "),
             stdout=subprocess.PIPE,
             stderr=ps0_pipe,
            env=required_envs)
-        my_print("going to start pserver process 1")
+        my_print(type(self).__name__, "going to start pserver process 1")
         ps1_proc = subprocess.Popen(
             ps1_cmd.strip().split(" "),
             stdout=subprocess.PIPE,
@@ -538,13 +553,13 @@ class TestDistBase(unittest.TestCase):
         tr0_pipe = open("/tmp/tr0_err.log", "wb")
         tr1_pipe = open("/tmp/tr1_err.log", "wb")
 
-        my_print("going to start trainer process 0")
+        my_print(type(self).__name__, "going to start trainer process 0")
         tr0_proc = subprocess.Popen(
             tr0_cmd.strip().split(" "),
             stdout=subprocess.PIPE,
             stderr=tr0_pipe,
             env=env0)
-        my_print("going to start trainer process 1")
+        my_print(type(self).__name__, "going to start trainer process 1")
         tr1_proc = subprocess.Popen(
             tr1_cmd.strip().split(" "),
             stdout=subprocess.PIPE,
@@ -662,13 +677,13 @@ class TestDistBase(unittest.TestCase):
         tr0_pipe = open("/tmp/tr0_err.log", "wb")
         tr1_pipe = open("/tmp/tr1_err.log", "wb")
 
-        my_print("going to start process 0 with nccl2")
+        my_print(type(self).__name__, "going to start process 0 with nccl2")
        tr0_proc = subprocess.Popen(
             tr0_cmd.strip().split(" "),
             stdout=subprocess.PIPE,
             stderr=tr0_pipe,
             env=env0)
-        my_print("going to start process 1 with nccl2")
+        my_print(type(self).__name__, "going to start process 1 with nccl2")
         tr1_proc = subprocess.Popen(
             tr1_cmd.strip().split(" "),
             stdout=subprocess.PIPE,