From 3650c4a85dd2a0e52885897cfa4a3303a7f45645 Mon Sep 17 00:00:00 2001 From: liuzhenhai93 Date: Wed, 26 Apr 2023 19:59:48 +0800 Subject: [PATCH] =?UTF-8?q?=20pp=20=E7=AD=96=E7=95=A5=E8=B0=83=E6=95=B4?= =?UTF-8?q?=E5=90=8E=EF=BC=8C=E6=A8=A1=E5=9E=8B=E8=BD=AC=E6=8D=A2=EF=BC=8C?= =?UTF-8?q?=E4=BB=A5=E4=BE=BF=E6=A8=A1=E5=9E=8B=E7=83=AD=E5=90=AF=20(#5292?= =?UTF-8?q?7)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * polish * polish * polish * polish * polish * polish * polish * polish * polish * polish * polish * polish * polish * polish * polish * polish * polish * polish * polish * polish * polish --- .../parallel_layers/pp_layers.py | 4 +- .../fleet/utils/pp_parallel_adaptor.py | 612 ++++++++++++++++++ .../unittests/collective/fleet/CMakeLists.txt | 13 + .../fleet/hybrid_parallel_pp_transformer.py | 4 +- .../hybrid_parallel_pp_transformer_save.py | 101 +++ ..._pp_transformer_save_with_virtual_stage.py | 105 +++ ...allel_pp_transformer_with_virtual_stage.py | 5 +- .../fleet/test_parallel_dygraph_pp_adaptor.py | 163 +++++ .../unittests/collective/fleet/testslist.csv | 1 + python/paddle/framework/io.py | 9 +- test/auto_parallel/CMakeLists.txt | 2 +- test/distributed_passes/CMakeLists.txt | 2 +- test/ir/inference/CMakeLists.txt | 2 +- 13 files changed, 1012 insertions(+), 11 deletions(-) create mode 100644 python/paddle/distributed/fleet/utils/pp_parallel_adaptor.py create mode 100644 python/paddle/fluid/tests/unittests/collective/fleet/hybrid_parallel_pp_transformer_save.py create mode 100644 python/paddle/fluid/tests/unittests/collective/fleet/hybrid_parallel_pp_transformer_save_with_virtual_stage.py create mode 100644 python/paddle/fluid/tests/unittests/collective/fleet/test_parallel_dygraph_pp_adaptor.py diff --git a/python/paddle/distributed/fleet/meta_parallel/parallel_layers/pp_layers.py b/python/paddle/distributed/fleet/meta_parallel/parallel_layers/pp_layers.py index f3be9894a9c..bc403da76ba 100755 --- a/python/paddle/distributed/fleet/meta_parallel/parallel_layers/pp_layers.py +++ b/python/paddle/distributed/fleet/meta_parallel/parallel_layers/pp_layers.py @@ -564,7 +564,7 @@ class PipelineLayer(nn.Layer): self.segment_parts = seg.do_segment() logger.info( - "segment result:" + f"segment with method: {seg_method}; result: " + ", ".join(str(arg) for arg in self.segment_parts) ) @@ -594,7 +594,7 @@ class PipelineLayer(nn.Layer): self.segment_parts = seg.do_segment() logger.info( - "segment result:" + f"segment with method: {seg_method}; result: " + ", ".join(str(arg) for arg in self.segment_parts) ) diff --git a/python/paddle/distributed/fleet/utils/pp_parallel_adaptor.py b/python/paddle/distributed/fleet/utils/pp_parallel_adaptor.py new file mode 100644 index 00000000000..9fa1caf35a5 --- /dev/null +++ b/python/paddle/distributed/fleet/utils/pp_parallel_adaptor.py @@ -0,0 +1,612 @@ +# Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import argparse +import math +import re +import shutil +from collections import OrderedDict + +import paddle + + +class ParallelConfig: + def __init__(self, mp: int, pp: int, vpp: int = 1, sharding: int = 1): + self.mp = mp + self.pp = pp + self.vpp = vpp + self.sharding = sharding + + def pipe_parallel_group(self, i: int, j: int): + ans = [] + for k in range(self.pp): + ans.append((i, j, k)) + return ans + + +class LayerReNamingHelper: + def __init__(self, template: str): + self._template = template + self._i = -1 + self._last_old_layer_name = None + + def get_new_layer_name(self, old_layer_name: str): + old_layer_name = old_layer_name.split(".")[0] + if ( + self._last_old_layer_name is None + or old_layer_name != self._last_old_layer_name + ): + self._i = self._i + 1 + self._last_old_layer_name = old_layer_name + return self._template.format(self._i) + + +class LayerReNamingManager: + def __init__(self): + self._renaming_helpers = OrderedDict() + self._renaming_helpers["linear"] = LayerReNamingHelper("linear_{}") + self._renaming_helpers["layer_norm"] = LayerReNamingHelper( + "layer_norm_{}" + ) + self._renaming_helpers["embedding"] = LayerReNamingHelper( + "embedding_{}" + ) + + def get_new_layer_name(self, old_name: str): + layer_name = "" + for (k, v) in self._renaming_helpers.items(): + if old_name.startswith(k): + layer_name = v.get_new_layer_name(old_name) + break + return layer_name + + def get_new_param_name(self, old_name: str): + names = old_name.split(".") + layer_name = self.get_new_layer_name(names[0]) + assert layer_name, f"can not rename layer {names[0]}" + names[0] = layer_name + return ".".join(names) + + +class PipeLineModelAdaptor: + def __init__( + self, + src_parallel_config: ParallelConfig, + dst_parallel_config: ParallelConfig, + transformer_layer_num: int, + segment_method: str = "layer", + ): + self._src_parallel_config = src_parallel_config + self._dst_parallel_config = dst_parallel_config + self._transformer_layer_num = transformer_layer_num + self._segment_method = segment_method + + def apply(self, src_model_path: str, dst_model_path: str): + for i in range(self._src_parallel_config.mp): + for j in range(self._src_parallel_config.sharding): + # TODO(liuzhenhai): use multiple processs + layers = [] + + # 1、extract layers in the same pp group + group = self._src_parallel_config.pipe_parallel_group(i, j) + src_dirs = [ + "{}/mp_{:0>2d}_sharding_{:0>2d}_pp_{:0>2d}".format( + src_model_path, *e + ) + for e in group + ] + # first rank extract shared layer + with_shared = True + for dir in src_dirs: + print("extract layer params in dir %s" % dir) + layers.extend(self.extract_layers(dir, with_shared)) + with_shared = False + # 2、sort and unique layers + layers = self.sort_layers(layers) + + # 3、resplit layers among pp group according new pp config + layer_segments = self.segment_layers( + layers, self._dst_parallel_config, self._segment_method + ) + dst_group = self._dst_parallel_config.pipe_parallel_group(i, j) + dst_dirs = [ + "{}/mp_{:0>2d}_sharding_{:0>2d}_pp_{:0>2d}".format( + dst_model_path, *e + ) + for e in dst_group + ] + + # 4、merge layers belonging to the same node + for (layer_segment, dir_) in zip(layer_segments, dst_dirs): + print(f"merge {len(layer_segment)} layers to {dir_}") + self.merge_layers(layer_segment, dir_) + + # 5、copy meta_state.pdopt + for (src_dir, dst_dir) in zip(src_dirs, dst_dirs): + shutil.copyfile( + f"{src_dir}/meta_state.pdopt", + f"{dst_dir}/meta_state.pdopt", + ) + + def peek_model(self, model_dir: str): + for i in 
range(self._src_parallel_config.mp): + for j in range(self._src_parallel_config.sharding): + group = self._src_parallel_config.pipe_parallel_group(i, j) + dirs = [ + "{}/mp_{:0>2d}_sharding_{:0>2d}_pp_{:0>2d}".format( + model_dir, *e + ) + for e in group + ] + for dir in dirs: + print(f"peek partial model in {dir}:") + self.peek_partial_model(dir) + + def peek_partial_model(self, sub_dir: str): + state_dict = paddle.load(f"{sub_dir}/model.pdparams") + for (k, v) in state_dict.items(): + print(f"\t{k} -> {v.name}") + + def extract_layers(self, dir: str, with_shared: bool): + opt = paddle.load(dir + "/model_state.pdopt") + params = paddle.load(dir + "/model.pdparams") + shared_layer_parsed = False + # tname -> (layer, param_name) + tname_to_layer_and_pname = {} + for (k, v) in params.items(): + layer = self._extract_layer_name(k) + assert layer + # special treatment for embedding layer, skip duplicated shared layer + # shared layer may exist or not, if it exist it share weight with _layers.0 + # _layers.shared_layers.embed.word_embeddings.weight -> embedding_0.w_0 + # _layers.shared_layers.embed.position_embeddings.weight -> embedding_1.w_0 + # _layers.0.word_embeddings.weight -> embedding_0.w_0 + # _layers.0.position_embeddings.weight -> embedding_1.w_0 + shared_layer_parsed = shared_layer_parsed or ( + "_layers.shared_layers" in layer + ) + if ( + "_layers.shared_layers" not in layer + and ("word_embeddings" in k or "position_embeddings" in k) + and shared_layer_parsed + ): + continue + tname_to_layer_and_pname[v.name] = (layer, k) + + # get opt-> param mapping + tensor_names = list(tname_to_layer_and_pname.keys()) + opt_names = [ + e for e in opt.keys() if e not in ["master_weights", "LR_Scheduler"] + ] + opt_to_t = self._opt_name_to_tname(tensor_names, opt_names) + # gather tensors belonging to one layer togather + layers = OrderedDict() + for (k, v) in params.items(): + layer, p = tname_to_layer_and_pname[v.name] + if layer not in layers: + layers[layer] = {} + layers[layer]["opt"] = OrderedDict() + layers[layer]["params"] = OrderedDict() + layers[layer]["master_weights"] = OrderedDict() + layers[layer]["params"][p] = v + + for (k, v) in opt.items(): + if k in ["master_weights", "LR_Scheduler"]: + continue + layer, _ = tname_to_layer_and_pname[opt_to_t[v.name]] + layers[layer]["opt"][k] = v + + if "master_weights" in opt: + for (k, v) in opt["master_weights"].items(): + layer, _ = tname_to_layer_and_pname[k] + layers[layer]["master_weights"][k] = v + + if "LR_Scheduler" in opt: + for layer in layers: + layers[layer]["LR_Scheduler"] = opt["LR_Scheduler"] + + ans = [] + + for (layer_name, layer) in layers.items(): + # special treatment for embedding layer + if (not with_shared) and "shared_layers" in layer_name: + continue + file_name = f"./tmp_layer_files/{layer_name}.tmp" + paddle.save(layer, file_name) + ans.append((layer_name, file_name)) + print(f"save layer {layer_name} to {file_name}") + return ans + + def sort_layers(self, layers: list): + def priority(elem): + layer_name = elem[0] + if "shared_layers" in layer_name: + return -float(0.5) + match = re.search( + r"^_layers((\.\d+)+|(\.shared_layers\.[^\.]+))", layer_name + ) + assert match, f"{layer_name} not a valid layer name" + return float(match.group(1).lstrip(".")) + + # strictly sort layers + print("before sort %s" % ("|".join([e[0] for e in layers]))) + layers.sort(key=priority) + # unique + unique_layers = [] + for e in layers: + if unique_layers and e[0] == unique_layers[-1][0]: + continue + unique_layers.append(e) + 
print("after sort %s " % ("|".join([e[0] for e in unique_layers]))) + return unique_layers + + def segment_layers( + self, + layers: list, + config: ParallelConfig, + segment_method: str = "layer", + ): + layer_num = len(layers) + stage_num = config.pp * config.vpp + + # segment by weights + def segment_by_layer(): + # assume model is of the structure below + # embedding -> n*(transformer layer) -> [optional output layer] + # segment index + weights = [0 for _ in range(layer_num)] + non_zero_layers = range(1, layer_num - 1) + # input layer is embedding + if self._transformer_layer_num: + assert self._transformer_layer_num < layer_num + non_zero_layers = range(1, 1 + self._transformer_layer_num) + for i in non_zero_layers: + weights[i] = 1 + + part_size = sum(weights) // stage_num + result = [0 for _ in range(stage_num + 1)] + memory_counter = 0 + result_idx = 1 + for idx, weight in enumerate(weights): + memory_counter += weight + if memory_counter == part_size: + result[result_idx] = idx + 1 + result_idx += 1 + memory_counter = 0 + result[stage_num] = layer_num + return result + + def segment_uniform(): + result = [0 for _ in range(stage_num + 1)] + part_size = math.floor(layer_num / stage_num) + extra_layers = layer_num % stage_num + for i in range(1, stage_num): + offset = 1 if i > (stage_num - extra_layers) else 0 + result[i] = int( + min(result[i - 1] + part_size + offset, layer_num) + ) + result[stage_num] = layer_num + return result + + result = ( + segment_uniform() + if (segment_method == "uniform") + else segment_by_layer() + ) + index_segments = [[] for _ in range(config.pp)] + for i in range(stage_num): + index_segments[i % config.pp].append((result[i], result[i + 1])) + + # name layers + segments = [[] for i in range(config.pp)] + for i in range(config.pp): + for (start, end) in index_segments[i]: + for j in range(start, end): + if config.vpp > 1: + segments[i].append( + ( + [f"_layers.{start}.{j - start}"], + layers[j][1], + ) + ) + else: + segments[i].append(([f"_layers.{j}"], layers[j][1])) + + shared_layer_exist = any( + "_layers.shared_layers" in e[0] for e in layers + ) + if shared_layer_exist: + # special treatment for shared layer + if config.vpp > 1: + segments[0] = [ + ([layers[0][0], segments[0][0][0][0]], layers[0][1]) + ] + segments[0][1:] + else: + segments[0] = [([layers[0][0]], layers[0][1])] + segments[0][1:] + + for i in range(1, config.pp): + segments[i] = [([layers[0][0]], layers[0][1])] + segments[i] + + for (pp_rank, segs) in enumerate(segments): + print(f"segmentment result for pp_rank {pp_rank}:") + print(50 * "=") + for seg in segs: + print(f"{seg[0]} => {seg[1]}") + return segments + + def merge_layers(self, layers_segment: list, save_dir: str): + params = OrderedDict() + opt = OrderedDict() + master_weights = OrderedDict() + renaming_manager = LayerReNamingManager() + + def merge(src, dst, map_k=None): + for (k, v) in src.items(): + k = map_k(k) if map_k is not None else k + dst[k] = v + + lr_scheduler = None + for (layer_names, file_path) in layers_segment: + print("load %s" % file_path) + layer = paddle.load(file_path) + + def get_param_name_mapper(layer_name): + # replace layer name + def map_param_name(param_name): + layer_pre = self._extract_layer_name(param_name) + return layer_name + param_name[len(layer_pre) :] + + return map_param_name + + ( + layer_params, + layer_opt, + layer_master_weight, + ) = self._map_tensor_names( + layer["params"], + layer["opt"], + layer["master_weights"], + renaming_manager, + ) + for layer_name in layer_names: + 
merge(layer_params, params, get_param_name_mapper(layer_name)) + merge(layer_opt, opt) + merge(layer_master_weight, master_weights) + lr_scheduler = layer["LR_Scheduler"] + + opt = self._pack_opt_state_dict(opt, master_weights, lr_scheduler) + paddle.save(params, save_dir + "/model.pdparams") + paddle.save(opt, save_dir + "/model_state.pdopt") + + def _pack_opt_state_dict(self, opt, master_weights, lr_scheduler): + opt["master_weights"] = master_weights + opt["LR_Scheduler"] = lr_scheduler + return opt + + def _extract_layer_name(self, param_name: str): + match = re.search( + r"^_layers((\.\d+)+|(\.shared_layers\.[^\.]+))", param_name + ) + layer_name = "" + return "" if (not match) else match.group() + + # map opt names to tensor name + def _opt_name_to_tname(self, tensor_names, opt_names): + tensor_names = set(tensor_names) + all_names = [] + all_names.extend(list(tensor_names)) + all_names.extend(opt_names) + all_names.sort() + pre_t_name = "" + opt_to_t = {} + for n in all_names: + if n in tensor_names: + # we get a param + pre_t_name = n + else: + assert pre_t_name + opt_to_t[n] = pre_t_name + return opt_to_t + + def _map_tensor_names(self, params, opt, master_weights, renaming_manager): + opt_renamed = OrderedDict() + master_weights_renamed = OrderedDict() + # old name to new name + t_name_mapping = {} + # map tensor names + for (k, v) in params.items(): + t_name_mapping[v.name] = renaming_manager.get_new_param_name(v.name) + v.name = t_name_mapping[v.name] + # map opt names + opt_to_tname = self._opt_name_to_tname( + t_name_mapping.keys(), opt.keys() + ) + for (k, v) in opt.items(): + old_t_name = opt_to_tname[k] + t_name = t_name_mapping[old_t_name] + opt_name = t_name + k[len(old_t_name) :] + v.name = opt_name + opt_renamed[opt_name] = v + + # map master names + for (k, v) in master_weights.items(): + t_name = t_name_mapping[k] + v.name = t_name + v.name[len(k) :] + master_weights_renamed[t_name] = v + return (params, opt_renamed, master_weights_renamed) + + +def parse_args(): + + parser = argparse.ArgumentParser( + prog='model converter', description='converter a model' + ) + parser.add_argument( + '--src_path', + type=str, + default="./output/epoch_0_step_30", + help='path of the model to convert', + ) + + parser.add_argument( + '--dst_path', + type=str, + default="./test_adapt", + help='path to saved the converted model', + ) + + parser.add_argument( + '--src_mp', + type=int, + default=2, + help='mp degree of the origin triaing task that dumpped this model', + ) + + parser.add_argument( + '--src_pp', + type=int, + default=2, + help='pp degree of the origin triaing task that dumpped this model', + ) + + parser.add_argument( + '--src_vp', + type=int, + default=2, + help='vp degree of the origin triaing task that dumpped this model', + ) + + parser.add_argument( + '--dst_mp', + type=int, + default=None, + help='mp degree of the origin triaing task that dumpped this model', + ) + + parser.add_argument( + '--dst_pp', + type=int, + default=None, + help='pp degree of the expected triaing task that would recover this model', + ) + + parser.add_argument( + '--dst_vp', + type=int, + default=2, + help='vp degree of the expected triaing task that would recover this model', + ) + + parser.add_argument( + '--sharding', + type=int, + default=1, + help=" sharding degree of both the origin triaing task that dumpped this model and the expected triaing task that would recover this model", + ) + + parser.add_argument( + '--method', + type=str, + default="adapt_model", + help='vp degree of the 
expected triaing task that would recover this model', + ) + + parser.add_argument( + '--segment_method', + type=str, + default="layer", + help='method to segment layers to pp or vp stages', + ) + + parser.add_argument( + '--transformer_layer_num', + type=int, + default=0, + help='transformer_layer_num of the model', + ) + # assume model is of the structure below + # embedding -> n*[transformer layer] -> optional output layer + + args = parser.parse_args() + + if args.dst_mp is None: + args.dst_mp = args.src_mp + if args.dst_pp is None: + args.dst_pp = args.src_pp + + assert args.src_mp == args.dst_mp, "src mp {} dst mp {}".format( + args.src_mp, args.dst_mp + ) + + assert args.method in [ + 'peek_model', + 'adapt_model', + ], "method should be in ['peek_model', 'adapt_model']" + assert args.segment_method in [ + "uniform", + "layer", + ], "segment_method should be 'uniform' or 'layer" + + print( + "adapt model dumped by task with pp degree:{}, vp degree:{}, mp degree:{} to task with pp degree:{}, vp degree:{}, mp degree:{}".format( + args.src_pp, + args.src_vp, + args.src_mp, + args.dst_pp, + args.dst_vp, + args.dst_mp, + ) + ) + + return args + + +def adaptor_from_args(args): + src_parallel_config = ParallelConfig( + args.src_mp, args.src_pp, args.src_vp, args.sharding + ) + + dst_parallel_config = ParallelConfig( + args.dst_mp, args.dst_pp, args.dst_vp, args.sharding + ) + + adaptor = PipeLineModelAdaptor( + src_parallel_config, + dst_parallel_config, + args.transformer_layer_num, + args.segment_method, + ) + return adaptor + + +def main(): + + args = parse_args() + adaptor = adaptor_from_args(args) + if args.method == "peek_model": + adaptor.peek_model(args.dst_path) + elif args.method == "adapt_model": + adaptor.apply(args.src_path, args.dst_path) + + +if __name__ == "__main__": + """ + Usage: + python pp_parallel_adaptor.py --src_mp xxx --src_path xxx --method \ + adapt_model/peek_model --dst_path xxx --sharding xxx --segment_method xxx --transformer_layer_num xxx + + for the meaning of a specific arg, please use: + python pp_parallel_adaptor.py -h + """ + main() diff --git a/python/paddle/fluid/tests/unittests/collective/fleet/CMakeLists.txt b/python/paddle/fluid/tests/unittests/collective/fleet/CMakeLists.txt index 637dffe4289..dad5f1d4b5b 100644 --- a/python/paddle/fluid/tests/unittests/collective/fleet/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/collective/fleet/CMakeLists.txt @@ -168,6 +168,19 @@ if((WITH_GPU) AND LOCAL_ALL_PLAT) test_parallel_dygraph_pipeline_parallel_with_virtual_stage PROPERTIES TIMEOUT "500") endif() +if((WITH_GPU) AND LOCAL_ALL_PLAT) + bash_test_modules( + test_parallel_dygraph_pp_adaptor + START_BASH + ../../dist_test.sh + LABELS + "RUN_TYPE=DIST" + ENVS + "PADDLE_DIST_UT_PORT=21976;http_proxy=;https_proxy=;PYTHONPATH=../..:${PADDLE_BINARY_DIR}/python" + ) + set_tests_properties(test_parallel_dygraph_pp_adaptor PROPERTIES TIMEOUT + "500") +endif() if((WITH_GPU OR WITH_XPU) AND (LINUX)) py_test_modules( test_fleet_localsgd_meta_optimizer MODULES diff --git a/python/paddle/fluid/tests/unittests/collective/fleet/hybrid_parallel_pp_transformer.py b/python/paddle/fluid/tests/unittests/collective/fleet/hybrid_parallel_pp_transformer.py index 6c1ae3eac44..216f37796da 100644 --- a/python/paddle/fluid/tests/unittests/collective/fleet/hybrid_parallel_pp_transformer.py +++ b/python/paddle/fluid/tests/unittests/collective/fleet/hybrid_parallel_pp_transformer.py @@ -121,11 +121,11 @@ class CriterionPipe(Layer): class ModelPipe(PipelineLayer): - def 
__init__(self, topology): + def __init__(self, topology, transformer_layer_num: int = 6): self.descs = [] self.descs.append(LayerDesc(EmbeddingPipe)) - for x in range(6): + for x in range(transformer_layer_num): self.descs.append(LayerDesc(TransformerNetPipe)) self.descs.append(lambda x: x[0]) diff --git a/python/paddle/fluid/tests/unittests/collective/fleet/hybrid_parallel_pp_transformer_save.py b/python/paddle/fluid/tests/unittests/collective/fleet/hybrid_parallel_pp_transformer_save.py new file mode 100644 index 00000000000..a8cf970f73d --- /dev/null +++ b/python/paddle/fluid/tests/unittests/collective/fleet/hybrid_parallel_pp_transformer_save.py @@ -0,0 +1,101 @@ +# Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import unittest + +import numpy as np +from hybrid_parallel_pp_transformer import ModelPipe, set_random_seed + +import paddle +import paddle.distributed as dist +from paddle.distributed import fleet + +batch_size = 8 +length = 8 +micro_batch_size = 2 +vocab_size = 128 +transformer_layer_num = 8 + + +class TestDistPPSaveTraning(unittest.TestCase): + def setUp(self): + strategy = fleet.DistributedStrategy() + self.model_parallel_size = 1 + self.data_parallel_size = 1 + self.pipeline_parallel_size = 2 + strategy.hybrid_configs = { + "dp_degree": self.data_parallel_size, + "mp_degree": self.model_parallel_size, + "pp_degree": self.pipeline_parallel_size, + } + strategy.pipeline_configs = { + "accumulate_steps": batch_size // micro_batch_size, + "micro_batch_size": micro_batch_size, + } + fleet.init(is_collective=True, strategy=strategy) + + def test_pp_model(self): + print(f"pwd {os.getcwd()}") + hcg = fleet.get_hybrid_communicate_group() + word_size = hcg.get_model_parallel_world_size() + dp_id = hcg.get_data_parallel_rank() + pp_id = hcg.get_stage_id() + rank_id = dist.get_rank() + topology = hcg.topology() + set_random_seed(1024, dp_id, rank_id) + + model = ModelPipe(topology, transformer_layer_num=transformer_layer_num) + scheduler = paddle.optimizer.lr.PiecewiseDecay( + boundaries=[2], values=[0.001, 0.002], verbose=True + ) + optimizer = paddle.optimizer.SGD( + learning_rate=scheduler, parameters=model.parameters() + ) + + model = fleet.distributed_model(model) + optimizer = fleet.distributed_optimizer(optimizer) + output_dir = "{}/mp_00_sharding_00_pp_{:0>2d}".format( + "./pp_transformer", pp_id + ) + try: + os.makedirs(output_dir) + except: + # dir is already created, do nothing + pass + for step_id in range(2): + x_data = np.random.randint(0, vocab_size, size=[batch_size, length]) + x = paddle.to_tensor(x_data) + x.stop_gradient = True + loss = model.train_batch([x, x], optimizer, scheduler) + + paddle.save( + model.state_dict(), + os.path.join(output_dir, "model.pdparams"), + ) + + paddle.save( + optimizer.state_dict(), + os.path.join(output_dir, "model_state.pdopt"), + ) + meta_dict = { + "epoch": 0, + "step": 2, + "cuda_rng_state": paddle.get_cuda_rng_state(), + } + paddle.save(meta_dict, 
os.path.join(output_dir, "meta_state.pdopt")) + + +if __name__ == "__main__": + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/collective/fleet/hybrid_parallel_pp_transformer_save_with_virtual_stage.py b/python/paddle/fluid/tests/unittests/collective/fleet/hybrid_parallel_pp_transformer_save_with_virtual_stage.py new file mode 100644 index 00000000000..372cbe7f48d --- /dev/null +++ b/python/paddle/fluid/tests/unittests/collective/fleet/hybrid_parallel_pp_transformer_save_with_virtual_stage.py @@ -0,0 +1,105 @@ +# Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import unittest + +import numpy as np +from hybrid_parallel_pp_transformer_with_virtual_stage import ( + ModelPipe, + set_random_seed, +) + +import paddle +import paddle.distributed as dist +from paddle.distributed import fleet + +batch_size = 8 +length = 8 +micro_batch_size = 2 +vocab_size = 128 + +transformer_layer_num = 8 + + +class TestDistPPSaveTraning(unittest.TestCase): + def setUp(self): + strategy = fleet.DistributedStrategy() + self.model_parallel_size = 1 + self.data_parallel_size = 1 + self.pipeline_parallel_size = 2 + strategy.hybrid_configs = { + "dp_degree": self.data_parallel_size, + "mp_degree": self.model_parallel_size, + "pp_degree": self.pipeline_parallel_size, + } + strategy.pipeline_configs = { + "accumulate_steps": batch_size // micro_batch_size, + "micro_batch_size": micro_batch_size, + } + fleet.init(is_collective=True, strategy=strategy) + + def test_pp_model(self): + print(f"pwd {os.getcwd()}") + hcg = fleet.get_hybrid_communicate_group() + word_size = hcg.get_model_parallel_world_size() + dp_id = hcg.get_data_parallel_rank() + pp_id = hcg.get_stage_id() + rank_id = dist.get_rank() + topology = hcg.topology() + set_random_seed(1024, dp_id, rank_id) + + model = ModelPipe(topology, transformer_layer_num=transformer_layer_num) + scheduler = paddle.optimizer.lr.PiecewiseDecay( + boundaries=[2], values=[0.001, 0.002], verbose=True + ) + optimizer = paddle.optimizer.SGD( + learning_rate=scheduler, parameters=model.parameters() + ) + + model = fleet.distributed_model(model) + optimizer = fleet.distributed_optimizer(optimizer) + + output_dir = "{}/mp_00_sharding_00_pp_{:0>2d}".format( + "./pp_transformer_vp", pp_id + ) + try: + os.makedirs(output_dir) + except: + # dir is already created, do nothing + pass + for step_id in range(2): + x_data = np.random.randint(0, vocab_size, size=[batch_size, length]) + x = paddle.to_tensor(x_data) + x.stop_gradient = True + loss = model.train_batch([x, x], optimizer, scheduler) + + paddle.save( + model.state_dict(), + os.path.join(output_dir, "model.pdparams"), + ) + paddle.save( + optimizer.state_dict(), + os.path.join(output_dir, "model_state.pdopt"), + ) + meta_dict = { + "epoch": 0, + "step": 2, + "cuda_rng_state": paddle.get_cuda_rng_state(), + } + paddle.save(meta_dict, os.path.join(output_dir, "meta_state.pdopt")) + + +if __name__ == "__main__": + unittest.main() diff --git 
a/python/paddle/fluid/tests/unittests/collective/fleet/hybrid_parallel_pp_transformer_with_virtual_stage.py b/python/paddle/fluid/tests/unittests/collective/fleet/hybrid_parallel_pp_transformer_with_virtual_stage.py index 21cc9134e4d..9f43fc4c9ef 100644 --- a/python/paddle/fluid/tests/unittests/collective/fleet/hybrid_parallel_pp_transformer_with_virtual_stage.py +++ b/python/paddle/fluid/tests/unittests/collective/fleet/hybrid_parallel_pp_transformer_with_virtual_stage.py @@ -120,11 +120,10 @@ class CriterionPipe(Layer): class ModelPipe(PipelineLayer): - def __init__(self, topology): + def __init__(self, topology, transformer_layer_num: int = 8): self.descs = [] self.descs.append(LayerDesc(EmbeddingPipe)) - - for x in range(8): + for x in range(transformer_layer_num): self.descs.append(LayerDesc(TransformerNetPipe)) self.descs.append(lambda x: x[0]) diff --git a/python/paddle/fluid/tests/unittests/collective/fleet/test_parallel_dygraph_pp_adaptor.py b/python/paddle/fluid/tests/unittests/collective/fleet/test_parallel_dygraph_pp_adaptor.py new file mode 100644 index 00000000000..3b11210991d --- /dev/null +++ b/python/paddle/fluid/tests/unittests/collective/fleet/test_parallel_dygraph_pp_adaptor.py @@ -0,0 +1,163 @@ +# Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import os +import shutil +import unittest + +from test_parallel_dygraph_dataparallel import TestMultipleGpus + +import paddle +from paddle.distributed.fleet.utils.pp_parallel_adaptor import ( + ParallelConfig, + PipeLineModelAdaptor, + adaptor_from_args, + parse_args, +) + + +class TestPPAdaptor(TestMultipleGpus): + def test_parse_args(self): + args = parse_args() + self.assertEqual(args.src_mp, args.dst_mp) + adaptor = adaptor_from_args(args) + self.assertTrue(adaptor is not None) + + def test_hybrid_parallel_transformer_unbalanced_data(self): + print(f"pwd {os.getcwd()}") + self.run_mnist_2gpu('hybrid_parallel_pp_transformer_save.py') + self.run_mnist_2gpu( + 'hybrid_parallel_pp_transformer_save_with_virtual_stage.py' + ) + # test pp adaptor + dir1 = "./pp_transformer" + p_config1 = ParallelConfig(mp=1, pp=2, vpp=1, sharding=1) + dir2 = "./pp_transformer_vp" + p_config2 = ParallelConfig(mp=1, pp=2, vpp=2, sharding=1) + + pp_to_vp = PipeLineModelAdaptor( + src_parallel_config=p_config1, + dst_parallel_config=p_config2, + transformer_layer_num=8, + segment_method="layer", + ) + vp_to_pp = PipeLineModelAdaptor( + src_parallel_config=p_config2, + dst_parallel_config=p_config1, + transformer_layer_num=8, + segment_method="layer", + ) + + def check_converted_model(converted_model_dir, expected_model_dir): + # for compatibility, converted_model_dir may contain more key than + # expected model, which does not hinder model recovering + for i in range(p_config1.pp): + sub_converted_model_dir = ( + "{}/mp_00_sharding_00_pp_{:0>2d}".format( + converted_model_dir, i + ) + ) + sub_expected_model_dir = ( + "{}/mp_00_sharding_00_pp_{:0>2d}".format( + expected_model_dir, i + ) + ) + print( + f"converted_model_dir: {sub_converted_model_dir}; expected_model_dir: {sub_expected_model_dir}" + ) + + def check_names(dict_1, dict_2): + for (k, v) in dict_2.items(): + self.assertTrue(k in dict_1) + self.assertEqual( + getattr(v, "name", ""), + getattr(dict_1[k], "name", ""), + ) + + # check param + params_1 = paddle.load( + f"{sub_converted_model_dir}/model.pdparams" + ) + params_2 = paddle.load( + f"{sub_expected_model_dir}/model.pdparams" + ) + check_names(params_1, params_2) + del params_1 + del params_2 + # check opt + opt_1 = paddle.load( + f"{sub_converted_model_dir}/model_state.pdopt" + ) + opt_2 = paddle.load( + f"{sub_expected_model_dir}/model_state.pdopt" + ) + check_names(opt_1, opt_2) + # check master wieghts + if "master_weights" in opt_2: + self.assertTrue("master_weights" in opt_1) + check_names( + opt_2["master_weights"], opt_1["master_weights"] + ) + + def create_dir_if_nonexist(dir: str): + if not os.path.exists(dir): + os.makedirs(dir) + + # check pp to vp + tmp_dir1 = "./tmp_pp_to_vp" + create_dir_if_nonexist(tmp_dir1) + pp_to_vp.apply(dir1, tmp_dir1) + # browse the converted model + pp_to_vp.peek_model(tmp_dir1) + # check + check_converted_model(tmp_dir1, dir2) + + # check vp to pp + tmp_dir2 = "./tmp_vp_to_pp" + create_dir_if_nonexist(tmp_dir2) + vp_to_pp.apply(dir2, tmp_dir2) + vp_to_pp.peek_model(tmp_dir2) + check_converted_model(tmp_dir2, dir1) + + # check uniform segment + tmp_dir3 = "./tmp_vp_to_pp_uniform" + create_dir_if_nonexist(tmp_dir3) + vp_to_pp_uniform = PipeLineModelAdaptor( + src_parallel_config=p_config2, + dst_parallel_config=p_config1, + transformer_layer_num=8, + segment_method="uniform", + ) + vp_to_pp_uniform.apply(dir2, tmp_dir3) + vp_to_pp_uniform.peek_model(tmp_dir3) + + tmp_dir4 = "./tmp_pp_to_pp_uniform" + create_dir_if_nonexist(tmp_dir4) + pp_to_pp_uniform = 
PipeLineModelAdaptor( + src_parallel_config=p_config1, + dst_parallel_config=p_config1, + transformer_layer_num=8, + segment_method="uniform", + ) + pp_to_pp_uniform.apply(dir1, tmp_dir4) + pp_to_pp_uniform.peek_model(tmp_dir4) + check_converted_model(tmp_dir3, tmp_dir4) + + # rm dirs + for d in [dir1, dir2, tmp_dir1, tmp_dir2, tmp_dir3, tmp_dir4]: + shutil.rmtree(d, ignore_errors=True) + + +if __name__ == "__main__": + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/collective/fleet/testslist.csv b/python/paddle/fluid/tests/unittests/collective/fleet/testslist.csv index 459a3e39df8..688b5e759b1 100644 --- a/python/paddle/fluid/tests/unittests/collective/fleet/testslist.csv +++ b/python/paddle/fluid/tests/unittests/collective/fleet/testslist.csv @@ -14,6 +14,7 @@ test_dygraph_sharding_stage3_for_eager,,,350,DIST,../../dist_test.sh,2,,http_pro test_communicator_half_async,,,120,DIST,test_runner.py,2,,FLAGS_communicator_send_queue_size=1;FLAGS_communicator_max_merge_var_num=1;http_proxy=;https_proxy=;PYTHONPATH=../..,WITH_NCCL test_parallel_dygraph_pipeline_parallel,,GPU,500,DIST,../../dist_test.sh,2,,http_proxy=;https_proxy=;PYTHONPATH=../.., test_parallel_dygraph_pipeline_parallel_with_virtual_stage,,GPU,500,DIST,../../dist_test.sh,2,,http_proxy=;https_proxy=;PYTHONPATH=../.., +test_parallel_dygraph_pp_adaptor,,GPU,500,DIST,../../dist_test.sh,2,,http_proxy=;https_proxy=;PYTHONPATH=../.., test_fleet_localsgd_meta_optimizer,LINUX,GPU;XPU,,,test_runner.py,2,,http_proxy=;https_proxy=;PYTHONPATH=../.., test_parallel_class_center_sample,,GPU,120,DIST,../../dist_test.sh,2,,http_proxy=;https_proxy=;PYTHONPATH=../..,WITH_NCCL test_pipeline,,,120,DIST,../../dist_test.sh,2,,http_proxy=;https_proxy=;PYTHONPATH=../.., diff --git a/python/paddle/framework/io.py b/python/paddle/framework/io.py index a482d8bed41..e5d011453f2 100644 --- a/python/paddle/framework/io.py +++ b/python/paddle/framework/io.py @@ -1071,11 +1071,18 @@ def load(path, **configs): # paddle2.0: paddle.save/load if "StructuredToParameterName@@" in load_result: - for key in load_result["StructuredToParameterName@@"]: + for (key, name) in load_result[ + "StructuredToParameterName@@" + ].items(): if isinstance(load_result[key], np.ndarray): load_result[key] = _ndarray_to_tensor( load_result[key], config.return_numpy ) + # default name is "generatedxxx" which is set in Tensor init, if not set + if not config.return_numpy and getattr( + load_result[key], "name", "" + ): + load_result[key].name = name if ( not config.keep_name_table diff --git a/test/auto_parallel/CMakeLists.txt b/test/auto_parallel/CMakeLists.txt index 74b2fdb47a1..e8660fc7b02 100644 --- a/test/auto_parallel/CMakeLists.txt +++ b/test/auto_parallel/CMakeLists.txt @@ -76,7 +76,7 @@ if(WITH_DISTRIBUTE AND WITH_GPU) py_test_modules(test_pass_quantization MODULES test_pass_quantization) set_tests_properties(test_pass_quantization PROPERTIES TIMEOUT 60) py_test_modules(test_tuning_recompute MODULES test_tuning_recompute) - set_tests_properties(test_tuning_recompute PROPERTIES TIMEOUT 240) + set_tests_properties(test_tuning_recompute PROPERTIES TIMEOUT 300) py_test_modules(test_fused_linear_pass MODULES test_fused_linear_pass) set_tests_properties(test_fused_linear_pass PROPERTIES TIMEOUT 20) py_test_modules(test_align_tool MODULES test_align_tool) diff --git a/test/distributed_passes/CMakeLists.txt b/test/distributed_passes/CMakeLists.txt index 83d2f0fb529..e2b8697fc85 100644 --- a/test/distributed_passes/CMakeLists.txt +++ 
b/test/distributed_passes/CMakeLists.txt @@ -29,6 +29,6 @@ endif() foreach(TEST_OP ${TEST_OPS}) py_test_modules(${TEST_OP} MODULES ${TEST_OP}) list(APPEND DIST_TEST_OPS ${TEST_OP}) - set_tests_properties(${TEST_OP} PROPERTIES TIMEOUT 120) + set_tests_properties(${TEST_OP} PROPERTIES TIMEOUT 200) set_tests_properties(${TEST_OP} PROPERTIES LABELS "RUN_TYPE=DIST") endforeach() diff --git a/test/ir/inference/CMakeLists.txt b/test/ir/inference/CMakeLists.txt index b3bb181ea4a..90620dab0fd 100755 --- a/test/ir/inference/CMakeLists.txt +++ b/test/ir/inference/CMakeLists.txt @@ -197,7 +197,7 @@ if(WITH_GPU AND TENSORRT_FOUND) set_tests_properties(test_trt_fc_fuse_quant_dequant_pass PROPERTIES TIMEOUT 100) set_tests_properties(test_trt_conv_quant_dequant_pass PROPERTIES TIMEOUT 100) - set_tests_properties(test_trt_matmul_quant_dequant PROPERTIES TIMEOUT 100) + set_tests_properties(test_trt_matmul_quant_dequant PROPERTIES TIMEOUT 180) set_tests_properties(test_trt_conv3d_op PROPERTIES TIMEOUT 60) set_tests_properties(test_trt_conv3d_transpose_op PROPERTIES TIMEOUT 60) set_tests_properties(test_trt_nearest_interp_v2_op PROPERTIES TIMEOUT 30) -- GitLab
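
Usage notes for python/paddle/distributed/fleet/utils/pp_parallel_adaptor.py.

The adaptor can be driven from Python as well as from the command line. The sketch below is a minimal programmatic example (the checkpoint paths are placeholders) and mirrors what adaptor_from_args() builds from the CLI flags. The source path is expected to contain one sub-directory per rank, named mp_XX_sharding_XX_pp_XX, each holding model.pdparams, model_state.pdopt and meta_state.pdopt, which is exactly the layout the two new save tests dump.

    from paddle.distributed.fleet.utils.pp_parallel_adaptor import (
        ParallelConfig,
        PipeLineModelAdaptor,
    )

    # checkpoint dumped by a task with mp=2, pp=2, vpp=2, sharding=1
    src = ParallelConfig(mp=2, pp=2, vpp=2, sharding=1)
    # target task keeps mp and sharding but runs plain pp=4 without virtual stages
    dst = ParallelConfig(mp=2, pp=4, vpp=1, sharding=1)

    adaptor = PipeLineModelAdaptor(
        src_parallel_config=src,
        dst_parallel_config=dst,
        transformer_layer_num=8,  # number of transformer blocks in the model
        segment_method="layer",   # or "uniform"
    )
    # re-split the source checkpoint into the destination layout
    adaptor.apply("./output/epoch_0_step_30", "./converted")
    # print the parameter names of the converted checkpoint
    adaptor.peek_model("./converted")

The same conversion through the CLI would pass --src_pp 2 --src_vp 2 --dst_pp 4 --dst_vp 1 --method adapt_model; mp and sharding must stay unchanged, which parse_args() asserts for the mp degree.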
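
When partial models from different pipeline stages are merged, each rank has numbered its parameters locally (every rank has its own linear_0, linear_1, ...), so the merged state dict would clash. LayerReNamingManager hands out fresh, densely numbered names in merge order. A small illustration with made-up tensor names:

    from paddle.distributed.fleet.utils.pp_parallel_adaptor import (
        LayerReNamingManager,
    )

    m = LayerReNamingManager()
    # tensors of one layer share a prefix; a new prefix bumps the counter
    print(m.get_new_param_name("linear_7.w_0"))      # linear_0.w_0
    print(m.get_new_param_name("linear_7.b_0"))      # linear_0.b_0
    print(m.get_new_param_name("linear_9.w_0"))      # linear_1.w_0
    print(m.get_new_param_name("layer_norm_3.w_0"))  # layer_norm_0.w_0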
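
segment_layers() splits the sorted layer list into pp * vpp virtual stages. With segment_method="layer" only the transformer blocks carry weight (embedding and head are treated as free), while "uniform" simply splits the list evenly. Because the splitter only inspects layer names, it can be exercised on dummy layer tuples; the file paths below are fake and the boundary values follow from the code above:

    from paddle.distributed.fleet.utils.pp_parallel_adaptor import (
        ParallelConfig,
        PipeLineModelAdaptor,
    )

    # embedding + 8 transformer blocks + head, already sorted
    layers = [(f"_layers.{i}", f"layer_{i}.tmp") for i in range(10)]
    cfg = ParallelConfig(mp=1, pp=2, vpp=2, sharding=1)
    adaptor = PipeLineModelAdaptor(cfg, cfg, transformer_layer_num=8)

    segments = adaptor.segment_layers(layers, cfg, "layer")
    # boundaries are [0, 3, 5, 7, 10]: pp rank 0 holds index ranges (0, 3) and
    # (5, 7) as its two virtual stages, pp rank 1 holds (3, 5) and (7, 10)

    segments = adaptor.segment_layers(layers, cfg, "uniform")
    # boundaries are [0, 2, 4, 7, 10]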
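
Optimizer entries are matched back to their parameters purely by name: every optimizer state name starts with the name of the parameter it belongs to, so after sorting the combined name list each parameter is immediately followed by its states. A toy run of the helper (the state names are representative, not taken from a real checkpoint):

    from paddle.distributed.fleet.utils.pp_parallel_adaptor import (
        ParallelConfig,
        PipeLineModelAdaptor,
    )

    cfg = ParallelConfig(mp=1, pp=1)
    adaptor = PipeLineModelAdaptor(cfg, cfg, transformer_layer_num=0)
    mapping = adaptor._opt_name_to_tname(
        tensor_names=["embedding_0.w_0", "linear_0.w_0"],
        opt_names=[
            "embedding_0.w_0_moment1_0",
            "linear_0.w_0_moment1_0",
            "linear_0.w_0_moment2_0",
        ],
    )
    # {"embedding_0.w_0_moment1_0": "embedding_0.w_0",
    #  "linear_0.w_0_moment1_0": "linear_0.w_0",
    #  "linear_0.w_0_moment2_0": "linear_0.w_0"}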
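
The change in python/paddle/framework/io.py is what makes this name-based matching possible across a save/load round trip: when a state dict saved with paddle.save is loaded again, the tensors now get their original parameter names back from the "StructuredToParameterName@@" table instead of freshly generated ones, so the adaptor (and peek_model) can correlate parameters, master weights and optimizer state between files. A minimal sketch, assuming a scratch directory is acceptable:

    import paddle

    linear = paddle.nn.Linear(4, 4)
    paddle.save(linear.state_dict(), "./tmp_io_check/linear.pdparams")
    state = paddle.load("./tmp_io_check/linear.pdparams")
    for k, v in state.items():
        # with this patch v.name is the original parameter name,
        # e.g. "linear_0.w_0", rather than a freshly generated one
        print(k, "->", v.name)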