Unverified commit 5a579df9, authored by chengduo, committed by GitHub

[Speedup] Make dygraph data parallel faster (#19280)

* update parallel.py
test=develop
Parent: e3c68bde
@@ -188,16 +188,14 @@ class DataParallel(layers.Layer):
         from ..layers import nn
         for coalesced_grad, origin_grad_vars, grad_shapes in coalesced_grads_and_grad_vars:
             grad_var_len = [np.prod(g_shape) for g_shape in grad_shapes]
-            splited_vars = nn.split(
-                coalesced_grad, num_or_sections=grad_var_len, dim=0)
-            reshaped_grad_vars = []
-            for g_var, g_shape in zip(splited_vars, grad_shapes):
-                reshaped_grad_vars.append(
-                    nn.reshape(
-                        x=g_var, shape=g_shape, inplace=True))
-            for origin_g_var, reshaped_g_var in zip(origin_grad_vars,
-                                                    reshaped_grad_vars):
-                nn.assign(input=reshaped_g_var, output=origin_g_var)
+            self._helper.main_program.current_block().append_op(
+                type='split',
+                inputs={'X': coalesced_grad},
+                outputs={'Out': origin_grad_vars},
+                attrs={'sections': grad_var_len,
+                       'axis': 0})
+            for g_var, g_shape in zip(origin_grad_vars, grad_shapes):
+                nn.reshape(x=g_var, shape=g_shape, inplace=True)

     def apply_collective_grads(self):
         """
...
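The rewrite above removes two rounds of temporaries: instead of `nn.split` producing new variables that are then reshaped and copied back one by one with `nn.assign`, a single `split` op now writes directly into the original gradient variables, and the in-place `nn.reshape` restores their shapes without an extra copy. A minimal NumPy sketch of the underlying coalesce/split-back idea (the helper names `coalesce` and `split_back` are illustrative, not Paddle API):

import numpy as np

def coalesce(grads):
    """Flatten and concatenate gradients so one collective op can cover
    many small tensors."""
    shapes = [g.shape for g in grads]
    flat = np.concatenate([g.ravel() for g in grads])
    return flat, shapes

def split_back(flat, shapes, out_grads):
    """Split the coalesced buffer and write each piece directly into the
    original gradient array -- no temporaries, no extra assign pass."""
    sizes = [int(np.prod(s)) for s in shapes]
    offsets = np.cumsum([0] + sizes)
    for g, s, lo, hi in zip(out_grads, shapes, offsets[:-1], offsets[1:]):
        g[...] = flat[lo:hi].reshape(s)

g0, g1 = np.ones((2, 3)), np.arange(4.0).reshape(2, 2)
flat, shapes = coalesce([g0, g1])
flat *= 0.5                       # stand-in for the all-reduce / averaging step
split_back(flat, shapes, [g0, g1])
print(g0, g1)                     # both halved, in place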
@@ -38,7 +38,14 @@ RUN_STEP = 5
 DEFAULT_BATCH_SIZE = 2


-def my_print(class_name, log_str):
+def print_to_out(out_losses):
+    if six.PY2:
+        print(pickle.dumps(out_losses))
+    else:
+        sys.stdout.buffer.write(pickle.dumps(out_losses))
+
+
+def print_to_err(class_name, log_str):
     localtime = time.asctime(time.localtime(time.time()))
     print_str = localtime + "\t" + class_name + "\t" + log_str
     if six.PY2:
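Splitting `my_print` into `print_to_out` and `print_to_err` makes the I/O contract of the test workers explicit: the parent test process deserializes the worker's stdout, so only the pickled loss list may be written there, while all progress logging goes to stderr (which the launcher redirects to /tmp/*_err.log further down). A small runnable sketch of that contract, with an inline child program standing in for the trainer script:

import pickle
import subprocess
import sys

# Child stands in for a trainer: logs go to stderr, the pickled result to stdout.
CHILD = r'''
import pickle, sys
sys.stderr.write("trainer: step 0 finished\n")        # log line, ignored by the parent
sys.stdout.buffer.write(pickle.dumps([0.9, 0.7]))     # payload the parent parses
'''

proc = subprocess.run([sys.executable, "-c", CHILD],
                      stdout=subprocess.PIPE, stderr=subprocess.PIPE)
losses = pickle.loads(proc.stdout)   # stderr noise cannot corrupt the payload
print(losses)                        # [0.9, 0.7]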
@@ -99,9 +106,9 @@ class TestDistRunnerBase(object):
         place = fluid.CPUPlace()
         exe = fluid.Executor(place)
         exe.run(startup_prog)
-        my_print(type(self).__name__, "run pserver startup program done.")
+        print_to_err(type(self).__name__, "run pserver startup program done.")
         exe.run(pserver_prog)
-        my_print(type(self).__name__, "run pserver main program done.")
+        print_to_err(type(self).__name__, "run pserver main program done.")

     def run_gpu_fleet_api_trainer(self, args):
         assert args.update_method == "nccl2"
@@ -118,7 +125,7 @@ class TestDistRunnerBase(object):
         role = role_maker.PaddleCloudRoleMaker(is_collective=True)
         fleet.init(role)
-        my_print("gpu_fleet", "fleet.node_num:")
+        print_to_err("gpu_fleet", "fleet.node_num:")
         #"fleet.node_id:", fleet.node_id(),
         #"fleet.trainer_num:", fleet.worker_num())
@@ -154,15 +161,15 @@ class TestDistRunnerBase(object):
             else:
                 return origin_batch

-        my_print(type(self).__name__, "begin to train on trainer")
+        print_to_err(type(self).__name__, "begin to train on trainer")
         out_losses = []
         for i in six.moves.xrange(RUN_STEP):
             loss, = exe.run(dist_prog,
                             fetch_list=[avg_cost.name],
                             feed=feeder.feed(get_data()))
             out_losses.append(loss[0])
-            my_print(type(self).__name__, "run step %d finished" % i)
-        my_print(type(self).__name__, "trainer run finished")
+            print_to_err(type(self).__name__, "run step %d finished" % i)
+        print_to_err(type(self).__name__, "trainer run finished")

         if six.PY2:
             print(pickle.dumps(out_losses))
@@ -182,7 +189,7 @@ class TestDistRunnerBase(object):
             self.get_model(batch_size=args.batch_size)

         if args.update_method == "pserver":
-            my_print(
+            print_to_err(
                 type(self).__name__,
                 "begin to run transpile on trainer with pserver mode")
             t = self.get_transpiler(args.trainer_id,
@@ -190,7 +197,7 @@ class TestDistRunnerBase(object):
                                     args.endpoints, args.trainers,
                                     args.sync_mode, args.dc_asgd)
             trainer_prog = t.get_trainer_program()
-            my_print(
+            print_to_err(
                 type(self).__name__,
                 "get trainer program done with pserver mode.")
         elif args.update_method == "nccl2" or args.update_method == "nccl2_reduce_layer":
@@ -201,7 +208,7 @@ class TestDistRunnerBase(object):
             if args.use_hallreduce:
                 config.use_hierarchical_allreduce = True
                 config.hierarchical_allreduce_inter_nranks = args.hallreduce_inter_nranks
-            my_print(
+            print_to_err(
                 type(self).__name__,
                 "begin to run transpile on trainer with nccl2 mode")
             nccl2_t = fluid.DistributeTranspiler(config=config)
@@ -211,16 +218,16 @@ class TestDistRunnerBase(object):
                 startup_program=fluid.default_startup_program(),
                 trainers=args.endpoints,
                 current_endpoint=args.current_endpoint)
-            my_print(
+            print_to_err(
                 type(self).__name__,
                 "get trainer program done. with nccl2 mode")
             trainer_prog = fluid.default_main_program()
         else:
-            my_print(
+            print_to_err(
                 type(self).__name__,
                 "do nothing about main program, just use it")
             trainer_prog = fluid.default_main_program()
-        my_print(type(self).__name__, "use main program done.")
+        print_to_err(type(self).__name__, "use main program done.")

         if args.use_cuda:
             device_id = int(os.getenv("FLAGS_selected_gpus", "0"))
@@ -230,7 +237,7 @@ class TestDistRunnerBase(object):
         exe = fluid.Executor(place)
         exe.run(fluid.default_startup_program())
-        my_print(type(self).__name__, "run worker startup program done.")
+        print_to_err(type(self).__name__, "run worker startup program done.")

         exec_strategy = fluid.ExecutionStrategy()
         exec_strategy.num_threads = 1
@@ -262,12 +269,12 @@ class TestDistRunnerBase(object):
             build_stra.num_trainers = 1
             build_stra.trainer_id = 0

-        my_print(type(self).__name__, "begin to compile with data parallel")
+        print_to_err(type(self).__name__, "begin to compile with data parallel")
         binary = compiler.CompiledProgram(trainer_prog).with_data_parallel(
             loss_name=avg_cost.name,
             build_strategy=build_stra,
             exec_strategy=exec_strategy)
-        my_print(type(self).__name__, "program compiled with data parallel")
+        print_to_err(type(self).__name__, "program compiled with data parallel")

         feed_var_list = [
             var for var in trainer_prog.global_block().vars.values()
@@ -288,20 +295,17 @@ class TestDistRunnerBase(object):
             else:
                 return origin_batch

-        my_print(type(self).__name__, "begin to train on trainer")
+        print_to_err(type(self).__name__, "begin to train on trainer")
         out_losses = []
         for i in six.moves.xrange(RUN_STEP):
             loss, = exe.run(binary,
                             fetch_list=[avg_cost.name],
                             feed=feeder.feed(get_data()))
             out_losses.append(loss[0])
-            my_print(type(self).__name__, "run step %d finished" % i)
-        my_print(type(self).__name__, "trainer run finished")
-        if six.PY2:
-            print(pickle.dumps(out_losses))
-        else:
-            sys.stdout.buffer.write(pickle.dumps(out_losses))
+            print_to_err(type(self).__name__, "run step %d finished" % i)
+        print_to_err(type(self).__name__, "trainer run finished")
+        print_to_out(out_losses)


 class TestParallelDyGraphRunnerBase(object):
@@ -344,23 +348,23 @@ class TestParallelDyGraphRunnerBase(object):
             strategy.local_rank = args.trainer_id
             strategy.trainer_endpoints = args.endpoints.split(",")
             strategy.current_endpoint = args.current_endpoint
-            my_print(
+            print_to_err(
                 type(self).__name__,
                 "begin to prepare context in dygraph with nccl2")
             dygraph.parallel.prepare_context(strategy)
             model = dygraph.parallel.DataParallel(model, strategy)
-            my_print(type(self).__name__, "model built in dygraph")
+            print_to_err(type(self).__name__, "model built in dygraph")
         out_losses = []
-        my_print(type(self).__name__, "begin to run dygraph training")
+        print_to_err(type(self).__name__, "begin to run dygraph training")
         for step_id, data in enumerate(train_reader()):
             data = _get_data(data)
             if step_id == RUN_STEP:
                 break
             loss = self.run_one_loop(model, opt, data)
             if step_id % 10 == 0:
-                my_print(
+                print_to_err(
                     type(self).__name__,
-                    "loss at step %d: %f" % (step_id, loss))
+                    "loss at step %d: %f" % (step_id, loss.numpy()))
             out_losses.append(loss.numpy())

             # FIXME(Yancey1989): scale the loss inplace
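The FIXME about scaling the loss in place refers to the usual data-parallel identity: dividing each trainer's loss (and therefore its gradients) by the trainer count before a sum all-reduce is equivalent to averaging the gradients across trainers. A runnable NumPy check of that identity (the two gradient arrays and `nranks` are illustrative):

import numpy as np

nranks = 2                                     # number of trainers (illustrative)
grads = [np.array([2.0, 4.0]),                 # trainer 0's local gradient
         np.array([6.0, 8.0])]                 # trainer 1's local gradient

# Scale each local loss (hence each local gradient) by 1/nranks, then
# all-reduce with a sum: the result is the cross-trainer average.
scaled_sum = np.sum([g / nranks for g in grads], axis=0)

assert np.allclose(scaled_sum, np.mean(grads, axis=0))
print(scaled_sum)                              # [4. 6.]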
@@ -373,7 +377,7 @@ class TestParallelDyGraphRunnerBase(object):
             opt.minimize(loss)
             model.clear_gradients()
-        my_print(type(self).__name__, pickle.dumps(out_losses))
+        print_to_out(out_losses)


 def runtime_main(test_class):
@@ -483,7 +487,7 @@ class TestDistBase(unittest.TestCase):
         with closing(socket.socket(socket.AF_INET,
                                    socket.SOCK_STREAM)) as s:
             s.bind(('', 0))
-            my_print(
+            print_to_err(
                 type(self).__name__, "socket name: %s" % s.getsockname()[1])
             return s.getsockname()[1]
@@ -519,13 +523,13 @@ class TestDistBase(unittest.TestCase):
         ps0_pipe = open("/tmp/ps0_err.log", "wb")
         ps1_pipe = open("/tmp/ps1_err.log", "wb")

-        my_print(type(self).__name__, "going to start pserver process 0")
+        print_to_err(type(self).__name__, "going to start pserver process 0")
         ps0_proc = subprocess.Popen(
             ps0_cmd.strip().split(" "),
             stdout=subprocess.PIPE,
             stderr=ps0_pipe,
             env=required_envs)
-        my_print(type(self).__name__, "going to start pserver process 1")
+        print_to_err(type(self).__name__, "going to start pserver process 1")
         ps1_proc = subprocess.Popen(
             ps1_cmd.strip().split(" "),
             stdout=subprocess.PIPE,
@@ -641,13 +645,13 @@ class TestDistBase(unittest.TestCase):
         tr0_pipe = open("/tmp/tr0_err.log", "wb")
         tr1_pipe = open("/tmp/tr1_err.log", "wb")

-        my_print(type(self).__name__, "going to start trainer process 0")
+        print_to_err(type(self).__name__, "going to start trainer process 0")
         tr0_proc = subprocess.Popen(
             tr0_cmd.strip().split(" "),
             stdout=subprocess.PIPE,
             stderr=tr0_pipe,
             env=env0)
-        my_print(type(self).__name__, "going to start trainer process 1")
+        print_to_err(type(self).__name__, "going to start trainer process 1")
         tr1_proc = subprocess.Popen(
             tr1_cmd.strip().split(" "),
             stdout=subprocess.PIPE,
@@ -761,7 +765,7 @@ class TestDistBase(unittest.TestCase):
             tr_pipe = open("/tmp/tr{}_err.log".format(i), "wb")

-            my_print(
+            print_to_err(
                 type(self).__name__,
                 "going to start process {} with nccl2".format(i))
             tr_proc = subprocess.Popen(
...
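For orientation, the launch pattern used throughout these tests boils down to: one subprocess per trainer, rank-specific environment variables, stderr redirected to a log file, and stdout captured for the pickled losses. A condensed sketch under those assumptions (the `worker.py` script name and the exact set of PADDLE_* environment variables are illustrative here, not copied from the test file):

import os
import subprocess
import sys

endpoints = ["127.0.0.1:6170", "127.0.0.1:6171"]
procs, logs = [], []
for rank, ep in enumerate(endpoints):
    env = dict(os.environ,
               FLAGS_selected_gpus=str(rank),            # device pick, as in the diff
               PADDLE_TRAINER_ID=str(rank),              # illustrative env names
               PADDLE_CURRENT_ENDPOINT=ep,
               PADDLE_TRAINER_ENDPOINTS=",".join(endpoints))
    log = open("/tmp/tr{}_err.log".format(rank), "wb")   # stderr log, as in the diff
    logs.append(log)
    procs.append(subprocess.Popen(
        [sys.executable, "worker.py"],                   # hypothetical trainer script
        stdout=subprocess.PIPE, stderr=log, env=env))

outs = [p.communicate()[0] for p in procs]               # pickled losses, one per trainer
for log in logs:
    log.close()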
@@ -13,13 +13,11 @@
 # limitations under the License.

 from __future__ import print_function
-#import unittest
+import unittest

 from test_dist_base import TestDistBase

 import paddle.fluid as fluid

-#TODO(guru4elephant): should have dygraph test dist base
-# current TestDistBase has some incompatible code with dygraph
-'''
 class TestParallelDygraphMnist(TestDistBase):
     def _setup_config(self):
         self._sync_mode = False
@@ -27,11 +25,9 @@ class TestParallelDygraphMnist(TestDistBase):
         self._dygraph = True

     def test_mnist(self):
-        return
         if fluid.core.is_compiled_with_cuda():
             self.check_with_place("parallel_dygraph_mnist.py", delta=1e-5)
-'''

 if __name__ == "__main__":
-    #unittest.main()
-    pass
+    unittest.main()