From 315ef26505c4c1b5bf534b5ff9c8643fcce57f54 Mon Sep 17 00:00:00 2001 From: zhaoyingli <86812880+zhaoyinglia@users.noreply.github.com> Date: Fri, 28 Oct 2022 20:08:03 +0800 Subject: [PATCH] [AutoParallel] fix engine _build and cost method (#47263) * fix engine build method * fix import * update engine cost * update raise error * update cmakelist * revert optimizer * revert optimizer * fix unittest * fix unittest Co-authored-by: caozhou --- .../auto_parallel/cost/comp_op_cost.py | 38 ++ .../auto_parallel/cost/estimate_cost.py | 28 +- .../distributed/auto_parallel/engine.py | 338 +++++++++--------- .../paddle/distributed/auto_parallel/utils.py | 31 ++ .../unittests/auto_parallel/CMakeLists.txt | 22 +- .../unittests/auto_parallel/engine_api.py | 54 ++- .../auto_parallel/test_engine_api_error.py | 311 ++++++++++++++++ 7 files changed, 634 insertions(+), 188 deletions(-) create mode 100644 python/paddle/fluid/tests/unittests/auto_parallel/test_engine_api_error.py diff --git a/python/paddle/distributed/auto_parallel/cost/comp_op_cost.py b/python/paddle/distributed/auto_parallel/cost/comp_op_cost.py index 51c3d6069a..293c9ae3a5 100644 --- a/python/paddle/distributed/auto_parallel/cost/comp_op_cost.py +++ b/python/paddle/distributed/auto_parallel/cost/comp_op_cost.py @@ -34,6 +34,25 @@ class AdamOpCost(CompOpCost): return 0 +@register_op_cost +class ArgsortOpCost(CompOpCost): + OP_TYPE = "argsort" + + def __init__(self, op=None, op_desc=None, cluster=None): + super(ArgsortOpCost, self).__init__( + op=op, op_desc=op_desc, cluster=cluster + ) + + # For a concrete COMP OP, the calc_time and calc_flops function need to be overrided + def calc_flops(self): + # NOTE: The actual formula will be filled in the future + return 0 + + def calc_time(self): + # NOTE: The actual formula will be filled in the future + return 0 + + @register_op_cost class AssignOpCost(CompOpCost): OP_TYPE = "assign" @@ -338,6 +357,25 @@ class ElementwiseSubGradOpCost(CompOpCost): return 0 +@register_op_cost +class EqualOpCost(CompOpCost): + OP_TYPE = "equal" + + def __init__(self, op=None, op_desc=None, cluster=None): + super(EqualOpCost, self).__init__( + op=op, op_desc=op_desc, cluster=cluster + ) + + # For a concrete COMP OP, the calc_time and calc_flops function need to be overrided + def calc_flops(self): + # NOTE: The actual formula will be filled in the future + return 0 + + def calc_time(self): + # NOTE: The actual formula will be filled in the future + return 0 + + @register_op_cost class EmbeddingOpCost(CompOpCost): OP_TYPE = "c_embedding" diff --git a/python/paddle/distributed/auto_parallel/cost/estimate_cost.py b/python/paddle/distributed/auto_parallel/cost/estimate_cost.py index cac8bf9f27..5808b706fc 100644 --- a/python/paddle/distributed/auto_parallel/cost/estimate_cost.py +++ b/python/paddle/distributed/auto_parallel/cost/estimate_cost.py @@ -545,11 +545,12 @@ class CostEstimator: def get_cost_from_engine(engine, mode): from ..utils import to_list + import copy # Construct cost estimator by original main program serial_main_prog = ( - engine._serial_main_progs[mode].clone() - if mode in engine._serial_main_progs + engine._fwd_main_progs[mode].clone() + if mode in engine._fwd_main_progs else engine._orig_main_prog.clone() ) @@ -566,29 +567,29 @@ def get_cost_from_engine(engine, mode): ) else engine._losses ) - - if mode in engine._dist_contexts: - dist_context = engine._dist_contexts[mode] - completer = engine._planners[mode].completer + serial_optimizer = copy.deepcopy(engine._orig_optimizer) + if mode in engine._fwd_dist_contexts: + dist_context = copy.deepcopy(engine._fwd_dist_contexts[mode]) else: - from ..completion import Completer from ..dist_context import DistributedContext dist_context = DistributedContext( serial_main_prog, serial_startup_prog, - engine._optimizer, + serial_optimizer, losses, {}, {"loss": losses}, engine._cluster, engine._strategy, ) - completer = Completer(dist_context) - completer.complete_forward_annotation() - dist_context.block_state.parse_forward_blocks( - dist_context.serial_main_program - ) + from ..completion import Completer + + completer = Completer(dist_context) + completer.complete_forward_annotation() + dist_context.block_state.parse_forward_blocks( + dist_context.serial_main_program + ) if mode == "eval" or mode == "predict": cost_estimator = CostEstimator(serial_main_prog, engine._cluster) @@ -596,7 +597,6 @@ def get_cost_from_engine(engine, mode): from ..parallelizer_v2 import Parallelizer # Get serial main program with backward - serial_optimizer = engine._optimizer parallelizer = Parallelizer(mode, completer, dist_context) # Generate backward loss_name = dist_context.serial_loss.name diff --git a/python/paddle/distributed/auto_parallel/engine.py b/python/paddle/distributed/auto_parallel/engine.py index 1581beb2a5..28e9fc69d7 100644 --- a/python/paddle/distributed/auto_parallel/engine.py +++ b/python/paddle/distributed/auto_parallel/engine.py @@ -13,8 +13,10 @@ # limitations under the License. import os +import copy import logging import random +import numbers import numpy as np from collections import defaultdict @@ -45,15 +47,16 @@ from .dist_loader import ( DistributedDataLoaderFromGenerator, DistributedDataLoader, ) -from .utils import to_list, get_dist_attr, get_lr from .process_group import new_process_group, get_all_process_groups from .dist_context import DistributedContext, get_default_distributed_context from .strategy import Strategy from .interface import CollectionNames, get_collection -from ..utils.log_utils import get_logger -from .utils import initialize_pg_in_full_mode +from .utils import to_list, get_dist_attr, get_lr, validate_opt +from .utils import initialize_pg_in_full_mode, get_input_split_info from .cost.estimate_cost import get_cost_from_engine +from ..utils.log_utils import get_logger + class Engine: """ @@ -137,6 +140,15 @@ class Engine: "'model must be sub classes of `paddle.nn.Layer` or any callable function." ) self._model = model + + if ( + loss + and not isinstance(loss, (paddle.nn.Layer, Variable)) + and not callable(loss) + ): + raise TypeError( + "'loss' must be sub classes of `paddle.nn.Layer` or any callable function or a Variable." + ) self._loss = loss if optimizer and not isinstance( @@ -147,13 +159,17 @@ class Engine: "'optimizer' must be object of class `paddle.optimizer.Optimizer`" " or `paddle.fluid.optimizer.Optimizer`." ) - self._optimizer = self._validate_opt(optimizer) + self._optimizer = validate_opt(optimizer) + self._orig_optimizer = copy.deepcopy(self._optimizer) metrics = metrics or [] for metric in to_list(metrics): - assert isinstance( - metric, Metric - ), "{} is not sub class of Metric".format(metric.__class__.__name__) + if metric and not isinstance(metric, Metric): + raise TypeError( + "{} is not sub class of Metric".format( + metric.__class__.__name__ + ) + ) self._metrics = to_list(metrics) if cluster and not isinstance(cluster, Cluster): @@ -168,9 +184,10 @@ class Engine: ) self._strategy = strategy or Strategy() + self._logger = get_logger(logging.INFO) if os.getenv("POD_NAME"): - print( - "Distribute training by paddle.distributed.launch", flush=True + self._logger.info( + "Distribute training by paddle.distributed.launch" ) fleet.init(is_collective=True) @@ -179,12 +196,12 @@ class Engine: self._nranks = paddle.distributed.get_world_size() self._saver = DistributedSaver() - self._logger = get_logger(logging.INFO) - self._orig_main_prog = static.default_main_program() self._orig_startup_prog = static.default_startup_program() self._orig_dist_context = get_default_distributed_context() self._dist_contexts = {} + self._fwd_main_progs = {} + self._fwd_dist_contexts = {} self._serial_main_progs = {} self._serial_startup_progs = {} self._dist_main_progs = defaultdict(dict) # dist main programs @@ -202,13 +219,14 @@ class Engine: self._labels_spec = [] self._inputs = [] self._labels = [] + self._losses = [] + self._mode = None self._skip_build = False self._outside_dataloader = False self._planned_mode = None self._dygraph_mode = False self._tuning = self._strategy.tuning - self._losses = None self.history = None @@ -230,7 +248,7 @@ class Engine: inputs = sample[:split] labels = sample[split:] else: - raise ValueError( + raise TypeError( "Data should be a Dataset or IterableDatset, but received {}.".format( type(data).__name__ ) @@ -259,8 +277,14 @@ class Engine: specs.append(spec) else: specs.append(spec.batch(batch_size)) - else: + elif isinstance(item, numbers.Number): specs.append(InputSpec([batch_size], type(item), name)) + else: + raise TypeError( + "The sample's dtype returned of dataset should be number, np.ndarray or Tensor, but got {}".format( + type(item).__name__ + ) + ) if inputs is not None: for i, item in enumerate(inputs): @@ -277,43 +301,41 @@ class Engine: labels_spec = self._validate_spec(labels_spec) return inputs_spec, labels_spec - def _prepare_data_tensor( - self, inputs_spec, labels_spec, inputs=None, labels=None - ): + def _prepare_data_tensor(self, inputs_spec, labels_spec, inputs, labels): if _non_static_mode() or self._dygraph_mode: - return None, None - inputs_spec = inputs_spec if inputs_spec else [] - labels_spec = labels_spec if labels_spec else [] + raise ValueError("Only support static graph mode.") + if inputs_spec: assert isinstance( inputs_spec, list ), "inputs should be list, but received {}".format( type(inputs_spec) ) - if inputs is None: - inputs = [s._create_feed_layer() for s in inputs_spec] - else: - assert isinstance( - inputs, list - ), "inputs should be list, but received {}".format(type(inputs)) - for input_spec, input in zip(inputs_spec, inputs): - if input_spec.shape != input.shape: - input.desc.set_shape(input_spec.shape) + assert isinstance( + inputs, list + ), "inputs should be list, but received {}".format(type(inputs)) + assert len(inputs_spec) == len( + inputs + ), "the number of `inputs_spec` should be equal to `inputs`'s." + for input_spec, input in zip(inputs_spec, inputs): + if input_spec.shape != input.shape: + input.desc.set_shape(input_spec.shape) if labels_spec: assert isinstance( labels_spec, list ), "labels should be list, but received {}".format( type(labels_spec) ) - if labels is None: - labels = [s._create_feed_layer() for s in labels_spec] - else: - assert isinstance( - labels, list - ), "labels should be list, but received {}".format(type(labels)) - for label_spec, label in zip(labels_spec, labels): - if label_spec.shape != label.shape: - label.desc.set_shape(label_spec.shape) + assert isinstance( + labels, list + ), "labels should be list, but received {}".format(type(labels)) + assert len(labels_spec) == len( + labels + ), "the number of `labels_spec` should be equal to `labels`'s." + for label_spec, label in zip(labels_spec, labels): + if label_spec.shape != label.shape: + label.desc.set_shape(label_spec.shape) + return inputs, labels def _prepare_reader(self): @@ -497,10 +519,12 @@ class Engine: self._dygraph_mode = True self._logger.info("Building model with 'to_static' method.") - inputs_spec = self._inputs_spec - labels_spec = self._labels_spec if self._labels_spec else [] self.program_helper = ProgramHelper( - self._model, self._loss, self._metrics, inputs_spec, labels_spec + self._model, + self._loss, + self._metrics, + self._inputs_spec, + self._labels_spec, ) # build forward main program self.program_helper.build_program(mode) @@ -509,16 +533,12 @@ class Engine: serial_main_prog = self.program_helper.main_program serial_startup_prog = self.program_helper.startup_program - inputs = self.program_helper.input_vars + self._inputs = self.program_helper.input_vars + self._labels = self.program_helper.label_vars outputs = self.program_helper.output_vars - labels = self.program_helper.label_vars - losses = self.program_helper.loss_vars - self._losses = losses + self._losses = self.program_helper.loss_vars metrics = self.program_helper.metric_vars - self._inputs = inputs - self._labels = labels - paddle.enable_static() else: # build program in static mode @@ -527,29 +547,45 @@ class Engine: return outputs = [] - losses = [] metrics = [] - inputs = self._inputs if self._inputs else [] - labels = self._labels if self._labels else [] + self._losses = [] serial_main_prog = self._orig_main_prog.clone() serial_startup_prog = self._orig_startup_prog.clone() if not self._skip_build: with static.program_guard( serial_main_prog, serial_startup_prog ), utils.unique_name.guard(): - outputs = to_list(self._model(*inputs)) + self._inputs = [ + s._create_feed_layer() for s in self._inputs_spec + ] + self._labels = [ + s._create_feed_layer() for s in self._labels_spec + ] + + outputs = to_list(self._model(*self._inputs)) + if mode != "predict" and self._loss: - losses = to_list(self._loss(*(outputs + labels))) - self._losses = losses + assert isinstance( + self._loss, paddle.nn.Layer + ) or callable( + self._loss + ), "the type of `loss` of the Engine arguments should be sub classes of `paddle.nn.Layer` or any callable function." + self._losses = to_list( + self._loss(*(outputs + self._labels)) + ) - if mode != "predict" and (outputs or labels): + if mode != "predict" and (outputs or self._labels): for metric in self._metrics: metrics.append( - to_list(metric.compute(*(outputs + labels))) + to_list( + metric.compute(*(outputs + self._labels)) + ) ) else: - losses = to_list(self._loss) - self.losses = losses + assert isinstance( + self._loss, Variable + ), "the type of `loss` of the Engine arguments should be Variable." + self._losses = to_list(self._loss) default_ctx = get_default_distributed_context() if not default_ctx.has_annotation: @@ -558,11 +594,11 @@ class Engine: new_process_group(list(range(self._nranks))) default_ctx.data_parallel = True - feed_vars = {"inputs": inputs, "labels": labels} + feed_vars = {"inputs": self._inputs, "labels": self._labels} fetch_vars = { "outputs": flatten(outputs), - "loss": losses, + "loss": self._losses, "metrics": metrics, } @@ -574,13 +610,24 @@ class Engine: serial_main_prog, serial_startup_prog, self._optimizer, - losses, + self._losses, + feed_vars, + fetch_vars, + self._cluster, + self._strategy, + ) + self._fwd_dist_contexts[mode] = DistributedContext( + serial_main_prog, + serial_startup_prog, + self._optimizer, + self._losses, feed_vars, fetch_vars, self._cluster, self._strategy, ) self._dist_contexts[mode].gradient_scale = self._strategy.gradient_scale + self._fwd_main_progs[mode] = serial_main_prog.clone() def _optimization_tuning(self, mode, dataset, batch_size): if not self._tuning.enable: @@ -637,8 +684,8 @@ class Engine: self._dp_world_sizes = [] self._dp_ranks = [] for feed_var in feed_list: - dp_world_size, dp_rank = self._get_input_split_info( - feed_var, self._dist_contexts[mode] + dp_world_size, dp_rank = get_input_split_info( + self._cur_rank, feed_var, self._dist_contexts[mode] ) self._dp_world_sizes.append(dp_world_size) self._dp_ranks.append(dp_rank) @@ -834,18 +881,11 @@ class Engine: self._inputs_spec, self._labels_spec = self._prepare_data_spec( train_data, train_sample_split, batch_size ) - self._inputs, self._labels = self._prepare_data_tensor( - self._inputs_spec, self._labels_spec - ) if not self._has_prepared[self._mode]: self._prepare_program(self._mode) else: self._switch_mode(self._mode) - assert ( - self._mode in self._dist_main_progs - ), "train model is not ready, please call `engine._prepare_program('train')` first." - train_dataloader = self._prepare_dataloader_from_generator( dataset=train_data, capacity=70, @@ -984,17 +1024,11 @@ class Engine: self._inputs_spec, self._labels_spec = self._prepare_data_spec( valid_data, valid_sample_split, batch_size ) - self._inputs, self._labels = self._prepare_data_tensor( - self._inputs_spec, self._labels_spec - ) if not self._has_prepared[self._mode]: self._prepare_program(self._mode) else: self._switch_mode(self._mode) - assert ( - self._mode in self._dist_main_progs - ), "eval model is not ready, please call `engine._prepare_program('eval')` first." valid_dataloader = self._prepare_dataloader_from_generator( dataset=valid_data, capacity=70, @@ -1096,18 +1130,11 @@ class Engine: self._inputs_spec, self._labels_spec = self._prepare_data_spec( test_data, test_sample_split, batch_size ) - self._inputs, self._labels = self._prepare_data_tensor( - self._inputs_spec, self._labels_spec - ) if not self._has_prepared[self._mode]: self._prepare_program(self._mode) else: self._switch_mode(self._mode) - assert ( - self._mode in self._dist_main_progs - ), "predict model is not ready, please call `engine._prepare_program('predict')` first." - test_dataloader = self._prepare_dataloader_from_generator( dataset=test_data, capacity=70, @@ -1165,13 +1192,11 @@ class Engine: self._inputs_spec, self._labels_spec = self._prepare_data_spec( dataset, sample_split, batch_size ) - self._inputs, self._labels = self._prepare_data_tensor( - self._inputs_spec, self._labels_spec - ) if not self._has_prepared[self._mode]: self._prepare_program(self._mode) else: self._switch_mode(self._mode) + dataloader = self._prepare_dataloader( dataset, return_list=False, @@ -1209,13 +1234,11 @@ class Engine: self._inputs_spec, self._labels_spec = self._prepare_data_spec( dataset, sample_split, batch_size ) - self._inputs, self._labels = self._prepare_data_tensor( - self._inputs_spec, self._labels_spec - ) if not self._has_prepared[self._mode]: self._prepare_program(self._mode) else: self._switch_mode(self._mode) + dataloader = self._prepare_dataloader_from_generator( dataset=dataset, capacity=capacity, @@ -1243,45 +1266,49 @@ class Engine: ): if mode is not None: self.to_mode(mode) + + if not self._mode: + raise ValueError( + "Please set mode to be prepared with `prepare(mode=...)`" + ) + + if self._has_prepared[self._mode]: + return + + inputs_spec = self._validate_spec(inputs_spec) + labels_spec = self._validate_spec(labels_spec) + inputs = self._validate_vars(inputs) + labels = self._validate_vars(labels) + + self._orig_main_prog = main_program + self._orig_startup_prog = startup_program if inputs or labels: self._skip_build = True - self._inputs_spec = inputs_spec - self._labels_spec = labels_spec - self._inputs, self._labels = self._prepare_data_tensor( - self._inputs_spec, self._labels_spec, inputs, labels + inputs, labels = self._prepare_data_tensor( + inputs_spec, labels_spec, inputs, labels ) - self._orig_main_prog = main_program if self._orig_main_prog is None: self._orig_main_prog = static.default_main_program() - self._orig_startup_prog = startup_program if self._orig_startup_prog is None: self._orig_startup_prog = static.default_startup_program() - if not self._has_prepared[self._mode]: - self._prepare_program(self._mode) - else: - self._switch_mode(self._mode) elif inputs_spec or labels_spec: - self._inputs_spec = inputs_spec - self._labels_spec = labels_spec self._outside_dataloader = True - self._inputs, self._labels = self._prepare_data_tensor( - self._inputs_spec, self._labels_spec - ) - self._orig_main_prog = main_program if self._orig_main_prog is None: self._orig_main_prog = static.default_main_program() - self._orig_startup_prog = startup_program if self._orig_startup_prog is None: self._orig_startup_prog = static.default_startup_program() - if not self._has_prepared[self._mode]: - self._prepare_program(self._mode) - else: - self._switch_mode(self._mode) else: assert ( self._inputs_spec and self._labels_spec ), "Please call the dataloader(...) before calling prepare(...)" + self._inputs_spec, self._labels_spec = inputs_spec, labels_spec + self._inputs, self._labels = inputs, labels + if not self._has_prepared[self._mode]: + self._prepare_program(self._mode) + else: + self._switch_mode(self._mode) + def run(self, data=None, feed=None, fetch_list=None, mode=None): if mode is not None: self.to_mode(mode) @@ -1331,7 +1358,6 @@ class Engine: dist_main_prog = self._dist_main_progs[self._mode][self._cur_rank] dist_startup_prog = self._dist_startup_progs[self._mode][self._cur_rank] - dist_context = self._dist_contexts[self._mode] dist_main_block = dist_main_prog.global_block() # NOTE: Get feed_list, then insert dataloader op with sharded var shape. @@ -1400,7 +1426,6 @@ class Engine: dist_main_prog = self._dist_main_progs[self._mode][self._cur_rank] dist_startup_prog = self._dist_startup_progs[self._mode][self._cur_rank] - dist_context = self._dist_contexts[self._mode] dist_main_block = dist_main_prog.global_block() # NOTE: Get feed_list, then insert dataloader op with sharded var shape. @@ -1446,9 +1471,6 @@ class Engine: self._inputs_spec, self._labels_spec = self._prepare_data_spec( tune_data, tune_sample_split, batch_size ) - self._inputs, self._labels = self._prepare_data_tensor( - self._inputs_spec, self._labels_spec - ) self._optimization_tuning(self._mode, tune_data, batch_size) def _validate_spec(self, specs): @@ -1456,7 +1478,10 @@ class Engine: self._k_steps = self._strategy.gradient_merge.k_steps if specs is not None: for i, spec in enumerate(specs): - assert isinstance(spec, InputSpec) + if not isinstance(spec, InputSpec): + raise TypeError( + "'spec' must be object of class `paddle.static.InputSpec`." + ) if spec.name is None: raise ValueError( "Requires Input[{}].name != None, but receive `None` with {}.".format( @@ -1472,39 +1497,20 @@ class Engine: ) shape[0] //= self._k_steps spec.shape = shape - return specs + return specs or [] + + def _validate_vars(self, vars): + vars = to_list(vars) + if vars is not None: + for i, var in enumerate(vars): + if not isinstance(var, Variable): + raise TypeError("'var' must be a `Variable`.") + return vars or [] def _is_local_var(self, var): var_name = _to_name_str(var) return var_name in self.main_program.global_block().vars - def _get_input_split_info(self, var, dist_context): - # deduce how the input data is split among the cluster - from .utils import _get_comm_group, _get_corresponding_rank - - tensor_dist_attr = dist_context.get_tensor_dist_attr_for_program(var) - process_mesh = tensor_dist_attr.process_mesh - dims_mapping = tensor_dist_attr.dims_mapping - - if self._cur_rank not in process_mesh.processes: - rank_id = _get_corresponding_rank( - dist_context, process_mesh, self._cur_rank - ) - else: - rank_id = self._cur_rank - - batch_size_axis = dims_mapping[0] - if batch_size_axis > -1 and process_mesh.topology[batch_size_axis] > 1: - group_ranks = _get_comm_group( - process_mesh.processes, - process_mesh.topology, - batch_size_axis, - rank_id, - ) - return len(group_ranks), group_ranks.index(rank_id) - - return 1, 0 - def _set_recompute_ckpts(self): # NOTE hack to enable recompute in engine api for GPT-3 # TODO support more PaddleNLP/CV models here @@ -1534,12 +1540,6 @@ class Engine: } self._logger.info(logs) - def _validate_opt(self, optimizer): - if optimizer is not None: - optimizer._parameter_list = None - optimizer._param_groups = None - return optimizer - def _reset_metrics(self): for metric in self._metrics: metric.reset() @@ -1551,6 +1551,9 @@ class Engine: return metrics_name def _switch_mode(self, mode): + assert ( + mode in self._dist_main_progs + ), "{} model is not ready, please call `prepare()` first.".format(mode) self.to_mode(mode) self._optimizer = self._dist_contexts[mode]._serial_optimizer @@ -1691,7 +1694,7 @@ class Engine: ) return self._state_dict, self._dist_attr - def cost(self, inputs_spec=None, labels_spec=None, mode="train"): + def cost(self, inputs_spec=None, labels_spec=None, mode=None): """ Get and Print cost, including memory of every rank, max memory among all ranks, and the global cost of one step based on @@ -1702,7 +1705,7 @@ class Engine: Args: inputs_spec(InputSpec): The specification of inputs. Default: None. labels_spec(InputSpec): The specification of labels. Default: None. - mode (str): The engine mode must be in ["train", "predict", "eval"]. Default: "train". + mode (str): The engine mode must be in ["train", "predict", "eval"]. Default: None. Returns: Return the global execution time (ms) and max memory (B). @@ -1710,33 +1713,44 @@ class Engine: """ # Check parallel mode if self._strategy.auto_mode == "full": - print( + self._logger.info( "The cost will be calcudated in the search process when the auto mode is full." ) return # Check mode - accepted_modes = ["train", "predict", "eval"] - if mode not in accepted_modes: + mode = mode if mode is not None else self._mode + assert mode is not None, "Please set mode." + if mode not in self._has_prepared: raise ValueError( "The mode {} is not in accepted modes {}".format( - mode, accepted_modes + mode, list(self._has_prepared.keys()) ) ) self.to_mode(mode) - if inputs_spec is not None: - self._inputs_spec, self._labels_spec = inputs_spec, labels_spec - self._inputs, self._labels = self._prepare_data_tensor( - self._inputs_spec, self._labels_spec - ) + if inputs_spec is not None and not self._has_prepared[mode]: + self._inputs_spec = self._validate_spec(inputs_spec) + self._labels_spec = self._validate_spec(labels_spec) self._build(mode) self._plan(mode) else: if _non_static_mode() or self._dygraph_mode: raise ValueError( - "Please call `engine._prepare_program('mode')` firstly when in the static graph mode." + "Please call `prepare()` or `fit()` or `evaluate()` or `predict()` before calling `cost()`." + ) + else: + self._logger.info( + "The program whose cost to be estimated must be static default program. Otherwise, please call `prepare()`before calling `cost()`." ) + program = paddle.static.default_main_program() + if ( + not program.global_block().ops + or not program.global_block().ops + ) and not self._has_prepared[mode]: + raise ValueError( + "Please call `prepare()` or `fit()` or `evaluate()` or `predict()` before calling `cost()`." + ) # Estimate the exec cost and max memory global_cost, max_memory = get_cost_from_engine(self, mode) diff --git a/python/paddle/distributed/auto_parallel/utils.py b/python/paddle/distributed/auto_parallel/utils.py index 8b7ec647f6..73d1c1412d 100644 --- a/python/paddle/distributed/auto_parallel/utils.py +++ b/python/paddle/distributed/auto_parallel/utils.py @@ -1876,3 +1876,34 @@ def initialize_pg_in_full_mode(all_process_groups, cur_rank): break process_group.instantiate() server_socket.close() + + +def get_input_split_info(cur_rank, var, dist_context): + # deduce how the input data is split among the cluster + tensor_dist_attr = dist_context.get_tensor_dist_attr_for_program(var) + process_mesh = tensor_dist_attr.process_mesh + dims_mapping = tensor_dist_attr.dims_mapping + + if cur_rank not in process_mesh.processes: + rank_id = _get_corresponding_rank(dist_context, process_mesh, cur_rank) + else: + rank_id = cur_rank + + batch_size_axis = dims_mapping[0] + if batch_size_axis > -1 and process_mesh.topology[batch_size_axis] > 1: + group_ranks = _get_comm_group( + process_mesh.processes, + process_mesh.topology, + batch_size_axis, + rank_id, + ) + return len(group_ranks), group_ranks.index(rank_id) + + return 1, 0 + + +def validate_opt(optimizer): + if optimizer is not None: + optimizer._parameter_list = None + optimizer._param_groups = None + return optimizer diff --git a/python/paddle/fluid/tests/unittests/auto_parallel/CMakeLists.txt b/python/paddle/fluid/tests/unittests/auto_parallel/CMakeLists.txt index 3d34ed4fcd..bd6ccfd392 100644 --- a/python/paddle/fluid/tests/unittests/auto_parallel/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/auto_parallel/CMakeLists.txt @@ -63,6 +63,15 @@ if(WITH_DISTRIBUTE AND WITH_GPU) py_test_modules(test_engine_callbacks MODULES test_engine_callbacks) set_tests_properties(test_engine_callbacks PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE" TIMEOUT 50) + py_test_modules(test_parallel_tuner MODULES test_parallel_tuner ENVS + ${dist_ENVS}) + set_tests_properties(test_parallel_tuner PROPERTIES TIMEOUT 120) + py_test_modules(test_parallel_tuner_full MODULES test_parallel_tuner_full + ENVS ${dist_ENVS}) + set_tests_properties(test_parallel_tuner_full PROPERTIES TIMEOUT 120) + py_test_modules(test_parallel_tuner_predict MODULES + test_parallel_tuner_predict ENVS ${dist_ENVS}) + set_tests_properties(test_parallel_tuner_predict PROPERTIES TIMEOUT 120) py_test_modules(test_while_op_completion MODULES test_while_op_completion ENVS ${dist_ENVS}) @@ -90,6 +99,7 @@ if(WITH_DISTRIBUTE AND WITH_GPU) py_test_modules(test_prim_dist_op MODULES test_prim_dist_op ENVS ${dist_ENVS}) py_test_modules(test_to_static MODULES test_to_static ENVS ${dist_ENVS}) py_test_modules(test_dist_op_cost MODULES test_dist_op_cost ENVS ${dist_ENVS}) + py_test_modules(test_cluster_v2 MODULES test_cluster_v2) py_test_modules(test_process_mesh_v2 MODULES test_process_mesh_v2) py_test_modules(test_dist_attr_v2 MODULES test_dist_attr_v2) @@ -99,20 +109,10 @@ if(WITH_DISTRIBUTE AND WITH_GPU) py_test_modules(test_interface MODULES test_interface) py_test_modules(test_strategy MODULES test_strategy) py_test_modules(test_pass_quantization MODULES test_pass_quantization) - py_test_modules(test_dist_shape MODULES test_dist_shape) py_test_modules(test_dist_assign MODULES test_dist_assign) py_test_modules(test_conditional_block_reshard MODULES test_conditional_block_reshard) - - py_test_modules(test_parallel_tuner MODULES test_parallel_tuner ENVS - ${dist_ENVS}) - set_tests_properties(test_parallel_tuner PROPERTIES TIMEOUT 120) - py_test_modules(test_parallel_tuner_full MODULES test_parallel_tuner_full - ENVS ${dist_ENVS}) - set_tests_properties(test_parallel_tuner_full PROPERTIES TIMEOUT 120) - py_test_modules(test_parallel_tuner_predict MODULES - test_parallel_tuner_predict ENVS ${dist_ENVS}) - set_tests_properties(test_parallel_tuner_predict PROPERTIES TIMEOUT 120) + py_test_modules(test_engine_api_error MODULES test_engine_api_error) endif() diff --git a/python/paddle/fluid/tests/unittests/auto_parallel/engine_api.py b/python/paddle/fluid/tests/unittests/auto_parallel/engine_api.py index c09edf0442..291792e359 100644 --- a/python/paddle/fluid/tests/unittests/auto_parallel/engine_api.py +++ b/python/paddle/fluid/tests/unittests/auto_parallel/engine_api.py @@ -374,6 +374,57 @@ def train_non_builtin_data_vars(): def get_cost(): + main_program = static.Program() + startup_program = static.Program() + with static.program_guard( + main_program, startup_program + ), utils.unique_name.guard(): + input = static.data( + name="input", shape=[batch_size, image_size], dtype='float32' + ) + label = static.data(name="label", shape=[batch_size, 1], dtype='int64') + + loader = paddle.io.DataLoader.from_generator( + feed_list=[input, label], capacity=4 * batch_size, iterable=False + ) + places = static.cuda_places() + loader.set_batch_generator(batch_generator_creator(), places=places) + + mlp = MLPLayer( + hidden_size=hidden_size, + intermediate_size=4 * hidden_size, + dropout_ratio=0.1, + initializer_range=0.02, + ) + loss = paddle.nn.CrossEntropyLoss() + optimizer = paddle.optimizer.Adam( + learning_rate=0.00001, + beta1=0.9, + beta2=0.999, + epsilon=1e-08, + grad_clip=None, + ) + metric = paddle.metric.Accuracy() + predict = mlp(input) + loss_var = loss(predict, label) + + strategy = auto.Strategy() + strategy.auto_mode = "semi" + + engine = auto.Engine( + loss=loss_var, optimizer=optimizer, metrics=metric, strategy=strategy + ) + engine.prepare( + main_program=main_program, + startup_program=startup_program, + inputs=[input], + labels=[label], + mode="train", + ) + engine.cost() + + +def get_cost_by_default_program(): main_program = static.default_main_program() startup_program = static.default_startup_program() with static.program_guard( @@ -414,7 +465,7 @@ def get_cost(): engine = auto.Engine( loss=loss_var, optimizer=optimizer, metrics=metric, strategy=strategy ) - engine.cost() + engine.cost(mode="train") def get_cost_by_spec(): @@ -451,4 +502,5 @@ if __name__ == "__main__": train_builtin_data_vars() train_non_builtin_data_vars() get_cost() + get_cost_by_default_program() get_cost_by_spec() diff --git a/python/paddle/fluid/tests/unittests/auto_parallel/test_engine_api_error.py b/python/paddle/fluid/tests/unittests/auto_parallel/test_engine_api_error.py new file mode 100644 index 0000000000..cd825524b8 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/auto_parallel/test_engine_api_error.py @@ -0,0 +1,311 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +import numpy as np +import paddle +import paddle.static as static +import paddle.nn as nn +import paddle.nn.functional as F +from paddle.io import Dataset + +from paddle.distributed.fleet import auto + +paddle.enable_static() + + +epoch_num = 1 +batch_size = 2 +batch_num = 10 +hidden_size = 1024 +sequence_len = 512 +image_size = hidden_size +class_num = 10 + +is_fetch = True +is_feed = True +my_feed_vars = [] + + +class TrainDataset(Dataset): + def __init__(self, num_samples): + super(TrainDataset, self).__init__() + self.num_samples = num_samples + + def __getitem__(self, index): + input = np.random.uniform(size=image_size).astype("float32") + label = np.random.randint(0, class_num - 1, dtype="int64") + return input, label + + def __len__(self): + return self.num_samples + + +class TestDataset(Dataset): + def __init__(self, num_samples): + super(TestDataset, self).__init__() + self.num_samples = num_samples + + def __getitem__(self, index): + input = np.random.uniform(size=image_size).astype("float32") + return input + + def __len__(self): + return self.num_samples + + +class MLPLayer(nn.Layer): + def __init__( + self, + hidden_size=1024, + intermediate_size=4 * 1024, + dropout_ratio=0.1, + initializer_range=0.02, + ): + super(MLPLayer, self).__init__() + d_model = hidden_size + dim_feedforward = intermediate_size + weight_attr = paddle.ParamAttr( + initializer=nn.initializer.Normal(mean=0.0, std=initializer_range) + ) + bias_attr = None + + self.linear0 = nn.Linear( + d_model, dim_feedforward, weight_attr, bias_attr=bias_attr + ) + self.linear1 = nn.Linear( + dim_feedforward, d_model, weight_attr, bias_attr=bias_attr + ) + self.linear2 = nn.Linear(d_model, 1, weight_attr, bias_attr=bias_attr) + self.norm = nn.LayerNorm(d_model, epsilon=1e-5) + self.dropout = nn.Dropout(dropout_ratio, mode="upscale_in_train") + + def forward(self, input): + out = self.norm(input) + out = self.linear0(out) + + if is_feed: + my_feed_vars.append((out, out.shape)) + + out = F.gelu(out, approximate=True) + out = self.linear1(out) + out = self.dropout(out) + out = self.linear2(out) + + if is_feed: + my_feed_vars.append((out, out.shape)) + if is_fetch: + auto.fetch(out, "my_fetch", logging=True) + return out + + +class TestEngineErrorRaise(unittest.TestCase): + def setUp(self): + class NoSupportData1: + def __getitem__(self, index): + input = np.random.uniform(size=image_size).astype("float32") + label = np.random.randint(0, class_num - 1, dtype="int64") + return input, label + + class NoSupportData2(TrainDataset): + def __getitem__(self, index): + input = [ + list(np.random.uniform(size=image_size).astype("float32")) + ] + label = [np.random.randint(0, class_num - 1, dtype="int64")] + return input, label + + class NoSupportData3: + def __getitem__(self, index): + input = np.random.uniform(size=image_size).astype("float32") + return input + + class NoSupportData4(TestDataset): + def __getitem__(self, index): + input = [ + list(np.random.uniform(size=image_size).astype("float32")) + ] + return input + + self.no_support_data_1 = NoSupportData1() + self.no_support_data_2 = NoSupportData2(10) + self.no_support_data_3 = NoSupportData3() + self.no_support_data_4 = NoSupportData4(10) + + def test_Engine(self): + with self.assertRaises(TypeError): + auto.Engine(model=paddle.static.Program()) + with self.assertRaises(TypeError): + auto.Engine(loss="CrossEntropyLoss") + with self.assertRaises(TypeError): + auto.Engine(optimizer="adam") + with self.assertRaises(TypeError): + auto.Engine(metrics=["acc"]) + with self.assertRaises(TypeError): + auto.Engine(cluster="cluster") + with self.assertRaises(TypeError): + auto.Engine(strategy="strategy") + + def test_fit(self): + + with self.assertRaises(TypeError): + + engine = auto.Engine( + model=MLPLayer(), + loss=paddle.nn.CrossEntropyLoss(), + optimizer=paddle.optimizer.AdamW(0.00001), + ) + engine.fit(train_data=self.no_support_data_1) + + with self.assertRaises(TypeError): + + engine = auto.Engine( + model=MLPLayer(), + loss=paddle.nn.CrossEntropyLoss(), + optimizer=paddle.optimizer.AdamW(0.00001), + ) + engine.fit(train_data=self.no_support_data_2) + + def test_evaluate(self): + with self.assertRaises(TypeError): + + engine = auto.Engine( + model=MLPLayer(), + loss=paddle.nn.CrossEntropyLoss(), + metrics=paddle.metric.Accuracy(), + ) + engine.evaluate(valid_data=self.no_support_data_3) + + with self.assertRaises(TypeError): + + engine = auto.Engine( + model=MLPLayer(), + loss=paddle.nn.CrossEntropyLoss(), + metrics=paddle.metric.Accuracy(), + ) + engine.evaluate( + valid_data=self.no_support_data_4, valid_sample_split=1 + ) + + def test_predict(self): + with self.assertRaises(TypeError): + + engine = auto.Engine(model=MLPLayer()) + engine.predict( + test_data=self.no_support_data_3, test_sample_split=1 + ) + + with self.assertRaises(TypeError): + + engine = auto.Engine(model=MLPLayer()) + engine.predict( + test_data=self.no_support_data_4, test_sample_split=1 + ) + + def build_program(self): + main_prog = static.Program() + startup_prog = static.Program() + with static.program_guard(main_prog, startup_prog): + input = static.data( + name="input", + shape=[batch_size // 2, image_size], + dtype='float32', + ) + label = static.data( + name="label", shape=[batch_size // 2, 1], dtype='int64' + ) + mlp = MLPLayer() + loss = paddle.nn.CrossEntropyLoss() + predict = mlp(input) + loss_var = loss(predict, label) + return main_prog, startup_prog, input, label, loss_var + + def test_prepare(self): + with self.assertRaises(ValueError): + engine = auto.Engine(model=MLPLayer()) + engine.prepare() + + with self.assertRaises(AssertionError): + engine = auto.Engine(model=MLPLayer()) + engine.prepare(mode="train") + + with self.assertRaises(TypeError): + input = static.data( + name="input", + shape=[batch_size / 2, image_size], + dtype='float32', + ) + label = static.data( + name="label", shape=[batch_size / 2, 1], dtype='int64' + ) + engine = auto.Engine(model=MLPLayer()) + engine.prepare(inputs_spec=input, labels_spec=label, mode="eval") + + input_spec = static.InputSpec( + shape=[batch_size, image_size], dtype="float32", name="input" + ) + label_spec = static.InputSpec( + shape=[batch_size, image_size], dtype="float32", name="input" + ) + ( + main_prog, + startup_prog, + input_var, + label_var, + loss_var, + ) = self.build_program() + + with self.assertRaises(TypeError): + engine = auto.Engine(loss=loss_var) + engine.prepare( + inputs=input_spec, + labels=label_spec, + main_program=main_prog, + startup_program=startup_prog, + mode="eval", + ) + + with self.assertRaises(AssertionError): + engine = auto.Engine(loss=loss_var) + engine.prepare( + inputs_spec=[input_spec, input_spec], + labels_spec=[label_spec, label_spec], + inputs=input_var, + labels=label_var, + main_program=main_prog, + startup_program=startup_prog, + mode="predict", + ) + + def test_cost(self): + with self.assertRaises(ValueError): + engine = auto.Engine(model=MLPLayer()) + engine.cost(mode="predict") + + +class TestEngineDynamicErrorRaise(unittest.TestCase): + def setUp(self): + paddle.disable_static() + + def tearDown(self): + paddle.enable_static() + + def test_cost(self): + with self.assertRaises(ValueError): + engine = auto.Engine(model=MLPLayer()) + engine.cost(mode="predict") + + +if __name__ == "__main__": + unittest.main() -- GitLab