From f3b7092c7267f1c27b7568ab0888cdb3ea9e0ddd Mon Sep 17 00:00:00 2001
From: zhaoyingli <86812880+zhaoyinglia@users.noreply.github.com>
Date: Wed, 2 Aug 2023 10:34:05 +0800
Subject: [PATCH] make places configurable for DistributedDataLoader (#55873)

* Update autoparallel DistributedDataLoader

* add places for engine.dataloader()
---
 .../auto_parallel/static/completion.py        |  3 ++
 .../auto_parallel/static/dist_loader.py       | 37 ++++++++-----------
 .../auto_parallel/static/engine.py            |  4 +-
 test/auto_parallel/engine_api.py              | 26 ++++++++++++-
 4 files changed, 46 insertions(+), 24 deletions(-)

diff --git a/python/paddle/distributed/auto_parallel/static/completion.py b/python/paddle/distributed/auto_parallel/static/completion.py
index 1f5ccc94c3c..acc7f512e7f 100644
--- a/python/paddle/distributed/auto_parallel/static/completion.py
+++ b/python/paddle/distributed/auto_parallel/static/completion.py
@@ -983,6 +983,9 @@ class Completer:
             op_dist_attr.process_mesh = process_mesh
             original_op_dist_attr = copy.deepcopy(op_dist_attr)
 
+            if serial_op.type == "create_py_reader":
+                continue
+
             for arg_name in serial_op.input_arg_names:
                 serial_tensor = dist_op.get_serial_input(arg_name)
                 if not serial_tensor.is_parameter:
diff --git a/python/paddle/distributed/auto_parallel/static/dist_loader.py b/python/paddle/distributed/auto_parallel/static/dist_loader.py
index e0343005824..8db3d21e3ad 100644
--- a/python/paddle/distributed/auto_parallel/static/dist_loader.py
+++ b/python/paddle/distributed/auto_parallel/static/dist_loader.py
@@ -34,10 +34,6 @@ class DistributedDataLoaderBase(metaclass=abc.ABCMeta):
     def __iter__(self):
         raise NotImplementedError
 
-    @abc.abstractmethod
-    def __next__(self):
-        raise NotImplementedError
-
 
 class DistributedDataLoaderFromGenerator(DistributedDataLoaderBase):
     def __init__(
@@ -260,23 +256,14 @@ class DistributedDataLoader(DistributedDataLoaderBase):
         self.split_data = split_data
         # TODO: rank info
         self.batch_sampler = DistributedBatchSampler(
-            self.dataset,
-            self.batch_size,
-            self.dp_world_sizes[0],
-            self.dp_ranks[0],
-            self.shuffle,
-            self.drop_last,
+            dataset=self.dataset,
+            batch_size=self.batch_size,
+            num_replicas=self.dp_world_sizes[0],
+            rank=self.dp_ranks[0],
+            shuffle=self.shuffle,
+            drop_last=self.drop_last,
         )
-        self._inner_dataloader = self._create_inner_dataloader()
-
-    def __iter__(self):
-        return self
-
-    def __next__(self):
-        return next(self.data)
-
-    def _create_inner_dataloader(self):
-        dataloader = paddle.io.DataLoader(
+        self._dataloader = paddle.io.DataLoader(
             self.dataset,
             feed_list=self.feed_list,
             places=self.places,
@@ -289,6 +276,12 @@ class DistributedDataLoader(DistributedDataLoaderBase):
             timeout=self.timeout,
             worker_init_fn=self.worker_init_fn,
         )
-        self.data = (x for x in dataloader)
-        return dataloader
 
+    def __len__(self):
+        return len(self._dataloader)
+
+    def __iter__(self):
+        return self._dataloader.__iter__()
+
+    def __call__(self):
+        return self._dataloader.__iter__()
diff --git a/python/paddle/distributed/auto_parallel/static/engine.py b/python/paddle/distributed/auto_parallel/static/engine.py
index 4c0c2e2e473..84670977cd7 100644
--- a/python/paddle/distributed/auto_parallel/static/engine.py
+++ b/python/paddle/distributed/auto_parallel/static/engine.py
@@ -1255,6 +1255,7 @@ class Engine:
         steps_per_epoch=None,
         sample_split=1,
         mode=None,
+        places=None,
     ):
         if mode is not None:
             self.to_mode(mode)
@@ -1281,6 +1282,7 @@ class Engine:
             worker_init_fn=worker_init_fn,
             epochs=epochs,
             steps_per_epoch=steps_per_epoch,
+            places=places,
         )
         return dataloader
 
@@ -1418,6 +1420,7 @@ class Engine:
         worker_init_fn=None,
         epochs=1,
         steps_per_epoch=None,
+        places=None,
     ):
         dist_context = self._dist_contexts[self._mode]
         dist_main_prog = dist_context.dist_main_programs[self._cur_rank]
@@ -1440,7 +1443,6 @@ class Engine:
                 feed_list.append(copy_var)
 
         # insert read op at the end of program
-        places = paddle.static.cuda_places()
         with static.program_guard(dist_main_prog, dist_startup_prog):
             dataloader = DistributedDataLoader(
                 dataset,
diff --git a/test/auto_parallel/engine_api.py b/test/auto_parallel/engine_api.py
index a2725a57b8e..38e19468a6b 100644
--- a/test/auto_parallel/engine_api.py
+++ b/test/auto_parallel/engine_api.py
@@ -117,6 +117,9 @@ class MLPLayer(nn.Layer):
 
 
 def train_high_level(fetch):
+    paddle.distributed.auto_parallel.static.dist_context.set_default_distributed_context(
+        None
+    )
     global is_fetch
     is_fetch = fetch
     mlp = MLPLayer(
@@ -169,6 +172,9 @@ def train_high_level(fetch):
 
 
 def train_low_level():
+    paddle.distributed.auto_parallel.static.dist_context.set_default_distributed_context(
+        None
+    )
     mlp = MLPLayer(
         hidden_size=hidden_size,
         intermediate_size=4 * hidden_size,
@@ -194,7 +200,7 @@ def train_low_level():
     for feed_var, shape in my_feed_vars:
         feed_dict[feed_var.name] = np.zeros(shape, dtype="float32")
 
-    # Build normal normal dataloader
+    # Build normal dataloader
     # train
     train_dataset = MyDataset(batch_num * batch_size)
     train_dataloader = engine.dataloader(
@@ -266,6 +272,9 @@ def train_low_level():
 
 
 def train_builtin_data_vars():
+    paddle.distributed.auto_parallel.static.dist_context.set_default_distributed_context(
+        None
+    )
     mlp = MLPLayer(
         hidden_size=hidden_size,
         intermediate_size=4 * hidden_size,
@@ -314,6 +323,9 @@ def train_builtin_data_vars():
 
 
 def train_non_builtin_data_vars():
+    paddle.distributed.auto_parallel.static.dist_context.set_default_distributed_context(
+        None
+    )
     main_program = static.Program()
     startup_program = static.Program()
     with static.program_guard(
@@ -373,6 +385,9 @@ def train_non_builtin_data_vars():
 
 
 def get_cost():
+    paddle.distributed.auto_parallel.static.dist_context.set_default_distributed_context(
+        None
+    )
     main_program = static.Program()
     startup_program = static.Program()
     with static.program_guard(
@@ -424,6 +439,9 @@ def get_cost():
 
 
 def get_cost_by_default_program():
+    paddle.distributed.auto_parallel.static.dist_context.set_default_distributed_context(
+        None
+    )
     main_program = static.default_main_program()
     startup_program = static.default_startup_program()
     with static.program_guard(
@@ -433,6 +451,9 @@ def get_cost_by_default_program():
             name="input", shape=[batch_size, image_size], dtype='float32'
         )
         label = static.data(name="label", shape=[batch_size, 1], dtype='int64')
+        auto.shard_tensor(
+            input, process_mesh=PP_MESH_0, shard_spec=[None, None]
+        )
 
         loader = paddle.fluid.io.DataLoader.from_generator(
             feed_list=[input, label], capacity=4 * batch_size, iterable=False
@@ -468,6 +489,9 @@ def get_cost_by_default_program():
 
 
 def get_cost_by_spec():
+    paddle.distributed.auto_parallel.static.dist_context.set_default_distributed_context(
+        None
+    )
     mlp = MLPLayer(
         hidden_size=hidden_size,
         intermediate_size=4 * hidden_size,
-- 
GitLab
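
A minimal usage sketch of the new `places` argument (an illustration, not part of the patch), written in the spirit of test/auto_parallel/engine_api.py and assumed to be launched the same way the existing auto-parallel tests are; the toy RandomDataset, model, loss and optimizer below are hypothetical stand-ins:

    import numpy as np
    import paddle
    from paddle.distributed.fleet import auto
    from paddle.io import Dataset


    class RandomDataset(Dataset):
        # Hypothetical toy dataset standing in for MyDataset in engine_api.py.
        def __init__(self, num_samples, feature_size=64):
            self.num_samples = num_samples
            self.feature_size = feature_size

        def __getitem__(self, idx):
            x = np.random.rand(self.feature_size).astype("float32")
            y = np.random.randint(0, 2, size=(1,)).astype("int64")
            return x, y

        def __len__(self):
            return self.num_samples


    # Hypothetical model, loss and optimizer; the tests use MLPLayer instead.
    mlp = paddle.nn.Sequential(
        paddle.nn.Linear(64, 64), paddle.nn.ReLU(), paddle.nn.Linear(64, 2)
    )
    loss_fn = paddle.nn.CrossEntropyLoss()
    optimizer = paddle.optimizer.Adam(
        learning_rate=1e-3, parameters=mlp.parameters()
    )

    engine = auto.Engine(mlp, loss_fn, optimizer)

    # Before this patch _prepare_dataloader() hard-coded
    # places = paddle.static.cuda_places(); now the devices are chosen by the caller.
    train_dataloader = engine.dataloader(
        RandomDataset(128),
        batch_size=8,
        mode="train",
        places=paddle.static.cuda_places(),  # or e.g. [paddle.CPUPlace()]
    )

    # DistributedDataLoader now simply wraps paddle.io.DataLoader,
    # so plain iteration works.
    for batch in train_dataloader:
        pass  # feed each batch to engine.run(...) as the existing tests do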