Commit a560b0ee authored by LielinJiang

make multiple gpus support fit

Parent fba7ea99
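In brief, the change has three parts: console output and checkpoint saving are guarded so that only local rank 0 writes, data loading is sharded across ranks with a DistributedBatchSampler, and the padding introduced by all_gather is trimmed before metrics are merged. A minimal sketch of the rank-0 guard pattern, assuming Paddle's launcher exports a per-process trainer id via PADDLE_TRAINER_ID (the repo's distributed module is assumed to wrap something equivalent):

import os

def get_local_rank():
    # Assumed: Paddle's multi-process launcher sets PADDLE_TRAINER_ID;
    # defaulting to '0' keeps single-process runs behaving as rank 0.
    return int(os.environ.get('PADDLE_TRAINER_ID', '0'))

def log(msg):
    # With N processes, only local rank 0 prints, so progress bars and
    # checkpoint messages appear once instead of N times.
    if get_local_rank() == 0:
        print(msg)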
......@@ -16,7 +16,7 @@ import six
import copy
from progressbar import ProgressBar
from distributed import get_local_rank
def config_callbacks(callbacks=None,
model=None,
......@@ -193,7 +193,7 @@ class ProgBarLogger(Callback):
self.steps = self.params['steps']
self.epoch = epoch
self.train_step = 0
if self.verbose and self.epochs:
if self.verbose and self.epochs and get_local_rank() == 0:
print('Epoch %d/%d' % (epoch + 1, self.epochs))
self.train_progbar = ProgressBar(num=self.steps, verbose=self.verbose)
......@@ -230,6 +230,7 @@ class ProgBarLogger(Callback):
self.evaled_samples = 0
self.eval_progbar = ProgressBar(
num=self.eval_steps, verbose=self.verbose)
if get_local_rank() == 0:
print('Eval begin...')
def on_eval_batch_end(self, step, logs=None):
......@@ -240,7 +241,7 @@ class ProgBarLogger(Callback):
def on_eval_end(self, logs=None):
logs = logs or {}
if self.verbose:
if self.verbose and get_local_rank() == 0:
self._updates(logs, 'eval')
print('Eval samples: %d' % (self.evaled_samples))
......@@ -254,13 +255,13 @@ class ModelCheckpoint(Callback):
self.epoch = epoch
def on_epoch_end(self, epoch, logs=None):
if self.model and self.epoch % self.save_freq == 0:
if self.model and self.epoch % self.save_freq == 0 and get_local_rank() == 0:
path = '{}/{}'.format(self.save_file, epoch)
print('save checkpoint at {}'.format(path))
self.model.save(path)
def on_train_end(self, logs=None):
if self.model:
if self.model and get_local_rank() == 0:
path = '{}/final'.format(self.save_file)
print('save checkpoint at {}'.format(path))
self.model.save(path)
......@@ -28,7 +28,8 @@ from paddle.fluid.dygraph.nn import Conv2D, Pool2D, Linear
from model import Model, CrossEntropy, Input
from metrics import Accuracy
from distributed import prepare_context, all_gather, Env, get_nranks, get_local_rank, DistributedBatchSampler, to_numpy
from paddle.fluid.io import BatchSampler, DataLoader, MnistDataset
class SimpleImgConvPool(fluid.dygraph.Layer):
def __init__(self,
......@@ -97,6 +98,7 @@ class MNIST(Model):
act="softmax")
def forward(self, inputs):
inputs = fluid.layers.reshape(inputs, [-1, 1, 28, 28])
x = self._simple_img_conv_pool_1(inputs)
x = self._simple_img_conv_pool_2(x)
x = fluid.layers.flatten(x, axis=1)
......@@ -104,17 +106,17 @@ class MNIST(Model):
return x
def accuracy(pred, label, topk=(1, )):
    maxk = max(topk)
    pred = np.argsort(pred)[:, ::-1][:, :maxk]
    correct = (pred == np.repeat(label, maxk, 1))
    batch_size = label.shape[0]
    res = []
    for k in topk:
        correct_k = correct[:, :k].sum()
        res.append(100.0 * correct_k / batch_size)
    return res
class CustromMnistDataset(MnistDataset):
    def __init__(self,
                 image_filename=None,
                 label_filename=None,
                 mode='train',
                 download=True):
        super(CustromMnistDataset, self).__init__(image_filename, label_filename, mode, download)

    def __getitem__(self, idx):
        return self.images[idx], [self.labels[idx]]
def main():
......@@ -122,63 +124,64 @@ def main():
def null_guard():
yield
guard = fluid.dygraph.guard() if FLAGS.dynamic else null_guard()
place = fluid.CUDAPlace(fluid.dygraph.parallel.Env().dev_id) \
if fluid.dygraph.parallel.Env().nranks > 1 else fluid.CUDAPlace(0)
guard = fluid.dygraph.guard(place) if FLAGS.dynamic else null_guard()
if fluid.dygraph.parallel.Env().nranks > 1:
prepare_context(place)
if not os.path.exists('mnist_checkpoints'):
os.mkdir('mnist_checkpoints')
train_loader = fluid.io.xmap_readers(
lambda b: [np.array([x[0] for x in b]).reshape(-1, 1, 28, 28),
np.array([x[1] for x in b]).reshape(-1, 1)],
paddle.batch(fluid.io.shuffle(paddle.dataset.mnist.train(), 6e4),
batch_size=FLAGS.batch_size, drop_last=True), 1, 1)
val_loader = fluid.io.xmap_readers(
lambda b: [np.array([x[0] for x in b]).reshape(-1, 1, 28, 28),
np.array([x[1] for x in b]).reshape(-1, 1)],
paddle.batch(paddle.dataset.mnist.test(),
batch_size=FLAGS.batch_size, drop_last=True), 1, 1)
# train_loader = fluid.io.xmap_readers(
# lambda b: [np.array([x[0] for x in b]).reshape(-1, 1, 28, 28),
# np.array([x[1] for x in b]).reshape(-1, 1)],
# paddle.batch(fluid.io.shuffle(paddle.dataset.mnist.train(), 6e4),
# batch_size=FLAGS.batch_size, drop_last=True), 1, 1)
# val_loader = fluid.io.xmap_readers(
# lambda b: [np.array([x[0] for x in b]).reshape(-1, 1, 28, 28),
# np.array([x[1] for x in b]).reshape(-1, 1)],
# paddle.batch(paddle.dataset.mnist.test(),
# batch_size=FLAGS.batch_size, drop_last=True), 1, 1)
with guard:
train_dataset = CustromMnistDataset(mode='train')
val_dataset = CustromMnistDataset(mode='test')
inputs = [Input([None, 784], 'float32', name='image')]
labels = [Input([None, 1], 'int64', name='label')]
if fluid.in_dygraph_mode():
feed_list = None
else:
feed_list = [x.forward() for x in inputs + labels]
if get_nranks() > 1:
train_sampler = DistributedBatchSampler(train_dataset, batch_size=FLAGS.batch_size, shuffle=True)
train_loader = DataLoader(train_dataset, batch_sampler=train_sampler, places=place,
feed_list=feed_list, num_workers=4, return_list=True)
val_sampler = DistributedBatchSampler(val_dataset, batch_size=FLAGS.batch_size)
val_loader = DataLoader(val_dataset, batch_sampler=val_sampler, places=place,
feed_list=feed_list, num_workers=4, return_list=True)
else:
train_loader = DataLoader(train_dataset, batch_size=FLAGS.batch_size, places=place,
feed_list=feed_list, num_workers=4, return_list=True)
val_loader = DataLoader(val_dataset, batch_size=FLAGS.batch_size, places=place,
feed_list=feed_list, num_workers=4, return_list=True)
model = MNIST()
optim = Momentum(
learning_rate=FLAGS.lr,
momentum=.9,
parameter_list=model.parameters())
inputs = [Input([None, 1, 28, 28], 'float32', name='image')]
labels = [Input([None, 1], 'int64', name='label')]
model.prepare(optim, CrossEntropy(), Accuracy(topk=(1, 2)), inputs, labels)
if FLAGS.resume is not None:
model.load(FLAGS.resume)
for e in range(FLAGS.epoch):
train_loss = 0.0
val_loss = 0.0
print("======== train epoch {} ========".format(e))
for idx, batch in enumerate(train_loader()):
losses, metrics = model.train(batch[0], batch[1])
train_loss += np.sum(losses)
if idx % 10 == 0:
print("{:04d}: loss {:0.3f} top1: {:0.3f}% top2: {:0.3f}%".format(
idx, train_loss / (idx + 1), metrics[0][0], metrics[0][1]))
for metric in model._metrics:
res = metric.accumulate()
print("train epoch {:03d}: top1: {:0.3f}%, top2: {:0.3f}".format(e, res[0], res[1]))
metric.reset()
print("======== eval epoch {} ========".format(e))
for idx, batch in enumerate(val_loader()):
losses, metrics = model.eval(batch[0], batch[1])
val_loss += np.sum(losses)
if idx % 10 == 0:
print("{:04d}: loss {:0.3f} top1: {:0.3f}% top2: {:0.3f}%".format(
idx, val_loss / (idx + 1), metrics[0][0], metrics[0][1]))
for metric in model._metrics:
res = metric.accumulate()
print("eval epoch {:03d}: top1: {:0.3f}%, top2: {:0.3f}".format(e, res[0], res[1]))
metric.reset()
model.save('mnist_checkpoints/{:02d}'.format(e))
model.fit(train_loader, val_loader, epochs=FLAGS.epoch)
if __name__ == '__main__':
......
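The loader changes above swap the reader pipeline for dataset-plus-sampler loading. As a rough sketch of what a distributed batch sampler does (names and padding policy are illustrative assumptions, not the repo's exact API): each rank iterates over a disjoint shard of the index space, padded with leading indices so all ranks run the same number of steps.

import math
import random

class NaiveDistributedBatchSampler(object):
    """Yield this rank's batches of dataset indices."""

    def __init__(self, num_samples, batch_size, rank, nranks, shuffle=False):
        self.num_samples = num_samples
        self.batch_size = batch_size
        self.rank = rank
        self.nranks = nranks
        self.shuffle = shuffle
        # Round up so every rank owns the same number of samples.
        self.per_rank = int(math.ceil(num_samples / float(nranks)))

    def __iter__(self):
        indices = list(range(self.num_samples))
        if self.shuffle:
            random.shuffle(indices)
        # Pad with leading indices so shards divide evenly; these duplicates
        # are exactly what the metric code in model.py later slices off.
        indices += indices[:self.per_rank * self.nranks - len(indices)]
        shard = indices[self.rank * self.per_rank:(self.rank + 1) * self.per_rank]
        for i in range(0, len(shard), self.batch_size):
            yield shard[i:i + self.batch_size]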
......@@ -140,6 +140,7 @@ class StaticGraphAdapter(object):
self._progs = {}
self._compiled_progs = {}
self._merge_count = {'eval': 0, 'test': 0}
self._nranks = distributed.Env().nranks
self._local_rank = distributed.Env().local_rank
......@@ -360,11 +361,16 @@ class StaticGraphAdapter(object):
metrics = []
for metric, state in zip(self.model._metrics, metric_states):
# cut off padding size
if self.model._dataset is not None and self._nranks > 1:
total_size = len(self.model._dataset)
if self.mode != 'train' and self.model._test_dataloader is not None and self._nranks > 1:
total_size = len(self.model._test_dataloader.dataset)
samples = state[0].shape[0]
if metric.count[0] + samples > total_size:
state = [s[:total_size - metric.count[0], ...] for s in state]
current_count = self._merge_count.get(self.mode, 0)
if current_count + samples > total_size:
state = [s[:total_size - current_count, ...] for s in state]
self._merge_count[self.mode] = 0
else:
self._merge_count[self.mode] += samples
metrics.append(metric.update(*state))
return (losses, metrics) if len(metrics) > 0 else losses
......@@ -422,7 +428,7 @@ class StaticGraphAdapter(object):
self.model._optimizer = fleet.distributed_optimizer(self.model._optimizer, strategy=dist_strategy)
self.model._optimizer.minimize(self._loss_endpoint)
if self._nranks > 1 and mode != 'train' and self.model._dataset is not None:
if self._nranks > 1 and mode != 'train' and self.model._test_dataloader is not None:
outputs = [distributed._all_gather(o, self._nranks) for o in outputs]
if mode != 'test':
labels = [distributed._all_gather(l, self._nranks) for l in labels]
......@@ -471,8 +477,9 @@ class StaticGraphAdapter(object):
uninitialized = []
for var_py in self._startup_prog.list_vars():
var = fluid.global_scope().find_var(var_py.name)
if var and var.get_tensor()._is_initialized():
if not var_py.name.startswith('nccl_id') and var and var.get_tensor()._is_initialized():
continue
uninitialized.append(var_py)
if uninitialized:
startup_prog = self._startup_prog._prune(uninitialized)
......@@ -498,6 +505,7 @@ class DynamicGraphAdapter(object):
self.model = model
self._nranks = distributed.Env().nranks
self._local_rank = distributed.Env().local_rank
self._merge_count = {'eval': 0, 'test': 0}
if self._nranks > 1:
self.ddp_model = distributed.DistributedDataParallel(self.model)
......@@ -560,12 +568,16 @@ class DynamicGraphAdapter(object):
metrics = []
for metric in self.model._metrics:
# cut off padding value.
if self.model._dataset is not None and self._nranks > 1:
total_size = len(self.model._dataset)
if self.model._test_dataloader is not None and self._nranks > 1:
total_size = len(self.model._test_dataloader.dataset)
samples = outputs[0].shape[0]
if metric.count[0] + samples > total_size:
current_count = self._merge_count.get(self.mode, 0)
if current_count + samples > total_size:
outputs = [o[:total_size - current_count] for o in outputs]
labels = [l[:total_size - current_count] for l in labels]
self._merge_count[self.mode] = 0
else:
self._merge_count[self.mode] += samples
metric_outs = metric.add_metric_op(to_list(outputs), labels)
m = metric.update(*[to_numpy(m) for m in to_list(metric_outs)])
......@@ -664,8 +676,9 @@ class Model(fluid.dygraph.Layer):
self._device = None
self._device_ids = None
self._optimizer = None
self._dataset = None
self._distributed_sampler = None
self._test_dataloader = None
if in_dygraph_mode():
self._adapter = DynamicGraphAdapter(self)
else:
......@@ -696,7 +709,6 @@ class Model(fluid.dygraph.Layer):
metrics=None,
inputs=None,
labels=None,
dataset=None,
device=None,
device_ids=None):
"""
......@@ -755,7 +767,7 @@ class Model(fluid.dygraph.Layer):
self._inputs = inputs
self._labels = labels
self._device = device
self._dataset = dataset
if device is None:
self._device = 'GPU' if fluid.is_compiled_with_cuda() else 'CPU'
self._device_ids = device_ids
......@@ -788,6 +800,7 @@ class Model(fluid.dygraph.Layer):
during training.
"""
do_eval = eval_loader is not None
self._test_dataloader = eval_loader
metrics_name = self._metrics_name()
cbks = config_callbacks(
callbacks,
......@@ -806,6 +819,12 @@ class Model(fluid.dygraph.Layer):
'metrics_name': metrics_name,
}
for step, data in enumerate(data_loader):
if not fluid.in_dygraph_mode():
data = data[0]
batch_size = data[0].shape()[0]
else:
batch_size = data[0].shape[0]
cbks.on_batch_begin(mode, step, logs)
if mode == 'train':
outs = self.train(*data)
......@@ -820,12 +839,13 @@ class Model(fluid.dygraph.Layer):
for metric in self._metrics:
res = metric.accumulate()
metrics.extend(to_list(res))
assert len(metrics_name) == len(metrics)
for k, v in zip(metrics_name, metrics):
logs[k] = v
logs['step'] = step
logs['batch_size'] = data[0].shape[0]
logs['batch_size'] = batch_size
cbks.on_batch_end(mode, step, logs)
self._reset_metrics()
......
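To see why _merge_count is introduced above: with padded shards, the gathered eval outputs overshoot the true dataset size on the final step, and a per-mode running count is what lets the adapter slice off the overshoot before metric.update is called. A self-contained sketch of that bookkeeping (hypothetical helper; numpy arrays stand in for the gathered tensors):

import numpy as np

def trim_gathered(state, merged_so_far, total_size):
    # `state` is a list of arrays gathered from all ranks for one step.
    samples = state[0].shape[0]
    if merged_so_far + samples > total_size:
        keep = total_size - merged_so_far    # drop the padded tail
        return [s[:keep] for s in state], 0  # reset the count for the next pass
    return state, merged_so_far + samples

# Example: a 10-sample dataset whose last gathered step carries 6 rows,
# only 2 of which are real data.
state, count = trim_gathered([np.zeros((6, 10))], merged_so_far=8, total_size=10)
assert state[0].shape[0] == 2 and count == 0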
......@@ -2,6 +2,7 @@ import sys
import time
import numpy as np
from distributed import get_local_rank
class ProgressBar(object):
"""progress bar """
......@@ -59,6 +60,7 @@ class ProgressBar(object):
else:
fps = ' - %.0fus/%s' % (time_per_unit * 1e6, 'step')
if get_local_rank() == 0:
info = ''
if self._verbose == 1:
prev_total_width = self._total_width
......
......@@ -18,6 +18,10 @@ from __future__ import print_function
import unittest
import os
import sys
sys.path.append('../')
import numpy as np
import contextlib
......@@ -27,7 +31,8 @@ from paddle.fluid.dygraph.nn import Conv2D, Pool2D, Linear
from model import Model, CrossEntropy, Input, Loss
from metrics import Accuracy
from callbacks import ProgBarLogger
from paddle.fluid.io import BatchSampler, DataLoader, MnistDataset
from distributed import *
class SimpleImgConvPool(fluid.dygraph.Layer):
def __init__(self,
......@@ -96,6 +101,7 @@ class MNIST(Model):
act="softmax")
def forward(self, inputs):
inputs = fluid.layers.reshape(inputs, [-1, 1, 28, 28])
x = self._simple_img_conv_pool_1(inputs)
x = self._simple_img_conv_pool_2(x)
x = fluid.layers.flatten(x, axis=1)
......@@ -137,24 +143,56 @@ class MyCrossEntropy(Loss):
return [loss1, loss2]
class CustromMnistDataset(MnistDataset):
def __init__(self,
image_filename=None,
label_filename=None,
mode='train',
download=True):
super(CustromMnistDataset, self).__init__(image_filename, label_filename, mode, download)
def __getitem__(self, idx):
return self.images[idx], [self.labels[idx]]
class TestModel(unittest.TestCase):
def fit(self, dynamic, is_mlp=False):
im_shape = (-1, 784) if is_mlp else (-1, 1, 28, 28)
im_shape = (-1, 784)
guard = fluid.dygraph.guard() if dynamic else null_guard()
batch_size = 128
train_loader = fluid.io.xmap_readers(
lambda b: [np.array([x[0] for x in b]).reshape(im_shape),
np.array([x[1] for x in b]).reshape(-1, 1)],
paddle.batch(fluid.io.shuffle(paddle.dataset.mnist.train(), 6e4),
batch_size=batch_size, drop_last=True), 1, 1)
val_loader = fluid.io.xmap_readers(
lambda b: [np.array([x[0] for x in b]).reshape(im_shape),
np.array([x[1] for x in b]).reshape(-1, 1)],
paddle.batch(paddle.dataset.mnist.test(),
batch_size=batch_size, drop_last=False), 1, 1)
place = fluid.CUDAPlace(fluid.dygraph.parallel.Env().dev_id) \
if fluid.dygraph.parallel.Env().nranks > 1 else fluid.CUDAPlace(0)
guard = fluid.dygraph.guard(place) if dynamic else null_guard()
if fluid.dygraph.parallel.Env().nranks > 1:
prepare_context(place)
with guard:
inputs = [Input(im_shape, 'float32', name='image')]
labels = [Input([None, 1], 'int64', name='label')]
if fluid.in_dygraph_mode():
feed_list = None
else:
feed_list = [x.forward() for x in inputs + labels]
train_dataset = CustromMnistDataset(mode='train')
val_dataset = CustromMnistDataset(mode='test')
if get_nranks() > 1:
train_sampler = DistributedBatchSampler(train_dataset, batch_size=batch_size, shuffle=True)
train_loader = DataLoader(train_dataset, batch_sampler=train_sampler, places=place,
feed_list=feed_list, num_workers=4, return_list=True)
val_sampler = DistributedBatchSampler(val_dataset, batch_size=batch_size)
val_loader = DataLoader(val_dataset, batch_sampler=val_sampler, places=place,
feed_list=feed_list, num_workers=4, return_list=True)
else:
train_loader = DataLoader(train_dataset, batch_size=batch_size, places=place,
feed_list=feed_list, num_workers=4, return_list=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, places=place,
feed_list=feed_list, num_workers=4, return_list=True)
model = MNIST() if not is_mlp else MLP()
optim = fluid.optimizer.Momentum(
learning_rate=0.01,
......