diff --git a/python/paddle/fluid/dygraph/parallel.py b/python/paddle/fluid/dygraph/parallel.py
index c17cfc73de7b5767f842701aba62cf9b29ecd156..e5f57ac7cc4c7414567f91be19a900e088c60633 100644
--- a/python/paddle/fluid/dygraph/parallel.py
+++ b/python/paddle/fluid/dygraph/parallel.py
@@ -188,16 +188,14 @@ class DataParallel(layers.Layer):
         from ..layers import nn
         for coalesced_grad, origin_grad_vars, grad_shapes in coalesced_grads_and_grad_vars:
             grad_var_len = [np.prod(g_shape) for g_shape in grad_shapes]
-            splited_vars = nn.split(
-                coalesced_grad, num_or_sections=grad_var_len, dim=0)
-            reshaped_grad_vars = []
-            for g_var, g_shape in zip(splited_vars, grad_shapes):
-                reshaped_grad_vars.append(
-                    nn.reshape(
-                        x=g_var, shape=g_shape, inplace=True))
-            for origin_g_var, reshaped_g_var in zip(origin_grad_vars,
-                                                    reshaped_grad_vars):
-                nn.assign(input=reshaped_g_var, output=origin_g_var)
+            self._helper.main_program.current_block().append_op(
+                type='split',
+                inputs={'X': coalesced_grad},
+                outputs={'Out': origin_grad_vars},
+                attrs={'sections': grad_var_len,
+                       'axis': 0})
+            for g_var, g_shape in zip(origin_grad_vars, grad_shapes):
+                nn.reshape(x=g_var, shape=g_shape, inplace=True)
 
     def apply_collective_grads(self):
         """
diff --git a/python/paddle/fluid/tests/unittests/test_dist_base.py b/python/paddle/fluid/tests/unittests/test_dist_base.py
index 8b5890c01f50f799299d5b3039f55c0e8112de73..fae4f5213f6d8ffa93e22d7df4bc43e630ca60b0 100644
--- a/python/paddle/fluid/tests/unittests/test_dist_base.py
+++ b/python/paddle/fluid/tests/unittests/test_dist_base.py
@@ -38,7 +38,14 @@ RUN_STEP = 5
 DEFAULT_BATCH_SIZE = 2
 
 
-def my_print(class_name, log_str):
+def print_to_out(out_losses):
+    if six.PY2:
+        print(pickle.dumps(out_losses))
+    else:
+        sys.stdout.buffer.write(pickle.dumps(out_losses))
+
+
+def print_to_err(class_name, log_str):
     localtime = time.asctime(time.localtime(time.time()))
     print_str = localtime + "\t" + class_name + "\t" + log_str
     if six.PY2:
@@ -99,9 +106,9 @@ class TestDistRunnerBase(object):
         place = fluid.CPUPlace()
         exe = fluid.Executor(place)
         exe.run(startup_prog)
-        my_print(type(self).__name__, "run pserver startup program done.")
+        print_to_err(type(self).__name__, "run pserver startup program done.")
         exe.run(pserver_prog)
-        my_print(type(self).__name__, "run pserver main program done.")
+        print_to_err(type(self).__name__, "run pserver main program done.")
 
     def run_gpu_fleet_api_trainer(self, args):
         assert args.update_method == "nccl2"
@@ -118,7 +125,7 @@ class TestDistRunnerBase(object):
 
         role = role_maker.PaddleCloudRoleMaker(is_collective=True)
         fleet.init(role)
-        my_print("gpu_fleet", "fleet.node_num:")
+        print_to_err("gpu_fleet", "fleet.node_num:")
         #"fleet.node_id:", fleet.node_id(),
         #"fleet.trainer_num:", fleet.worker_num())
 
@@ -154,15 +161,15 @@ class TestDistRunnerBase(object):
             else:
                 return origin_batch
 
-        my_print(type(self).__name__, "begin to train on trainer")
+        print_to_err(type(self).__name__, "begin to train on trainer")
         out_losses = []
         for i in six.moves.xrange(RUN_STEP):
             loss, = exe.run(dist_prog,
                             fetch_list=[avg_cost.name],
                             feed=feeder.feed(get_data()))
             out_losses.append(loss[0])
-            my_print(type(self).__name__, "run step %d finished" % i)
-        my_print(type(self).__name__, "trainer run finished")
+            print_to_err(type(self).__name__, "run step %d finished" % i)
+        print_to_err(type(self).__name__, "trainer run finished")
 
         if six.PY2:
             print(pickle.dumps(out_losses))
@@ -182,7 +189,7 @@ class TestDistRunnerBase(object):
             self.get_model(batch_size=args.batch_size)
 
         if args.update_method == "pserver":
-            my_print(
+            print_to_err(
                 type(self).__name__,
                 "begin to run transpile on trainer with pserver mode")
             t = self.get_transpiler(args.trainer_id,
@@ -190,7 +197,7 @@ class TestDistRunnerBase(object):
                                     args.endpoints, args.trainers,
                                     args.sync_mode, args.dc_asgd)
             trainer_prog = t.get_trainer_program()
-            my_print(
+            print_to_err(
                 type(self).__name__,
                 "get trainer program done with pserver mode.")
         elif args.update_method == "nccl2" or args.update_method == "nccl2_reduce_layer":
@@ -201,7 +208,7 @@ class TestDistRunnerBase(object):
             if args.use_hallreduce:
                 config.use_hierarchical_allreduce = True
                 config.hierarchical_allreduce_inter_nranks = args.hallreduce_inter_nranks
-            my_print(
+            print_to_err(
                 type(self).__name__,
                 "begin to run transpile on trainer with nccl2 mode")
             nccl2_t = fluid.DistributeTranspiler(config=config)
@@ -211,16 +218,16 @@ class TestDistRunnerBase(object):
                 startup_program=fluid.default_startup_program(),
                 trainers=args.endpoints,
                 current_endpoint=args.current_endpoint)
-            my_print(
+            print_to_err(
                 type(self).__name__,
                 "get trainer program done. with nccl2 mode")
             trainer_prog = fluid.default_main_program()
         else:
-            my_print(
+            print_to_err(
                 type(self).__name__,
                 "do nothing about main program, just use it")
             trainer_prog = fluid.default_main_program()
-            my_print(type(self).__name__, "use main program done.")
+            print_to_err(type(self).__name__, "use main program done.")
 
         if args.use_cuda:
             device_id = int(os.getenv("FLAGS_selected_gpus", "0"))
@@ -230,7 +237,7 @@ class TestDistRunnerBase(object):
 
         exe = fluid.Executor(place)
         exe.run(fluid.default_startup_program())
-        my_print(type(self).__name__, "run worker startup program done.")
+        print_to_err(type(self).__name__, "run worker startup program done.")
 
         exec_strategy = fluid.ExecutionStrategy()
         exec_strategy.num_threads = 1
@@ -262,12 +269,12 @@ class TestDistRunnerBase(object):
             build_stra.num_trainers = 1
             build_stra.trainer_id = 0
 
-        my_print(type(self).__name__, "begin to compile with data parallel")
+        print_to_err(type(self).__name__, "begin to compile with data parallel")
         binary = compiler.CompiledProgram(trainer_prog).with_data_parallel(
             loss_name=avg_cost.name,
             build_strategy=build_stra,
             exec_strategy=exec_strategy)
-        my_print(type(self).__name__, "program compiled with data parallel")
+        print_to_err(type(self).__name__, "program compiled with data parallel")
 
         feed_var_list = [
             var for var in trainer_prog.global_block().vars.values()
@@ -288,20 +295,17 @@ class TestDistRunnerBase(object):
             else:
                 return origin_batch
 
-        my_print(type(self).__name__, "begin to train on trainer")
+        print_to_err(type(self).__name__, "begin to train on trainer")
         out_losses = []
         for i in six.moves.xrange(RUN_STEP):
             loss, = exe.run(binary,
                             fetch_list=[avg_cost.name],
                             feed=feeder.feed(get_data()))
             out_losses.append(loss[0])
-            my_print(type(self).__name__, "run step %d finished" % i)
-        my_print(type(self).__name__, "trainer run finished")
+            print_to_err(type(self).__name__, "run step %d finished" % i)
+        print_to_err(type(self).__name__, "trainer run finished")
 
-        if six.PY2:
-            print(pickle.dumps(out_losses))
-        else:
-            sys.stdout.buffer.write(pickle.dumps(out_losses))
+        print_to_out(out_losses)
 
 
 class TestParallelDyGraphRunnerBase(object):
@@ -344,23 +348,23 @@ class TestParallelDyGraphRunnerBase(object):
                 strategy.local_rank = args.trainer_id
                 strategy.trainer_endpoints = args.endpoints.split(",")
                 strategy.current_endpoint = args.current_endpoint
-                my_print(
+                print_to_err(
                     type(self).__name__,
                     "begin to prepare context in dygraph with nccl2")
                 dygraph.parallel.prepare_context(strategy)
                 model = dygraph.parallel.DataParallel(model, strategy)
-                my_print(type(self).__name__, "model built in dygraph")
+                print_to_err(type(self).__name__, "model built in dygraph")
             out_losses = []
-            my_print(type(self).__name__, "begin to run dygraph training")
+            print_to_err(type(self).__name__, "begin to run dygraph training")
             for step_id, data in enumerate(train_reader()):
                 data = _get_data(data)
                 if step_id == RUN_STEP:
                     break
                 loss = self.run_one_loop(model, opt, data)
                 if step_id % 10 == 0:
-                    my_print(
+                    print_to_err(
                         type(self).__name__,
-                        "loss at step %d: %f" % (step_id, loss))
+                        "loss at step %d: %f" % (step_id, loss.numpy()))
                 out_losses.append(loss.numpy())
 
                 # FIXME(Yancey1989): scale the loss inplace
@@ -373,7 +377,7 @@ class TestParallelDyGraphRunnerBase(object):
 
                 opt.minimize(loss)
                 model.clear_gradients()
-        my_print(type(self).__name__, pickle.dumps(out_losses))
+        print_to_out(out_losses)
 
 
 def runtime_main(test_class):
@@ -483,7 +487,7 @@ class TestDistBase(unittest.TestCase):
         with closing(socket.socket(socket.AF_INET,
                                    socket.SOCK_STREAM)) as s:
             s.bind(('', 0))
-            my_print(
+            print_to_err(
                 type(self).__name__, "socket name: %s" % s.getsockname()[1])
             return s.getsockname()[1]
 
@@ -519,13 +523,13 @@ class TestDistBase(unittest.TestCase):
         ps0_pipe = open("/tmp/ps0_err.log", "wb")
         ps1_pipe = open("/tmp/ps1_err.log", "wb")
 
-        my_print(type(self).__name__, "going to start pserver process 0")
+        print_to_err(type(self).__name__, "going to start pserver process 0")
         ps0_proc = subprocess.Popen(
             ps0_cmd.strip().split(" "),
             stdout=subprocess.PIPE,
             stderr=ps0_pipe,
             env=required_envs)
-        my_print(type(self).__name__, "going to start pserver process 1")
+        print_to_err(type(self).__name__, "going to start pserver process 1")
         ps1_proc = subprocess.Popen(
             ps1_cmd.strip().split(" "),
             stdout=subprocess.PIPE,
@@ -641,13 +645,13 @@ class TestDistBase(unittest.TestCase):
         tr0_pipe = open("/tmp/tr0_err.log", "wb")
         tr1_pipe = open("/tmp/tr1_err.log", "wb")
 
-        my_print(type(self).__name__, "going to start trainer process 0")
+        print_to_err(type(self).__name__, "going to start trainer process 0")
        tr0_proc = subprocess.Popen(
            tr0_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=tr0_pipe,
            env=env0)
-        my_print(type(self).__name__, "going to start trainer process 1")
+        print_to_err(type(self).__name__, "going to start trainer process 1")
         tr1_proc = subprocess.Popen(
             tr1_cmd.strip().split(" "),
             stdout=subprocess.PIPE,
@@ -761,7 +765,7 @@ class TestDistBase(unittest.TestCase):
 
             tr_pipe = open("/tmp/tr{}_err.log".format(i), "wb")
 
-            my_print(
+            print_to_err(
                 type(self).__name__,
                 "going to start process {} with nccl2".format(i))
             tr_proc = subprocess.Popen(
diff --git a/python/paddle/fluid/tests/unittests/test_parallel_dygraph_mnist.py b/python/paddle/fluid/tests/unittests/test_parallel_dygraph_mnist.py
index ecdca39a543204b4ab3c1918a8f83acf2e538ae2..19cd1577df4a1a202513006263121b323591793c 100644
--- a/python/paddle/fluid/tests/unittests/test_parallel_dygraph_mnist.py
+++ b/python/paddle/fluid/tests/unittests/test_parallel_dygraph_mnist.py
@@ -13,13 +13,11 @@
 # limitations under the License.
 
 from __future__ import print_function
-#import unittest
+import unittest
 from test_dist_base import TestDistBase
 import paddle.fluid as fluid
 
-#TODO(guru4elephant): should have dygraph test dist base
-# current TestDistBase has some incompatible code with dygraph
-'''
+
 class TestParallelDygraphMnist(TestDistBase):
     def _setup_config(self):
         self._sync_mode = False
@@ -27,11 +25,9 @@ class TestParallelDygraphMnist(TestDistBase):
         self._dygraph = True
 
     def test_mnist(self):
-        return
         if fluid.core.is_compiled_with_cuda():
             self.check_with_place("parallel_dygraph_mnist.py", delta=1e-5)
-'''
+
 
 if __name__ == "__main__":
-    #unittest.main()
-    pass
+    unittest.main()
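
Note: the sketch below is not part of the patch above. It illustrates, with plain NumPy rather than Paddle ops, the coalesce/split/reshape round-trip that the rewritten _split_tensors performs when it appends a single raw split op over the fused gradient buffer and reshapes each output in place; the helper names coalesce and split_back are hypothetical and not part of Paddle's API.

import numpy as np


def coalesce(grads):
    """Flatten a list of per-parameter gradients into one fused buffer."""
    shapes = [g.shape for g in grads]
    flat = np.concatenate([g.reshape(-1) for g in grads], axis=0)
    return flat, shapes


def split_back(flat, shapes):
    """Split the fused buffer back into per-parameter gradients.

    Mirrors the diff: one split with sections = prod(shape) per gradient,
    followed by a reshape of each piece, instead of split + reshape + assign.
    """
    sections = [int(np.prod(s)) for s in shapes]
    offsets = np.cumsum(sections)[:-1]
    pieces = np.split(flat, offsets, axis=0)
    return [p.reshape(s) for p, s in zip(pieces, shapes)]


if __name__ == "__main__":
    grads = [np.random.rand(2, 3), np.random.rand(4), np.random.rand(1, 5)]
    flat, shapes = coalesce(grads)
    restored = split_back(flat, shapes)
    assert all(np.allclose(a, b) for a, b in zip(grads, restored))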