未验证 提交 686fa07a 编写于 作者: Y Yulong Ao 提交者: GitHub

[Auto Parallel] Improve the fine-grained APIs (#46552)

* [Auto Parallel] Suppport different dataloaders

* [Auto Parallel] Add num_shards config for dataset

* [Auto Parallel] Unify the logger and outputs of Engine API

* [Auto Parallel] Fix the bugs of to_static

* [Auto Parallel] Adjust the test_to_static.py

* [Auto Parallel] Add the prepare API and replace __call__ with run

* [Auto Parallel] Improve the private implementations of Engine

* [Auto Parallel] Set capacity of dataloader for opt tuning

* [Auto Parallel] [WIP] Change the fine-grained API

* [Auto Parallel] Improve APIs to support different user cases

* [Auto Parallel] Add removed config

* [Auto Parallel] Add imports

* [Auto Parallel] Fix bugs for to_static

* [Auto Parallel] Remove unnecessary imports
上级 01baa0b6
...@@ -116,3 +116,10 @@ set_field_default_config(TUNING, "profile_start_step", 1) ...@@ -116,3 +116,10 @@ set_field_default_config(TUNING, "profile_start_step", 1)
set_field_default_config(TUNING, "profile_end_step", 1) set_field_default_config(TUNING, "profile_end_step", 1)
set_field_default_config(TUNING, "run_after_tuning", True) set_field_default_config(TUNING, "run_after_tuning", True)
set_field_default_config(TUNING, "verbose", True) set_field_default_config(TUNING, "verbose", True)
#########################################
# dataset configuration
#########################################
DATASET = "dataset"
set_field_default_config(DATASET, "enable", False)
set_field_default_config(DATASET, "num_shards", 1)
...@@ -17,38 +17,11 @@ import numpy as np ...@@ -17,38 +17,11 @@ import numpy as np
import paddle import paddle
from paddle.io import BatchSampler, IterableDataset from paddle.io import BatchSampler, IterableDataset
from paddle.fluid.dataloader.batch_sampler import _InfiniteIterableSampler from paddle.fluid.dataloader.batch_sampler import _InfiniteIterableSampler, DistributedBatchSampler
from paddle.fluid.dataloader.dataloader_iter import _DatasetKind, default_collate_fn, default_convert_fn from paddle.fluid.dataloader.dataloader_iter import _DatasetKind, default_collate_fn, default_convert_fn
class DistributedDataLoader(metaclass=abc.ABCMeta): class DistributedDataLoaderBase(metaclass=abc.ABCMeta):
def __init__(self, dataset, batch_size=1, epochs=1, drop_last=False):
if isinstance(dataset, IterableDataset):
self.dataset_kind = _DatasetKind.ITER
else:
self.dataset_kind = _DatasetKind.MAP
self.dataset = dataset
self.epochs = epochs
self.drop_last = drop_last
if batch_size is None:
self.batch_size = None
self.batch_sampler = None
else:
self.batch_size = batch_size
if isinstance(dataset, IterableDataset):
self.batch_sampler = _InfiniteIterableSampler(
dataset, batch_size)
else:
self.batch_sampler = BatchSampler(dataset,
batch_size=batch_size,
shuffle=False,
drop_last=drop_last)
self.auto_collate_batch = self.batch_sampler is not None
self.sampler_iter = iter(self.index_sampler)
@abc.abstractmethod @abc.abstractmethod
def __iter__(self): def __iter__(self):
...@@ -58,48 +31,70 @@ class DistributedDataLoader(metaclass=abc.ABCMeta): ...@@ -58,48 +31,70 @@ class DistributedDataLoader(metaclass=abc.ABCMeta):
def __next__(self): def __next__(self):
raise NotImplementedError raise NotImplementedError
@property
def index_sampler(self):
if self.auto_collate_batch:
return self.batch_sampler
else:
if self.dataset_kind == _DatasetKind.MAP:
return list(range(len(self.dataset)))
else:
return _InfiniteIterableSampler(self.dataset, 1)
class NonIterableGeneratorLoader(DistributedDataLoader): class DistributedDataLoaderFromGenerator(DistributedDataLoaderBase):
def __init__(self, def __init__(self,
dataset, dataset,
feed_list, feed_list=None,
places, capacity=None,
use_double_buffer=True,
iterable=True,
return_list=False,
use_multiprocess=False,
drop_last=True,
places=None,
batch_size=1, batch_size=1,
epochs=1, epochs=1,
steps_per_epoch=None, steps_per_epoch=None,
collate_fn=None, collate_fn=None,
split_data=True,
data_parallel_world_size=[], data_parallel_world_size=[],
data_parallel_rank=[], data_parallel_rank=[]):
drop_last=False, self.dataset = dataset
split_data=True):
self.feed_list = feed_list self.feed_list = feed_list
self.capacity = capacity
self.use_double_buffer = use_double_buffer
self.iterable = iterable
self.return_list = return_list
self.use_multiprocess = use_multiprocess
self.drop_last = drop_last
self.places = places self.places = places
self.batch_size = batch_size
self.epochs = epochs
self.steps_per_epoch = steps_per_epoch self.steps_per_epoch = steps_per_epoch
self.collate_fn = collate_fn
self.split_data = split_data
assert len(data_parallel_world_size) == len(feed_list) assert len(data_parallel_world_size) == len(feed_list)
assert len(data_parallel_rank) == len(feed_list) assert len(data_parallel_rank) == len(feed_list)
self.dp_world_sizes = data_parallel_world_size self.dp_world_sizes = data_parallel_world_size
self.dp_ranks = data_parallel_rank self.dp_ranks = data_parallel_rank
self.split_data = split_data
super(NonIterableGeneratorLoader, if isinstance(dataset, IterableDataset):
self).__init__(dataset, batch_size, epochs, drop_last) self.dataset_kind = _DatasetKind.ITER
else:
self.dataset_kind = _DatasetKind.MAP
if self.batch_size is None:
self.batch_sampler = None
else:
if isinstance(dataset, IterableDataset):
self.batch_sampler = _InfiniteIterableSampler(
dataset, batch_size)
else:
self.batch_sampler = BatchSampler(dataset,
batch_size=batch_size,
shuffle=False,
drop_last=drop_last)
self.auto_collate_batch = self.batch_sampler is not None
self.sampler_iter = iter(self.index_sampler)
if self.auto_collate_batch: if self.auto_collate_batch:
self.collate_fn = collate_fn or default_collate_fn self.collate_fn = collate_fn or default_collate_fn
else: else:
self.collate_fn = collate_fn or default_convert_fn self.collate_fn = collate_fn or default_convert_fn
self.dataset_fetcher = _DatasetKind.create_fetcher( self.dataset_fetcher = _DatasetKind.create_fetcher(
self.dataset_kind, self.dataset, self.auto_collate_batch, self.dataset_kind, self.dataset, self.auto_collate_batch,
self.collate_fn, self.drop_last) self.collate_fn, self.drop_last)
...@@ -115,8 +110,10 @@ class NonIterableGeneratorLoader(DistributedDataLoader): ...@@ -115,8 +110,10 @@ class NonIterableGeneratorLoader(DistributedDataLoader):
def __next__(self): def __next__(self):
if not self._steps: if not self._steps:
self._cur_step += 1 self._cur_step += 1
return None
elif self._cur_step < self._steps: elif self._cur_step < self._steps:
self._cur_step += 1 self._cur_step += 1
return None
else: else:
self._inner_dataloader.reset() self._inner_dataloader.reset()
self.sampler_iter = iter(self.index_sampler) self.sampler_iter = iter(self.index_sampler)
...@@ -138,6 +135,16 @@ class NonIterableGeneratorLoader(DistributedDataLoader): ...@@ -138,6 +135,16 @@ class NonIterableGeneratorLoader(DistributedDataLoader):
) )
return steps_per_epoch return steps_per_epoch
@property
def index_sampler(self):
if self.auto_collate_batch:
return self.batch_sampler
else:
if self.dataset_kind == _DatasetKind.MAP:
return list(range(len(self.dataset)))
else:
return _InfiniteIterableSampler(self.dataset, 1)
def _create_inner_dataloader(self): def _create_inner_dataloader(self):
def data_generator(): def data_generator():
...@@ -170,7 +177,83 @@ class NonIterableGeneratorLoader(DistributedDataLoader): ...@@ -170,7 +177,83 @@ class NonIterableGeneratorLoader(DistributedDataLoader):
yield partial_data yield partial_data
dataloader = paddle.fluid.io.DataLoader.from_generator( dataloader = paddle.fluid.io.DataLoader.from_generator(
feed_list=self.feed_list, capacity=70, iterable=False) feed_list=self.feed_list,
capacity=self.capacity,
use_double_buffer=self.use_double_buffer,
# iterable=self.iterable,
iterable=False,
return_list=self.return_list,
use_multiprocess=self.use_multiprocess,
drop_last=self.drop_last)
dataloader.set_batch_generator(data_generator, self.places) dataloader.set_batch_generator(data_generator, self.places)
return dataloader return dataloader
class DistributedDataLoader(DistributedDataLoaderBase):
def __init__(self,
dataset,
feed_list=None,
places=None,
return_list=True,
batch_size=1,
shuffle=False,
drop_last=False,
collate_fn=None,
num_workers=0,
use_buffer_reader=True,
use_shared_memory=True,
timeout=0,
worker_init_fn=None,
epochs=1,
steps_per_epoch=None,
split_data=True,
data_parallel_world_size=[],
data_parallel_rank=[]):
self.dataset = dataset
self.feed_list = feed_list
self.return_list = return_list
self.places = places
self.batch_size = batch_size
self.shuffle = shuffle
self.drop_last = drop_last
self.collate_fn = collate_fn
self.num_workers = num_workers
self.use_buffer_reader = use_buffer_reader
self.use_shared_memory = use_shared_memory
self.timeout = timeout
self.worker_init_fn = worker_init_fn
self.epochs = epochs
self.steps_per_epoch = steps_per_epoch
self.dp_world_sizes = data_parallel_world_size
self.dp_ranks = data_parallel_rank
self.split_data = split_data
# TODO: rank info
self.batch_sampler = DistributedBatchSampler(
self.dataset, self.batch_size, self.dp_world_sizes[0],
self.dp_ranks[0], self.shuffle, self.drop_last)
self._inner_dataloader = self._create_inner_dataloader()
def __iter__(self):
return self
def __next__(self):
return next(self.data)
def _create_inner_dataloader(self):
dataloader = paddle.fluid.io.DataLoader(
self.dataset,
feed_list=self.feed_list,
places=self.places,
return_list=self.return_list,
batch_sampler=self.batch_sampler,
collate_fn=self.collate_fn,
num_workers=self.num_workers,
use_buffer_reader=self.use_buffer_reader,
use_shared_memory=self.use_shared_memory,
timeout=self.timeout,
worker_init_fn=self.worker_init_fn)
self.data = (x for x in dataloader)
return dataloader
...@@ -40,9 +40,8 @@ from .planner_v2 import Planner ...@@ -40,9 +40,8 @@ from .planner_v2 import Planner
from .parallelizer_v2 import Parallelizer from .parallelizer_v2 import Parallelizer
from .dist_op import DistributedOperator from .dist_op import DistributedOperator
from .dist_saver import DistributedSaver from .dist_saver import DistributedSaver
from .dist_loader import NonIterableGeneratorLoader from .dist_loader import DistributedDataLoaderFromGenerator, DistributedDataLoader
from .utils import to_list from .utils import to_list, get_logger, get_dist_attr
from .utils import get_logger, get_dist_attr
from .process_group import new_process_group, get_all_process_groups from .process_group import new_process_group, get_all_process_groups
from .dist_context import DistributedContext, get_default_distributed_context from .dist_context import DistributedContext, get_default_distributed_context
from .strategy import Strategy from .strategy import Strategy
...@@ -127,11 +126,11 @@ class Engine: ...@@ -127,11 +126,11 @@ class Engine:
) )
self._model = model self._model = model
if loss and not isinstance(loss, # if loss and not isinstance(loss,
paddle.nn.Layer) and not callable(loss): # paddle.nn.Layer) and not callable(loss):
raise TypeError( # raise TypeError(
"'loss' must be sub classes of `paddle.nn.Layer` or any callable function." # "'loss' must be sub classes of `paddle.nn.Layer` or any callable function."
) # )
self._loss = loss self._loss = loss
if optimizer and not isinstance( if optimizer and not isinstance(
...@@ -184,39 +183,184 @@ class Engine: ...@@ -184,39 +183,184 @@ class Engine:
self._feed_vars = {} self._feed_vars = {}
self._fetch_vars = {} self._fetch_vars = {}
self._planners = {} self._planners = {}
self._mode_init_states = { self._has_prepared = {"train": False, "eval": False, "predict": False}
self._has_prepared_reader = {
"train": False, "train": False,
"eval": False, "eval": False,
"predict": False "predict": False
} }
self._inputs_spec = []
self._labels_spec = []
self._inputs = []
self._labels = []
self._skip_build = False
self._outside_dataloader = False
self._planned_mode = None self._planned_mode = None
self._dygraph_mode = False self._dygraph_mode = False
self._tuning = self._strategy.tuning self._tuning = self._strategy.tuning
def _prepare_program(self, mode): def _prepare_data_spec(self, data, split, batch_size):
# Do the build process inputs_spec = []
self._build(mode) labels_spec = []
# Do the planning process if isinstance(data, paddle.io.IterableDataset):
self._plan(mode) if split is None:
# Do the parallel process inputs, labels = next(iter(data))
self._parallel(mode) else:
# Init comm and startup program sample = next(iter(data))
self._initialize(mode) inputs = sample[:split]
self._mode_init_states[mode] = True labels = sample[split:]
elif isinstance(data, paddle.io.Dataset):
if split is None:
inputs, labels = data[0]
else:
sample = data[0]
inputs = sample[:split]
labels = sample[split:]
else:
raise ValueError(
"Data should be a Dataset or IterableDatset, but received {}.".
format(type(data).__name__))
inputs = to_list(inputs)
labels = to_list(labels)
num_shards = self._strategy.dataset.num_shards
def _adjust_item_spec(num_shards, spec):
if num_shards > 1 and len(spec.shape) > 1:
spec.shape[0] = spec.shape[0] * num_shards
def _prepare_feed(self, user_feeds=None, mode="train"): def _infer_item_spec(item, name, batch_size, specs):
if isinstance(item, np.ndarray):
spec = InputSpec.from_numpy(item, name)
if batch_size is None:
_adjust_item_spec(num_shards, spec)
specs.append(spec)
else:
specs.append(spec.batch(batch_size))
elif isinstance(item, (Variable, core.VarBase, core.eager.Tensor)):
_adjust_item_spec(num_shards, spec)
spec = InputSpec.from_tensor(item, name)
if batch_size is None:
specs.append(spec)
else:
specs.append(spec.batch(batch_size))
else:
specs.append(InputSpec([batch_size], type(item), name))
if inputs is not None:
for i, item in enumerate(inputs):
assert item is not None, "Receive None input."
name = "input" + str(i)
_infer_item_spec(item, name, batch_size, inputs_spec)
if labels is not None:
for i, item in enumerate(labels):
assert item is not None, "Receive None input."
name = "label" + str(i)
_infer_item_spec(item, name, batch_size, labels_spec)
inputs_spec = self._validate_spec(inputs_spec)
labels_spec = self._validate_spec(labels_spec)
return inputs_spec, labels_spec
def _prepare_data_tensor(self,
inputs_spec,
labels_spec,
inputs=None,
labels=None):
if _non_static_mode() or self._dygraph_mode:
return None, None
inputs_spec = inputs_spec if inputs_spec else []
labels_spec = labels_spec if labels_spec else []
if inputs_spec:
assert isinstance(inputs_spec, list), \
"inputs should be list, but received {}".format(type(inputs_spec))
if inputs is None:
inputs = [s._create_feed_layer() for s in inputs_spec]
else:
assert isinstance(inputs, list), \
"inputs should be list, but received {}".format(type(inputs))
for input_spec, input in zip(inputs_spec, inputs):
if input_spec.shape != input.shape:
input.desc.set_shape(input_spec.shape)
if labels_spec:
assert isinstance(labels_spec, list), \
"labels should be list, but received {}".format(type(labels_spec))
if labels is None:
labels = [s._create_feed_layer() for s in labels_spec]
else:
assert isinstance(labels, list), \
"labels should be list, but received {}".format(type(labels))
for label_spec, label in zip(labels_spec, labels):
if label_spec.shape != label.shape:
label.desc.set_shape(label_spec.shape)
return inputs, labels
def _prepare_reader(self):
dist_main_prog = self._dist_main_progs[self._mode][self._cur_rank]
dist_context = self._dist_contexts[self._mode]
dist_main_block = dist_main_prog.global_block()
# NOTE: this list may be changed if Paddle changes the existing rules.
related_reader_ops = [
"create_py_reader", "create_double_buffer_reader", "read"
]
# remove the first three ops if multiple run fit/evaluate/predict
if dist_main_block.ops[0].type == 'create_py_reader':
for i in range(len(related_reader_ops)):
if dist_main_block.ops[0].type in related_reader_ops:
dist_main_block._remove_op(0, sync=False)
dist_main_block._sync_with_cpp()
# Step 1: find the reader ops
reader_op_indices = []
for idx, op in enumerate(dist_main_block.ops):
if op.type in related_reader_ops:
reader_op_indices.append(idx)
# Step 2: insert the new reader ops to cpp
new_reader_ops = []
for idx in reversed(reader_op_indices):
new_op_desc = dist_main_block.desc._prepend_op()
new_op_desc.copy_from(dist_main_block.ops[idx].desc)
new_op = Operator(dist_main_block,
new_op_desc,
type=new_op_desc.type())
new_reader_ops.append(new_op)
dist_op = DistributedOperator(new_op)
dist_context.add_dist_op_for_program(dist_op)
# Step 3: insert the new reader ops to python
for new_op in new_reader_ops:
dist_main_block.ops.insert(0, new_op)
for i in range(len(reader_op_indices)):
reader_op_indices[i] += len(reader_op_indices)
# Step 4: remove the old reader ops from python and cpp
for idx in reversed(reader_op_indices):
op = dist_main_block.ops.pop(idx)
dist_main_block.desc._remove_op(idx, idx + 1)
dist_main_block._sync_with_cpp()
self._has_prepared_reader[self._mode] = True
def _prepare_feed(self, data, user_feeds, mode):
feeds = {}
if data is not None:
if isinstance(data, (list, tuple)):
if len(data) == 1 and isinstance(data[0], dict):
for name, data in data[0].items():
feeds[name] = data
else:
raise ValueError("Unsupported data {}".format(data))
elif isinstance(data, dict):
for name, data in data.items():
feeds[name] = data
else:
raise ValueError("Unsupported data {}".format(data))
if user_feeds is not None: if user_feeds is not None:
assert isinstance(user_feeds, dict), \ assert isinstance(user_feeds, dict), \
"user_feeds must be a dict, but receive {}".format(type(user_feeds).__name__) "user_feeds must be a dict, but receive {}".format(type(user_feeds).__name__)
feeds = {} for name, data in user_feeds.items():
# TODO: add inputs and labels feed dict feeds[name] = data
if user_feeds is not None:
for name, var in user_feeds.items():
feeds[name] = var
return feeds return feeds
def _prepare_fetch(self, user_fetches=None, mode="train"): def _prepare_fetch(self, user_fetches, mode):
if user_fetches is not None: if user_fetches is not None:
assert isinstance(user_fetches, list), \ assert isinstance(user_fetches, list), \
"user_fetches must be a list, but receive {}".format(type(user_fetches).__name__) "user_fetches must be a list, but receive {}".format(type(user_fetches).__name__)
...@@ -232,6 +376,8 @@ class Engine: ...@@ -232,6 +376,8 @@ class Engine:
if var_name not in fetch_names: if var_name not in fetch_names:
fetch_names.append(var_name) fetch_names.append(var_name)
group_indices.append(fetch_names.index(var_name)) group_indices.append(fetch_names.index(var_name))
if not group_indices:
fetch_names.append([])
fetch_indices.append(group_indices) fetch_indices.append(group_indices)
if mode != "predict": if mode != "predict":
...@@ -251,13 +397,13 @@ class Engine: ...@@ -251,13 +397,13 @@ class Engine:
def _prepare_logger(self, def _prepare_logger(self,
outs, outs,
mode="train",
epoch=None, epoch=None,
step=None, step=None,
lr=None, lr=None,
fetch_names=None, fetch_names=None,
fetch_indices=None, fetch_indices=None,
profiler_log=""): profiler_log="",
mode=None):
logs = "[{}] ".format(mode) logs = "[{}] ".format(mode)
if epoch is not None: if epoch is not None:
logs += "epoch: {:d} ".format(epoch) logs += "epoch: {:d} ".format(epoch)
...@@ -274,6 +420,8 @@ class Engine: ...@@ -274,6 +420,8 @@ class Engine:
group_idx += 1 group_idx += 1
# logging metrics # logging metrics
if mode != "predict": if mode != "predict":
metric_vars = self._fetch_vars[mode]["metrics"]
if metric_vars:
for metric in self._metrics: for metric in self._metrics:
metrics_indices = fetch_indices[group_idx] metrics_indices = fetch_indices[group_idx]
metric_out = [] metric_out = []
...@@ -295,9 +443,10 @@ class Engine: ...@@ -295,9 +443,10 @@ class Engine:
idx = fetch_names.index(var.name) idx = fetch_names.index(var.name)
# Use the user defined name for logging # Use the user defined name for logging
logs += "{}: {} ".format(name, outs[idx]) logs += "{}: {} ".format(name, outs[idx])
logs += profiler_log
self._logger.info(logs) self._logger.info(logs)
def _prepare_history(self, outs, mode="train", fetch_indices=None): def _prepare_history(self, outs, fetch_indices=None, mode=None):
history = {} history = {}
group_idx = 0 group_idx = 0
# store loss # store loss
...@@ -310,6 +459,8 @@ class Engine: ...@@ -310,6 +459,8 @@ class Engine:
group_idx += 1 group_idx += 1
# store metrics # store metrics
if mode != "predict": if mode != "predict":
metric_vars = self._fetch_vars[mode]["metrics"]
if metric_vars:
for metric in self._metrics: for metric in self._metrics:
metrics_indices = fetch_indices[group_idx] metrics_indices = fetch_indices[group_idx]
metric_out = [] metric_out = []
...@@ -336,14 +487,25 @@ class Engine: ...@@ -336,14 +487,25 @@ class Engine:
history["fetches"] = fetches_values history["fetches"] = fetches_values
return history return history
def _prepare_program(self, mode):
# Do the build process
self._build(mode)
# Do the planning process
self._plan(mode)
# Do the parallel process
self._parallel(mode)
# Init comm and startup program
self._initialize(mode)
self._has_prepared[mode] = True
def _build(self, mode): def _build(self, mode):
if _non_static_mode() or self._dygraph_mode: if _non_static_mode() or self._dygraph_mode:
paddle.disable_static() paddle.disable_static()
self._dygraph_mode = True self._dygraph_mode = True
self._logger.info("Building model with 'to_static' method.") self._logger.info("Building model with 'to_static' method.")
inputs_spec = self.inputs_spec inputs_spec = self._inputs_spec
labels_spec = self.labels_spec if self.labels_spec else [] labels_spec = self._labels_spec if self._labels_spec else []
self.program_helper = ProgramHelper(self._model, self._loss, self.program_helper = ProgramHelper(self._model, self._loss,
self._metrics, inputs_spec, self._metrics, inputs_spec,
labels_spec) labels_spec)
...@@ -360,6 +522,9 @@ class Engine: ...@@ -360,6 +522,9 @@ class Engine:
losses = self.program_helper.loss_vars losses = self.program_helper.loss_vars
metrics = self.program_helper.metric_vars metrics = self.program_helper.metric_vars
self._inputs = inputs
self._labels = labels
paddle.enable_static() paddle.enable_static()
else: else:
# build program in static mode # build program in static mode
...@@ -367,24 +532,26 @@ class Engine: ...@@ -367,24 +532,26 @@ class Engine:
if serial_main_prog is not None: if serial_main_prog is not None:
return return
outputs = []
losses = [] losses = []
metrics = [] metrics = []
inputs = self._inputs
labels = self._labels
serial_main_prog = self._orig_main_prog.clone() serial_main_prog = self._orig_main_prog.clone()
serial_startup_prog = self._orig_startup_prog.clone() serial_startup_prog = self._orig_startup_prog.clone()
if not self._skip_build:
with static.program_guard(serial_main_prog, serial_startup_prog), \ with static.program_guard(serial_main_prog, serial_startup_prog), \
utils.unique_name.guard(): utils.unique_name.guard():
inputs_spec = self.inputs_spec
labels_spec = self.labels_spec if self.labels_spec else []
inputs = [s._create_feed_layer() for s in inputs_spec]
labels = [s._create_feed_layer() for s in labels_spec]
outputs = to_list(self._model(*inputs)) outputs = to_list(self._model(*inputs))
if mode != "predict" and self._loss: if mode != "predict" and self._loss:
losses = to_list(self._loss(*(outputs + labels))) losses = to_list(self._loss(*(outputs + labels)))
if mode != "predict": if mode != "predict" and (outputs or labels):
for metric in self._metrics: for metric in self._metrics:
metrics.append( metrics.append(
to_list(metric.compute(*(outputs + labels)))) to_list(metric.compute(*(outputs + labels))))
else:
losses = to_list(self._loss)
default_ctx = get_default_distributed_context() default_ctx = get_default_distributed_context()
if not default_ctx.has_annotation: if not default_ctx.has_annotation:
...@@ -427,8 +594,8 @@ class Engine: ...@@ -427,8 +594,8 @@ class Engine:
self._optimization_tuner = OptimizationTuner(self._tuning.to_dict(), self._optimization_tuner = OptimizationTuner(self._tuning.to_dict(),
self._dist_contexts[mode], self._dist_contexts[mode],
dataset, dataset,
self.inputs_spec, self._inputs_spec,
self.labels_spec, self._labels_spec,
batch_size=batch_size, batch_size=batch_size,
rank=self._cur_rank) rank=self._cur_rank)
...@@ -452,6 +619,7 @@ class Engine: ...@@ -452,6 +619,7 @@ class Engine:
inputs_var = self._dist_contexts[mode].serial_feed_vars["inputs"] inputs_var = self._dist_contexts[mode].serial_feed_vars["inputs"]
labels_var = self._dist_contexts[mode].serial_feed_vars["labels"] labels_var = self._dist_contexts[mode].serial_feed_vars["labels"]
block = self._dist_contexts[mode].serial_main_program.global_block() block = self._dist_contexts[mode].serial_main_program.global_block()
# TODO: check this feed_list
feed_list = [] feed_list = []
for var in inputs_var + labels_var: for var in inputs_var + labels_var:
if var.name in block.vars: if var.name in block.vars:
...@@ -555,85 +723,6 @@ class Engine: ...@@ -555,85 +723,6 @@ class Engine:
dist_startup_prog = self._dist_startup_progs[mode][self._cur_rank] dist_startup_prog = self._dist_startup_progs[mode][self._cur_rank]
self._executor.run(dist_startup_prog) self._executor.run(dist_startup_prog)
def _split_sample_item(self, data, split):
if isinstance(data, paddle.io.IterableDataset):
if split is None:
inputs, labels = next(iter(data))
else:
sample = next(iter(data))
inputs = sample[:split]
labels = sample[split:]
elif isinstance(data, paddle.io.Dataset):
if split is None:
inputs, labels = data[0]
else:
sample = data[0]
inputs = sample[:split]
labels = sample[split:]
else:
raise ValueError(
"Data should be a Dataset or IterableDatset, but received {}.".
format(type(data).__name__))
inputs = to_list(inputs)
labels = to_list(labels)
return inputs, labels
def _infer_sample_spec(self, inputs, labels, batch_size):
self.inputs_spec = []
self.labels_spec = []
def _infer_item_spec(item, name, batch_size, specs):
if isinstance(item, np.ndarray):
spec = InputSpec.from_numpy(item, name)
if batch_size is None:
specs.append(spec)
else:
specs.append(spec.batch(batch_size))
elif isinstance(item, (Variable, core.VarBase, core.eager.Tensor)):
spec = InputSpec.from_tensor(item, name)
if batch_size is None:
specs.append(spec)
else:
specs.append(spec.batch(batch_size))
else:
specs.append(InputSpec([batch_size], type(item), name))
if inputs is not None:
for i, item in enumerate(inputs):
assert item is not None, "Receive None input."
name = "input" + str(i)
_infer_item_spec(item, name, batch_size, self.inputs_spec)
if labels is not None:
for i, item in enumerate(labels):
assert item is not None, "Receive None input."
name = "label" + str(i)
_infer_item_spec(item, name, batch_size, self.labels_spec)
self.inputs_spec = self._validate_spec(self.inputs_spec)
self.labels_spec = self._validate_spec(self.labels_spec)
def __call__(self,
inputs=None,
labels=None,
feeds=None,
fetches=None,
mode="train"):
feed_dict = self._prepare_feed(feeds, mode)
fetch_names, fetch_indices = self._prepare_fetch(fetches, mode)
try:
outs = self._executor.run(
self.main_program,
feed=feed_dict,
fetch_list=fetch_names,
use_program_cache=self._strategy.use_cache,
return_numpy=self._strategy.return_numpy)
except core.EOFException:
pass
self._prepare_logger(outs, self.mode, None, None, None, fetch_names,
fetch_indices)
history = self._prepare_history(outs, self.mode, fetch_indices)
return history
def fit(self, def fit(self,
train_data, train_data,
train_sample_split=None, train_sample_split=None,
...@@ -712,21 +801,28 @@ class Engine: ...@@ -712,21 +801,28 @@ class Engine:
epochs=2, epochs=2,
batch_size=64) batch_size=64)
""" """
self.mode = 'train' self._mode = 'train'
inputs, labels = self._split_sample_item(train_data, train_sample_split) self._inputs_spec, self._labels_spec = self._prepare_data_spec(
self._infer_sample_spec(inputs, labels, batch_size) train_data, train_sample_split, batch_size)
if not self._mode_init_states[self.mode]: self._inputs, self._labels = self._prepare_data_tensor(
self._prepare_program(self.mode) self._inputs_spec, self._labels_spec)
if not self._has_prepared[self._mode]:
self._prepare_program(self._mode)
else: else:
self._switch_mode("train") self._switch_mode(self._mode)
train_dataloader = self._prepare_dataloader_from_generator(
assert self.mode in self._dist_main_progs, \ dataset=train_data,
"train model is not ready, please call `engine._prepare_program('train')` first." capacity=70,
train_dataloader = self._prepare_dataloader(train_data, batch_size, # use_double_buffer=use_double_buffer,
epochs, steps_per_epoch, iterable=False,
collate_fn) # return_list=return_list,
# use_multiprocess=use_multiprocess,
fetch_names, fetch_indices = self._prepare_fetch(mode=self.mode) # drop_last=drop_last,
batch_size=batch_size,
epochs=epochs,
steps_per_epoch=steps_per_epoch,
collate_fn=collate_fn)
fetch_names, fetch_indices = self._prepare_fetch(None, mode=self._mode)
lr_scheduler = self._get_lr_scheduler(self.main_program) lr_scheduler = self._get_lr_scheduler(self.main_program)
with profiler.Profiler(timer_only=True) as prof: with profiler.Profiler(timer_only=True) as prof:
...@@ -746,18 +842,18 @@ class Engine: ...@@ -746,18 +842,18 @@ class Engine:
prof.step() prof.step()
self._prepare_logger(outs, self.mode, epoch, step, lr, self._prepare_logger(outs, epoch, step, lr,
fetch_names, fetch_indices, fetch_names, fetch_indices,
prof.step_info()) prof.step_info(), self._mode)
history = self._prepare_history(outs, self.mode, history = self._prepare_history(outs, fetch_indices,
fetch_indices) self._mode)
if valid_data and epoch % valid_freq == 0: # if valid_data and epoch % valid_freq == 0:
self.evaluate(valid_data, valid_sample_split, batch_size, # self.evaluate(valid_data, valid_sample_split, batch_size,
valid_steps, collate_fn, callbacks) # valid_steps, collate_fn, callbacks)
self._switch_mode("train") # self._switch_mode("train")
else: # else:
self._reset_metrics() # self._reset_metrics()
return history return history
def evaluate(self, def evaluate(self,
...@@ -813,22 +909,32 @@ class Engine: ...@@ -813,22 +909,32 @@ class Engine:
engine.evaluate(valid_dataset, batch_size=64) engine.evaluate(valid_dataset, batch_size=64)
""" """
self.mode = 'eval' self._mode = 'eval'
inputs, labels = self._split_sample_item(valid_data, valid_sample_split) self._inputs_spec, self._labels_spec = self._prepare_data_spec(
self._infer_sample_spec(inputs, labels, batch_size) valid_data, valid_sample_split, batch_size)
if not self._mode_init_states[self.mode]: self._inputs, self._labels = self._prepare_data_tensor(
self._prepare_program(self.mode) self._inputs_spec, self._labels_spec)
if not self._has_prepared[self._mode]:
self._prepare_program(self._mode)
else: else:
self._switch_mode("eval") self._switch_mode(self._mode)
assert self._mode in self._dist_main_progs, \
assert self.mode in self._dist_main_progs, \
"eval model is not ready, please call `engine._prepare_program('eval')` first." "eval model is not ready, please call `engine._prepare_program('eval')` first."
valid_dataloader = self._prepare_dataloader(valid_data, valid_dataloader = self._prepare_dataloader_from_generator(
batch_size, dataset=valid_data,
# feed_list=feed_list,
capacity=70,
# use_double_buffer=use_double_buffer,
iterable=False,
# return_list=return_list,
# use_multiprocess=use_multiprocess,
# drop_last=drop_last,
# places=places,
batch_size=batch_size,
# epochs=epochs,
steps_per_epoch=steps, steps_per_epoch=steps,
collate_fn=collate_fn) collate_fn=collate_fn)
fetch_names, fetch_indices = self._prepare_fetch(None, mode=self._mode)
fetch_names, fetch_indices = self._prepare_fetch(mode=self.mode)
for step, _ in enumerate(valid_dataloader): for step, _ in enumerate(valid_dataloader):
try: try:
...@@ -839,9 +945,9 @@ class Engine: ...@@ -839,9 +945,9 @@ class Engine:
return_numpy=self._strategy.return_numpy) return_numpy=self._strategy.return_numpy)
except core.EOFException: except core.EOFException:
break break
self._prepare_logger(outs, self.mode, None, step, None, fetch_names, self._prepare_logger(outs, None, step, None, fetch_names,
fetch_indices) fetch_indices, "", self._mode)
history = self._prepare_history(outs, self.mode, fetch_indices) history = self._prepare_history(outs, fetch_indices, self._mode)
self._reset_metrics() self._reset_metrics()
return history return history
...@@ -895,22 +1001,32 @@ class Engine: ...@@ -895,22 +1001,32 @@ class Engine:
engine = auto.Engine(model) engine = auto.Engine(model)
engine.predict(valid_dataset, batch_size=64) engine.predict(valid_dataset, batch_size=64)
""" """
self.mode = 'predict' self._mode = 'predict'
inputs, labels = self._split_sample_item(test_data, test_sample_split) self._inputs_spec, self._labels_spec = self._prepare_data_spec(
self._infer_sample_spec(inputs, labels, batch_size) test_data, test_sample_split, batch_size)
if not self._mode_init_states[self.mode]: self._inputs, self._labels = self._prepare_data_tensor(
self._prepare_program(self.mode) self._inputs_spec, self._labels_spec)
if not self._has_prepared[self._mode]:
self._prepare_program(self._mode)
else: else:
self._switch_mode("predict") self._switch_mode(self._mode)
assert self._mode in self._dist_main_progs, \
assert self.mode in self._dist_main_progs, \
"predict model is not ready, please call `engine._prepare_program('predict')` first." "predict model is not ready, please call `engine._prepare_program('predict')` first."
test_dataloader = self._prepare_dataloader(test_data, test_dataloader = self._prepare_dataloader_from_generator(
batch_size, dataset=test_data,
# feed_list=feed_list,
capacity=70,
# use_double_buffer=use_double_buffer,
iterable=False,
# return_list=return_list,
# use_multiprocess=use_multiprocess,
# drop_last=drop_last,
# places=places,
batch_size=batch_size,
# epochs=epochs,
steps_per_epoch=steps, steps_per_epoch=steps,
collate_fn=collate_fn) collate_fn=collate_fn)
fetch_names, fetch_indices = self._prepare_fetch(None, mode=self._mode)
fetch_names, fetch_indices = self._prepare_fetch(mode=self.mode)
for step, _ in enumerate(test_dataloader): for step, _ in enumerate(test_dataloader):
try: try:
...@@ -921,62 +1037,200 @@ class Engine: ...@@ -921,62 +1037,200 @@ class Engine:
return_numpy=self._strategy.return_numpy) return_numpy=self._strategy.return_numpy)
except core.EOFException: except core.EOFException:
break break
self._prepare_logger(outs, self.mode, None, step, None, fetch_names, self._prepare_logger(outs, None, step, None, fetch_names,
fetch_indices) fetch_indices, "", self._mode)
history = self._prepare_history(outs, self.mode, fetch_indices) history = self._prepare_history(outs, fetch_indices, self._mode)
return history return history
def _tune(self, tune_data, tune_sample_split=None, batch_size=1): def dataloader(
self.mode = 'train' self,
inputs, labels = self._split_sample_item(tune_data, tune_sample_split)
self._infer_sample_spec(inputs, labels, batch_size)
self._optimization_tuning(self.mode, tune_data, batch_size)
def dataloader(self,
dataset, dataset,
# return_list=True,
batch_size=1,
shuffle=False,
drop_last=False,
collate_fn=None,
num_workers=0,
use_buffer_reader=True,
use_shared_memory=True,
timeout=0,
worker_init_fn=None,
epochs=1,
steps_per_epoch=None,
sample_split=1, sample_split=1,
mode=None):
if mode is not None:
self.to_mode(mode)
self._inputs_spec, self._labels_spec = self._prepare_data_spec(
dataset, sample_split, batch_size)
self._inputs, self._labels = self._prepare_data_tensor(
self._inputs_spec, self._labels_spec)
if not self._has_prepared[self._mode]:
self._prepare_program(self._mode)
else:
self._switch_mode(self._mode)
dataloader = self._prepare_dataloader(
dataset,
return_list=False,
batch_size=batch_size,
shuffle=shuffle,
drop_last=drop_last,
collate_fn=collate_fn,
num_workers=num_workers,
use_buffer_reader=use_buffer_reader,
use_shared_memory=use_shared_memory,
timeout=timeout,
worker_init_fn=worker_init_fn,
epochs=epochs,
steps_per_epoch=steps_per_epoch)
return dataloader
def dataloader_from_generator(
self,
dataset,
capacity=70,
use_double_buffer=True,
iterable=True,
# return_list=False,
use_multiprocess=False,
drop_last=True,
batch_size=1, batch_size=1,
epochs=1, epochs=1,
steps_per_epoch=None, steps_per_epoch=None,
collate_fn=None, collate_fn=None,
mode="train", sample_split=1,
from_generator=True): mode=None):
assert from_generator, "Only support from_generator for now" if mode is not None:
self.mode = mode self.to_mode(mode)
inputs, labels = self._split_sample_item(dataset, sample_split) self._inputs_spec, self._labels_spec = self._prepare_data_spec(
self._infer_sample_spec(inputs, labels, batch_size) dataset, sample_split, batch_size)
if not self._mode_init_states[self.mode]: self._inputs, self._labels = self._prepare_data_tensor(
self._prepare_program(self.mode) self._inputs_spec, self._labels_spec)
if not self._has_prepared[self._mode]:
self._prepare_program(self._mode)
else: else:
self._switch_mode("train") self._switch_mode(self._mode)
dataloader = self._prepare_dataloader(dataset, batch_size, epochs, dataloader = self._prepare_dataloader_from_generator(
steps_per_epoch, collate_fn) dataset=dataset,
# feed_list=feed_list,
capacity=capacity,
use_double_buffer=use_double_buffer,
iterable=iterable,
return_list=False,
use_multiprocess=use_multiprocess,
drop_last=drop_last,
# places=places,
batch_size=batch_size,
epochs=epochs,
steps_per_epoch=steps_per_epoch,
collate_fn=collate_fn)
return dataloader return dataloader
def prepare(self,
inputs_spec=None,
labels_spec=None,
inputs=None,
labels=None,
main_program=None,
startup_program=None,
mode=None):
if mode is not None:
self.to_mode(mode)
if inputs or labels:
self._skip_build = True
self._inputs, self._labels = self._prepare_data_tensor(
inputs_spec, labels_spec, inputs, labels)
self._orig_main_prog = main_program
if self._orig_main_prog is None:
self._orig_main_prog = static.default_main_program()
self._orig_startup_prog = startup_program
if self._orig_startup_prog is None:
self._orig_startup_prog = static.default_startup_program()
if not self._has_prepared[self._mode]:
self._prepare_program(self._mode)
else:
self._switch_mode(self._mode)
elif inputs_spec or labels_spec:
self._outside_dataloader = True
self._inputs, self._labels = self._prepare_data_tensor(
inputs_spec, labels_spec)
self._orig_main_prog = main_program
if self._orig_main_prog is None:
self._orig_main_prog = static.default_main_program()
self._orig_startup_prog = startup_program
if self._orig_startup_prog is None:
self._orig_startup_prog = static.default_startup_program()
if not self._has_prepared[self._mode]:
self._prepare_program(self._mode)
else:
self._switch_mode(self._mode)
else:
assert self._inputs_spec and self._labels_spec, \
"Please call the dataloader(...) before calling prepare(...)"
def run(
self,
data=None,
# program=None,
feed=None,
fetch_list=None,
# feed_var_name='feed',
# fetch_var_name='fetch',
# scope=None,
# return_numpy=True,
# use_program_cache=False,
# return_merged=True,
# use_prune=False,
mode=None):
if mode is not None:
self.to_mode(mode)
feed_dict = self._prepare_feed(data, feed, self._mode)
fetch_names, fetch_indices = self._prepare_fetch(fetch_list, self._mode)
if self._outside_dataloader and not self._has_prepared_reader[
self._mode]:
self._prepare_reader()
outs = self._executor.run(self.main_program,
feed=feed_dict,
fetch_list=fetch_names,
use_program_cache=self._strategy.use_cache,
return_numpy=self._strategy.return_numpy)
self._prepare_logger(outs, None, None, None, fetch_names, fetch_indices,
"", self._mode)
history = self._prepare_history(outs, fetch_indices, self._mode)
return history
def _prepare_dataloader(self, def _prepare_dataloader(self,
dataset, dataset,
batch_size, return_list=True,
batch_size=1,
shuffle=False,
drop_last=False,
collate_fn=None,
num_workers=0,
use_buffer_reader=True,
use_shared_memory=True,
timeout=0,
worker_init_fn=None,
epochs=1, epochs=1,
steps_per_epoch=None, steps_per_epoch=None):
collate_fn=None):
if self._strategy.gradient_merge and batch_size is not None: if self._strategy.gradient_merge and batch_size is not None:
assert batch_size % self._k_steps == 0, \ assert batch_size % self._k_steps == 0, \
"Requires batch_size:[{}] to be divisible by k_steps:[{}].".format(batch_size, self._k_steps) "Requires batch_size:[{}] to be divisible by k_steps:[{}].".format(batch_size, self._k_steps)
batch_size //= self._k_steps batch_size //= self._k_steps
dist_main_prog = self._dist_main_progs[self.mode][self._cur_rank] dist_main_prog = self._dist_main_progs[self._mode][self._cur_rank]
dist_startup_prog = self._dist_startup_progs[self.mode][self._cur_rank] dist_startup_prog = self._dist_startup_progs[self._mode][self._cur_rank]
dist_context = self._dist_contexts[self.mode] dist_context = self._dist_contexts[self._mode]
dist_main_block = dist_main_prog.global_block() dist_main_block = dist_main_prog.global_block()
# NOTE: Get feed_list, then insert dataloader op with sharded var shape. # NOTE: Get feed_list, then insert dataloader op with sharded var shape.
# Cause predict_program does not contain labels var, # Cause predict_program does not contain labels var,
# then we will add labels var from serial_program to dist_program, # then we will add labels var from serial_program to dist_program,
# that maintains the length of feed_list equal to the length of dataset's values. # that maintains the length of feed_list equal to the length of dataset's values.
inputs_var = self._feed_vars[self.mode]["inputs"] inputs_var = self._feed_vars[self._mode]["inputs"]
labels_var = self._feed_vars[self.mode]["labels"] labels_var = self._feed_vars[self._mode]["labels"]
feed_list = [] feed_list = []
for var in inputs_var + labels_var: for var in inputs_var + labels_var:
if var.name in dist_main_block.vars: if var.name in dist_main_block.vars:
...@@ -986,45 +1240,121 @@ class Engine: ...@@ -986,45 +1240,121 @@ class Engine:
copy_var.desc.set_original_id(var.desc.original_id()) copy_var.desc.set_original_id(var.desc.original_id())
feed_list.append(copy_var) feed_list.append(copy_var)
# remove the first three ops if multi run fit/evaluate/predict
op_size = len(dist_main_block.ops)
if dist_main_block.ops[0].type == 'create_py_reader':
op_size -= 3
for _ in range(3):
dist_main_block._remove_op(0, sync=False)
# insert read op at the end of program # insert read op at the end of program
places = paddle.static.cuda_places() places = paddle.static.cuda_places()
with static.program_guard(dist_main_prog, dist_startup_prog): with static.program_guard(dist_main_prog, dist_startup_prog):
dataloader = NonIterableGeneratorLoader( dataloader = DistributedDataLoader(
dataset, dataset,
feed_list, feed_list=feed_list,
places, places=places,
batch_size, return_list=return_list,
epochs, batch_size=batch_size,
steps_per_epoch, shuffle=shuffle,
collate_fn, drop_last=drop_last,
collate_fn=collate_fn,
num_workers=num_workers,
use_buffer_reader=use_buffer_reader,
use_shared_memory=use_shared_memory,
timeout=timeout,
worker_init_fn=worker_init_fn,
epochs=epochs,
steps_per_epoch=steps_per_epoch,
split_data=self._strategy.split_data,
data_parallel_world_size=self._dp_world_sizes, data_parallel_world_size=self._dp_world_sizes,
data_parallel_rank=self._dp_ranks, data_parallel_rank=self._dp_ranks)
split_data=self._strategy.split_data)
# move read op from the end of program to the start of program
new_op_size = len(dist_main_block.ops)
for _ in range(new_op_size - 1, op_size - 1, -1):
op = dist_main_block.ops[new_op_size - 1]
new_op_desc = dist_main_block.desc._prepend_op()
new_op_desc.copy_from(op.desc)
new_op = Operator(dist_main_block,
new_op_desc,
type=new_op_desc.type())
dist_main_block.ops.insert(0, new_op)
dist_op = DistributedOperator(new_op)
dist_context.add_dist_op_for_program(dist_op)
for _ in range(new_op_size - op_size):
dist_main_block._remove_op(new_op_size, sync=False)
dist_main_block._sync_with_cpp()
return dataloader return dataloader
def _prepare_dataloader_from_generator(self,
dataset,
capacity=None,
use_double_buffer=True,
iterable=True,
return_list=False,
use_multiprocess=False,
drop_last=True,
batch_size=1,
epochs=1,
steps_per_epoch=None,
collate_fn=None):
if self._strategy.gradient_merge and batch_size is not None:
assert batch_size % self._k_steps == 0, \
"Requires batch_size:[{}] to be divisible by k_steps:[{}].".format(batch_size, self._k_steps)
batch_size //= self._k_steps
dist_main_prog = self._dist_main_progs[self._mode][self._cur_rank]
dist_startup_prog = self._dist_startup_progs[self._mode][self._cur_rank]
dist_context = self._dist_contexts[self._mode]
dist_main_block = dist_main_prog.global_block()
# NOTE: Get feed_list, then insert dataloader op with sharded var shape.
# Cause predict_program does not contain labels var,
# then we will add labels var from serial_program to dist_program,
# that maintains the length of feed_list equal to the length of dataset's values.
inputs_var = self._feed_vars[self._mode]["inputs"]
labels_var = self._feed_vars[self._mode]["labels"]
feed_list = []
for var in inputs_var + labels_var:
if var.name in dist_main_block.vars:
feed_list.append(dist_main_block.vars[var.name])
else:
copy_var = dist_main_block._clone_variable(var, var.persistable)
copy_var.desc.set_original_id(var.desc.original_id())
feed_list.append(copy_var)
# # remove the first three ops if multi run fit/evaluate/predict
# self._op_size = len(dist_main_block.ops)
# if dist_main_block.ops[0].type == 'create_py_reader':
# op_size -= 3
# for _ in range(3):
# dist_main_block._remove_op(0, sync=False)
places = paddle.static.cuda_places()
with static.program_guard(dist_main_prog, dist_startup_prog):
dataloader = DistributedDataLoaderFromGenerator(
dataset=dataset,
feed_list=feed_list,
capacity=capacity,
use_double_buffer=use_double_buffer,
iterable=iterable,
return_list=return_list,
use_multiprocess=use_multiprocess,
drop_last=drop_last,
places=places,
batch_size=batch_size,
epochs=epochs,
steps_per_epoch=steps_per_epoch,
collate_fn=collate_fn,
split_data=self._strategy.split_data,
data_parallel_world_size=self._dp_world_sizes,
data_parallel_rank=self._dp_ranks)
self._prepare_reader()
# # move read op from the end of program to the start of program
# new_op_size = len(dist_main_block.ops)
# for _ in range(new_op_size - 1, op_size - 1, -1):
# op = dist_main_block.ops[new_op_size - 1]
# new_op_desc = dist_main_block.desc._prepend_op()
# new_op_desc.copy_from(op.desc)
# new_op = Operator(dist_main_block,
# new_op_desc,
# type=new_op_desc.type())
# dist_main_block.ops.insert(0, new_op)
# dist_op = DistributedOperator(new_op)
# dist_context.add_dist_op_for_program(dist_op)
# for _ in range(new_op_size - op_size):
# dist_main_block._remove_op(new_op_size, sync=False)
# dist_main_block._sync_with_cpp()
return dataloader
def _tune(self, tune_data, tune_sample_split=None, batch_size=1):
self._mode = 'train'
self._inputs_spec, self._labels_spec = self._prepare_data_spec(
tune_data, tune_sample_split, batch_size)
self._inputs, self._labels = self._prepare_data_tensor(
self._inputs_spec, self._labels_spec)
self._optimization_tuning(self._mode, tune_data, batch_size)
def _validate_spec(self, specs): def _validate_spec(self, specs):
specs = to_list(specs) specs = to_list(specs)
self._k_steps = self._strategy.gradient_merge.k_steps self._k_steps = self._strategy.gradient_merge.k_steps
...@@ -1108,9 +1438,14 @@ class Engine: ...@@ -1108,9 +1438,14 @@ class Engine:
metric.reset() metric.reset()
def _switch_mode(self, mode): def _switch_mode(self, mode):
self.mode = mode self.to_mode(mode)
self._initialize(mode) self._initialize(mode)
def to_mode(self, mode):
assert mode in ["train", "eval", "predict"], \
"mode {} should be one of ['train', 'eval', 'predict']".format(mode)
self._mode = mode
def _set_state_dict(self, mode, strict, state_dict, dist_attr): def _set_state_dict(self, mode, strict, state_dict, dist_attr):
program = self._dist_main_progs[mode][self._cur_rank] program = self._dist_main_progs[mode][self._cur_rank]
dist_context = self._dist_contexts[mode] dist_context = self._dist_contexts[mode]
...@@ -1129,7 +1464,7 @@ class Engine: ...@@ -1129,7 +1464,7 @@ class Engine:
is 'dirname/file_prefix' or 'file_prefix'. if empty str. is 'dirname/file_prefix' or 'file_prefix'. if empty str.
A exception will be raised. A exception will be raised.
training (bool, optional): Whether to save for training. If not, save training (bool, optional): Whether to save for training. If not, save
for inference only. If `training` is set to True, the optimzer state for inference only. If `training` is set to True, the optimizer state
will be saved. Otherwise, only the model and parameters are saved. will be saved. Otherwise, only the model and parameters are saved.
This function will silently overwrite existing file at the target This function will silently overwrite existing file at the target
location. Default: True. location. Default: True.
...@@ -1259,42 +1594,34 @@ class Engine: ...@@ -1259,42 +1594,34 @@ class Engine:
" or `paddle.fluid.optimizer.Optimizer`, but got {}.".format(type(optimizer)) " or `paddle.fluid.optimizer.Optimizer`, but got {}.".format(type(optimizer))
) )
@property
def mode(self):
return self._mode
@mode.setter
def mode(self, mode):
self._mode = mode
@property @property
def main_program(self): def main_program(self):
return self._dist_main_progs[self.mode][self._cur_rank] return self._dist_main_progs[self._mode][self._cur_rank]
@property @property
def startup_program(self): def startup_program(self):
return self._dist_startup_progs[self.mode][self._cur_rank] return self._dist_startup_progs[self._mode][self._cur_rank]
@property @property
def dist_context(self): def dist_context(self):
return self._dist_contexts[self.mode] return self._dist_contexts[self._mode]
@property @property
def serial_main_program(self): def serial_main_program(self):
return self._serial_main_progs[self.mode] return self._serial_main_progs[self._mode]
@property @property
def serial_startup_program(self): def serial_startup_program(self):
return self._serial_startup_progs[self.mode] return self._serial_startup_progs[self._mode]
@property @property
def fetch_vars(self): def fetch_vars(self):
return self._fetch_vars[self.mode] return self._fetch_vars[self._mode]
@property @property
def inputs(self): def inputs(self):
return self.inputs_spec return self._inputs
@property @property
def labels(self): def labels(self):
return self.labels_spec return self._labels
...@@ -210,11 +210,11 @@ def get_collection(name): ...@@ -210,11 +210,11 @@ def get_collection(name):
return _g_collections[name] return _g_collections[name]
def add_to_collection(collection_name, value, value_name=None): def add_to_collection(collection_name, value, name=None):
if collection_name not in _g_collections: if collection_name not in _g_collections:
_g_collections[collection_name] = [] _g_collections[collection_name] = []
if value_name is not None: if name is not None:
_g_collections[collection_name].append((value_name, value)) _g_collections[collection_name].append((name, value))
else: else:
_g_collections[collection_name].append((None, value)) _g_collections[collection_name].append((None, value))
......
...@@ -23,7 +23,7 @@ from .dist_attribute import OperatorDistributedAttribute ...@@ -23,7 +23,7 @@ from .dist_attribute import OperatorDistributedAttribute
from .utils import is_backward_op, is_forward_op, is_loss_op, is_optimize_op from .utils import is_backward_op, is_forward_op, is_loss_op, is_optimize_op
from .operators.common import BACKWARD_ONLY_DIST_OPS from .operators.common import BACKWARD_ONLY_DIST_OPS
__varname_not_in_block__ = ["lod_tensor_blocking_queue_0"] __varname_not_in_block__ = ["lod_tensor_blocking_queue"]
__not_shape_var_type__ = [ __not_shape_var_type__ = [
core.VarDesc.VarType.READER, core.VarDesc.VarType.STEP_SCOPES core.VarDesc.VarType.READER, core.VarDesc.VarType.STEP_SCOPES
] ]
...@@ -238,7 +238,9 @@ class Partitioner(object): ...@@ -238,7 +238,9 @@ class Partitioner(object):
target_block, serial_input_varname, target_block, serial_input_varname,
new_varname) new_varname)
else: else:
assert serial_input_varname in __varname_not_in_block__ for varname_not_in_block in __varname_not_in_block__:
assert varname_not_in_block in serial_input_varname, \
"{} is not found".format(serial_input_varname)
self._serial2dist_varname_mapping[ self._serial2dist_varname_mapping[
serial_input_varname] = new_varname serial_input_varname] = new_varname
......
...@@ -45,7 +45,8 @@ def get_var_with_recursion(var_name, block, program): ...@@ -45,7 +45,8 @@ def get_var_with_recursion(var_name, block, program):
parent_block = program.blocks[block.parent_idx] parent_block = program.blocks[block.parent_idx]
if var_name in parent_block.vars: if var_name in parent_block.vars:
var = parent_block.vars[var_name] var = parent_block.vars[var_name]
assert var is not None assert var is not None, \
"{} is not found".format(var.name)
return var return var
...@@ -1838,8 +1839,8 @@ class Resharder: ...@@ -1838,8 +1839,8 @@ class Resharder:
idx_offset = 0 idx_offset = 0
for var_name in input_var_names: for var_name in input_var_names:
# skip lod_tensor_blocking_queue_0 # skip lod_tensor_blocking_queue_? name
if var_name == "lod_tensor_blocking_queue_0": if "lod_tensor_blocking_queue" in var_name:
continue continue
var = get_var_with_recursion(var_name, block, var = get_var_with_recursion(var_name, block,
self.auto_parallel_main_prog) self.auto_parallel_main_prog)
......
...@@ -114,6 +114,13 @@ class TuningConfig(BaseConfig): ...@@ -114,6 +114,13 @@ class TuningConfig(BaseConfig):
super(TuningConfig, self).__init__(category, config_dict) super(TuningConfig, self).__init__(category, config_dict)
class DatasetConfig(BaseConfig):
def __init__(self, config_dict=None):
category = constants.DATASET
super(DatasetConfig, self).__init__(category, config_dict)
class Strategy(BaseConfig): class Strategy(BaseConfig):
""" """
The `Strategy` object is used to configure the paralleization and optimization beheviors. The `Strategy` object is used to configure the paralleization and optimization beheviors.
...@@ -178,3 +185,6 @@ class Strategy(BaseConfig): ...@@ -178,3 +185,6 @@ class Strategy(BaseConfig):
config_dict = self._config_dict.get(constants.TUNING, None) config_dict = self._config_dict.get(constants.TUNING, None)
self.tuning = TuningConfig(config_dict) self.tuning = TuningConfig(config_dict)
config_dict = self._config_dict.get(constants.DATASET, None)
self.dataset = DatasetConfig(config_dict)
...@@ -23,7 +23,7 @@ import paddle ...@@ -23,7 +23,7 @@ import paddle
from paddle.fluid.framework import Program, _current_expected_place from paddle.fluid.framework import Program, _current_expected_place
from paddle.fluid.framework import Operator from paddle.fluid.framework import Operator
from paddle.distributed.auto_parallel.process_group import get_all_process_groups, new_process_group from paddle.distributed.auto_parallel.process_group import get_all_process_groups, new_process_group
from paddle.distributed.auto_parallel.dist_loader import NonIterableGeneratorLoader from paddle.distributed.auto_parallel.dist_loader import DistributedDataLoaderFromGenerator
from paddle.distributed.collective import _get_global_env from paddle.distributed.collective import _get_global_env
paddle.enable_static() paddle.enable_static()
...@@ -132,13 +132,14 @@ def create_dataloader(main_program, ...@@ -132,13 +132,14 @@ def create_dataloader(main_program,
# insert read op at the end of program # insert read op at the end of program
places = paddle.static.cuda_places() places = paddle.static.cuda_places()
with paddle.static.program_guard(main_program, startup_program): with paddle.static.program_guard(main_program, startup_program):
dataloader = NonIterableGeneratorLoader( dataloader = DistributedDataLoaderFromGenerator(
dataset, dataset=dataset,
feed_list, feed_list=feed_list,
places, capacity=70,
dataset.batch_size, places=places,
epochs, batch_size=dataset.batch_size,
steps_per_epoch, epochs=epochs,
steps_per_epoch=steps_per_epoch,
data_parallel_world_size=dataset.dp_world_size, data_parallel_world_size=dataset.dp_world_size,
data_parallel_rank=dataset.dp_rank) data_parallel_rank=dataset.dp_rank)
......
...@@ -16,6 +16,8 @@ import tempfile ...@@ -16,6 +16,8 @@ import tempfile
import os import os
import numpy as np import numpy as np
import paddle import paddle
import paddle.static as static
import paddle.utils as utils
import paddle.nn as nn import paddle.nn as nn
import paddle.nn.functional as F import paddle.nn.functional as F
from paddle.io import Dataset from paddle.io import Dataset
...@@ -26,7 +28,8 @@ paddle.enable_static() ...@@ -26,7 +28,8 @@ paddle.enable_static()
global_process_mesh = auto.ProcessMesh(mesh=[0, 1]) global_process_mesh = auto.ProcessMesh(mesh=[0, 1])
PP_MESH_0 = auto.ProcessMesh([0]) PP_MESH_0 = auto.ProcessMesh([0])
PP_MESH_1 = auto.ProcessMesh([1]) PP_MESH_1 = auto.ProcessMesh([1])
batch_size = 1 epoch_num = 1
batch_size = 2
batch_num = 10 batch_num = 10
hidden_size = 1024 hidden_size = 1024
sequence_len = 512 sequence_len = 512
...@@ -36,6 +39,8 @@ class_num = 10 ...@@ -36,6 +39,8 @@ class_num = 10
paddle.seed(44) paddle.seed(44)
is_fetch = True is_fetch = True
is_feed = True
my_feed_vars = []
class MyDataset(Dataset): class MyDataset(Dataset):
...@@ -53,6 +58,23 @@ class MyDataset(Dataset): ...@@ -53,6 +58,23 @@ class MyDataset(Dataset):
return self.num_samples return self.num_samples
def get_random_inputs_and_labels(image_shape, label_shape):
input = np.random.random(size=image_shape).astype('float32')
label = np.random.random(size=label_shape).astype('int64')
return input, label
def batch_generator_creator():
def __reader__():
for _ in range(batch_num):
batch_input, batch_label = get_random_inputs_and_labels(
[batch_size, image_size], [batch_size, 1])
yield batch_input, batch_label
return __reader__
class MLPLayer(nn.Layer): class MLPLayer(nn.Layer):
def __init__(self, def __init__(self,
...@@ -82,16 +104,20 @@ class MLPLayer(nn.Layer): ...@@ -82,16 +104,20 @@ class MLPLayer(nn.Layer):
def forward(self, input): def forward(self, input):
out = auto.shard_op(self.norm, PP_MESH_0)(input) out = auto.shard_op(self.norm, PP_MESH_0)(input)
out = self.linear0(out) out = self.linear0(out)
if is_feed:
my_feed_vars.append((out, out.shape))
out = F.gelu(out, approximate=True) out = F.gelu(out, approximate=True)
out = auto.shard_op(self.linear1, PP_MESH_1)(out) out = auto.shard_op(self.linear1, PP_MESH_1)(out)
out = self.dropout(out) out = self.dropout(out)
out = self.linear2(out) out = self.linear2(out)
if is_feed:
my_feed_vars.append((out, out.shape))
if is_fetch: if is_fetch:
auto.fetch(out, "my_out", logging=True) auto.fetch(out, "my_out", logging=True)
return out return out
def train(fetch): def train_high_level(fetch):
global is_fetch global is_fetch
is_fetch = fetch is_fetch = fetch
mlp = MLPLayer(hidden_size=hidden_size, mlp = MLPLayer(hidden_size=hidden_size,
...@@ -135,7 +161,7 @@ def train(fetch): ...@@ -135,7 +161,7 @@ def train(fetch):
temp_dir.cleanup() temp_dir.cleanup()
def train_callable(): def train_low_level():
mlp = MLPLayer(hidden_size=hidden_size, mlp = MLPLayer(hidden_size=hidden_size,
intermediate_size=4 * hidden_size, intermediate_size=4 * hidden_size,
dropout_ratio=0.1, dropout_ratio=0.1,
...@@ -151,31 +177,73 @@ def train_callable(): ...@@ -151,31 +177,73 @@ def train_callable():
strategy = auto.Strategy() strategy = auto.Strategy()
strategy.auto_mode = "semi" strategy.auto_mode = "semi"
engine = auto.Engine(mlp, loss, optimizer, metric, strategy=strategy) engine = auto.Engine(mlp, loss, optimizer, metrics=None, strategy=strategy)
feed_dict = {}
for feed_var, shape in my_feed_vars:
feed_dict[feed_var.name] = np.zeros(shape, dtype="float32")
# Build normal normal dataloader
# train # train
train_dataset = MyDataset(batch_num * batch_size) train_dataset = MyDataset(batch_num * batch_size)
train_dataloader = engine.dataloader(train_dataset, train_dataloader = engine.dataloader(train_dataset,
batch_size=batch_size, batch_size=batch_size,
mode="train") mode="train")
for _ in train_dataloader: engine.prepare(mode="train")
outs = engine(mode="train") for data in train_dataloader:
outs = engine.run(data, feed=feed_dict, mode="train")
# eval # eval
eval_dataset2 = MyDataset(batch_size) eval_dataset2 = MyDataset(batch_size)
eval_dataloader = engine.dataloader(eval_dataset2, eval_dataloader = engine.dataloader(eval_dataset2,
batch_size=batch_size, batch_size=batch_size,
mode="eval") mode="eval")
for _ in eval_dataloader: engine.prepare(mode="eval")
outs = engine(mode="eval") for data in eval_dataloader:
outs = engine.run(data, feed=feed_dict, mode="eval")
# predict # predict
engine.to_mode("predict")
test_dataset = MyDataset(batch_size) test_dataset = MyDataset(batch_size)
predict_dataloader = engine.dataloader(test_dataset, predict_dataloader = engine.dataloader(test_dataset, batch_size=batch_size)
engine.prepare()
for data in predict_dataloader:
outs = engine.run(data, feed=feed_dict)
# save
temp_dir = tempfile.TemporaryDirectory()
model_filename = os.path.join(temp_dir.name, 'mlp')
engine.save(model_filename, training=True)
engine.load(model_filename)
temp_dir.cleanup()
# Build dataloader from generator
# train
train_dataset = MyDataset(batch_num * batch_size)
train_dataloader = engine.dataloader_from_generator(train_dataset,
batch_size=batch_size,
mode="train")
engine.prepare(mode="train")
for data in train_dataloader:
outs = engine.run(data, feed=feed_dict, mode="train")
# eval
engine.to_mode("eval")
eval_dataset2 = MyDataset(batch_size)
eval_dataloader = engine.dataloader_from_generator(eval_dataset2,
batch_size=batch_size)
engine.prepare()
for data in eval_dataloader:
outs = engine.run(data, feed=feed_dict)
# predict
test_dataset = MyDataset(batch_size)
predict_dataloader = engine.dataloader_from_generator(test_dataset,
batch_size=batch_size, batch_size=batch_size,
mode="predict") mode="predict")
for _ in predict_dataloader: engine.prepare(mode="predict")
outs = engine(mode="predict") for data in predict_dataloader:
outs = engine.run(data, feed=feed_dict, mode="predict")
# save # save
temp_dir = tempfile.TemporaryDirectory() temp_dir = tempfile.TemporaryDirectory()
...@@ -185,7 +253,108 @@ def train_callable(): ...@@ -185,7 +253,108 @@ def train_callable():
temp_dir.cleanup() temp_dir.cleanup()
def train_builtin_data_vars():
mlp = MLPLayer(hidden_size=hidden_size,
intermediate_size=4 * hidden_size,
dropout_ratio=0.1,
initializer_range=0.02)
loss = paddle.nn.CrossEntropyLoss()
optimizer = paddle.optimizer.Adam(learning_rate=0.00001,
beta1=0.9,
beta2=0.999,
epsilon=1e-08,
grad_clip=None)
metric = paddle.metric.Accuracy()
strategy = auto.Strategy()
strategy.auto_mode = "semi"
engine = auto.Engine(mlp, loss, optimizer, metric, strategy=strategy)
# train
engine.to_mode("train")
input_spec = static.InputSpec([batch_size, image_size], 'float32', 'input')
label_spec = static.InputSpec([batch_size, 1], 'int64', 'label')
engine.prepare(inputs_spec=[input_spec], labels_spec=[label_spec])
with static.program_guard(engine.main_program, engine.startup_program):
feed_list = engine.inputs + engine.labels
print(feed_list)
loader = paddle.io.DataLoader.from_generator(feed_list=feed_list,
capacity=4 * batch_size,
iterable=False)
places = static.cuda_places()
loader.set_batch_generator(batch_generator_creator(), places=places)
for _ in range(epoch_num):
loader.start() # call DataLoader.start() before each epoch starts
try:
while True:
engine.run()
except paddle.fluid.core.EOFException:
loader.reset(
) # call DataLoader.reset() after catching EOFException
def train_non_builtin_data_vars():
main_program = static.Program()
startup_program = static.Program()
with static.program_guard(main_program,
startup_program), utils.unique_name.guard():
input = static.data(name="input",
shape=[batch_size, image_size],
dtype='float32')
label = static.data(name="label", shape=[batch_size, 1], dtype='int64')
loader = paddle.io.DataLoader.from_generator(feed_list=[input, label],
capacity=4 * batch_size,
iterable=False)
places = static.cuda_places()
loader.set_batch_generator(batch_generator_creator(), places=places)
mlp = MLPLayer(hidden_size=hidden_size,
intermediate_size=4 * hidden_size,
dropout_ratio=0.1,
initializer_range=0.02)
loss = paddle.nn.CrossEntropyLoss()
optimizer = paddle.optimizer.Adam(learning_rate=0.00001,
beta1=0.9,
beta2=0.999,
epsilon=1e-08,
grad_clip=None)
metric = paddle.metric.Accuracy()
predict = mlp(input)
loss_var = loss(predict, label)
strategy = auto.Strategy()
strategy.auto_mode = "semi"
engine = auto.Engine(loss=loss_var,
optimizer=optimizer,
metrics=metric,
strategy=strategy)
# train
engine.to_mode("train")
engine.prepare(inputs=[input],
labels=[label],
main_program=main_program,
startup_program=startup_program)
for _ in range(epoch_num):
loader.start() # call DataLoader.start() before each epoch starts
try:
while True:
engine.run()
except paddle.fluid.core.EOFException:
loader.reset(
) # call DataLoader.reset() after catching EOFException
if __name__ == "__main__": if __name__ == "__main__":
train(fetch=True) train_high_level(fetch=True)
train(fetch=False) train_high_level(fetch=False)
train_callable() train_low_level()
train_builtin_data_vars()
train_non_builtin_data_vars()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册