diff --git a/callbacks.py b/callbacks.py
index 24a42a18e0d7d88e3901fb0265a348cbc3628526..2e4d37d34e61f2f10a856bbead243f069eb6ed92 100644
--- a/callbacks.py
+++ b/callbacks.py
@@ -16,7 +16,7 @@ import six
 import copy
 
 from progressbar import ProgressBar
-
+from distributed import get_local_rank
 
 def config_callbacks(callbacks=None,
                      model=None,
@@ -193,7 +193,7 @@ class ProgBarLogger(Callback):
         self.steps = self.params['steps']
         self.epoch = epoch
         self.train_step = 0
-        if self.verbose and self.epochs:
+        if self.verbose and self.epochs and get_local_rank() == 0:
             print('Epoch %d/%d' % (epoch + 1, self.epochs))
         self.train_progbar = ProgressBar(num=self.steps, verbose=self.verbose)
 
@@ -230,7 +230,8 @@ class ProgBarLogger(Callback):
         self.evaled_samples = 0
         self.eval_progbar = ProgressBar(
             num=self.eval_steps, verbose=self.verbose)
-        print('Eval begin...')
+        if get_local_rank() == 0:
+            print('Eval begin...')
 
     def on_eval_batch_end(self, step, logs=None):
         logs = logs or {}
@@ -240,7 +241,7 @@ class ProgBarLogger(Callback):
 
     def on_eval_end(self, logs=None):
         logs = logs or {}
-        if self.verbose:
+        if self.verbose and get_local_rank() == 0:
             self._updates(logs, 'eval')
             print('Eval samples: %d' % (self.evaled_samples))
 
@@ -254,13 +255,13 @@ class ModelCheckpoint(Callback):
         self.epoch = epoch
 
     def on_epoch_end(self, epoch, logs=None):
-        if self.model and self.epoch % self.save_freq == 0:
+        if self.model and self.epoch % self.save_freq == 0 and get_local_rank() == 0:
             path = '{}/{}'.format(self.save_file, epoch)
             print('save checkpoint at {}'.format(path))
             self.model.save(path)
 
     def on_train_end(self, logs=None):
-        if self.model:
+        if self.model and get_local_rank() == 0:
             path = '{}/final'.format(self.save_file)
             print('save checkpoint at {}'.format(path))
             self.model.save(path)
diff --git a/mnist.py b/mnist.py
index 9871511133a0a2181e8b628f0299d0152129c722..00d12990fd7cd636cd4a2183a3e7ece54a641aff 100644
--- a/mnist.py
+++ b/mnist.py
@@ -28,7 +28,8 @@ from paddle.fluid.dygraph.nn import Conv2D, Pool2D, Linear
 from model import Model, CrossEntropy, Input
 from metrics import Accuracy
-
+from distributed import prepare_context, all_gather, Env, get_nranks, get_local_rank, DistributedBatchSampler, to_numpy
+from paddle.fluid.io import BatchSampler, DataLoader, MnistDataset
 
 
 class SimpleImgConvPool(fluid.dygraph.Layer):
     def __init__(self,
@@ -97,6 +98,7 @@ class MNIST(Model):
             act="softmax")
 
     def forward(self, inputs):
+        inputs = fluid.layers.reshape(inputs, [-1, 1, 28, 28])
         x = self._simple_img_conv_pool_1(inputs)
         x = self._simple_img_conv_pool_2(x)
         x = fluid.layers.flatten(x, axis=1)
@@ -104,17 +106,17 @@ class MNIST(Model):
         return x
 
 
-def accuracy(pred, label, topk=(1, )):
-    maxk = max(topk)
-    pred = np.argsort(pred)[:, ::-1][:, :maxk]
-    correct = (pred == np.repeat(label, maxk, 1))
+class CustomMnistDataset(MnistDataset):
+    def __init__(self,
+                 image_filename=None,
+                 label_filename=None,
+                 mode='train',
+                 download=True):
+        super(CustomMnistDataset, self).__init__(image_filename, label_filename, mode, download)
+
 
-    batch_size = label.shape[0]
-    res = []
-    for k in topk:
-        correct_k = correct[:, :k].sum()
-        res.append(100.0 * correct_k / batch_size)
-    return res
+    def __getitem__(self, idx):
+        return self.images[idx], [self.labels[idx]]
 
 
 def main():
@@ -122,63 +124,64 @@ def main():
     def null_guard():
         yield
 
-    guard = fluid.dygraph.guard() if FLAGS.dynamic else null_guard()
+    place = fluid.CUDAPlace(fluid.dygraph.parallel.Env().dev_id) \
+        if fluid.dygraph.parallel.Env().nranks > 1 else fluid.CUDAPlace(0)
+    guard = fluid.dygraph.guard(place) if FLAGS.dynamic else null_guard()
+    if fluid.dygraph.parallel.Env().nranks > 1:
+        prepare_context(place)
+
     if not os.path.exists('mnist_checkpoints'):
         os.mkdir('mnist_checkpoints')
 
-    train_loader = fluid.io.xmap_readers(
-        lambda b: [np.array([x[0] for x in b]).reshape(-1, 1, 28, 28),
-                   np.array([x[1] for x in b]).reshape(-1, 1)],
-        paddle.batch(fluid.io.shuffle(paddle.dataset.mnist.train(), 6e4),
-                     batch_size=FLAGS.batch_size, drop_last=True), 1, 1)
-    val_loader = fluid.io.xmap_readers(
-        lambda b: [np.array([x[0] for x in b]).reshape(-1, 1, 28, 28),
-                   np.array([x[1] for x in b]).reshape(-1, 1)],
-        paddle.batch(paddle.dataset.mnist.test(),
-                     batch_size=FLAGS.batch_size, drop_last=True), 1, 1)
+    # train_loader = fluid.io.xmap_readers(
+    #     lambda b: [np.array([x[0] for x in b]).reshape(-1, 1, 28, 28),
+    #                np.array([x[1] for x in b]).reshape(-1, 1)],
+    #     paddle.batch(fluid.io.shuffle(paddle.dataset.mnist.train(), 6e4),
+    #                  batch_size=FLAGS.batch_size, drop_last=True), 1, 1)
+    # val_loader = fluid.io.xmap_readers(
+    #     lambda b: [np.array([x[0] for x in b]).reshape(-1, 1, 28, 28),
+    #                np.array([x[1] for x in b]).reshape(-1, 1)],
+    #     paddle.batch(paddle.dataset.mnist.test(),
+    #                  batch_size=FLAGS.batch_size, drop_last=True), 1, 1)
 
     with guard:
+
+        train_dataset = CustomMnistDataset(mode='train')
+        val_dataset = CustomMnistDataset(mode='test')
+
+        inputs = [Input([None, 784], 'float32', name='image')]
+        labels = [Input([None, 1], 'int64', name='label')]
+
+        if fluid.in_dygraph_mode():
+            feed_list = None
+        else:
+            feed_list = [x.forward() for x in inputs + labels]
+
+        if get_nranks() > 1:
+            train_sampler = DistributedBatchSampler(train_dataset, batch_size=FLAGS.batch_size, shuffle=True)
+            train_loader = DataLoader(train_dataset, batch_sampler=train_sampler, places=place,
+                                      feed_list=feed_list, num_workers=4, return_list=True)
+            val_sampler = DistributedBatchSampler(val_dataset, batch_size=FLAGS.batch_size)
+            val_loader = DataLoader(val_dataset, batch_sampler=val_sampler, places=place,
+                                    feed_list=feed_list, num_workers=4, return_list=True)
+        else:
+            train_loader = DataLoader(train_dataset, batch_size=FLAGS.batch_size, places=place,
+                                      feed_list=feed_list, num_workers=4, return_list=True)
+            val_loader = DataLoader(val_dataset, batch_size=FLAGS.batch_size, places=place,
+                                    feed_list=feed_list, num_workers=4, return_list=True)
+
         model = MNIST()
         optim = Momentum(
             learning_rate=FLAGS.lr,
             momentum=.9,
             parameter_list=model.parameters())
-        inputs = [Input([None, 1, 28, 28], 'float32', name='image')]
-        labels = [Input([None, 1], 'int64', name='label')]
+
         model.prepare(optim, CrossEntropy(), Accuracy(topk=(1, 2)), inputs, labels)
         if FLAGS.resume is not None:
             model.load(FLAGS.resume)
-        for e in range(FLAGS.epoch):
-            train_loss = 0.0
-            val_loss = 0.0
-            print("======== train epoch {} ========".format(e))
-            for idx, batch in enumerate(train_loader()):
-                losses, metrics = model.train(batch[0], batch[1])
-
-                train_loss += np.sum(losses)
-                if idx % 10 == 0:
-                    print("{:04d}: loss {:0.3f} top1: {:0.3f}% top2: {:0.3f}%".format(
-                        idx, train_loss / (idx + 1), metrics[0][0], metrics[0][1]))
-            for metric in model._metrics:
-                res = metric.accumulate()
-                print("train epoch {:03d}: top1: {:0.3f}%, top2: {:0.3f}".format(e, res[0], res[1]))
-                metric.reset()
-
-            print("======== eval epoch {} ========".format(e))
-            for idx, batch in enumerate(val_loader()):
-                losses, metrics = model.eval(batch[0], batch[1])
-
-                val_loss += np.sum(losses)
-                if idx % 10 == 0:
-                    print("{:04d}: loss {:0.3f} top1: {:0.3f}% top2: {:0.3f}%".format(
-                        idx, val_loss / (idx + 1), metrics[0][0], metrics[0][1]))
-            for metric in model._metrics:
-                res = metric.accumulate()
-                print("eval epoch {:03d}: top1: {:0.3f}%, top2: {:0.3f}".format(e, res[0], res[1]))
-                metric.reset()
-            model.save('mnist_checkpoints/{:02d}'.format(e))
+        model.fit(train_loader, val_loader, epochs=FLAGS.epoch)
 
 
 if __name__ == '__main__':
diff --git a/model.py b/model.py
index 7cf5d9df690e270ef53fdadb02680073303efc9b..f3ef27e4fbd0022ae8c6800a51ed0d2f298c21e0 100644
--- a/model.py
+++ b/model.py
@@ -140,6 +140,7 @@ class StaticGraphAdapter(object):
 
         self._progs = {}
         self._compiled_progs = {}
+        self._merge_count = {'eval': 0, 'test': 0}
 
         self._nranks = distributed.Env().nranks
         self._local_rank = distributed.Env().local_rank
@@ -360,11 +361,16 @@ class StaticGraphAdapter(object):
         metrics = []
         for metric, state in zip(self.model._metrics, metric_states):
             # cut off padding size
-            if self.model._dataset is not None and self._nranks > 1:
-                total_size = len(self.model._dataset)
+            if self.mode != 'train' and self.model._test_dataloader is not None and self._nranks > 1:
+                total_size = len(self.model._test_dataloader.dataset)
                 samples = state[0].shape[0]
-                if metric.count[0] + samples > total_size:
-                    state = [s[:total_size - metric.count[0], ...] for s in state]
+                current_count = self._merge_count.get(self.mode, 0)
+                if current_count + samples > total_size:
+                    state = [s[:total_size - current_count, ...] for s in state]
+                    self._merge_count[self.mode] = 0
+                else:
+                    self._merge_count[self.mode] += samples
+
 
             metrics.append(metric.update(*state))
         return (losses, metrics) if len(metrics) > 0 else losses
@@ -422,7 +428,7 @@ class StaticGraphAdapter(object):
             self.model._optimizer = fleet.distributed_optimizer(self.model._optimizer, strategy=dist_strategy)
             self.model._optimizer.minimize(self._loss_endpoint)
 
-        if self._nranks > 1 and mode != 'train' and self.model._dataset is not None:
+        if self._nranks > 1 and mode != 'train' and self.model._test_dataloader is not None:
             outputs = [distributed._all_gather(o, self._nranks) for o in outputs]
             if mode != 'test':
                 labels = [distributed._all_gather(l, self._nranks) for l in labels]
@@ -471,8 +477,9 @@ class StaticGraphAdapter(object):
         uninitialized = []
         for var_py in self._startup_prog.list_vars():
             var = fluid.global_scope().find_var(var_py.name)
-            if var and var.get_tensor()._is_initialized():
+            if not var_py.name.startswith('nccl_id') and var and var.get_tensor()._is_initialized():
                 continue
+
             uninitialized.append(var_py)
         if uninitialized:
             startup_prog = self._startup_prog._prune(uninitialized)
@@ -498,6 +505,7 @@ class DynamicGraphAdapter(object):
         self.model = model
         self._nranks = distributed.Env().nranks
         self._local_rank = distributed.Env().local_rank
+        self._merge_count = {'eval': 0, 'test': 0}
 
         if self._nranks > 1:
             self.ddp_model = distributed.DistributedDataParallel(self.model)
@@ -560,12 +568,16 @@ class DynamicGraphAdapter(object):
         metrics = []
         for metric in self.model._metrics:
             # cut off padding value.
-            if self.model._dataset is not None and self._nranks > 1:
-                total_size = len(self.model._dataset)
+            if self.model._test_dataloader is not None and self._nranks > 1:
+                total_size = len(self.model._test_dataloader.dataset)
                 samples = outputs[0].shape[0]
-                if metric.count[0] + samples > total_size:
+                current_count = self._merge_count.get(self.mode, 0)
+                if current_count + samples > total_size:
                     outputs = [o[:total_size - metric.count[0]] for o in outputs]
                     labels = [l[:total_size - metric.count[0]] for l in labels]
+                    self._merge_count[self.mode] = 0
+                else:
+                    self._merge_count[self.mode] += samples
 
             metric_outs = metric.add_metric_op(to_list(outputs), labels)
             m = metric.update(*[to_numpy(m) for m in to_list(metric_outs)])
@@ -664,8 +676,9 @@ class Model(fluid.dygraph.Layer):
         self._device = None
         self._device_ids = None
         self._optimizer = None
-        self._dataset = None
         self._distributed_sampler = None
+        self._test_dataloader = None
+
         if in_dygraph_mode():
             self._adapter = DynamicGraphAdapter(self)
         else:
@@ -696,7 +709,6 @@ class Model(fluid.dygraph.Layer):
                 metrics=None,
                 inputs=None,
                 labels=None,
-                dataset=None,
                 device=None,
                 device_ids=None):
         """
@@ -755,7 +767,7 @@ class Model(fluid.dygraph.Layer):
         self._inputs = inputs
         self._labels = labels
         self._device = device
-        self._dataset = dataset
+
         if device is None:
             self._device = 'GPU' if fluid.is_compiled_with_cuda() else 'CPU'
         self._device_ids = device_ids
@@ -788,6 +800,7 @@ class Model(fluid.dygraph.Layer):
         during training.
         """
         do_eval = eval_loader is not None
