From db5f36976fcb4eee5c82d607509854728105dcab Mon Sep 17 00:00:00 2001
From: LielinJiang
Date: Wed, 29 Apr 2020 03:19:23 +0000
Subject: [PATCH] refine callback, add unittest for callback

---
 hapi/callbacks.py            | 178 +++++++++++++++++++++++---
 hapi/model.py                | 237 +++++++++++++++++++++--------------
 hapi/tests/test_callbacks.py |  57 +++++++--
 3 files changed, 347 insertions(+), 125 deletions(-)

diff --git a/hapi/callbacks.py b/hapi/callbacks.py
index 62d6402..014fd30 100644
--- a/hapi/callbacks.py
+++ b/hapi/callbacks.py
@@ -33,7 +33,7 @@ def config_callbacks(callbacks=None,
     cbks = callbacks or []
     cbks = cbks if isinstance(cbks, (list, tuple)) else [cbks]
     if not any(isinstance(k, ProgBarLogger) for k in cbks) and verbose:
-        cbks = cbks + [ProgBarLogger(log_freq, verbose=verbose)]
+        cbks = [ProgBarLogger(log_freq, verbose=verbose)] + cbks
 
     if not any(isinstance(k, ModelCheckpoint) for k in cbks):
         cbks = cbks + [ModelCheckpoint(save_freq, save_dir)]
@@ -110,6 +110,9 @@ class CallbackList(object):
 
 
 class Callback(object):
+    """Base class used to build new callbacks.
+    """
+
     def __init__(self):
         self.model = None
         self.params = {}
@@ -121,63 +124,115 @@ class Callback(object):
         self.model = model
 
     def on_train_begin(self, logs=None):
-        """
+        """Called at the start of train.
         """
 
     def on_train_end(self, logs=None):
-        """
+        """Called at the end of train.
         """
 
     def on_eval_begin(self, logs=None):
-        """
+        """Called at the start of eval.
         """
 
     def on_eval_end(self, logs=None):
-        """
+        """Called at the end of eval.
         """
 
     def on_test_begin(self, logs=None):
-        """
+        """Called at the beginning of test.
         """
 
     def on_test_end(self, logs=None):
-        """
+        """Called at the end of test.
         """
 
     def on_epoch_begin(self, epoch, logs=None):
-        """
+        """Called at the beginning of each epoch.
         """
 
    def on_epoch_end(self, epoch, logs=None):
-        """
+        """Called at the end of each epoch.
         """
 
     def on_train_batch_begin(self, step, logs=None):
-        """
+        """Called at the beginning of each batch in `train` mode.
         """
 
     def on_train_batch_end(self, step, logs=None):
-        """
+        """Called at the end of each batch in `train` mode.
         """
 
     def on_eval_batch_begin(self, step, logs=None):
-        """
+        """Called at the beginning of each batch in `eval` mode.
         """
 
     def on_eval_batch_end(self, step, logs=None):
-        """
+        """Called at the end of each batch in `eval` mode.
         """
 
-    def on_eval_batch_begin(self, step, logs=None):
-        """
+    def on_test_batch_begin(self, step, logs=None):
+        """Called at the beginning of each batch in `test` mode.
         """
 
-    def on_eval_batch_end(self, step, logs=None):
-        """
+    def on_test_batch_end(self, step, logs=None):
+        """Called at the end of each batch in `test` mode.
         """
 
 
 class ProgBarLogger(Callback):
+    """Logger callback function.
+
+    Args:
+        log_freq (int): The frequency, in number of steps, at which the
+            training logs are printed. Default: 1.
+        verbose (int): The verbosity mode. Should be 0, 1, or 2:
+            0 = silent, 1 = progress bar, 2 = one line per epoch. Default: 2.
+
+    Examples:
+        .. code-block:: python
+
+            import numpy as np
+            from paddle import fluid
+            from hapi.metrics import Accuracy
+            from hapi.loss import CrossEntropy
+            from hapi.datasets import MNIST
+            from hapi.vision.transforms import Compose, Resize
+            from hapi.vision.models import LeNet
+            from hapi.callbacks import ProgBarLogger
+            from hapi.model import Input, set_device
+
+            class MnistDataset(MNIST):
+                def __init__(self, mode, return_label=True):
+                    super(MnistDataset, self).__init__(mode=mode)
+                    self.return_label = return_label
+
+                def __getitem__(self, idx):
+                    img = np.reshape(self.images[idx], [1, 28, 28])
+                    if self.return_label:
+                        return img, np.array(self.labels[idx]).astype('int64')
+                    return img,
+
+                def __len__(self):
+                    return len(self.images)
+
+            inputs = [Input([-1, 1, 28, 28], 'float32', name='image')]
+            labels = [Input([None, 1], 'int64', name='label')]
+
+            train_dataset = MnistDataset(mode='train')
+
+            model = LeNet()
+
+            optim = fluid.optimizer.Adam(0.001)
+            model.prepare(optimizer=optim,
+                          loss_function=CrossEntropy(),
+                          metrics=Accuracy(),
+                          inputs=inputs,
+                          labels=labels)
+
+            callback = ProgBarLogger(log_freq=10)
+            model.fit(train_dataset, batch_size=64, callbacks=callback)
+    """
+
     def __init__(self, log_freq=1, verbose=2):
         self.epochs = None
         self.steps = None
@@ -204,9 +259,11 @@ class ProgBarLogger(Callback):
         metrics = getattr(self, '%s_metrics' % (mode))
         progbar = getattr(self, '%s_progbar' % (mode))
         steps = getattr(self, '%s_step' % (mode))
+
         for k in metrics:
             if k in logs:
                 values.append((k, logs[k]))
+
         progbar.update(steps, values)
 
     def on_train_batch_end(self, step, logs=None):
@@ -220,7 +277,9 @@
     def on_epoch_end(self, epoch, logs=None):
         logs = logs or {}
-        if self.train_step % self.log_freq != 0 and self.verbose and ParallelEnv(
+        if self.verbose == 1 and ParallelEnv().local_rank == 0:
+            self._updates(logs, 'train')
+        elif self.train_step % self.log_freq != 0 and self.verbose and ParallelEnv(
         ).local_rank == 0:
             self._updates(logs, 'train')
 
@@ -229,6 +288,7 @@
         self.eval_metrics = logs.get('metrics_name', [])
         self.eval_step = 0
         self.evaled_samples = 0
+
         self.eval_progbar = ProgressBar(
             num=self.eval_steps, verbose=self.verbose)
         if ParallelEnv().local_rank == 0:
@@ -245,15 +305,95 @@
             if self.eval_steps is None or self.eval_step < self.eval_steps:
                 self._updates(logs, 'eval')
 
+    def on_test_begin(self, logs=None):
+        self.test_steps = logs.get('steps', None)
+        self.test_metrics = logs.get('metrics_name', [])
+        self.test_step = 0
+        self.tested_samples = 0
+        self.test_progbar = ProgressBar(
+            num=self.test_steps, verbose=self.verbose)
+        if ParallelEnv().local_rank == 0:
+            print('Predict begin...')
+
+    def on_test_batch_end(self, step, logs=None):
+        logs = logs or {}
+        self.test_step += 1
+        samples = logs.get('batch_size', 1)
+        self.tested_samples += samples
+
+        if self.test_step % self.log_freq == 0 and self.verbose and ParallelEnv(
+        ).local_rank == 0:
+            if self.test_steps is None or self.test_step < self.test_steps:
+                self._updates(logs, 'test')
+
     def on_eval_end(self, logs=None):
         logs = logs or {}
         if self.verbose and ParallelEnv().local_rank == 0:
-            if self.eval_step % self.log_freq != 0:
+            if self.eval_step % self.log_freq != 0 or self.verbose == 1:
                 self._updates(logs, 'eval')
             print('Eval samples: %d' % (self.evaled_samples))
 
+    def on_test_end(self, logs=None):
+        logs = logs or {}
+        if self.verbose and ParallelEnv().local_rank == 0:
+            if self.test_step % self.log_freq != 0 or self.verbose == 1:
+                self._updates(logs, 'test')
+            print('Predict samples: %d' % (self.tested_samples))
+
 
 class ModelCheckpoint(Callback):
+    """Model checkpoint callback function.
+
+    Args:
+        save_freq(int): The frequency, in number of epochs, at which model
+            checkpoints are saved. Default: 1.
+        save_dir(str|None): The directory to save checkpoints during training.
+            If None, checkpoints will not be saved. Default: None.
+
+    Examples:
+        .. code-block:: python
+
+            import numpy as np
+            from paddle import fluid
+            from hapi.metrics import Accuracy
+            from hapi.loss import CrossEntropy
+            from hapi.datasets import MNIST
+            from hapi.vision.transforms import Compose, Resize
+            from hapi.vision.models import LeNet
+            from hapi.callbacks import ModelCheckpoint
+            from hapi.model import Input, set_device
+
+            class MnistDataset(MNIST):
+                def __init__(self, mode, return_label=True):
+                    super(MnistDataset, self).__init__(mode=mode)
+                    self.return_label = return_label
+
+                def __getitem__(self, idx):
+                    img = np.reshape(self.images[idx], [1, 28, 28])
+                    if self.return_label:
+                        return img, np.array(self.labels[idx]).astype('int64')
+                    return img,
+
+                def __len__(self):
+                    return len(self.images)
+
+            inputs = [Input([-1, 1, 28, 28], 'float32', name='image')]
+            labels = [Input([None, 1], 'int64', name='label')]
+
+            train_dataset = MnistDataset(mode='train')
+
+            model = LeNet()
+
+            optim = fluid.optimizer.Adam(0.001)
+            model.prepare(optimizer=optim,
+                          loss_function=CrossEntropy(),
+                          metrics=Accuracy(),
+                          inputs=inputs,
+                          labels=labels)
+
+            callback = ModelCheckpoint(save_dir='./temp')
+            model.fit(train_dataset, batch_size=64, callbacks=callback)
+    """
+
     def __init__(self, save_freq=1, save_dir=None):
         self.save_freq = save_freq
         self.save_dir = save_dir
diff --git a/hapi/model.py b/hapi/model.py
index 8c1c521..3d91527 100644
--- a/hapi/model.py
+++ b/hapi/model.py
@@ -20,7 +20,6 @@
 import pickle
 import numpy as np
 import six
 import warnings
-import tqdm
 from collections import Iterable
 from paddle import fluid
@@ -859,7 +858,6 @@
               num_workers=0,
               callbacks=None, ):
         """
-        FIXME: add more comments and usage
         Args:
             train_data (Dataset|DataLoader): An iterable data loader is used for
                 train. An instance of paddle paddle.io.Dataset or
@@ -897,11 +895,6 @@
         assert train_data is not None, \
                 "train_data must be given!"
-        if fluid.in_dygraph_mode():
-            feed_list = None
-        else:
-            feed_list = [x.forward() for x in self._inputs + self._labels]
-
         if isinstance(train_data, Dataset):
             train_sampler = DistributedBatchSampler(
                 train_data,
@@ -912,7 +905,6 @@
                 train_data,
                 batch_sampler=train_sampler,
                 places=self._place,
-                feed_list=feed_list,
                 num_workers=num_workers,
                 return_list=True)
         else:
@@ -925,7 +917,6 @@
                 eval_data,
                 batch_sampler=eval_sampler,
                 places=self._place,
-                feed_list=feed_list,
                 num_workers=num_workers,
                 return_list=True)
         elif eval_data is not None:
@@ -935,7 +926,7 @@
         do_eval = eval_loader is not None
         self._test_dataloader = eval_loader
-        metrics_name = self._metrics_name()
+
         steps = self._len_data_loader(train_loader)
         cbks = config_callbacks(
             callbacks,
@@ -951,26 +942,22 @@
         cbks.on_begin('train')
         for epoch in range(epochs):
-            # FIXME: adapt to DataLoader
             loader = train_loader
-            if not isinstance(train_loader, Iterable):
-                loader = train_loader()
-            logs = self._run_one_epoch(
-                loader, cbks, 'train', metrics_name, epoch=epoch)
+
+            cbks.on_epoch_begin(epoch)
+            logs = self._run_one_epoch(loader, cbks, 'train')
+            cbks.on_epoch_end(epoch, logs)
 
             if do_eval and epoch % eval_freq == 0:
-                # FIXME: adapt to DataLoader
                 loader = eval_loader
-                if not isinstance(eval_loader, Iterable):
-                    loader = eval_loader()
                 eval_steps = self._len_data_loader(loader)
                 cbks.on_begin('eval', {
                     'steps': eval_steps,
-                    'metrics_name': metrics_name
+                    'metrics_name': self._metrics_name()
                 })
 
-                logs = self._run_one_epoch(loader, cbks, 'eval', metrics_name)
+                logs = self._run_one_epoch(loader, cbks, 'eval')
 
                 cbks.on_end('eval', logs)
 
@@ -986,7 +973,6 @@
                  num_workers=0,
                  callbacks=None, ):
         """
-        FIXME: add more comments and usage
         Args:
             eval_data (Dataset|DataLoader): An iterable data loader is used for
                 evaluation. An instance of paddle.io.Dataset or
@@ -1006,12 +992,55 @@
                 are automatically inserted. Default: None.
         Returns:
             dict: Result of metric.
-        """
 
-        if fluid.in_dygraph_mode():
-            feed_list = None
-        else:
-            feed_list = [x.forward() for x in self._inputs + self._labels]
+        Examples:
+            .. code-block:: python
+
+                # declarative mode
+                import numpy as np
+                from hapi.metrics import Accuracy
+                from hapi.datasets import MNIST
+                from hapi.vision.transforms import Compose, Resize
+                from hapi.vision.models import LeNet
+                from hapi.model import Input, set_device
+
+                class MnistDataset(MNIST):
+                    def __init__(self, mode, return_label=True):
+                        super(MnistDataset, self).__init__(mode=mode)
+                        self.return_label = return_label
+
+                    def __getitem__(self, idx):
+                        img = np.reshape(self.images[idx], [1, 28, 28])
+                        if self.return_label:
+                            return img, np.array(self.labels[idx]).astype('int64')
+                        return img,
+
+                    def __len__(self):
+                        return len(self.images)
+
+                inputs = [Input([-1, 1, 28, 28], 'float32', name='image')]
+                labels = [Input([None, 1], 'int64', name='label')]
+
+                val_dataset = MnistDataset(mode='test')
+
+                model = LeNet()
+                model.prepare(metrics=Accuracy(), inputs=inputs, labels=labels)
+
+                result = model.evaluate(val_dataset, batch_size=64)
+                print(result)
+
+                # imperative mode
+                import paddle.fluid.dygraph as dg
+                place = set_device('cpu')
+                with dg.guard(place) as g:
+                    model = LeNet()
+                    model.prepare(metrics=Accuracy(), inputs=inputs, labels=labels)
+
+                    result = model.evaluate(val_dataset, batch_size=64)
+                    print(result)
+
+        """
 
         if eval_data is not None and isinstance(eval_data, Dataset):
             eval_sampler = DistributedBatchSampler(
                 eval_data,
                 batch_sampler=eval_sampler,
                 places=self._place,
-                feed_list=feed_list,
                 num_workers=num_workers,
                 return_list=True)
         else:
             eval_loader = eval_data
 
         self._test_dataloader = eval_loader
-        metrics_name = self._metrics_name()
 
         cbks = config_callbacks(
             callbacks,
@@ -1037,15 +1064,14 @@
             metrics=self._metrics_name(), )
 
         loader = eval_loader
-        if not isinstance(eval_loader, Iterable):
-            loader = eval_loader()
 
         eval_steps = self._len_data_loader(loader)
-        cbks.on_begin('eval',
-                      {'steps': eval_steps,
-                       'metrics_name': metrics_name})
+        cbks.on_begin('eval', {
+            'steps': eval_steps,
+            'metrics_name': self._metrics_name()
+        })
 
-        logs = self._run_one_epoch(loader, cbks, 'eval', metrics_name)
+        logs = self._run_one_epoch(loader, cbks, 'eval')
 
         cbks.on_end('eval', logs)
 
@@ -1061,9 +1087,9 @@
                 test_data,
                 batch_size=1,
                 num_workers=0,
-                stack_outputs=False):
+                stack_outputs=False,
+                callbacks=None):
         """
-        FIXME: add more comments and usage
         Args:
             test_data (Dataset|DataLoader): An iterable data loader is used for
                 predict. An instance of paddle.io.Dataset or paddle.io.Dataloader
@@ -1082,12 +1108,53 @@
                 it is recommended set as True if outputs contains no LoDTensor.
                 Default: False.
         Returns:
             list: output of models.
-        """
 
-        if fluid.in_dygraph_mode():
-            feed_list = None
-        else:
-            feed_list = [x.forward() for x in self._inputs]
+        Examples:
+            .. code-block:: python
+
+                # declarative mode
+                import numpy as np
+                from hapi.metrics import Accuracy
+                from hapi.datasets import MNIST
+                from hapi.vision.transforms import Compose, Resize
+                from hapi.vision.models import LeNet
+                from hapi.model import Input, set_device
+
+                class MnistDataset(MNIST):
+                    def __init__(self, mode, return_label=True):
+                        super(MnistDataset, self).__init__(mode=mode)
+                        self.return_label = return_label
+
+                    def __getitem__(self, idx):
+                        img = np.reshape(self.images[idx], [1, 28, 28])
+                        if self.return_label:
+                            return img, np.array(self.labels[idx]).astype('int64')
+                        return img,
+
+                    def __len__(self):
+                        return len(self.images)
+
+                inputs = [Input([-1, 1, 28, 28], 'float32', name='image')]
+                labels = [Input([None, 1], 'int64', name='label')]
+
+                test_dataset = MnistDataset(mode='test', return_label=False)
+
+                model = LeNet()
+                model.prepare(metrics=Accuracy(), inputs=inputs, labels=labels)
+
+                result = model.predict(test_dataset, batch_size=64)
+                print(result)
+
+                # imperative mode
+                import paddle.fluid.dygraph as dg
+                place = set_device('cpu')
+                with dg.guard(place) as g:
+                    model = LeNet()
+                    model.prepare(metrics=Accuracy(), inputs=inputs, labels=labels)
+
+                    result = model.predict(test_dataset, batch_size=64)
+                    print(result)
+        """
 
         if test_data is not None and isinstance(test_data, Dataset):
             test_sampler = DistributedBatchSampler(
                 test_data,
                 batch_sampler=test_sampler,
                 places=self._place,
-                feed_list=feed_list,
                 num_workers=num_workers,
                 return_list=True)
         else:
@@ -1105,33 +1171,28 @@
         self._test_dataloader = test_loader
 
         loader = test_loader
-        if not isinstance(test_loader, Iterable):
-            loader = test_loader()
-        outputs = []
-        count = 0
-        for data in tqdm.tqdm(loader):
-            data = flatten(data)
-            out = to_list(self.test_batch(data[:len(self._inputs)]))
-            outputs.append(out)
-            count += out[0].shape[0]
+        cbks = config_callbacks(callbacks, model=self, verbose=1)
 
-        if test_loader is not None and self._adapter._nranks > 1 \
-                    and isinstance(test_loader, DataLoader) \
-                    and count > len(test_loader.dataset):
-            size = outputs[-1][0].shape[0] - (count - len(test_loader.dataset))
-            outputs[-1] = [o[:size] for o in outputs[-1]]
+        test_steps = self._len_data_loader(loader)
+        logs = {'steps': test_steps}
 
-        # NOTE: for lod tensor output, we should not stack outputs
-        # for stacking may loss its detail info
+        cbks.on_begin('test', logs)
+
+        outputs = []
+
+        logs, outputs = self._run_one_epoch(loader, cbks, 'test')
+
         outputs = list(zip(*outputs))
 
+        # NOTE: for lod tensor output, we should not stack outputs
+        # for stacking may lose its detail info
         if stack_outputs:
             outputs = [np.vstack(outs) for outs in outputs]
 
         self._test_dataloader = None
 
+        cbks.on_end('test', logs)
         return outputs
 
     def save_inference_model(self,
@@ -1179,22 +1240,8 @@
             params_filename=params_filename,
             program_only=program_only)
 
-    def _run_one_epoch(self,
-                       data_loader,
-                       callbacks,
-                       mode,
-                       metrics_name,
-                       epoch=None):
-        size = self._len_data_loader(data_loader)
-        logs = {
-            'steps': size,
-            'metrics_name': metrics_name,
-        }
-
-        if mode == 'train':
-            assert epoch is not None, 'when mode is train, epoch must be given'
-            callbacks.on_epoch_begin(epoch)
-
+    def _run_one_epoch(self, data_loader, callbacks, mode, logs={}):
+        outputs = []
         for step, data in enumerate(data_loader):
             # data might come from different types of data_loader and have
             # different format, as following:
@@ -1214,25 +1261,25 @@ class Model(fluid.dygraph.Layer):
                 0].shape) else data[0].shape[0]
 
             callbacks.on_batch_begin(mode, step, logs)
-            if mode == 'train':
-                outs = self.train_batch(data[:len(self._inputs)],
-                                        data[len(self._inputs):])
-            else:
-                outs = self.eval_batch(data[:len(self._inputs)],
-                                       data[len(self._inputs):])
-            # losses
-            loss = outs[0] if self._metrics else outs
-            metrics = [[l[0] for l in loss]]
-
-            # metrics
-            for metric in self._metrics:
-                res = metric.accumulate()
-                metrics.extend(to_list(res))
-
-            assert len(metrics_name) == len(metrics)
-            for k, v in zip(metrics_name, metrics):
-                logs[k] = v
+            if mode != 'test':
+                outs = getattr(self, mode + '_batch')(data[:len(self._inputs)],
+                                                      data[len(self._inputs):])
+                # losses
+                loss = outs[0] if self._metrics else outs
+                metrics = [[l[0] for l in loss]]
+
+                # metrics
+                for metric in self._metrics:
+                    res = metric.accumulate()
+                    metrics.extend(to_list(res))
+
+                assert len(self._metrics_name()) == len(metrics)
+                for k, v in zip(self._metrics_name(), metrics):
+                    logs[k] = v
+            else:
+                outs = getattr(self, mode + '_batch')(data)
+                outputs.append(outs)
 
             logs['step'] = step
             if mode == 'train' or self._adapter._merge_count.get(
@@ -1245,10 +1292,8 @@
             callbacks.on_batch_end(mode, step, logs)
         self._reset_metrics()
 
-        if mode == 'train':
-            assert epoch is not None, 'when mode is train, epoch must be given'
-            callbacks.on_epoch_end(epoch, logs)
-
+        if mode == 'test':
+            return logs, outputs
         return logs
 
     def _reset_metrics(self):
diff --git a/hapi/tests/test_callbacks.py b/hapi/tests/test_callbacks.py
index b9f42d9..bb0e6f2 100644
--- a/hapi/tests/test_callbacks.py
+++ b/hapi/tests/test_callbacks.py
@@ -12,27 +12,43 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
-# when test, you should add hapi root path to the PYTHONPATH,
-# export PYTHONPATH=PATH_TO_HAPI:$PYTHONPATH
 import unittest
 import time
 import random
+import tempfile
+import shutil
 
+from hapi.model import Input
+from hapi.vision.models import LeNet
 from hapi.callbacks import config_callbacks
 
 
 class TestCallbacks(unittest.TestCase):
-    def test_callback(self):
+    def setUp(self):
+        self.save_dir = tempfile.mkdtemp()
+
+    def tearDown(self):
+        shutil.rmtree(self.save_dir)
+
+    def run_callback(self):
         epochs = 2
         steps = 50
-        freq = 1
+        freq = 2
         eval_steps = 20
+
+        lenet = LeNet()
+        inputs = [Input([None, 1, 28, 28], 'float32', name='image')]
+        lenet.prepare(inputs=inputs)
+
         cbks = config_callbacks(
+            model=lenet,
             batch_size=128,
             epochs=epochs,
             steps=steps,
-            verbose=2,
-            metrics=['loss', 'acc'], )
+            log_freq=freq,
+            verbose=self.verbose,
+            metrics=['loss', 'acc'],
+            save_dir=self.save_dir)
         cbks.on_begin('train')
 
         logs = {'loss': 50.341673, 'acc': 0.00256}
@@ -48,13 +64,12 @@
 
         eval_logs = {'eval_loss': 20.341673, 'eval_acc': 0.256}
         params = {
-            'eval_steps': eval_steps,
-            'eval_metrics': ['eval_loss', 'eval_acc'],
-            'log_freq': 10,
+            'steps': eval_steps,
+            'metrics_name': ['eval_loss', 'eval_acc'],
         }
         cbks.on_begin('eval', params)
         for step in range(eval_steps):
-            cbks.on_batch_begin('eval', step, logs)
+            cbks.on_batch_begin('eval', step, eval_logs)
            eval_logs['eval_loss'] -= random.random() * 0.1
             eval_logs['eval_acc'] += random.random() * 0.1
             eval_logs['batch_size'] = 2
@@ -62,8 +77,30 @@
             cbks.on_batch_end('eval', step, eval_logs)
         cbks.on_end('eval', eval_logs)
 
+        test_logs = {}
+        params = {'steps': eval_steps}
+        cbks.on_begin('test', params)
+        for step in range(eval_steps):
+            cbks.on_batch_begin('test', step, test_logs)
+            test_logs['batch_size'] = 2
+            time.sleep(0.005)
+            cbks.on_batch_end('test', step, test_logs)
+        cbks.on_end('test', test_logs)
+
         cbks.on_end('train')
 
+    def test_callback_verbose_0(self):
+        self.verbose = 0
+        self.run_callback()
+
+    def test_callback_verbose_1(self):
+        self.verbose = 1
+        self.run_callback()
+
+    def test_callback_verbose_2(self):
+        self.verbose = 2
+        self.run_callback()
+
 
 if __name__ == '__main__':
     unittest.main()
-- 
GitLab
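Below the patch, a minimal usage sketch (illustrative, not part of the change itself) shows how the new `callbacks` argument of `Model.predict` and the `on_test_*` hooks added above can be combined. It assumes the hapi package from this repository is importable; the `PredictCounter` class, its attribute names, and the dataset wrapper are hypothetical examples, not existing hapi API.

# Usage sketch: a user-defined callback that counts predicted samples
# through the new on_test_* hooks, passed to Model.predict via the
# callbacks argument introduced in this patch.
import numpy as np

from hapi.callbacks import Callback
from hapi.datasets import MNIST
from hapi.model import Input
from hapi.vision.models import LeNet


class PredictCounter(Callback):
    # Hypothetical example class; any Callback subclass is wired in the
    # same way by config_callbacks inside Model.predict.
    def on_test_begin(self, logs=None):
        self.count = 0

    def on_test_batch_end(self, step, logs=None):
        # 'batch_size' is filled into logs by Model._run_one_epoch in
        # 'test' mode; fall back to 0 if it is absent.
        self.count += (logs or {}).get('batch_size', 0)


class MnistDataset(MNIST):
    # predict() only needs images, so drop the labels.
    def __getitem__(self, idx):
        return np.reshape(self.images[idx], [1, 28, 28]),

    def __len__(self):
        return len(self.images)


inputs = [Input([None, 1, 28, 28], 'float32', name='image')]
model = LeNet()
model.prepare(inputs=inputs)

counter = PredictCounter()
outputs = model.predict(
    MnistDataset(mode='test'), batch_size=64, callbacks=counter)
print('predicted samples:', counter.count)

Because config_callbacks still prepends a ProgBarLogger, the progress bar from the new on_test_* logging is printed alongside whatever the custom callback records, so user callbacks only need to track what the built-in logger does not.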