From 71cb016cad456c6a314885ee0087dac05db03dbe Mon Sep 17 00:00:00 2001
From: zhaoyingli <86812880+zhaoyinglia@users.noreply.github.com>
Date: Mon, 7 Mar 2022 11:43:47 +0800
Subject: [PATCH] [AutoParallel] engine support pp (#40084)

* engine support pp
* fix format
* avoid multi print
* fix convert
* bug fix
* add pp unittest
---
 .../distributed/auto_parallel/engine.py       |  24 +++-
 .../paddle/distributed/auto_parallel/utils.py |   9 +-
 python/paddle/distributed/utils.py            |  16 ++-
 .../unittests/auto_parallel/CMakeLists.txt    |   3 +-
 .../unittests/auto_parallel/engine_api.py     | 132 ++++++++++++++++++
 .../auto_parallel/test_engine_api.py          | 127 +++--------------
 6 files changed, 186 insertions(+), 125 deletions(-)
 create mode 100644 python/paddle/fluid/tests/unittests/auto_parallel/engine_api.py

diff --git a/python/paddle/distributed/auto_parallel/engine.py b/python/paddle/distributed/auto_parallel/engine.py
index 8efb9eb7192..56beb895741 100644
--- a/python/paddle/distributed/auto_parallel/engine.py
+++ b/python/paddle/distributed/auto_parallel/engine.py
@@ -99,11 +99,11 @@ class Engine:
         all_ranks = world_process_group.ranks
         for rank in all_ranks:
             self._parallel(rank)
-        place = _get_device()
-        if isinstance(place, fluid.CUDAPlace):
+        self._place = _get_device()
+        if isinstance(self._place, fluid.CUDAPlace):
             self._place = fluid.CUDAPlace(ParallelEnv().dev_id)
         if self._executor is None:
-            self._executor = fluid.Executor(place)
+            self._executor = paddle.static.Executor(self._place)

     def _build(self):
         serial_main_prog = self._serial_main_progs.get(self.mode, None)
@@ -119,12 +119,13 @@ class Engine:
             labels = [s._create_feed_layer() for s in to_list(labels_spec)]
             self._input_vars = inputs
             self._label_vars = labels
-            feed_list = self._input_vars + self._label_vars
+            self._feed_vars = self._input_vars + self._label_vars

             outputs = to_list(self.model(*inputs))
             if self.mode != "predict" and self.loss:
                 loss = self.loss(*(outputs + labels))
                 self._loss_var = loss
+                self._fetch_vars = {"outputs": outputs, "loss": loss}

         self._serial_main_progs[self.mode] = serial_main_prog
         self._serial_startup_progs[self.mode] = serial_startup_prog
         self._dist_contexts[self.mode] = DistributedContext(
@@ -278,19 +279,32 @@ class Engine:
         dist_startup_prog = self._dist_startup_progs[self.mode][self._cur_rank]
         dist_context = self._dist_contexts[self.mode]
         dist_main_block = dist_main_prog.global_block()
+        serial_main_prog = self._serial_main_progs[self.mode]
+        serial_main_block = serial_main_prog.global_block()
         op_size = len(dist_main_block.ops)
         places = paddle.static.cuda_places()
         with fluid.program_guard(dist_main_prog, dist_startup_prog):
             dataloader = NonIterableGeneratorLoader(
                 dataset, feed_list, places, batch_size, epochs, steps_per_epoch)
         new_op_size = len(dist_main_block.ops)
-        for idx in range(new_op_size - 1, op_size - 1, -1):
+        for _ in range(new_op_size - 1, op_size - 1, -1):
             op = dist_main_block.ops[new_op_size - 1]
             new_op_desc = dist_main_block.desc._prepend_op()
             new_op_desc.copy_from(op.desc)
             new_op = Operator(
                 dist_main_block, new_op_desc, type=new_op_desc.type())
             dist_main_block.ops.insert(0, new_op)
+            for in_name in new_op.input_arg_names:
+                if in_name == "lod_tensor_blocking_queue_0":
+                    continue
+                if in_name not in dist_main_block.vars:
+                    in_var = serial_main_block._var_recursive(in_name)
+                    dist_main_block._clone_variable(in_var, in_var.persistable)
+            for out_name in new_op.output_arg_names:
+                if out_name not in dist_main_block.vars:
+                    out_var = serial_main_block._var_recursive(out_name)
+                    dist_main_block._clone_variable(out_var,
+                                                    out_var.persistable)
             dist_op = DistributedOperator(new_op)
             dist_context.add_dist_op_for_program(dist_op)
         for _ in range(new_op_size - op_size):
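
Note on the _create_dataloader hunk above: NonIterableGeneratorLoader appends its
reader ops to the end of the distributed main block, so the loop rotates them to
the front to make data feeding run before the model ops. Under pipeline
parallelism a rank's program no longer contains every variable of the serial
program, hence the new _clone_variable calls that copy missing reader inputs and
outputs over from the serial block. A minimal sketch of the rotation on plain
Python lists (illustrative only; the op names and list stand in for Paddle's
Block API, which is not used here):

    # Ops appended by the dataloader (indices op_size..new_op_size-1) are
    # moved to the front of the list while keeping their relative order.
    ops = ["matmul", "relu", "softmax"]   # ops already in the block (op_size == 3)
    ops += ["create_py_reader", "read"]   # ops appended by the dataloader
    op_size, new_op_size = 3, len(ops)
    for _ in range(new_op_size - 1, op_size - 1, -1):
        ops.insert(0, ops.pop())          # like desc._prepend_op() + ops.insert(0, ...)
    assert ops == ["create_py_reader", "read", "matmul", "relu", "softmax"]
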
diff --git a/python/paddle/distributed/auto_parallel/utils.py b/python/paddle/distributed/auto_parallel/utils.py
index 75e0ae251ef..241eadcbace 100644
--- a/python/paddle/distributed/auto_parallel/utils.py
+++ b/python/paddle/distributed/auto_parallel/utils.py
@@ -22,7 +22,6 @@ import logging
 from functools import reduce

 import paddle.fluid.core as core
-from paddle.framework.io import _to_LodTensor
 from paddle.distributed.fleet.meta_optimizers.common import OpRole
 from paddle.fluid.io import is_parameter, is_belong_to_optimizer
 from paddle.distributed.auto_parallel.dist_attribute import TensorDistributedAttribute, OperatorDistributedAttribute
@@ -739,7 +738,7 @@ def merge_and_slice_parameter(dist_param_dict, pre_dist_attr, cur_dist_attr):
             rank_id = paddle.distributed.get_rank()
             index = cur_attr["process_group"].index(rank_id)
             param = dist_param_dict[var_name][index]
-            dist_param_dict[var_name] = _to_LodTensor(param)
+            dist_param_dict[var_name] = param
             continue

         pre_param = dist_param_dict[var_name]
@@ -751,7 +750,7 @@ def merge_and_slice_parameter(dist_param_dict, pre_dist_attr, cur_dist_attr):
             dist_param_dict[var_name] = complete_param
         else:
             complete_param = pre_param[0]
-            dist_param_dict[var_name] = _to_LodTensor(complete_param)
+            dist_param_dict[var_name] = complete_param

         if len(set(cur_dims_mapping)) > 1 or -1 not in cur_dims_mapping:
             sliced_param = _slice_parameter_with_dist_attr(complete_param,
@@ -798,7 +797,7 @@ def _merge_parameter_with_dist_attr(param_list, dist_attr):

     assert len(partition_param_list) == 1 or not partition_param_list, \
         "Fail to merge parameter"
-    complete_param = _to_LodTensor(partition_param_list[0][0])
+    complete_param = partition_param_list[0][0]
     return complete_param

@@ -818,7 +817,7 @@ def _slice_parameter_with_dist_attr(param, dist_attr):
     rank_id = paddle.distributed.get_rank()
     sliced_param_index = _get_sliced_param_index(
         rank_id, param.shape, dims_mapping, process_shape, process_group)
-    sliced_param = _to_LodTensor(sliced_param_list[sliced_param_index])
+    sliced_param = sliced_param_list[sliced_param_index]
     return sliced_param

diff --git a/python/paddle/distributed/utils.py b/python/paddle/distributed/utils.py
index 53f4a93f648..ae40a42e9d5 100644
--- a/python/paddle/distributed/utils.py
+++ b/python/paddle/distributed/utils.py
@@ -546,13 +546,15 @@ class Pod(object):

 def get_logger(log_level, name="root"):
     logger = logging.getLogger(name)
-    logger.setLevel(log_level)
-
-    log_handler = logging.StreamHandler()
-    log_format = logging.Formatter(
-        '%(levelname)s %(asctime)s %(filename)s:%(lineno)d] %(message)s')
-    log_handler.setFormatter(log_format)
-    logger.addHandler(log_handler)
+    # Avoid attaching duplicate handlers, which would print each record
+    # multiple times.
+    if not logger.handlers:
+        logger.setLevel(log_level)
+
+        log_handler = logging.StreamHandler()
+        log_format = logging.Formatter(
+            '%(levelname)s %(asctime)s %(filename)s:%(lineno)d] %(message)s')
+        log_handler.setFormatter(log_format)
+        logger.addHandler(log_handler)
     return logger
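
The get_logger change is the "avoid multi print" item from the commit message:
the factory can be called more than once for the same named logger, and each
call used to attach another StreamHandler, so every record was emitted once per
handler. A standalone repro of the guarded pattern, using only the stdlib
logging module (a sketch of the same idea, not Paddle code):

    import logging

    def get_logger(log_level, name="root"):
        logger = logging.getLogger(name)
        if not logger.handlers:  # only configure the logger on first call
            logger.setLevel(log_level)
            handler = logging.StreamHandler()
            handler.setFormatter(logging.Formatter(
                '%(levelname)s %(asctime)s %(filename)s:%(lineno)d] %(message)s'))
            logger.addHandler(handler)
        return logger

    log = get_logger(logging.INFO)
    log = get_logger(logging.INFO)  # previously added a second handler
    log.info("printed once")        # with the guard, exactly one line appears
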
diff --git a/python/paddle/fluid/tests/unittests/auto_parallel/CMakeLists.txt b/python/paddle/fluid/tests/unittests/auto_parallel/CMakeLists.txt
index 0a9eaf34ba5..80bc206ae7b 100644
--- a/python/paddle/fluid/tests/unittests/auto_parallel/CMakeLists.txt
+++ b/python/paddle/fluid/tests/unittests/auto_parallel/CMakeLists.txt
@@ -5,7 +5,8 @@ if(WITH_DISTRIBUTE AND WITH_GPU)
     set_tests_properties(test_auto_parallel_relaunch PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE" TIMEOUT 120)
     py_test_modules(test_relaunch_with_planner MODULES test_relaunch_with_planner ENVS ${dist_ENVS})
     set_tests_properties(test_relaunch_with_planner PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE" TIMEOUT 120)
-    py_test_modules(test_relaunch_with_gpt_planner MODULES test_relaunch_with_planner ENVS ${dist_ENVS})
+    py_test_modules(test_relaunch_with_gpt_planner MODULES test_relaunch_with_gpt_planner ENVS ${dist_ENVS})
     set_tests_properties(test_relaunch_with_gpt_planner PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE" TIMEOUT 240)
     py_test_modules(test_engine_api MODULES test_engine_api ENVS ${dist_ENVS})
+    set_tests_properties(test_engine_api PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE" TIMEOUT 80)
 endif()

diff --git a/python/paddle/fluid/tests/unittests/auto_parallel/engine_api.py b/python/paddle/fluid/tests/unittests/auto_parallel/engine_api.py
new file mode 100644
index 00000000000..8c71c792bf0
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/auto_parallel/engine_api.py
@@ -0,0 +1,132 @@
+# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+import time
+import copy
+import os
+import subprocess
+
+import numpy as np
+
+import paddle
+import paddle.nn as nn
+import paddle.fluid as fluid
+import paddle.static as static
+import paddle.nn.functional as F
+import paddle.utils as utils
+from paddle.fluid import layers
+from paddle.io import Dataset, IterableDataset, DataLoader
+from paddle.static import InputSpec
+from paddle.distributed import fleet
+import paddle.distributed.auto_parallel as auto
+from paddle.distributed.auto_parallel.engine import Engine
+
+paddle.enable_static()
+global_process_mesh = auto.ProcessMesh(mesh=[0, 1])
+PP_MESH_0 = auto.ProcessMesh([0])
+PP_MESH_1 = auto.ProcessMesh([1])
+batch_size = 1
+batch_num = 10
+hidden_size = 1024
+sequence_len = 512
+image_size = hidden_size
+class_num = 10
+
+paddle.seed(44)
+
+
+class MyDataset(Dataset):
+    def __init__(self, num_samples):
+        super(MyDataset, self).__init__()
+        self.num_samples = num_samples
+
+    def __getitem__(self, index):
+        input = np.random.uniform(size=image_size).astype("float32")
+        label = np.random.randint(0, class_num - 1, dtype="int64")
+        return input, label
+
+    def __len__(self):
+        return self.num_samples
+
+
+class MLPLayer(nn.Layer):
+    def __init__(self,
+                 hidden_size=1024,
+                 intermediate_size=4 * 1024,
+                 dropout_ratio=0.1,
+                 initializer_range=0.02):
+        super(MLPLayer, self).__init__()
+        d_model = hidden_size
+        dim_feedforward = intermediate_size
+        weight_attr = paddle.ParamAttr(initializer=nn.initializer.Normal(
+            mean=0.0, std=initializer_range))
+        bias_attr = None
+
+        self.linear0 = nn.Linear(
+            d_model, dim_feedforward, weight_attr, bias_attr=bias_attr)
+        self.linear1 = nn.Linear(
+            dim_feedforward, d_model, weight_attr, bias_attr=bias_attr)
+        self.linear2 = nn.Linear(d_model, 1, weight_attr, bias_attr=bias_attr)
+        self.norm = nn.LayerNorm(d_model, epsilon=1e-5)
+        self.dropout = nn.Dropout(dropout_ratio, mode="upscale_in_train")
+
+    def forward(self, input):
+        out = auto.shard_op(
+            self.norm, dist_attr={"process_mesh": PP_MESH_0})(input)[0]
+        out = self.linear0(out)
+        out = F.gelu(out, approximate=True)
+        out = auto.shard_op(
+            self.linear1, dist_attr={"process_mesh": PP_MESH_1})(out)[0]
+        out = self.dropout(out)
+        out = self.linear2(out)
+        return out
+
+
+def train():
+    mlp = MLPLayer(
+        hidden_size=hidden_size,
+        intermediate_size=4 * hidden_size,
+        dropout_ratio=0.1,
+        initializer_range=0.02)
+    loss = paddle.nn.CrossEntropyLoss()
+    optimizer = paddle.fluid.optimizer.AdamOptimizer(
+        learning_rate=0.00001,
+        beta1=0.9,
+        beta2=0.999,
+        epsilon=1e-08,
+        grad_clip=None)
+
+    dataset = MyDataset(batch_num * batch_size)
+    data_spec = [
+        InputSpec([batch_size, hidden_size], 'float32', 'x'),
+        InputSpec([batch_size], 'int64', 'label')
+    ]
+
+    dist_strategy = fleet.DistributedStrategy()
+    dist_strategy.amp = False
+    dist_strategy.pipeline = False
+    dist_strategy.recompute = False
+    # enable semi-auto parallelization
+    dist_strategy.semi_auto = True
+    fleet.init(is_collective=True, strategy=dist_strategy)
+
+    engine = Engine(mlp, data_spec, strategy=dist_strategy)
+    engine.prepare(optimizer, loss)
+    engine.fit(dataset,
+               batch_size=batch_size,
+               steps_per_epoch=batch_num * batch_size)
+
+
+if __name__ == "__main__":
+    train()
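
engine_api.py is the new pipeline-parallel example: shard_op pins the norm layer
to PP_MESH_0 (rank 0) and linear1 to PP_MESH_1 (rank 1), and the Engine splits
the program into the two stages accordingly. It is meant to be started by the
distributed launcher rather than run directly; the unit test below does this
via subprocess, which on a machine with at least two CUDA devices is roughly
equivalent to (assuming the paddle.distributed.launch module the test invokes):

    python -m paddle.distributed.launch --gpus 0,1 engine_api.py
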
diff --git a/python/paddle/fluid/tests/unittests/auto_parallel/test_engine_api.py b/python/paddle/fluid/tests/unittests/auto_parallel/test_engine_api.py
index 0fc1ea41033..a7d51a7e176 100644
--- a/python/paddle/fluid/tests/unittests/auto_parallel/test_engine_api.py
+++ b/python/paddle/fluid/tests/unittests/auto_parallel/test_engine_api.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
+# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -13,122 +13,35 @@
 # limitations under the License.

 import unittest
-import time
-import paddle.fluid as fluid
-import copy
 import os
-import numpy as np
+import sys
+import shutil
 import subprocess
-import paddle
-import paddle.nn as nn
-import paddle.fluid as fluid
-import paddle.static as static
-import paddle.nn.functional as F
-import paddle.utils as utils
-from paddle.fluid import layers
-from paddle.io import Dataset, IterableDataset, DataLoader
-from paddle.static import InputSpec
-from paddle.distributed import fleet
-import paddle.distributed.auto_parallel as auto
-from paddle.distributed.auto_parallel.engine import Engine
-
-paddle.enable_static()
-global_process_mesh = auto.ProcessMesh(mesh=[0])
-batch_size = 1
-batch_num = 10
-hidden_size = 1024
-sequence_len = 512
-image_size = hidden_size
-class_num = 10
-
-paddle.seed(44)
-
-
-class MyDataset(Dataset):
-    def __init__(self, num_samples):
-        super(MyDataset, self).__init__()
-        self.num_samples = num_samples
-
-    def __getitem__(self, index):
-        input = np.random.uniform(size=image_size).astype("float32")
-        label = np.random.randint(0, class_num - 1, dtype="int64")
-        return input, label
-
-    def __len__(self):
-        return self.num_samples
-
-
-class MLPLayer(nn.Layer):
-    def __init__(self,
-                 hidden_size=1024,
-                 intermediate_size=4 * 1024,
-                 dropout_ratio=0.1,
-                 initializer_range=0.02):
-        super(MLPLayer, self).__init__()
-        d_model = hidden_size
-        dim_feedforward = intermediate_size
-        weight_attr = paddle.ParamAttr(initializer=nn.initializer.Normal(
-            mean=0.0, std=initializer_range))
-        bias_attr = None
-
-        self.linear0 = nn.Linear(
-            d_model, dim_feedforward, weight_attr, bias_attr=bias_attr)
-        self.linear1 = nn.Linear(
-            dim_feedforward, d_model, weight_attr, bias_attr=bias_attr)
-        self.linear2 = nn.Linear(d_model, 1, weight_attr, bias_attr=bias_attr)
-        # self.norm = nn.LayerNorm(d_model, epsilon=1e-5)
-        # self.dropout = nn.Dropout(dropout_ratio, mode="upscale_in_train")
-
-    def forward(self, input):
-        auto.shard_tensor(
-            input,
-            dist_attr={
-                "process_mesh": global_process_mesh,
-                "dims_mappig": [-1]
-            })
-        # out = self.norm(input)
-        out = self.linear0(input)
-        out = F.gelu(out, approximate=True)
-        out = self.linear1(out)
-        # out = self.dropout(out)
-        out = self.linear2(out)
-        return out
+from paddle.distributed.fleet.launch_utils import run_with_coverage


 class TestEngineAPI(unittest.TestCase):
     def test_engine_api(self):
-        mlp = MLPLayer(
-            hidden_size=hidden_size,
-            intermediate_size=4 * hidden_size,
-            dropout_ratio=0.1,
-            initializer_range=0.02)
-        loss = paddle.nn.CrossEntropyLoss()
-        optimizer = paddle.fluid.optimizer.AdamOptimizer(
-            learning_rate=0.00001,
-            beta1=0.9,
-            beta2=0.999,
-            epsilon=1e-08,
-            grad_clip=None)
+        file_dir = os.path.dirname(os.path.abspath(__file__))
+        launch_model_path = os.path.join(file_dir, "engine_api.py")
+
+        if os.environ.get("WITH_COVERAGE", "OFF") == "ON":
+            coverage_args = ["-m", "coverage", "run", "--branch", "-p"]
+        else:
+            coverage_args = []

-        dataset = MyDataset(batch_num * batch_size)
-        data_spec = [
-            InputSpec([batch_size, hidden_size], 'float32', 'x'),
-            InputSpec([batch_size], 'int64', 'label')
+        cmd = [sys.executable, "-u"] + coverage_args + [
+            "-m", "paddle.distributed.launch", "--gpus", "0,1",
+            launch_model_path
         ]

-        dist_strategy = fleet.DistributedStrategy()
-        dist_strategy.amp = False
-        dist_strategy.pipeline = False
-        dist_strategy.recompute = False
-        # init parallel optimizer
-        dist_strategy.semi_auto = True
-        fleet.init(is_collective=True, strategy=dist_strategy)
+        process = subprocess.Popen(cmd)
+        process.wait()
+        self.assertEqual(process.returncode, 0)

-        engine = Engine(mlp, data_spec, strategy=dist_strategy)
-        engine.prepare(optimizer, loss)
-        engine.fit(dataset,
-                   batch_size=batch_size,
-                   steps_per_epoch=batch_num * batch_size)
+        # Clean up the log directory created by the launcher
+        log_path = os.path.join(file_dir, "log")
+        if os.path.exists(log_path):
+            shutil.rmtree(log_path)


 if __name__ == "__main__":
-- 
GitLab