未验证 提交 315ef265 编写于 作者: Z zhaoyingli 提交者: GitHub

[AutoParallel] fix engine _build and cost method (#47263)

* fix engine build method

* fix import

* update engine cost

* update raise error

* update cmakelist

* revert optimizer

* revert optimizer

* fix unittest

* fix unittest
Co-authored-by: Ncaozhou <caozhou@radi.ac.cn>
上级 26c419ca
......@@ -34,6 +34,25 @@ class AdamOpCost(CompOpCost):
return 0
@register_op_cost
class ArgsortOpCost(CompOpCost):
OP_TYPE = "argsort"
def __init__(self, op=None, op_desc=None, cluster=None):
super(ArgsortOpCost, self).__init__(
op=op, op_desc=op_desc, cluster=cluster
)
# For a concrete COMP OP, the calc_time and calc_flops function need to be overrided
def calc_flops(self):
# NOTE: The actual formula will be filled in the future
return 0
def calc_time(self):
# NOTE: The actual formula will be filled in the future
return 0
@register_op_cost
class AssignOpCost(CompOpCost):
OP_TYPE = "assign"
......@@ -338,6 +357,25 @@ class ElementwiseSubGradOpCost(CompOpCost):
return 0
@register_op_cost
class EqualOpCost(CompOpCost):
OP_TYPE = "equal"
def __init__(self, op=None, op_desc=None, cluster=None):
super(EqualOpCost, self).__init__(
op=op, op_desc=op_desc, cluster=cluster
)
# For a concrete COMP OP, the calc_time and calc_flops function need to be overrided
def calc_flops(self):
# NOTE: The actual formula will be filled in the future
return 0
def calc_time(self):
# NOTE: The actual formula will be filled in the future
return 0
@register_op_cost
class EmbeddingOpCost(CompOpCost):
OP_TYPE = "c_embedding"
......
......@@ -545,11 +545,12 @@ class CostEstimator:
def get_cost_from_engine(engine, mode):
from ..utils import to_list
import copy
# Construct cost estimator by original main program
serial_main_prog = (
engine._serial_main_progs[mode].clone()
if mode in engine._serial_main_progs
engine._fwd_main_progs[mode].clone()
if mode in engine._fwd_main_progs
else engine._orig_main_prog.clone()
)
......@@ -566,29 +567,29 @@ def get_cost_from_engine(engine, mode):
)
else engine._losses
)
if mode in engine._dist_contexts:
dist_context = engine._dist_contexts[mode]
completer = engine._planners[mode].completer
serial_optimizer = copy.deepcopy(engine._orig_optimizer)
if mode in engine._fwd_dist_contexts:
dist_context = copy.deepcopy(engine._fwd_dist_contexts[mode])
else:
from ..completion import Completer
from ..dist_context import DistributedContext
dist_context = DistributedContext(
serial_main_prog,
serial_startup_prog,
engine._optimizer,
serial_optimizer,
losses,
{},
{"loss": losses},
engine._cluster,
engine._strategy,
)
completer = Completer(dist_context)
completer.complete_forward_annotation()
dist_context.block_state.parse_forward_blocks(
dist_context.serial_main_program
)
from ..completion import Completer
completer = Completer(dist_context)
completer.complete_forward_annotation()
dist_context.block_state.parse_forward_blocks(
dist_context.serial_main_program
)
if mode == "eval" or mode == "predict":
cost_estimator = CostEstimator(serial_main_prog, engine._cluster)
......@@ -596,7 +597,6 @@ def get_cost_from_engine(engine, mode):
from ..parallelizer_v2 import Parallelizer
# Get serial main program with backward
serial_optimizer = engine._optimizer
parallelizer = Parallelizer(mode, completer, dist_context)
# Generate backward
loss_name = dist_context.serial_loss.name
......
......@@ -13,8 +13,10 @@
# limitations under the License.
import os
import copy
import logging
import random
import numbers
import numpy as np
from collections import defaultdict
......@@ -45,15 +47,16 @@ from .dist_loader import (
DistributedDataLoaderFromGenerator,
DistributedDataLoader,
)
from .utils import to_list, get_dist_attr, get_lr
from .process_group import new_process_group, get_all_process_groups
from .dist_context import DistributedContext, get_default_distributed_context
from .strategy import Strategy
from .interface import CollectionNames, get_collection
from ..utils.log_utils import get_logger
from .utils import initialize_pg_in_full_mode
from .utils import to_list, get_dist_attr, get_lr, validate_opt
from .utils import initialize_pg_in_full_mode, get_input_split_info
from .cost.estimate_cost import get_cost_from_engine
from ..utils.log_utils import get_logger
class Engine:
"""
......@@ -137,6 +140,15 @@ class Engine:
"'model must be sub classes of `paddle.nn.Layer` or any callable function."
)
self._model = model
if (
loss
and not isinstance(loss, (paddle.nn.Layer, Variable))
and not callable(loss)
):
raise TypeError(
"'loss' must be sub classes of `paddle.nn.Layer` or any callable function or a Variable."
)
self._loss = loss
if optimizer and not isinstance(
......@@ -147,13 +159,17 @@ class Engine:
"'optimizer' must be object of class `paddle.optimizer.Optimizer`"
" or `paddle.fluid.optimizer.Optimizer`."
)
self._optimizer = self._validate_opt(optimizer)
self._optimizer = validate_opt(optimizer)
self._orig_optimizer = copy.deepcopy(self._optimizer)
metrics = metrics or []
for metric in to_list(metrics):
assert isinstance(
metric, Metric
), "{} is not sub class of Metric".format(metric.__class__.__name__)
if metric and not isinstance(metric, Metric):
raise TypeError(
"{} is not sub class of Metric".format(
metric.__class__.__name__
)
)
self._metrics = to_list(metrics)
if cluster and not isinstance(cluster, Cluster):
......@@ -168,9 +184,10 @@ class Engine:
)
self._strategy = strategy or Strategy()
self._logger = get_logger(logging.INFO)
if os.getenv("POD_NAME"):
print(
"Distribute training by paddle.distributed.launch", flush=True
self._logger.info(
"Distribute training by paddle.distributed.launch"
)
fleet.init(is_collective=True)
......@@ -179,12 +196,12 @@ class Engine:
self._nranks = paddle.distributed.get_world_size()
self._saver = DistributedSaver()
self._logger = get_logger(logging.INFO)
self._orig_main_prog = static.default_main_program()
self._orig_startup_prog = static.default_startup_program()
self._orig_dist_context = get_default_distributed_context()
self._dist_contexts = {}
self._fwd_main_progs = {}
self._fwd_dist_contexts = {}
self._serial_main_progs = {}
self._serial_startup_progs = {}
self._dist_main_progs = defaultdict(dict) # dist main programs
......@@ -202,13 +219,14 @@ class Engine:
self._labels_spec = []
self._inputs = []
self._labels = []
self._losses = []
self._mode = None
self._skip_build = False
self._outside_dataloader = False
self._planned_mode = None
self._dygraph_mode = False
self._tuning = self._strategy.tuning
self._losses = None
self.history = None
......@@ -230,7 +248,7 @@ class Engine:
inputs = sample[:split]
labels = sample[split:]
else:
raise ValueError(
raise TypeError(
"Data should be a Dataset or IterableDatset, but received {}.".format(
type(data).__name__
)
......@@ -259,8 +277,14 @@ class Engine:
specs.append(spec)
else:
specs.append(spec.batch(batch_size))
else:
elif isinstance(item, numbers.Number):
specs.append(InputSpec([batch_size], type(item), name))
else:
raise TypeError(
"The sample's dtype returned of dataset should be number, np.ndarray or Tensor, but got {}".format(
type(item).__name__
)
)
if inputs is not None:
for i, item in enumerate(inputs):
......@@ -277,43 +301,41 @@ class Engine:
labels_spec = self._validate_spec(labels_spec)
return inputs_spec, labels_spec
def _prepare_data_tensor(
self, inputs_spec, labels_spec, inputs=None, labels=None
):
def _prepare_data_tensor(self, inputs_spec, labels_spec, inputs, labels):
if _non_static_mode() or self._dygraph_mode:
return None, None
inputs_spec = inputs_spec if inputs_spec else []
labels_spec = labels_spec if labels_spec else []
raise ValueError("Only support static graph mode.")
if inputs_spec:
assert isinstance(
inputs_spec, list
), "inputs should be list, but received {}".format(
type(inputs_spec)
)
if inputs is None:
inputs = [s._create_feed_layer() for s in inputs_spec]
else:
assert isinstance(
inputs, list
), "inputs should be list, but received {}".format(type(inputs))
for input_spec, input in zip(inputs_spec, inputs):
if input_spec.shape != input.shape:
input.desc.set_shape(input_spec.shape)
assert isinstance(
inputs, list
), "inputs should be list, but received {}".format(type(inputs))
assert len(inputs_spec) == len(
inputs
), "the number of `inputs_spec` should be equal to `inputs`'s."
for input_spec, input in zip(inputs_spec, inputs):
if input_spec.shape != input.shape:
input.desc.set_shape(input_spec.shape)
if labels_spec:
assert isinstance(
labels_spec, list
), "labels should be list, but received {}".format(
type(labels_spec)
)
if labels is None:
labels = [s._create_feed_layer() for s in labels_spec]
else:
assert isinstance(
labels, list
), "labels should be list, but received {}".format(type(labels))
for label_spec, label in zip(labels_spec, labels):
if label_spec.shape != label.shape:
label.desc.set_shape(label_spec.shape)
assert isinstance(
labels, list
), "labels should be list, but received {}".format(type(labels))
assert len(labels_spec) == len(
labels
), "the number of `labels_spec` should be equal to `labels`'s."
for label_spec, label in zip(labels_spec, labels):
if label_spec.shape != label.shape:
label.desc.set_shape(label_spec.shape)
return inputs, labels
def _prepare_reader(self):
......@@ -497,10 +519,12 @@ class Engine:
self._dygraph_mode = True
self._logger.info("Building model with 'to_static' method.")
inputs_spec = self._inputs_spec
labels_spec = self._labels_spec if self._labels_spec else []
self.program_helper = ProgramHelper(
self._model, self._loss, self._metrics, inputs_spec, labels_spec
self._model,
self._loss,
self._metrics,
self._inputs_spec,
self._labels_spec,
)
# build forward main program
self.program_helper.build_program(mode)
......@@ -509,16 +533,12 @@ class Engine:
serial_main_prog = self.program_helper.main_program
serial_startup_prog = self.program_helper.startup_program
inputs = self.program_helper.input_vars
self._inputs = self.program_helper.input_vars
self._labels = self.program_helper.label_vars
outputs = self.program_helper.output_vars
labels = self.program_helper.label_vars
losses = self.program_helper.loss_vars
self._losses = losses
self._losses = self.program_helper.loss_vars
metrics = self.program_helper.metric_vars
self._inputs = inputs
self._labels = labels
paddle.enable_static()
else:
# build program in static mode
......@@ -527,29 +547,45 @@ class Engine:
return
outputs = []
losses = []
metrics = []
inputs = self._inputs if self._inputs else []
labels = self._labels if self._labels else []
self._losses = []
serial_main_prog = self._orig_main_prog.clone()
serial_startup_prog = self._orig_startup_prog.clone()
if not self._skip_build:
with static.program_guard(
serial_main_prog, serial_startup_prog
), utils.unique_name.guard():
outputs = to_list(self._model(*inputs))
self._inputs = [
s._create_feed_layer() for s in self._inputs_spec
]
self._labels = [
s._create_feed_layer() for s in self._labels_spec
]
outputs = to_list(self._model(*self._inputs))
if mode != "predict" and self._loss:
losses = to_list(self._loss(*(outputs + labels)))
self._losses = losses
assert isinstance(
self._loss, paddle.nn.Layer
) or callable(
self._loss
), "the type of `loss` of the Engine arguments should be sub classes of `paddle.nn.Layer` or any callable function."
self._losses = to_list(
self._loss(*(outputs + self._labels))
)
if mode != "predict" and (outputs or labels):
if mode != "predict" and (outputs or self._labels):
for metric in self._metrics:
metrics.append(
to_list(metric.compute(*(outputs + labels)))
to_list(
metric.compute(*(outputs + self._labels))
)
)
else:
losses = to_list(self._loss)
self.losses = losses
assert isinstance(
self._loss, Variable
), "the type of `loss` of the Engine arguments should be Variable."
self._losses = to_list(self._loss)
default_ctx = get_default_distributed_context()
if not default_ctx.has_annotation:
......@@ -558,11 +594,11 @@ class Engine:
new_process_group(list(range(self._nranks)))
default_ctx.data_parallel = True
feed_vars = {"inputs": inputs, "labels": labels}
feed_vars = {"inputs": self._inputs, "labels": self._labels}
fetch_vars = {
"outputs": flatten(outputs),
"loss": losses,
"loss": self._losses,
"metrics": metrics,
}
......@@ -574,13 +610,24 @@ class Engine:
serial_main_prog,
serial_startup_prog,
self._optimizer,
losses,
self._losses,
feed_vars,
fetch_vars,
self._cluster,
self._strategy,
)
self._fwd_dist_contexts[mode] = DistributedContext(
serial_main_prog,
serial_startup_prog,
self._optimizer,
self._losses,
feed_vars,
fetch_vars,
self._cluster,
self._strategy,
)
self._dist_contexts[mode].gradient_scale = self._strategy.gradient_scale
self._fwd_main_progs[mode] = serial_main_prog.clone()
def _optimization_tuning(self, mode, dataset, batch_size):
if not self._tuning.enable:
......@@ -637,8 +684,8 @@ class Engine:
self._dp_world_sizes = []
self._dp_ranks = []
for feed_var in feed_list:
dp_world_size, dp_rank = self._get_input_split_info(
feed_var, self._dist_contexts[mode]
dp_world_size, dp_rank = get_input_split_info(
self._cur_rank, feed_var, self._dist_contexts[mode]
)
self._dp_world_sizes.append(dp_world_size)
self._dp_ranks.append(dp_rank)
......@@ -834,18 +881,11 @@ class Engine:
self._inputs_spec, self._labels_spec = self._prepare_data_spec(
train_data, train_sample_split, batch_size
)
self._inputs, self._labels = self._prepare_data_tensor(
self._inputs_spec, self._labels_spec
)
if not self._has_prepared[self._mode]:
self._prepare_program(self._mode)
else:
self._switch_mode(self._mode)
assert (
self._mode in self._dist_main_progs
), "train model is not ready, please call `engine._prepare_program('train')` first."
train_dataloader = self._prepare_dataloader_from_generator(
dataset=train_data,
capacity=70,
......@@ -984,17 +1024,11 @@ class Engine:
self._inputs_spec, self._labels_spec = self._prepare_data_spec(
valid_data, valid_sample_split, batch_size
)
self._inputs, self._labels = self._prepare_data_tensor(
self._inputs_spec, self._labels_spec
)
if not self._has_prepared[self._mode]:
self._prepare_program(self._mode)
else:
self._switch_mode(self._mode)
assert (
self._mode in self._dist_main_progs
), "eval model is not ready, please call `engine._prepare_program('eval')` first."
valid_dataloader = self._prepare_dataloader_from_generator(
dataset=valid_data,
capacity=70,
......@@ -1096,18 +1130,11 @@ class Engine:
self._inputs_spec, self._labels_spec = self._prepare_data_spec(
test_data, test_sample_split, batch_size
)
self._inputs, self._labels = self._prepare_data_tensor(
self._inputs_spec, self._labels_spec
)
if not self._has_prepared[self._mode]:
self._prepare_program(self._mode)
else:
self._switch_mode(self._mode)
assert (
self._mode in self._dist_main_progs
), "predict model is not ready, please call `engine._prepare_program('predict')` first."
test_dataloader = self._prepare_dataloader_from_generator(
dataset=test_data,
capacity=70,
......@@ -1165,13 +1192,11 @@ class Engine:
self._inputs_spec, self._labels_spec = self._prepare_data_spec(
dataset, sample_split, batch_size
)
self._inputs, self._labels = self._prepare_data_tensor(
self._inputs_spec, self._labels_spec
)
if not self._has_prepared[self._mode]:
self._prepare_program(self._mode)
else:
self._switch_mode(self._mode)
dataloader = self._prepare_dataloader(
dataset,
return_list=False,
......@@ -1209,13 +1234,11 @@ class Engine:
self._inputs_spec, self._labels_spec = self._prepare_data_spec(
dataset, sample_split, batch_size
)
self._inputs, self._labels = self._prepare_data_tensor(
self._inputs_spec, self._labels_spec
)
if not self._has_prepared[self._mode]:
self._prepare_program(self._mode)
else:
self._switch_mode(self._mode)
dataloader = self._prepare_dataloader_from_generator(
dataset=dataset,
capacity=capacity,
......@@ -1243,45 +1266,49 @@ class Engine:
):
if mode is not None:
self.to_mode(mode)
if not self._mode:
raise ValueError(
"Please set mode to be prepared with `prepare(mode=...)`"
)
if self._has_prepared[self._mode]:
return
inputs_spec = self._validate_spec(inputs_spec)
labels_spec = self._validate_spec(labels_spec)
inputs = self._validate_vars(inputs)
labels = self._validate_vars(labels)
self._orig_main_prog = main_program
self._orig_startup_prog = startup_program
if inputs or labels:
self._skip_build = True
self._inputs_spec = inputs_spec
self._labels_spec = labels_spec
self._inputs, self._labels = self._prepare_data_tensor(
self._inputs_spec, self._labels_spec, inputs, labels
inputs, labels = self._prepare_data_tensor(
inputs_spec, labels_spec, inputs, labels
)
self._orig_main_prog = main_program
if self._orig_main_prog is None:
self._orig_main_prog = static.default_main_program()
self._orig_startup_prog = startup_program
if self._orig_startup_prog is None:
self._orig_startup_prog = static.default_startup_program()
if not self._has_prepared[self._mode]:
self._prepare_program(self._mode)
else:
self._switch_mode(self._mode)
elif inputs_spec or labels_spec:
self._inputs_spec = inputs_spec
self._labels_spec = labels_spec
self._outside_dataloader = True
self._inputs, self._labels = self._prepare_data_tensor(
self._inputs_spec, self._labels_spec
)
self._orig_main_prog = main_program
if self._orig_main_prog is None:
self._orig_main_prog = static.default_main_program()
self._orig_startup_prog = startup_program
if self._orig_startup_prog is None:
self._orig_startup_prog = static.default_startup_program()
if not self._has_prepared[self._mode]:
self._prepare_program(self._mode)
else:
self._switch_mode(self._mode)
else:
assert (
self._inputs_spec and self._labels_spec
), "Please call the dataloader(...) before calling prepare(...)"
self._inputs_spec, self._labels_spec = inputs_spec, labels_spec
self._inputs, self._labels = inputs, labels
if not self._has_prepared[self._mode]:
self._prepare_program(self._mode)
else:
self._switch_mode(self._mode)
def run(self, data=None, feed=None, fetch_list=None, mode=None):
if mode is not None:
self.to_mode(mode)
......@@ -1331,7 +1358,6 @@ class Engine:
dist_main_prog = self._dist_main_progs[self._mode][self._cur_rank]
dist_startup_prog = self._dist_startup_progs[self._mode][self._cur_rank]
dist_context = self._dist_contexts[self._mode]
dist_main_block = dist_main_prog.global_block()
# NOTE: Get feed_list, then insert dataloader op with sharded var shape.
......@@ -1400,7 +1426,6 @@ class Engine:
dist_main_prog = self._dist_main_progs[self._mode][self._cur_rank]
dist_startup_prog = self._dist_startup_progs[self._mode][self._cur_rank]
dist_context = self._dist_contexts[self._mode]
dist_main_block = dist_main_prog.global_block()
# NOTE: Get feed_list, then insert dataloader op with sharded var shape.
......@@ -1446,9 +1471,6 @@ class Engine:
self._inputs_spec, self._labels_spec = self._prepare_data_spec(
tune_data, tune_sample_split, batch_size
)
self._inputs, self._labels = self._prepare_data_tensor(
self._inputs_spec, self._labels_spec
)
self._optimization_tuning(self._mode, tune_data, batch_size)
def _validate_spec(self, specs):
......@@ -1456,7 +1478,10 @@ class Engine:
self._k_steps = self._strategy.gradient_merge.k_steps
if specs is not None:
for i, spec in enumerate(specs):
assert isinstance(spec, InputSpec)
if not isinstance(spec, InputSpec):
raise TypeError(
"'spec' must be object of class `paddle.static.InputSpec`."
)
if spec.name is None:
raise ValueError(
"Requires Input[{}].name != None, but receive `None` with {}.".format(
......@@ -1472,39 +1497,20 @@ class Engine:
)
shape[0] //= self._k_steps
spec.shape = shape
return specs
return specs or []
def _validate_vars(self, vars):
vars = to_list(vars)
if vars is not None:
for i, var in enumerate(vars):
if not isinstance(var, Variable):
raise TypeError("'var' must be a `Variable`.")
return vars or []
def _is_local_var(self, var):
var_name = _to_name_str(var)
return var_name in self.main_program.global_block().vars
def _get_input_split_info(self, var, dist_context):
# deduce how the input data is split among the cluster
from .utils import _get_comm_group, _get_corresponding_rank
tensor_dist_attr = dist_context.get_tensor_dist_attr_for_program(var)
process_mesh = tensor_dist_attr.process_mesh
dims_mapping = tensor_dist_attr.dims_mapping
if self._cur_rank not in process_mesh.processes:
rank_id = _get_corresponding_rank(
dist_context, process_mesh, self._cur_rank
)
else:
rank_id = self._cur_rank
batch_size_axis = dims_mapping[0]
if batch_size_axis > -1 and process_mesh.topology[batch_size_axis] > 1:
group_ranks = _get_comm_group(
process_mesh.processes,
process_mesh.topology,
batch_size_axis,
rank_id,
)
return len(group_ranks), group_ranks.index(rank_id)
return 1, 0
def _set_recompute_ckpts(self):
# NOTE hack to enable recompute in engine api for GPT-3
# TODO support more PaddleNLP/CV models here
......@@ -1534,12 +1540,6 @@ class Engine:
}
self._logger.info(logs)
def _validate_opt(self, optimizer):
if optimizer is not None:
optimizer._parameter_list = None
optimizer._param_groups = None
return optimizer
def _reset_metrics(self):
for metric in self._metrics:
metric.reset()
......@@ -1551,6 +1551,9 @@ class Engine:
return metrics_name
def _switch_mode(self, mode):
assert (
mode in self._dist_main_progs
), "{} model is not ready, please call `prepare()` first.".format(mode)
self.to_mode(mode)
self._optimizer = self._dist_contexts[mode]._serial_optimizer
......@@ -1691,7 +1694,7 @@ class Engine:
)
return self._state_dict, self._dist_attr
def cost(self, inputs_spec=None, labels_spec=None, mode="train"):
def cost(self, inputs_spec=None, labels_spec=None, mode=None):
"""
Get and Print cost, including memory of every rank,
max memory among all ranks, and the global cost of one step based on
......@@ -1702,7 +1705,7 @@ class Engine:
Args:
inputs_spec(InputSpec): The specification of inputs. Default: None.
labels_spec(InputSpec): The specification of labels. Default: None.
mode (str): The engine mode must be in ["train", "predict", "eval"]. Default: "train".
mode (str): The engine mode must be in ["train", "predict", "eval"]. Default: None.
Returns:
Return the global execution time (ms) and max memory (B).
......@@ -1710,33 +1713,44 @@ class Engine:
"""
# Check parallel mode
if self._strategy.auto_mode == "full":
print(
self._logger.info(
"The cost will be calcudated in the search process when the auto mode is full."
)
return
# Check mode
accepted_modes = ["train", "predict", "eval"]
if mode not in accepted_modes:
mode = mode if mode is not None else self._mode
assert mode is not None, "Please set mode."
if mode not in self._has_prepared:
raise ValueError(
"The mode {} is not in accepted modes {}".format(
mode, accepted_modes
mode, list(self._has_prepared.keys())
)
)
self.to_mode(mode)
if inputs_spec is not None:
self._inputs_spec, self._labels_spec = inputs_spec, labels_spec
self._inputs, self._labels = self._prepare_data_tensor(
self._inputs_spec, self._labels_spec
)
if inputs_spec is not None and not self._has_prepared[mode]:
self._inputs_spec = self._validate_spec(inputs_spec)
self._labels_spec = self._validate_spec(labels_spec)
self._build(mode)
self._plan(mode)
else:
if _non_static_mode() or self._dygraph_mode:
raise ValueError(
"Please call `engine._prepare_program('mode')` firstly when in the static graph mode."
"Please call `prepare()` or `fit()` or `evaluate()` or `predict()` before calling `cost()`."
)
else:
self._logger.info(
"The program whose cost to be estimated must be static default program. Otherwise, please call `prepare()`before calling `cost()`."
)
program = paddle.static.default_main_program()
if (
not program.global_block().ops
or not program.global_block().ops
) and not self._has_prepared[mode]:
raise ValueError(
"Please call `prepare()` or `fit()` or `evaluate()` or `predict()` before calling `cost()`."
)
# Estimate the exec cost and max memory
global_cost, max_memory = get_cost_from_engine(self, mode)
......
......@@ -1876,3 +1876,34 @@ def initialize_pg_in_full_mode(all_process_groups, cur_rank):
break
process_group.instantiate()
server_socket.close()
def get_input_split_info(cur_rank, var, dist_context):
# deduce how the input data is split among the cluster
tensor_dist_attr = dist_context.get_tensor_dist_attr_for_program(var)
process_mesh = tensor_dist_attr.process_mesh
dims_mapping = tensor_dist_attr.dims_mapping
if cur_rank not in process_mesh.processes:
rank_id = _get_corresponding_rank(dist_context, process_mesh, cur_rank)
else:
rank_id = cur_rank
batch_size_axis = dims_mapping[0]
if batch_size_axis > -1 and process_mesh.topology[batch_size_axis] > 1:
group_ranks = _get_comm_group(
process_mesh.processes,
process_mesh.topology,
batch_size_axis,
rank_id,
)
return len(group_ranks), group_ranks.index(rank_id)
return 1, 0
def validate_opt(optimizer):
if optimizer is not None:
optimizer._parameter_list = None
optimizer._param_groups = None
return optimizer
......@@ -63,6 +63,15 @@ if(WITH_DISTRIBUTE AND WITH_GPU)
py_test_modules(test_engine_callbacks MODULES test_engine_callbacks)
set_tests_properties(test_engine_callbacks
PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE" TIMEOUT 50)
py_test_modules(test_parallel_tuner MODULES test_parallel_tuner ENVS
${dist_ENVS})
set_tests_properties(test_parallel_tuner PROPERTIES TIMEOUT 120)
py_test_modules(test_parallel_tuner_full MODULES test_parallel_tuner_full
ENVS ${dist_ENVS})
set_tests_properties(test_parallel_tuner_full PROPERTIES TIMEOUT 120)
py_test_modules(test_parallel_tuner_predict MODULES
test_parallel_tuner_predict ENVS ${dist_ENVS})
set_tests_properties(test_parallel_tuner_predict PROPERTIES TIMEOUT 120)
py_test_modules(test_while_op_completion MODULES test_while_op_completion
ENVS ${dist_ENVS})
......@@ -90,6 +99,7 @@ if(WITH_DISTRIBUTE AND WITH_GPU)
py_test_modules(test_prim_dist_op MODULES test_prim_dist_op ENVS ${dist_ENVS})
py_test_modules(test_to_static MODULES test_to_static ENVS ${dist_ENVS})
py_test_modules(test_dist_op_cost MODULES test_dist_op_cost ENVS ${dist_ENVS})
py_test_modules(test_cluster_v2 MODULES test_cluster_v2)
py_test_modules(test_process_mesh_v2 MODULES test_process_mesh_v2)
py_test_modules(test_dist_attr_v2 MODULES test_dist_attr_v2)
......@@ -99,20 +109,10 @@ if(WITH_DISTRIBUTE AND WITH_GPU)
py_test_modules(test_interface MODULES test_interface)
py_test_modules(test_strategy MODULES test_strategy)
py_test_modules(test_pass_quantization MODULES test_pass_quantization)
py_test_modules(test_dist_shape MODULES test_dist_shape)
py_test_modules(test_dist_assign MODULES test_dist_assign)
py_test_modules(test_conditional_block_reshard MODULES
test_conditional_block_reshard)
py_test_modules(test_parallel_tuner MODULES test_parallel_tuner ENVS
${dist_ENVS})
set_tests_properties(test_parallel_tuner PROPERTIES TIMEOUT 120)
py_test_modules(test_parallel_tuner_full MODULES test_parallel_tuner_full
ENVS ${dist_ENVS})
set_tests_properties(test_parallel_tuner_full PROPERTIES TIMEOUT 120)
py_test_modules(test_parallel_tuner_predict MODULES
test_parallel_tuner_predict ENVS ${dist_ENVS})
set_tests_properties(test_parallel_tuner_predict PROPERTIES TIMEOUT 120)
py_test_modules(test_engine_api_error MODULES test_engine_api_error)
endif()
......@@ -374,6 +374,57 @@ def train_non_builtin_data_vars():
def get_cost():
main_program = static.Program()
startup_program = static.Program()
with static.program_guard(
main_program, startup_program
), utils.unique_name.guard():
input = static.data(
name="input", shape=[batch_size, image_size], dtype='float32'
)
label = static.data(name="label", shape=[batch_size, 1], dtype='int64')
loader = paddle.io.DataLoader.from_generator(
feed_list=[input, label], capacity=4 * batch_size, iterable=False
)
places = static.cuda_places()
loader.set_batch_generator(batch_generator_creator(), places=places)
mlp = MLPLayer(
hidden_size=hidden_size,
intermediate_size=4 * hidden_size,
dropout_ratio=0.1,
initializer_range=0.02,
)
loss = paddle.nn.CrossEntropyLoss()
optimizer = paddle.optimizer.Adam(
learning_rate=0.00001,
beta1=0.9,
beta2=0.999,
epsilon=1e-08,
grad_clip=None,
)
metric = paddle.metric.Accuracy()
predict = mlp(input)
loss_var = loss(predict, label)
strategy = auto.Strategy()
strategy.auto_mode = "semi"
engine = auto.Engine(
loss=loss_var, optimizer=optimizer, metrics=metric, strategy=strategy
)
engine.prepare(
main_program=main_program,
startup_program=startup_program,
inputs=[input],
labels=[label],
mode="train",
)
engine.cost()
def get_cost_by_default_program():
main_program = static.default_main_program()
startup_program = static.default_startup_program()
with static.program_guard(
......@@ -414,7 +465,7 @@ def get_cost():
engine = auto.Engine(
loss=loss_var, optimizer=optimizer, metrics=metric, strategy=strategy
)
engine.cost()
engine.cost(mode="train")
def get_cost_by_spec():
......@@ -451,4 +502,5 @@ if __name__ == "__main__":
train_builtin_data_vars()
train_non_builtin_data_vars()
get_cost()
get_cost_by_default_program()
get_cost_by_spec()
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import numpy as np
import paddle
import paddle.static as static
import paddle.nn as nn
import paddle.nn.functional as F
from paddle.io import Dataset
from paddle.distributed.fleet import auto
paddle.enable_static()
epoch_num = 1
batch_size = 2
batch_num = 10
hidden_size = 1024
sequence_len = 512
image_size = hidden_size
class_num = 10
is_fetch = True
is_feed = True
my_feed_vars = []
class TrainDataset(Dataset):
def __init__(self, num_samples):
super(TrainDataset, self).__init__()
self.num_samples = num_samples
def __getitem__(self, index):
input = np.random.uniform(size=image_size).astype("float32")
label = np.random.randint(0, class_num - 1, dtype="int64")
return input, label
def __len__(self):
return self.num_samples
class TestDataset(Dataset):
def __init__(self, num_samples):
super(TestDataset, self).__init__()
self.num_samples = num_samples
def __getitem__(self, index):
input = np.random.uniform(size=image_size).astype("float32")
return input
def __len__(self):
return self.num_samples
class MLPLayer(nn.Layer):
def __init__(
self,
hidden_size=1024,
intermediate_size=4 * 1024,
dropout_ratio=0.1,
initializer_range=0.02,
):
super(MLPLayer, self).__init__()
d_model = hidden_size
dim_feedforward = intermediate_size
weight_attr = paddle.ParamAttr(
initializer=nn.initializer.Normal(mean=0.0, std=initializer_range)
)
bias_attr = None
self.linear0 = nn.Linear(
d_model, dim_feedforward, weight_attr, bias_attr=bias_attr
)
self.linear1 = nn.Linear(
dim_feedforward, d_model, weight_attr, bias_attr=bias_attr
)
self.linear2 = nn.Linear(d_model, 1, weight_attr, bias_attr=bias_attr)
self.norm = nn.LayerNorm(d_model, epsilon=1e-5)
self.dropout = nn.Dropout(dropout_ratio, mode="upscale_in_train")
def forward(self, input):
out = self.norm(input)
out = self.linear0(out)
if is_feed:
my_feed_vars.append((out, out.shape))
out = F.gelu(out, approximate=True)
out = self.linear1(out)
out = self.dropout(out)
out = self.linear2(out)
if is_feed:
my_feed_vars.append((out, out.shape))
if is_fetch:
auto.fetch(out, "my_fetch", logging=True)
return out
class TestEngineErrorRaise(unittest.TestCase):
def setUp(self):
class NoSupportData1:
def __getitem__(self, index):
input = np.random.uniform(size=image_size).astype("float32")
label = np.random.randint(0, class_num - 1, dtype="int64")
return input, label
class NoSupportData2(TrainDataset):
def __getitem__(self, index):
input = [
list(np.random.uniform(size=image_size).astype("float32"))
]
label = [np.random.randint(0, class_num - 1, dtype="int64")]
return input, label
class NoSupportData3:
def __getitem__(self, index):
input = np.random.uniform(size=image_size).astype("float32")
return input
class NoSupportData4(TestDataset):
def __getitem__(self, index):
input = [
list(np.random.uniform(size=image_size).astype("float32"))
]
return input
self.no_support_data_1 = NoSupportData1()
self.no_support_data_2 = NoSupportData2(10)
self.no_support_data_3 = NoSupportData3()
self.no_support_data_4 = NoSupportData4(10)
def test_Engine(self):
with self.assertRaises(TypeError):
auto.Engine(model=paddle.static.Program())
with self.assertRaises(TypeError):
auto.Engine(loss="CrossEntropyLoss")
with self.assertRaises(TypeError):
auto.Engine(optimizer="adam")
with self.assertRaises(TypeError):
auto.Engine(metrics=["acc"])
with self.assertRaises(TypeError):
auto.Engine(cluster="cluster")
with self.assertRaises(TypeError):
auto.Engine(strategy="strategy")
def test_fit(self):
with self.assertRaises(TypeError):
engine = auto.Engine(
model=MLPLayer(),
loss=paddle.nn.CrossEntropyLoss(),
optimizer=paddle.optimizer.AdamW(0.00001),
)
engine.fit(train_data=self.no_support_data_1)
with self.assertRaises(TypeError):
engine = auto.Engine(
model=MLPLayer(),
loss=paddle.nn.CrossEntropyLoss(),
optimizer=paddle.optimizer.AdamW(0.00001),
)
engine.fit(train_data=self.no_support_data_2)
def test_evaluate(self):
with self.assertRaises(TypeError):
engine = auto.Engine(
model=MLPLayer(),
loss=paddle.nn.CrossEntropyLoss(),
metrics=paddle.metric.Accuracy(),
)
engine.evaluate(valid_data=self.no_support_data_3)
with self.assertRaises(TypeError):
engine = auto.Engine(
model=MLPLayer(),
loss=paddle.nn.CrossEntropyLoss(),
metrics=paddle.metric.Accuracy(),
)
engine.evaluate(
valid_data=self.no_support_data_4, valid_sample_split=1
)
def test_predict(self):
with self.assertRaises(TypeError):
engine = auto.Engine(model=MLPLayer())
engine.predict(
test_data=self.no_support_data_3, test_sample_split=1
)
with self.assertRaises(TypeError):
engine = auto.Engine(model=MLPLayer())
engine.predict(
test_data=self.no_support_data_4, test_sample_split=1
)
def build_program(self):
main_prog = static.Program()
startup_prog = static.Program()
with static.program_guard(main_prog, startup_prog):
input = static.data(
name="input",
shape=[batch_size // 2, image_size],
dtype='float32',
)
label = static.data(
name="label", shape=[batch_size // 2, 1], dtype='int64'
)
mlp = MLPLayer()
loss = paddle.nn.CrossEntropyLoss()
predict = mlp(input)
loss_var = loss(predict, label)
return main_prog, startup_prog, input, label, loss_var
def test_prepare(self):
with self.assertRaises(ValueError):
engine = auto.Engine(model=MLPLayer())
engine.prepare()
with self.assertRaises(AssertionError):
engine = auto.Engine(model=MLPLayer())
engine.prepare(mode="train")
with self.assertRaises(TypeError):
input = static.data(
name="input",
shape=[batch_size / 2, image_size],
dtype='float32',
)
label = static.data(
name="label", shape=[batch_size / 2, 1], dtype='int64'
)
engine = auto.Engine(model=MLPLayer())
engine.prepare(inputs_spec=input, labels_spec=label, mode="eval")
input_spec = static.InputSpec(
shape=[batch_size, image_size], dtype="float32", name="input"
)
label_spec = static.InputSpec(
shape=[batch_size, image_size], dtype="float32", name="input"
)
(
main_prog,
startup_prog,
input_var,
label_var,
loss_var,
) = self.build_program()
with self.assertRaises(TypeError):
engine = auto.Engine(loss=loss_var)
engine.prepare(
inputs=input_spec,
labels=label_spec,
main_program=main_prog,
startup_program=startup_prog,
mode="eval",
)
with self.assertRaises(AssertionError):
engine = auto.Engine(loss=loss_var)
engine.prepare(
inputs_spec=[input_spec, input_spec],
labels_spec=[label_spec, label_spec],
inputs=input_var,
labels=label_var,
main_program=main_prog,
startup_program=startup_prog,
mode="predict",
)
def test_cost(self):
with self.assertRaises(ValueError):
engine = auto.Engine(model=MLPLayer())
engine.cost(mode="predict")
class TestEngineDynamicErrorRaise(unittest.TestCase):
def setUp(self):
paddle.disable_static()
def tearDown(self):
paddle.enable_static()
def test_cost(self):
with self.assertRaises(ValueError):
engine = auto.Engine(model=MLPLayer())
engine.cost(mode="predict")
if __name__ == "__main__":
unittest.main()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册