diff --git a/python/paddle/distributed/auto_parallel/callbacks.py b/python/paddle/distributed/auto_parallel/callbacks.py new file mode 100644 index 0000000000000000000000000000000000000000..17ce5bd71b8168608f19af4a0f1b9860856c6091 --- /dev/null +++ b/python/paddle/distributed/auto_parallel/callbacks.py @@ -0,0 +1,226 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import time + +import paddle +from paddle.hapi.callbacks import ProgBarLogger, ModelCheckpoint, LRScheduler, CallbackList, Callback +from .interface import CollectionNames, get_collection + + +def config_callbacks(callbacks=None, + engine=None, + batch_size=None, + epochs=None, + steps=None, + log_freq=2, + verbose=2, + save_freq=1, + save_dir=None, + metrics=None, + acc_step=1, + mode='train'): + cbks = callbacks or [] + cbks = cbks if isinstance(cbks, (list, tuple)) else [cbks] + + if not any(isinstance(k, ProgBarLogger) for k in cbks) and verbose: + cbks = [ProgBarLoggerAuto(log_freq, verbose=verbose)] + cbks + + if not any(isinstance(k, LRScheduler) for k in cbks): + cbks = [LRSchedulerAuto()] + cbks + + if not any(isinstance(k, ModelCheckpoint) for k in cbks): + cbks = cbks + [ModelCheckpointAuto(save_freq, save_dir)] + + if not any(isinstance(k, Profiler) for k in cbks) and verbose == 3: + cbks = cbks + [Profiler(timer_only=True)] + + if not any(isinstance(k, History) for k in cbks): + cbks = cbks + [History()] + + for i, k in enumerate(cbks): + if isinstance(k, ProgBarLogger): + cbks[i] = ProgBarLoggerAuto(k.log_freq, k.verbose) + if isinstance(k, LRScheduler): + cbks[i] = LRSchedulerAuto(k.by_step, k.by_epoch) + if isinstance(k, ModelCheckpoint): + cbks[i] = ModelCheckpointAuto(k.save_freq, k.save_dir) + + cbk_list = CallbackList(cbks) + cbk_list.set_model(engine) + metrics = metrics or [] if mode != 'test' else [] + params = { + 'batch_size': batch_size, + 'epochs': epochs, + 'steps': steps, + 'verbose': verbose, + 'metrics': metrics, + 'acc_step': acc_step, + } + cbk_list.set_params(params) + return cbk_list + + +class ProgBarLoggerAuto(ProgBarLogger): + + def __init__(self, log_freq=1, verbose=2): + super(ProgBarLoggerAuto, self).__init__(log_freq, verbose) + + def _is_print(self): + return True + + def _updates(self, logs, mode): + values = [] + metrics = getattr(self, '%s_metrics' % (mode)) + progbar = getattr(self, '%s_progbar' % (mode)) + steps = getattr(self, '%s_step' % (mode)) + + for k in metrics: + if k in logs: + values.append((k, logs[k])) + + if 'lr' in logs: + values.append(('lr', logs['lr'])) + + fetches_logs = logs.get('fetches', {}) + collect_logging = get_collection(CollectionNames.LOGGING) + for name, var in collect_logging: + k = name or var.name + if k in fetches_logs: + values.append((k, fetches_logs[k])) + + out_logs = logs.get('outputs', {}) + for k in out_logs: + values.append((k, out_logs[k])) + + if self.verbose == 3 and hasattr(self, '_%s_timer' % (mode)): + timer = getattr(self, '_%s_timer' % (mode)) + cnt = 
timer['count'] if timer['count'] > 0 else 1.0 + samples = timer['samples'] if timer['samples'] > 0 else 1.0 + values.append( + ('avg_reader_cost', "%.5f sec" % (timer['data_time'] / cnt))) + values.append( + ('avg_batch_cost', "%.5f sec" % (timer['batch_time'] / cnt))) + values.append( + ('ips', "%.5f samples/sec" % + (samples / (timer['data_time'] + timer['batch_time'])))) + timer['count'] = 0 + timer['samples'] = 0 + timer['data_time'] = 0. + timer['batch_time'] = 0. + + progbar.update(steps, values) + + def on_eval_batch_end(self, step, logs=None): + logs = logs or {} + self.eval_step += 1 + samples = self.params['batch_size'] + self.evaled_samples += samples + + self._eval_timer['batch_time'] += ( + time.time() - self._eval_timer['batch_data_end_time']) + self._eval_timer['count'] += 1 + samples = self.params['batch_size'] + self._eval_timer['samples'] += samples + + if self._is_print() and self.eval_step % self.log_freq == 0: + if self.eval_steps is None or self.eval_step < self.eval_steps: + self._updates(logs, 'eval') + + self._eval_timer['batch_start_time'] = time.time() + + +class LRSchedulerAuto(LRScheduler): + + def __init__(self, by_step=True, by_epoch=False): + super(LRSchedulerAuto, self).__init__(by_step, by_epoch) + + def on_epoch_begin(self, epoch=None, logs=None): + self.acc_step = self.params["acc_step"] + self.epoch = epoch + self.train_step = 0 + + def on_train_batch_end(self, step, logs=None): + self.train_step += 1 + + if self.by_step and self.train_step % self.acc_step == 0: + if self.model._optimizer and \ + hasattr(self.model._optimizer, '_learning_rate') and \ + isinstance(self.model._optimizer._learning_rate, + paddle.optimizer.lr.LRScheduler): + self.model._optimizer._learning_rate.step() + + +class History(Callback): + + def __init__(self): + self.history = {} + + def on_train_begin(self, logs=None): + self.epoch = [] + + def on_epoch_end(self, epoch, logs=None): + logs = logs or {} + self.epoch.append(epoch) + for k, v in logs.items(): + self.history.setdefault(k, []).append(v) + + self.model.history = self + + +class Profiler(Callback): + + def __init__(self, *args, **kwargs): + self.prof = paddle.profiler.Profiler(*args, **kwargs) + + def on_epoch_begin(self, epoch=None, logs=None): + self.epoch = epoch + self.train_step = 0 + self.batch_size = self.params["batch_size"] + self.steps = self.params['steps'] + + def on_train_begin(self, logs=None): + self.prof.start() + + def on_train_batch_end(self, step, logs=None): + self.train_step += 1 + self.prof.step(num_samples=self.batch_size) + print("step {}:{}".format(self.train_step, + self.prof.step_info(unit='samples'))) + + def on_train_end(self, logs=None): + self.prof.stop() + self.prof.summary() + + +class ModelCheckpointAuto(ModelCheckpoint): + + def __init__(self, *args, **kwargs): + super(ModelCheckpointAuto, self).__init__(*args, **kwargs) + + def _is_save(self): + return self.model and self.save_dir + + def on_epoch_end(self, epoch, logs=None): + if self._is_save() and (self.epoch + 1) % self.save_freq == 0: + path = '{}/epoch{}'.format(self.save_dir, epoch) + print('save checkpoint at {}'.format(os.path.abspath(path))) + self.model.save(path) + + def on_train_end(self, logs=None): + if self._is_save(): + path = '{}/final'.format(self.save_dir) + print('save checkpoint at {}'.format(os.path.abspath(path))) + self.model.save(path) diff --git a/python/paddle/distributed/auto_parallel/dist_context.py b/python/paddle/distributed/auto_parallel/dist_context.py index 
13da2a80f7b47ed709d3ad82ec20fc0d40eb8284..008c4b987074e5a8e5342495b7aa0b38cbeb8c76 100644 --- a/python/paddle/distributed/auto_parallel/dist_context.py +++ b/python/paddle/distributed/auto_parallel/dist_context.py @@ -74,7 +74,6 @@ class DistributedContext: self._serial_optimizer = None self._serial_feed_vars = {} self._serial_fetch_vars = {} - self._lr_optimizer = None # record the optimzier holding lr_scheduler # Data members related to the program self._dist_tensors_for_program = {} @@ -870,7 +869,7 @@ class DistributedContext: "_serial_ordered_nodes", "_serial_ordered_tensor_nodes", \ "_serial_ordered_op_nodes", "_original_serial_loss", \ "_original_serial_feed_vars", "_original_serial_fetch_vars", \ - "_serial_loss", "_serial_feed_vars", "_serial_fetch_vars", "_lr_optimizer", \ + "_serial_loss", "_serial_feed_vars", "_serial_fetch_vars", "_serial_optimizer", \ "_backup_serial_main_program_stack", "_backup_serial_startup_program_stack", \ "_pass_context"]: setattr(result, k, v) diff --git a/python/paddle/distributed/auto_parallel/engine.py b/python/paddle/distributed/auto_parallel/engine.py index 7c550ab57852c7ffb488d8a7c84a1846cc36b62c..6ba4a7ad1a3ed073a0a9ae734edadb11264ef6ba 100644 --- a/python/paddle/distributed/auto_parallel/engine.py +++ b/python/paddle/distributed/auto_parallel/engine.py @@ -21,7 +21,7 @@ from collections import defaultdict import paddle import paddle.utils as utils -from paddle import fluid, profiler, static +from paddle import fluid, static from paddle.metric import Metric from paddle.static import InputSpec from paddle.fluid import core @@ -33,6 +33,7 @@ from paddle.fluid.framework import _current_expected_place as _get_device from paddle.fluid.dygraph.parallel import ParallelEnv from paddle.distributed import fleet +from .callbacks import config_callbacks from .converter import Converter from .helper import ProgramHelper from .cluster import Cluster, get_default_cluster @@ -41,11 +42,12 @@ from .parallelizer_v2 import Parallelizer from .dist_op import DistributedOperator from .dist_saver import DistributedSaver from .dist_loader import DistributedDataLoaderFromGenerator, DistributedDataLoader -from .utils import to_list, get_logger, get_dist_attr +from .utils import to_list, get_dist_attr, get_lr from .process_group import new_process_group, get_all_process_groups from .dist_context import DistributedContext, get_default_distributed_context from .strategy import Strategy from .interface import CollectionNames, get_collection +from ..utils.log_utils import get_logger class Engine: @@ -200,6 +202,8 @@ class Engine: self._dygraph_mode = False self._tuning = self._strategy.tuning + self.history = None + def _prepare_data_spec(self, data, split, batch_size): inputs_spec = [] labels_spec = [] @@ -402,24 +406,23 @@ class Engine: lr=None, fetch_names=None, fetch_indices=None, - profiler_log="", mode=None): - logs = "[{}] ".format(mode) + logs = {} if epoch is not None: - logs += "epoch: {:d} ".format(epoch) + logs["epoch"] = epoch if step is not None: - logs += "step: {:d} ".format(step) + logs["step"] = step + 1 if lr is not None: - logs += "lr: {:5e} ".format(lr) + logs["lr"] = lr group_idx = 0 - # logging loss if mode != "predict": + # logging loss loss_indices = fetch_indices[group_idx] + assert len(loss_indices) <= 1 for idx in loss_indices: - logs += "loss: {:8f} ".format(outs[idx][0]) + logs["loss"] = outs[idx][0] group_idx += 1 - # logging metrics - if mode != "predict": + # logging metrics metric_vars = self._fetch_vars[mode]["metrics"] if metric_vars: for 
metric in self._metrics: @@ -431,61 +434,25 @@ class Engine: metric.update(*metric_out) results = metric.accumulate() for i, res in enumerate(to_list(results)): - logs += "{}: {:8f} ".format(metric.name()[i], res) + logs[metric.name()[i]] = res group_idx += 1 - # Skip logging outputs - if mode == "predict": + # logging outputs + elif mode == "predict": + outputs_indices = fetch_indices[group_idx] + logs_out = {} + for idx in outputs_indices: + logs_out["out%d" % (idx)] = outs[idx] + logs["outputs"] = logs_out group_idx += 1 # logging user fetches - fetches_logging = get_collection(CollectionNames.LOGGING) - for name, var in fetches_logging: + collect_fetches = get_collection(CollectionNames.FETCHES) + logs_fetch = {} + for name, var in collect_fetches: if var.name in fetch_names: idx = fetch_names.index(var.name) - # Use the user defined name for logging - logs += "{}: {} ".format(name, outs[idx]) - logs += profiler_log - self._logger.info(logs) - - def _prepare_history(self, outs, fetch_indices=None, mode=None): - history = {} - group_idx = 0 - # store loss - if mode != "predict": - loss_indices = fetch_indices[group_idx] - loss_values = [] - for idx in loss_indices: - loss_values.append(outs[idx][0]) - history["loss"] = loss_values - group_idx += 1 - # store metrics - if mode != "predict": - metric_vars = self._fetch_vars[mode]["metrics"] - if metric_vars: - for metric in self._metrics: - metrics_indices = fetch_indices[group_idx] - metric_out = [] - for idx in metrics_indices: - metric_out.append(outs[idx]) - if metric_out: - metric.update(*metric_out) - results = metric.accumulate() - history[tuple(metric.name())] = to_list(results) - group_idx += 1 - # store outputs - if mode == "predict": - outputs_indices = fetch_indices[group_idx] - outputs_values = [] - for idx in outputs_indices: - outputs_values.append(outs[idx]) - history["outputs"] = outputs_values - group_idx += 1 - # store user fetches - fetches_indices = fetch_indices[group_idx] - fetches_values = [] - for idx in fetches_indices: - fetches_values.append(outs[idx]) - history["fetches"] = fetches_values - return history + logs_fetch[name or var.name] = outs[idx] + logs["fetches"] = logs_fetch + return logs def _prepare_program(self, mode): # Do the build process @@ -674,7 +641,7 @@ class Engine: mode].dist_startup_programs self._feed_vars[mode] = self._dist_contexts[mode].serial_feed_vars self._fetch_vars[mode] = self._dist_contexts[mode].serial_fetch_vars - self._lr_optimizer = self._dist_contexts[mode]._lr_optimizer + self._optimizer = self._dist_contexts[mode]._serial_optimizer if self._nranks > 1: # Traverse different rank programs and traverse each op of them, @@ -719,7 +686,7 @@ class Engine: self._dist_attr) if self._strategy.reinit: - self._logger.info("NOTE: parameters wiil be re-initialized.") + self._logger.info("NOTE: parameters will be re-initialized.") dist_startup_prog = self._dist_startup_progs[mode][self._cur_rank] self._executor.run(dist_startup_prog) @@ -729,12 +696,16 @@ class Engine: batch_size=1, epochs=1, steps_per_epoch=None, + log_freq=10, + save_dir=None, + save_freq=1, valid_data=None, valid_sample_split=None, valid_freq=1, valid_steps=None, collate_fn=None, - callbacks=None): + callbacks=None, + verbose=2): """ Trains the model for a fixed number of epochs. If `valid_data` is set, evaluation will be done at the end of each epoch. 
@@ -810,59 +781,81 @@ class Engine: self._prepare_program(self._mode) else: self._switch_mode(self._mode) + + assert self._mode in self._dist_main_progs, \ + "train model is not ready, please call `engine._prepare_program('train')` first." + train_dataloader = self._prepare_dataloader_from_generator( dataset=train_data, capacity=70, - # use_double_buffer=use_double_buffer, iterable=False, - # return_list=return_list, - # use_multiprocess=use_multiprocess, - # drop_last=drop_last, batch_size=batch_size, epochs=epochs, steps_per_epoch=steps_per_epoch, collate_fn=collate_fn) + fetch_names, fetch_indices = self._prepare_fetch(None, mode=self._mode) - lr_scheduler = self._get_lr_scheduler(self.main_program) - - with profiler.Profiler(timer_only=True) as prof: - for epoch in range(epochs): - for step, _ in enumerate(train_dataloader): - try: - outs = self._executor.run( - self.main_program, - fetch_list=fetch_names, - use_program_cache=self._strategy.use_cache, - return_numpy=self._strategy.return_numpy) - except core.EOFException: - break - if lr_scheduler and step % self._k_steps == 0: - lr_scheduler.step() - lr = self._get_lr(self._lr_optimizer) - - prof.step() - - self._prepare_logger(outs, epoch, step, lr, - fetch_names, fetch_indices, - prof.step_info(), self._mode) - history = self._prepare_history(outs, fetch_indices, - self._mode) - - if valid_data and epoch % valid_freq == 0: - self.evaluate(valid_data, valid_sample_split, batch_size, - valid_steps, collate_fn, callbacks) - self._switch_mode("train") - else: - self._reset_metrics() - return history + + cbks = config_callbacks( + callbacks, + engine=self, + batch_size=batch_size, + epochs=epochs, + steps=train_dataloader._steps, + log_freq=log_freq, + save_freq=save_freq, + save_dir=save_dir, + verbose=verbose, + metrics=self._metrics_name(), + acc_step=self._k_steps, + ) + + cbks.on_begin('train') + for epoch in range(epochs): + logs = {} + cbks.on_epoch_begin(epoch) + for step, _ in enumerate(train_dataloader): + cbks.on_batch_begin('train', step, logs) + try: + outs = self._executor.run( + self.main_program, + fetch_list=fetch_names, + use_program_cache=self._strategy.use_cache, + return_numpy=self._strategy.return_numpy) + except core.EOFException: + break + lr = get_lr(self._optimizer) + logs = self._prepare_logger(outs, epoch, step, lr, fetch_names, + fetch_indices, self._mode) + cbks.on_batch_end('train', step, logs) + + if valid_data and (epoch + 1) % valid_freq == 0: + val_logs = self.evaluate(valid_data, valid_sample_split, + batch_size, valid_steps, log_freq, + collate_fn, callbacks, verbose) + val_logs = { + "val_" + name: val + for name, val in val_logs.items() + } + logs.update(val_logs) + self._switch_mode("train") + else: + self._reset_metrics() + + cbks.on_epoch_end(epoch, logs) + + cbks.on_end('train', logs) + return self.history def evaluate(self, valid_data, valid_sample_split=None, batch_size=1, steps=None, + log_freq=10, collate_fn=None, - callbacks=None): + callbacks=None, + verbose=2): """ Evaluate the loss and metrics of the model on evaluation data. @@ -918,25 +911,36 @@ class Engine: self._prepare_program(self._mode) else: self._switch_mode(self._mode) + assert self._mode in self._dist_main_progs, \ "eval model is not ready, please call `engine._prepare_program('eval')` first." 
valid_dataloader = self._prepare_dataloader_from_generator( dataset=valid_data, - # feed_list=feed_list, capacity=70, - # use_double_buffer=use_double_buffer, iterable=False, - # return_list=return_list, - # use_multiprocess=use_multiprocess, - # drop_last=drop_last, - # places=places, batch_size=batch_size, - # epochs=epochs, steps_per_epoch=steps, collate_fn=collate_fn) + fetch_names, fetch_indices = self._prepare_fetch(None, mode=self._mode) + cbks = config_callbacks( + callbacks, + engine=self, + batch_size=batch_size, + log_freq=log_freq, + verbose=verbose, + metrics=self._metrics_name(), + ) + + eval_steps = valid_dataloader._steps + cbks.on_begin('eval', { + 'steps': eval_steps, + 'metrics': self._metrics_name() + }) + logs = {} for step, _ in enumerate(valid_dataloader): + cbks.on_batch_begin('eval', step, logs) try: outs = self._executor.run( self.main_program, @@ -945,11 +949,12 @@ class Engine: return_numpy=self._strategy.return_numpy) except core.EOFException: break - self._prepare_logger(outs, None, step, None, fetch_names, - fetch_indices, "", self._mode) - history = self._prepare_history(outs, fetch_indices, self._mode) + logs = self._prepare_logger(outs, None, step, None, fetch_names, + fetch_indices, self._mode) + cbks.on_batch_end('eval', step, logs) + cbks.on_end('eval', logs) self._reset_metrics() - return history + return logs def predict(self, test_data, @@ -957,7 +962,8 @@ class Engine: batch_size=1, steps=None, collate_fn=None, - callbacks=None): + callbacks=None, + verbose=2): """ Compute the output predictions on testing data. @@ -1010,8 +1016,10 @@ class Engine: self._prepare_program(self._mode) else: self._switch_mode(self._mode) + assert self._mode in self._dist_main_progs, \ "predict model is not ready, please call `engine._prepare_program('predict')` first." 
+ test_dataloader = self._prepare_dataloader_from_generator( dataset=test_data, # feed_list=feed_list, @@ -1026,9 +1034,16 @@ class Engine: # epochs=epochs, steps_per_epoch=steps, collate_fn=collate_fn) + fetch_names, fetch_indices = self._prepare_fetch(None, mode=self._mode) + outputs = [] + cbks = config_callbacks(callbacks, engine=self, verbose=verbose) + test_steps = test_dataloader._steps + cbks.on_begin('predict', {'steps': test_steps}) + logs = {} for step, _ in enumerate(test_dataloader): + cbks.on_batch_begin('predict', step, logs) try: outs = self._executor.run( self.main_program, @@ -1037,29 +1052,28 @@ class Engine: return_numpy=self._strategy.return_numpy) except core.EOFException: break - self._prepare_logger(outs, None, step, None, fetch_names, - fetch_indices, "", self._mode) - history = self._prepare_history(outs, fetch_indices, self._mode) - - return history - - def dataloader( - self, - dataset, - # return_list=True, - batch_size=1, - shuffle=False, - drop_last=False, - collate_fn=None, - num_workers=0, - use_buffer_reader=True, - use_shared_memory=True, - timeout=0, - worker_init_fn=None, - epochs=1, - steps_per_epoch=None, - sample_split=1, - mode=None): + logs = self._prepare_logger(outs, None, step, None, fetch_names, + fetch_indices, self._mode) + cbks.on_batch_end('predict', step, logs) + outputs.append(list(logs["outputs"].values())) + cbks.on_end('predict', logs) + return outputs + + def dataloader(self, + dataset, + batch_size=1, + shuffle=False, + drop_last=False, + collate_fn=None, + num_workers=0, + use_buffer_reader=True, + use_shared_memory=True, + timeout=0, + worker_init_fn=None, + epochs=1, + steps_per_epoch=None, + sample_split=1, + mode=None): if mode is not None: self.to_mode(mode) self._inputs_spec, self._labels_spec = self._prepare_data_spec( @@ -1199,10 +1213,9 @@ class Engine: fetch_list=fetch_names, use_program_cache=self._strategy.use_cache, return_numpy=self._strategy.return_numpy) - self._prepare_logger(outs, None, None, None, fetch_names, fetch_indices, - "", self._mode) - history = self._prepare_history(outs, fetch_indices, self._mode) - return history + logs = self._prepare_logger(outs, None, None, None, fetch_names, + fetch_indices, self._mode) + return logs def _prepare_dataloader(self, dataset, @@ -1307,13 +1320,6 @@ class Engine: copy_var.desc.set_original_id(var.desc.original_id()) feed_list.append(copy_var) - # # remove the first three ops if multi run fit/evaluate/predict - # self._op_size = len(dist_main_block.ops) - # if dist_main_block.ops[0].type == 'create_py_reader': - # op_size -= 3 - # for _ in range(3): - # dist_main_block._remove_op(0, sync=False) - places = paddle.static.cuda_places() with static.program_guard(dist_main_prog, dist_startup_prog): dataloader = DistributedDataLoaderFromGenerator( @@ -1334,21 +1340,6 @@ class Engine: data_parallel_world_size=self._dp_world_sizes, data_parallel_rank=self._dp_ranks) self._prepare_reader() - # # move read op from the end of program to the start of program - # new_op_size = len(dist_main_block.ops) - # for _ in range(new_op_size - 1, op_size - 1, -1): - # op = dist_main_block.ops[new_op_size - 1] - # new_op_desc = dist_main_block.desc._prepend_op() - # new_op_desc.copy_from(op.desc) - # new_op = Operator(dist_main_block, - # new_op_desc, - # type=new_op_desc.type()) - # dist_main_block.ops.insert(0, new_op) - # dist_op = DistributedOperator(new_op) - # dist_context.add_dist_op_for_program(dist_op) - # for _ in range(new_op_size - op_size): - # 
dist_main_block._remove_op(new_op_size, sync=False) - # dist_main_block._sync_with_cpp() return dataloader def _tune(self, tune_data, tune_sample_split=None, batch_size=1): @@ -1441,9 +1432,15 @@ class Engine: for metric in self._metrics: metric.reset() + def _metrics_name(self): + metrics_name = ['loss'] if self._loss else [] + for m in self._metrics: + metrics_name.extend(to_list(m.name())) + return metrics_name + def _switch_mode(self, mode): self.to_mode(mode) - self._initialize(mode) + self._optimizer = self._dist_contexts[mode]._serial_optimizer def to_mode(self, mode): assert mode in ["train", "eval", "predict"], \ @@ -1504,20 +1501,19 @@ class Engine: """ if training: - assert 'train' in self._serial_main_progs, \ - "training model is not ready, please call `engine._prepare_program('train')` first." - serial_program = self._serial_main_progs["train"] - dist_main_prog = self._dist_main_progs["train"][self._cur_rank] - dist_context = self._dist_contexts["train"] + assert self._mode in self._serial_main_progs + serial_program = self._serial_main_progs[self._mode] + dist_main_prog = self._dist_main_progs[self._mode][self._cur_rank] + dist_context = self._dist_contexts[self._mode] self._saver.save(path, serial_program=serial_program, dist_main_program=dist_main_prog, dist_context=dist_context) else: - mode = "predict" - feed_vars = self._feed_vars[mode]['inputs'] - fetch_vars = self._fetch_vars[mode]['outputs'] - dist_main_prog = self._dist_main_progs[mode][self._cur_rank] + assert "predict" in self._dist_main_progs + feed_vars = self._feed_vars["predict"]['inputs'] + fetch_vars = self._fetch_vars["predict"]['outputs'] + dist_main_prog = self._dist_main_progs["predict"][self._cur_rank] self._saver.save_inference_model(path, feed_vars, fetch_vars, @@ -1575,29 +1571,6 @@ class Engine: path, load_optimizer) return self._state_dict, self._dist_attr - @staticmethod - def _get_lr_scheduler(program): - lr_sheduler = None - if hasattr(program, 'lr_sheduler'): - from paddle.optimizer.lr import LRScheduler - lr_sheduler = program.lr_sheduler - assert isinstance(lr_sheduler, LRScheduler), "must be LRScheduler" - return lr_sheduler - - def _get_lr(self, optimizer): - if isinstance(optimizer, paddle.optimizer.Optimizer): - return optimizer.get_lr() - elif isinstance(optimizer, paddle.fluid.optimizer.Optimizer): - if isinstance(optimizer._learning_rate, float): - return optimizer._learning_rate - else: - return optimizer._learning_rate() - else: - raise TypeError( - "'optimizer' must be object of class `paddle.optimizer.Optimizer`" \ - " or `paddle.fluid.optimizer.Optimizer`, but got {}.".format(type(optimizer)) - ) - @property def main_program(self): return self._dist_main_progs[self._mode][self._cur_rank] diff --git a/python/paddle/distributed/auto_parallel/interface.py b/python/paddle/distributed/auto_parallel/interface.py index f7f6d89a6168e37e19a4fec1d0a9f3595bdab107..0abdc0ea76893c37190613df9d84b2754274eea9 100644 --- a/python/paddle/distributed/auto_parallel/interface.py +++ b/python/paddle/distributed/auto_parallel/interface.py @@ -214,8 +214,12 @@ def add_to_collection(collection_name, value, name=None): if collection_name not in _g_collections: _g_collections[collection_name] = [] if name is not None: + for _, v in _g_collections[collection_name]: + if v == value: return _g_collections[collection_name].append((name, value)) else: + for _, v in _g_collections[collection_name]: + if v == value: return _g_collections[collection_name].append((None, value)) diff --git 
a/python/paddle/distributed/auto_parallel/parallelizer.py b/python/paddle/distributed/auto_parallel/parallelizer.py index 68bdc91435f3c1bafa1c818fe53e089d9e916981..b25d1d8b33bf7844493a1049c8c503abb3ecb43a 100644 --- a/python/paddle/distributed/auto_parallel/parallelizer.py +++ b/python/paddle/distributed/auto_parallel/parallelizer.py @@ -23,10 +23,10 @@ import logging import pickle import time import paddle -from paddle.fluid.backward import append_backward -from paddle.distributed.utils.log_utils import get_logger import paddle.fluid.core as core from paddle.fluid import program_guard +from paddle.fluid.backward import append_backward +from paddle.distributed.utils.log_utils import get_logger from paddle.distributed.passes import new_pass, PassContext from .dist_context import DistributedContext from .dist_context import set_default_distributed_context @@ -39,7 +39,6 @@ from .process_group import _g_process_group_map, ProcessGroup from .utils import make_data_unshard from .utils import set_grad_var_shape from .utils import SerialProgramInfo -from .utils import get_logger from .reshard import Resharder from .cluster import Cluster from .mapper import mapping @@ -147,7 +146,7 @@ class AutoParallelizer: with program_guard(main_program, startup_program): optimize_ops = optimizer.apply_gradients(params_grads) - self._dist_context._lr_optimizer = optimizer + self._dist_context._serial_optimizer = optimizer # update completion self._completer = Completer(self._dist_context) self._completer.complete_update_annotation(main_program) diff --git a/python/paddle/distributed/auto_parallel/parallelizer_v2.py b/python/paddle/distributed/auto_parallel/parallelizer_v2.py index 98bb2d52dab5f0685c232eb5629c243d7fb1ed38..d6bb5efc8eb915d0f5e662cad2abf983f9da0895 100644 --- a/python/paddle/distributed/auto_parallel/parallelizer_v2.py +++ b/python/paddle/distributed/auto_parallel/parallelizer_v2.py @@ -24,8 +24,8 @@ from paddle.distributed.passes import new_pass from .reshard import Resharder from .partitioner import Partitioner from .utils import set_grad_var_shape -from .utils import get_logger from .process_group import get_world_process_group +from ..utils.log_utils import get_logger class Parallelizer: @@ -62,7 +62,7 @@ class Parallelizer: serial_main_program, serial_startup_program, params_grads = self._apply_pre_optimization( serial_main_program, serial_startup_program, serial_loss, serial_optimizer, params_grads) - self._logger.info( + self._logger.debug( "within parallel apply_pre_optimization time: {}, mode {}". 
format(time.time() - time0, self._mode)) # Do logical partition @@ -70,14 +70,14 @@ class Parallelizer: partitioner = Partitioner(self._dist_context, rank) dist_main_prog, dist_startup_prog, dist_params_grads = partitioner.partition( serial_main_program, serial_startup_program, params_grads) - self._logger.info( + self._logger.debug( "within parallel partitioner time: {}, mode {}".format( time.time() - time0, self._mode)) # Generate optimizer time0 = time.time() self._generate_optimizer(dist_main_prog, dist_startup_prog, serial_optimizer, dist_params_grads) - self._logger.info( + self._logger.debug( "within parallel optimizer time: {}, mode {}".format( time.time() - time0, self._mode)) # Do reshard process @@ -86,14 +86,14 @@ class Parallelizer: resharder = Resharder(dist_main_prog, dist_startup_prog, rank, self._dist_context, dist_params_grads) resharder.reshard() - self._logger.info( + self._logger.debug( "within parallel reshard time: {}, mode {}".format( time.time() - time0, self._mode)) # Apply post optimization passes time0 = time.time() self._apply_post_optimization(dist_main_prog, dist_startup_prog, rank, dist_params_grads) - self._logger.info( + self._logger.debug( "within parallel apply_post_optimization time: {}, mode {}". format(time.time() - time0, self._mode)) else: @@ -102,7 +102,7 @@ class Parallelizer: self._apply_pre_optimization(serial_main_program, serial_startup_program, None, None, None) - self._logger.info( + self._logger.debug( "within parallel apply_pre_optimization time: {}, mode {}". format(time.time() - time0, self._mode)) # Do logical partition @@ -111,14 +111,14 @@ class Parallelizer: dist_main_prog, dist_startup_prog, dist_params_grads = partitioner.partition( serial_main_program, serial_startup_program, []) # Do reshard process - self._logger.info( + self._logger.debug( "within parallel partitioner time: {}, mode {}".format( time.time() - time0, self._mode)) time0 = time.time() resharder = Resharder(dist_main_prog, dist_startup_prog, rank, self._dist_context, [], 1) resharder.reshard() - self._logger.info( + self._logger.debug( "within parallel reshard time: {}, mode {}".format( time.time() - time0, self._mode)) # Clone program for test @@ -143,7 +143,7 @@ class Parallelizer: # NOTE: `apply_gradients` will add an Accumulator for a parameter only once, # but optimizer will be called repeatedly in re-launch, so optimizer need to be copied. 
optimizer = copy.deepcopy(optimizer) - self._dist_context._lr_optimizer = optimizer + self._dist_context._serial_optimizer = optimizer with program_guard(main_program, startup_program): with unique_name.guard("opt_"): optimizer_ops = optimizer.apply_gradients(params_grads) @@ -170,9 +170,7 @@ class Parallelizer: startup_program = self._pass_context.get_attr("startup_program") params_grads = self._pass_context.get_attr("params_grads") - # apply amp pass - # FIXME we disenable amp for eval since it has a little bug with - # eval program and which will be fixed in future + # apply amp pass on train/eval/predict if self._strategy.amp.enable: config = copy.deepcopy(self._strategy.amp.to_dict()) config["dist_context"] = self._dist_context diff --git a/python/paddle/distributed/auto_parallel/utils.py b/python/paddle/distributed/auto_parallel/utils.py index 88b5a0842262dadc0ebf5be677b0aa62501e67b7..00f86ca4d0c1e98fd346b8e07bb21c19e65aee34 100644 --- a/python/paddle/distributed/auto_parallel/utils.py +++ b/python/paddle/distributed/auto_parallel/utils.py @@ -1587,3 +1587,18 @@ def find_higher_order_backward_op(program): return True return False + + +def get_lr(optimizer): + if isinstance(optimizer, paddle.optimizer.Optimizer): + return optimizer.get_lr() + elif isinstance(optimizer, paddle.fluid.optimizer.Optimizer): + if isinstance(optimizer._learning_rate, float): + return optimizer._learning_rate + else: + return optimizer._learning_rate() + else: + raise TypeError( + "'optimizer' must be object of class `paddle.optimizer.Optimizer`" \ + " or `paddle.fluid.optimizer.Optimizer`, but got {}.".format(type(optimizer)) + ) diff --git a/python/paddle/distributed/passes/auto_parallel_grad_clip.py b/python/paddle/distributed/passes/auto_parallel_grad_clip.py index 8f5d5463e55060cf582c1287464fd44292b4abc3..5108992ae55feac11dd6ebad29dc7eba9d1d4e2e 100644 --- a/python/paddle/distributed/passes/auto_parallel_grad_clip.py +++ b/python/paddle/distributed/passes/auto_parallel_grad_clip.py @@ -212,7 +212,7 @@ class ClipGradByGloblNormPass(PassBase): if self.get_attr("dist_context") is None: return False dist_context = self.get_attr("dist_context") - if dist_context._lr_optimizer._grad_clip is None: + if dist_context._serial_optimizer._grad_clip is None: return False if self.get_attr("params_grads") is None: return False diff --git a/python/paddle/fluid/tests/unittests/auto_parallel/CMakeLists.txt b/python/paddle/fluid/tests/unittests/auto_parallel/CMakeLists.txt index 808546482bb279066e2ab937240d9cffa789b55a..c70028051f1e605dcb17a258baf634f2adceea9a 100644 --- a/python/paddle/fluid/tests/unittests/auto_parallel/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/auto_parallel/CMakeLists.txt @@ -60,6 +60,9 @@ if(WITH_DISTRIBUTE AND WITH_GPU) py_test_modules(test_pass_amp MODULES test_pass_amp ENVS ${dist_ENVS}) set_tests_properties(test_pass_amp PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE" TIMEOUT 50) + py_test_modules(test_engine_callbacks MODULES test_engine_callbacks) + set_tests_properties(test_engine_callbacks + PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE" TIMEOUT 50) py_test_modules(test_while_op_completion MODULES test_while_op_completion ENVS ${dist_ENVS}) @@ -100,5 +103,4 @@ if(WITH_DISTRIBUTE AND WITH_GPU) py_test_modules(test_dist_assign MODULES test_dist_assign) py_test_modules(test_conditional_block_reshard MODULES test_conditional_block_reshard) - endif() diff --git a/python/paddle/fluid/tests/unittests/auto_parallel/amp_pass_unittest.py 
b/python/paddle/fluid/tests/unittests/auto_parallel/amp_pass_unittest.py index 60b47a364d82f9aaceb7adf7f98a7630ee99d435..c00a3367a986dec2873eed89fb0a3e64e0e59842 100644 --- a/python/paddle/fluid/tests/unittests/auto_parallel/amp_pass_unittest.py +++ b/python/paddle/fluid/tests/unittests/auto_parallel/amp_pass_unittest.py @@ -87,27 +87,27 @@ class TestAMPPass(unittest.TestCase): def test_amp_pass(self): # mp2 training mp_engine = self.get_engine() - outs = mp_engine.fit(self.dataset, 3, batch_size=self.batch_size) - mp_losses = np.array(outs["loss"]) + history = mp_engine.fit(self.dataset, 3, batch_size=self.batch_size) + mp_losses = np.array(history.history["loss"]) # mp2 amp-o1 training amp_o1_engine = self.get_engine(True, "o1") - outs = amp_o1_engine.fit(self.dataset, 3, batch_size=self.batch_size) - amp_o1_losses = np.array(outs["loss"]) + history = amp_o1_engine.fit(self.dataset, 3, batch_size=self.batch_size) + amp_o1_losses = np.array(history.history["loss"]) amp_o1_engine.evaluate(self.dataset, 3, batch_size=self.batch_size) # self.check_results(mp_losses, amp_o1_losses) # mp2 amp-o2 training amp_o2_engine = self.get_engine(True, "o2") - outs = amp_o2_engine.fit(self.dataset, 3, batch_size=self.batch_size) - amp_o2_losses = np.array(outs["loss"]) + history = amp_o2_engine.fit(self.dataset, 3, batch_size=self.batch_size) + amp_o2_losses = np.array(history.history["loss"]) amp_o2_engine.evaluate(self.dataset, 3, batch_size=self.batch_size) # self.check_results(mp_losses, amp_o2_losses) # mp2 amp-o3 training amp_o3_engine = self.get_engine(True, "o3") - outs = amp_o3_engine.fit(self.dataset, 3, batch_size=self.batch_size) - amp_o3_losses = np.array(outs["loss"]) + history = amp_o3_engine.fit(self.dataset, 3, batch_size=self.batch_size) + amp_o3_losses = np.array(history.history["loss"]) amp_o3_engine.evaluate(self.dataset, 3, batch_size=self.batch_size) # self.check_results(mp_losses, amp_o3_losses) diff --git a/python/paddle/fluid/tests/unittests/auto_parallel/engine_api.py b/python/paddle/fluid/tests/unittests/auto_parallel/engine_api.py index ad8d477c81dbc4acc12398b4272130adab21f145..07eaac027e1dcfa7444a2b8ba16f859745721289 100644 --- a/python/paddle/fluid/tests/unittests/auto_parallel/engine_api.py +++ b/python/paddle/fluid/tests/unittests/auto_parallel/engine_api.py @@ -25,6 +25,7 @@ from paddle.io import Dataset from paddle.distributed.fleet import auto paddle.enable_static() + global_process_mesh = auto.ProcessMesh(mesh=[0, 1]) PP_MESH_0 = auto.ProcessMesh([0]) PP_MESH_1 = auto.ProcessMesh([1]) @@ -113,7 +114,7 @@ class MLPLayer(nn.Layer): if is_feed: my_feed_vars.append((out, out.shape)) if is_fetch: - auto.fetch(out, "my_out", logging=True) + auto.fetch(out, "my_fetch", logging=True) return out @@ -140,10 +141,11 @@ def train_high_level(fetch): # train train_dataset = MyDataset(batch_num * batch_size) eval_dataset1 = MyDataset(5 * batch_size) - engine.fit(train_data=train_dataset, - epochs=2, - batch_size=batch_size, - valid_data=eval_dataset1) + history = engine.fit(train_data=train_dataset, + epochs=2, + batch_size=batch_size, + valid_data=eval_dataset1, + log_freq=1) # eval eval_dataset2 = MyDataset(batch_size) @@ -151,7 +153,7 @@ def train_high_level(fetch): # predict test_dataset = MyDataset(batch_size) - engine.predict(test_dataset, batch_size=batch_size) + outputs = engine.predict(test_dataset, batch_size=batch_size) # save temp_dir = tempfile.TemporaryDirectory() diff --git a/python/paddle/fluid/tests/unittests/auto_parallel/gradient_merge_pass_unittest.py 
b/python/paddle/fluid/tests/unittests/auto_parallel/gradient_merge_pass_unittest.py index f70829c9fb6c77bb1a3786dc70c7291cec28c57e..bdce36be605c1ba2938768c17a2cb513f1c8a57f 100644 --- a/python/paddle/fluid/tests/unittests/auto_parallel/gradient_merge_pass_unittest.py +++ b/python/paddle/fluid/tests/unittests/auto_parallel/gradient_merge_pass_unittest.py @@ -83,25 +83,32 @@ class TestGradientMergePass(unittest.TestCase): def test_gradient_merge_pass(self): # dp2 training dp_engine = self.get_engine() - outs = dp_engine.fit(self.dataset, 3, batch_size=self.batch_size) - dp_losses = np.array(outs["loss"]) + history = dp_engine.fit(self.dataset, + 3, + batch_size=self.batch_size, + log_freq=1) + dp_losses = np.array(history.history["loss"]) # dp2 gradient merge training gm_engine = self.get_engine(True) - outs = gm_engine.fit(self.dataset, 3, batch_size=self.batch_size) - gm_losses = np.array(outs["loss"]) - - avg_loss = 0 - pass_avg_ret_list = [] - for i, pass_ret in enumerate(gm_losses): - if (i + 1) % 4 == 0: - avg_loss += pass_ret - pass_avg_ret_list.append(avg_loss / 4) - avg_loss = 0 - else: - avg_loss += pass_ret - - # self.check_results(dp_losses, np.array(pass_avg_ret_list)) + history = gm_engine.fit(self.dataset, + 3, + batch_size=self.batch_size, + log_freq=1) + gm_losses = np.array(history.history["loss"]) + + # avg_loss = 0 + # pass_avg_ret_list = [] + # for i, pass_ret in enumerate(gm_losses): + # if (i + 1) % 4 == 0: + # avg_loss += pass_ret + # pass_avg_ret_list.append(avg_loss / 4) + # avg_loss = 0 + # else: + # avg_loss += pass_ret + + # NOTE: every sample data from dataset is all the same + self.check_results(dp_losses, gm_losses) if __name__ == "__main__": diff --git a/python/paddle/fluid/tests/unittests/auto_parallel/recompute_pass_unittest.py b/python/paddle/fluid/tests/unittests/auto_parallel/recompute_pass_unittest.py index e0b9f9d9e7588b397b12b5c121e61c12bb39029a..9c17af12d73de2eaf3722f983b0a06f7dd2ea6b1 100644 --- a/python/paddle/fluid/tests/unittests/auto_parallel/recompute_pass_unittest.py +++ b/python/paddle/fluid/tests/unittests/auto_parallel/recompute_pass_unittest.py @@ -78,13 +78,13 @@ class TestRecomputePass(unittest.TestCase): def test_recompute_pass(self): # mp2 training mp_engine = self.get_engine() - outs = mp_engine.fit(self.dataset, 3, batch_size=self.batch_size) - mp_losses = np.array(outs["loss"]) + history = mp_engine.fit(self.dataset, 3, batch_size=self.batch_size) + mp_losses = np.array(history.history["loss"]) # mp2 recompute training rc_engine = self.get_engine(True) - outs = rc_engine.fit(self.dataset, 3, batch_size=self.batch_size) - rc_losses = np.array(outs["loss"]) + history = rc_engine.fit(self.dataset, 3, batch_size=self.batch_size) + rc_losses = np.array(history.history["loss"]) self.check_results(mp_losses, rc_losses) diff --git a/python/paddle/fluid/tests/unittests/auto_parallel/sharding_pass_unittest.py b/python/paddle/fluid/tests/unittests/auto_parallel/sharding_pass_unittest.py index 78f37e37db186ea1f55c9bafc5058e441e754c78..a549cd8e6d8eb5e02706f9e52b6f167e86969d81 100644 --- a/python/paddle/fluid/tests/unittests/auto_parallel/sharding_pass_unittest.py +++ b/python/paddle/fluid/tests/unittests/auto_parallel/sharding_pass_unittest.py @@ -83,25 +83,31 @@ class TestShardingPass(unittest.TestCase): def test_sharding_pass(self): # dp2 training dp_engine = self.get_engine() - dp_losses = dp_engine.fit(self.dataset, 3, batch_size=self.batch_size) - dp_losses = np.array(dp_losses["loss"]) + history = dp_engine.fit(self.dataset, 3, 
batch_size=self.batch_size) + dp_losses = np.array(history.history["loss"]) # sharding2 stage1 training sharding1_engine = self.get_engine(True, 1) - outs = sharding1_engine.fit(self.dataset, 3, batch_size=self.batch_size) - sharding1_losses = np.array(outs["loss"]) + history = sharding1_engine.fit(self.dataset, + 3, + batch_size=self.batch_size) + sharding1_losses = np.array(history.history["loss"]) self.check_results(dp_losses, sharding1_losses) # sharding2 stage2 training sharding2_engine = self.get_engine(True, 2) - outs = sharding2_engine.fit(self.dataset, 3, batch_size=self.batch_size) - sharding2_losses = np.array(outs["loss"]) + history = sharding2_engine.fit(self.dataset, + 3, + batch_size=self.batch_size) + sharding2_losses = np.array(history.history["loss"]) self.check_results(dp_losses, sharding2_losses) # sharding2 stage3 training sharding3_engine = self.get_engine(True, 3) - outs = sharding3_engine.fit(self.dataset, 3, batch_size=self.batch_size) - sharding3_losses = np.array(outs["loss"]) + history = sharding3_engine.fit(self.dataset, + 3, + batch_size=self.batch_size) + sharding3_losses = np.array(history.history["loss"]) self.check_results(dp_losses, sharding3_losses) diff --git a/python/paddle/fluid/tests/unittests/auto_parallel/test_dist_context.py b/python/paddle/fluid/tests/unittests/auto_parallel/test_dist_context.py index 6caf9ba82c3719530a91270c01f6e2fd02e8e664..ea24f65479561e9274771d1f25f16eae5d754a2d 100644 --- a/python/paddle/fluid/tests/unittests/auto_parallel/test_dist_context.py +++ b/python/paddle/fluid/tests/unittests/auto_parallel/test_dist_context.py @@ -195,7 +195,7 @@ class TestDistributedContext(unittest.TestCase): "_serial_ordered_nodes", "_serial_ordered_tensor_nodes", \ "_serial_ordered_op_nodes", "_original_serial_loss", \ "_original_serial_feed_vars", "_original_serial_fetch_vars", \ - "_serial_loss", "_serial_feed_vars", "_serial_fetch_vars", "_lr_optimizer", \ + "_serial_loss", "_serial_feed_vars", "_serial_fetch_vars", "_serial_optimizer", \ "_backup_serial_main_program_stack", "_backup_serial_startup_program_stack", \ "_pass_context"] diff --git a/python/paddle/fluid/tests/unittests/auto_parallel/test_engine_callbacks.py b/python/paddle/fluid/tests/unittests/auto_parallel/test_engine_callbacks.py new file mode 100644 index 0000000000000000000000000000000000000000..9baaee353f71536fcbb15bdf26d7e4dfae16c7fd --- /dev/null +++ b/python/paddle/fluid/tests/unittests/auto_parallel/test_engine_callbacks.py @@ -0,0 +1,173 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import unittest +import tempfile +import shutil +import time +import random + +import paddle +import paddle.vision.transforms as T + +from paddle.static import InputSpec +from paddle.distributed.fleet import auto +from paddle.distributed.auto_parallel.callbacks import config_callbacks +from paddle.vision.models import LeNet +from paddle.vision.datasets import MNIST + +paddle.enable_static() + + +class TestCallbacks(unittest.TestCase): + + def setUp(self): + self.save_dir = tempfile.mkdtemp() + + def tearDown(self): + shutil.rmtree(self.save_dir) + + def run_callback(self): + epochs = 2 + steps = 5 + freq = 2 + eval_steps = 2 + + inputs_spec = [InputSpec([None, 1, 28, 28], 'float32', 'image')] + strategy = auto.Strategy() + strategy.auto_mode = "semi" + + engine = auto.Engine(LeNet(), strategy=strategy) + engine.prepare(inputs_spec, mode="predict") + + cbks = config_callbacks(engine=engine, + batch_size=128, + epochs=epochs, + steps=steps, + log_freq=freq, + verbose=self.verbose, + metrics=['loss', 'acc'], + save_dir=self.save_dir) + cbks.on_begin('train') + + logs = {'loss': 50.341673, 'acc': 0.00256} + for epoch in range(epochs): + cbks.on_epoch_begin(epoch) + for step in range(steps): + cbks.on_batch_begin('train', step, logs) + logs['loss'] -= random.random() * 0.1 + logs['acc'] += random.random() * 0.1 + time.sleep(0.005) + cbks.on_batch_end('train', step, logs) + cbks.on_epoch_end(epoch, logs) + + eval_logs = {'eval_loss': 20.341673, 'eval_acc': 0.256} + params = { + 'steps': eval_steps, + 'metrics': ['eval_loss', 'eval_acc'], + } + cbks.on_begin('eval', params) + for step in range(eval_steps): + cbks.on_batch_begin('eval', step, eval_logs) + eval_logs['eval_loss'] -= random.random() * 0.1 + eval_logs['eval_acc'] += random.random() * 0.1 + eval_logs['batch_size'] = 2 + time.sleep(0.005) + cbks.on_batch_end('eval', step, eval_logs) + cbks.on_end('eval', eval_logs) + + test_logs = {} + params = {'steps': eval_steps} + cbks.on_begin('predict', params) + for step in range(eval_steps): + cbks.on_batch_begin('predict', step, test_logs) + test_logs['batch_size'] = 2 + time.sleep(0.005) + cbks.on_batch_end('predict', step, test_logs) + cbks.on_end('predict', test_logs) + + cbks.on_end('train') + + print(engine.history.history) + + def test_callback_verbose_0(self): + self.verbose = 0 + self.run_callback() + + def test_callback_verbose_1(self): + self.verbose = 1 + self.run_callback() + + def test_callback_verbose_2(self): + self.verbose = 2 + self.run_callback() + + def test_callback_verbose_3(self): + self.verbose = 3 + self.run_callback() + + +class TestCallbacksEngine(unittest.TestCase): + + def setUp(self): + self.save_dir = tempfile.mkdtemp() + transform = T.Compose([T.Transpose(), T.Normalize([127.5], [127.5])]) + self.train_dataset = MNIST(mode='train', transform=transform) + self.test_dataset = MNIST(mode='test', transform=transform) + self.prepare_engine() + + def tearDown(self): + shutil.rmtree(self.save_dir) + + def prepare_engine(self): + model = paddle.vision.models.LeNet() + loss = paddle.nn.CrossEntropyLoss() + base_lr = 1e-3 + boundaries = [5, 8] + values = [base_lr * (0.1**i) for i in range(len(boundaries) + 1)] + lr = paddle.optimizer.lr.PiecewiseDecay(boundaries=boundaries, + values=values, + verbose=False) + optimizer = paddle.optimizer.Adam(learning_rate=lr, + parameters=model.parameters()) + auto.fetch(model.parameters()[0], "param0", logging=True) + metrics = paddle.metric.Accuracy(topk=(1, 2)) + self.engine = auto.Engine(model, loss, optimizer, metrics) + + def 
test_fit_eval(self): + history = self.engine.fit(train_data=self.train_dataset, + valid_data=self.test_dataset, + batch_size=128, + steps_per_epoch=60, + valid_steps=40, + log_freq=20, + save_dir=self.save_dir, + save_freq=1) + print(history.history) + + def test_eval(self): + self.engine.evaluate(valid_data=self.test_dataset, + batch_size=128, + steps=40, + log_freq=10) + + def test_predict(self): + logger_cbks = paddle.callbacks.ProgBarLogger() + self.engine.predict(test_data=self.test_dataset, + batch_size=128, + callbacks=[logger_cbks]) + + +if __name__ == '__main__': + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/auto_parallel/test_lr_grad_clip.py b/python/paddle/fluid/tests/unittests/auto_parallel/test_lr_grad_clip.py index 7077cf8acd6d4c552c0b996eec4834f8a7a9f7a5..fc5240a369da1e7cca39f6b7ad967c996c416611 100644 --- a/python/paddle/fluid/tests/unittests/auto_parallel/test_lr_grad_clip.py +++ b/python/paddle/fluid/tests/unittests/auto_parallel/test_lr_grad_clip.py @@ -68,7 +68,7 @@ class TestLRScheduler(TestEngineBase): def test_lr_scheduler(self): self.init_engine() self.engine.fit(self.dataset, batch_size=self.batch_size) - lr = self.engine._lr_optimizer._learning_rate + lr = self.engine._optimizer._learning_rate assert isinstance(lr, paddle.optimizer.lr.LRScheduler) diff --git a/python/paddle/hapi/callbacks.py b/python/paddle/hapi/callbacks.py index 7bae53370eed8c851082186aa0d52a240e67a6a6..886a81801595806a03d8940517378050c18e3717 100644 --- a/python/paddle/hapi/callbacks.py +++ b/python/paddle/hapi/callbacks.py @@ -20,7 +20,7 @@ import warnings import numpy as np import paddle -from paddle.distributed import ParallelEnv +from paddle.fluid.dygraph.parallel import ParallelEnv from paddle.utils import try_import from .progressbar import ProgressBar diff --git a/python/paddle/hapi/model.py b/python/paddle/hapi/model.py index acacf2fd08848b0d90e4e53bb89da33e73544bcd..5bd926fddf84aed1c8fe9e3bb29be14a0fa5ffcb 100644 --- a/python/paddle/hapi/model.py +++ b/python/paddle/hapi/model.py @@ -46,6 +46,7 @@ from paddle.static import InputSpec as Input import paddle.distributed as dist import paddle.distributed.fleet as fleet from paddle.distributed.fleet.base import role_maker +from paddle.autograd import no_grad from .callbacks import config_callbacks, EarlyStopping from .model_summary import summary @@ -1099,7 +1100,7 @@ class Model(object): self._update_inputs() return loss - @paddle.no_grad() + @no_grad() def eval_batch(self, inputs, labels=None): """ Run one evaluating step on a batch of data. @@ -1151,7 +1152,7 @@ class Model(object): self._update_inputs() return loss - @paddle.no_grad() + @no_grad() def predict_batch(self, inputs): """ Run one predicting step on a batch of data. diff --git a/python/paddle/hapi/model_summary.py b/python/paddle/hapi/model_summary.py index 3822d13b989940db12a4edb40ef3753f94e13b30..de4ec449a06b68b05cc71969f176cf2d5277659b 100644 --- a/python/paddle/hapi/model_summary.py +++ b/python/paddle/hapi/model_summary.py @@ -19,7 +19,7 @@ import numbers import paddle import paddle.nn as nn from paddle.static import InputSpec - +from paddle.autograd import no_grad from collections import OrderedDict __all__ = [] @@ -229,7 +229,7 @@ def summary(net, input_size=None, dtypes=None, input=None): return params_info -@paddle.no_grad() +@no_grad() def summary_string(model, input_size=None, dtypes=None, input=None): def _all_is_numper(items):
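
A minimal usage sketch (not part of the patch) of the callback/History flow this change introduces. It mirrors `TestCallbacksEngine` in `test_engine_callbacks.py`; dataset choice, hyperparameters, and the `./ckpt` path are illustrative, and actually running it needs the same GPU/distributed environment as the auto_parallel unit tests.

# Sketch: exercising Engine.fit/evaluate/predict with the new callbacks,
# assuming the same setup as test_engine_callbacks.py.
import paddle
import paddle.vision.transforms as T
from paddle.vision.datasets import MNIST
from paddle.distributed.fleet import auto

paddle.enable_static()

transform = T.Compose([T.Transpose(), T.Normalize([127.5], [127.5])])
train_dataset = MNIST(mode='train', transform=transform)
test_dataset = MNIST(mode='test', transform=transform)

model = paddle.vision.models.LeNet()
loss = paddle.nn.CrossEntropyLoss()
optimizer = paddle.optimizer.Adam(learning_rate=1e-3,
                                  parameters=model.parameters())
metrics = paddle.metric.Accuracy(topk=(1, 2))
engine = auto.Engine(model, loss, optimizer, metrics)

# fit() now returns the History callback instead of a raw dict;
# per-epoch logs are accumulated in history.history.
history = engine.fit(train_data=train_dataset,
                     epochs=2,
                     batch_size=64,
                     log_freq=10,
                     save_dir='./ckpt',   # ModelCheckpointAuto saves here per save_freq
                     save_freq=1)
losses = history.history["loss"]

# evaluate() returns the final logs dict; predict() returns the collected outputs.
eval_logs = engine.evaluate(valid_data=test_dataset, batch_size=64, steps=40)
outputs = engine.predict(test_data=test_dataset, batch_size=64)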