Commit abc1ecaa authored by: LielinJiang

refine multiple gpus codes

Parent 994975bd
......@@ -36,6 +36,7 @@ from paddle.fluid.layers.collective import _c_allreduce, _c_allgather, _c_broadc
_c_sync_comm_stream, _c_sync_calc_stream
from paddle.fluid.io import BatchSampler, DataLoader
__parallel_context_init = False
class DistributedBatchSampler(BatchSampler):
"""Sampler that restricts data loading to a subset of the dataset.
......@@ -109,53 +110,6 @@ class DistributedBatchSampler(BatchSampler):
return num_samples // self.batch_size
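
A quick worked example of the sharding arithmetic behind the `__len__` shown above (a sketch with made-up numbers, assuming the sampler rounds the per-rank sample count up to an equal share, as samplers of this kind typically do):

    # Illustrative numbers only; the real values come from the dataset and launch config.
    dataset_len = 100   # total samples in the dataset
    nranks = 4          # trainer processes
    batch_size = 8

    num_samples = (dataset_len + nranks - 1) // nranks   # ceil(100 / 4) = 25 samples per rank
    batches_per_rank = num_samples // batch_size         # 25 // 8 = 3, matching __len__ above
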
@contextlib.contextmanager
def null_guard():
yield
def to_numpy(var):
assert isinstance(var, (Variable, fluid.core.VarBase)), "not a variable"
if isinstance(var, fluid.core.VarBase):
return var.numpy()
t = global_scope().find_var(var.name).get_tensor()
return np.array(t)
def all_gather(input):
place = fluid.CUDAPlace(Env().dev_id) \
if Env().nranks > 1 else fluid.CUDAPlace(0)
guard = null_guard() if fluid.in_dygraph_mode() else fluid.dygraph.guard(place)
with guard:
input = to_variable(input)
output = _all_gather(input, Env().nranks)
return to_numpy(output)
def _all_reduce(x, out=None, reduce_type="sum", sync_mode=True):
out = _c_allreduce(x, out, reduce_type)
if sync_mode:
return _c_sync_calc_stream(out)
def _all_gather(x, nranks, ring_id=0, use_calc_stream=True):
return _c_allgather(x, nranks, ring_id=ring_id, use_calc_stream=use_calc_stream)
def _bradcast(x, root=0, ring_id=0, use_calc_stream=True):
return _c_broadcast(x, root, ring_id, use_calc_stream)
def _sync_comm_stream(x, ring_id):
return _c_sync_comm_stream(x, ring_id)
def barrier():
pass
def get_local_rank():
return Env().local_rank
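
A minimal sketch (not part of this commit) of how the collective helpers in this hunk could be used to collect per-rank results on every process; the array contents are made up for illustration:

    import numpy as np

    # Each rank produces a different local result ...
    local_pred = np.arange(4, dtype='float32') + get_local_rank()
    # ... and all_gather concatenates the per-rank arrays along axis 0,
    # so every rank ends up with an array of length nranks * 4.
    gathered = all_gather(local_pred)
    if get_local_rank() == 0:
        print(gathered.shape)
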
......@@ -224,24 +178,45 @@ def init_communicator(program, rank, nranks, wait_port,
})
def prepare_context(place):
def prepare_distributed_context(place=None):
if place is None:
place = fluid.CUDAPlace(Env().dev_id) if Env().nranks > 1 \
else fluid.CUDAPlace(0)
strategy = ParallelStrategy()
strategy.nranks = Env().nranks
strategy.local_rank = Env().local_rank
strategy.trainer_endpoints = Env().trainer_endpoints
strategy.current_endpoint = Env().current_endpoint
if strategy.nranks < 2:
return
if isinstance(place, core.CUDAPlace):
global __parallel_context_init
if not __parallel_context_init and isinstance(place, core.CUDAPlace):
def _init_context():
communicator_prog = framework.Program()
init_communicator(communicator_prog, strategy.local_rank, strategy.nranks, True,
strategy.current_endpoint, strategy.trainer_endpoints)
init_communicator(communicator_prog, strategy.local_rank, strategy.nranks,
True, strategy.current_endpoint, strategy.trainer_endpoints)
exe = fluid.Executor(place)
exe.run(communicator_prog)
if fluid.in_dygraph_mode():
cnt = 0
while fluid.in_dygraph_mode():
cnt += 1
print('debug', cnt)
fluid.disable_dygraph()
_init_context()
fluid.enable_dygraph(place)
else:
_init_context()
else:
assert ("Only support CUDAPlace for now.")
__parallel_context_init = True
return strategy
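
A sketch of the intended call pattern, inferred from the `__parallel_context_init` guard above (not code from this commit): the communicator is initialized at most once per process, and the function is a no-op below two ranks.

    place = fluid.CUDAPlace(Env().dev_id) if Env().nranks > 1 else fluid.CUDAPlace(0)

    strategy = prepare_distributed_context(place)   # returns None when Env().nranks < 2
    strategy = prepare_distributed_context(place)   # second call skips communicator init
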
......
......@@ -21,15 +21,14 @@ import os
import numpy as np
import paddle
from paddle import fluid
from paddle.fluid.optimizer import Momentum
from paddle.fluid.dygraph.nn import Conv2D, Pool2D, Linear
from paddle.fluid.io import MNIST as MnistDataset
from model import Model, CrossEntropy, Input
from metrics import Accuracy
from distributed import prepare_context, Env, get_nranks, DistributedBatchSampler
from paddle.fluid.io import BatchSampler, DataLoader, MnistDataset
class SimpleImgConvPool(fluid.dygraph.Layer):
def __init__(self,
......@@ -106,60 +105,17 @@ class MNIST(Model):
return x
class CustromMnistDataset(MnistDataset):
def __init__(self,
image_filename=None,
label_filename=None,
mode='train',
download=True):
super(CustromMnistDataset, self).__init__(image_filename,
label_filename, mode, download)
def __getitem__(self, idx):
return self.images[idx], [self.labels[idx]]
def main():
@contextlib.contextmanager
def null_guard():
yield
place = fluid.CUDAPlace(fluid.dygraph.parallel.Env().dev_id) \
if fluid.dygraph.parallel.Env().nranks > 1 else fluid.CUDAPlace(0)
guard = fluid.dygraph.guard(place) if FLAGS.dynamic else null_guard()
if fluid.dygraph.parallel.Env().nranks > 1:
prepare_context(place)
fluid.enable_dygraph(place) if FLAGS.dynamic else None
if not os.path.exists('mnist_checkpoints'):
os.mkdir('mnist_checkpoints')
with guard:
train_dataset = CustromMnistDataset(mode='train')
val_dataset = CustromMnistDataset(mode='test')
train_dataset = MnistDataset(mode='train')
val_dataset = MnistDataset(mode='test')
inputs = [Input([None, 784], 'float32', name='image')]
labels = [Input([None, 1], 'int64', name='label')]
if fluid.in_dygraph_mode():
feed_list = None
else:
feed_list = [x.forward() for x in inputs + labels]
if get_nranks() > 1:
train_sampler = DistributedBatchSampler(train_dataset, batch_size=FLAGS.batch_size, shuffle=True)
train_loader = DataLoader(train_dataset, batch_sampler=train_sampler, places=place,
feed_list=feed_list, num_workers=4, return_list=True)
val_sampler = DistributedBatchSampler(val_dataset, batch_size=FLAGS.batch_size)
val_loader = DataLoader(val_dataset, batch_sampler=val_sampler, places=place,
feed_list=feed_list, num_workers=4, return_list=True)
else:
train_loader = DataLoader(train_dataset, batch_size=FLAGS.batch_size, places=place,
feed_list=feed_list, num_workers=4, return_list=True)
val_loader = DataLoader(val_dataset, batch_size=FLAGS.batch_size, places=place,
feed_list=feed_list, num_workers=4, return_list=True)
model = MNIST()
optim = Momentum(
learning_rate=FLAGS.lr,
......@@ -170,7 +126,7 @@ def main():
if FLAGS.resume is not None:
model.load(FLAGS.resume)
model.fit(train_loader, val_loader, epochs=FLAGS.epoch)
model.fit(train_dataset, val_dataset, epochs=FLAGS.epoch, batch_size=FLAGS.batch_size)
if __name__ == '__main__':
......@@ -178,7 +134,7 @@ if __name__ == '__main__':
parser.add_argument(
"-d", "--dynamic", action='store_true', help="enable dygraph mode")
parser.add_argument(
"-e", "--epoch", default=100, type=int, help="number of epoch")
"-e", "--epoch", default=2, type=int, help="number of epoch")
parser.add_argument(
'--lr',
'--learning-rate',
......
......@@ -33,7 +33,8 @@ from paddle.fluid.dygraph.base import to_variable
from paddle.fluid.incubate.fleet.collective import fleet, DistributedStrategy
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
import distributed
from distributed import DistributedBatchSampler
from paddle.fluid.io import DataLoader
from metrics import Metric
from callbacks import config_callbacks
......@@ -348,6 +349,7 @@ class StaticGraphAdapter(object):
for metric, state in zip(self.model._metrics, metric_states):
# cut off padding size
if self.mode != 'train' and self.model._test_dataloader is not None \
and isinstance(self.model._test_dataloader, DataLoader) \
and self._nranks > 1:
total_size = len(self.model._test_dataloader.dataset)
# TODO: fixme if have better way to get batch size
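
The "cut off padding size" comment refers to the duplicate tail samples that DistributedBatchSampler appends so every rank gets the same number of batches; after `_all_gather` those duplicates have to be dropped before metrics are computed. A sketch of the trimming idea (variable names are illustrative, not the elided code):

    # Illustrative only: the gathered results cover nranks * padded_samples entries,
    # so slicing back to the true dataset size removes the padded duplicates.
    total_size = len(eval_dataset)      # true number of samples
    gathered = gathered[:total_size]    # drop entries appended for even sharding
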
......@@ -417,7 +419,8 @@ class StaticGraphAdapter(object):
strategy=dist_strategy)
self.model._optimizer.minimize(self._loss_endpoint)
if self._nranks > 1 and mode != 'train' and self.model._test_dataloader is not None:
if self._nranks > 1 and mode != 'train' and self.model._test_dataloader is not None \
and isinstance(self.model._test_dataloader, DataLoader):
outputs = [distributed._all_gather(o, self._nranks) for o in outputs]
if mode != 'test':
labels = [distributed._all_gather(l, self._nranks) for l in labels]
......@@ -457,7 +460,7 @@ class StaticGraphAdapter(object):
# therefore startup program only needs to run once
if self._executor is None:
if self._nranks > 1 and device.lower() == 'gpu':
gpu_id = int(os.environ.get('FLAGS_selected_gpus', 0))
gpu_id = int(distributed.Env().dev_id)
place = fluid.CUDAPlace(gpu_id) if device.lower() == 'gpu' else fluid.CPUPlace()
else:
place = places[0]
......@@ -529,6 +532,7 @@ class DynamicGraphAdapter(object):
losses = self.model._loss_function(outputs, labels)
final_loss = fluid.layers.sum(losses)
final_loss.backward()
self.model._optimizer.minimize(final_loss)
self.model.clear_gradients()
metrics = []
......@@ -536,6 +540,7 @@ class DynamicGraphAdapter(object):
metric_outs = metric.add_metric_op(to_list(outputs), to_list(labels))
m = metric.update(*[to_numpy(m) for m in to_list(metric_outs)])
metrics.append(m)
return ([to_numpy(l) for l in losses], metrics) \
if len(metrics) > 0 else [to_numpy(l) for l in losses]
......@@ -667,10 +672,16 @@ class Model(fluid.dygraph.Layer):
self._device = None
self._device_ids = None
self._optimizer = None
self._distributed_sampler = None
self._test_dataloader = None
if in_dygraph_mode():
# init multiple gpus context
self._place = fluid.CUDAPlace(distributed.Env().dev_id) \
if distributed.Env().nranks > 1 else fluid.CUDAPlace(0)
if distributed.get_nranks() > 1:
distributed.prepare_distributed_context(self._place)
# init backend
if fluid.in_dygraph_mode():
self._adapter = DynamicGraphAdapter(self)
else:
self._adapter = StaticGraphAdapter(self)
......@@ -799,6 +810,7 @@ class Model(fluid.dygraph.Layer):
the variable to the environment variable and set its value to 1.
The default is None.
"""
self._optimizer = optimizer
if loss_function:
if not isinstance(loss_function, Loss):
......@@ -830,13 +842,17 @@ class Model(fluid.dygraph.Layer):
def fit(
self,
train_dataset=None,
eval_dataset=None,
train_loader=None,
eval_loader=None,
batch_size=1,
epochs=1,
eval_freq=1,
log_freq=10,
save_freq=1,
verbose=2,
num_workers=0,
callbacks=None, ):
"""
FIXME: add more comments and usage
......@@ -853,6 +869,57 @@ class Model(fluid.dygraph.Layer):
callbacks (Callback|None): list of `Callback` instances to apply
during training.
"""
assert train_dataset is not None or train_loader is not None, \
"train_dataset or train_loader must be given"
assert (train_loader is not None and train_dataset is None) or \
(train_loader is None and train_dataset is not None), \
"train_dataset should not be set when train_loader is given"
if fluid.in_dygraph_mode():
feed_list = None
else:
feed_list = [x.forward() for x in self._inputs + self._labels]
if train_loader is None:
if distributed.get_nranks() > 1:
train_sampler = DistributedBatchSampler(train_dataset,
batch_size=batch_size,
shuffle=True)
train_loader = DataLoader(train_dataset,
batch_sampler=train_sampler,
places=self._place,
feed_list=feed_list,
num_workers=num_workers,
return_list=True)
else:
train_loader = DataLoader(train_dataset,
batch_size=batch_size,
places=self._place,
feed_list=feed_list,
num_workers=4,
return_list=True)
if eval_loader is None and eval_dataset is not None:
if distributed.get_nranks() > 1:
eval_sampler = DistributedBatchSampler(eval_dataset,
batch_size=batch_size)
eval_loader = DataLoader(eval_dataset,
batch_sampler=eval_sampler,
places=self._place,
feed_list=feed_list,
num_workers=num_workers,
return_list=True)
else:
eval_loader = DataLoader(eval_dataset,
batch_size=batch_size,
places=self._place,
feed_list=feed_list,
num_workers=num_workers,
return_list=True)
do_eval = eval_loader is not None
self._test_dataloader = eval_loader
metrics_name = self._metrics_name()
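
With this change `fit` accepts either datasets or pre-built loaders; when given datasets it builds the `DataLoader` (and, with more than one GPU, the `DistributedBatchSampler`) itself, as shown above. A hedged usage sketch, reusing names from the MNIST example and test in this commit (the optimizer hyperparameters are assumptions, since that part of the example is elided):

    inputs = [Input([None, 784], 'float32', name='image')]
    labels = [Input([None, 1], 'int64', name='label')]

    model = MNIST()
    optim = fluid.optimizer.Momentum(
        learning_rate=0.001, momentum=0.9,
        parameter_list=model.parameters())   # assumed hyperparameters
    model.prepare(optim, CrossEntropy(), Accuracy(), inputs, labels)

    train_dataset = MnistDataset(mode='train')
    val_dataset = MnistDataset(mode='test')

    # Datasets go straight into fit(); the loader and, when nranks > 1,
    # the DistributedBatchSampler are created inside fit().
    model.fit(train_dataset, val_dataset, epochs=2, batch_size=128)
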
......
......@@ -31,8 +31,9 @@ from paddle.fluid.dygraph.nn import Conv2D, Pool2D, Linear
from model import Model, CrossEntropy, Input, Loss
from metrics import Accuracy
from callbacks import ProgBarLogger
from paddle.fluid.io import BatchSampler, DataLoader, MnistDataset
from distributed import *
from paddle.fluid.io import BatchSampler, DataLoader
from paddle.fluid.io import MNIST as MnistDataset
class SimpleImgConvPool(fluid.dygraph.Layer):
def __init__(self,
......@@ -143,55 +144,20 @@ class MyCrossEntropy(Loss):
return [loss1, loss2]
class CustromMnistDataset(MnistDataset):
def __init__(self,
image_filename=None,
label_filename=None,
mode='train',
download=True):
super(CustromMnistDataset, self).__init__(image_filename, label_filename, mode, download)
def __getitem__(self, idx):
return self.images[idx], [self.labels[idx]]
class TestModel(unittest.TestCase):
def fit(self, dynamic, is_mlp=False):
im_shape = (-1, 784)
guard = fluid.dygraph.guard() if dynamic else null_guard()
batch_size = 128
place = fluid.CUDAPlace(fluid.dygraph.parallel.Env().dev_id) \
if fluid.dygraph.parallel.Env().nranks > 1 else fluid.CUDAPlace(0)
guard = fluid.dygraph.guard(place) if dynamic else null_guard()
if fluid.dygraph.parallel.Env().nranks > 1:
prepare_context(place)
fluid.enable_dygraph(place) if dynamic else None
with guard:
inputs = [Input(im_shape, 'float32', name='image')]
labels = [Input([None, 1], 'int64', name='label')]
if fluid.in_dygraph_mode():
feed_list = None
else:
feed_list = [x.forward() for x in inputs + labels]
train_dataset = CustromMnistDataset(mode='train')
val_dataset = CustromMnistDataset(mode='test')
if get_nranks() > 1:
train_sampler = DistributedBatchSampler(train_dataset, batch_size=batch_size, shuffle=True)
train_loader = DataLoader(train_dataset, batch_sampler=train_sampler, places=place,
feed_list=feed_list, num_workers=4, return_list=True)
val_sampler = DistributedBatchSampler(val_dataset, batch_size=batch_size)
val_loader = DataLoader(val_dataset, batch_sampler=val_sampler, places=place,
feed_list=feed_list, num_workers=4, return_list=True)
else:
train_loader = DataLoader(train_dataset, batch_size=batch_size, places=place,
feed_list=feed_list, num_workers=4, return_list=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, places=place,
feed_list=feed_list, num_workers=4, return_list=True)
train_dataset = MnistDataset(mode='train')
val_dataset = MnistDataset(mode='test')
model = MNIST() if not is_mlp else MLP()
optim = fluid.optimizer.Momentum(
......@@ -201,7 +167,7 @@ class TestModel(unittest.TestCase):
loss = CrossEntropy() if not is_mlp else MyCrossEntropy()
model.prepare(optim, loss, Accuracy(), inputs, labels)
cbk = ProgBarLogger(50)
model.fit(train_loader, val_loader, epochs=2, callbacks=cbk)
model.fit(train_dataset, val_dataset, epochs=2, batch_size=batch_size, callbacks=cbk)
def test_fit_static(self):
self.fit(False)
......