Commit 8640630b authored by LielinJiang

support multiple gpus

Parent 71075698
@@ -17,15 +17,18 @@ from __future__ import absolute_import
 import inspect
 import os
 import pickle
+from collections import OrderedDict

 import numpy as np
-from collections import OrderedDict

 from paddle import fluid
 from paddle.fluid.framework import in_dygraph_mode, Variable
 from paddle.fluid.executor import global_scope
 from paddle.fluid.io import is_belong_to_optimizer
 from paddle.fluid.dygraph.base import to_variable
+from paddle.fluid.incubate.fleet.collective import fleet, DistributedStrategy
+import paddle.fluid.incubate.fleet.base.role_maker as role_maker
+
+import distributed

 __all__ = ['shape_hints', 'Model', 'Loss', 'CrossEntropy']
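
Note: `distributed` is a helper module local to this repository, not part of the `paddle` package. As a rough sketch of what its `Env` wrapper provides, assuming it reads the environment variables exported by `paddle.distributed.launch` (only the two attributes used in this diff are shown; the implementation below is hypothetical):

    import os

    class Env(object):
        # Hypothetical stand-in for the repo-local distributed.Env helper.

        @property
        def nranks(self):
            # Total number of trainer processes; 1 when not launched
            # in distributed mode.
            return int(os.environ.get('PADDLE_TRAINERS_NUM', 1))

        @property
        def local_rank(self):
            # Rank of this trainer process, assigned by the launcher.
            return int(os.environ.get('PADDLE_TRAINER_ID', 0))
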
@@ -124,6 +127,9 @@ class StaticGraphAdapter(object):
         self._lazy_load_optimizer = None
+        self._nranks = distributed.Env().nranks
+        self._local_rank = distributed.Env().local_rank

         # parse shape hints
         self._input_desc = OrderedDict([
             (n, None) for n in extract_args(self.model.forward) if n != 'self'
@@ -279,13 +285,15 @@ class StaticGraphAdapter(object):
         endpoints = self._endpoints[self.mode]
         fetch_list = endpoints['output'] + endpoints['loss']
         num_output = len(endpoints['output'])
+        if self.mode != 'test':
+            fetch_list += endpoints['label']
         out = self._executor.run(
             compiled_prog, feed=feed,
             fetch_list=fetch_list)
         if self.mode == 'test':
             return out[:num_output]
         else:
-            return out[:num_output], out[num_output:]
+            return out[:num_output], out[num_output:-1], out[-1:]

     def _make_program(self, inputs):
         prog = self._orig_prog.clone()
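
Note: with labels appended to the fetch list in train/eval mode, the fetched `out` is laid out as outputs, then losses, then labels, which is exactly what the new three-way return slices apart. A worked example of the layout (values are stand-ins for fetched arrays):

    num_output = 2
    out = ['out0', 'out1', 'loss0', 'label0']    # outputs + losses + labels
    assert out[:num_output] == ['out0', 'out1']  # model outputs
    assert out[num_output:-1] == ['loss0']       # losses
    assert out[-1:] == ['label0']                # labels

The `out[num_output:-1]` / `out[-1:]` split only separates losses from labels cleanly when exactly one label variable is fetched.
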
@@ -293,6 +301,7 @@ class StaticGraphAdapter(object):
             # HACK workaround learning rate map issue
             lr_var = self.model._optimizer._learning_rate_map[self._orig_prog]
             self.model._optimizer._learning_rate_map[prog] = lr_var
+
         losses = []
         with fluid.program_guard(prog, self._startup_prog):
             outputs = to_list(self.model.forward(*inputs))
@@ -302,13 +311,27 @@ class StaticGraphAdapter(object):
                 losses = self.model._loss_function(outputs, label_vars)
                 if self.mode == 'train':
                     self._loss_endpoint = fluid.layers.sum(losses)
+                    if self._nranks > 1:
+                        role = role_maker.PaddleCloudRoleMaker(is_collective=True)
+                        fleet.init(role)
+                        dist_strategy = DistributedStrategy()
+                        dist_strategy.mode = "collective"
+                        dist_strategy.collective_mode = "grad_allreduce"
+                        self.model._optimizer = fleet.distributed_optimizer(
+                            self.model._optimizer, strategy=dist_strategy)
                     self.model._optimizer.minimize(self._loss_endpoint)
+            if self.mode != 'train':
+                outputs = [distributed._all_gather(o, self._nranks) for o in outputs]
+            if self.mode != 'test':
+                label_vars = [distributed._all_gather(l, self._nranks) for l in label_vars]
         if self.mode != 'train':  # clone again to put it in test mode
             prog = prog.clone(for_test=True)
         self._progs[self.mode] = prog
         self._endpoints[self.mode] = {
             "output": outputs,
-            "loss": losses
+            "loss": losses,
+            "label": label_vars
         }

     def _infer_input_vars(self, inputs):
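
Note: in collective mode with `grad_allreduce`, `fleet.distributed_optimizer` wraps the user's optimizer so that `minimize` also inserts the communication ops that all-reduce gradients across trainers. A minimal standalone sketch of the same setup (Paddle 1.x fleet API, intended to run under `paddle.distributed.launch`; the SGD optimizer is only an example):

    import paddle.fluid as fluid
    import paddle.fluid.incubate.fleet.base.role_maker as role_maker
    from paddle.fluid.incubate.fleet.collective import fleet, DistributedStrategy

    role = role_maker.PaddleCloudRoleMaker(is_collective=True)
    fleet.init(role)

    dist_strategy = DistributedStrategy()
    dist_strategy.mode = "collective"
    dist_strategy.collective_mode = "grad_allreduce"  # all-reduce gradients

    optimizer = fluid.optimizer.SGD(learning_rate=0.01)
    optimizer = fleet.distributed_optimizer(optimizer, strategy=dist_strategy)
    # optimizer.minimize(loss) now also adds the collective communication ops.
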
@@ -346,7 +369,12 @@ class StaticGraphAdapter(object):
         # even if `forward()` may run different code path for different mode
         # therefore startup program only needs to run once
         if self._executor is None:
-            self._executor = fluid.Executor(places[0])
+            if self._nranks > 1 and device.lower() == 'gpu':
+                gpu_id = int(os.environ.get('FLAGS_selected_gpus', 0))
+                place = fluid.CUDAPlace(gpu_id) if device.lower() == 'gpu' else fluid.CPUPlace()
+            else:
+                place = places[0]
+            self._executor = fluid.Executor(place)
             # XXX incremental initialization
             uninitialized = []
             for var_py in self._startup_prog.list_vars():
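
Note: `paddle.distributed.launch` starts one process per device and exports `FLAGS_selected_gpus` with that process's GPU index, which is why each trainer can bind its executor to its own card. A sketch of the launch command and the per-process binding (the script name is illustrative):

    # Launched as, e.g.:
    #   python -m paddle.distributed.launch --selected_gpus=0,1,2,3 train.py
    import os

    import paddle.fluid as fluid

    gpu_id = int(os.environ.get('FLAGS_selected_gpus', 0))  # set per process
    place = fluid.CUDAPlace(gpu_id)  # each trainer uses its own GPU
    exe = fluid.Executor(place)
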
@@ -362,7 +390,10 @@ class StaticGraphAdapter(object):
                 self._load_optimizer(self._lazy_load_optimizer)
                 self._lazy_load_optimizer = None

-        compiled_prog = fluid.CompiledProgram(prog)
+        if self._nranks < 2:
+            compiled_prog = fluid.CompiledProgram(prog)
+        else:
+            compiled_prog = prog  # fleet.main_program
         if len(device_ids) > 1:
             loss_name = None
             if self.mode == 'train' and self._loss_endpoint is not None:
@@ -389,6 +420,11 @@ class DynamicGraphAdapter(object):
     def __init__(self, model):
         super(DynamicGraphAdapter, self).__init__()
         self.model = model
+        self._nranks = distributed.Env().nranks
+        self._local_rank = distributed.Env().local_rank
+
+        if self._nranks > 1:
+            self.ddp_model = distributed.DistributedDataParallel(self.model)

     @property
     def mode(self):
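
Note: the repo-local `distributed.DistributedDataParallel` presumably builds on Paddle's stock dygraph data parallelism, which in the 1.x API looks roughly like the sketch below (`MyModel` is a made-up `fluid.dygraph.Layer` for illustration):

    import paddle.fluid as fluid

    class MyModel(fluid.dygraph.Layer):
        # Minimal stand-in model.
        def __init__(self):
            super(MyModel, self).__init__()
            self._fc = fluid.dygraph.Linear(10, 2)

        def forward(self, x):
            return self._fc(x)

    place = fluid.CUDAPlace(fluid.dygraph.parallel.Env().dev_id)
    with fluid.dygraph.guard(place):
        strategy = fluid.dygraph.parallel.prepare_context()  # init NCCL comm
        model = MyModel()
        ddp_model = fluid.dygraph.parallel.DataParallel(model, strategy)
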
@@ -406,14 +442,22 @@ class DynamicGraphAdapter(object):
         self.mode = 'train'
         inputs = to_list(inputs)
         labels = to_list(labels)
-        outputs = self.model.forward(*[to_variable(x) for x in inputs])
-        losses = self.model._loss_function(outputs, labels)
-        final_loss = fluid.layers.sum(losses)
-        final_loss.backward()
+        if self._nranks > 1:
+            outputs = self.ddp_model.forward(*[to_variable(x) for x in inputs])
+            losses = self.model._loss_function(outputs, labels)
+            final_loss = fluid.layers.sum(losses)
+            final_loss = self.ddp_model.scale_loss(final_loss)
+            final_loss.backward()
+            self.ddp_model.apply_collective_grads()
+        else:
+            outputs = self.model.forward(*[to_variable(x) for x in inputs])
+            losses = self.model._loss_function(outputs, labels)
+            final_loss = fluid.layers.sum(losses)
+            final_loss.backward()
         self.model._optimizer.minimize(final_loss)
         self.model.clear_gradients()
         return [to_numpy(o) for o in to_list(outputs)], \
-            [to_numpy(l) for l in losses]
+            [to_numpy(l) for l in losses], [l for l in labels]
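
Note: `scale_loss` divides the loss by the trainer count so that summing the per-rank gradients in `apply_collective_grads` yields the gradient of the mean loss over the global batch. The arithmetic, as a tiny runnable check:

    # Summing gradients of (loss / nranks) across ranks equals the mean
    # of the per-rank gradients.
    nranks = 4
    per_rank_grads = [1.0, 2.0, 3.0, 4.0]  # d(loss_i)/dw computed on each rank
    allreduce_sum = sum(g / nranks for g in per_rank_grads)
    assert allreduce_sum == sum(per_rank_grads) / nranks  # 2.5, the mean
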
     def eval(self, inputs, labels, device='CPU', device_ids=None):
         assert self.model._loss_function, \
@@ -422,16 +466,22 @@ class DynamicGraphAdapter(object):
         self.mode = 'eval'
         inputs = to_list(inputs)
         labels = to_list(labels)
+        labels = [to_variable(l) for l in labels]
         outputs = self.model.forward(*[to_variable(x) for x in inputs])
         losses = self.model._loss_function(outputs, labels)
+        if self._nranks > 1:
+            outputs = [distributed._all_gather(o, self._nranks) for o in to_list(outputs)]
+            labels = [distributed._all_gather(l, self._nranks) for l in labels]
         return [to_numpy(o) for o in to_list(outputs)], \
-            [to_numpy(l) for l in losses]
+            [to_numpy(l) for l in losses], [to_numpy(l) for l in labels]

     def test(self, inputs, device='CPU', device_ids=None):
         super(Model, self.model).eval()
         self.mode = 'test'
         inputs = [to_variable(x) for x in to_list(inputs)]
         outputs = self.model.forward(*inputs)
+        if self._nranks > 1:
+            outputs = [distributed._all_gather(o, self._nranks) for o in to_list(outputs)]
         return [to_numpy(o) for o in to_list(outputs)]
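
Note: `_all_gather` concatenates every rank's outputs (and labels) so that downstream metrics see the full global batch rather than one shard. One caveat this diff does not address: distributed samplers typically pad the dataset so its length divides evenly by `nranks`, so the gathered arrays can contain duplicated tail samples. A hypothetical trimming helper (the `total_size` bookkeeping is not part of this commit):

    import numpy as np

    def trim_gathered(gathered, total_size):
        # Keep only the first `total_size` rows of the concatenated result,
        # dropping any padding the sampler appended to even out the shards.
        return np.asarray(gathered)[:total_size]
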
     def parameters(self, *args, **kwargs):
@@ -476,7 +526,8 @@ class Model(fluid.dygraph.Layer):
         return self._adapter.test(*args, **kwargs)

     def save(self, *args, **kwargs):
-        return self._adapter.save(*args, **kwargs)
+        if distributed.get_local_rank() == 0:
+            return self._adapter.save(*args, **kwargs)

     def load(self, *args, **kwargs):
         return self._adapter.load(*args, **kwargs)
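
Note: after gradient all-reduce every rank holds identical parameters, so guarding `save` with `local_rank == 0` loses nothing and keeps multiple processes from racing to write the same checkpoint files. Callers do not change; only rank 0 actually writes (the path below is illustrative):

    # Safe to call from every process under multi-GPU launch.
    model.save('checkpoints/mnist')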