Unverified commit 7c92177c authored by Z zhaoyingli, committed by GitHub

[AutoParallel] add callbacks (#47014)

* [AutoParallel] add callbacks

* fix unittest

* fix dist_context

* fix engine

* fix cmakelist

* fix unittest's returns

* fix cmakelist
Parent: b9a2f29c
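With this change, `Engine.fit`, `Engine.evaluate`, and `Engine.predict` drive a hapi-style callback pipeline (progress logging, LR stepping, checkpointing, history recording) and return structured results instead of ad-hoc history dicts. A minimal usage sketch of the new surface; the model, dataset, and checkpoint path are illustrative and not part of this diff:

import paddle
from paddle.distributed.fleet import auto

# Illustrative setup: any Layer/loss/optimizer/metric combination works the same way.
model = paddle.vision.models.LeNet()
loss = paddle.nn.CrossEntropyLoss()
optimizer = paddle.optimizer.Adam(parameters=model.parameters())
engine = auto.Engine(model, loss, optimizer, paddle.metric.Accuracy())

# fit() now accepts log_freq/save_dir/save_freq/verbose plus user callbacks,
# and returns the History callback collected during training.
history = engine.fit(train_data=train_dataset,  # train_dataset: any paddle.io.Dataset
                     epochs=2,
                     batch_size=64,
                     log_freq=10,
                     save_dir="./ckpt",
                     callbacks=[paddle.callbacks.ProgBarLogger()])
print(history.history["loss"])  # one logged loss entry per epoch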
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import time
import paddle
from paddle.hapi.callbacks import ProgBarLogger, ModelCheckpoint, LRScheduler, CallbackList, Callback
from .interface import CollectionNames, get_collection
def config_callbacks(callbacks=None,
engine=None,
batch_size=None,
epochs=None,
steps=None,
log_freq=2,
verbose=2,
save_freq=1,
save_dir=None,
metrics=None,
acc_step=1,
mode='train'):
cbks = callbacks or []
cbks = cbks if isinstance(cbks, (list, tuple)) else [cbks]
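# Prepend/append the default callbacks the user did not supply, then upgrade
# any plain hapi callbacks below to their auto-parallel variants.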
if not any(isinstance(k, ProgBarLogger) for k in cbks) and verbose:
cbks = [ProgBarLoggerAuto(log_freq, verbose=verbose)] + cbks
if not any(isinstance(k, LRScheduler) for k in cbks):
cbks = [LRSchedulerAuto()] + cbks
if not any(isinstance(k, ModelCheckpoint) for k in cbks):
cbks = cbks + [ModelCheckpointAuto(save_freq, save_dir)]
if not any(isinstance(k, Profiler) for k in cbks) and verbose == 3:
cbks = cbks + [Profiler(timer_only=True)]
if not any(isinstance(k, History) for k in cbks):
cbks = cbks + [History()]
for i, k in enumerate(cbks):
if isinstance(k, ProgBarLogger):
cbks[i] = ProgBarLoggerAuto(k.log_freq, k.verbose)
if isinstance(k, LRScheduler):
cbks[i] = LRSchedulerAuto(k.by_step, k.by_epoch)
if isinstance(k, ModelCheckpoint):
cbks[i] = ModelCheckpointAuto(k.save_freq, k.save_dir)
cbk_list = CallbackList(cbks)
cbk_list.set_model(engine)
metrics = (metrics or []) if mode != 'test' else []
params = {
'batch_size': batch_size,
'epochs': epochs,
'steps': steps,
'verbose': verbose,
'metrics': metrics,
'acc_step': acc_step,
}
cbk_list.set_params(params)
return cbk_list
class ProgBarLoggerAuto(ProgBarLogger):
def __init__(self, log_freq=1, verbose=2):
super(ProgBarLoggerAuto, self).__init__(log_freq, verbose)
def _is_print(self):
return True
def _updates(self, logs, mode):
values = []
metrics = getattr(self, '%s_metrics' % (mode))
progbar = getattr(self, '%s_progbar' % (mode))
steps = getattr(self, '%s_step' % (mode))
for k in metrics:
if k in logs:
values.append((k, logs[k]))
if 'lr' in logs:
values.append(('lr', logs['lr']))
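# Show only the user fetches that were registered for logging
# (see auto.fetch(..., logging=True)); they arrive under logs['fetches'].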
fetches_logs = logs.get('fetches', {})
collect_logging = get_collection(CollectionNames.LOGGING)
for name, var in collect_logging:
k = name or var.name
if k in fetches_logs:
values.append((k, fetches_logs[k]))
out_logs = logs.get('outputs', {})
for k in out_logs:
values.append((k, out_logs[k]))
if self.verbose == 3 and hasattr(self, '_%s_timer' % (mode)):
timer = getattr(self, '_%s_timer' % (mode))
cnt = timer['count'] if timer['count'] > 0 else 1.0
samples = timer['samples'] if timer['samples'] > 0 else 1.0
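# count/samples are floored at 1 to avoid division by zero; ips is
# samples processed per second of combined reader + batch time since the last report.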
values.append(
('avg_reader_cost', "%.5f sec" % (timer['data_time'] / cnt)))
values.append(
('avg_batch_cost', "%.5f sec" % (timer['batch_time'] / cnt)))
values.append(
('ips', "%.5f samples/sec" %
(samples / (timer['data_time'] + timer['batch_time']))))
timer['count'] = 0
timer['samples'] = 0
timer['data_time'] = 0.
timer['batch_time'] = 0.
progbar.update(steps, values)
def on_eval_batch_end(self, step, logs=None):
logs = logs or {}
self.eval_step += 1
samples = self.params['batch_size']
self.evaled_samples += samples
self._eval_timer['batch_time'] += (
time.time() - self._eval_timer['batch_data_end_time'])
self._eval_timer['count'] += 1
self._eval_timer['samples'] += samples
if self._is_print() and self.eval_step % self.log_freq == 0:
if self.eval_steps is None or self.eval_step < self.eval_steps:
self._updates(logs, 'eval')
self._eval_timer['batch_start_time'] = time.time()
class LRSchedulerAuto(LRScheduler):
def __init__(self, by_step=True, by_epoch=False):
super(LRSchedulerAuto, self).__init__(by_step, by_epoch)
def on_epoch_begin(self, epoch=None, logs=None):
self.acc_step = self.params["acc_step"]
self.epoch = epoch
self.train_step = 0
def on_train_batch_end(self, step, logs=None):
self.train_step += 1
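# With gradient accumulation, step the LR scheduler once every acc_step micro-batches.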
if self.by_step and self.train_step % self.acc_step == 0:
if self.model._optimizer and \
hasattr(self.model._optimizer, '_learning_rate') and \
isinstance(self.model._optimizer._learning_rate,
paddle.optimizer.lr.LRScheduler):
self.model._optimizer._learning_rate.step()
class History(Callback):
def __init__(self):
self.history = {}
def on_train_begin(self, logs=None):
self.epoch = []
def on_epoch_end(self, epoch, logs=None):
logs = logs or {}
self.epoch.append(epoch)
for k, v in logs.items():
self.history.setdefault(k, []).append(v)
self.model.history = self
class Profiler(Callback):
def __init__(self, *args, **kwargs):
self.prof = paddle.profiler.Profiler(*args, **kwargs)
def on_epoch_begin(self, epoch=None, logs=None):
self.epoch = epoch
self.train_step = 0
self.batch_size = self.params["batch_size"]
self.steps = self.params['steps']
def on_train_begin(self, logs=None):
self.prof.start()
def on_train_batch_end(self, step, logs=None):
self.train_step += 1
self.prof.step(num_samples=self.batch_size)
print("step {}:{}".format(self.train_step,
self.prof.step_info(unit='samples')))
def on_train_end(self, logs=None):
self.prof.stop()
self.prof.summary()
class ModelCheckpointAuto(ModelCheckpoint):
def __init__(self, *args, **kwargs):
super(ModelCheckpointAuto, self).__init__(*args, **kwargs)
def _is_save(self):
return self.model and self.save_dir
def on_epoch_end(self, epoch, logs=None):
if self._is_save() and (self.epoch + 1) % self.save_freq == 0:
path = '{}/epoch{}'.format(self.save_dir, epoch)
print('save checkpoint at {}'.format(os.path.abspath(path)))
self.model.save(path)
def on_train_end(self, logs=None):
if self._is_save():
path = '{}/final'.format(self.save_dir)
print('save checkpoint at {}'.format(os.path.abspath(path)))
self.model.save(path)
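config_callbacks (above) passes user callbacks through unchanged unless they are one of the upgraded types, so custom behavior only needs a hapi Callback subclass. A hypothetical sketch; the class name and threshold are illustrative:

from paddle.hapi.callbacks import Callback

class LossWatcher(Callback):
    # Hypothetical callback: report when the training loss crosses a bound.
    def __init__(self, threshold=0.1):
        self.threshold = threshold

    def on_train_batch_end(self, step, logs=None):
        logs = logs or {}
        loss = logs.get('loss')
        if loss is not None and loss < self.threshold:
            print("step {}: loss {:.6f} fell below {}".format(
                step, float(loss), self.threshold))

# Usage: engine.fit(..., callbacks=[LossWatcher()])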
......@@ -74,7 +74,6 @@ class DistributedContext:
self._serial_optimizer = None
self._serial_feed_vars = {}
self._serial_fetch_vars = {}
self._lr_optimizer = None # record the optimzier holding lr_scheduler
# Data members related to the program
self._dist_tensors_for_program = {}
......@@ -870,7 +869,7 @@ class DistributedContext:
"_serial_ordered_nodes", "_serial_ordered_tensor_nodes", \
"_serial_ordered_op_nodes", "_original_serial_loss", \
"_original_serial_feed_vars", "_original_serial_fetch_vars", \
"_serial_loss", "_serial_feed_vars", "_serial_fetch_vars", "_lr_optimizer", \
"_serial_loss", "_serial_feed_vars", "_serial_fetch_vars", "_serial_optimizer", \
"_backup_serial_main_program_stack", "_backup_serial_startup_program_stack", \
"_pass_context"]:
setattr(result, k, v)
......
......@@ -21,7 +21,7 @@ from collections import defaultdict
import paddle
import paddle.utils as utils
from paddle import fluid, profiler, static
from paddle import fluid, static
from paddle.metric import Metric
from paddle.static import InputSpec
from paddle.fluid import core
......@@ -33,6 +33,7 @@ from paddle.fluid.framework import _current_expected_place as _get_device
from paddle.fluid.dygraph.parallel import ParallelEnv
from paddle.distributed import fleet
from .callbacks import config_callbacks
from .converter import Converter
from .helper import ProgramHelper
from .cluster import Cluster, get_default_cluster
......@@ -41,11 +42,12 @@ from .parallelizer_v2 import Parallelizer
from .dist_op import DistributedOperator
from .dist_saver import DistributedSaver
from .dist_loader import DistributedDataLoaderFromGenerator, DistributedDataLoader
from .utils import to_list, get_logger, get_dist_attr
from .utils import to_list, get_dist_attr, get_lr
from .process_group import new_process_group, get_all_process_groups
from .dist_context import DistributedContext, get_default_distributed_context
from .strategy import Strategy
from .interface import CollectionNames, get_collection
from ..utils.log_utils import get_logger
class Engine:
......@@ -200,6 +202,8 @@ class Engine:
self._dygraph_mode = False
self._tuning = self._strategy.tuning
self.history = None
def _prepare_data_spec(self, data, split, batch_size):
inputs_spec = []
labels_spec = []
......@@ -402,24 +406,23 @@ class Engine:
lr=None,
fetch_names=None,
fetch_indices=None,
profiler_log="",
mode=None):
logs = "[{}] ".format(mode)
logs = {}
if epoch is not None:
logs += "epoch: {:d} ".format(epoch)
logs["epoch"] = epoch
if step is not None:
logs += "step: {:d} ".format(step)
logs["step"] = step + 1
if lr is not None:
logs += "lr: {:5e} ".format(lr)
logs["lr"] = lr
group_idx = 0
# logging loss
if mode != "predict":
# logging loss
loss_indices = fetch_indices[group_idx]
assert len(loss_indices) <= 1
for idx in loss_indices:
logs += "loss: {:8f} ".format(outs[idx][0])
logs["loss"] = outs[idx][0]
group_idx += 1
# logging metrics
if mode != "predict":
# logging metrics
metric_vars = self._fetch_vars[mode]["metrics"]
if metric_vars:
for metric in self._metrics:
......@@ -431,61 +434,25 @@ class Engine:
metric.update(*metric_out)
results = metric.accumulate()
for i, res in enumerate(to_list(results)):
logs += "{}: {:8f} ".format(metric.name()[i], res)
logs[metric.name()[i]] = res
group_idx += 1
# Skip logging outputs
if mode == "predict":
# logging outputs
elif mode == "predict":
outputs_indices = fetch_indices[group_idx]
logs_out = {}
for idx in outputs_indices:
logs_out["out%d" % (idx)] = outs[idx]
logs["outputs"] = logs_out
group_idx += 1
# logging user fetches
fetches_logging = get_collection(CollectionNames.LOGGING)
for name, var in fetches_logging:
collect_fetches = get_collection(CollectionNames.FETCHES)
logs_fetch = {}
for name, var in collect_fetches:
if var.name in fetch_names:
idx = fetch_names.index(var.name)
# Use the user defined name for logging
logs += "{}: {} ".format(name, outs[idx])
logs += profiler_log
self._logger.info(logs)
def _prepare_history(self, outs, fetch_indices=None, mode=None):
history = {}
group_idx = 0
# store loss
if mode != "predict":
loss_indices = fetch_indices[group_idx]
loss_values = []
for idx in loss_indices:
loss_values.append(outs[idx][0])
history["loss"] = loss_values
group_idx += 1
# store metrics
if mode != "predict":
metric_vars = self._fetch_vars[mode]["metrics"]
if metric_vars:
for metric in self._metrics:
metrics_indices = fetch_indices[group_idx]
metric_out = []
for idx in metrics_indices:
metric_out.append(outs[idx])
if metric_out:
metric.update(*metric_out)
results = metric.accumulate()
history[tuple(metric.name())] = to_list(results)
group_idx += 1
# store outputs
if mode == "predict":
outputs_indices = fetch_indices[group_idx]
outputs_values = []
for idx in outputs_indices:
outputs_values.append(outs[idx])
history["outputs"] = outputs_values
group_idx += 1
# store user fetches
fetches_indices = fetch_indices[group_idx]
fetches_values = []
for idx in fetches_indices:
fetches_values.append(outs[idx])
history["fetches"] = fetches_values
return history
logs_fetch[name or var.name] = outs[idx]
logs["fetches"] = logs_fetch
return logs
def _prepare_program(self, mode):
# Do the build process
......@@ -674,7 +641,7 @@ class Engine:
mode].dist_startup_programs
self._feed_vars[mode] = self._dist_contexts[mode].serial_feed_vars
self._fetch_vars[mode] = self._dist_contexts[mode].serial_fetch_vars
self._lr_optimizer = self._dist_contexts[mode]._lr_optimizer
self._optimizer = self._dist_contexts[mode]._serial_optimizer
if self._nranks > 1:
# Traverse different rank programs and traverse each op of them,
......@@ -719,7 +686,7 @@ class Engine:
self._dist_attr)
if self._strategy.reinit:
self._logger.info("NOTE: parameters wiil be re-initialized.")
self._logger.info("NOTE: parameters will be re-initialized.")
dist_startup_prog = self._dist_startup_progs[mode][self._cur_rank]
self._executor.run(dist_startup_prog)
......@@ -729,12 +696,16 @@ class Engine:
batch_size=1,
epochs=1,
steps_per_epoch=None,
log_freq=10,
save_dir=None,
save_freq=1,
valid_data=None,
valid_sample_split=None,
valid_freq=1,
valid_steps=None,
collate_fn=None,
callbacks=None):
callbacks=None,
verbose=2):
"""
Trains the model for a fixed number of epochs. If `valid_data` is set,
evaluation will be done at the end of each epoch.
......@@ -810,59 +781,81 @@ class Engine:
self._prepare_program(self._mode)
else:
self._switch_mode(self._mode)
assert self._mode in self._dist_main_progs, \
"train model is not ready, please call `engine._prepare_program('train')` first."
train_dataloader = self._prepare_dataloader_from_generator(
dataset=train_data,
capacity=70,
# use_double_buffer=use_double_buffer,
iterable=False,
# return_list=return_list,
# use_multiprocess=use_multiprocess,
# drop_last=drop_last,
batch_size=batch_size,
epochs=epochs,
steps_per_epoch=steps_per_epoch,
collate_fn=collate_fn)
fetch_names, fetch_indices = self._prepare_fetch(None, mode=self._mode)
lr_scheduler = self._get_lr_scheduler(self.main_program)
with profiler.Profiler(timer_only=True) as prof:
for epoch in range(epochs):
for step, _ in enumerate(train_dataloader):
try:
outs = self._executor.run(
self.main_program,
fetch_list=fetch_names,
use_program_cache=self._strategy.use_cache,
return_numpy=self._strategy.return_numpy)
except core.EOFException:
break
if lr_scheduler and step % self._k_steps == 0:
lr_scheduler.step()
lr = self._get_lr(self._lr_optimizer)
prof.step()
self._prepare_logger(outs, epoch, step, lr,
fetch_names, fetch_indices,
prof.step_info(), self._mode)
history = self._prepare_history(outs, fetch_indices,
self._mode)
if valid_data and epoch % valid_freq == 0:
self.evaluate(valid_data, valid_sample_split, batch_size,
valid_steps, collate_fn, callbacks)
self._switch_mode("train")
else:
self._reset_metrics()
return history
cbks = config_callbacks(
callbacks,
engine=self,
batch_size=batch_size,
epochs=epochs,
steps=train_dataloader._steps,
log_freq=log_freq,
save_freq=save_freq,
save_dir=save_dir,
verbose=verbose,
metrics=self._metrics_name(),
acc_step=self._k_steps,
)
cbks.on_begin('train')
for epoch in range(epochs):
logs = {}
cbks.on_epoch_begin(epoch)
for step, _ in enumerate(train_dataloader):
cbks.on_batch_begin('train', step, logs)
try:
outs = self._executor.run(
self.main_program,
fetch_list=fetch_names,
use_program_cache=self._strategy.use_cache,
return_numpy=self._strategy.return_numpy)
except core.EOFException:
break
lr = get_lr(self._optimizer)
logs = self._prepare_logger(outs, epoch, step, lr, fetch_names,
fetch_indices, self._mode)
cbks.on_batch_end('train', step, logs)
if valid_data and (epoch + 1) % valid_freq == 0:
val_logs = self.evaluate(valid_data, valid_sample_split,
batch_size, valid_steps, log_freq,
collate_fn, callbacks, verbose)
val_logs = {
"val_" + name: val
for name, val in val_logs.items()
}
logs.update(val_logs)
self._switch_mode("train")
else:
self._reset_metrics()
cbks.on_epoch_end(epoch, logs)
cbks.on_end('train', logs)
return self.history
def evaluate(self,
valid_data,
valid_sample_split=None,
batch_size=1,
steps=None,
log_freq=10,
collate_fn=None,
callbacks=None):
callbacks=None,
verbose=2):
"""
Evaluate the loss and metrics of the model on evaluation data.
......@@ -918,25 +911,36 @@ class Engine:
self._prepare_program(self._mode)
else:
self._switch_mode(self._mode)
assert self._mode in self._dist_main_progs, \
"eval model is not ready, please call `engine._prepare_program('eval')` first."
valid_dataloader = self._prepare_dataloader_from_generator(
dataset=valid_data,
# feed_list=feed_list,
capacity=70,
# use_double_buffer=use_double_buffer,
iterable=False,
# return_list=return_list,
# use_multiprocess=use_multiprocess,
# drop_last=drop_last,
# places=places,
batch_size=batch_size,
# epochs=epochs,
steps_per_epoch=steps,
collate_fn=collate_fn)
fetch_names, fetch_indices = self._prepare_fetch(None, mode=self._mode)
cbks = config_callbacks(
callbacks,
engine=self,
batch_size=batch_size,
log_freq=log_freq,
verbose=verbose,
metrics=self._metrics_name(),
)
eval_steps = valid_dataloader._steps
cbks.on_begin('eval', {
'steps': eval_steps,
'metrics': self._metrics_name()
})
logs = {}
for step, _ in enumerate(valid_dataloader):
cbks.on_batch_begin('eval', step, logs)
try:
outs = self._executor.run(
self.main_program,
......@@ -945,11 +949,12 @@ class Engine:
return_numpy=self._strategy.return_numpy)
except core.EOFException:
break
self._prepare_logger(outs, None, step, None, fetch_names,
fetch_indices, "", self._mode)
history = self._prepare_history(outs, fetch_indices, self._mode)
logs = self._prepare_logger(outs, None, step, None, fetch_names,
fetch_indices, self._mode)
cbks.on_batch_end('eval', step, logs)
cbks.on_end('eval', logs)
self._reset_metrics()
return history
return logs
def predict(self,
test_data,
......@@ -957,7 +962,8 @@ class Engine:
batch_size=1,
steps=None,
collate_fn=None,
callbacks=None):
callbacks=None,
verbose=2):
"""
Compute the output predictions on testing data.
......@@ -1010,8 +1016,10 @@ class Engine:
self._prepare_program(self._mode)
else:
self._switch_mode(self._mode)
assert self._mode in self._dist_main_progs, \
"predict model is not ready, please call `engine._prepare_program('predict')` first."
test_dataloader = self._prepare_dataloader_from_generator(
dataset=test_data,
# feed_list=feed_list,
......@@ -1026,9 +1034,16 @@ class Engine:
# epochs=epochs,
steps_per_epoch=steps,
collate_fn=collate_fn)
fetch_names, fetch_indices = self._prepare_fetch(None, mode=self._mode)
outputs = []
cbks = config_callbacks(callbacks, engine=self, verbose=verbose)
test_steps = test_dataloader._steps
cbks.on_begin('predict', {'steps': test_steps})
logs = {}
for step, _ in enumerate(test_dataloader):
cbks.on_batch_begin('predict', step, logs)
try:
outs = self._executor.run(
self.main_program,
......@@ -1037,29 +1052,28 @@ class Engine:
return_numpy=self._strategy.return_numpy)
except core.EOFException:
break
self._prepare_logger(outs, None, step, None, fetch_names,
fetch_indices, "", self._mode)
history = self._prepare_history(outs, fetch_indices, self._mode)
return history
def dataloader(
self,
dataset,
# return_list=True,
batch_size=1,
shuffle=False,
drop_last=False,
collate_fn=None,
num_workers=0,
use_buffer_reader=True,
use_shared_memory=True,
timeout=0,
worker_init_fn=None,
epochs=1,
steps_per_epoch=None,
sample_split=1,
mode=None):
logs = self._prepare_logger(outs, None, step, None, fetch_names,
fetch_indices, self._mode)
cbks.on_batch_end('predict', step, logs)
outputs.append(list(logs["outputs"].values()))
cbks.on_end('predict', logs)
return outputs
def dataloader(self,
dataset,
batch_size=1,
shuffle=False,
drop_last=False,
collate_fn=None,
num_workers=0,
use_buffer_reader=True,
use_shared_memory=True,
timeout=0,
worker_init_fn=None,
epochs=1,
steps_per_epoch=None,
sample_split=1,
mode=None):
if mode is not None:
self.to_mode(mode)
self._inputs_spec, self._labels_spec = self._prepare_data_spec(
......@@ -1199,10 +1213,9 @@ class Engine:
fetch_list=fetch_names,
use_program_cache=self._strategy.use_cache,
return_numpy=self._strategy.return_numpy)
self._prepare_logger(outs, None, None, None, fetch_names, fetch_indices,
"", self._mode)
history = self._prepare_history(outs, fetch_indices, self._mode)
return history
logs = self._prepare_logger(outs, None, None, None, fetch_names,
fetch_indices, self._mode)
return logs
def _prepare_dataloader(self,
dataset,
......@@ -1307,13 +1320,6 @@ class Engine:
copy_var.desc.set_original_id(var.desc.original_id())
feed_list.append(copy_var)
# # remove the first three ops if multi run fit/evaluate/predict
# self._op_size = len(dist_main_block.ops)
# if dist_main_block.ops[0].type == 'create_py_reader':
# op_size -= 3
# for _ in range(3):
# dist_main_block._remove_op(0, sync=False)
places = paddle.static.cuda_places()
with static.program_guard(dist_main_prog, dist_startup_prog):
dataloader = DistributedDataLoaderFromGenerator(
......@@ -1334,21 +1340,6 @@ class Engine:
data_parallel_world_size=self._dp_world_sizes,
data_parallel_rank=self._dp_ranks)
self._prepare_reader()
# # move read op from the end of program to the start of program
# new_op_size = len(dist_main_block.ops)
# for _ in range(new_op_size - 1, op_size - 1, -1):
# op = dist_main_block.ops[new_op_size - 1]
# new_op_desc = dist_main_block.desc._prepend_op()
# new_op_desc.copy_from(op.desc)
# new_op = Operator(dist_main_block,
# new_op_desc,
# type=new_op_desc.type())
# dist_main_block.ops.insert(0, new_op)
# dist_op = DistributedOperator(new_op)
# dist_context.add_dist_op_for_program(dist_op)
# for _ in range(new_op_size - op_size):
# dist_main_block._remove_op(new_op_size, sync=False)
# dist_main_block._sync_with_cpp()
return dataloader
def _tune(self, tune_data, tune_sample_split=None, batch_size=1):
......@@ -1441,9 +1432,15 @@ class Engine:
for metric in self._metrics:
metric.reset()
def _metrics_name(self):
metrics_name = ['loss'] if self._loss else []
for m in self._metrics:
metrics_name.extend(to_list(m.name()))
return metrics_name
def _switch_mode(self, mode):
self.to_mode(mode)
self._initialize(mode)
self._optimizer = self._dist_contexts[mode]._serial_optimizer
def to_mode(self, mode):
assert mode in ["train", "eval", "predict"], \
......@@ -1504,20 +1501,19 @@ class Engine:
"""
if training:
assert 'train' in self._serial_main_progs, \
"training model is not ready, please call `engine._prepare_program('train')` first."
serial_program = self._serial_main_progs["train"]
dist_main_prog = self._dist_main_progs["train"][self._cur_rank]
dist_context = self._dist_contexts["train"]
assert self._mode in self._serial_main_progs
serial_program = self._serial_main_progs[self._mode]
dist_main_prog = self._dist_main_progs[self._mode][self._cur_rank]
dist_context = self._dist_contexts[self._mode]
self._saver.save(path,
serial_program=serial_program,
dist_main_program=dist_main_prog,
dist_context=dist_context)
else:
mode = "predict"
feed_vars = self._feed_vars[mode]['inputs']
fetch_vars = self._fetch_vars[mode]['outputs']
dist_main_prog = self._dist_main_progs[mode][self._cur_rank]
assert "predict" in self._dist_main_progs
feed_vars = self._feed_vars["predict"]['inputs']
fetch_vars = self._fetch_vars["predict"]['outputs']
dist_main_prog = self._dist_main_progs["predict"][self._cur_rank]
self._saver.save_inference_model(path,
feed_vars,
fetch_vars,
......@@ -1575,29 +1571,6 @@ class Engine:
path, load_optimizer)
return self._state_dict, self._dist_attr
@staticmethod
def _get_lr_scheduler(program):
lr_sheduler = None
if hasattr(program, 'lr_sheduler'):
from paddle.optimizer.lr import LRScheduler
lr_sheduler = program.lr_sheduler
assert isinstance(lr_sheduler, LRScheduler), "must be LRScheduler"
return lr_sheduler
def _get_lr(self, optimizer):
if isinstance(optimizer, paddle.optimizer.Optimizer):
return optimizer.get_lr()
elif isinstance(optimizer, paddle.fluid.optimizer.Optimizer):
if isinstance(optimizer._learning_rate, float):
return optimizer._learning_rate
else:
return optimizer._learning_rate()
else:
raise TypeError(
"'optimizer' must be object of class `paddle.optimizer.Optimizer`" \
" or `paddle.fluid.optimizer.Optimizer`, but got {}.".format(type(optimizer))
)
@property
def main_program(self):
return self._dist_main_progs[self._mode][self._cur_rank]
......
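For reference, `_prepare_logger` now returns a plain dict that the callbacks consume directly; an illustrative shape for a train step (values are made up):

logs = {
    "epoch": 0,
    "step": 20,                    # stored 1-based
    "lr": 0.001,
    "loss": 2.310492,              # present when mode != "predict"
    "acc": 0.101562,               # one key per accumulated metric result
    "fetches": {"my_fetch": ...},  # user fetches registered via auto.fetch
}
# In predict mode the loss/metric keys are replaced by
# logs["outputs"] = {"out0": ..., "out1": ...}.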
......@@ -214,8 +214,12 @@ def add_to_collection(collection_name, value, name=None):
if collection_name not in _g_collections:
_g_collections[collection_name] = []
if name is not None:
for _, v in _g_collections[collection_name]:
if v == value: return
_g_collections[collection_name].append((name, value))
else:
for _, v in _g_collections[collection_name]:
if v == value: return
_g_collections[collection_name].append((None, value))
......
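The guard added above makes re-registering the same value a no-op in both the named and unnamed branches. A quick sketch, assuming an otherwise empty collection and any program variable `loss_var`:

from paddle.distributed.auto_parallel.interface import (
    CollectionNames, add_to_collection, get_collection)

add_to_collection(CollectionNames.FETCHES, loss_var, name="loss")
add_to_collection(CollectionNames.FETCHES, loss_var, name="loss")  # skipped: already present
assert len(get_collection(CollectionNames.FETCHES)) == 1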
......@@ -23,10 +23,10 @@ import logging
import pickle
import time
import paddle
from paddle.fluid.backward import append_backward
from paddle.distributed.utils.log_utils import get_logger
import paddle.fluid.core as core
from paddle.fluid import program_guard
from paddle.fluid.backward import append_backward
from paddle.distributed.utils.log_utils import get_logger
from paddle.distributed.passes import new_pass, PassContext
from .dist_context import DistributedContext
from .dist_context import set_default_distributed_context
......@@ -39,7 +39,6 @@ from .process_group import _g_process_group_map, ProcessGroup
from .utils import make_data_unshard
from .utils import set_grad_var_shape
from .utils import SerialProgramInfo
from .utils import get_logger
from .reshard import Resharder
from .cluster import Cluster
from .mapper import mapping
......@@ -147,7 +146,7 @@ class AutoParallelizer:
with program_guard(main_program, startup_program):
optimize_ops = optimizer.apply_gradients(params_grads)
self._dist_context._lr_optimizer = optimizer
self._dist_context._serial_optimizer = optimizer
# update completion
self._completer = Completer(self._dist_context)
self._completer.complete_update_annotation(main_program)
......
......@@ -24,8 +24,8 @@ from paddle.distributed.passes import new_pass
from .reshard import Resharder
from .partitioner import Partitioner
from .utils import set_grad_var_shape
from .utils import get_logger
from .process_group import get_world_process_group
from ..utils.log_utils import get_logger
class Parallelizer:
......@@ -62,7 +62,7 @@ class Parallelizer:
serial_main_program, serial_startup_program, params_grads = self._apply_pre_optimization(
serial_main_program, serial_startup_program, serial_loss,
serial_optimizer, params_grads)
self._logger.info(
self._logger.debug(
"within parallel apply_pre_optimization time: {}, mode {}".
format(time.time() - time0, self._mode))
# Do logical partition
......@@ -70,14 +70,14 @@ class Parallelizer:
partitioner = Partitioner(self._dist_context, rank)
dist_main_prog, dist_startup_prog, dist_params_grads = partitioner.partition(
serial_main_program, serial_startup_program, params_grads)
self._logger.info(
self._logger.debug(
"within parallel partitioner time: {}, mode {}".format(
time.time() - time0, self._mode))
# Generate optimizer
time0 = time.time()
self._generate_optimizer(dist_main_prog, dist_startup_prog,
serial_optimizer, dist_params_grads)
self._logger.info(
self._logger.debug(
"within parallel optimizer time: {}, mode {}".format(
time.time() - time0, self._mode))
# Do reshard process
......@@ -86,14 +86,14 @@ class Parallelizer:
resharder = Resharder(dist_main_prog, dist_startup_prog, rank,
self._dist_context, dist_params_grads)
resharder.reshard()
self._logger.info(
self._logger.debug(
"within parallel reshard time: {}, mode {}".format(
time.time() - time0, self._mode))
# Apply post optimization passes
time0 = time.time()
self._apply_post_optimization(dist_main_prog, dist_startup_prog,
rank, dist_params_grads)
self._logger.info(
self._logger.debug(
"within parallel apply_post_optimization time: {}, mode {}".
format(time.time() - time0, self._mode))
else:
......@@ -102,7 +102,7 @@ class Parallelizer:
self._apply_pre_optimization(serial_main_program,
serial_startup_program, None, None,
None)
self._logger.info(
self._logger.debug(
"within parallel apply_pre_optimization time: {}, mode {}".
format(time.time() - time0, self._mode))
# Do logical partition
......@@ -111,14 +111,14 @@ class Parallelizer:
dist_main_prog, dist_startup_prog, dist_params_grads = partitioner.partition(
serial_main_program, serial_startup_program, [])
# Do reshard process
self._logger.info(
self._logger.debug(
"within parallel partitioner time: {}, mode {}".format(
time.time() - time0, self._mode))
time0 = time.time()
resharder = Resharder(dist_main_prog, dist_startup_prog, rank,
self._dist_context, [], 1)
resharder.reshard()
self._logger.info(
self._logger.debug(
"within parallel reshard time: {}, mode {}".format(
time.time() - time0, self._mode))
# Clone program for test
......@@ -143,7 +143,7 @@ class Parallelizer:
# NOTE: `apply_gradients` will add an Accumulator for a parameter only once,
# but the optimizer will be called repeatedly on re-launch, so the optimizer needs to be copied.
optimizer = copy.deepcopy(optimizer)
self._dist_context._lr_optimizer = optimizer
self._dist_context._serial_optimizer = optimizer
with program_guard(main_program, startup_program):
with unique_name.guard("opt_"):
optimizer_ops = optimizer.apply_gradients(params_grads)
......@@ -170,9 +170,7 @@ class Parallelizer:
startup_program = self._pass_context.get_attr("startup_program")
params_grads = self._pass_context.get_attr("params_grads")
# apply amp pass
# FIXME we disenable amp for eval since it has a little bug with
# eval program and which will be fixed in future
# apply amp pass on train/eval/predict
if self._strategy.amp.enable:
config = copy.deepcopy(self._strategy.amp.to_dict())
config["dist_context"] = self._dist_context
......
......@@ -1587,3 +1587,18 @@ def find_higher_order_backward_op(program):
return True
return False
def get_lr(optimizer):
if isinstance(optimizer, paddle.optimizer.Optimizer):
return optimizer.get_lr()
elif isinstance(optimizer, paddle.fluid.optimizer.Optimizer):
if isinstance(optimizer._learning_rate, float):
return optimizer._learning_rate
else:
return optimizer._learning_rate()
else:
raise TypeError(
"'optimizer' must be object of class `paddle.optimizer.Optimizer`" \
" or `paddle.fluid.optimizer.Optimizer`, but got {}.".format(type(optimizer))
)
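`get_lr` replaces the removed `Engine._get_lr` helper and handles both optimizer families; a minimal sketch:

import paddle
from paddle.distributed.auto_parallel.utils import get_lr

linear = paddle.nn.Linear(2, 2)
opt = paddle.optimizer.SGD(learning_rate=0.1, parameters=linear.parameters())
print(get_lr(opt))  # 0.1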
......@@ -212,7 +212,7 @@ class ClipGradByGloblNormPass(PassBase):
if self.get_attr("dist_context") is None:
return False
dist_context = self.get_attr("dist_context")
if dist_context._lr_optimizer._grad_clip is None:
if dist_context._serial_optimizer._grad_clip is None:
return False
if self.get_attr("params_grads") is None:
return False
......
......@@ -60,6 +60,9 @@ if(WITH_DISTRIBUTE AND WITH_GPU)
py_test_modules(test_pass_amp MODULES test_pass_amp ENVS ${dist_ENVS})
set_tests_properties(test_pass_amp PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE"
TIMEOUT 50)
py_test_modules(test_engine_callbacks MODULES test_engine_callbacks)
set_tests_properties(test_engine_callbacks
PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE" TIMEOUT 50)
py_test_modules(test_while_op_completion MODULES test_while_op_completion
ENVS ${dist_ENVS})
......@@ -100,5 +103,4 @@ if(WITH_DISTRIBUTE AND WITH_GPU)
py_test_modules(test_dist_assign MODULES test_dist_assign)
py_test_modules(test_conditional_block_reshard MODULES
test_conditional_block_reshard)
endif()
......@@ -87,27 +87,27 @@ class TestAMPPass(unittest.TestCase):
def test_amp_pass(self):
# mp2 training
mp_engine = self.get_engine()
outs = mp_engine.fit(self.dataset, 3, batch_size=self.batch_size)
mp_losses = np.array(outs["loss"])
history = mp_engine.fit(self.dataset, 3, batch_size=self.batch_size)
mp_losses = np.array(history.history["loss"])
# mp2 amp-o1 training
amp_o1_engine = self.get_engine(True, "o1")
outs = amp_o1_engine.fit(self.dataset, 3, batch_size=self.batch_size)
amp_o1_losses = np.array(outs["loss"])
history = amp_o1_engine.fit(self.dataset, 3, batch_size=self.batch_size)
amp_o1_losses = np.array(history.history["loss"])
amp_o1_engine.evaluate(self.dataset, 3, batch_size=self.batch_size)
# self.check_results(mp_losses, amp_o1_losses)
# mp2 amp-o2 training
amp_o2_engine = self.get_engine(True, "o2")
outs = amp_o2_engine.fit(self.dataset, 3, batch_size=self.batch_size)
amp_o2_losses = np.array(outs["loss"])
history = amp_o2_engine.fit(self.dataset, 3, batch_size=self.batch_size)
amp_o2_losses = np.array(history.history["loss"])
amp_o2_engine.evaluate(self.dataset, 3, batch_size=self.batch_size)
# self.check_results(mp_losses, amp_o2_losses)
# mp2 amp-o3 training
amp_o3_engine = self.get_engine(True, "o3")
outs = amp_o3_engine.fit(self.dataset, 3, batch_size=self.batch_size)
amp_o3_losses = np.array(outs["loss"])
history = amp_o3_engine.fit(self.dataset, 3, batch_size=self.batch_size)
amp_o3_losses = np.array(history.history["loss"])
amp_o3_engine.evaluate(self.dataset, 3, batch_size=self.batch_size)
# self.check_results(mp_losses, amp_o3_losses)
......
......@@ -25,6 +25,7 @@ from paddle.io import Dataset
from paddle.distributed.fleet import auto
paddle.enable_static()
global_process_mesh = auto.ProcessMesh(mesh=[0, 1])
PP_MESH_0 = auto.ProcessMesh([0])
PP_MESH_1 = auto.ProcessMesh([1])
......@@ -113,7 +114,7 @@ class MLPLayer(nn.Layer):
if is_feed:
my_feed_vars.append((out, out.shape))
if is_fetch:
auto.fetch(out, "my_out", logging=True)
auto.fetch(out, "my_fetch", logging=True)
return out
......@@ -140,10 +141,11 @@ def train_high_level(fetch):
# train
train_dataset = MyDataset(batch_num * batch_size)
eval_dataset1 = MyDataset(5 * batch_size)
engine.fit(train_data=train_dataset,
epochs=2,
batch_size=batch_size,
valid_data=eval_dataset1)
history = engine.fit(train_data=train_dataset,
epochs=2,
batch_size=batch_size,
valid_data=eval_dataset1,
log_freq=1)
# eval
eval_dataset2 = MyDataset(batch_size)
......@@ -151,7 +153,7 @@ def train_high_level(fetch):
# predict
test_dataset = MyDataset(batch_size)
engine.predict(test_dataset, batch_size=batch_size)
outputs = engine.predict(test_dataset, batch_size=batch_size)
# save
temp_dir = tempfile.TemporaryDirectory()
......
......@@ -83,25 +83,32 @@ class TestGradientMergePass(unittest.TestCase):
def test_gradient_merge_pass(self):
# dp2 training
dp_engine = self.get_engine()
outs = dp_engine.fit(self.dataset, 3, batch_size=self.batch_size)
dp_losses = np.array(outs["loss"])
history = dp_engine.fit(self.dataset,
3,
batch_size=self.batch_size,
log_freq=1)
dp_losses = np.array(history.history["loss"])
# dp2 gradient merge training
gm_engine = self.get_engine(True)
outs = gm_engine.fit(self.dataset, 3, batch_size=self.batch_size)
gm_losses = np.array(outs["loss"])
avg_loss = 0
pass_avg_ret_list = []
for i, pass_ret in enumerate(gm_losses):
if (i + 1) % 4 == 0:
avg_loss += pass_ret
pass_avg_ret_list.append(avg_loss / 4)
avg_loss = 0
else:
avg_loss += pass_ret
# self.check_results(dp_losses, np.array(pass_avg_ret_list))
history = gm_engine.fit(self.dataset,
3,
batch_size=self.batch_size,
log_freq=1)
gm_losses = np.array(history.history["loss"])
# avg_loss = 0
# pass_avg_ret_list = []
# for i, pass_ret in enumerate(gm_losses):
# if (i + 1) % 4 == 0:
# avg_loss += pass_ret
# pass_avg_ret_list.append(avg_loss / 4)
# avg_loss = 0
# else:
# avg_loss += pass_ret
# NOTE: every sample data from dataset is all the same
self.check_results(dp_losses, gm_losses)
if __name__ == "__main__":
......
......@@ -78,13 +78,13 @@ class TestRecomputePass(unittest.TestCase):
def test_recompute_pass(self):
# mp2 training
mp_engine = self.get_engine()
outs = mp_engine.fit(self.dataset, 3, batch_size=self.batch_size)
mp_losses = np.array(outs["loss"])
history = mp_engine.fit(self.dataset, 3, batch_size=self.batch_size)
mp_losses = np.array(history.history["loss"])
# mp2 recompute training
rc_engine = self.get_engine(True)
outs = rc_engine.fit(self.dataset, 3, batch_size=self.batch_size)
rc_losses = np.array(outs["loss"])
history = rc_engine.fit(self.dataset, 3, batch_size=self.batch_size)
rc_losses = np.array(history.history["loss"])
self.check_results(mp_losses, rc_losses)
......
......@@ -83,25 +83,31 @@ class TestShardingPass(unittest.TestCase):
def test_sharding_pass(self):
# dp2 training
dp_engine = self.get_engine()
dp_losses = dp_engine.fit(self.dataset, 3, batch_size=self.batch_size)
dp_losses = np.array(dp_losses["loss"])
history = dp_engine.fit(self.dataset, 3, batch_size=self.batch_size)
dp_losses = np.array(history.history["loss"])
# sharding2 stage1 training
sharding1_engine = self.get_engine(True, 1)
outs = sharding1_engine.fit(self.dataset, 3, batch_size=self.batch_size)
sharding1_losses = np.array(outs["loss"])
history = sharding1_engine.fit(self.dataset,
3,
batch_size=self.batch_size)
sharding1_losses = np.array(history.history["loss"])
self.check_results(dp_losses, sharding1_losses)
# sharding2 stage2 training
sharding2_engine = self.get_engine(True, 2)
outs = sharding2_engine.fit(self.dataset, 3, batch_size=self.batch_size)
sharding2_losses = np.array(outs["loss"])
history = sharding2_engine.fit(self.dataset,
3,
batch_size=self.batch_size)
sharding2_losses = np.array(history.history["loss"])
self.check_results(dp_losses, sharding2_losses)
# sharding2 stage3 training
sharding3_engine = self.get_engine(True, 3)
outs = sharding3_engine.fit(self.dataset, 3, batch_size=self.batch_size)
sharding3_losses = np.array(outs["loss"])
history = sharding3_engine.fit(self.dataset,
3,
batch_size=self.batch_size)
sharding3_losses = np.array(history.history["loss"])
self.check_results(dp_losses, sharding3_losses)
......
......@@ -195,7 +195,7 @@ class TestDistributedContext(unittest.TestCase):
"_serial_ordered_nodes", "_serial_ordered_tensor_nodes", \
"_serial_ordered_op_nodes", "_original_serial_loss", \
"_original_serial_feed_vars", "_original_serial_fetch_vars", \
"_serial_loss", "_serial_feed_vars", "_serial_fetch_vars", "_lr_optimizer", \
"_serial_loss", "_serial_feed_vars", "_serial_fetch_vars", "_serial_optimizer", \
"_backup_serial_main_program_stack", "_backup_serial_startup_program_stack", \
"_pass_context"]
......
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import tempfile
import shutil
import time
import random
import paddle
import paddle.vision.transforms as T
from paddle.static import InputSpec
from paddle.distributed.fleet import auto
from paddle.distributed.auto_parallel.callbacks import config_callbacks
from paddle.vision.models import LeNet
from paddle.vision.datasets import MNIST
paddle.enable_static()
class TestCallbacks(unittest.TestCase):
def setUp(self):
self.save_dir = tempfile.mkdtemp()
def tearDown(self):
shutil.rmtree(self.save_dir)
def run_callback(self):
epochs = 2
steps = 5
freq = 2
eval_steps = 2
inputs_spec = [InputSpec([None, 1, 28, 28], 'float32', 'image')]
strategy = auto.Strategy()
strategy.auto_mode = "semi"
engine = auto.Engine(LeNet(), strategy=strategy)
engine.prepare(inputs_spec, mode="predict")
cbks = config_callbacks(engine=engine,
batch_size=128,
epochs=epochs,
steps=steps,
log_freq=freq,
verbose=self.verbose,
metrics=['loss', 'acc'],
save_dir=self.save_dir)
cbks.on_begin('train')
logs = {'loss': 50.341673, 'acc': 0.00256}
for epoch in range(epochs):
cbks.on_epoch_begin(epoch)
for step in range(steps):
cbks.on_batch_begin('train', step, logs)
logs['loss'] -= random.random() * 0.1
logs['acc'] += random.random() * 0.1
time.sleep(0.005)
cbks.on_batch_end('train', step, logs)
cbks.on_epoch_end(epoch, logs)
eval_logs = {'eval_loss': 20.341673, 'eval_acc': 0.256}
params = {
'steps': eval_steps,
'metrics': ['eval_loss', 'eval_acc'],
}
cbks.on_begin('eval', params)
for step in range(eval_steps):
cbks.on_batch_begin('eval', step, eval_logs)
eval_logs['eval_loss'] -= random.random() * 0.1
eval_logs['eval_acc'] += random.random() * 0.1
eval_logs['batch_size'] = 2
time.sleep(0.005)
cbks.on_batch_end('eval', step, eval_logs)
cbks.on_end('eval', eval_logs)
test_logs = {}
params = {'steps': eval_steps}
cbks.on_begin('predict', params)
for step in range(eval_steps):
cbks.on_batch_begin('predict', step, test_logs)
test_logs['batch_size'] = 2
time.sleep(0.005)
cbks.on_batch_end('predict', step, test_logs)
cbks.on_end('predict', test_logs)
cbks.on_end('train')
print(engine.history.history)
def test_callback_verbose_0(self):
self.verbose = 0
self.run_callback()
def test_callback_verbose_1(self):
self.verbose = 1
self.run_callback()
def test_callback_verbose_2(self):
self.verbose = 2
self.run_callback()
def test_callback_verbose_3(self):
self.verbose = 3
self.run_callback()
class TestCallbacksEngine(unittest.TestCase):
def setUp(self):
self.save_dir = tempfile.mkdtemp()
transform = T.Compose([T.Transpose(), T.Normalize([127.5], [127.5])])
self.train_dataset = MNIST(mode='train', transform=transform)
self.test_dataset = MNIST(mode='test', transform=transform)
self.prepare_engine()
def tearDown(self):
shutil.rmtree(self.save_dir)
def prepare_engine(self):
model = paddle.vision.models.LeNet()
loss = paddle.nn.CrossEntropyLoss()
base_lr = 1e-3
boundaries = [5, 8]
values = [base_lr * (0.1**i) for i in range(len(boundaries) + 1)]
lr = paddle.optimizer.lr.PiecewiseDecay(boundaries=boundaries,
values=values,
verbose=False)
optimizer = paddle.optimizer.Adam(learning_rate=lr,
parameters=model.parameters())
auto.fetch(model.parameters()[0], "param0", logging=True)
metrics = paddle.metric.Accuracy(topk=(1, 2))
self.engine = auto.Engine(model, loss, optimizer, metrics)
def test_fit_eval(self):
history = self.engine.fit(train_data=self.train_dataset,
valid_data=self.test_dataset,
batch_size=128,
steps_per_epoch=60,
valid_steps=40,
log_freq=20,
save_dir=self.save_dir,
save_freq=1)
print(history.history)
def test_eval(self):
self.engine.evaluate(valid_data=self.test_dataset,
batch_size=128,
steps=40,
log_freq=10)
def test_predict(self):
logger_cbks = paddle.callbacks.ProgBarLogger()
self.engine.predict(test_data=self.test_dataset,
batch_size=128,
callbacks=[logger_cbks])
if __name__ == '__main__':
unittest.main()
......@@ -68,7 +68,7 @@ class TestLRScheduler(TestEngineBase):
def test_lr_scheduler(self):
self.init_engine()
self.engine.fit(self.dataset, batch_size=self.batch_size)
lr = self.engine._lr_optimizer._learning_rate
lr = self.engine._optimizer._learning_rate
assert isinstance(lr, paddle.optimizer.lr.LRScheduler)
......
......@@ -20,7 +20,7 @@ import warnings
import numpy as np
import paddle
from paddle.distributed import ParallelEnv
from paddle.fluid.dygraph.parallel import ParallelEnv
from paddle.utils import try_import
from .progressbar import ProgressBar
......
......@@ -46,6 +46,7 @@ from paddle.static import InputSpec as Input
import paddle.distributed as dist
import paddle.distributed.fleet as fleet
from paddle.distributed.fleet.base import role_maker
from paddle.autograd import no_grad
from .callbacks import config_callbacks, EarlyStopping
from .model_summary import summary
......@@ -1099,7 +1100,7 @@ class Model(object):
self._update_inputs()
return loss
@paddle.no_grad()
@no_grad()
def eval_batch(self, inputs, labels=None):
"""
Run one evaluating step on a batch of data.
......@@ -1151,7 +1152,7 @@ class Model(object):
self._update_inputs()
return loss
@paddle.no_grad()
@no_grad()
def predict_batch(self, inputs):
"""
Run one predicting step on a batch of data.
......
......@@ -19,7 +19,7 @@ import numbers
import paddle
import paddle.nn as nn
from paddle.static import InputSpec
from paddle.autograd import no_grad
from collections import OrderedDict
__all__ = []
......@@ -229,7 +229,7 @@ def summary(net, input_size=None, dtypes=None, input=None):
return params_info
@paddle.no_grad()
@no_grad()
def summary_string(model, input_size=None, dtypes=None, input=None):
def _all_is_numper(items):
......