Commit a501b8c8 authored by LielinJiang

refine code

......@@ -18,6 +18,7 @@ import copy
from progressbar import ProgressBar
from paddle.fluid.dygraph.parallel import Env
def config_callbacks(callbacks=None,
model=None,
batch_size=None,
......@@ -26,6 +27,7 @@ def config_callbacks(callbacks=None,
log_freq=2,
verbose=2,
save_freq=1,
save_dir=None,
metrics=None,
mode='train'):
cbks = callbacks or []
......@@ -34,7 +36,7 @@ def config_callbacks(callbacks=None,
cbks = cbks + [ProgBarLogger(log_freq, verbose=verbose)]
if not any(isinstance(k, ModelCheckpoint) for k in cbks):
cbks = cbks + [ModelCheckpoint(save_freq)]
cbks = cbks + [ModelCheckpoint(save_freq, save_dir)]
cbk_list = CallbackList(cbks)
cbk_list.set_model(model)
......@@ -209,7 +211,7 @@ class ProgBarLogger(Callback):
def on_train_batch_end(self, step, logs=None):
logs = logs or {}
self.train_step = step
self.train_step += 1
if self.train_step % self.log_freq == 0 and self.verbose and Env().local_rank == 0:
# if steps is not None, last step will update in on_epoch_end
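The logger now keeps its own batch counter (train_step += 1) rather than trusting the step index passed in, so log_freq gates on the number of batches seen this epoch. A minimal sketch of the resulting cadence, assuming the local callbacks module and hypothetical values:

    from callbacks import ProgBarLogger

    # Print progress after every 10th batch, on the rank-0 trainer only.
    logger = ProgBarLogger(10, verbose=2)
    # Passed to fit(..., callbacks=[logger]): batches 10, 20, 30, ... log;
    # all other batches and all non-zero ranks stay silent.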
......@@ -247,21 +249,24 @@ class ProgBarLogger(Callback):
class ModelCheckpoint(Callback):
def __init__(self, save_freq=1, save_file='output'):
def __init__(self, save_freq=1, save_dir=None):
self.save_freq = save_freq
self.save_file = save_file
self.save_dir = save_dir
def on_epoch_begin(self, epoch=None, logs=None):
self.epoch = epoch
def _is_save(self):
return self.model and self.save_dir and Env().local_rank == 0
def on_epoch_end(self, epoch, logs=None):
if self.model and self.epoch % self.save_freq == 0 and Env().local_rank == 0:
path = '{}/{}'.format(self.save_file, epoch)
if self._is_save() and self.epoch % self.save_freq == 0:
path = '{}/{}'.format(self.save_dir, epoch)
print('save checkpoint at {}'.format(path))
self.model.save(path)
def on_train_end(self, logs=None):
if self.model and Env().local_rank == 0:
path = '{}/final'.format(self.save_file)
if self._is_save():
path = '{}/final'.format(self.save_dir)
print('save checkpoint at {}'.format(path))
self.model.save(path)
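With save_file renamed to save_dir and the new _is_save() guard, the callback is a no-op unless a directory is set and the process is the rank-0 trainer. A minimal usage sketch, assuming the local callbacks module (the directory name is illustrative):

    from callbacks import ModelCheckpoint

    ckpt = ModelCheckpoint(save_freq=2, save_dir='output')
    # Passed via fit(..., callbacks=[ckpt]), this writes 'output/0',
    # 'output/2', ... at the end of every second epoch, plus 'output/final'
    # when training ends; with save_dir=None nothing is written.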
......@@ -13,6 +13,7 @@
# limitations under the License.
import os
import sys
import six
import time
import math
import socket
......@@ -21,10 +22,13 @@ import numpy as np
from paddle import fluid
from paddle.fluid.layers import collective
from paddle.fluid.dygraph.parallel import Env
from paddle.fluid.dygraph.parallel import Env, ParallelStrategy
from paddle.fluid.io import BatchSampler
_parallel_context_initialized = False
class DistributedBatchSampler(BatchSampler):
"""Sampler that restricts data loading to a subset of the dataset.
......@@ -100,3 +104,97 @@ class DistributedBatchSampler(BatchSampler):
def _all_gather(x, nranks, ring_id=0, use_calc_stream=True):
return collective._c_allgather(x, nranks, ring_id=ring_id, use_calc_stream=use_calc_stream)
def wait_server_ready(endpoints):
assert not isinstance(endpoints, six.string_types)
while True:
all_ok = True
not_ready_endpoints = []
for ep in endpoints:
ip_port = ep.split(":")
with contextlib.closing(
socket.socket(socket.AF_INET,
socket.SOCK_STREAM)) as sock:
sock.settimeout(2)
result = sock.connect_ex((ip_port[0], int(ip_port[1])))
if result != 0:
all_ok = False
not_ready_endpoints.append(ep)
if not all_ok:
time.sleep(3)
else:
break
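wait_server_ready polls each endpoint with a 2-second TCP connect timeout and sleeps 3 seconds between rounds, returning only once every peer accepts a connection. A call sketch with illustrative addresses:

    # Blocks until both trainer endpoints are listening.
    wait_server_ready(["127.0.0.1:6170", "127.0.0.1:6171"])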
def init_communicator(program, rank, nranks, wait_port,
current_endpoint, endpoints):
if nranks < 2:
return
other_endpoints = endpoints[:]
other_endpoints.remove(current_endpoint)
if rank == 0 and wait_port:
wait_server_ready(other_endpoints)
block = program.global_block()
nccl_id_var = block.create_var(
name=fluid.unique_name.generate('nccl_id'),
persistable=True,
type=fluid.core.VarDesc.VarType.RAW)
block.append_op(
type='c_gen_nccl_id',
inputs={},
outputs={'Out': nccl_id_var},
attrs={
'rank': rank,
'endpoint': current_endpoint,
'other_endpoints': other_endpoints
})
block.append_op(
type='c_comm_init',
inputs={'X': nccl_id_var},
outputs={},
attrs={
'nranks': nranks,
'rank': rank,
'ring_id': 0,
})
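init_communicator appends a c_gen_nccl_id op (rank 0 generates the NCCL id and shares it over the endpoints) and a c_comm_init op; executing the program once sets up the NCCL ring. A sketch of the call made on the rank-0 trainer of a two-card job (endpoints illustrative):

    comm_prog = fluid.Program()
    init_communicator(comm_prog, rank=0, nranks=2, wait_port=True,
                      current_endpoint="127.0.0.1:6170",
                      endpoints=["127.0.0.1:6170", "127.0.0.1:6171"])
    fluid.Executor(fluid.CUDAPlace(0)).run(comm_prog)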
def prepare_distributed_context(place=None):
if place is None:
place = fluid.CUDAPlace(Env().dev_id) if Env().nranks > 1 \
else fluid.CUDAPlace(0)
strategy = ParallelStrategy()
strategy.nranks = Env().nranks
strategy.local_rank = Env().local_rank
strategy.trainer_endpoints = Env().trainer_endpoints
strategy.current_endpoint = Env().current_endpoint
if strategy.nranks < 2:
return
global _parallel_context_initialized
if not _parallel_context_initialized and isinstance(place, fluid.CUDAPlace):
def _init_context():
communicator_prog = fluid.Program()
init_communicator(communicator_prog, strategy.local_rank, strategy.nranks,
True, strategy.current_endpoint, strategy.trainer_endpoints)
exe = fluid.Executor(place)
exe.run(communicator_prog)
if fluid.in_dygraph_mode():
fluid.disable_dygraph()
_init_context()
fluid.enable_dygraph(place)
else:
_init_context()
else:
assert ("Only support CUDAPlace for now.")
_parallel_context_initialized = True
return strategy
\ No newline at end of file
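prepare_distributed_context reads the trainer topology from Env() (populated by paddle.distributed.launch), returns early for single-card runs, and, when called in dygraph mode, toggles dygraph off and back on around the executor run so the communicator program always executes in graph mode. A usage sketch:

    # Returns None on a single card, else the configured ParallelStrategy.
    strategy = prepare_distributed_context(fluid.CUDAPlace(Env().dev_id))
    if strategy is not None:
        print(strategy.nranks, strategy.local_rank)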
......@@ -107,24 +107,26 @@ class MNIST(Model):
def main():
init_context('dynamic' if FLAGS.dynamic else 'static')
train_dataset = MnistDataset(mode='train')
val_dataset = MnistDataset(mode='test')
inputs = [Input([None, 784], 'float32', name='image')]
labels = [Input([None, 1], 'int64', name='label')]
model = MNIST()
optim = Momentum(
learning_rate=FLAGS.lr,
momentum=.9,
parameter_list=model.parameters())
learning_rate=FLAGS.lr, momentum=.9, parameter_list=model.parameters())
model.prepare(optim, CrossEntropy(), Accuracy(topk=(1, 2)), inputs, labels)
if FLAGS.resume is not None:
model.load(FLAGS.resume)
model.fit(train_dataset, val_dataset, epochs=FLAGS.epoch, batch_size=FLAGS.batch_size)
model.fit(train_dataset,
val_dataset,
epochs=FLAGS.epoch,
batch_size=FLAGS.batch_size,
save_dir='mnist_checkpoint')
if __name__ == '__main__':
......
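Because fit() now writes checkpoints under save_dir, a later run can resume through the sample's existing --resume flag; the final weights follow ModelCheckpoint's '{save_dir}/final' naming:

    # e.g. python mnist.py --resume mnist_checkpoint/final
    model.load('mnist_checkpoint/final')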
......@@ -21,7 +21,7 @@ import numpy as np
import six
import warnings
from collections import Iterable, OrderedDict
from collections import Iterable
from paddle import fluid
from paddle.fluid.framework import in_dygraph_mode, Variable
from paddle.fluid.executor import global_scope
......@@ -32,14 +32,12 @@ from paddle.fluid.incubate.fleet.collective import fleet, DistributedStrategy
from paddle.fluid.incubate.fleet.base import role_maker
from paddle.fluid.io import DataLoader
from distributed import DistributedBatchSampler, _all_gather
from distributed import DistributedBatchSampler, _all_gather, prepare_distributed_context, _parallel_context_initialized
from metrics import Metric
from callbacks import config_callbacks
__all__ = ['Model', 'Loss', 'CrossEntropy', 'Input']
_parallel_context_inited = False
def to_list(value):
if value is None:
......@@ -142,9 +140,13 @@ class StaticGraphAdapter(object):
self._progs = {}
self._compiled_progs = {}
self._merge_count = {'eval_total': 0, 'test_total': 0,
'eval_batch': 0, 'test_batch': 0}
self._merge_count = {
'eval_total': 0,
'test_total': 0,
'eval_batch': 0,
'test_batch': 0
}
self._nranks = Env().nranks
self._local_rank = Env().local_rank
......@@ -251,7 +253,8 @@ class StaticGraphAdapter(object):
# When using static learning rate, static-graph would make it
# a persistable var named 'unique_name.generate("learning_rate")',
# However, dygraph wouldn't save it.
if var.name not in state: continue
if var.name not in state:
continue
else:
# moment and other accumulators
if var.name not in converted_state:
......@@ -350,16 +353,19 @@ class StaticGraphAdapter(object):
for metric, state in zip(self.model._metrics, metric_states):
# cut off padding size
if self.mode != 'train' and self.model._test_dataloader is not None \
and isinstance(self.model._test_dataloader, DataLoader) \
and self._nranks > 1:
and isinstance(self.model._test_dataloader, DataLoader) \
and self._nranks > 1:
total_size = len(self.model._test_dataloader.dataset)
# TODO: fix me if there is a better way to get batch size
samples = state[0].shape[0]
current_count = self._merge_count.get(self.mode + '_total', 0)
if current_count + samples >= total_size:
state = [s[:total_size - current_count, ...] for s in state]
state = [
s[:total_size - current_count, ...] for s in state
]
self._merge_count[self.mode + '_total'] = 0
self._merge_count[self.mode + '_batch'] = total_size - current_count
self._merge_count[self.mode +
'_batch'] = total_size - current_count
else:
self._merge_count[self.mode + '_total'] += samples
self._merge_count[self.mode + '_batch'] = samples
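In distributed evaluation the all-gathered batches are padded so every rank contributes equally, and the bookkeeping above trims the last gathered batch back to the true dataset size so metrics stay exact. A worked example of the slicing with hypothetical numbers:

    import numpy as np

    # 10 real samples across 4 ranks: each rank is padded to ceil(10/4) = 3,
    # so one gathered batch carries 12 rows, 2 of them padding.
    total_size, current_count = 10, 0
    state = [np.arange(12)]  # gathered metric inputs
    samples = state[0].shape[0]
    if current_count + samples >= total_size:
        state = [s[:total_size - current_count, ...] for s in state]
    print(state[0].shape[0])  # -> 10, the padded rows are dropped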
......@@ -388,17 +394,17 @@ class StaticGraphAdapter(object):
for op in list(prog.global_block().ops):
prog.global_block()._remove_op(0)
if mode == 'train' and self.model._optimizer \
and self.model._optimizer._learning_rate_map:
and self.model._optimizer._learning_rate_map:
# HACK workaround learning rate map issue
lr_var = self.model._optimizer._learning_rate_map[self._orig_prog]
self.model._optimizer._learning_rate_map[prog] = lr_var
losses = []
metrics = []
with fluid.program_guard(prog, self._startup_prog):
if isinstance(self.model._inputs, dict):
ins = [self.model._inputs[n] \
for n in extract_args(self.model.forward) if n != 'self']
ins = [self.model._inputs[n]
for n in extract_args(self.model.forward) if n != 'self']
else:
ins = self.model._inputs
lbls = self.model._labels if self.model._labels else []
......@@ -408,16 +414,17 @@ class StaticGraphAdapter(object):
outputs = to_list(self.model.forward(*inputs))
if mode != 'test' and self.model._loss_function:
losses = self.model._loss_function(outputs, labels)
losses = self.model._loss_function(outputs, labels)
if self._nranks > 1 and mode != 'train':
outputs = [_all_gather(o, self._nranks) for o in outputs]
if mode != 'test':
labels = [_all_gather(l, self._nranks) for l in labels]
if mode != 'test':
for metric in self.model._metrics:
metrics.append(to_list(metric.add_metric_op(outputs, labels)))
metrics.append(
to_list(metric.add_metric_op(outputs, labels)))
if mode == 'train' and self.model._optimizer:
self._loss_endpoint = fluid.layers.sum(losses)
......@@ -427,16 +434,16 @@ class StaticGraphAdapter(object):
dist_strategy = DistributedStrategy()
dist_strategy.mode = "collective"
dist_strategy.collective_mode = "grad_allreduce"
self.model._optimizer = fleet.distributed_optimizer(self.model._optimizer,
strategy=dist_strategy)
self.model._optimizer = fleet.distributed_optimizer(
self.model._optimizer, strategy=dist_strategy)
self.model._optimizer.minimize(self._loss_endpoint)
if mode != 'train': # clone again to put it in test mode
prog = prog.clone(for_test=True)
self._input_vars[mode] = inputs
self._progs[mode] = prog
self._endpoints[mode] = {
"output": outputs,
......@@ -444,7 +451,6 @@ class StaticGraphAdapter(object):
"metric": metrics
}
def _compile_and_initialize(self, prog, mode):
compiled_prog = self._compiled_progs.get(mode, None)
if compiled_prog is not None:
......@@ -464,7 +470,8 @@ class StaticGraphAdapter(object):
if self._executor is None:
if self._nranks > 1 and device.lower() == 'gpu':
gpu_id = int(Env().dev_id)
place = fluid.CUDAPlace(gpu_id) if device.lower() == 'gpu' else fluid.CPUPlace()
place = fluid.CUDAPlace(
gpu_id) if device.lower() == 'gpu' else fluid.CPUPlace()
else:
place = places[0]
self._executor = fluid.Executor(place)
......@@ -473,7 +480,7 @@ class StaticGraphAdapter(object):
for var_py in self._startup_prog.list_vars():
var = fluid.global_scope().find_var(var_py.name)
if not var_py.name.startswith('nccl_id') and var and \
var.get_tensor()._is_initialized():
var.get_tensor()._is_initialized():
continue
uninitialized.append(var_py)
......@@ -484,7 +491,7 @@ class StaticGraphAdapter(object):
if self._nranks < 2:
compiled_prog = fluid.CompiledProgram(prog)
else:
compiled_prog = prog#fleet.main_program
compiled_prog = prog
if len(places) > 1:
loss_name = None
......@@ -501,8 +508,12 @@ class DynamicGraphAdapter(object):
self.model = model
self._nranks = Env().nranks
self._local_rank = Env().local_rank
self._merge_count = {'eval_total': 0, 'test_total': 0,
'eval_batch': 0, 'test_batch': 0}
self._merge_count = {
'eval_total': 0,
'test_total': 0,
'eval_batch': 0,
'test_batch': 0
}
if self._nranks > 1:
strategy = fluid.dygraph.parallel.ParallelStrategy()
......@@ -510,7 +521,8 @@ class DynamicGraphAdapter(object):
strategy.local_rank = Env().local_rank
strategy.trainer_endpoints = Env().trainer_endpoints
strategy.current_endpoint = Env().current_endpoint
self.ddp_model = fluid.dygraph.parallel.DataParallel(self.model, strategy)
self.ddp_model = fluid.dygraph.parallel.DataParallel(
self.model, strategy)
@property
def mode(self):
......@@ -546,12 +558,13 @@ class DynamicGraphAdapter(object):
self.model.clear_gradients()
metrics = []
for metric in self.model._metrics:
metric_outs = metric.add_metric_op(to_list(outputs), to_list(labels))
metric_outs = metric.add_metric_op(
to_list(outputs), to_list(labels))
m = metric.update(*[to_numpy(m) for m in to_list(metric_outs)])
metrics.append(m)
return ([to_numpy(l) for l in losses], metrics) \
if len(metrics) > 0 else [to_numpy(l) for l in losses]
if len(metrics) > 0 else [to_numpy(l) for l in losses]
def eval(self, inputs, labels=None):
super(Model, self.model).eval()
......@@ -576,15 +589,17 @@ class DynamicGraphAdapter(object):
samples = outputs[0].shape[0]
current_count = self._merge_count.get(self.mode + '_total', 0)
if current_count + samples >= total_size:
outputs = [o[:total_size - metric.count[0]] for o in outputs]
outputs = [
o[:total_size - metric.count[0]] for o in outputs
]
labels = [l[:total_size - metric.count[0]] for l in labels]
self._merge_count[self.mode + '_total'] = 0
self._merge_count[self.mode + '_batch'] = total_size - current_count
self._merge_count[self.mode +
'_batch'] = total_size - current_count
else:
self._merge_count[self.mode + '_total'] += samples
self._merge_count[self.mode + '_batch'] = samples
metric_outs = metric.add_metric_op(to_list(outputs), labels)
m = metric.update(*[to_numpy(m) for m in to_list(metric_outs)])
metrics.append(m)
......@@ -592,7 +607,7 @@ class DynamicGraphAdapter(object):
# To be consistent with static graph
# return empty loss if loss_function is None
return ([to_numpy(l) for l in losses], metrics) \
if len(metrics) > 0 else [to_numpy(l) for l in losses]
if len(metrics) > 0 else [to_numpy(l) for l in losses]
def test(self, inputs):
super(Model, self.model).eval()
......@@ -689,19 +704,18 @@ class Model(fluid.dygraph.Layer):
# init multiple gpus context
self._place = fluid.CUDAPlace(Env().dev_id) \
if Env().nranks > 1 else fluid.CUDAPlace(0)
if Env().nranks > 1 else fluid.CUDAPlace(0)
global _parallel_context_inited
if Env().nranks > 1 and not _parallel_context_inited:
global _parallel_context_initialized
if Env().nranks > 1 and not _parallel_context_initialized:
if fluid.in_dygraph_mode():
fluid.disable_dygraph()
fluid.enable_dygraph(self._place)
fluid.dygraph.parallel.prepare_context()
else:
fluid.enable_dygraph(self._place)
fluid.dygraph.parallel.prepare_context()
fluid.disable_dygraph()
_parallel_context_inited = True
prepare_distributed_context(self._place)
_parallel_context_initialized = True
# init backend
if fluid.in_dygraph_mode():
......@@ -850,13 +864,14 @@ class Model(fluid.dygraph.Layer):
metrics = metrics or []
for metric in to_list(metrics):
assert isinstance(metric, Metric), \
"{} is not sub class of Metric".format(metric.__class__.__name__)
"{} is not sub class of Metric".format(
metric.__class__.__name__)
self._metrics = to_list(metrics)
self._inputs = inputs
self._labels = labels
self._device = device
if device is None:
self._device = 'GPU' if fluid.is_compiled_with_cuda() else 'CPU'
self._device_ids = device_ids
......@@ -873,6 +888,7 @@ class Model(fluid.dygraph.Layer):
epochs=1,
eval_freq=1,
log_freq=10,
save_dir=None,
save_freq=1,
verbose=2,
drop_last=False,
......@@ -882,25 +898,32 @@ class Model(fluid.dygraph.Layer):
"""
FIXME: add more comments and usage
Args:
train_loader (DataLoader): an iterable data loader is used for train.
eval_loader (DataLoader): an iterable data loader is used for
train_dataset (Dataset): An instance of paddle.fluid.io.Dataset.
eval_dataset (Dataset): An instance of paddle.fluid.io.Dataset.
train_loader (DataLoader): An iterable data loader used for training.
eval_loader (DataLoader): An iterable data loader used for evaluation
at the end of each epoch. If None, no evaluation is performed.
epochs (int): number of epochs to train the model.
eval_freq (int): evaluation frequency in epoch.
log_freq (int): frequency to print log during training.
save_freq (int): frequency to save checkpoint during training.
verbose (int): verbosity mode, should be 0, 1, or 2.
epochs (int): The number of epochs to train the model.
eval_freq (int): The frequency, in number of epochs, at which
evaluation is performed.
log_freq (int): The frequency, in number of steps, at which training
logs are printed.
save_dir (str|None): The directory to save checkpoints during training.
If None, no checkpoints are saved.
save_freq (int): The frequency, in number of epochs, at which to save
checkpoints.
verbose (int): The verbosity mode, should be 0, 1, or 2.
0 = silent, 1 = progress bar, 2 = one line per epoch.
callbacks (Callback|None): list of `Callback` instances to apply
during training.
callbacks (Callback|None): A list of `Callback` instances to apply
during training. If None, `ProgBarLogger` and `ModelCheckpoint`
are automatically inserted.
"""
assert train_dataset is not None or train_loader is not None, \
"train_dataset or train_loader must be given"
"train_dataset or train_loader must be given"
assert (train_loader is not None and train_dataset is None) or \
(train_loader is None and train_dataset is not None), \
"train_dataset should not be set when train_loader is given"
"train_dataset should not be set when train_loader is given"
if fluid.in_dygraph_mode():
feed_list = None
......@@ -908,42 +931,48 @@ class Model(fluid.dygraph.Layer):
feed_list = [x.forward() for x in self._inputs + self._labels]
if train_loader is None:
train_sampler = DistributedBatchSampler(train_dataset,
batch_size=batch_size,
shuffle=shuffle,
drop_last=drop_last)
train_loader = DataLoader(train_dataset,
batch_sampler=train_sampler,
places=self._place,
feed_list=feed_list,
num_workers=num_workers,
return_list=True)
train_sampler = DistributedBatchSampler(
train_dataset,
batch_size=batch_size,
shuffle=shuffle,
drop_last=drop_last)
train_loader = DataLoader(
train_dataset,
batch_sampler=train_sampler,
places=self._place,
feed_list=feed_list,
num_workers=num_workers,
return_list=True)
if eval_loader is None and eval_dataset is not None:
eval_sampler = DistributedBatchSampler(eval_dataset,
batch_size=batch_size)
eval_loader = DataLoader(eval_dataset,
batch_sampler=eval_sampler,
places=self._place,
feed_list=feed_list,
num_workers=num_workers,
return_list=True)
eval_sampler = DistributedBatchSampler(
eval_dataset, batch_size=batch_size)
eval_loader = DataLoader(
eval_dataset,
batch_sampler=eval_sampler,
places=self._place,
feed_list=feed_list,
num_workers=num_workers,
return_list=True)
do_eval = eval_loader is not None
self._test_dataloader = eval_loader
metrics_name = self._metrics_name()
steps = len(train_loader) if hasattr(train_loader, '__len__') else None
cbks = config_callbacks(
callbacks,
model=self,
epochs=epochs,
steps=None,
steps=steps,
log_freq=log_freq,
save_freq=save_freq,
save_dir=save_dir,
verbose=verbose,
metrics=self._metrics_name(), )
def _run_one_epoch(data_loader, callbacks, mode):
size = data_loader.size if hasattr(data_loader, 'size') else None
size = len(data_loader) if hasattr(data_loader,
'__len__') else None
logs = {
'steps': size,
'metrics_name': metrics_name,
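The reworked fit() accepts either a dataset (building the distributed loader itself, as above) or a prebuilt loader, but not both. A sketch of the second form, assuming the keyword names from the docstring ('checkpoints' is illustrative):

    sampler = DistributedBatchSampler(train_dataset, batch_size=64,
                                      shuffle=True)
    loader = DataLoader(train_dataset, batch_sampler=sampler,
                        return_list=True)
    model.fit(train_loader=loader, epochs=2, save_dir='checkpoints')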
......@@ -969,7 +998,7 @@ class Model(fluid.dygraph.Layer):
for metric in self._metrics:
res = metric.accumulate()
metrics.extend(to_list(res))
assert len(metrics_name) == len(metrics)
for k, v in zip(metrics_name, metrics):
logs[k] = v
......@@ -978,7 +1007,8 @@ class Model(fluid.dygraph.Layer):
if mode == 'train' or self._adapter._merge_count.get(mode + '_batch', 0) <= 0:
logs['batch_size'] = batch_size * Env().nranks
else:
logs['batch_size'] = self._adapter._merge_count[mode + '_batch']
logs['batch_size'] = self._adapter._merge_count[mode +
'_batch']
cbks.on_batch_end(mode, step, logs)
self._reset_metrics()
......@@ -1000,7 +1030,7 @@ class Model(fluid.dygraph.Layer):
loader = eval_loader
if not isinstance(eval_loader, Iterable):
loader = eval_loader()
logs = _run_one_epoch(eval_loader(), cbks, 'eval')
logs = _run_one_epoch(eval_loader, cbks, 'eval')
cbks.on_end('eval', logs)
cbks.on_end('train', logs)
......
......@@ -151,16 +151,18 @@ class TestModel(unittest.TestCase):
train_dataset = MnistDataset(mode='train')
val_dataset = MnistDataset(mode='test')
model = MNIST() if not is_mlp else MLP()
optim = fluid.optimizer.Momentum(
learning_rate=0.01,
momentum=.9,
parameter_list=model.parameters())
learning_rate=0.01, momentum=.9, parameter_list=model.parameters())
loss = CrossEntropy() if not is_mlp else MyCrossEntropy()
model.prepare(optim, loss, Accuracy(), inputs, labels)
cbk = ProgBarLogger(50)
model.fit(train_dataset, val_dataset, epochs=2, batch_size=batch_size, callbacks=cbk)
model.fit(train_dataset,
val_dataset,
epochs=2,
batch_size=batch_size,
callbacks=cbk)
def test_fit_static(self):
self.fit(False)
......