From 686fa07a88e18f59da3df1752838598a3aa18c38 Mon Sep 17 00:00:00 2001 From: Yulong Ao Date: Wed, 12 Oct 2022 19:29:06 +0800 Subject: [PATCH] [Auto Parallel] Improve the fine-grained APIs (#46552) * [Auto Parallel] Suppport different dataloaders * [Auto Parallel] Add num_shards config for dataset * [Auto Parallel] Unify the logger and outputs of Engine API * [Auto Parallel] Fix the bugs of to_static * [Auto Parallel] Adjust the test_to_static.py * [Auto Parallel] Add the prepare API and replace __call__ with run * [Auto Parallel] Improve the private implementations of Engine * [Auto Parallel] Set capacity of dataloader for opt tuning * [Auto Parallel] [WIP] Change the fine-grained API * [Auto Parallel] Improve APIs to support different user cases * [Auto Parallel] Add removed config * [Auto Parallel] Add imports * [Auto Parallel] Fix bugs for to_static * [Auto Parallel] Remove unnecessary imports --- .../distributed/auto_parallel/constants.py | 7 + .../distributed/auto_parallel/dist_loader.py | 183 +++- .../distributed/auto_parallel/engine.py | 901 ++++++++++++------ .../distributed/auto_parallel/interface.py | 6 +- .../distributed/auto_parallel/partitioner.py | 6 +- .../distributed/auto_parallel/reshard.py | 7 +- .../distributed/auto_parallel/strategy.py | 10 + .../auto_parallel/tuner/profiler.py | 17 +- .../unittests/auto_parallel/engine_api.py | 201 +++- 9 files changed, 969 insertions(+), 369 deletions(-) diff --git a/python/paddle/distributed/auto_parallel/constants.py b/python/paddle/distributed/auto_parallel/constants.py index 86a545322a2..82c5011faf0 100644 --- a/python/paddle/distributed/auto_parallel/constants.py +++ b/python/paddle/distributed/auto_parallel/constants.py @@ -116,3 +116,10 @@ set_field_default_config(TUNING, "profile_start_step", 1) set_field_default_config(TUNING, "profile_end_step", 1) set_field_default_config(TUNING, "run_after_tuning", True) set_field_default_config(TUNING, "verbose", True) + +######################################### +# dataset configuration +######################################### +DATASET = "dataset" +set_field_default_config(DATASET, "enable", False) +set_field_default_config(DATASET, "num_shards", 1) diff --git a/python/paddle/distributed/auto_parallel/dist_loader.py b/python/paddle/distributed/auto_parallel/dist_loader.py index e3487d7178e..9eb62b1b74e 100644 --- a/python/paddle/distributed/auto_parallel/dist_loader.py +++ b/python/paddle/distributed/auto_parallel/dist_loader.py @@ -17,38 +17,11 @@ import numpy as np import paddle from paddle.io import BatchSampler, IterableDataset -from paddle.fluid.dataloader.batch_sampler import _InfiniteIterableSampler +from paddle.fluid.dataloader.batch_sampler import _InfiniteIterableSampler, DistributedBatchSampler from paddle.fluid.dataloader.dataloader_iter import _DatasetKind, default_collate_fn, default_convert_fn -class DistributedDataLoader(metaclass=abc.ABCMeta): - - def __init__(self, dataset, batch_size=1, epochs=1, drop_last=False): - if isinstance(dataset, IterableDataset): - self.dataset_kind = _DatasetKind.ITER - else: - self.dataset_kind = _DatasetKind.MAP - - self.dataset = dataset - self.epochs = epochs - self.drop_last = drop_last - - if batch_size is None: - self.batch_size = None - self.batch_sampler = None - else: - self.batch_size = batch_size - if isinstance(dataset, IterableDataset): - self.batch_sampler = _InfiniteIterableSampler( - dataset, batch_size) - else: - self.batch_sampler = BatchSampler(dataset, - batch_size=batch_size, - shuffle=False, - drop_last=drop_last) - - self.auto_collate_batch = self.batch_sampler is not None - self.sampler_iter = iter(self.index_sampler) +class DistributedDataLoaderBase(metaclass=abc.ABCMeta): @abc.abstractmethod def __iter__(self): @@ -58,48 +31,70 @@ class DistributedDataLoader(metaclass=abc.ABCMeta): def __next__(self): raise NotImplementedError - @property - def index_sampler(self): - if self.auto_collate_batch: - return self.batch_sampler - else: - if self.dataset_kind == _DatasetKind.MAP: - return list(range(len(self.dataset))) - else: - return _InfiniteIterableSampler(self.dataset, 1) - -class NonIterableGeneratorLoader(DistributedDataLoader): +class DistributedDataLoaderFromGenerator(DistributedDataLoaderBase): def __init__(self, dataset, - feed_list, - places, + feed_list=None, + capacity=None, + use_double_buffer=True, + iterable=True, + return_list=False, + use_multiprocess=False, + drop_last=True, + places=None, batch_size=1, epochs=1, steps_per_epoch=None, collate_fn=None, + split_data=True, data_parallel_world_size=[], - data_parallel_rank=[], - drop_last=False, - split_data=True): + data_parallel_rank=[]): + self.dataset = dataset self.feed_list = feed_list + self.capacity = capacity + self.use_double_buffer = use_double_buffer + self.iterable = iterable + self.return_list = return_list + self.use_multiprocess = use_multiprocess + self.drop_last = drop_last self.places = places + self.batch_size = batch_size + self.epochs = epochs self.steps_per_epoch = steps_per_epoch - + self.collate_fn = collate_fn + self.split_data = split_data assert len(data_parallel_world_size) == len(feed_list) assert len(data_parallel_rank) == len(feed_list) self.dp_world_sizes = data_parallel_world_size self.dp_ranks = data_parallel_rank - self.split_data = split_data - super(NonIterableGeneratorLoader, - self).__init__(dataset, batch_size, epochs, drop_last) + if isinstance(dataset, IterableDataset): + self.dataset_kind = _DatasetKind.ITER + else: + self.dataset_kind = _DatasetKind.MAP + + if self.batch_size is None: + self.batch_sampler = None + else: + if isinstance(dataset, IterableDataset): + self.batch_sampler = _InfiniteIterableSampler( + dataset, batch_size) + else: + self.batch_sampler = BatchSampler(dataset, + batch_size=batch_size, + shuffle=False, + drop_last=drop_last) + + self.auto_collate_batch = self.batch_sampler is not None + self.sampler_iter = iter(self.index_sampler) if self.auto_collate_batch: self.collate_fn = collate_fn or default_collate_fn else: self.collate_fn = collate_fn or default_convert_fn + self.dataset_fetcher = _DatasetKind.create_fetcher( self.dataset_kind, self.dataset, self.auto_collate_batch, self.collate_fn, self.drop_last) @@ -115,8 +110,10 @@ class NonIterableGeneratorLoader(DistributedDataLoader): def __next__(self): if not self._steps: self._cur_step += 1 + return None elif self._cur_step < self._steps: self._cur_step += 1 + return None else: self._inner_dataloader.reset() self.sampler_iter = iter(self.index_sampler) @@ -138,6 +135,16 @@ class NonIterableGeneratorLoader(DistributedDataLoader): ) return steps_per_epoch + @property + def index_sampler(self): + if self.auto_collate_batch: + return self.batch_sampler + else: + if self.dataset_kind == _DatasetKind.MAP: + return list(range(len(self.dataset))) + else: + return _InfiniteIterableSampler(self.dataset, 1) + def _create_inner_dataloader(self): def data_generator(): @@ -170,7 +177,83 @@ class NonIterableGeneratorLoader(DistributedDataLoader): yield partial_data dataloader = paddle.fluid.io.DataLoader.from_generator( - feed_list=self.feed_list, capacity=70, iterable=False) + feed_list=self.feed_list, + capacity=self.capacity, + use_double_buffer=self.use_double_buffer, + # iterable=self.iterable, + iterable=False, + return_list=self.return_list, + use_multiprocess=self.use_multiprocess, + drop_last=self.drop_last) dataloader.set_batch_generator(data_generator, self.places) return dataloader + + +class DistributedDataLoader(DistributedDataLoaderBase): + + def __init__(self, + dataset, + feed_list=None, + places=None, + return_list=True, + batch_size=1, + shuffle=False, + drop_last=False, + collate_fn=None, + num_workers=0, + use_buffer_reader=True, + use_shared_memory=True, + timeout=0, + worker_init_fn=None, + epochs=1, + steps_per_epoch=None, + split_data=True, + data_parallel_world_size=[], + data_parallel_rank=[]): + self.dataset = dataset + self.feed_list = feed_list + self.return_list = return_list + self.places = places + self.batch_size = batch_size + self.shuffle = shuffle + self.drop_last = drop_last + self.collate_fn = collate_fn + self.num_workers = num_workers + self.use_buffer_reader = use_buffer_reader + self.use_shared_memory = use_shared_memory + self.timeout = timeout + self.worker_init_fn = worker_init_fn + self.epochs = epochs + self.steps_per_epoch = steps_per_epoch + self.dp_world_sizes = data_parallel_world_size + self.dp_ranks = data_parallel_rank + self.split_data = split_data + # TODO: rank info + self.batch_sampler = DistributedBatchSampler( + self.dataset, self.batch_size, self.dp_world_sizes[0], + self.dp_ranks[0], self.shuffle, self.drop_last) + self._inner_dataloader = self._create_inner_dataloader() + + def __iter__(self): + return self + + def __next__(self): + return next(self.data) + + def _create_inner_dataloader(self): + dataloader = paddle.fluid.io.DataLoader( + self.dataset, + feed_list=self.feed_list, + places=self.places, + return_list=self.return_list, + batch_sampler=self.batch_sampler, + collate_fn=self.collate_fn, + num_workers=self.num_workers, + use_buffer_reader=self.use_buffer_reader, + use_shared_memory=self.use_shared_memory, + timeout=self.timeout, + worker_init_fn=self.worker_init_fn) + self.data = (x for x in dataloader) + + return dataloader diff --git a/python/paddle/distributed/auto_parallel/engine.py b/python/paddle/distributed/auto_parallel/engine.py index 6bc5743adb2..76e9863bfe2 100644 --- a/python/paddle/distributed/auto_parallel/engine.py +++ b/python/paddle/distributed/auto_parallel/engine.py @@ -40,9 +40,8 @@ from .planner_v2 import Planner from .parallelizer_v2 import Parallelizer from .dist_op import DistributedOperator from .dist_saver import DistributedSaver -from .dist_loader import NonIterableGeneratorLoader -from .utils import to_list -from .utils import get_logger, get_dist_attr +from .dist_loader import DistributedDataLoaderFromGenerator, DistributedDataLoader +from .utils import to_list, get_logger, get_dist_attr from .process_group import new_process_group, get_all_process_groups from .dist_context import DistributedContext, get_default_distributed_context from .strategy import Strategy @@ -127,11 +126,11 @@ class Engine: ) self._model = model - if loss and not isinstance(loss, - paddle.nn.Layer) and not callable(loss): - raise TypeError( - "'loss' must be sub classes of `paddle.nn.Layer` or any callable function." - ) + # if loss and not isinstance(loss, + # paddle.nn.Layer) and not callable(loss): + # raise TypeError( + # "'loss' must be sub classes of `paddle.nn.Layer` or any callable function." + # ) self._loss = loss if optimizer and not isinstance( @@ -184,39 +183,184 @@ class Engine: self._feed_vars = {} self._fetch_vars = {} self._planners = {} - self._mode_init_states = { + self._has_prepared = {"train": False, "eval": False, "predict": False} + self._has_prepared_reader = { "train": False, "eval": False, "predict": False } + self._inputs_spec = [] + self._labels_spec = [] + self._inputs = [] + self._labels = [] + self._skip_build = False + self._outside_dataloader = False self._planned_mode = None self._dygraph_mode = False self._tuning = self._strategy.tuning - def _prepare_program(self, mode): - # Do the build process - self._build(mode) - # Do the planning process - self._plan(mode) - # Do the parallel process - self._parallel(mode) - # Init comm and startup program - self._initialize(mode) - self._mode_init_states[mode] = True + def _prepare_data_spec(self, data, split, batch_size): + inputs_spec = [] + labels_spec = [] + if isinstance(data, paddle.io.IterableDataset): + if split is None: + inputs, labels = next(iter(data)) + else: + sample = next(iter(data)) + inputs = sample[:split] + labels = sample[split:] + elif isinstance(data, paddle.io.Dataset): + if split is None: + inputs, labels = data[0] + else: + sample = data[0] + inputs = sample[:split] + labels = sample[split:] + else: + raise ValueError( + "Data should be a Dataset or IterableDatset, but received {}.". + format(type(data).__name__)) + inputs = to_list(inputs) + labels = to_list(labels) + + num_shards = self._strategy.dataset.num_shards - def _prepare_feed(self, user_feeds=None, mode="train"): + def _adjust_item_spec(num_shards, spec): + if num_shards > 1 and len(spec.shape) > 1: + spec.shape[0] = spec.shape[0] * num_shards + + def _infer_item_spec(item, name, batch_size, specs): + if isinstance(item, np.ndarray): + spec = InputSpec.from_numpy(item, name) + if batch_size is None: + _adjust_item_spec(num_shards, spec) + specs.append(spec) + else: + specs.append(spec.batch(batch_size)) + elif isinstance(item, (Variable, core.VarBase, core.eager.Tensor)): + _adjust_item_spec(num_shards, spec) + spec = InputSpec.from_tensor(item, name) + if batch_size is None: + specs.append(spec) + else: + specs.append(spec.batch(batch_size)) + else: + specs.append(InputSpec([batch_size], type(item), name)) + + if inputs is not None: + for i, item in enumerate(inputs): + assert item is not None, "Receive None input." + name = "input" + str(i) + _infer_item_spec(item, name, batch_size, inputs_spec) + if labels is not None: + for i, item in enumerate(labels): + assert item is not None, "Receive None input." + name = "label" + str(i) + _infer_item_spec(item, name, batch_size, labels_spec) + + inputs_spec = self._validate_spec(inputs_spec) + labels_spec = self._validate_spec(labels_spec) + return inputs_spec, labels_spec + + def _prepare_data_tensor(self, + inputs_spec, + labels_spec, + inputs=None, + labels=None): + if _non_static_mode() or self._dygraph_mode: + return None, None + inputs_spec = inputs_spec if inputs_spec else [] + labels_spec = labels_spec if labels_spec else [] + if inputs_spec: + assert isinstance(inputs_spec, list), \ + "inputs should be list, but received {}".format(type(inputs_spec)) + if inputs is None: + inputs = [s._create_feed_layer() for s in inputs_spec] + else: + assert isinstance(inputs, list), \ + "inputs should be list, but received {}".format(type(inputs)) + for input_spec, input in zip(inputs_spec, inputs): + if input_spec.shape != input.shape: + input.desc.set_shape(input_spec.shape) + if labels_spec: + assert isinstance(labels_spec, list), \ + "labels should be list, but received {}".format(type(labels_spec)) + if labels is None: + labels = [s._create_feed_layer() for s in labels_spec] + else: + assert isinstance(labels, list), \ + "labels should be list, but received {}".format(type(labels)) + for label_spec, label in zip(labels_spec, labels): + if label_spec.shape != label.shape: + label.desc.set_shape(label_spec.shape) + return inputs, labels + + def _prepare_reader(self): + dist_main_prog = self._dist_main_progs[self._mode][self._cur_rank] + dist_context = self._dist_contexts[self._mode] + dist_main_block = dist_main_prog.global_block() + + # NOTE: this list may be changed if Paddle changes the existing rules. + related_reader_ops = [ + "create_py_reader", "create_double_buffer_reader", "read" + ] + # remove the first three ops if multiple run fit/evaluate/predict + if dist_main_block.ops[0].type == 'create_py_reader': + for i in range(len(related_reader_ops)): + if dist_main_block.ops[0].type in related_reader_ops: + dist_main_block._remove_op(0, sync=False) + dist_main_block._sync_with_cpp() + # Step 1: find the reader ops + reader_op_indices = [] + for idx, op in enumerate(dist_main_block.ops): + if op.type in related_reader_ops: + reader_op_indices.append(idx) + # Step 2: insert the new reader ops to cpp + new_reader_ops = [] + for idx in reversed(reader_op_indices): + new_op_desc = dist_main_block.desc._prepend_op() + new_op_desc.copy_from(dist_main_block.ops[idx].desc) + new_op = Operator(dist_main_block, + new_op_desc, + type=new_op_desc.type()) + new_reader_ops.append(new_op) + dist_op = DistributedOperator(new_op) + dist_context.add_dist_op_for_program(dist_op) + # Step 3: insert the new reader ops to python + for new_op in new_reader_ops: + dist_main_block.ops.insert(0, new_op) + for i in range(len(reader_op_indices)): + reader_op_indices[i] += len(reader_op_indices) + # Step 4: remove the old reader ops from python and cpp + for idx in reversed(reader_op_indices): + op = dist_main_block.ops.pop(idx) + dist_main_block.desc._remove_op(idx, idx + 1) + dist_main_block._sync_with_cpp() + self._has_prepared_reader[self._mode] = True + + def _prepare_feed(self, data, user_feeds, mode): + feeds = {} + if data is not None: + if isinstance(data, (list, tuple)): + if len(data) == 1 and isinstance(data[0], dict): + for name, data in data[0].items(): + feeds[name] = data + else: + raise ValueError("Unsupported data {}".format(data)) + elif isinstance(data, dict): + for name, data in data.items(): + feeds[name] = data + else: + raise ValueError("Unsupported data {}".format(data)) if user_feeds is not None: assert isinstance(user_feeds, dict), \ "user_feeds must be a dict, but receive {}".format(type(user_feeds).__name__) - feeds = {} - # TODO: add inputs and labels feed dict - if user_feeds is not None: - for name, var in user_feeds.items(): - feeds[name] = var + for name, data in user_feeds.items(): + feeds[name] = data return feeds - def _prepare_fetch(self, user_fetches=None, mode="train"): + def _prepare_fetch(self, user_fetches, mode): if user_fetches is not None: assert isinstance(user_fetches, list), \ "user_fetches must be a list, but receive {}".format(type(user_fetches).__name__) @@ -232,6 +376,8 @@ class Engine: if var_name not in fetch_names: fetch_names.append(var_name) group_indices.append(fetch_names.index(var_name)) + if not group_indices: + fetch_names.append([]) fetch_indices.append(group_indices) if mode != "predict": @@ -251,13 +397,13 @@ class Engine: def _prepare_logger(self, outs, - mode="train", epoch=None, step=None, lr=None, fetch_names=None, fetch_indices=None, - profiler_log=""): + profiler_log="", + mode=None): logs = "[{}] ".format(mode) if epoch is not None: logs += "epoch: {:d} ".format(epoch) @@ -274,17 +420,19 @@ class Engine: group_idx += 1 # logging metrics if mode != "predict": - for metric in self._metrics: - metrics_indices = fetch_indices[group_idx] - metric_out = [] - for idx in metrics_indices: - metric_out.append(outs[idx]) - if metric_out: - metric.update(*metric_out) - results = metric.accumulate() - for i, res in enumerate(to_list(results)): - logs += "{}: {:8f} ".format(metric.name()[i], res) - group_idx += 1 + metric_vars = self._fetch_vars[mode]["metrics"] + if metric_vars: + for metric in self._metrics: + metrics_indices = fetch_indices[group_idx] + metric_out = [] + for idx in metrics_indices: + metric_out.append(outs[idx]) + if metric_out: + metric.update(*metric_out) + results = metric.accumulate() + for i, res in enumerate(to_list(results)): + logs += "{}: {:8f} ".format(metric.name()[i], res) + group_idx += 1 # Skip logging outputs if mode == "predict": group_idx += 1 @@ -295,9 +443,10 @@ class Engine: idx = fetch_names.index(var.name) # Use the user defined name for logging logs += "{}: {} ".format(name, outs[idx]) + logs += profiler_log self._logger.info(logs) - def _prepare_history(self, outs, mode="train", fetch_indices=None): + def _prepare_history(self, outs, fetch_indices=None, mode=None): history = {} group_idx = 0 # store loss @@ -310,16 +459,18 @@ class Engine: group_idx += 1 # store metrics if mode != "predict": - for metric in self._metrics: - metrics_indices = fetch_indices[group_idx] - metric_out = [] - for idx in metrics_indices: - metric_out.append(outs[idx]) - if metric_out: - metric.update(*metric_out) - results = metric.accumulate() - history[tuple(metric.name())] = to_list(results) - group_idx += 1 + metric_vars = self._fetch_vars[mode]["metrics"] + if metric_vars: + for metric in self._metrics: + metrics_indices = fetch_indices[group_idx] + metric_out = [] + for idx in metrics_indices: + metric_out.append(outs[idx]) + if metric_out: + metric.update(*metric_out) + results = metric.accumulate() + history[tuple(metric.name())] = to_list(results) + group_idx += 1 # store outputs if mode == "predict": outputs_indices = fetch_indices[group_idx] @@ -336,14 +487,25 @@ class Engine: history["fetches"] = fetches_values return history + def _prepare_program(self, mode): + # Do the build process + self._build(mode) + # Do the planning process + self._plan(mode) + # Do the parallel process + self._parallel(mode) + # Init comm and startup program + self._initialize(mode) + self._has_prepared[mode] = True + def _build(self, mode): if _non_static_mode() or self._dygraph_mode: paddle.disable_static() self._dygraph_mode = True self._logger.info("Building model with 'to_static' method.") - inputs_spec = self.inputs_spec - labels_spec = self.labels_spec if self.labels_spec else [] + inputs_spec = self._inputs_spec + labels_spec = self._labels_spec if self._labels_spec else [] self.program_helper = ProgramHelper(self._model, self._loss, self._metrics, inputs_spec, labels_spec) @@ -360,6 +522,9 @@ class Engine: losses = self.program_helper.loss_vars metrics = self.program_helper.metric_vars + self._inputs = inputs + self._labels = labels + paddle.enable_static() else: # build program in static mode @@ -367,24 +532,26 @@ class Engine: if serial_main_prog is not None: return + outputs = [] losses = [] metrics = [] + inputs = self._inputs + labels = self._labels serial_main_prog = self._orig_main_prog.clone() serial_startup_prog = self._orig_startup_prog.clone() - with static.program_guard(serial_main_prog, serial_startup_prog), \ - utils.unique_name.guard(): - inputs_spec = self.inputs_spec - labels_spec = self.labels_spec if self.labels_spec else [] - inputs = [s._create_feed_layer() for s in inputs_spec] - labels = [s._create_feed_layer() for s in labels_spec] - outputs = to_list(self._model(*inputs)) - if mode != "predict" and self._loss: - losses = to_list(self._loss(*(outputs + labels))) - - if mode != "predict": - for metric in self._metrics: - metrics.append( - to_list(metric.compute(*(outputs + labels)))) + if not self._skip_build: + with static.program_guard(serial_main_prog, serial_startup_prog), \ + utils.unique_name.guard(): + outputs = to_list(self._model(*inputs)) + if mode != "predict" and self._loss: + losses = to_list(self._loss(*(outputs + labels))) + + if mode != "predict" and (outputs or labels): + for metric in self._metrics: + metrics.append( + to_list(metric.compute(*(outputs + labels)))) + else: + losses = to_list(self._loss) default_ctx = get_default_distributed_context() if not default_ctx.has_annotation: @@ -427,8 +594,8 @@ class Engine: self._optimization_tuner = OptimizationTuner(self._tuning.to_dict(), self._dist_contexts[mode], dataset, - self.inputs_spec, - self.labels_spec, + self._inputs_spec, + self._labels_spec, batch_size=batch_size, rank=self._cur_rank) @@ -452,6 +619,7 @@ class Engine: inputs_var = self._dist_contexts[mode].serial_feed_vars["inputs"] labels_var = self._dist_contexts[mode].serial_feed_vars["labels"] block = self._dist_contexts[mode].serial_main_program.global_block() + # TODO: check this feed_list feed_list = [] for var in inputs_var + labels_var: if var.name in block.vars: @@ -555,85 +723,6 @@ class Engine: dist_startup_prog = self._dist_startup_progs[mode][self._cur_rank] self._executor.run(dist_startup_prog) - def _split_sample_item(self, data, split): - if isinstance(data, paddle.io.IterableDataset): - if split is None: - inputs, labels = next(iter(data)) - else: - sample = next(iter(data)) - inputs = sample[:split] - labels = sample[split:] - elif isinstance(data, paddle.io.Dataset): - if split is None: - inputs, labels = data[0] - else: - sample = data[0] - inputs = sample[:split] - labels = sample[split:] - else: - raise ValueError( - "Data should be a Dataset or IterableDatset, but received {}.". - format(type(data).__name__)) - inputs = to_list(inputs) - labels = to_list(labels) - return inputs, labels - - def _infer_sample_spec(self, inputs, labels, batch_size): - self.inputs_spec = [] - self.labels_spec = [] - - def _infer_item_spec(item, name, batch_size, specs): - if isinstance(item, np.ndarray): - spec = InputSpec.from_numpy(item, name) - if batch_size is None: - specs.append(spec) - else: - specs.append(spec.batch(batch_size)) - elif isinstance(item, (Variable, core.VarBase, core.eager.Tensor)): - spec = InputSpec.from_tensor(item, name) - if batch_size is None: - specs.append(spec) - else: - specs.append(spec.batch(batch_size)) - else: - specs.append(InputSpec([batch_size], type(item), name)) - - if inputs is not None: - for i, item in enumerate(inputs): - assert item is not None, "Receive None input." - name = "input" + str(i) - _infer_item_spec(item, name, batch_size, self.inputs_spec) - if labels is not None: - for i, item in enumerate(labels): - assert item is not None, "Receive None input." - name = "label" + str(i) - _infer_item_spec(item, name, batch_size, self.labels_spec) - - self.inputs_spec = self._validate_spec(self.inputs_spec) - self.labels_spec = self._validate_spec(self.labels_spec) - - def __call__(self, - inputs=None, - labels=None, - feeds=None, - fetches=None, - mode="train"): - feed_dict = self._prepare_feed(feeds, mode) - fetch_names, fetch_indices = self._prepare_fetch(fetches, mode) - try: - outs = self._executor.run( - self.main_program, - feed=feed_dict, - fetch_list=fetch_names, - use_program_cache=self._strategy.use_cache, - return_numpy=self._strategy.return_numpy) - except core.EOFException: - pass - self._prepare_logger(outs, self.mode, None, None, None, fetch_names, - fetch_indices) - history = self._prepare_history(outs, self.mode, fetch_indices) - return history - def fit(self, train_data, train_sample_split=None, @@ -712,21 +801,28 @@ class Engine: epochs=2, batch_size=64) """ - self.mode = 'train' - inputs, labels = self._split_sample_item(train_data, train_sample_split) - self._infer_sample_spec(inputs, labels, batch_size) - if not self._mode_init_states[self.mode]: - self._prepare_program(self.mode) + self._mode = 'train' + self._inputs_spec, self._labels_spec = self._prepare_data_spec( + train_data, train_sample_split, batch_size) + self._inputs, self._labels = self._prepare_data_tensor( + self._inputs_spec, self._labels_spec) + if not self._has_prepared[self._mode]: + self._prepare_program(self._mode) else: - self._switch_mode("train") - - assert self.mode in self._dist_main_progs, \ - "train model is not ready, please call `engine._prepare_program('train')` first." - train_dataloader = self._prepare_dataloader(train_data, batch_size, - epochs, steps_per_epoch, - collate_fn) - - fetch_names, fetch_indices = self._prepare_fetch(mode=self.mode) + self._switch_mode(self._mode) + train_dataloader = self._prepare_dataloader_from_generator( + dataset=train_data, + capacity=70, + # use_double_buffer=use_double_buffer, + iterable=False, + # return_list=return_list, + # use_multiprocess=use_multiprocess, + # drop_last=drop_last, + batch_size=batch_size, + epochs=epochs, + steps_per_epoch=steps_per_epoch, + collate_fn=collate_fn) + fetch_names, fetch_indices = self._prepare_fetch(None, mode=self._mode) lr_scheduler = self._get_lr_scheduler(self.main_program) with profiler.Profiler(timer_only=True) as prof: @@ -746,18 +842,18 @@ class Engine: prof.step() - self._prepare_logger(outs, self.mode, epoch, step, lr, + self._prepare_logger(outs, epoch, step, lr, fetch_names, fetch_indices, - prof.step_info()) - history = self._prepare_history(outs, self.mode, - fetch_indices) - - if valid_data and epoch % valid_freq == 0: - self.evaluate(valid_data, valid_sample_split, batch_size, - valid_steps, collate_fn, callbacks) - self._switch_mode("train") - else: - self._reset_metrics() + prof.step_info(), self._mode) + history = self._prepare_history(outs, fetch_indices, + self._mode) + + # if valid_data and epoch % valid_freq == 0: + # self.evaluate(valid_data, valid_sample_split, batch_size, + # valid_steps, collate_fn, callbacks) + # self._switch_mode("train") + # else: + # self._reset_metrics() return history def evaluate(self, @@ -813,22 +909,32 @@ class Engine: engine.evaluate(valid_dataset, batch_size=64) """ - self.mode = 'eval' - inputs, labels = self._split_sample_item(valid_data, valid_sample_split) - self._infer_sample_spec(inputs, labels, batch_size) - if not self._mode_init_states[self.mode]: - self._prepare_program(self.mode) + self._mode = 'eval' + self._inputs_spec, self._labels_spec = self._prepare_data_spec( + valid_data, valid_sample_split, batch_size) + self._inputs, self._labels = self._prepare_data_tensor( + self._inputs_spec, self._labels_spec) + if not self._has_prepared[self._mode]: + self._prepare_program(self._mode) else: - self._switch_mode("eval") - - assert self.mode in self._dist_main_progs, \ + self._switch_mode(self._mode) + assert self._mode in self._dist_main_progs, \ "eval model is not ready, please call `engine._prepare_program('eval')` first." - valid_dataloader = self._prepare_dataloader(valid_data, - batch_size, - steps_per_epoch=steps, - collate_fn=collate_fn) - - fetch_names, fetch_indices = self._prepare_fetch(mode=self.mode) + valid_dataloader = self._prepare_dataloader_from_generator( + dataset=valid_data, + # feed_list=feed_list, + capacity=70, + # use_double_buffer=use_double_buffer, + iterable=False, + # return_list=return_list, + # use_multiprocess=use_multiprocess, + # drop_last=drop_last, + # places=places, + batch_size=batch_size, + # epochs=epochs, + steps_per_epoch=steps, + collate_fn=collate_fn) + fetch_names, fetch_indices = self._prepare_fetch(None, mode=self._mode) for step, _ in enumerate(valid_dataloader): try: @@ -839,9 +945,9 @@ class Engine: return_numpy=self._strategy.return_numpy) except core.EOFException: break - self._prepare_logger(outs, self.mode, None, step, None, fetch_names, - fetch_indices) - history = self._prepare_history(outs, self.mode, fetch_indices) + self._prepare_logger(outs, None, step, None, fetch_names, + fetch_indices, "", self._mode) + history = self._prepare_history(outs, fetch_indices, self._mode) self._reset_metrics() return history @@ -895,22 +1001,32 @@ class Engine: engine = auto.Engine(model) engine.predict(valid_dataset, batch_size=64) """ - self.mode = 'predict' - inputs, labels = self._split_sample_item(test_data, test_sample_split) - self._infer_sample_spec(inputs, labels, batch_size) - if not self._mode_init_states[self.mode]: - self._prepare_program(self.mode) + self._mode = 'predict' + self._inputs_spec, self._labels_spec = self._prepare_data_spec( + test_data, test_sample_split, batch_size) + self._inputs, self._labels = self._prepare_data_tensor( + self._inputs_spec, self._labels_spec) + if not self._has_prepared[self._mode]: + self._prepare_program(self._mode) else: - self._switch_mode("predict") - - assert self.mode in self._dist_main_progs, \ + self._switch_mode(self._mode) + assert self._mode in self._dist_main_progs, \ "predict model is not ready, please call `engine._prepare_program('predict')` first." - test_dataloader = self._prepare_dataloader(test_data, - batch_size, - steps_per_epoch=steps, - collate_fn=collate_fn) - - fetch_names, fetch_indices = self._prepare_fetch(mode=self.mode) + test_dataloader = self._prepare_dataloader_from_generator( + dataset=test_data, + # feed_list=feed_list, + capacity=70, + # use_double_buffer=use_double_buffer, + iterable=False, + # return_list=return_list, + # use_multiprocess=use_multiprocess, + # drop_last=drop_last, + # places=places, + batch_size=batch_size, + # epochs=epochs, + steps_per_epoch=steps, + collate_fn=collate_fn) + fetch_names, fetch_indices = self._prepare_fetch(None, mode=self._mode) for step, _ in enumerate(test_dataloader): try: @@ -921,62 +1037,200 @@ class Engine: return_numpy=self._strategy.return_numpy) except core.EOFException: break - self._prepare_logger(outs, self.mode, None, step, None, fetch_names, - fetch_indices) - history = self._prepare_history(outs, self.mode, fetch_indices) + self._prepare_logger(outs, None, step, None, fetch_names, + fetch_indices, "", self._mode) + history = self._prepare_history(outs, fetch_indices, self._mode) return history - def _tune(self, tune_data, tune_sample_split=None, batch_size=1): - self.mode = 'train' - inputs, labels = self._split_sample_item(tune_data, tune_sample_split) - self._infer_sample_spec(inputs, labels, batch_size) - self._optimization_tuning(self.mode, tune_data, batch_size) - - def dataloader(self, - dataset, - sample_split=1, - batch_size=1, - epochs=1, - steps_per_epoch=None, - collate_fn=None, - mode="train", - from_generator=True): - assert from_generator, "Only support from_generator for now" - self.mode = mode - inputs, labels = self._split_sample_item(dataset, sample_split) - self._infer_sample_spec(inputs, labels, batch_size) - if not self._mode_init_states[self.mode]: - self._prepare_program(self.mode) + def dataloader( + self, + dataset, + # return_list=True, + batch_size=1, + shuffle=False, + drop_last=False, + collate_fn=None, + num_workers=0, + use_buffer_reader=True, + use_shared_memory=True, + timeout=0, + worker_init_fn=None, + epochs=1, + steps_per_epoch=None, + sample_split=1, + mode=None): + if mode is not None: + self.to_mode(mode) + self._inputs_spec, self._labels_spec = self._prepare_data_spec( + dataset, sample_split, batch_size) + self._inputs, self._labels = self._prepare_data_tensor( + self._inputs_spec, self._labels_spec) + if not self._has_prepared[self._mode]: + self._prepare_program(self._mode) else: - self._switch_mode("train") - dataloader = self._prepare_dataloader(dataset, batch_size, epochs, - steps_per_epoch, collate_fn) + self._switch_mode(self._mode) + dataloader = self._prepare_dataloader( + dataset, + return_list=False, + batch_size=batch_size, + shuffle=shuffle, + drop_last=drop_last, + collate_fn=collate_fn, + num_workers=num_workers, + use_buffer_reader=use_buffer_reader, + use_shared_memory=use_shared_memory, + timeout=timeout, + worker_init_fn=worker_init_fn, + epochs=epochs, + steps_per_epoch=steps_per_epoch) return dataloader + def dataloader_from_generator( + self, + dataset, + capacity=70, + use_double_buffer=True, + iterable=True, + # return_list=False, + use_multiprocess=False, + drop_last=True, + batch_size=1, + epochs=1, + steps_per_epoch=None, + collate_fn=None, + sample_split=1, + mode=None): + if mode is not None: + self.to_mode(mode) + self._inputs_spec, self._labels_spec = self._prepare_data_spec( + dataset, sample_split, batch_size) + self._inputs, self._labels = self._prepare_data_tensor( + self._inputs_spec, self._labels_spec) + if not self._has_prepared[self._mode]: + self._prepare_program(self._mode) + else: + self._switch_mode(self._mode) + dataloader = self._prepare_dataloader_from_generator( + dataset=dataset, + # feed_list=feed_list, + capacity=capacity, + use_double_buffer=use_double_buffer, + iterable=iterable, + return_list=False, + use_multiprocess=use_multiprocess, + drop_last=drop_last, + # places=places, + batch_size=batch_size, + epochs=epochs, + steps_per_epoch=steps_per_epoch, + collate_fn=collate_fn) + return dataloader + + def prepare(self, + inputs_spec=None, + labels_spec=None, + inputs=None, + labels=None, + main_program=None, + startup_program=None, + mode=None): + if mode is not None: + self.to_mode(mode) + if inputs or labels: + self._skip_build = True + self._inputs, self._labels = self._prepare_data_tensor( + inputs_spec, labels_spec, inputs, labels) + self._orig_main_prog = main_program + if self._orig_main_prog is None: + self._orig_main_prog = static.default_main_program() + self._orig_startup_prog = startup_program + if self._orig_startup_prog is None: + self._orig_startup_prog = static.default_startup_program() + if not self._has_prepared[self._mode]: + self._prepare_program(self._mode) + else: + self._switch_mode(self._mode) + elif inputs_spec or labels_spec: + self._outside_dataloader = True + self._inputs, self._labels = self._prepare_data_tensor( + inputs_spec, labels_spec) + self._orig_main_prog = main_program + if self._orig_main_prog is None: + self._orig_main_prog = static.default_main_program() + self._orig_startup_prog = startup_program + if self._orig_startup_prog is None: + self._orig_startup_prog = static.default_startup_program() + if not self._has_prepared[self._mode]: + self._prepare_program(self._mode) + else: + self._switch_mode(self._mode) + else: + assert self._inputs_spec and self._labels_spec, \ + "Please call the dataloader(...) before calling prepare(...)" + + def run( + self, + data=None, + # program=None, + feed=None, + fetch_list=None, + # feed_var_name='feed', + # fetch_var_name='fetch', + # scope=None, + # return_numpy=True, + # use_program_cache=False, + # return_merged=True, + # use_prune=False, + mode=None): + if mode is not None: + self.to_mode(mode) + feed_dict = self._prepare_feed(data, feed, self._mode) + fetch_names, fetch_indices = self._prepare_fetch(fetch_list, self._mode) + if self._outside_dataloader and not self._has_prepared_reader[ + self._mode]: + self._prepare_reader() + outs = self._executor.run(self.main_program, + feed=feed_dict, + fetch_list=fetch_names, + use_program_cache=self._strategy.use_cache, + return_numpy=self._strategy.return_numpy) + self._prepare_logger(outs, None, None, None, fetch_names, fetch_indices, + "", self._mode) + history = self._prepare_history(outs, fetch_indices, self._mode) + return history + def _prepare_dataloader(self, dataset, - batch_size, + return_list=True, + batch_size=1, + shuffle=False, + drop_last=False, + collate_fn=None, + num_workers=0, + use_buffer_reader=True, + use_shared_memory=True, + timeout=0, + worker_init_fn=None, epochs=1, - steps_per_epoch=None, - collate_fn=None): + steps_per_epoch=None): if self._strategy.gradient_merge and batch_size is not None: assert batch_size % self._k_steps == 0, \ "Requires batch_size:[{}] to be divisible by k_steps:[{}].".format(batch_size, self._k_steps) batch_size //= self._k_steps - dist_main_prog = self._dist_main_progs[self.mode][self._cur_rank] - dist_startup_prog = self._dist_startup_progs[self.mode][self._cur_rank] - dist_context = self._dist_contexts[self.mode] + dist_main_prog = self._dist_main_progs[self._mode][self._cur_rank] + dist_startup_prog = self._dist_startup_progs[self._mode][self._cur_rank] + dist_context = self._dist_contexts[self._mode] dist_main_block = dist_main_prog.global_block() # NOTE: Get feed_list, then insert dataloader op with sharded var shape. # Cause predict_program does not contain labels var, # then we will add labels var from serial_program to dist_program, # that maintains the length of feed_list equal to the length of dataset's values. - inputs_var = self._feed_vars[self.mode]["inputs"] - labels_var = self._feed_vars[self.mode]["labels"] + inputs_var = self._feed_vars[self._mode]["inputs"] + labels_var = self._feed_vars[self._mode]["labels"] feed_list = [] for var in inputs_var + labels_var: if var.name in dist_main_block.vars: @@ -986,45 +1240,121 @@ class Engine: copy_var.desc.set_original_id(var.desc.original_id()) feed_list.append(copy_var) - # remove the first three ops if multi run fit/evaluate/predict - op_size = len(dist_main_block.ops) - if dist_main_block.ops[0].type == 'create_py_reader': - op_size -= 3 - for _ in range(3): - dist_main_block._remove_op(0, sync=False) - # insert read op at the end of program places = paddle.static.cuda_places() with static.program_guard(dist_main_prog, dist_startup_prog): - dataloader = NonIterableGeneratorLoader( + dataloader = DistributedDataLoader( dataset, - feed_list, - places, - batch_size, - epochs, - steps_per_epoch, - collate_fn, + feed_list=feed_list, + places=places, + return_list=return_list, + batch_size=batch_size, + shuffle=shuffle, + drop_last=drop_last, + collate_fn=collate_fn, + num_workers=num_workers, + use_buffer_reader=use_buffer_reader, + use_shared_memory=use_shared_memory, + timeout=timeout, + worker_init_fn=worker_init_fn, + epochs=epochs, + steps_per_epoch=steps_per_epoch, + split_data=self._strategy.split_data, data_parallel_world_size=self._dp_world_sizes, - data_parallel_rank=self._dp_ranks, - split_data=self._strategy.split_data) + data_parallel_rank=self._dp_ranks) - # move read op from the end of program to the start of program - new_op_size = len(dist_main_block.ops) - for _ in range(new_op_size - 1, op_size - 1, -1): - op = dist_main_block.ops[new_op_size - 1] - new_op_desc = dist_main_block.desc._prepend_op() - new_op_desc.copy_from(op.desc) - new_op = Operator(dist_main_block, - new_op_desc, - type=new_op_desc.type()) - dist_main_block.ops.insert(0, new_op) - dist_op = DistributedOperator(new_op) - dist_context.add_dist_op_for_program(dist_op) - for _ in range(new_op_size - op_size): - dist_main_block._remove_op(new_op_size, sync=False) - dist_main_block._sync_with_cpp() return dataloader + def _prepare_dataloader_from_generator(self, + dataset, + capacity=None, + use_double_buffer=True, + iterable=True, + return_list=False, + use_multiprocess=False, + drop_last=True, + batch_size=1, + epochs=1, + steps_per_epoch=None, + collate_fn=None): + + if self._strategy.gradient_merge and batch_size is not None: + assert batch_size % self._k_steps == 0, \ + "Requires batch_size:[{}] to be divisible by k_steps:[{}].".format(batch_size, self._k_steps) + batch_size //= self._k_steps + + dist_main_prog = self._dist_main_progs[self._mode][self._cur_rank] + dist_startup_prog = self._dist_startup_progs[self._mode][self._cur_rank] + dist_context = self._dist_contexts[self._mode] + dist_main_block = dist_main_prog.global_block() + + # NOTE: Get feed_list, then insert dataloader op with sharded var shape. + # Cause predict_program does not contain labels var, + # then we will add labels var from serial_program to dist_program, + # that maintains the length of feed_list equal to the length of dataset's values. + inputs_var = self._feed_vars[self._mode]["inputs"] + labels_var = self._feed_vars[self._mode]["labels"] + feed_list = [] + for var in inputs_var + labels_var: + if var.name in dist_main_block.vars: + feed_list.append(dist_main_block.vars[var.name]) + else: + copy_var = dist_main_block._clone_variable(var, var.persistable) + copy_var.desc.set_original_id(var.desc.original_id()) + feed_list.append(copy_var) + + # # remove the first three ops if multi run fit/evaluate/predict + # self._op_size = len(dist_main_block.ops) + # if dist_main_block.ops[0].type == 'create_py_reader': + # op_size -= 3 + # for _ in range(3): + # dist_main_block._remove_op(0, sync=False) + + places = paddle.static.cuda_places() + with static.program_guard(dist_main_prog, dist_startup_prog): + dataloader = DistributedDataLoaderFromGenerator( + dataset=dataset, + feed_list=feed_list, + capacity=capacity, + use_double_buffer=use_double_buffer, + iterable=iterable, + return_list=return_list, + use_multiprocess=use_multiprocess, + drop_last=drop_last, + places=places, + batch_size=batch_size, + epochs=epochs, + steps_per_epoch=steps_per_epoch, + collate_fn=collate_fn, + split_data=self._strategy.split_data, + data_parallel_world_size=self._dp_world_sizes, + data_parallel_rank=self._dp_ranks) + self._prepare_reader() + # # move read op from the end of program to the start of program + # new_op_size = len(dist_main_block.ops) + # for _ in range(new_op_size - 1, op_size - 1, -1): + # op = dist_main_block.ops[new_op_size - 1] + # new_op_desc = dist_main_block.desc._prepend_op() + # new_op_desc.copy_from(op.desc) + # new_op = Operator(dist_main_block, + # new_op_desc, + # type=new_op_desc.type()) + # dist_main_block.ops.insert(0, new_op) + # dist_op = DistributedOperator(new_op) + # dist_context.add_dist_op_for_program(dist_op) + # for _ in range(new_op_size - op_size): + # dist_main_block._remove_op(new_op_size, sync=False) + # dist_main_block._sync_with_cpp() + return dataloader + + def _tune(self, tune_data, tune_sample_split=None, batch_size=1): + self._mode = 'train' + self._inputs_spec, self._labels_spec = self._prepare_data_spec( + tune_data, tune_sample_split, batch_size) + self._inputs, self._labels = self._prepare_data_tensor( + self._inputs_spec, self._labels_spec) + self._optimization_tuning(self._mode, tune_data, batch_size) + def _validate_spec(self, specs): specs = to_list(specs) self._k_steps = self._strategy.gradient_merge.k_steps @@ -1108,9 +1438,14 @@ class Engine: metric.reset() def _switch_mode(self, mode): - self.mode = mode + self.to_mode(mode) self._initialize(mode) + def to_mode(self, mode): + assert mode in ["train", "eval", "predict"], \ + "mode {} should be one of ['train', 'eval', 'predict']".format(mode) + self._mode = mode + def _set_state_dict(self, mode, strict, state_dict, dist_attr): program = self._dist_main_progs[mode][self._cur_rank] dist_context = self._dist_contexts[mode] @@ -1129,7 +1464,7 @@ class Engine: is 'dirname/file_prefix' or 'file_prefix'. if empty str. A exception will be raised. training (bool, optional): Whether to save for training. If not, save - for inference only. If `training` is set to True, the optimzer state + for inference only. If `training` is set to True, the optimizer state will be saved. Otherwise, only the model and parameters are saved. This function will silently overwrite existing file at the target location. Default: True. @@ -1259,42 +1594,34 @@ class Engine: " or `paddle.fluid.optimizer.Optimizer`, but got {}.".format(type(optimizer)) ) - @property - def mode(self): - return self._mode - - @mode.setter - def mode(self, mode): - self._mode = mode - @property def main_program(self): - return self._dist_main_progs[self.mode][self._cur_rank] + return self._dist_main_progs[self._mode][self._cur_rank] @property def startup_program(self): - return self._dist_startup_progs[self.mode][self._cur_rank] + return self._dist_startup_progs[self._mode][self._cur_rank] @property def dist_context(self): - return self._dist_contexts[self.mode] + return self._dist_contexts[self._mode] @property def serial_main_program(self): - return self._serial_main_progs[self.mode] + return self._serial_main_progs[self._mode] @property def serial_startup_program(self): - return self._serial_startup_progs[self.mode] + return self._serial_startup_progs[self._mode] @property def fetch_vars(self): - return self._fetch_vars[self.mode] + return self._fetch_vars[self._mode] @property def inputs(self): - return self.inputs_spec + return self._inputs @property def labels(self): - return self.labels_spec + return self._labels diff --git a/python/paddle/distributed/auto_parallel/interface.py b/python/paddle/distributed/auto_parallel/interface.py index 88064cccbe6..f7f6d89a616 100644 --- a/python/paddle/distributed/auto_parallel/interface.py +++ b/python/paddle/distributed/auto_parallel/interface.py @@ -210,11 +210,11 @@ def get_collection(name): return _g_collections[name] -def add_to_collection(collection_name, value, value_name=None): +def add_to_collection(collection_name, value, name=None): if collection_name not in _g_collections: _g_collections[collection_name] = [] - if value_name is not None: - _g_collections[collection_name].append((value_name, value)) + if name is not None: + _g_collections[collection_name].append((name, value)) else: _g_collections[collection_name].append((None, value)) diff --git a/python/paddle/distributed/auto_parallel/partitioner.py b/python/paddle/distributed/auto_parallel/partitioner.py index 399a5a485b5..32917627672 100644 --- a/python/paddle/distributed/auto_parallel/partitioner.py +++ b/python/paddle/distributed/auto_parallel/partitioner.py @@ -23,7 +23,7 @@ from .dist_attribute import OperatorDistributedAttribute from .utils import is_backward_op, is_forward_op, is_loss_op, is_optimize_op from .operators.common import BACKWARD_ONLY_DIST_OPS -__varname_not_in_block__ = ["lod_tensor_blocking_queue_0"] +__varname_not_in_block__ = ["lod_tensor_blocking_queue"] __not_shape_var_type__ = [ core.VarDesc.VarType.READER, core.VarDesc.VarType.STEP_SCOPES ] @@ -238,7 +238,9 @@ class Partitioner(object): target_block, serial_input_varname, new_varname) else: - assert serial_input_varname in __varname_not_in_block__ + for varname_not_in_block in __varname_not_in_block__: + assert varname_not_in_block in serial_input_varname, \ + "{} is not found".format(serial_input_varname) self._serial2dist_varname_mapping[ serial_input_varname] = new_varname diff --git a/python/paddle/distributed/auto_parallel/reshard.py b/python/paddle/distributed/auto_parallel/reshard.py index cf09929ad48..d7f29494441 100644 --- a/python/paddle/distributed/auto_parallel/reshard.py +++ b/python/paddle/distributed/auto_parallel/reshard.py @@ -45,7 +45,8 @@ def get_var_with_recursion(var_name, block, program): parent_block = program.blocks[block.parent_idx] if var_name in parent_block.vars: var = parent_block.vars[var_name] - assert var is not None + assert var is not None, \ + "{} is not found".format(var.name) return var @@ -1838,8 +1839,8 @@ class Resharder: idx_offset = 0 for var_name in input_var_names: - # skip lod_tensor_blocking_queue_0 - if var_name == "lod_tensor_blocking_queue_0": + # skip lod_tensor_blocking_queue_? name + if "lod_tensor_blocking_queue" in var_name: continue var = get_var_with_recursion(var_name, block, self.auto_parallel_main_prog) diff --git a/python/paddle/distributed/auto_parallel/strategy.py b/python/paddle/distributed/auto_parallel/strategy.py index 813b826aaa0..a504bbfed29 100644 --- a/python/paddle/distributed/auto_parallel/strategy.py +++ b/python/paddle/distributed/auto_parallel/strategy.py @@ -114,6 +114,13 @@ class TuningConfig(BaseConfig): super(TuningConfig, self).__init__(category, config_dict) +class DatasetConfig(BaseConfig): + + def __init__(self, config_dict=None): + category = constants.DATASET + super(DatasetConfig, self).__init__(category, config_dict) + + class Strategy(BaseConfig): """ The `Strategy` object is used to configure the paralleization and optimization beheviors. @@ -178,3 +185,6 @@ class Strategy(BaseConfig): config_dict = self._config_dict.get(constants.TUNING, None) self.tuning = TuningConfig(config_dict) + + config_dict = self._config_dict.get(constants.DATASET, None) + self.dataset = DatasetConfig(config_dict) diff --git a/python/paddle/distributed/auto_parallel/tuner/profiler.py b/python/paddle/distributed/auto_parallel/tuner/profiler.py index 478501cfe3f..4b2655028bf 100644 --- a/python/paddle/distributed/auto_parallel/tuner/profiler.py +++ b/python/paddle/distributed/auto_parallel/tuner/profiler.py @@ -23,7 +23,7 @@ import paddle from paddle.fluid.framework import Program, _current_expected_place from paddle.fluid.framework import Operator from paddle.distributed.auto_parallel.process_group import get_all_process_groups, new_process_group -from paddle.distributed.auto_parallel.dist_loader import NonIterableGeneratorLoader +from paddle.distributed.auto_parallel.dist_loader import DistributedDataLoaderFromGenerator from paddle.distributed.collective import _get_global_env paddle.enable_static() @@ -132,13 +132,14 @@ def create_dataloader(main_program, # insert read op at the end of program places = paddle.static.cuda_places() with paddle.static.program_guard(main_program, startup_program): - dataloader = NonIterableGeneratorLoader( - dataset, - feed_list, - places, - dataset.batch_size, - epochs, - steps_per_epoch, + dataloader = DistributedDataLoaderFromGenerator( + dataset=dataset, + feed_list=feed_list, + capacity=70, + places=places, + batch_size=dataset.batch_size, + epochs=epochs, + steps_per_epoch=steps_per_epoch, data_parallel_world_size=dataset.dp_world_size, data_parallel_rank=dataset.dp_rank) diff --git a/python/paddle/fluid/tests/unittests/auto_parallel/engine_api.py b/python/paddle/fluid/tests/unittests/auto_parallel/engine_api.py index 2d477434d3a..ad8d477c81d 100644 --- a/python/paddle/fluid/tests/unittests/auto_parallel/engine_api.py +++ b/python/paddle/fluid/tests/unittests/auto_parallel/engine_api.py @@ -16,6 +16,8 @@ import tempfile import os import numpy as np import paddle +import paddle.static as static +import paddle.utils as utils import paddle.nn as nn import paddle.nn.functional as F from paddle.io import Dataset @@ -26,7 +28,8 @@ paddle.enable_static() global_process_mesh = auto.ProcessMesh(mesh=[0, 1]) PP_MESH_0 = auto.ProcessMesh([0]) PP_MESH_1 = auto.ProcessMesh([1]) -batch_size = 1 +epoch_num = 1 +batch_size = 2 batch_num = 10 hidden_size = 1024 sequence_len = 512 @@ -36,6 +39,8 @@ class_num = 10 paddle.seed(44) is_fetch = True +is_feed = True +my_feed_vars = [] class MyDataset(Dataset): @@ -53,6 +58,23 @@ class MyDataset(Dataset): return self.num_samples +def get_random_inputs_and_labels(image_shape, label_shape): + input = np.random.random(size=image_shape).astype('float32') + label = np.random.random(size=label_shape).astype('int64') + return input, label + + +def batch_generator_creator(): + + def __reader__(): + for _ in range(batch_num): + batch_input, batch_label = get_random_inputs_and_labels( + [batch_size, image_size], [batch_size, 1]) + yield batch_input, batch_label + + return __reader__ + + class MLPLayer(nn.Layer): def __init__(self, @@ -82,16 +104,20 @@ class MLPLayer(nn.Layer): def forward(self, input): out = auto.shard_op(self.norm, PP_MESH_0)(input) out = self.linear0(out) + if is_feed: + my_feed_vars.append((out, out.shape)) out = F.gelu(out, approximate=True) out = auto.shard_op(self.linear1, PP_MESH_1)(out) out = self.dropout(out) out = self.linear2(out) + if is_feed: + my_feed_vars.append((out, out.shape)) if is_fetch: auto.fetch(out, "my_out", logging=True) return out -def train(fetch): +def train_high_level(fetch): global is_fetch is_fetch = fetch mlp = MLPLayer(hidden_size=hidden_size, @@ -135,7 +161,7 @@ def train(fetch): temp_dir.cleanup() -def train_callable(): +def train_low_level(): mlp = MLPLayer(hidden_size=hidden_size, intermediate_size=4 * hidden_size, dropout_ratio=0.1, @@ -151,31 +177,38 @@ def train_callable(): strategy = auto.Strategy() strategy.auto_mode = "semi" - engine = auto.Engine(mlp, loss, optimizer, metric, strategy=strategy) + engine = auto.Engine(mlp, loss, optimizer, metrics=None, strategy=strategy) + feed_dict = {} + for feed_var, shape in my_feed_vars: + feed_dict[feed_var.name] = np.zeros(shape, dtype="float32") + + # Build normal normal dataloader # train train_dataset = MyDataset(batch_num * batch_size) train_dataloader = engine.dataloader(train_dataset, batch_size=batch_size, mode="train") - for _ in train_dataloader: - outs = engine(mode="train") + engine.prepare(mode="train") + for data in train_dataloader: + outs = engine.run(data, feed=feed_dict, mode="train") # eval eval_dataset2 = MyDataset(batch_size) eval_dataloader = engine.dataloader(eval_dataset2, batch_size=batch_size, mode="eval") - for _ in eval_dataloader: - outs = engine(mode="eval") + engine.prepare(mode="eval") + for data in eval_dataloader: + outs = engine.run(data, feed=feed_dict, mode="eval") # predict + engine.to_mode("predict") test_dataset = MyDataset(batch_size) - predict_dataloader = engine.dataloader(test_dataset, - batch_size=batch_size, - mode="predict") - for _ in predict_dataloader: - outs = engine(mode="predict") + predict_dataloader = engine.dataloader(test_dataset, batch_size=batch_size) + engine.prepare() + for data in predict_dataloader: + outs = engine.run(data, feed=feed_dict) # save temp_dir = tempfile.TemporaryDirectory() @@ -184,8 +217,144 @@ def train_callable(): engine.load(model_filename) temp_dir.cleanup() + # Build dataloader from generator + # train + train_dataset = MyDataset(batch_num * batch_size) + train_dataloader = engine.dataloader_from_generator(train_dataset, + batch_size=batch_size, + mode="train") + engine.prepare(mode="train") + for data in train_dataloader: + outs = engine.run(data, feed=feed_dict, mode="train") + + # eval + engine.to_mode("eval") + eval_dataset2 = MyDataset(batch_size) + eval_dataloader = engine.dataloader_from_generator(eval_dataset2, + batch_size=batch_size) + engine.prepare() + for data in eval_dataloader: + outs = engine.run(data, feed=feed_dict) + + # predict + test_dataset = MyDataset(batch_size) + predict_dataloader = engine.dataloader_from_generator(test_dataset, + batch_size=batch_size, + mode="predict") + engine.prepare(mode="predict") + for data in predict_dataloader: + outs = engine.run(data, feed=feed_dict, mode="predict") + + # save + temp_dir = tempfile.TemporaryDirectory() + model_filename = os.path.join(temp_dir.name, 'mlp') + engine.save(model_filename, training=True) + engine.load(model_filename) + temp_dir.cleanup() + + +def train_builtin_data_vars(): + mlp = MLPLayer(hidden_size=hidden_size, + intermediate_size=4 * hidden_size, + dropout_ratio=0.1, + initializer_range=0.02) + loss = paddle.nn.CrossEntropyLoss() + optimizer = paddle.optimizer.Adam(learning_rate=0.00001, + beta1=0.9, + beta2=0.999, + epsilon=1e-08, + grad_clip=None) + metric = paddle.metric.Accuracy() + + strategy = auto.Strategy() + strategy.auto_mode = "semi" + + engine = auto.Engine(mlp, loss, optimizer, metric, strategy=strategy) + + # train + engine.to_mode("train") + + input_spec = static.InputSpec([batch_size, image_size], 'float32', 'input') + label_spec = static.InputSpec([batch_size, 1], 'int64', 'label') + engine.prepare(inputs_spec=[input_spec], labels_spec=[label_spec]) + + with static.program_guard(engine.main_program, engine.startup_program): + feed_list = engine.inputs + engine.labels + print(feed_list) + loader = paddle.io.DataLoader.from_generator(feed_list=feed_list, + capacity=4 * batch_size, + iterable=False) + + places = static.cuda_places() + loader.set_batch_generator(batch_generator_creator(), places=places) + + for _ in range(epoch_num): + loader.start() # call DataLoader.start() before each epoch starts + try: + while True: + engine.run() + except paddle.fluid.core.EOFException: + loader.reset( + ) # call DataLoader.reset() after catching EOFException + + +def train_non_builtin_data_vars(): + main_program = static.Program() + startup_program = static.Program() + with static.program_guard(main_program, + startup_program), utils.unique_name.guard(): + input = static.data(name="input", + shape=[batch_size, image_size], + dtype='float32') + label = static.data(name="label", shape=[batch_size, 1], dtype='int64') + + loader = paddle.io.DataLoader.from_generator(feed_list=[input, label], + capacity=4 * batch_size, + iterable=False) + places = static.cuda_places() + loader.set_batch_generator(batch_generator_creator(), places=places) + + mlp = MLPLayer(hidden_size=hidden_size, + intermediate_size=4 * hidden_size, + dropout_ratio=0.1, + initializer_range=0.02) + loss = paddle.nn.CrossEntropyLoss() + optimizer = paddle.optimizer.Adam(learning_rate=0.00001, + beta1=0.9, + beta2=0.999, + epsilon=1e-08, + grad_clip=None) + metric = paddle.metric.Accuracy() + predict = mlp(input) + loss_var = loss(predict, label) + + strategy = auto.Strategy() + strategy.auto_mode = "semi" + + engine = auto.Engine(loss=loss_var, + optimizer=optimizer, + metrics=metric, + strategy=strategy) + + # train + engine.to_mode("train") + engine.prepare(inputs=[input], + labels=[label], + main_program=main_program, + startup_program=startup_program) + for _ in range(epoch_num): + loader.start() # call DataLoader.start() before each epoch starts + try: + while True: + engine.run() + except paddle.fluid.core.EOFException: + loader.reset( + ) # call DataLoader.reset() after catching EOFException + if __name__ == "__main__": - train(fetch=True) - train(fetch=False) - train_callable() + train_high_level(fetch=True) + train_high_level(fetch=False) + train_low_level() + train_builtin_data_vars() + train_non_builtin_data_vars() -- GitLab