Commit 863897ce authored by guosheng

Merge branch 'master' of https://github.com/PaddlePaddle/hapi into fix-data-train

@@ -16,7 +16,7 @@
 import six
 import copy
 from progressbar import ProgressBar

-from distributed import get_local_rank
+from paddle.fluid.dygraph.parallel import ParallelEnv


 def config_callbacks(callbacks=None,
@@ -195,7 +195,7 @@ class ProgBarLogger(Callback):
         self.steps = self.params['steps']
         self.epoch = epoch
         self.train_step = 0
-        if self.verbose and self.epochs and get_local_rank() == 0:
+        if self.verbose and self.epochs and ParallelEnv().local_rank == 0:
             print('Epoch %d/%d' % (epoch + 1, self.epochs))
         self.train_progbar = ProgressBar(num=self.steps, verbose=self.verbose)
@@ -213,8 +213,8 @@ class ProgBarLogger(Callback):
         logs = logs or {}
         self.train_step += 1
-        if self.train_step % self.log_freq == 0 and self.verbose and get_local_rank(
-        ) == 0:
+        if self.train_step % self.log_freq == 0 and self.verbose and ParallelEnv(
+        ).local_rank == 0:
             # if steps is not None, last step will update in on_epoch_end
             if self.steps and self.train_step < self.steps:
                 self._updates(logs, 'train')
@@ -223,7 +223,7 @@ class ProgBarLogger(Callback):
     def on_epoch_end(self, epoch, logs=None):
         logs = logs or {}
-        if self.verbose and get_local_rank() == 0:
+        if self.verbose and ParallelEnv().local_rank == 0:
             self._updates(logs, 'train')

     def on_eval_begin(self, logs=None):
@@ -233,7 +233,7 @@ class ProgBarLogger(Callback):
         self.evaled_samples = 0
         self.eval_progbar = ProgressBar(
             num=self.eval_steps, verbose=self.verbose)
-        if get_local_rank() == 0:
+        if ParallelEnv().local_rank == 0:
             print('Eval begin...')

     def on_eval_batch_end(self, step, logs=None):
@@ -242,9 +242,15 @@ class ProgBarLogger(Callback):
         samples = logs.get('batch_size', 1)
         self.evaled_samples += samples
+        if self.eval_step % self.log_freq == 0 and self.verbose and ParallelEnv(
+        ).local_rank == 0:
+            # if steps is not None, last step will update in on_epoch_end
+            if self.eval_steps and self.eval_step < self.eval_steps:
+                self._updates(logs, 'eval')

     def on_eval_end(self, logs=None):
         logs = logs or {}
-        if self.verbose and get_local_rank() == 0:
+        if self.verbose and ParallelEnv().local_rank == 0:
             self._updates(logs, 'eval')
             print('Eval samples: %d' % (self.evaled_samples))
@@ -258,7 +264,7 @@ class ModelCheckpoint(Callback):
         self.epoch = epoch

     def _is_save(self):
-        return self.model and self.save_dir and get_local_rank() == 0
+        return self.model and self.save_dir and ParallelEnv().local_rank == 0

     def on_epoch_end(self, epoch, logs=None):
         if self._is_save() and self.epoch % self.save_freq == 0:
...
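
The recurring pattern in this file, gating console output on `ParallelEnv().local_rank == 0`, is what keeps multi-process training from printing one copy of every log line per GPU. A minimal sketch of the same idea (illustration only, not part of the patch):

```python
from paddle.fluid.dygraph.parallel import ParallelEnv

def log_on_rank0(msg):
    # Under paddle.distributed.launch every trainer runs the same script;
    # only the process whose local_rank is 0 writes, so each line appears once.
    if ParallelEnv().local_rank == 0:
        print(msg)

log_on_rank0('Epoch 1/10')
```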
@@ -13,30 +13,20 @@
 # limitations under the License.

 import os
 import sys
+import six
 import time
 import math
 import socket
 import contextlib
-from contextlib import closing
-from six import string_types

 import numpy as np
-from collections import OrderedDict

 from paddle import fluid
-import paddle.fluid.unique_name as nameGen
-from paddle.fluid import core
-from paddle.fluid import framework
 from paddle.fluid.layers import collective
-from paddle.fluid.dygraph import to_variable, no_grad, layers
-from paddle.fluid.framework import Variable
-from paddle.fluid.executor import global_scope
-from paddle.fluid.dygraph.parallel import Env, DataParallel, ParallelStrategy
-from paddle.fluid.layers.collective import _c_allreduce, _c_allgather, _c_broadcast, \
-    _c_sync_comm_stream, _c_sync_calc_stream
-from paddle.fluid.io import BatchSampler, DataLoader
+from paddle.fluid.dygraph.parallel import ParallelEnv, ParallelStrategy
+from paddle.fluid.io import BatchSampler

-__parallel_context_init = False
+_parallel_context_initialized = False


 class DistributedBatchSampler(BatchSampler):
     """Sampler that restricts data loading to a subset of the dataset.
@@ -71,11 +61,13 @@ class DistributedBatchSampler(BatchSampler):
         self.shuffle = shuffle
         assert isinstance(drop_last, bool), \
             "drop_last should be a boolean number"
+
         self.drop_last = drop_last
-        self.nranks = get_nranks()
-        self.local_rank = get_local_rank()
+        self.nranks = ParallelEnv().nranks
+        self.local_rank = ParallelEnv().local_rank
         self.epoch = 0
-        self.num_samples = int(math.ceil(len(self.dataset) * 1.0 / self.nranks))
+        self.num_samples = int(
+            math.ceil(len(self.dataset) * 1.0 / self.nranks))
         self.total_size = self.num_samples * self.nranks

     def __iter__(self):
@@ -86,9 +78,28 @@ class DistributedBatchSampler(BatchSampler):
         if self.shuffle:
             np.random.RandomState(self.epoch).shuffle(indices)
             self.epoch += 1

         # subsample
-        indices = indices[self.local_rank * self.num_samples:
-                          (self.local_rank + 1) * self.num_samples]
+        def _get_indices_by_batch_size(indices):
+            subsampled_indices = []
+            last_batch_size = self.total_size % (self.batch_size * self.nranks)
+            assert last_batch_size % self.nranks == 0
+            last_local_batch_size = last_batch_size // self.nranks
+
+            for i in range(self.local_rank * self.batch_size,
+                           len(indices) - last_batch_size,
+                           self.batch_size * self.nranks):
+                subsampled_indices.extend(indices[i:i + self.batch_size])
+
+            indices = indices[len(indices) - last_batch_size:]
+            subsampled_indices.extend(indices[
+                self.local_rank * last_local_batch_size:(
+                    self.local_rank + 1) * last_local_batch_size])
+            return subsampled_indices
+
+        if self.nranks > 1:
+            indices = _get_indices_by_batch_size(indices)

         assert len(indices) == self.num_samples
         _sample_iter = iter(indices)
@@ -106,46 +117,37 @@ class DistributedBatchSampler(BatchSampler):
         num_samples += int(not self.drop_last) * (self.batch_size - 1)
         return num_samples // self.batch_size

+    def set_epoch(self, epoch):
+        self.epoch = epoch

-def _all_gather(x, nranks, ring_id=0, use_calc_stream=True):
-    return _c_allgather(x, nranks, ring_id=ring_id, use_calc_stream=use_calc_stream)

-def get_local_rank():
-    return Env().local_rank

-def get_nranks():
-    return Env().nranks
+def _all_gather(x, nranks, ring_id=0, use_calc_stream=True):
+    return collective._c_allgather(
+        x, nranks, ring_id=ring_id, use_calc_stream=use_calc_stream)


 def wait_server_ready(endpoints):
-    assert not isinstance(endpoints, string_types)
+    assert not isinstance(endpoints, six.string_types)
     while True:
         all_ok = True
         not_ready_endpoints = []
         for ep in endpoints:
             ip_port = ep.split(":")
-            with closing(
-                    socket.socket(socket.AF_INET,
-                                  socket.SOCK_STREAM)) as sock:
+            with contextlib.closing(
                    socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
                 sock.settimeout(2)
                 result = sock.connect_ex((ip_port[0], int(ip_port[1])))
                 if result != 0:
                     all_ok = False
                     not_ready_endpoints.append(ep)
         if not all_ok:
-            sys.stderr.write("server not ready, wait 3 sec to retry...\n")
-            sys.stderr.write("not ready endpoints:" + str(
-                not_ready_endpoints) + "\n")
-            sys.stderr.flush()
             time.sleep(3)
         else:
             break


-def init_communicator(program, rank, nranks, wait_port,
-                      current_endpoint, endpoints):
+def init_communicator(program, rank, nranks, wait_port, current_endpoint,
+                      endpoints):
     if nranks < 2:
         return
     other_endpoints = endpoints[:]
@@ -154,9 +156,9 @@ def init_communicator(program, rank, nranks, wait_port,
         wait_server_ready(other_endpoints)
     block = program.global_block()
     nccl_id_var = block.create_var(
-        name=nameGen.generate('nccl_id'),
+        name=fluid.unique_name.generate('nccl_id'),
         persistable=True,
-        type=core.VarDesc.VarType.RAW)
+        type=fluid.core.VarDesc.VarType.RAW)
     block.append_op(
         type='c_gen_nccl_id',
@@ -181,25 +183,28 @@ def init_communicator(program, rank, nranks, wait_port,

 def prepare_distributed_context(place=None):
     if place is None:
-        place = fluid.CUDAPlace(Env().dev_id) if Env().nranks > 1 \
+        place = fluid.CUDAPlace(ParallelEnv().dev_id) if ParallelEnv().nranks > 1 \
             else fluid.CUDAPlace(0)

     strategy = ParallelStrategy()
-    strategy.nranks = Env().nranks
-    strategy.local_rank = Env().local_rank
-    strategy.trainer_endpoints = Env().trainer_endpoints
-    strategy.current_endpoint = Env().current_endpoint
+    strategy.nranks = ParallelEnv().nranks
+    strategy.local_rank = ParallelEnv().local_rank
+    strategy.trainer_endpoints = ParallelEnv().trainer_endpoints
+    strategy.current_endpoint = ParallelEnv().current_endpoint

     if strategy.nranks < 2:
         return

-    global __parallel_context_init
+    global _parallel_context_initialized

-    if not __parallel_context_init and isinstance(place, core.CUDAPlace):
+    if not _parallel_context_initialized and isinstance(place,
+                                                        fluid.CUDAPlace):

         def _init_context():
-            communicator_prog = framework.Program()
-            init_communicator(communicator_prog, strategy.local_rank, strategy.nranks,
-                              True, strategy.current_endpoint, strategy.trainer_endpoints)
+            communicator_prog = fluid.Program()
+            init_communicator(communicator_prog, strategy.local_rank,
+                              strategy.nranks, True, strategy.current_endpoint,
+                              strategy.trainer_endpoints)
             exe = fluid.Executor(place)
             exe.run(communicator_prog)
@@ -213,57 +218,5 @@ def prepare_distributed_context(place=None):
     else:
         assert ("Only support CUDAPlace for now.")

-    __parallel_context_init = True
+    _parallel_context_initialized = True

     return strategy

-
-class DistributedDataParallel(DataParallel):
-    def __init__(self, layers, strategy=None):
-        if strategy is None:
-            strategy = ParallelStrategy()
-            strategy.nranks = Env().nranks
-            strategy.local_rank = Env().local_rank
-            strategy.trainer_endpoints = Env().trainer_endpoints
-            strategy.current_endpoint = Env().current_endpoint
-
-        super(DistributedDataParallel, self).__init__(layers, strategy)
-
-    @no_grad
-    def apply_collective_grads(self):
-        """
-        AllReduce the Parameters' gradient.
-        """
-        if not self._is_data_parallel_mode():
-            return
-
-        grad_var_set = set()
-        grad_vars = []
-        for param in self._layers.parameters():
-            # NOTE(zcd): The grad_ivar maybe no generated.
-            if param.trainable and param._grad_ivar():
-                g_var = param._grad_ivar()
-                grad_vars.append(g_var)
-                assert g_var not in grad_var_set
-                grad_var_set.add(g_var)
-
-        mega_bytes = 128 * 1024 * 1024
-        group_idx = 0
-        memory_counter = 0
-        grad_var_groups = OrderedDict()
-        dtype = grad_vars[0].dtype
-        for g_var in grad_vars:
-            # Note: the dtype of the same group should be the same.
-            bytes = np.prod(g_var.shape) * core.size_of_dtype(g_var.dtype)
-            if memory_counter < mega_bytes and dtype == g_var.dtype:
-                memory_counter += bytes
-            else:
-                memory_counter = bytes
-                group_idx += 1
-            grad_var_groups.setdefault(group_idx, []).append(g_var)
-
-        coalesced_grads_and_vars = self._coalesce_tensors(grad_var_groups)
-
-        for coalesced_grad, _, _ in coalesced_grads_and_vars:
-            collective._c_allreduce(coalesced_grad, coalesced_grad, use_calc_stream=True)
-
-        self._split_tensors(coalesced_grads_and_vars)
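
For reference, a hypothetical usage sketch of the sampler defined above (`MyDataset`, `place`, and `num_epochs` are placeholders, not part of this change). Each of the `nranks` trainer processes constructs the same sampler and receives a disjoint, batch-aligned slice of the shuffled indices; `set_epoch` reseeds the shuffle deterministically per epoch:

```python
from paddle.fluid.io import DataLoader

from distributed import DistributedBatchSampler

dataset = MyDataset()  # placeholder paddle.fluid.io.Dataset subclass
sampler = DistributedBatchSampler(dataset, batch_size=32, shuffle=True)
loader = DataLoader(
    dataset, batch_sampler=sampler, places=place, return_list=True)

for epoch in range(num_epochs):
    sampler.set_epoch(epoch)  # vary the shuffle order across epochs
    # mirrors fit() below: call the loader if it is not directly iterable
    for batch in loader():
        ...  # train on this trainer's shard of the data
```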
@@ -26,7 +26,7 @@ from paddle.fluid.optimizer import Momentum
 from paddle.fluid.dygraph.nn import Conv2D, Pool2D, Linear
 from paddle.fluid.io import MNIST as MnistDataset

-from model import Model, CrossEntropy, Input, init_context
+from model import Model, CrossEntropy, Input, set_device
 from metrics import Accuracy
@@ -106,7 +106,8 @@ class MNIST(Model):

 def main():
-    init_context('dynamic' if FLAGS.dynamic else 'static')
+    device = set_device(FLAGS.device)
+    fluid.enable_dygraph(device) if FLAGS.dynamic else None

     train_dataset = MnistDataset(mode='train')
     val_dataset = MnistDataset(mode='test')
@@ -118,7 +119,13 @@ def main():
     optim = Momentum(
         learning_rate=FLAGS.lr, momentum=.9, parameter_list=model.parameters())

-    model.prepare(optim, CrossEntropy(), Accuracy(topk=(1, 2)), inputs, labels)
+    model.prepare(
+        optim,
+        CrossEntropy(),
+        Accuracy(topk=(1, 2)),
+        inputs,
+        labels,
+        device=FLAGS.device)

     if FLAGS.resume is not None:
         model.load(FLAGS.resume)
@@ -131,6 +138,8 @@ def main():
 if __name__ == '__main__':
     parser = argparse.ArgumentParser("CNN training on MNIST")
+    parser.add_argument(
+        "--device", type=str, default='gpu', help="device to use, gpu or cpu")
     parser.add_argument(
         "-d", "--dynamic", action='store_true', help="enable dygraph mode")
     parser.add_argument(
...
@@ -20,26 +20,36 @@ import pickle
 import numpy as np
 import six
 import warnings
-from collections import Iterable
+import tqdm

 from collections import OrderedDict
+from collections import Iterable

 from paddle import fluid
 from paddle.fluid.framework import in_dygraph_mode, Variable
 from paddle.fluid.executor import global_scope
 from paddle.fluid.io import is_belong_to_optimizer
 from paddle.fluid.dygraph.base import to_variable
+from paddle.fluid.dygraph.parallel import ParallelEnv
 from paddle.fluid.layers.utils import flatten
 from paddle.fluid.incubate.fleet.collective import fleet, DistributedStrategy
-import paddle.fluid.incubate.fleet.base.role_maker as role_maker
+from paddle.fluid.incubate.fleet.base import role_maker
+from paddle.fluid.io import DataLoader, Dataset

-import distributed
-from distributed import DistributedBatchSampler
-from paddle.fluid.io import DataLoader
+from distributed import DistributedBatchSampler, _all_gather, prepare_distributed_context, _parallel_context_initialized
 from metrics import Metric
 from callbacks import config_callbacks

-__all__ = ['Model', 'Loss', 'CrossEntropy', 'Input']
+__all__ = ['Model', 'Loss', 'CrossEntropy', 'Input', 'set_device']


+def set_device(device):
+    assert isinstance(device, six.string_types) and device.lower() in ['cpu', 'gpu'], \
+        "Expected device in ['cpu', 'gpu'], but got {}".format(device)
+
+    place = fluid.CUDAPlace(ParallelEnv().dev_id) \
+        if device.lower() == 'gpu' and fluid.is_compiled_with_cuda() \
+        else fluid.CPUPlace()
+
+    return place
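
`set_device` only resolves a `Place`; enabling dygraph (or the distributed context) remains the caller's job. The intended call pattern, as in the updated mnist.py above:

```python
from paddle import fluid

place = set_device('gpu')    # falls back to CPUPlace if CUDA is unavailable
fluid.enable_dygraph(place)  # only needed for imperative (dygraph) mode
```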
 def to_list(value):
@@ -85,18 +95,6 @@ def extract_args(func):
     return inspect.getargspec(func)[0]


-def init_context(backend):
-    assert isinstance(backend, str) and backend.lower() in ['dynamic', 'static'], \
-        "Expected backend in ['dynamic', 'static'], but got {}".format(backend)
-
-    place = fluid.CUDAPlace(distributed.Env().dev_id) if \
-        distributed.Env().nranks > 1 else fluid.CUDAPlace(0)
-    distributed.prepare_distributed_context(place)
-
-    backend = backend.lower()
-    if backend == 'dynamic':
-        fluid.enable_dygraph(place)


 class Input(fluid.dygraph.Layer):
     def __init__(self, shape=None, dtype=None, name=None):
         super(Input, self).__init__()
@@ -162,8 +160,8 @@ class StaticGraphAdapter(object):
             'test_batch': 0
         }

-        self._nranks = distributed.Env().nranks
-        self._local_rank = distributed.Env().local_rank
+        self._nranks = ParallelEnv().nranks
+        self._local_rank = ParallelEnv().local_rank

     @property
     def mode(self):
@@ -268,7 +266,8 @@ class StaticGraphAdapter(object):
                     # When using static learning rate, static-graph would make it
                     # a persistable var named 'unique_name.generate("learning_rate")',
                     # However, dygraph wouldn't save it.
-                    if var.name not in state: continue
+                    if var.name not in state:
+                        continue
                 else:
                     # moment and other accumulators
                     if var.name not in converted_state:
@@ -367,8 +366,8 @@ class StaticGraphAdapter(object):
         for metric, state in zip(self.model._metrics, metric_states):
             # cut off padding size
             if self.mode != 'train' and self.model._test_dataloader is not None \
                     and isinstance(self.model._test_dataloader, DataLoader) \
                     and self._nranks > 1:
                 total_size = len(self.model._test_dataloader.dataset)
                 # TODO: fixme if have better way to get batch size
                 samples = state[0].shape[0]
@@ -408,7 +407,7 @@ class StaticGraphAdapter(object):
         for op in list(prog.global_block().ops):
             prog.global_block()._remove_op(0)
         if mode == 'train' and self.model._optimizer \
                 and self.model._optimizer._learning_rate_map:
             # HACK workaround learning rate map issue
             lr_var = self.model._optimizer._learning_rate_map[self._orig_prog]
             self.model._optimizer._learning_rate_map[prog] = lr_var
@@ -427,14 +426,9 @@ class StaticGraphAdapter(object):
             losses = self.model._loss_function(outputs, labels)

         if self._nranks > 1 and mode != 'train':
-            outputs = [
-                distributed._all_gather(o, self._nranks) for o in outputs
-            ]
+            outputs = [_all_gather(o, self._nranks) for o in outputs]
             if mode != 'test':
-                labels = [
-                    distributed._all_gather(l, self._nranks)
-                    for l in labels
-                ]
+                labels = [_all_gather(l, self._nranks) for l in labels]

         if mode != 'test':
             for metric in self.model._metrics:
@@ -471,31 +465,22 @@ class StaticGraphAdapter(object):
         if compiled_prog is not None:
             return compiled_prog

-        device = self.model._device
-        device_ids = self.model._device_ids
-
-        if device.lower() == 'gpu':
-            places = fluid.cuda_places(device_ids)
-        else:
-            places = fluid.cpu_places(len(device_ids) if device_ids else None)
+        assert self.model._place is not None, \
+            "device is not set, please call `model.prepare()` first"
+        place = self.model._place

         # XXX *ALL WEIGHTS* should be initialized upon model construction
         # even if `forward()` may run different code path for different mode
         # therefore startup program only needs to run once
         if self._executor is None:
-            if self._nranks > 1 and device.lower() == 'gpu':
-                gpu_id = int(distributed.Env().dev_id)
-                place = fluid.CUDAPlace(gpu_id) if device.lower(
-                ) == 'gpu' else fluid.CPUPlace()
-            else:
-                place = places[0]
             self._executor = fluid.Executor(place)
             # XXX incremental initialization
             uninitialized = []
             for var_py in self._startup_prog.list_vars():
                 var = fluid.global_scope().find_var(var_py.name)
                 if not var_py.name.startswith('nccl_id') and var and \
                         var.get_tensor()._is_initialized():
                     continue
                 uninitialized.append(var_py)
@@ -506,14 +491,8 @@ class StaticGraphAdapter(object):
         if self._nranks < 2:
             compiled_prog = fluid.CompiledProgram(prog)
         else:
-            compiled_prog = prog  #fleet.main_program
-
-        if len(places) > 1:
-            loss_name = None
-            if mode == 'train' and self._loss_endpoint is not None:
-                loss_name = self._loss_endpoint.name
-            compiled_prog = compiled_prog.with_data_parallel(
-                loss_name=loss_name, places=places)
+            compiled_prog = prog

         self._compiled_progs[mode] = compiled_prog
@@ -521,8 +500,8 @@ class DynamicGraphAdapter(object):
     def __init__(self, model):
         super(DynamicGraphAdapter, self).__init__()
         self.model = model
-        self._nranks = distributed.Env().nranks
-        self._local_rank = distributed.Env().local_rank
+        self._nranks = ParallelEnv().nranks
+        self._local_rank = ParallelEnv().local_rank
         self._merge_count = {
             'eval_total': 0,
             'test_total': 0,
@@ -531,7 +510,13 @@ class DynamicGraphAdapter(object):
         }

         if self._nranks > 1:
-            self.ddp_model = distributed.DistributedDataParallel(self.model)
+            stradegy = fluid.dygraph.parallel.ParallelStrategy()
+            stradegy.nranks = ParallelEnv().nranks
+            stradegy.local_rank = ParallelEnv().local_rank
+            stradegy.trainer_endpoints = ParallelEnv().trainer_endpoints
+            stradegy.current_endpoint = ParallelEnv().current_endpoint
+            self.ddp_model = fluid.dygraph.parallel.DataParallel(self.model,
+                                                                 stradegy)

     @property
     def mode(self):
@@ -551,15 +536,14 @@ class DynamicGraphAdapter(object):
         if labels is not None:
             labels = [to_variable(l) for l in to_list(labels)]
         if self._nranks > 1:
-            outputs = self.ddp_model.forward(
-                * [to_variable(x) for x in inputs])
+            outputs = self.ddp_model.forward(*[to_variable(x) for x in inputs])
             losses = self.model._loss_function(outputs, labels)
             final_loss = fluid.layers.sum(losses)
             final_loss = self.ddp_model.scale_loss(final_loss)
             final_loss.backward()
             self.ddp_model.apply_collective_grads()
         else:
-            outputs = self.model.forward(* [to_variable(x) for x in inputs])
+            outputs = self.model.forward(*[to_variable(x) for x in inputs])
             losses = self.model._loss_function(outputs, labels)
             final_loss = fluid.layers.sum(losses)
             final_loss.backward()
@@ -570,11 +554,11 @@ class DynamicGraphAdapter(object):
         for metric in self.model._metrics:
             metric_outs = metric.add_metric_op(
                 to_list(outputs), to_list(labels))
-            m = metric.update(* [to_numpy(m) for m in to_list(metric_outs)])
+            m = metric.update(*[to_numpy(m) for m in to_list(metric_outs)])
             metrics.append(m)

         return ([to_numpy(l) for l in losses], metrics) \
             if len(metrics) > 0 else [to_numpy(l) for l in losses]
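
The multi-card branch above follows the standard dygraph `DataParallel` recipe: scale the loss so per-trainer gradients average rather than sum, backprop, then all-reduce the gradients before the optimizer step. A condensed sketch, with `ddp_model`, `inputs`, `labels`, `loss_fn`, and `optim` assumed from context:

```python
# Sketch of one data-parallel training step (assumes ddp_model wraps the
# user model in fluid.dygraph.parallel.DataParallel as shown above).
outputs = ddp_model.forward(*inputs)
final_loss = fluid.layers.sum(loss_fn(outputs, labels))
final_loss = ddp_model.scale_loss(final_loss)  # divide by nranks
final_loss.backward()
ddp_model.apply_collective_grads()             # all-reduce gradients
optim.minimize(final_loss)
ddp_model.clear_gradients()
```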
     def eval(self, inputs, labels=None):
         super(Model, self.model).eval()
@@ -582,17 +566,14 @@ class DynamicGraphAdapter(object):
         inputs = to_list(inputs)
         if labels is not None:
             labels = [to_variable(l) for l in to_list(labels)]
-        outputs = self.model.forward(* [to_variable(x) for x in inputs])
+        outputs = self.model.forward(*[to_variable(x) for x in inputs])
         if self.model._loss_function:
             losses = self.model._loss_function(outputs, labels)
         else:
             losses = []
         if self._nranks > 1:
-            outputs = [
-                distributed._all_gather(o, self._nranks)
-                for o in to_list(outputs)
-            ]
-            labels = [distributed._all_gather(l, self._nranks) for l in labels]
+            outputs = [_all_gather(o, self._nranks) for o in to_list(outputs)]
+            labels = [_all_gather(l, self._nranks) for l in labels]
         metrics = []
         for metric in self.model._metrics:
             # cut off padding value.
@@ -602,10 +583,8 @@ class DynamicGraphAdapter(object):
                 samples = outputs[0].shape[0]
                 current_count = self._merge_count.get(self.mode + '_total', 0)
                 if current_count + samples >= total_size:
-                    outputs = [
-                        o[:total_size - metric.count[0]] for o in outputs
-                    ]
-                    labels = [l[:total_size - metric.count[0]] for l in labels]
+                    outputs = [o[:total_size - current_count] for o in outputs]
+                    labels = [l[:total_size - current_count] for l in labels]
                     self._merge_count[self.mode + '_total'] = 0
                     self._merge_count[self.mode +
                                       '_batch'] = total_size - current_count
@@ -614,24 +593,22 @@ class DynamicGraphAdapter(object):
                 self._merge_count[self.mode + '_batch'] = samples

             metric_outs = metric.add_metric_op(to_list(outputs), labels)
-            m = metric.update(* [to_numpy(m) for m in to_list(metric_outs)])
+            m = metric.update(*[to_numpy(m) for m in to_list(metric_outs)])
             metrics.append(m)

         # To be consistent with static graph
         # return empty loss if loss_function is None
         return ([to_numpy(l) for l in losses], metrics) \
             if len(metrics) > 0 else [to_numpy(l) for l in losses]

     def test(self, inputs):
         super(Model, self.model).eval()
         self.mode = 'test'
         inputs = [to_variable(x) for x in to_list(inputs)]
         outputs = self.model.forward(*inputs)
-        if self._nranks > 2:
-            outputs = [
-                distributed._all_gather(o, self._nranks)
-                for o in to_list(outputs)
-            ]
+        if self._nranks > 1 and isinstance(self.model._place, fluid.CUDAPlace):
+            outputs = [_all_gather(o, self._nranks) for o in to_list(outputs)]
         return [to_numpy(o) for o in to_list(outputs)]

     def parameters(self, *args, **kwargs):
@@ -714,14 +691,9 @@ class Model(fluid.dygraph.Layer):
         self._loss_weights = None
         self._optimizer = None
         self._device = None
-        self._device_ids = None
         self._optimizer = None
         self._test_dataloader = None

-        # init multiple gpus context
-        self._place = fluid.CUDAPlace(distributed.Env().dev_id) \
-            if distributed.Env().nranks > 1 else fluid.CUDAPlace(0)
-
         # init backend
         if fluid.in_dygraph_mode():
             self._adapter = DynamicGraphAdapter(self)
@@ -738,7 +710,7 @@ class Model(fluid.dygraph.Layer):
         return self._adapter.test(*args, **kwargs)

     def save(self, *args, **kwargs):
-        if distributed.get_local_rank() == 0:
+        if ParallelEnv().local_rank == 0:
             return self._adapter.save(*args, **kwargs)

     def load(self, path, skip_mismatch=False, reset_optimizer=False):
@@ -816,8 +788,7 @@ class Model(fluid.dygraph.Layer):
                 metrics=None,
                 inputs=None,
                 labels=None,
-                device=None,
-                device_ids=None):
         """
         FIXME: add comments
         Args:
@@ -840,19 +811,37 @@ class Model(fluid.dygraph.Layer):
             device (str|None): specify device type, 'CPU' or 'GPU'.
                 If None, automatically select device according to
                 installation package version.
-            device_ids (list[int]|None): specify device index. If None,
-                the available devices will be obtained from environment
-                variables when the model is executed: for GPU, the currently
-                available device IDs come from FLAGS_selected_gpus or
-                CUDA_VISIBLE_DEVICES; for CPU, the currently available CPU
-                count comes from CPU_NUM (e.g. export CPU_NUM=4). If CPU_NUM
-                is not set, the executor adds it to the environment variables
-                and sets its value to 1. The default is None.
         """
+        if isinstance(device, fluid.CUDAPlace) or \
+            (isinstance(device, six.string_types) and device.lower() == 'gpu') \
+            or (device is None and fluid.is_compiled_with_cuda()):
+            if isinstance(device, fluid.CUDAPlace):
+                self._place = device
+            else:
+                self._place = fluid.CUDAPlace(ParallelEnv().dev_id) \
+                    if ParallelEnv().nranks > 1 else fluid.CUDAPlace(0)
+
+            global _parallel_context_initialized
+            if ParallelEnv().nranks > 1 and not _parallel_context_initialized:
+                if fluid.in_dygraph_mode():
+                    fluid.disable_dygraph()
+                    fluid.enable_dygraph(self._place)
+                    fluid.dygraph.parallel.prepare_context()
+                else:
+                    prepare_distributed_context(self._place)
+                _parallel_context_initialized = True
+        elif isinstance(device, fluid.CPUPlace):
+            self._place = device
+        elif (isinstance(device, six.string_types) and device.lower() == 'cpu') \
+            or (device is None):
+            self._place = fluid.CPUPlace()
+        else:
+            raise ValueError(
+                "Expected device in ('gpu', 'cpu', fluid.CUDAPlace, fluid.CPUPlace, None), \
+                but got {}".format(device))
+
         self._optimizer = optimizer
         if loss_function:
             if not isinstance(loss_function, Loss):
@@ -869,27 +858,22 @@ class Model(fluid.dygraph.Layer):
         metrics = metrics or []
         for metric in to_list(metrics):
             assert isinstance(metric, Metric), \
-                "{} is not sub class of Metric".format(metric.__class__.__name__)
+                "{} is not sub class of Metric".format(
+                    metric.__class__.__name__)
         self._metrics = to_list(metrics)

         self._inputs = to_list(inputs) if not isinstance(inputs, dict) else [
             inputs[n] for n in extract_args(self.forward) if n != 'self'
         ]
         self._labels = to_list(labels)
-        self._device = device
-        if device is None:
-            self._device = 'GPU' if fluid.is_compiled_with_cuda() else 'CPU'
-        self._device_ids = device_ids

         if not in_dygraph_mode():
             self._adapter.prepare()
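
Taken together, `prepare` now owns device selection. A typical call, reusing the names from the mnist.py example above, where `device` accepts 'gpu', 'cpu', a `fluid.CUDAPlace`/`fluid.CPUPlace`, or None for auto-detection:

```python
model.prepare(
    optim,
    CrossEntropy(),
    Accuracy(topk=(1, 2)),
    inputs,
    labels,
    device='gpu')  # or 'cpu', fluid.CUDAPlace(0), fluid.CPUPlace(), None
```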
     def fit(
             self,
-            train_dataset=None,
-            eval_dataset=None,
-            train_loader=None,
-            eval_loader=None,
+            train_data=None,
+            eval_data=None,
             batch_size=1,
             epochs=1,
             eval_freq=1,
@@ -904,60 +888,77 @@ class Model(fluid.dygraph.Layer):
         """
         FIXME: add more comments and usage
         Args:
-            train_loader (DataLoader): An iterable data loader is used for train.
-            eval_loader (DataLoader): An iterable data loader is used for
-                evaluation at the end of epoch. If None, will not do evaluation.
+            train_data (Dataset|DataLoader): An iterable data loader is used for
+                train. An instance of paddle.fluid.io.Dataset or
+                paddle.fluid.io.DataLoader is recommended.
+            eval_data (Dataset|DataLoader): An iterable data loader is used for
+                evaluation at the end of epoch. If None, will not do evaluation.
+                An instance of paddle.fluid.io.Dataset or
+                paddle.fluid.io.DataLoader is recommended.
+            batch_size (int): Integer number. The batch size of train_data and
+                eval_data. When train_data and eval_data are both instances of
+                DataLoader, this parameter will be ignored.
             epochs (int): Integer number. The number of epochs to train the model.
             eval_freq (int): The frequency, in number of epochs, an evaluation
                 is performed.
             log_freq (int): The frequency, in number of steps, the training logs
-                is printed.
+                are printed.
             save_dir(str|None): The directory to save checkpoint during training.
                 If None, will not save checkpoint.
             save_freq (int): The frequency, in number of epochs, to save checkpoint.
             verbose (int): The verbosity mode, should be 0, 1, or 2.
                 0 = silent, 1 = progress bar, 2 = one line per epoch.
+            drop_last (bool): Whether to drop the last incomplete batch of
+                train_data when the dataset size is not divisible by the batch
+                size. When train_data is an instance of DataLoader, this
+                parameter will be ignored.
+            shuffle (bool): Whether to shuffle train_data. When train_data is an
+                instance of DataLoader, this parameter will be ignored.
+            num_workers (int): The number of subprocesses to load data, 0 for no
+                subprocess used and loading data in the main process. When
+                train_data and eval_data are both instances of DataLoader, this
+                parameter will be ignored.
             callbacks (Callback|None): A list of `Callback` instances to apply
                 during training. If None, `ProgBarLogger` and `ModelCheckpoint`
                 are automatically inserted.
         """

-        assert train_dataset is not None or train_loader is not None, \
-            "train_dataset or train_loader must be given"
-        assert (train_loader is not None and train_dataset is None) or \
-            (train_loader is None and train_dataset is not None), \
-            "train_dataset should not be set when train_loader is given"
+        assert train_data is not None, \
+            "train_data must be given!"

         if fluid.in_dygraph_mode():
             feed_list = None
         else:
             feed_list = [x.forward() for x in self._inputs + self._labels]

-        if train_loader is None:
+        if isinstance(train_data, Dataset):
             train_sampler = DistributedBatchSampler(
-                train_dataset,
+                train_data,
                 batch_size=batch_size,
                 shuffle=shuffle,
                 drop_last=drop_last)
             train_loader = DataLoader(
-                train_dataset,
+                train_data,
                 batch_sampler=train_sampler,
                 places=self._place,
                 feed_list=feed_list,
                 num_workers=num_workers,
                 return_list=True)
+        else:
+            train_loader = train_data

-        if eval_loader is None and eval_dataset is not None:
+        if eval_data is not None and isinstance(eval_data, Dataset):
             eval_sampler = DistributedBatchSampler(
-                eval_dataset, batch_size=batch_size)
+                eval_data, batch_size=batch_size)
             eval_loader = DataLoader(
-                eval_dataset,
+                eval_data,
                 batch_sampler=eval_sampler,
                 places=self._place,
                 feed_list=feed_list,
                 num_workers=num_workers,
                 return_list=True)
+        elif eval_data is not None:
+            eval_loader = eval_data
+        else:
+            eval_loader = None

         do_eval = eval_loader is not None
         self._test_dataloader = eval_loader
@@ -974,84 +975,256 @@ class Model(fluid.dygraph.Layer):
             verbose=verbose,
             metrics=self._metrics_name(), )

-        def _run_one_epoch(data_loader, callbacks, mode):
-            size = len(data_loader) if hasattr(data_loader,
-                                               '__len__') else None
-            logs = {
-                'steps': size,
-                'metrics_name': metrics_name,
-            }
-            for step, data in enumerate(data_loader):
-                # data might come from different types of data_loader and have
-                # different formats, as follows:
-                # 1. DataLoader in static graph:
-                #    [[input1, input2, ..., label1, label2, ...]]
-                # 2. DataLoader in dygraph:
-                #    [input1, input2, ..., label1, label2, ...]
-                # 3. custom iterator yielding concatenated inputs and labels:
-                #    [input1, input2, ..., label1, label2, ...]
-                # 4. custom iterator yielding separated inputs and labels:
-                #    ([input1, input2, ...], [label1, label2, ...])
-                # To handle all of these, flatten (nested) list to list.
-                data = flatten(data)
-                # LoDTensor.shape is callable, where LoDTensor comes from
-                # DataLoader in static graph
-                batch_size = data[0].shape()[0] if callable(data[
-                    0].shape) else data[0].shape[0]
-                cbks.on_batch_begin(mode, step, logs)
-                if mode == 'train':
-                    outs = self.train(data[:len(self._inputs)],
-                                      data[len(self._inputs):])
-                else:
-                    outs = self.eval(data[:len(self._inputs)],
-                                     data[len(self._inputs):])
-                # losses
-                loss = outs[0] if self._metrics else outs
-                metrics = [[l[0] for l in loss]]
-                # metrics
-                for metric in self._metrics:
-                    res = metric.accumulate()
-                    metrics.extend(to_list(res))
-                assert len(metrics_name) == len(metrics)
-                for k, v in zip(metrics_name, metrics):
-                    logs[k] = v
-                logs['step'] = step
-                if mode == 'train' or self._adapter._merge_count.get(
-                        mode + '_batch', 0) <= 0:
-                    logs['batch_size'] = batch_size * distributed.Env().nranks
-                else:
-                    logs['batch_size'] = self._adapter._merge_count[mode +
-                                                                    '_batch']
-                cbks.on_batch_end(mode, step, logs)
-            self._reset_metrics()
-            return logs

         cbks.on_begin('train')
         for epoch in range(epochs):
-            cbks.on_epoch_begin(epoch)

             # FIXME: adapt to DataLoader
             loader = train_loader
             if not isinstance(train_loader, Iterable):
                 loader = train_loader()
-            logs = _run_one_epoch(loader, cbks, 'train')
-            cbks.on_epoch_end(epoch, logs)
+            logs = self._run_one_epoch(
+                loader, cbks, 'train', metrics_name, epoch=epoch)

             if do_eval and epoch % eval_freq == 0:
-                cbks.on_begin('eval', logs)
                 # FIXME: adapt to DataLoader
                 loader = eval_loader
                 if not isinstance(eval_loader, Iterable):
                     loader = eval_loader()
-                logs = _run_one_epoch(eval_loader(), cbks, 'eval')
+
+                eval_steps = len(loader) if hasattr(loader,
+                                                    '__len__') else None
+                cbks.on_begin('eval', {
+                    'steps': eval_steps,
+                    'metrics_name': metrics_name
+                })
+
+                logs = self._run_one_epoch(loader, cbks, 'eval', metrics_name)

                 cbks.on_end('eval', logs)

         cbks.on_end('train', logs)
+        self._test_dataloader = None
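
Since `fit` now accepts `Dataset` objects directly and builds the distributed sampler and loader itself, a minimal training run reduces to a call like the following sketch (datasets and `save_dir` are caller-supplied):

```python
model.fit(train_dataset,
          val_dataset,
          epochs=2,
          batch_size=128,
          save_dir='mnist_checkpoint',
          num_workers=2)
```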
+    def evaluate(
+            self,
+            eval_data,
+            batch_size=1,
+            log_freq=10,
+            verbose=2,
+            num_workers=0,
+            callbacks=None, ):
+        """
+        FIXME: add more comments and usage
+        Args:
+            eval_data (Dataset|DataLoader): An iterable data loader is used for
+                evaluation. An instance of paddle.fluid.io.Dataset or
+                paddle.fluid.io.DataLoader is recommended.
+            batch_size (int): Integer number. The batch size of eval_data.
+                When eval_data is an instance of DataLoader, this parameter
+                will be ignored.
+            log_freq (int): The frequency, in number of steps, the eval logs
+                are printed.
+            verbose (int): The verbosity mode, should be 0, 1, or 2.
+                0 = silent, 1 = progress bar, 2 = one line per epoch.
+            num_workers (int): The number of subprocesses to load data, 0 for no
+                subprocess used and loading data in the main process. When
+                eval_data is an instance of DataLoader, this parameter
+                will be ignored.
+            callbacks (Callback|None): A list of `Callback` instances to apply
+                during evaluation. If None, `ProgBarLogger` and `ModelCheckpoint`
+                are automatically inserted.
+        """
+
+        if fluid.in_dygraph_mode():
+            feed_list = None
+        else:
+            feed_list = [x.forward() for x in self._inputs + self._labels]
+
+        if eval_data is not None and isinstance(eval_data, Dataset):
+            eval_sampler = DistributedBatchSampler(
+                eval_data, batch_size=batch_size)
+            eval_loader = DataLoader(
+                eval_data,
+                batch_sampler=eval_sampler,
+                places=self._place,
+                feed_list=feed_list,
+                num_workers=num_workers,
+                return_list=True)
+        else:
+            eval_loader = eval_data
+
+        self._test_dataloader = eval_loader
+        metrics_name = self._metrics_name()
+
+        cbks = config_callbacks(
+            callbacks,
+            model=self,
+            log_freq=log_freq,
+            verbose=verbose,
+            metrics=self._metrics_name(), )
+
+        loader = eval_loader
+        if not isinstance(eval_loader, Iterable):
+            loader = eval_loader()
+
+        eval_steps = len(loader) if hasattr(loader, '__len__') else None
+        cbks.on_begin('eval',
+                      {'steps': eval_steps,
+                       'metrics_name': metrics_name})
+
+        logs = self._run_one_epoch(loader, cbks, 'eval', metrics_name)
+
+        cbks.on_end('eval', logs)
+
+        self._test_dataloader = None
+
+        eval_result = {}
+        for k in self._metrics_name():
+            eval_result[k] = logs[k]
+
+        return eval_result
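
A usage sketch: `evaluate` wires up the same loader/callback machinery as `fit` for a single pass and returns one entry per prepared metric (the 'acc' key below assumes `Accuracy()` was passed to `prepare`):

```python
eval_result = model.evaluate(val_dataset, batch_size=128)
print(eval_result['acc'])  # key names follow model.prepare(..., metrics=...)
```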
+    def predict(self, test_data, batch_size=1, num_workers=0):
+        """
+        FIXME: add more comments and usage
+        Args:
+            test_data (Dataset|DataLoader): An iterable data loader is used for
+                prediction. An instance of paddle.fluid.io.Dataset or
+                paddle.fluid.io.DataLoader is recommended.
+            batch_size (int): Integer number. The batch size of test_data.
+                When test_data is an instance of DataLoader, this parameter
+                will be ignored.
+            num_workers (int): The number of subprocesses to load data, 0 for no
+                subprocess used and loading data in the main process. When
+                test_data is an instance of DataLoader, this parameter
+                will be ignored.
+        """
+
+        if fluid.in_dygraph_mode():
+            feed_list = None
+        else:
+            feed_list = [x.forward() for x in self._inputs + self._labels]
+
+        if test_data is not None and isinstance(test_data, Dataset):
+            test_sampler = DistributedBatchSampler(
+                test_data, batch_size=batch_size)
+            test_loader = DataLoader(
+                test_data,
+                batch_sampler=test_sampler,
+                places=self._place,
+                feed_list=feed_list,
+                num_workers=num_workers,
+                return_list=True)
+        else:
+            test_loader = test_data
+
+        self._test_dataloader = test_loader
+
+        loader = test_loader
+        if not isinstance(test_loader, Iterable):
+            loader = test_loader()
+
+        outputs = None
+        for data in tqdm.tqdm(loader):
+            if not fluid.in_dygraph_mode():
+                data = data[0]
+
+            outs = self.test(*data)
+
+            if outputs is None:
+                outputs = outs
+            else:
+                outputs = [
+                    np.vstack([x, outs[i]]) for i, x in enumerate(outputs)
+                ]
+
+        self._test_dataloader = None
+        if test_loader is not None and self._adapter._nranks > 1 \
+                and isinstance(test_loader, DataLoader):
+            outputs = [o[:len(test_loader.dataset)] for o in outputs]
+
+        return outputs
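
`predict` returns a list with one numpy array per model output, stacked across batches (and truncated back to the dataset length after the distributed all-gather). A usage sketch:

```python
import numpy as np

output = model.predict(test_dataset, batch_size=128)
pred_labels = np.argmax(output[0], -1)  # class index per test sample
```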
+    def set_eval_data(self, eval_data):
+        """
+        Args:
+            eval_data (Dataset|DataLoader|None): An iterable data loader is used
+                for evaluation. An instance of paddle.fluid.io.Dataset or
+                paddle.fluid.io.DataLoader is recommended.
+        """
+        assert isinstance(
+            eval_data,
+            DataLoader), "eval_data must be an instance of DataLoader!"
+        self._test_dataloader = eval_data
+
+    def _run_one_epoch(self,
+                       data_loader,
+                       callbacks,
+                       mode,
+                       metrics_name,
+                       epoch=None):
+        size = len(data_loader) if hasattr(data_loader, '__len__') else None
+        logs = {
+            'steps': size,
+            'metrics_name': metrics_name,
+        }
+
+        if mode == 'train':
+            assert epoch is not None, 'when mode is train, epoch must be given'
+            callbacks.on_epoch_begin(epoch)
+
+        for step, data in enumerate(data_loader):
+            # data might come from different types of data_loader and have
+            # different formats, as follows:
+            # 1. DataLoader in static graph:
+            #    [[input1, input2, ..., label1, label2, ...]]
+            # 2. DataLoader in dygraph:
+            #    [input1, input2, ..., label1, label2, ...]
+            # 3. custom iterator yielding concatenated inputs and labels:
+            #    [input1, input2, ..., label1, label2, ...]
+            # 4. custom iterator yielding separated inputs and labels:
+            #    ([input1, input2, ...], [label1, label2, ...])
+            # To handle all of these, flatten (nested) list to list.
+            data = flatten(data)
+            # LoDTensor.shape is callable, where LoDTensor comes from
+            # DataLoader in static graph
+            batch_size = data[0].shape()[0] if callable(data[
+                0].shape) else data[0].shape[0]
+
+            callbacks.on_batch_begin(mode, step, logs)
+            if mode == 'train':
+                outs = self.train(data[:len(self._inputs)],
+                                  data[len(self._inputs):])
+            else:
+                outs = self.eval(data[:len(self._inputs)],
+                                 data[len(self._inputs):])
+
+            # losses
+            loss = outs[0] if self._metrics else outs
+            metrics = [[l[0] for l in loss]]
+
+            # metrics
+            for metric in self._metrics:
+                res = metric.accumulate()
+                metrics.extend(to_list(res))
+
+            assert len(metrics_name) == len(metrics)
+            for k, v in zip(metrics_name, metrics):
+                logs[k] = v
+
+            logs['step'] = step
+            if mode == 'train' or self._adapter._merge_count.get(
+                    mode + '_batch', 0) <= 0:
+                logs['batch_size'] = batch_size * ParallelEnv().nranks
+            else:
+                logs['batch_size'] = self._adapter._merge_count[mode +
+                                                                '_batch']
+
+            callbacks.on_batch_end(mode, step, logs)
+        self._reset_metrics()
+
+        if mode == 'train':
+            assert epoch is not None, 'when mode is train, epoch must be given'
+            callbacks.on_epoch_end(epoch)
+
+        return logs
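
The `flatten` call is what lets one loop body accept all four loader formats enumerated in the comments: it reduces any nesting to a flat list, after which the first `len(self._inputs)` entries are inputs and the remainder labels. A small illustration of that normalization:

```python
from paddle.fluid.layers.utils import flatten

# separated inputs/labels (format 4) and pre-concatenated (format 3)
# normalize to the same flat list:
print(flatten(([1, 2], [3])))  # -> [1, 2, 3]
print(flatten([1, 2, 3]))      # -> [1, 2, 3]
```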
     def _reset_metrics(self):
         for metric in self._metrics:
...
@@ -28,7 +28,7 @@ import contextlib
 import paddle
 from paddle import fluid
 from paddle.fluid.dygraph.nn import Conv2D, Pool2D, Linear
-from model import Model, CrossEntropy, Input, Loss, init_context
+from model import Model, CrossEntropy, Input, Loss, set_device
 from metrics import Accuracy
 from callbacks import ProgBarLogger
 from paddle.fluid.io import BatchSampler, DataLoader
@@ -139,9 +139,30 @@ class MyCrossEntropy(Loss):
         return [loss1, loss2]


+class TestMnistDataset(MnistDataset):
+    def __init__(self):
+        super(TestMnistDataset, self).__init__(mode='test')
+
+    def __getitem__(self, idx):
+        return self.images[idx],
+
+    def __len__(self):
+        return len(self.images)
+
+
+def get_predict_accuracy(pred, gt):
+    pred = np.argmax(pred, -1)
+    gt = np.array(gt)
+
+    correct = pred[:, np.newaxis] == gt
+
+    return np.sum(correct) / correct.shape[0]
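
A tiny check of `get_predict_accuracy` (the reshaping matters: the MNIST labels arrive with shape (N, 1), hence the `np.newaxis` on the argmaxed predictions):

```python
import numpy as np

pred = np.array([[0.1, 0.9], [0.8, 0.2]])  # two samples, two classes
gt = [[1], [1]]                            # labels with shape (N, 1)
# argmax gives [1, 0]; only the first sample matches -> 0.5
print(get_predict_accuracy(pred, gt))
```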
 class TestModel(unittest.TestCase):
     def fit(self, dynamic, is_mlp=False):
-        init_context('dynamic' if dynamic else 'static')
+        device = set_device('gpu')
+        fluid.enable_dygraph(device) if dynamic else None
         im_shape = (-1, 784)
         batch_size = 128
@@ -151,19 +172,31 @@ class TestModel(unittest.TestCase):
         train_dataset = MnistDataset(mode='train')
         val_dataset = MnistDataset(mode='test')
+        test_dataset = TestMnistDataset()

         model = MNIST() if not is_mlp else MLP()
         optim = fluid.optimizer.Momentum(
             learning_rate=0.01, momentum=.9, parameter_list=model.parameters())
         loss = CrossEntropy() if not is_mlp else MyCrossEntropy()
-        model.prepare(optim, loss, Accuracy(), inputs, labels)
+        model.prepare(optim, loss, Accuracy(), inputs, labels, device=device)
         cbk = ProgBarLogger(50)

         model.fit(train_dataset,
                   val_dataset,
                   epochs=2,
                   batch_size=batch_size,
                   callbacks=cbk)
+
+        eval_result = model.evaluate(val_dataset, batch_size=batch_size)
+
+        output = model.predict(test_dataset, batch_size=batch_size)
+        np.testing.assert_equal(output[0].shape[0], len(test_dataset))
+
+        acc = get_predict_accuracy(output[0], val_dataset.labels)
+        np.testing.assert_allclose(acc, eval_result['acc'])

     def test_fit_static(self):
         self.fit(False)
...