+        self._test_dataloader = eval_loader
         metrics_name = self._metrics_name()
         cbks = config_callbacks(
             callbacks,
@@ -806,6 +819,12 @@ class Model(fluid.dygraph.Layer):
             'metrics_name': metrics_name,
         }
         for step, data in enumerate(data_loader):
+            if not fluid.in_dygraph_mode():
+                data = data[0]
+                batch_size = data[0].shape()[0]
+            else:
+                batch_size = data[0].shape[0]
+
             cbks.on_batch_begin(mode, step, logs)
             if mode == 'train':
                 outs = self.train(*data)
@@ -820,12 +839,13 @@ class Model(fluid.dygraph.Layer):
             for metric in self._metrics:
                 res = metric.accumulate()
                 metrics.extend(to_list(res))
+            assert len(metrics_name) == len(metrics)
             for k, v in zip(metrics_name, metrics):
                 logs[k] = v
 
             logs['step'] = step
-            logs['batch_size'] = data[0].shape[0]
+            logs['batch_size'] = batch_size
 
             cbks.on_batch_end(mode, step, logs)
         self._reset_metrics()
diff --git a/progressbar.py b/progressbar.py
index 123ff6f27c577b4b30a8f2b8fd43936a85e1612d..1f07424df3242ab9e44841ffb9f962aa817ba18b 100644
--- a/progressbar.py
+++ b/progressbar.py
@@ -2,6 +2,7 @@
 import sys
 import time
 import numpy as np
