diff --git a/callbacks.py b/callbacks.py
index b31e4e33e601aed1b4201aeb1c1c913be72e9ef7..66690cf288efe8ba0d8dcc9eec64031674c8a18b 100644
--- a/callbacks.py
+++ b/callbacks.py
@@ -16,7 +16,7 @@ import six
 import copy
 
 from progressbar import ProgressBar
-from distributed import get_local_rank
+from paddle.fluid.dygraph.parallel import ParallelEnv
 
 
 def config_callbacks(callbacks=None,
@@ -195,7 +195,7 @@ class ProgBarLogger(Callback):
         self.steps = self.params['steps']
         self.epoch = epoch
         self.train_step = 0
-        if self.verbose and self.epochs and get_local_rank() == 0:
+        if self.verbose and self.epochs and ParallelEnv().local_rank == 0:
             print('Epoch %d/%d' % (epoch + 1, self.epochs))
         self.train_progbar = ProgressBar(num=self.steps, verbose=self.verbose)
@@ -213,8 +213,8 @@ class ProgBarLogger(Callback):
         logs = logs or {}
         self.train_step += 1
 
-        if self.train_step % self.log_freq == 0 and self.verbose and get_local_rank(
-        ) == 0:
+        if self.train_step % self.log_freq == 0 and self.verbose and ParallelEnv(
+        ).local_rank == 0:
             # if steps is not None, last step will update in on_epoch_end
             if self.steps and self.train_step < self.steps:
                 self._updates(logs, 'train')
@@ -223,7 +223,7 @@ class ProgBarLogger(Callback):
 
     def on_epoch_end(self, epoch, logs=None):
         logs = logs or {}
-        if self.verbose and get_local_rank() == 0:
+        if self.verbose and ParallelEnv().local_rank == 0:
             self._updates(logs, 'train')
 
     def on_eval_begin(self, logs=None):
@@ -233,7 +233,7 @@ class ProgBarLogger(Callback):
         self.evaled_samples = 0
         self.eval_progbar = ProgressBar(
             num=self.eval_steps, verbose=self.verbose)
-        if get_local_rank() == 0:
+        if ParallelEnv().local_rank == 0:
             print('Eval begin...')
 
     def on_eval_batch_end(self, step, logs=None):
@@ -242,9 +242,15 @@ class ProgBarLogger(Callback):
         samples = logs.get('batch_size', 1)
         self.evaled_samples += samples
 
+        if self.eval_step % self.log_freq == 0 and self.verbose and ParallelEnv(
+        ).local_rank == 0:
+            # if steps is not None, last step will update in on_epoch_end
+            if self.eval_steps and self.eval_step < self.eval_steps:
+                self._updates(logs, 'eval')
+
     def on_eval_end(self, logs=None):
         logs = logs or {}
-        if self.verbose and get_local_rank() == 0:
+        if self.verbose and ParallelEnv().local_rank == 0:
             self._updates(logs, 'eval')
             print('Eval samples: %d' % (self.evaled_samples))
@@ -258,7 +264,7 @@ class ModelCheckpoint(Callback):
         self.epoch = epoch
 
     def _is_save(self):
-        return self.model and self.save_dir and get_local_rank() == 0
+        return self.model and self.save_dir and ParallelEnv().local_rank == 0
 
     def on_epoch_end(self, epoch, logs=None):
         if self._is_save() and self.epoch % self.save_freq == 0:
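NOTE: every `get_local_rank()` call in callbacks.py now goes through `ParallelEnv`, so the callbacks no longer import the local `distributed` module. A minimal sketch of the rank-0 gating pattern these hunks rely on (`log_on_rank0` is an illustrative helper, not part of the patch; assumes Paddle 1.x):

```python
from paddle.fluid.dygraph.parallel import ParallelEnv

def log_on_rank0(msg):
    # ParallelEnv reads trainer info from environment variables set by the
    # distributed launcher; local_rank is 0 in single-process runs, so this
    # guard is a no-op outside multi-GPU training.
    if ParallelEnv().local_rank == 0:
        print(msg)

log_on_rank0('Eval begin...')  # printed once, not once per trainer
```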
diff --git a/distributed.py b/distributed.py
index 23a4d13e6ce4787baa9fac5ef082de9d5ca95ce4..87818545671c45cf4faba234406e87762e897784 100644
--- a/distributed.py
+++ b/distributed.py
@@ -13,30 +13,20 @@
 # limitations under the License.
 
 import os
 import sys
+import six
 import time
 import math
 import socket
 import contextlib
-from contextlib import closing
-from six import string_types
 import numpy as np
-from collections import OrderedDict
 
-from paddle import fluid
-import paddle.fluid.unique_name as nameGen
-from paddle.fluid import core
-from paddle.fluid import framework
+from paddle import fluid
 from paddle.fluid.layers import collective
-from paddle.fluid.dygraph import to_variable, no_grad, layers
-from paddle.fluid.framework import Variable
-from paddle.fluid.executor import global_scope
+from paddle.fluid.dygraph.parallel import ParallelEnv, ParallelStrategy
+from paddle.fluid.io import BatchSampler
 
-from paddle.fluid.dygraph.parallel import Env, DataParallel, ParallelStrategy
-from paddle.fluid.layers.collective import _c_allreduce, _c_allgather, _c_broadcast, \
-    _c_sync_comm_stream, _c_sync_calc_stream
-from paddle.fluid.io import BatchSampler, DataLoader
+_parallel_context_initialized = False
 
-__parallel_context_init = False
 
 class DistributedBatchSampler(BatchSampler):
     """Sampler that restricts data loading to a subset of the dataset.
@@ -71,11 +61,13 @@ class DistributedBatchSampler(BatchSampler):
         self.shuffle = shuffle
         assert isinstance(drop_last, bool), \
             "drop_last should be a boolean number"
+
         self.drop_last = drop_last
-        self.nranks = get_nranks()
-        self.local_rank = get_local_rank()
+        self.nranks = ParallelEnv().nranks
+        self.local_rank = ParallelEnv().local_rank
         self.epoch = 0
-        self.num_samples = int(math.ceil(len(self.dataset) * 1.0 / self.nranks))
+        self.num_samples = int(
+            math.ceil(len(self.dataset) * 1.0 / self.nranks))
         self.total_size = self.num_samples * self.nranks
 
     def __iter__(self):
@@ -86,9 +78,28 @@ class DistributedBatchSampler(BatchSampler):
         if self.shuffle:
             np.random.RandomState(self.epoch).shuffle(indices)
             self.epoch += 1
+
         # subsample
-        indices = indices[self.local_rank * self.num_samples:
-                          (self.local_rank + 1) * self.num_samples]
+        def _get_indices_by_batch_size(indices):
+            subsampled_indices = []
+            last_batch_size = self.total_size % (self.batch_size * self.nranks)
+            assert last_batch_size % self.nranks == 0
+            last_local_batch_size = last_batch_size // self.nranks
+
+            for i in range(self.local_rank * self.batch_size,
+                           len(indices) - last_batch_size,
+                           self.batch_size * self.nranks):
+                subsampled_indices.extend(indices[i:i + self.batch_size])
+
+            indices = indices[len(indices) - last_batch_size:]
+            subsampled_indices.extend(indices[
+                self.local_rank * last_local_batch_size:(
+                    self.local_rank + 1) * last_local_batch_size])
+            return subsampled_indices
+
+        if self.nranks > 1:
+            indices = _get_indices_by_batch_size(indices)
+
         assert len(indices) == self.num_samples
         _sample_iter = iter(indices)
@@ -106,46 +117,37 @@ class DistributedBatchSampler(BatchSampler):
             num_samples += int(not self.drop_last) * (self.batch_size - 1)
         return num_samples // self.batch_size
 
-
-def _all_gather(x, nranks, ring_id=0, use_calc_stream=True):
-    return _c_allgather(x, nranks, ring_id=ring_id, use_calc_stream=use_calc_stream)
-
-
-def get_local_rank():
-    return Env().local_rank
+    def set_epoch(self, epoch):
+        self.epoch = epoch
 
 
-def get_nranks():
-    return Env().nranks
+def _all_gather(x, nranks, ring_id=0, use_calc_stream=True):
+    return collective._c_allgather(
+        x, nranks, ring_id=ring_id, use_calc_stream=use_calc_stream)
 
 
 def wait_server_ready(endpoints):
-    assert not isinstance(endpoints, string_types)
+    assert not isinstance(endpoints, six.string_types)
     while True:
         all_ok = True
         not_ready_endpoints = []
         for ep in endpoints:
             ip_port = ep.split(":")
-            with closing(
-                    socket.socket(socket.AF_INET,
-                                  socket.SOCK_STREAM)) as sock:
+            with contextlib.closing(
+                    socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
                 sock.settimeout(2)
                 result = sock.connect_ex((ip_port[0], int(ip_port[1])))
                 if result != 0:
                     all_ok = False
                     not_ready_endpoints.append(ep)
         if not all_ok:
-            sys.stderr.write("server not ready, wait 3 sec to retry...\n")
-            sys.stderr.write("not ready endpoints:" + str(
-                not_ready_endpoints) + "\n")
-            sys.stderr.flush()
             time.sleep(3)
         else:
             break
 
 
-def init_communicator(program, rank, nranks, wait_port,
-                      current_endpoint, endpoints):
+def init_communicator(program, rank, nranks, wait_port, current_endpoint,
+                      endpoints):
     if nranks < 2:
         return
     other_endpoints = endpoints[:]
@@ -154,9 +156,9 @@ def init_communicator(program, rank, nranks, wait_port,
         wait_server_ready(other_endpoints)
     block = program.global_block()
     nccl_id_var = block.create_var(
-        name=nameGen.generate('nccl_id'),
+        name=fluid.unique_name.generate('nccl_id'),
         persistable=True,
-        type=core.VarDesc.VarType.RAW)
+        type=fluid.core.VarDesc.VarType.RAW)
 
     block.append_op(
         type='c_gen_nccl_id',
@@ -181,25 +183,28 @@ def init_communicator(program, rank, nranks, wait_port,
 
 def prepare_distributed_context(place=None):
     if place is None:
-        place = fluid.CUDAPlace(Env().dev_id) if Env().nranks > 1 \
+        place = fluid.CUDAPlace(ParallelEnv().dev_id) if ParallelEnv().nranks > 1 \
             else fluid.CUDAPlace(0)
-
+
     strategy = ParallelStrategy()
-    strategy.nranks = Env().nranks
-    strategy.local_rank = Env().local_rank
-    strategy.trainer_endpoints = Env().trainer_endpoints
-    strategy.current_endpoint = Env().current_endpoint
+    strategy.nranks = ParallelEnv().nranks
+    strategy.local_rank = ParallelEnv().local_rank
+    strategy.trainer_endpoints = ParallelEnv().trainer_endpoints
+    strategy.current_endpoint = ParallelEnv().current_endpoint
 
     if strategy.nranks < 2:
         return
 
-    global __parallel_context_init
+    global _parallel_context_initialized
+
+    if not _parallel_context_initialized and isinstance(place,
+                                                        fluid.CUDAPlace):
 
-    if not __parallel_context_init and isinstance(place, core.CUDAPlace):
         def _init_context():
-            communicator_prog = framework.Program()
-            init_communicator(communicator_prog, strategy.local_rank, strategy.nranks,
-                              True, strategy.current_endpoint, strategy.trainer_endpoints)
+            communicator_prog = fluid.Program()
+            init_communicator(communicator_prog, strategy.local_rank,
+                              strategy.nranks, True, strategy.current_endpoint,
+                              strategy.trainer_endpoints)
             exe = fluid.Executor(place)
             exe.run(communicator_prog)
 
@@ -213,57 +218,5 @@ def prepare_distributed_context(place=None):
     else:
         assert ("Only support CUDAPlace for now.")
 
-    __parallel_context_init = True
+    _parallel_context_initialized = True
 
     return strategy
-
-
-class DistributedDataParallel(DataParallel):
-    def __init__(self, layers, strategy=None):
-        if strategy is None:
-            strategy = ParallelStrategy()
-            strategy.nranks = Env().nranks
-            strategy.local_rank = Env().local_rank
-            strategy.trainer_endpoints = Env().trainer_endpoints
-            strategy.current_endpoint = Env().current_endpoint
-
-        super(DistributedDataParallel, self).__init__(layers, strategy)
-
-    @no_grad
-    def apply_collective_grads(self):
-        """
-        AllReduce the Parameters' gradient.
-        """
-        if not self._is_data_parallel_mode():
-            return
-
-        grad_var_set = set()
-        grad_vars = []
-        for param in self._layers.parameters():
-            # NOTE(zcd): The grad_ivar maybe no generated.
-            if param.trainable and param._grad_ivar():
-                g_var = param._grad_ivar()
-                grad_vars.append(g_var)
-                assert g_var not in grad_var_set
-                grad_var_set.add(g_var)
-
-        mega_bytes = 128 * 1024 * 1024
-        group_idx = 0
-        memory_counter = 0
-        grad_var_groups = OrderedDict()
-        dtype = grad_vars[0].dtype
-        for g_var in grad_vars:
-            # Note: the dtype of the same group should be the same.
-            bytes = np.prod(g_var.shape) * core.size_of_dtype(g_var.dtype)
-            if memory_counter < mega_bytes and dtype == g_var.dtype:
-                memory_counter += bytes
-            else:
-                memory_counter = bytes
-                group_idx += 1
-            grad_var_groups.setdefault(group_idx, []).append(g_var)
-
-        coalesced_grads_and_vars = self._coalesce_tensors(grad_var_groups)
-
-        for coalesced_grad, _, _ in coalesced_grads_and_vars:
-            collective._c_allreduce(coalesced_grad, coalesced_grad, use_calc_stream=True)
-
-        self._split_tensors(coalesced_grads_and_vars)
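NOTE: `_get_indices_by_batch_size` replaces the old contiguous-block sharding with whole batches interleaved across ranks, while `set_epoch` exposes the deterministic shuffle seed. Each rank still receives exactly `num_samples` indices. A standalone sketch of the interleaving (`shard_indices` and the example sizes are illustrative, not part of the patch):

```python
def shard_indices(indices, nranks, local_rank, batch_size):
    # Rank r takes batches r, r + nranks, r + 2 * nranks, ... and then an
    # equal slice of the leftover "short" region at the tail.
    total = len(indices)
    last = total % (batch_size * nranks)   # tail shorter than a full round
    per_rank_tail = last // nranks         # assumes last % nranks == 0
    out = []
    for i in range(local_rank * batch_size, total - last,
                   batch_size * nranks):
        out.extend(indices[i:i + batch_size])
    tail = indices[total - last:]
    out.extend(tail[local_rank * per_rank_tail:
                    (local_rank + 1) * per_rank_tail])
    return out

# 10 samples, 2 ranks, batch_size 2 (total_size already divides evenly):
print(shard_indices(list(range(10)), 2, 0, 2))  # [0, 1, 4, 5, 8]
print(shard_indices(list(range(10)), 2, 1, 2))  # [2, 3, 6, 7, 9]
```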
diff --git a/mnist.py b/mnist.py
index a6540599d90802772331efdd4f9d0b2283b708d1..745dc2f06e54136756ce5ae4f3b077c24468dd1d 100644
--- a/mnist.py
+++ b/mnist.py
@@ -26,7 +26,7 @@ from paddle.fluid.optimizer import Momentum
 from paddle.fluid.dygraph.nn import Conv2D, Pool2D, Linear
 from paddle.fluid.io import MNIST as MnistDataset
 
-from model import Model, CrossEntropy, Input, init_context
+from model import Model, CrossEntropy, Input, set_device
 from metrics import Accuracy
 
 
@@ -106,7 +106,8 @@ class MNIST(Model):
 
 
 def main():
-    init_context('dynamic' if FLAGS.dynamic else 'static')
+    device = set_device(FLAGS.device)
+    fluid.enable_dygraph(device) if FLAGS.dynamic else None
 
     train_dataset = MnistDataset(mode='train')
     val_dataset = MnistDataset(mode='test')
@@ -118,7 +119,13 @@ def main():
     optim = Momentum(
         learning_rate=FLAGS.lr, momentum=.9, parameter_list=model.parameters())
 
-    model.prepare(optim, CrossEntropy(), Accuracy(topk=(1, 2)), inputs, labels)
+    model.prepare(
+        optim,
+        CrossEntropy(),
+        Accuracy(topk=(1, 2)),
+        inputs,
+        labels,
+        device=FLAGS.device)
     if FLAGS.resume is not None:
         model.load(FLAGS.resume)
 
@@ -131,6 +138,8 @@ def main():
 
 if __name__ == '__main__':
     parser = argparse.ArgumentParser("CNN training on MNIST")
+    parser.add_argument(
+        "--device", type=str, default='gpu', help="device to use, gpu or cpu")
     parser.add_argument(
         "-d", "--dynamic", action='store_true', help="enable dygraph mode")
     parser.add_argument(
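NOTE: `init_context('dynamic'|'static')` is gone; scripts now resolve a place with `set_device()` and opt into dygraph explicitly, leaving static graph as the default. The new idiom from `main()` above, as a standalone sketch (`use_dygraph` stands in for `FLAGS.dynamic`):

```python
from paddle import fluid
from model import set_device

device = set_device('gpu')  # CUDAPlace(dev_id) on a CUDA build, else CPUPlace
use_dygraph = True          # stands in for FLAGS.dynamic

if use_dygraph:
    fluid.enable_dygraph(device)
```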
diff --git a/model.py b/model.py
index 265fbaa1fce29a64b257d05aec4a101d823209ff..dea21bb98329404d02c10e2a563f21d76f7851e1 100644
--- a/model.py
+++ b/model.py
@@ -20,26 +20,36 @@ import pickle
 import numpy as np
 import six
 import warnings
-from collections import Iterable
-from collections import OrderedDict
+import tqdm
 
-from collections import OrderedDict
+from collections import Iterable
 from paddle import fluid
 from paddle.fluid.framework import in_dygraph_mode, Variable
 from paddle.fluid.executor import global_scope
 from paddle.fluid.io import is_belong_to_optimizer
 from paddle.fluid.dygraph.base import to_variable
+from paddle.fluid.dygraph.parallel import ParallelEnv
 from paddle.fluid.layers.utils import flatten
-
 from paddle.fluid.incubate.fleet.collective import fleet, DistributedStrategy
-import paddle.fluid.incubate.fleet.base.role_maker as role_maker
-import distributed
-from distributed import DistributedBatchSampler
-from paddle.fluid.io import DataLoader
+from paddle.fluid.incubate.fleet.base import role_maker
+from paddle.fluid.io import DataLoader, Dataset
+
+from distributed import DistributedBatchSampler, _all_gather, prepare_distributed_context, _parallel_context_initialized
 
 from metrics import Metric
 from callbacks import config_callbacks
 
-__all__ = ['Model', 'Loss', 'CrossEntropy', 'Input']
+__all__ = ['Model', 'Loss', 'CrossEntropy', 'Input', 'set_device']
+
+
+def set_device(device):
+    assert isinstance(device, six.string_types) and device.lower() in ['cpu', 'gpu'], \
+        "Expected device in ['cpu', 'gpu'], but got {}".format(device)
+
+    place = fluid.CUDAPlace(ParallelEnv().dev_id) \
+        if device.lower() == 'gpu' and fluid.is_compiled_with_cuda() \
+        else fluid.CPUPlace()
+
+    return place
 
 
 def to_list(value):
@@ -85,18 +95,6 @@ def extract_args(func):
     return inspect.getargspec(func)[0]
 
 
-def init_context(backend):
-    assert isinstance(backend, str) and backend.lower() in ['dynamic', 'static'], \
-        "Expected backend in ['dynamic', 'static'], but got {}".format(backend)
-
-    place = fluid.CUDAPlace(distributed.Env().dev_id) if \
-        distributed.Env().nranks > 1 else fluid.CUDAPlace(0)
-    distributed.prepare_distributed_context(place)
-    backend = backend.lower()
-    if backend == 'dynamic':
-        fluid.enable_dygraph(place)
-
-
 class Input(fluid.dygraph.Layer):
     def __init__(self, shape=None, dtype=None, name=None):
         super(Input, self).__init__()
@@ -162,8 +160,8 @@ class StaticGraphAdapter(object):
             'test_batch': 0
         }
 
-        self._nranks = distributed.Env().nranks
-        self._local_rank = distributed.Env().local_rank
+        self._nranks = ParallelEnv().nranks
+        self._local_rank = ParallelEnv().local_rank
 
     @property
     def mode(self):
@@ -268,7 +266,8 @@ class StaticGraphAdapter(object):
                 # When using static learning rate, static-graph would make it
                 # a persistable var named 'unique_name.generate("learning_rate")',
                 # However, dygraph wouldn't save it.
-                if var.name not in state: continue
+                if var.name not in state:
+                    continue
             else:
                 # moment and other accumulators
                 if var.name not in converted_state:
@@ -367,8 +366,8 @@ class StaticGraphAdapter(object):
         for metric, state in zip(self.model._metrics, metric_states):
             # cut off padding size
             if self.mode != 'train' and self.model._test_dataloader is not None \
-                and isinstance(self.model._test_dataloader, DataLoader) \
-                and self._nranks > 1:
+                    and isinstance(self.model._test_dataloader, DataLoader) \
+                    and self._nranks > 1:
                 total_size = len(self.model._test_dataloader.dataset)
                 # TODO: fixme if have better way to get batch size
                 samples = state[0].shape[0]
@@ -408,7 +407,7 @@ class StaticGraphAdapter(object):
         for op in list(prog.global_block().ops):
             prog.global_block()._remove_op(0)
         if mode == 'train' and self.model._optimizer \
-            and self.model._optimizer._learning_rate_map:
+                and self.model._optimizer._learning_rate_map:
             # HACK workaround learning rate map issue
             lr_var = self.model._optimizer._learning_rate_map[self._orig_prog]
             self.model._optimizer._learning_rate_map[prog] = lr_var
@@ -427,14 +426,9 @@ class StaticGraphAdapter(object):
             losses = self.model._loss_function(outputs, labels)
 
         if self._nranks > 1 and mode != 'train':
-            outputs = [
-                distributed._all_gather(o, self._nranks) for o in outputs
-            ]
+            outputs = [_all_gather(o, self._nranks) for o in outputs]
             if mode != 'test':
-                labels = [
-                    distributed._all_gather(l, self._nranks)
-                    for l in labels
-                ]
+                labels = [_all_gather(l, self._nranks) for l in labels]
 
         if mode != 'test':
             for metric in self.model._metrics:
@@ -471,31 +465,22 @@ class StaticGraphAdapter(object):
         if compiled_prog is not None:
             return compiled_prog
 
-        device = self.model._device
-        device_ids = self.model._device_ids
+        assert self.model._place is not None, \
+            "device is not set, please call `model.prepare()` first"
 
-        if device.lower() == 'gpu':
-            places = fluid.cuda_places(device_ids)
-        else:
-            places = fluid.cpu_places(len(device_ids) if device_ids else None)
+        place = self.model._place
 
         # XXX *ALL WEIGHTS* should be initialized upon model construction
         # even if `forward()` may run different code path for different mode
         # therefore startup program only needs to run once
         if self._executor is None:
-            if self._nranks > 1 and device.lower() == 'gpu':
-                gpu_id = int(distributed.Env().dev_id)
-                place = fluid.CUDAPlace(gpu_id) if device.lower(
-                ) == 'gpu' else fluid.CPUPlace()
-            else:
-                place = places[0]
             self._executor = fluid.Executor(place)
             # XXX incremental initialization
             uninitialized = []
             for var_py in self._startup_prog.list_vars():
                 var = fluid.global_scope().find_var(var_py.name)
                 if not var_py.name.startswith('nccl_id') and var and \
-                    var.get_tensor()._is_initialized():
+                        var.get_tensor()._is_initialized():
                     continue
 
                 uninitialized.append(var_py)
@@ -506,14 +491,8 @@ class StaticGraphAdapter(object):
         if self._nranks < 2:
             compiled_prog = fluid.CompiledProgram(prog)
         else:
-            compiled_prog = prog #fleet.main_program
-
-        if len(places) > 1:
-            loss_name = None
-            if mode == 'train' and self._loss_endpoint is not None:
-                loss_name = self._loss_endpoint.name
-            compiled_prog = compiled_prog.with_data_parallel(
-                loss_name=loss_name, places=places)
+            compiled_prog = prog
+
         self._compiled_progs[mode] = compiled_prog
 
 
@@ -521,8 +500,8 @@ class DynamicGraphAdapter(object):
     def __init__(self, model):
         super(DynamicGraphAdapter, self).__init__()
         self.model = model
-        self._nranks = distributed.Env().nranks
-        self._local_rank = distributed.Env().local_rank
+        self._nranks = ParallelEnv().nranks
+        self._local_rank = ParallelEnv().local_rank
         self._merge_count = {
             'eval_total': 0,
             'test_total': 0,
@@ -531,7 +510,13 @@ class DynamicGraphAdapter(object):
         }
 
         if self._nranks > 1:
-            self.ddp_model = distributed.DistributedDataParallel(self.model)
+            strategy = fluid.dygraph.parallel.ParallelStrategy()
+            strategy.nranks = ParallelEnv().nranks
+            strategy.local_rank = ParallelEnv().local_rank
+            strategy.trainer_endpoints = ParallelEnv().trainer_endpoints
+            strategy.current_endpoint = ParallelEnv().current_endpoint
+            self.ddp_model = fluid.dygraph.parallel.DataParallel(self.model,
+                                                                 strategy)
 
     @property
     def mode(self):
@@ -551,15 +536,14 @@ class DynamicGraphAdapter(object):
         if labels is not None:
             labels = [to_variable(l) for l in to_list(labels)]
         if self._nranks > 1:
-            outputs = self.ddp_model.forward(
-                * [to_variable(x) for x in inputs])
+            outputs = self.ddp_model.forward(*[to_variable(x) for x in inputs])
             losses = self.model._loss_function(outputs, labels)
             final_loss = fluid.layers.sum(losses)
            final_loss = self.ddp_model.scale_loss(final_loss)
             final_loss.backward()
             self.ddp_model.apply_collective_grads()
         else:
-            outputs = self.model.forward(* [to_variable(x) for x in inputs])
+            outputs = self.model.forward(*[to_variable(x) for x in inputs])
             losses = self.model._loss_function(outputs, labels)
             final_loss = fluid.layers.sum(losses)
             final_loss.backward()
@@ -570,11 +554,11 @@ class DynamicGraphAdapter(object):
         for metric in self.model._metrics:
             metric_outs = metric.add_metric_op(
                 to_list(outputs), to_list(labels))
-            m = metric.update(* [to_numpy(m) for m in to_list(metric_outs)])
+            m = metric.update(*[to_numpy(m) for m in to_list(metric_outs)])
             metrics.append(m)
 
         return ([to_numpy(l) for l in losses], metrics) \
-            if len(metrics) > 0 else [to_numpy(l) for l in losses]
+                if len(metrics) > 0 else [to_numpy(l) for l in losses]
 
     def eval(self, inputs, labels=None):
         super(Model, self.model).eval()
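NOTE: with the custom `DistributedDataParallel` wrapper deleted from distributed.py, the dygraph adapter composes a stock `fluid.dygraph.parallel.DataParallel` and follows its standard protocol, visible in the `train()` hunk above: forward, sum losses, `scale_loss()`, `backward()`, `apply_collective_grads()`. The construction, extracted as a sketch (`build_ddp` is an illustrative name, mirroring the `__init__` hunk):

```python
from paddle import fluid
from paddle.fluid.dygraph.parallel import ParallelEnv

def build_ddp(model):
    # Populate a ParallelStrategy from the launcher-provided environment,
    # exactly as DynamicGraphAdapter.__init__ now does inline.
    strategy = fluid.dygraph.parallel.ParallelStrategy()
    strategy.nranks = ParallelEnv().nranks
    strategy.local_rank = ParallelEnv().local_rank
    strategy.trainer_endpoints = ParallelEnv().trainer_endpoints
    strategy.current_endpoint = ParallelEnv().current_endpoint
    return fluid.dygraph.parallel.DataParallel(model, strategy)
```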
@@ -582,17 +566,14 @@ class DynamicGraphAdapter(object):
         inputs = to_list(inputs)
         if labels is not None:
             labels = [to_variable(l) for l in to_list(labels)]
-        outputs = self.model.forward(* [to_variable(x) for x in inputs])
+        outputs = self.model.forward(*[to_variable(x) for x in inputs])
         if self.model._loss_function:
             losses = self.model._loss_function(outputs, labels)
         else:
             losses = []
         if self._nranks > 1:
-            outputs = [
-                distributed._all_gather(o, self._nranks)
-                for o in to_list(outputs)
-            ]
-            labels = [distributed._all_gather(l, self._nranks) for l in labels]
+            outputs = [_all_gather(o, self._nranks) for o in to_list(outputs)]
+            labels = [_all_gather(l, self._nranks) for l in labels]
         metrics = []
         for metric in self.model._metrics:
             # cut off padding value.
@@ -602,10 +583,8 @@ class DynamicGraphAdapter(object):
                 samples = outputs[0].shape[0]
                 current_count = self._merge_count.get(self.mode + '_total', 0)
                 if current_count + samples >= total_size:
-                    outputs = [
-                        o[:total_size - metric.count[0]] for o in outputs
-                    ]
-                    labels = [l[:total_size - metric.count[0]] for l in labels]
+                    outputs = [o[:total_size - current_count] for o in outputs]
+                    labels = [l[:total_size - current_count] for l in labels]
                     self._merge_count[self.mode + '_total'] = 0
                     self._merge_count[self.mode + '_batch'] = total_size - current_count
                 else:
@@ -614,24 +593,22 @@ class DynamicGraphAdapter(object):
                     self._merge_count[self.mode + '_batch'] = samples
 
             metric_outs = metric.add_metric_op(to_list(outputs), labels)
-            m = metric.update(* [to_numpy(m) for m in to_list(metric_outs)])
+            m = metric.update(*[to_numpy(m) for m in to_list(metric_outs)])
             metrics.append(m)
 
         # To be consistent with static graph
         # return empty loss if loss_function is None
         return ([to_numpy(l) for l in losses], metrics) \
-            if len(metrics) > 0 else [to_numpy(l) for l in losses]
+                if len(metrics) > 0 else [to_numpy(l) for l in losses]
 
     def test(self, inputs):
         super(Model, self.model).eval()
         self.mode = 'test'
         inputs = [to_variable(x) for x in to_list(inputs)]
         outputs = self.model.forward(*inputs)
-        if self._nranks > 2:
-            outputs = [
-                distributed._all_gather(o, self._nranks)
-                for o in to_list(outputs)
-            ]
+        if self._nranks > 1 and isinstance(self.model._place, fluid.CUDAPlace):
+            outputs = [_all_gather(o, self._nranks) for o in to_list(outputs)]
+
         return [to_numpy(o) for o in to_list(outputs)]
 
     def parameters(self, *args, **kwargs):
@@ -714,14 +691,9 @@ class Model(fluid.dygraph.Layer):
         self._loss_weights = None
         self._optimizer = None
         self._device = None
-        self._device_ids = None
         self._optimizer = None
         self._test_dataloader = None
 
-        # init multiple gpus context
-        self._place = fluid.CUDAPlace(distributed.Env().dev_id) \
-            if distributed.Env().nranks > 1 else fluid.CUDAPlace(0)
-
         # init backend
         if fluid.in_dygraph_mode():
             self._adapter = DynamicGraphAdapter(self)
@@ -738,7 +710,7 @@ class Model(fluid.dygraph.Layer):
         return self._adapter.test(*args, **kwargs)
 
     def save(self, *args, **kwargs):
-        if distributed.get_local_rank() == 0:
+        if ParallelEnv().local_rank == 0:
             return self._adapter.save(*args, **kwargs)
 
     def load(self, path, skip_mismatch=False, reset_optimizer=False):
@@ -816,8 +788,7 @@ class Model(fluid.dygraph.Layer):
                 metrics=None,
                 inputs=None,
                 labels=None,
-                device=None,
-                device_ids=None):
+                device=None):
         """
         FIXME: add comments
         Args:
@@ -840,19 +811,37 @@ class Model(fluid.dygraph.Layer):
             device (str|None): specify device type, 'CPU' or 'GPU'.
                 If None, automatically select device according to
                 installation package version.
-            device_ids (list[int]|None): specify device index. If None,
-                the available device will be obtained from the environment
-                variable when the model is executed: If the GPU is used, the
-                currently available device ID is obtained from the environment
-                variable FLAGS_selected_gpus or CUDA_VISIBLE_DEVICES when the
-                model is executed; CPU, when the model is executed,
-                the currently available CPU number is obtained from the
-                environment variable CPU_NUM. For example, export CPU_NUM=4,
-                if the environment variable is not set, the executor will add
-                the variable to the environment variable and set its value to 1.
-                The default is None.
         """
+        if isinstance(device, fluid.CUDAPlace) or \
+                (isinstance(device, six.string_types) and device.lower() == 'gpu') \
+                or (device is None and fluid.is_compiled_with_cuda()):
+            if isinstance(device, fluid.CUDAPlace):
+                self._place = device
+            else:
+                self._place = fluid.CUDAPlace(ParallelEnv().dev_id) \
+                    if ParallelEnv().nranks > 1 else fluid.CUDAPlace(0)
+
+            global _parallel_context_initialized
+            if ParallelEnv().nranks > 1 and not _parallel_context_initialized:
+                if fluid.in_dygraph_mode():
+                    fluid.disable_dygraph()
+                    fluid.enable_dygraph(self._place)
+                    fluid.dygraph.parallel.prepare_context()
+                else:
+                    prepare_distributed_context(self._place)
+
+                _parallel_context_initialized = True
+        elif isinstance(device, fluid.CPUPlace):
+            self._place = device
+        elif (isinstance(device, six.string_types) and device.lower() == 'cpu') \
+                or (device is None):
+            self._place = fluid.CPUPlace()
+        else:
+            raise ValueError(
+                "Expected device in ('gpu', 'cpu', fluid.CUDAPlace, "
+                "fluid.CPUPlace, None), but got {}".format(device))
+
         self._optimizer = optimizer
         if loss_function:
             if not isinstance(loss_function, Loss):
@@ -869,27 +858,22 @@ class Model(fluid.dygraph.Layer):
             metrics = metrics or []
             for metric in to_list(metrics):
                 assert isinstance(metric, Metric), \
-                    "{} is not sub class of Metric".format(metric.__class__.__name__)
+                    "{} is not a subclass of Metric".format(
+                        metric.__class__.__name__)
         self._metrics = to_list(metrics)
 
         self._inputs = to_list(inputs) if not isinstance(inputs, dict) else [
             inputs[n] for n in extract_args(self.forward) if n != 'self'
         ]
         self._labels = to_list(labels)
-        self._device = device
-        if device is None:
-            self._device = 'GPU' if fluid.is_compiled_with_cuda() else 'CPU'
-        self._device_ids = device_ids
+
         if not in_dygraph_mode():
             self._adapter.prepare()
 
     def fit(
             self,
-            train_dataset=None,
-            eval_dataset=None,
-            train_loader=None,
-            eval_loader=None,
+            train_data=None,
+            eval_data=None,
             batch_size=1,
             epochs=1,
             eval_freq=1,
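NOTE: device selection moves out of the executor-compilation path and into `prepare()`, which now resolves a string, a place object, or `None` into `self._place` (and initializes the parallel context once under multi-GPU). Accepted spellings, as a sketch (`model`, `optim`, `loss`, `metrics`, `inputs`, `labels` are placeholders from a surrounding script):

```python
from paddle import fluid

model.prepare(optim, loss, metrics, inputs, labels, device='gpu')
model.prepare(optim, loss, metrics, inputs, labels,
              device=fluid.CUDAPlace(0))  # an explicit place also works
model.prepare(optim, loss, metrics, inputs, labels, device='cpu')
model.prepare(optim, loss, metrics, inputs, labels)
# device=None selects GPU when the package is compiled with CUDA, else CPU;
# any other value raises ValueError.
```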
@@ -904,60 +888,77 @@ class Model(fluid.dygraph.Layer):
         """
         FIXME: add more comments and usage
         Args:
-            train_loader (DataLoader): An iterable data loader is used for train.
-            eval_loader (DataLoader): An iterable data loader is used for
-                evaluation at the end of epoch. If None, will not do evaluation.
+            train_data (Dataset|DataLoader): An iterable data loader is used for
+                train. An instance of paddle.fluid.io.Dataset or
+                paddle.fluid.io.DataLoader is recommended.
+            eval_data (Dataset|DataLoader): An iterable data loader is used for
+                evaluation at the end of epoch. If None, will not do evaluation.
+                An instance of paddle.fluid.io.Dataset or
+                paddle.fluid.io.DataLoader is recommended.
+            batch_size (int): The batch size of train_data and eval_data. When
+                train_data and eval_data are both instances of DataLoader, this
+                parameter will be ignored.
             epochs (int): Integer number. The number of epochs to train the model.
             eval_freq (int): The frequency, in number of epochs, an evaluation
                 is performed.
             log_freq (int): The frequency, in number of steps, the training logs
-                is printed.
+                are printed.
             save_dir (str|None): The directory to save checkpoint during training.
                 If None, will not save checkpoint.
             save_freq (int): The frequency, in number of epochs, to save checkpoint.
             verbose (int): The verbosity mode, should be 0, 1, or 2.
                 0 = silent, 1 = progress bar, 2 = one line per epoch.
+            drop_last (bool): Whether to drop the last incomplete batch of
+                train_data when the dataset size is not divisible by the batch
+                size. When train_data is an instance of DataLoader, this
+                parameter will be ignored.
+            shuffle (bool): Whether to shuffle train_data. When train_data is an
+                instance of DataLoader, this parameter will be ignored.
+            num_workers (int): The number of subprocesses used to load data, 0
+                for no subprocess and loading data in the main process. When
+                train_data and eval_data are both instances of DataLoader, this
+                parameter will be ignored.
             callbacks (Callback|None): A list of `Callback` instances to apply
                 during training. If None, `ProgBarLogger` and `ModelCheckpoint`
                 are automatically inserted.
         """
-        assert train_dataset is not None or train_loader is not None, \
-            "train_dataset or train_loader must be given"
-
-        assert (train_loader is not None and train_dataset is None) or \
-            (train_loader is None and train_dataset is not None), \
-            "train_dataset should not be set when train_loader is given"
+        assert train_data is not None, \
+            "train_data must be given!"
 
         if fluid.in_dygraph_mode():
             feed_list = None
         else:
             feed_list = [x.forward() for x in self._inputs + self._labels]
 
-        if train_loader is None:
+        if isinstance(train_data, Dataset):
             train_sampler = DistributedBatchSampler(
-                train_dataset,
+                train_data,
                 batch_size=batch_size,
                 shuffle=shuffle,
                 drop_last=drop_last)
             train_loader = DataLoader(
-                train_dataset,
+                train_data,
                 batch_sampler=train_sampler,
                 places=self._place,
                 feed_list=feed_list,
                 num_workers=num_workers,
                 return_list=True)
+        else:
+            train_loader = train_data
 
-        if eval_loader is None and eval_dataset is not None:
+        if eval_data is not None and isinstance(eval_data, Dataset):
             eval_sampler = DistributedBatchSampler(
-                eval_dataset, batch_size=batch_size)
+                eval_data, batch_size=batch_size)
             eval_loader = DataLoader(
-                eval_dataset,
+                eval_data,
                 batch_sampler=eval_sampler,
                 places=self._place,
                 feed_list=feed_list,
                 num_workers=num_workers,
                 return_list=True)
+        elif eval_data is not None:
+            eval_loader = eval_data
+        else:
+            eval_loader = None
 
         do_eval = eval_loader is not None
         self._test_dataloader = eval_loader
@@ -974,84 +975,256 @@ class Model(fluid.dygraph.Layer):
             verbose=verbose,
             metrics=self._metrics_name(), )
 
-        def _run_one_epoch(data_loader, callbacks, mode):
-            size = len(data_loader) if hasattr(data_loader,
-                                               '__len__') else None
-            logs = {
-                'steps': size,
-                'metrics_name': metrics_name,
-            }
-            for step, data in enumerate(data_loader):
-                # data might come from different types of data_loader and have
-                # different format, as following:
-                # 1. DataLoader in static graph:
-                #    [[input1, input2, ..., label1, lable2, ...]]
-                # 2. DataLoader in dygraph
-                #    [input1, input2, ..., label1, lable2, ...]
-                # 3. custumed iterator yield concated inputs and labels:
-                #    [input1, input2, ..., label1, lable2, ...]
-                # 4. custumed iterator yield seperated inputs and labels:
-                #    ([input1, input2, ...], [label1, lable2, ...])
-                # To handle all of these, flatten (nested) list to list.
-                data = flatten(data)
-                # LoDTensor.shape is callable, where LoDTensor comes from
-                # DataLoader in static graph
-                batch_size = data[0].shape()[0] if callable(data[
-                    0].shape) else data[0].shape[0]
-
-                cbks.on_batch_begin(mode, step, logs)
-                if mode == 'train':
-                    outs = self.train(data[:len(self._inputs)],
-                                      data[len(self._inputs):])
-                else:
-                    outs = self.eval(data[:len(self._inputs)],
-                                     data[len(self._inputs):])
-
-                # losses
-                loss = outs[0] if self._metrics else outs
-                metrics = [[l[0] for l in loss]]
-
-                # metrics
-                for metric in self._metrics:
-                    res = metric.accumulate()
-                    metrics.extend(to_list(res))
-
-                assert len(metrics_name) == len(metrics)
-                for k, v in zip(metrics_name, metrics):
-                    logs[k] = v
-
-                logs['step'] = step
-                if mode == 'train' or self._adapter._merge_count.get(
-                        mode + '_batch', 0) <= 0:
-                    logs['batch_size'] = batch_size * distributed.Env().nranks
-                else:
-                    logs['batch_size'] = self._adapter._merge_count[mode +
-                                                                    '_batch']
-
-                cbks.on_batch_end(mode, step, logs)
-            self._reset_metrics()
-            return logs
-
         cbks.on_begin('train')
         for epoch in range(epochs):
-            cbks.on_epoch_begin(epoch)
+
             # FIXME: adapt to DataLoader
             loader = train_loader
             if not isinstance(train_loader, Iterable):
                 loader = train_loader()
-            logs = _run_one_epoch(loader, cbks, 'train')
-            cbks.on_epoch_end(epoch, logs)
+            logs = self._run_one_epoch(
+                loader, cbks, 'train', metrics_name, epoch=epoch)
 
             if do_eval and epoch % eval_freq == 0:
-                cbks.on_begin('eval', logs)
                 # FIXME: adapt to DataLoader
                 loader = eval_loader
                 if not isinstance(eval_loader, Iterable):
                     loader = eval_loader()
-                logs = _run_one_epoch(eval_loader(), cbks, 'eval')
+
+                eval_steps = len(loader) if hasattr(loader,
+                                                    '__len__') else None
+                cbks.on_begin('eval', {
+                    'steps': eval_steps,
+                    'metrics_name': metrics_name
+                })
+
+                logs = self._run_one_epoch(loader, cbks, 'eval', metrics_name)
+
                 cbks.on_end('eval', logs)
 
         cbks.on_end('train', logs)
+        self._test_dataloader = None
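NOTE: `fit()` collapses the old `train_dataset`/`train_loader` pair into a single `train_data` argument that may be a `Dataset` or a ready-made `DataLoader`; in the former case the `DistributedBatchSampler` and `DataLoader` above are built internally. Both call styles as a sketch (dataset names and `place` are placeholders):

```python
# Style 1: pass datasets; fit() builds the sampler and loader itself.
model.fit(train_dataset, val_dataset, epochs=2, batch_size=128, shuffle=True)

# Style 2: bring your own DataLoader; batch_size/shuffle/drop_last/num_workers
# are then ignored because the loader already encodes them.
sampler = DistributedBatchSampler(train_dataset, batch_size=128, shuffle=True)
loader = DataLoader(train_dataset, batch_sampler=sampler, places=place,
                    return_list=True)
model.fit(loader, epochs=2)
```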
+ """ + + if fluid.in_dygraph_mode(): + feed_list = None + else: + feed_list = [x.forward() for x in self._inputs + self._labels] + + if eval_data is not None and isinstance(eval_data, Dataset): + eval_sampler = DistributedBatchSampler( + eval_data, batch_size=batch_size) + eval_loader = DataLoader( + eval_data, + batch_sampler=eval_sampler, + places=self._place, + feed_list=feed_list, + num_workers=num_workers, + return_list=True) + else: + eval_loader = eval_data + + self._test_dataloader = eval_loader + metrics_name = self._metrics_name() + + cbks = config_callbacks( + callbacks, + model=self, + log_freq=log_freq, + verbose=verbose, + metrics=self._metrics_name(), ) + + loader = eval_loader + if not isinstance(eval_loader, Iterable): + loader = eval_loader() + + eval_steps = len(loader) if hasattr(loader, '__len__') else None + cbks.on_begin('eval', + {'steps': eval_steps, + 'metrics_name': metrics_name}) + + logs = self._run_one_epoch(loader, cbks, 'eval', metrics_name) + + cbks.on_end('eval', logs) + + self._test_dataloader = None + + eval_result = {} + for k in self._metrics_name(): + eval_result[k] = logs[k] + + return eval_result + + def predict(self, test_data, batch_size=1, num_workers=0): + """ + FIXME: add more comments and usage + Args: + test_data (Dataset|DataLoader): An iterable data loader is used for + predict. An instance of paddle.fluid.io.Dataset or paddle.fluid.io.Dataloader + is recomended. + batch_size (int): Integer number. The batch size of train_data and eval_data. + When train_data and eval_data are both the instance of Dataloader, this + parameter will be ignored. + num_workers (int): the number of subprocess to load data, 0 for no subprocess + used and loading data in main process. When train_data and eval_data are + both the instance of Dataloader, this parameter will be ignored. + """ + + if fluid.in_dygraph_mode(): + feed_list = None + else: + feed_list = [x.forward() for x in self._inputs + self._labels] + + if test_data is not None and isinstance(test_data, Dataset): + test_sampler = DistributedBatchSampler( + test_data, batch_size=batch_size) + test_loader = DataLoader( + test_data, + batch_sampler=test_sampler, + places=self._place, + feed_list=feed_list, + num_workers=num_workers, + return_list=True) + else: + test_loader = test_data + + self._test_dataloader = test_loader + + loader = test_loader + if not isinstance(test_loader, Iterable): + loader = test_loader() + + outputs = None + for data in tqdm.tqdm(loader): + if not fluid.in_dygraph_mode(): + data = data[0] + + outs = self.test(*data) + + if outputs is None: + outputs = outs + else: + outputs = [ + np.vstack([x, outs[i]]) for i, x in enumerate(outputs) + ] + + self._test_dataloader = None + if test_loader is not None and self._adapter._nranks > 1 \ + and isinstance(test_loader, DataLoader): + outputs = [o[:len(test_loader.dataset)] for o in outputs] + return outputs + + def set_eval_data(self, eval_data): + """ + Args: + eval_data (Dataset|DataLoader|None): An iterable data loader is used for + eval. An instance of paddle.fluid.io.Dataset or + paddle.fluid.io.Dataloader is recomended. + """ + assert isinstance( + eval_data, + DataLoader), "eval_data must be a instance of Dataloader!" 
+
+    def _run_one_epoch(self,
+                       data_loader,
+                       callbacks,
+                       mode,
+                       metrics_name,
+                       epoch=None):
+        size = len(data_loader) if hasattr(data_loader, '__len__') else None
+        logs = {
+            'steps': size,
+            'metrics_name': metrics_name,
+        }
+
+        if mode == 'train':
+            assert epoch is not None, 'when mode is train, epoch must be given'
+            callbacks.on_epoch_begin(epoch)
+
+        for step, data in enumerate(data_loader):
+            # data might come from different types of data_loader and have
+            # different formats, as following:
+            # 1. DataLoader in static graph:
+            #    [[input1, input2, ..., label1, label2, ...]]
+            # 2. DataLoader in dygraph:
+            #    [input1, input2, ..., label1, label2, ...]
+            # 3. customized iterator yielding concatenated inputs and labels:
+            #    [input1, input2, ..., label1, label2, ...]
+            # 4. customized iterator yielding separated inputs and labels:
+            #    ([input1, input2, ...], [label1, label2, ...])
+            # To handle all of these, flatten (nested) list to list.
+            data = flatten(data)
+            # LoDTensor.shape is callable, where LoDTensor comes from
+            # DataLoader in static graph
+            batch_size = data[0].shape()[0] if callable(data[
+                0].shape) else data[0].shape[0]
+
+            callbacks.on_batch_begin(mode, step, logs)
+            if mode == 'train':
+                outs = self.train(data[:len(self._inputs)],
+                                  data[len(self._inputs):])
+            else:
+                outs = self.eval(data[:len(self._inputs)],
+                                 data[len(self._inputs):])
+
+            # losses
+            loss = outs[0] if self._metrics else outs
+            metrics = [[l[0] for l in loss]]
+
+            # metrics
+            for metric in self._metrics:
+                res = metric.accumulate()
+                metrics.extend(to_list(res))
+
+            assert len(metrics_name) == len(metrics)
+            for k, v in zip(metrics_name, metrics):
+                logs[k] = v
+
+            logs['step'] = step
+            if mode == 'train' or self._adapter._merge_count.get(
+                    mode + '_batch', 0) <= 0:
+                logs['batch_size'] = batch_size * ParallelEnv().nranks
+            else:
+                logs['batch_size'] = self._adapter._merge_count[mode +
+                                                                '_batch']
+
+            callbacks.on_batch_end(mode, step, logs)
+        self._reset_metrics()
+
+        if mode == 'train':
+            assert epoch is not None, 'when mode is train, epoch must be given'
+            callbacks.on_epoch_end(epoch)
+
+        return logs
 
     def _reset_metrics(self):
         for metric in self._metrics:
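NOTE: `_run_one_epoch` is promoted from a closure inside `fit()` to a method shared by `fit()` and `evaluate()`; `flatten()` is what lets it accept the four batch layouts listed in its comment. A tiny illustration of that normalization (hypothetical two-input, one-label batch; nesting semantics assumed from `paddle.fluid.layers.utils.flatten`):

```python
import numpy as np
from paddle.fluid.layers.utils import flatten

x1, x2, y = np.zeros((4, 8)), np.zeros((4, 3)), np.zeros((4, 1))
batch = ([x1, x2], [y])               # layout 4: separated inputs and labels
data = flatten(batch)                 # -> [x1, x2, y], same as layouts 2 and 3
inputs, labels = data[:2], data[2:]   # split at len(self._inputs)
```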
diff --git a/tests/test_model.py b/tests/test_model.py
index 9a75f98e6bff3803400210fcb475d4988974fb5c..9e8c880e461c684bc46e392c362ace3d00e67f53 100644
--- a/tests/test_model.py
+++ b/tests/test_model.py
@@ -28,7 +28,7 @@ import contextlib
 import paddle
 from paddle import fluid
 from paddle.fluid.dygraph.nn import Conv2D, Pool2D, Linear
-from model import Model, CrossEntropy, Input, Loss, init_context
+from model import Model, CrossEntropy, Input, Loss, set_device
 from metrics import Accuracy
 from callbacks import ProgBarLogger
 from paddle.fluid.io import BatchSampler, DataLoader
@@ -139,9 +139,30 @@ class MyCrossEntropy(Loss):
         return [loss1, loss2]
 
 
+class TestMnistDataset(MnistDataset):
+    def __init__(self):
+        super(TestMnistDataset, self).__init__(mode='test')
+
+    def __getitem__(self, idx):
+        return self.images[idx],
+
+    def __len__(self):
+        return len(self.images)
+
+
+def get_predict_accuracy(pred, gt):
+    pred = np.argmax(pred, -1)
+    gt = np.array(gt)
+
+    correct = pred[:, np.newaxis] == gt
+
+    return np.sum(correct) / correct.shape[0]
+
+
 class TestModel(unittest.TestCase):
     def fit(self, dynamic, is_mlp=False):
-        init_context('dynamic' if dynamic else 'static')
+        device = set_device('gpu')
+        fluid.enable_dygraph(device) if dynamic else None
 
         im_shape = (-1, 784)
         batch_size = 128
@@ -151,19 +172,31 @@ class TestModel(unittest.TestCase):
 
         train_dataset = MnistDataset(mode='train')
         val_dataset = MnistDataset(mode='test')
+        test_dataset = TestMnistDataset()
 
         model = MNIST() if not is_mlp else MLP()
         optim = fluid.optimizer.Momentum(
             learning_rate=0.01, momentum=.9, parameter_list=model.parameters())
         loss = CrossEntropy() if not is_mlp else MyCrossEntropy()
-        model.prepare(optim, loss, Accuracy(), inputs, labels)
+        model.prepare(optim, loss, Accuracy(), inputs, labels, device=device)
         cbk = ProgBarLogger(50)
+
         model.fit(train_dataset,
                   val_dataset,
                   epochs=2,
                   batch_size=batch_size,
                   callbacks=cbk)
+
+        eval_result = model.evaluate(val_dataset, batch_size=batch_size)
+
+        output = model.predict(test_dataset, batch_size=batch_size)
+
+        np.testing.assert_equal(output[0].shape[0], len(test_dataset))
+
+        acc = get_predict_accuracy(output[0], val_dataset.labels)
+
+        np.testing.assert_allclose(acc, eval_result['acc'])
+
     def test_fit_static(self):
         self.fit(False)