Unverified commit f3b7092c, authored by zhaoyingli, committed by GitHub

make places configurable for DistributedDataLoader (#55873)

* Update autoparallel DistributedDataLoader

* add places for engine.dataloader()
Parent commit: b19dfb8c
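A brief usage sketch of the new knob: with this change the caller decides which devices the dataloader feeds, instead of the engine always defaulting to `paddle.static.cuda_places()`. In the snippet below, `engine`, `train_dataset`, and `batch_size` are illustrative placeholders rather than values taken from this diff; only the explicit `places` argument is the new part.

```python
import paddle

# Hedged sketch: `engine` is an already-configured auto-parallel Engine and
# `train_dataset`/`batch_size` are placeholders. The new, configurable piece
# is the `places` argument, which used to be hard-coded to cuda_places().
train_dataloader = engine.dataloader(
    train_dataset,
    batch_size=64,
    mode="train",
    places=paddle.static.cuda_places(),
)
```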
@@ -983,6 +983,9 @@ class Completer:
             op_dist_attr.process_mesh = process_mesh
             original_op_dist_attr = copy.deepcopy(op_dist_attr)
+            if serial_op.type == "create_py_reader":
+                continue
             for arg_name in serial_op.input_arg_names:
                 serial_tensor = dist_op.get_serial_input(arg_name)
                 if not serial_tensor.is_parameter:
...
@@ -34,10 +34,6 @@ class DistributedDataLoaderBase(metaclass=abc.ABCMeta):
     def __iter__(self):
         raise NotImplementedError
 
-    @abc.abstractmethod
-    def __next__(self):
-        raise NotImplementedError
-
 
 class DistributedDataLoaderFromGenerator(DistributedDataLoaderBase):
     def __init__(
@@ -260,23 +256,14 @@ class DistributedDataLoader(DistributedDataLoaderBase):
         self.split_data = split_data
         # TODO: rank info
         self.batch_sampler = DistributedBatchSampler(
-            self.dataset,
-            self.batch_size,
-            self.dp_world_sizes[0],
-            self.dp_ranks[0],
-            self.shuffle,
-            self.drop_last,
+            dataset=self.dataset,
+            batch_size=self.batch_size,
+            num_replicas=self.dp_world_sizes[0],
+            rank=self.dp_ranks[0],
+            shuffle=self.shuffle,
+            drop_last=self.drop_last,
         )
-        self._inner_dataloader = self._create_inner_dataloader()
-
-    def __iter__(self):
-        return self
-
-    def __next__(self):
-        return next(self.data)
-
-    def _create_inner_dataloader(self):
-        dataloader = paddle.io.DataLoader(
+        self._dataloader = paddle.io.DataLoader(
             self.dataset,
             feed_list=self.feed_list,
             places=self.places,
@@ -289,6 +276,12 @@ class DistributedDataLoader(DistributedDataLoaderBase):
             timeout=self.timeout,
             worker_init_fn=self.worker_init_fn,
         )
-        self.data = (x for x in dataloader)
-        return dataloader
+
+    def __len__(self):
+        return len(self._dataloader)
+
+    def __iter__(self):
+        return self._dataloader.__iter__()
+
+    def __call__(self):
+        return self._dataloader.__iter__()
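After this refactor, `DistributedDataLoader` no longer drives iteration through its own `__next__` and a wrapped generator; it builds a `paddle.io.DataLoader` in `self._dataloader` and delegates `__len__`, `__iter__`, and `__call__` to it. A minimal consumption sketch follows; constructor arguments are abbreviated and partly assumed for illustration.

```python
# Hedged sketch of the new iteration protocol; arguments other than
# dataset/feed_list/places/batch_size are omitted for brevity.
loader = DistributedDataLoader(
    dataset,
    feed_list=feed_list,
    places=places,
    batch_size=8,
)

print(len(loader))      # __len__ delegates to the wrapped paddle.io.DataLoader

for batch in loader:    # __iter__ returns the inner DataLoader's iterator
    ...

for batch in loader():  # __call__ returns the same inner iterator
    ...
```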
@@ -1255,6 +1255,7 @@ class Engine:
         steps_per_epoch=None,
         sample_split=1,
         mode=None,
+        places=None,
     ):
         if mode is not None:
             self.to_mode(mode)
@@ -1281,6 +1282,7 @@ class Engine:
             worker_init_fn=worker_init_fn,
             epochs=epochs,
             steps_per_epoch=steps_per_epoch,
+            places=places,
         )
         return dataloader
@@ -1418,6 +1420,7 @@ class Engine:
         worker_init_fn=None,
         epochs=1,
         steps_per_epoch=None,
+        places=None,
     ):
         dist_context = self._dist_contexts[self._mode]
         dist_main_prog = dist_context.dist_main_programs[self._cur_rank]
@@ -1440,7 +1443,6 @@ class Engine:
                 feed_list.append(copy_var)
 
         # insert read op at the end of program
-        places = paddle.static.cuda_places()
         with static.program_guard(dist_main_prog, dist_startup_prog):
             dataloader = DistributedDataLoader(
                 dataset,
...
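With the hard-coded `places = paddle.static.cuda_places()` line removed from `_prepare_dataloader`, device selection is now up to the caller. One plausible way to pick places on machines with or without CUDA is shown below; this is plain Paddle API and an assumed usage pattern, not part of this diff.

```python
import paddle

# Assumed pattern, not from the commit: fall back to CPU places when the
# installed Paddle build has no CUDA support, otherwise use all visible GPUs.
if paddle.is_compiled_with_cuda():
    places = paddle.static.cuda_places()
else:
    places = paddle.static.cpu_places()

# `places` can then be forwarded via engine.dataloader(..., places=places).
```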
@@ -117,6 +117,9 @@ class MLPLayer(nn.Layer):
 def train_high_level(fetch):
+    paddle.distributed.auto_parallel.static.dist_context.set_default_distributed_context(
+        None
+    )
     global is_fetch
     is_fetch = fetch
     mlp = MLPLayer(
@@ -169,6 +172,9 @@ def train_high_level(fetch):
 def train_low_level():
+    paddle.distributed.auto_parallel.static.dist_context.set_default_distributed_context(
+        None
+    )
     mlp = MLPLayer(
         hidden_size=hidden_size,
         intermediate_size=4 * hidden_size,
@@ -194,7 +200,7 @@ def train_low_level():
     for feed_var, shape in my_feed_vars:
         feed_dict[feed_var.name] = np.zeros(shape, dtype="float32")
 
-    # Build normal normal dataloader
+    # Build normal dataloader
     # train
     train_dataset = MyDataset(batch_num * batch_size)
     train_dataloader = engine.dataloader(
@@ -266,6 +272,9 @@ def train_low_level():
 def train_builtin_data_vars():
+    paddle.distributed.auto_parallel.static.dist_context.set_default_distributed_context(
+        None
+    )
     mlp = MLPLayer(
         hidden_size=hidden_size,
         intermediate_size=4 * hidden_size,
@@ -314,6 +323,9 @@ def train_builtin_data_vars():
 def train_non_builtin_data_vars():
+    paddle.distributed.auto_parallel.static.dist_context.set_default_distributed_context(
+        None
+    )
     main_program = static.Program()
     startup_program = static.Program()
     with static.program_guard(
@@ -373,6 +385,9 @@ def train_non_builtin_data_vars():
 def get_cost():
+    paddle.distributed.auto_parallel.static.dist_context.set_default_distributed_context(
+        None
+    )
     main_program = static.Program()
     startup_program = static.Program()
     with static.program_guard(
@@ -424,6 +439,9 @@ def get_cost():
 def get_cost_by_default_program():
+    paddle.distributed.auto_parallel.static.dist_context.set_default_distributed_context(
+        None
+    )
     main_program = static.default_main_program()
     startup_program = static.default_startup_program()
     with static.program_guard(
@@ -433,6 +451,9 @@ def get_cost_by_default_program():
             name="input", shape=[batch_size, image_size], dtype='float32'
         )
         label = static.data(name="label", shape=[batch_size, 1], dtype='int64')
+        auto.shard_tensor(
+            input, process_mesh=PP_MESH_0, shard_spec=[None, None]
+        )
         loader = paddle.fluid.io.DataLoader.from_generator(
             feed_list=[input, label], capacity=4 * batch_size, iterable=False
@@ -468,6 +489,9 @@ def get_cost_by_default_program():
 def get_cost_by_spec():
+    paddle.distributed.auto_parallel.static.dist_context.set_default_distributed_context(
+        None
+    )
     mlp = MLPLayer(
         hidden_size=hidden_size,
         intermediate_size=4 * hidden_size,
...