+from distributed import get_local_rank
 
 class ProgressBar(object):
     """progress bar """
@@ -59,105 +60,106 @@ class ProgressBar(object):
         else:
             fps = ' - %.0fus/%s' % (time_per_unit * 1e6, 'step')
 
-        info = ''
-        if self._verbose == 1:
-            prev_total_width = self._total_width
+        if get_local_rank() == 0:
+            info = ''
+            if self._verbose == 1:
+                prev_total_width = self._total_width
 
-            if self._dynamic_display:
-                sys.stdout.write('\b' * prev_total_width)
-                sys.stdout.write('\r')
-            else:
-                sys.stdout.write('\n')
-
-            if self._num is not None:
-                numdigits = int(np.log10(self._num)) + 1
-
-                bar_chars = ('step %' + str(numdigits) + 'd/%d [') % (
-                    current_num, self._num)
-                prog = float(current_num) / self._num
-                prog_width = int(self._width * prog)
-
-                if prog_width > 0:
-                    bar_chars += ('=' * (prog_width - 1))
-                    if current_num < self._num:
-                        bar_chars += '>'
-                    else:
-                        bar_chars += '='
-                bar_chars += ('.' * (self._width - prog_width))
-                bar_chars += ']'
-            else:
-                bar_chars = 'step %3d' % current_num
-
-            self._total_width = len(bar_chars)
-            sys.stdout.write(bar_chars)
-
-            for k, val in values:
-                info += ' - %s:' % k
-                val = val if isinstance(val, list) else [val]
-                for i, v in enumerate(val):
-                    if isinstance(v, (float, np.float32, np.float64)):
-                        if abs(v) > 1e-3:
-                            info += ' %.4f' % v
-                        else:
-                            info += ' %.4e' % v
-                    else:
-                        info += ' %s' % v
-
-            if self._num is not None and current_num < self._num:
-                eta = time_per_unit * (self._num - current_num)
-                if eta > 3600:
-                    eta_format = '%d:%02d:%02d' % (eta // 3600, (eta % 3600) //
-                                                   60, eta % 60)
-                elif eta > 60:
-                    eta_format = '%d:%02d' % (eta // 60, eta % 60)
+                if self._dynamic_display:
+                    sys.stdout.write('\b' * prev_total_width)
+                    sys.stdout.write('\r')
                 else:
-                    eta_format = '%ds' % eta
+                    sys.stdout.write('\n')
 
-                info += ' - ETA: %s' % eta_format
+                if self._num is not None:
+                    numdigits = int(np.log10(self._num)) + 1
 
-            info += fps
-            self._total_width += len(info)
-            if prev_total_width > self._total_width:
-                info += (' ' * (prev_total_width - self._total_width))
+                    bar_chars = ('step %' + str(numdigits) + 'd/%d [') % (
+                        current_num, self._num)
+                    prog = float(current_num) / self._num
+                    prog_width = int(self._width * prog)
 
-            # newline for another epoch
-            if self._num is not None and current_num >= self._num:
-                info += '\n'
-            if self._num is None:
-                info += '\n'
-
-            sys.stdout.write(info)
-            sys.stdout.flush()
-            self._last_update = now
-        elif self._verbose == 2:
-            if self._num:
-                numdigits = int(np.log10(self._num)) + 1
-                count = ('step %' + str(numdigits) + 'd/%d') % (current_num,
-                                                                self._num)
-            else:
-                count = 'step %3d' % current_num
-            info = count + info
-
-            for k, val in values:
-                info += ' - %s:' % k
-                val = val if isinstance(val, list) else [val]
-                for v in val:
-                    if isinstance(v, (float, np.float32, np.float64)):
-                        if abs(v) > 1e-3:
-                            info += ' %.4f' % v
+                    if prog_width > 0:
+                        bar_chars += ('=' * (prog_width - 1))
+                        if current_num < self._num:
+                            bar_chars += '>'
                         else:
-                            info += ' %.4e' % v
-                    elif isinstance(v, np.ndarray) and \
-                        isinstance(v.size, 1) and \
-                        isinstance(v.dtype, (np.float32, np.float64)):
-                        if abs(v[0]) > 1e-3:
-                            info += ' %.4f' % v[0]
+                            bar_chars += '='
+                    bar_chars += ('.' * (self._width - prog_width))
+                    bar_chars += ']'
+                else:
+                    bar_chars = 'step %3d' % current_num
+
+                self._total_width = len(bar_chars)
+                sys.stdout.write(bar_chars)
+
+                for k, val in values:
+                    info += ' - %s:' % k
+                    val = val if isinstance(val, list) else [val]
+                    for i, v in enumerate(val):
+                        if isinstance(v, (float, np.float32, np.float64)):
+                            if abs(v) > 1e-3:
+                                info += ' %.4f' % v
+                            else:
+                                info += ' %.4e' % v
                         else:
-                            info += ' %.4e' % v[0]
+                            info += ' %s' % v
+
+                if self._num is not None and current_num < self._num:
+                    eta = time_per_unit * (self._num - current_num)
+                    if eta > 3600:
+                        eta_format = '%d:%02d:%02d' % (eta // 3600, (eta % 3600) //
+                                                       60, eta % 60)
+                    elif eta > 60:
+                        eta_format = '%d:%02d' % (eta // 60, eta % 60)
                     else:
-                        info += ' %s' % v
+                        eta_format = '%ds' % eta
+
+                    info += ' - ETA: %s' % eta_format
+
+                info += fps
+                self._total_width += len(info)
+                if prev_total_width > self._total_width:
+                    info += (' ' * (prev_total_width - self._total_width))
+
+                # newline for another epoch
+                if self._num is not None and current_num >= self._num:
+                    info += '\n'
+                if self._num is None:
+                    info += '\n'
+
+                sys.stdout.write(info)
+                sys.stdout.flush()
+                self._last_update = now
+            elif self._verbose == 2:
+                if self._num:
+                    numdigits = int(np.log10(self._num)) + 1
+                    count = ('step %' + str(numdigits) + 'd/%d') % (current_num,
+                                                                    self._num)
+                else:
+                    count = 'step %3d' % current_num
+                info = count + info
+
+                for k, val in values:
+                    info += ' - %s:' % k
+                    val = val if isinstance(val, list) else [val]
+                    for v in val:
+                        if isinstance(v, (float, np.float32, np.float64)):
+                            if abs(v) > 1e-3:
+                                info += ' %.4f' % v
+                            else:
+                                info += ' %.4e' % v
+                        elif isinstance(v, np.ndarray) and \
+                            isinstance(v.size, 1) and \
+                            isinstance(v.dtype, (np.float32, np.float64)):
+                            if abs(v[0]) > 1e-3:
+                                info += ' %.4f' % v[0]
+                            else:
+                                info += ' %.4e' % v[0]
+                        else:
+                            info += ' %s' % v
 
-            info += fps
-            info += '\n'
-            sys.stdout.write(info)
-            sys.stdout.flush()
+                info += fps
+                info += '\n'
+                sys.stdout.write(info)
+                sys.stdout.flush()
diff --git a/tests/test_model.py b/tests/test_model.py
index 13e7981f37deb57b9028995d2a08120c51225ade..02e7bd3589996d98fbbc2f3813cb7dce2cded882 100644
--- a/tests/test_model.py
+++ b/tests/test_model.py
@@ -18,6 +18,10 @@ from __future__ import print_function
 
 import unittest
 import os
+
+import sys
+sys.path.append('../')
+
 import numpy as np
 import contextlib
 
@@ -27,7 +31,8 @@ from paddle.fluid.dygraph.nn import Conv2D, Pool2D, Linear
 from model import Model, CrossEntropy, Input, Loss
 from metrics import Accuracy
 from callbacks import ProgBarLogger
-
+from paddle.fluid.io import BatchSampler, DataLoader, MnistDataset
+from distributed import *
 
 class SimpleImgConvPool(fluid.dygraph.Layer):
     def __init__(self,
@@ -96,6 +101,7 @@ class MNIST(Model):
             act="softmax")
 
     def forward(self, inputs):
+        inputs = fluid.layers.reshape(inputs, [-1, 1, 28, 28])
         x = self._simple_img_conv_pool_1(inputs)
         x = self._simple_img_conv_pool_2(x)
         x = fluid.layers.flatten(x, axis=1)
@@ -137,24 +143,56 @@ class MyCrossEntropy(Loss):
         return [loss1, loss2]
 
 
+class CustomMnistDataset(MnistDataset):
+    def __init__(self,
+                 image_filename=None,
+                 label_filename=None,
+                 mode='train',
+                 download=True):
+        super(CustomMnistDataset, self).__init__(image_filename, label_filename, mode, download)
+
+
+    def __getitem__(self, idx):
+        return self.images[idx], [self.labels[idx]]
+
+
+
 class TestModel(unittest.TestCase):
     def fit(self, dynamic, is_mlp=False):
-        im_shape = (-1, 784) if is_mlp else (-1, 1, 28, 28)
+        im_shape = (-1, 784)
         guard = fluid.dygraph.guard() if dynamic else null_guard()
         batch_size = 128
-        train_loader = fluid.io.xmap_readers(
-            lambda b: [np.array([x[0] for x in b]).reshape(im_shape),
-                       np.array([x[1] for x in b]).reshape(-1, 1)],
-            paddle.batch(fluid.io.shuffle(paddle.dataset.mnist.train(), 6e4),
-                         batch_size=batch_size, drop_last=True), 1, 1)
-        val_loader = fluid.io.xmap_readers(
-            lambda b: [np.array([x[0] for x in b]).reshape(im_shape),
-                       np.array([x[1] for x in b]).reshape(-1, 1)],
-            paddle.batch(paddle.dataset.mnist.test(),
-                         batch_size=batch_size, drop_last=False), 1, 1)
+        place = fluid.CUDAPlace(fluid.dygraph.parallel.Env().dev_id) \
+            if fluid.dygraph.parallel.Env().nranks > 1 else fluid.CUDAPlace(0)
+        guard = fluid.dygraph.guard(place) if dynamic else null_guard()
+        if fluid.dygraph.parallel.Env().nranks > 1:
+            prepare_context(place)
+
         with guard:
             inputs = [Input(im_shape, 'float32', name='image')]
             labels = [Input([None, 1], 'int64', name='label')]
+
+            if fluid.in_dygraph_mode():
+                feed_list = None
+            else:
+                feed_list = [x.forward() for x in inputs + labels]
+            train_dataset = CustomMnistDataset(mode='train')
+            val_dataset = CustomMnistDataset(mode='test')
+
+            if get_nranks() > 1:
+                train_sampler = DistributedBatchSampler(train_dataset, batch_size=batch_size, shuffle=True)
+                train_loader = DataLoader(train_dataset, batch_sampler=train_sampler, places=place,
+                                          feed_list=feed_list, num_workers=4, return_list=True)
+                val_sampler = DistributedBatchSampler(val_dataset, batch_size=batch_size)
+                val_loader = DataLoader(val_dataset, batch_sampler=val_sampler, places=place,
+                                        feed_list=feed_list, num_workers=4, return_list=True)
+            else:
+                train_loader = DataLoader(train_dataset, batch_size=batch_size, places=place,
+                                          feed_list=feed_list, num_workers=4, return_list=True)
+                val_loader = DataLoader(val_dataset, batch_size=batch_size, places=place,
+                                        feed_list=feed_list, num_workers=4, return_list=True)
+
+
             model = MNIST() if not is_mlp else MLP()
             optim = fluid.optimizer.Momentum(
                 learning_rate=0.01,