Unverified commit 3650c4a8 authored by zhenhailiu, committed by GitHub

Convert the model after the pp strategy changes, so training can warm start (#52927)

* polish

* polish

* polish

* polish

* polish

* polish

* polish

* polish

* polish

* polish

* polish

* polish

* polish

* polish

* polish

* polish

* polish

* polish

* polish

* polish

* polish
Parent 0e30d56a
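
This change adds a checkpoint conversion utility, paddle/distributed/fleet/utils/pp_parallel_adaptor.py (plus tests), which re-splits the per-rank checkpoints of a pipeline-parallel job when the pp or virtual-pp (vpp) degree changes, so training can warm start under the new strategy. A minimal sketch of one possible invocation, based on the script's argparse options in the diff below; the paths and degrees are illustrative only:

python pp_parallel_adaptor.py --method adapt_model \
    --src_path ./output/epoch_0_step_30 --dst_path ./test_adapt \
    --src_mp 2 --src_pp 2 --src_vp 1 --dst_pp 4 --dst_vp 2 \
    --sharding 1 --segment_method layer --transformer_layer_num 24
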
......@@ -564,7 +564,7 @@ class PipelineLayer(nn.Layer):
self.segment_parts = seg.do_segment()
logger.info(
"segment result:"
f"segment with method: {seg_method}; result: "
+ ", ".join(str(arg) for arg in self.segment_parts)
)
......@@ -594,7 +594,7 @@ class PipelineLayer(nn.Layer):
self.segment_parts = seg.do_segment()
logger.info(
"segment result:"
f"segment with method: {seg_method}; result: "
+ ", ".join(str(arg) for arg in self.segment_parts)
)
......
# Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import math
import re
import shutil
from collections import OrderedDict
import paddle
class ParallelConfig:
def __init__(self, mp: int, pp: int, vpp: int = 1, sharding: int = 1):
self.mp = mp
self.pp = pp
self.vpp = vpp
self.sharding = sharding
def pipe_parallel_group(self, i: int, j: int):
ans = []
for k in range(self.pp):
ans.append((i, j, k))
return ans
class LayerReNamingHelper:
def __init__(self, template: str):
self._template = template
self._i = -1
self._last_old_layer_name = None
def get_new_layer_name(self, old_layer_name: str):
old_layer_name = old_layer_name.split(".")[0]
if (
self._last_old_layer_name is None
or old_layer_name != self._last_old_layer_name
):
self._i = self._i + 1
self._last_old_layer_name = old_layer_name
return self._template.format(self._i)
class LayerReNamingManager:
def __init__(self):
self._renaming_helpers = OrderedDict()
self._renaming_helpers["linear"] = LayerReNamingHelper("linear_{}")
self._renaming_helpers["layer_norm"] = LayerReNamingHelper(
"layer_norm_{}"
)
self._renaming_helpers["embedding"] = LayerReNamingHelper(
"embedding_{}"
)
def get_new_layer_name(self, old_name: str):
layer_name = ""
for (k, v) in self._renaming_helpers.items():
if old_name.startswith(k):
layer_name = v.get_new_layer_name(old_name)
break
return layer_name
def get_new_param_name(self, old_name: str):
names = old_name.split(".")
layer_name = self.get_new_layer_name(names[0])
assert layer_name, f"can not rename layer {names[0]}"
names[0] = layer_name
return ".".join(names)
class PipeLineModelAdaptor:
def __init__(
self,
src_parallel_config: ParallelConfig,
dst_parallel_config: ParallelConfig,
transformer_layer_num: int,
segment_method: str = "layer",
):
self._src_parallel_config = src_parallel_config
self._dst_parallel_config = dst_parallel_config
self._transformer_layer_num = transformer_layer_num
self._segment_method = segment_method
def apply(self, src_model_path: str, dst_model_path: str):
for i in range(self._src_parallel_config.mp):
for j in range(self._src_parallel_config.sharding):
# TODO(liuzhenhai): use multiple processes
layers = []
# 1. extract layers in the same pp group
group = self._src_parallel_config.pipe_parallel_group(i, j)
src_dirs = [
"{}/mp_{:0>2d}_sharding_{:0>2d}_pp_{:0>2d}".format(
src_model_path, *e
)
for e in group
]
# only the first rank extracts the shared layers
with_shared = True
for dir in src_dirs:
print("extract layer params in dir %s" % dir)
layers.extend(self.extract_layers(dir, with_shared))
with_shared = False
# 2. sort and deduplicate layers
layers = self.sort_layers(layers)
# 3. re-split layers among the pp group according to the new pp config
layer_segments = self.segment_layers(
layers, self._dst_parallel_config, self._segment_method
)
dst_group = self._dst_parallel_config.pipe_parallel_group(i, j)
dst_dirs = [
"{}/mp_{:0>2d}_sharding_{:0>2d}_pp_{:0>2d}".format(
dst_model_path, *e
)
for e in dst_group
]
# 4. merge layers belonging to the same pp stage
for (layer_segment, dir_) in zip(layer_segments, dst_dirs):
print(f"merge {len(layer_segment)} layers to {dir_}")
self.merge_layers(layer_segment, dir_)
# 5. copy meta_state.pdopt
for (src_dir, dst_dir) in zip(src_dirs, dst_dirs):
shutil.copyfile(
f"{src_dir}/meta_state.pdopt",
f"{dst_dir}/meta_state.pdopt",
)
def peek_model(self, model_dir: str):
for i in range(self._src_parallel_config.mp):
for j in range(self._src_parallel_config.sharding):
group = self._src_parallel_config.pipe_parallel_group(i, j)
dirs = [
"{}/mp_{:0>2d}_sharding_{:0>2d}_pp_{:0>2d}".format(
model_dir, *e
)
for e in group
]
for dir in dirs:
print(f"peek partial model in {dir}:")
self.peek_partial_model(dir)
def peek_partial_model(self, sub_dir: str):
state_dict = paddle.load(f"{sub_dir}/model.pdparams")
for (k, v) in state_dict.items():
print(f"\t{k} -> {v.name}")
def extract_layers(self, dir: str, with_shared: bool):
opt = paddle.load(dir + "/model_state.pdopt")
params = paddle.load(dir + "/model.pdparams")
shared_layer_parsed = False
# tname -> (layer, param_name)
tname_to_layer_and_pname = {}
for (k, v) in params.items():
layer = self._extract_layer_name(k)
assert layer
# special treatment for the embedding layer: skip the duplicated shared layer.
# The shared layer may or may not exist; if it exists, it shares weights with _layers.0:
# _layers.shared_layers.embed.word_embeddings.weight -> embedding_0.w_0
# _layers.shared_layers.embed.position_embeddings.weight -> embedding_1.w_0
# _layers.0.word_embeddings.weight -> embedding_0.w_0
# _layers.0.position_embeddings.weight -> embedding_1.w_0
shared_layer_parsed = shared_layer_parsed or (
"_layers.shared_layers" in layer
)
if (
"_layers.shared_layers" not in layer
and ("word_embeddings" in k or "position_embeddings" in k)
and shared_layer_parsed
):
continue
tname_to_layer_and_pname[v.name] = (layer, k)
# build the mapping from optimizer-state names to tensor names
tensor_names = list(tname_to_layer_and_pname.keys())
opt_names = [
e for e in opt.keys() if e not in ["master_weights", "LR_Scheduler"]
]
opt_to_t = self._opt_name_to_tname(tensor_names, opt_names)
# gather tensors belonging to the same layer together
layers = OrderedDict()
for (k, v) in params.items():
layer, p = tname_to_layer_and_pname[v.name]
if layer not in layers:
layers[layer] = {}
layers[layer]["opt"] = OrderedDict()
layers[layer]["params"] = OrderedDict()
layers[layer]["master_weights"] = OrderedDict()
layers[layer]["params"][p] = v
for (k, v) in opt.items():
if k in ["master_weights", "LR_Scheduler"]:
continue
layer, _ = tname_to_layer_and_pname[opt_to_t[v.name]]
layers[layer]["opt"][k] = v
if "master_weights" in opt:
for (k, v) in opt["master_weights"].items():
layer, _ = tname_to_layer_and_pname[k]
layers[layer]["master_weights"][k] = v
if "LR_Scheduler" in opt:
for layer in layers:
layers[layer]["LR_Scheduler"] = opt["LR_Scheduler"]
ans = []
for (layer_name, layer) in layers.items():
# special treatment for embedding layer
if (not with_shared) and "shared_layers" in layer_name:
continue
file_name = f"./tmp_layer_files/{layer_name}.tmp"
paddle.save(layer, file_name)
ans.append((layer_name, file_name))
print(f"save layer {layer_name} to {file_name}")
return ans
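# Note: extract_layers returns a list of (layer_name, tmp_file_path) pairs; each tmp
# file stores that layer's "params", "opt", "master_weights" (and "LR_Scheduler"
# when the optimizer state contains one).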
def sort_layers(self, layers: list):
def priority(elem):
layer_name = elem[0]
if "shared_layers" in layer_name:
return -float(0.5)
match = re.search(
r"^_layers((\.\d+)+|(\.shared_layers\.[^\.]+))", layer_name
)
assert match, f"{layer_name} not a valid layer name"
return float(match.group(1).lstrip("."))
# strictly sort layers
print("before sort %s" % ("|".join([e[0] for e in layers])))
layers.sort(key=priority)
# unique
unique_layers = []
for e in layers:
if unique_layers and e[0] == unique_layers[-1][0]:
continue
unique_layers.append(e)
print("after sort %s " % ("|".join([e[0] for e in unique_layers])))
return unique_layers
def segment_layers(
self,
layers: list,
config: ParallelConfig,
segment_method: str = "layer",
):
layer_num = len(layers)
stage_num = config.pp * config.vpp
# segment by weights
def segment_by_layer():
# assume model is of the structure below
# embedding -> n*(transformer layer) -> [optional output layer]
# segment index
weights = [0 for _ in range(layer_num)]
non_zero_layers = range(1, layer_num - 1)
# input layer is embedding
if self._transformer_layer_num:
assert self._transformer_layer_num < layer_num
non_zero_layers = range(1, 1 + self._transformer_layer_num)
for i in non_zero_layers:
weights[i] = 1
part_size = sum(weights) // stage_num
result = [0 for _ in range(stage_num + 1)]
memory_counter = 0
result_idx = 1
for idx, weight in enumerate(weights):
memory_counter += weight
if memory_counter == part_size:
result[result_idx] = idx + 1
result_idx += 1
memory_counter = 0
result[stage_num] = layer_num
return result
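# Example (assumed sizes, for illustration only): with layer_num = 10
# (embedding + 8 transformer layers + head), transformer_layer_num = 8 and
# stage_num = 4, weights = [0,1,1,1,1,1,1,1,1,0] and part_size = 2, so the
# boundaries become [0, 3, 5, 7, 10]: the embedding shares the first stage with
# two transformer layers and the head rides with the last stage.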
def segment_uniform():
result = [0 for _ in range(stage_num + 1)]
part_size = math.floor(layer_num / stage_num)
extra_layers = layer_num % stage_num
for i in range(1, stage_num):
offset = 1 if i > (stage_num - extra_layers) else 0
result[i] = int(
min(result[i - 1] + part_size + offset, layer_num)
)
result[stage_num] = layer_num
return result
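# Example (illustrative): layer_num = 10 and stage_num = 4 give part_size = 2,
# extra_layers = 2 and boundaries [0, 2, 4, 7, 10], i.e. stage sizes 2, 2, 3, 3
# (the extra layers are pushed to the last stages).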
result = (
segment_uniform()
if (segment_method == "uniform")
else segment_by_layer()
)
index_segments = [[] for _ in range(config.pp)]
for i in range(stage_num):
index_segments[i % config.pp].append((result[i], result[i + 1]))
# name layers
segments = [[] for i in range(config.pp)]
for i in range(config.pp):
for (start, end) in index_segments[i]:
for j in range(start, end):
if config.vpp > 1:
segments[i].append(
(
[f"_layers.{start}.{j - start}"],
layers[j][1],
)
)
else:
segments[i].append(([f"_layers.{j}"], layers[j][1]))
shared_layer_exist = any(
"_layers.shared_layers" in e[0] for e in layers
)
if shared_layer_exist:
# special treatment for shared layer
if config.vpp > 1:
segments[0] = [
([layers[0][0], segments[0][0][0][0]], layers[0][1])
] + segments[0][1:]
else:
segments[0] = [([layers[0][0]], layers[0][1])] + segments[0][1:]
for i in range(1, config.pp):
segments[i] = [([layers[0][0]], layers[0][1])] + segments[i]
for (pp_rank, segs) in enumerate(segments):
print(f"segmentment result for pp_rank {pp_rank}:")
print(50 * "=")
for seg in segs:
print(f"{seg[0]} => {seg[1]}")
return segments
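# Note on the interleaved (vpp > 1) case above: the stage_num = pp * vpp index
# ranges are assigned round-robin to pp ranks, e.g. with pp = 2, vpp = 2 and
# boundaries [b0, b1, b2, b3, b4], pp rank 0 holds chunks [b0, b1) and [b2, b3)
# while pp rank 1 holds [b1, b2) and [b3, b4); inside a chunk the destination
# layer names take the form "_layers.{chunk_start}.{offset}" instead of "_layers.{index}".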
def merge_layers(self, layers_segment: list, save_dir: str):
params = OrderedDict()
opt = OrderedDict()
master_weights = OrderedDict()
renaming_manager = LayerReNamingManager()
def merge(src, dst, map_k=None):
for (k, v) in src.items():
k = map_k(k) if map_k is not None else k
dst[k] = v
lr_scheduler = None
for (layer_names, file_path) in layers_segment:
print("load %s" % file_path)
layer = paddle.load(file_path)
def get_param_name_mapper(layer_name):
# replace layer name
def map_param_name(param_name):
layer_pre = self._extract_layer_name(param_name)
return layer_name + param_name[len(layer_pre) :]
return map_param_name
(
layer_params,
layer_opt,
layer_master_weight,
) = self._map_tensor_names(
layer["params"],
layer["opt"],
layer["master_weights"],
renaming_manager,
)
for layer_name in layer_names:
merge(layer_params, params, get_param_name_mapper(layer_name))
merge(layer_opt, opt)
merge(layer_master_weight, master_weights)
lr_scheduler = layer["LR_Scheduler"]
opt = self._pack_opt_state_dict(opt, master_weights, lr_scheduler)
paddle.save(params, save_dir + "/model.pdparams")
paddle.save(opt, save_dir + "/model_state.pdopt")
def _pack_opt_state_dict(self, opt, master_weights, lr_scheduler):
opt["master_weights"] = master_weights
opt["LR_Scheduler"] = lr_scheduler
return opt
def _extract_layer_name(self, param_name: str):
match = re.search(
r"^_layers((\.\d+)+|(\.shared_layers\.[^\.]+))", param_name
)
layer_name = ""
return "" if (not match) else match.group()
# map opt names to tensor names
def _opt_name_to_tname(self, tensor_names, opt_names):
tensor_names = set(tensor_names)
all_names = []
all_names.extend(list(tensor_names))
all_names.extend(opt_names)
all_names.sort()
pre_t_name = ""
opt_to_t = {}
for n in all_names:
if n in tensor_names:
# we get a param
pre_t_name = n
else:
assert pre_t_name
opt_to_t[n] = pre_t_name
return opt_to_t
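# Example of the sorting trick above (names are illustrative): sorting
#   ["linear_0.w_0", "linear_0.w_0_moment1_0", "linear_0.w_0_moment2_0", "linear_0.w_1"]
# places every optimizer-state name directly after the parameter name it extends,
# because each state name is the parameter name plus a suffix; each opt name is
# therefore mapped to the most recently seen tensor name.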
def _map_tensor_names(self, params, opt, master_weights, renaming_manager):
opt_renamed = OrderedDict()
master_weights_renamed = OrderedDict()
# old name to new name
t_name_mapping = {}
# map tensor names
for (k, v) in params.items():
t_name_mapping[v.name] = renaming_manager.get_new_param_name(v.name)
v.name = t_name_mapping[v.name]
# map opt names
opt_to_tname = self._opt_name_to_tname(
t_name_mapping.keys(), opt.keys()
)
for (k, v) in opt.items():
old_t_name = opt_to_tname[k]
t_name = t_name_mapping[old_t_name]
opt_name = t_name + k[len(old_t_name) :]
v.name = opt_name
opt_renamed[opt_name] = v
# map master names
for (k, v) in master_weights.items():
t_name = t_name_mapping[k]
v.name = t_name + v.name[len(k) :]
master_weights_renamed[t_name] = v
return (params, opt_renamed, master_weights_renamed)
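# Note: _map_tensor_names renames the parameter tensors in place (via .name) and
# returns the optimizer and master-weight dicts re-keyed by the new tensor names.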
def parse_args():
parser = argparse.ArgumentParser(
prog='model converter', description='convert a model'
)
parser.add_argument(
'--src_path',
type=str,
default="./output/epoch_0_step_30",
help='path of the model to convert',
)
parser.add_argument(
'--dst_path',
type=str,
default="./test_adapt",
help='path to save the converted model',
)
parser.add_argument(
'--src_mp',
type=int,
default=2,
help='mp degree of the original training task that dumped this model',
)
parser.add_argument(
'--src_pp',
type=int,
default=2,
help='pp degree of the original training task that dumped this model',
)
parser.add_argument(
'--src_vp',
type=int,
default=2,
help='vp degree of the original training task that dumped this model',
)
parser.add_argument(
'--dst_mp',
type=int,
default=None,
help='mp degree of the expected training task that would recover this model',
)
parser.add_argument(
'--dst_pp',
type=int,
default=None,
help='pp degree of the expected training task that would recover this model',
)
parser.add_argument(
'--dst_vp',
type=int,
default=2,
help='vp degree of the expected training task that would recover this model',
)
parser.add_argument(
'--sharding',
type=int,
default=1,
help=" sharding degree of both the origin triaing task that dumpped this model and the expected triaing task that would recover this model",
)
parser.add_argument(
'--method',
type=str,
default="adapt_model",
help='what to do: "adapt_model" converts the checkpoint, "peek_model" prints its parameters',
)
parser.add_argument(
'--segment_method',
type=str,
default="layer",
help='method used to segment layers into pp/vpp stages: "layer" or "uniform"',
)
parser.add_argument(
'--transformer_layer_num',
type=int,
default=0,
help='number of transformer layers in the model',
)
# assume model is of the structure below
# embedding -> n*[transformer layer] -> optional output layer
args = parser.parse_args()
if args.dst_mp is None:
args.dst_mp = args.src_mp
if args.dst_pp is None:
args.dst_pp = args.src_pp
assert args.src_mp == args.dst_mp, "src mp {} dst mp {}".format(
args.src_mp, args.dst_mp
)
assert args.method in [
'peek_model',
'adapt_model',
], "method should be in ['peek_model', 'adapt_model']"
assert args.segment_method in [
"uniform",
"layer",
], "segment_method should be 'uniform' or 'layer"
print(
"adapt model dumped by task with pp degree:{}, vp degree:{}, mp degree:{} to task with pp degree:{}, vp degree:{}, mp degree:{}".format(
args.src_pp,
args.src_vp,
args.src_mp,
args.dst_pp,
args.dst_vp,
args.dst_mp,
)
)
return args
def adaptor_from_args(args):
src_parallel_config = ParallelConfig(
args.src_mp, args.src_pp, args.src_vp, args.sharding
)
dst_parallel_config = ParallelConfig(
args.dst_mp, args.dst_pp, args.dst_vp, args.sharding
)
adaptor = PipeLineModelAdaptor(
src_parallel_config,
dst_parallel_config,
args.transformer_layer_num,
args.segment_method,
)
return adaptor
def main():
args = parse_args()
adaptor = adaptor_from_args(args)
if args.method == "peek_model":
adaptor.peek_model(args.dst_path)
elif args.method == "adapt_model":
adaptor.apply(args.src_path, args.dst_path)
if __name__ == "__main__":
"""
Usage:
python pp_parallel_adaptor.py --src_mp xxx --src_path xxx --method \
adapt_model/peek_model --dst_path xxx --sharding xxx --segment_method xxx --transformer_layer_num xxx
for the meaning of a specific arg, please use:
python pp_parallel_adaptor.py -h
"""
main()
......@@ -168,6 +168,19 @@ if((WITH_GPU) AND LOCAL_ALL_PLAT)
test_parallel_dygraph_pipeline_parallel_with_virtual_stage
PROPERTIES TIMEOUT "500")
endif()
if((WITH_GPU) AND LOCAL_ALL_PLAT)
bash_test_modules(
test_parallel_dygraph_pp_adaptor
START_BASH
../../dist_test.sh
LABELS
"RUN_TYPE=DIST"
ENVS
"PADDLE_DIST_UT_PORT=21976;http_proxy=;https_proxy=;PYTHONPATH=../..:${PADDLE_BINARY_DIR}/python"
)
set_tests_properties(test_parallel_dygraph_pp_adaptor PROPERTIES TIMEOUT
"500")
endif()
if((WITH_GPU OR WITH_XPU) AND (LINUX))
py_test_modules(
test_fleet_localsgd_meta_optimizer MODULES
......
......@@ -121,11 +121,11 @@ class CriterionPipe(Layer):
class ModelPipe(PipelineLayer):
def __init__(self, topology):
def __init__(self, topology, transformer_layer_num: int = 6):
self.descs = []
self.descs.append(LayerDesc(EmbeddingPipe))
for x in range(6):
for x in range(transformer_layer_num):
self.descs.append(LayerDesc(TransformerNetPipe))
self.descs.append(lambda x: x[0])
......
# Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import unittest
import numpy as np
from hybrid_parallel_pp_transformer import ModelPipe, set_random_seed
import paddle
import paddle.distributed as dist
from paddle.distributed import fleet
batch_size = 8
length = 8
micro_batch_size = 2
vocab_size = 128
transformer_layer_num = 8
class TestDistPPSaveTraning(unittest.TestCase):
def setUp(self):
strategy = fleet.DistributedStrategy()
self.model_parallel_size = 1
self.data_parallel_size = 1
self.pipeline_parallel_size = 2
strategy.hybrid_configs = {
"dp_degree": self.data_parallel_size,
"mp_degree": self.model_parallel_size,
"pp_degree": self.pipeline_parallel_size,
}
strategy.pipeline_configs = {
"accumulate_steps": batch_size // micro_batch_size,
"micro_batch_size": micro_batch_size,
}
fleet.init(is_collective=True, strategy=strategy)
def test_pp_model(self):
print(f"pwd {os.getcwd()}")
hcg = fleet.get_hybrid_communicate_group()
word_size = hcg.get_model_parallel_world_size()
dp_id = hcg.get_data_parallel_rank()
pp_id = hcg.get_stage_id()
rank_id = dist.get_rank()
topology = hcg.topology()
set_random_seed(1024, dp_id, rank_id)
model = ModelPipe(topology, transformer_layer_num=transformer_layer_num)
scheduler = paddle.optimizer.lr.PiecewiseDecay(
boundaries=[2], values=[0.001, 0.002], verbose=True
)
optimizer = paddle.optimizer.SGD(
learning_rate=scheduler, parameters=model.parameters()
)
model = fleet.distributed_model(model)
optimizer = fleet.distributed_optimizer(optimizer)
output_dir = "{}/mp_00_sharding_00_pp_{:0>2d}".format(
"./pp_transformer", pp_id
)
# create the output dir if it does not already exist
os.makedirs(output_dir, exist_ok=True)
for step_id in range(2):
x_data = np.random.randint(0, vocab_size, size=[batch_size, length])
x = paddle.to_tensor(x_data)
x.stop_gradient = True
loss = model.train_batch([x, x], optimizer, scheduler)
paddle.save(
model.state_dict(),
os.path.join(output_dir, "model.pdparams"),
)
paddle.save(
optimizer.state_dict(),
os.path.join(output_dir, "model_state.pdopt"),
)
meta_dict = {
"epoch": 0,
"step": 2,
"cuda_rng_state": paddle.get_cuda_rng_state(),
}
paddle.save(meta_dict, os.path.join(output_dir, "meta_state.pdopt"))
if __name__ == "__main__":
unittest.main()
# Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import unittest
import numpy as np
from hybrid_parallel_pp_transformer_with_virtual_stage import (
ModelPipe,
set_random_seed,
)
import paddle
import paddle.distributed as dist
from paddle.distributed import fleet
batch_size = 8
length = 8
micro_batch_size = 2
vocab_size = 128
transformer_layer_num = 8
class TestDistPPSaveTraning(unittest.TestCase):
def setUp(self):
strategy = fleet.DistributedStrategy()
self.model_parallel_size = 1
self.data_parallel_size = 1
self.pipeline_parallel_size = 2
strategy.hybrid_configs = {
"dp_degree": self.data_parallel_size,
"mp_degree": self.model_parallel_size,
"pp_degree": self.pipeline_parallel_size,
}
strategy.pipeline_configs = {
"accumulate_steps": batch_size // micro_batch_size,
"micro_batch_size": micro_batch_size,
}
fleet.init(is_collective=True, strategy=strategy)
def test_pp_model(self):
print(f"pwd {os.getcwd()}")
hcg = fleet.get_hybrid_communicate_group()
word_size = hcg.get_model_parallel_world_size()
dp_id = hcg.get_data_parallel_rank()
pp_id = hcg.get_stage_id()
rank_id = dist.get_rank()
topology = hcg.topology()
set_random_seed(1024, dp_id, rank_id)
model = ModelPipe(topology, transformer_layer_num=transformer_layer_num)
scheduler = paddle.optimizer.lr.PiecewiseDecay(
boundaries=[2], values=[0.001, 0.002], verbose=True
)
optimizer = paddle.optimizer.SGD(
learning_rate=scheduler, parameters=model.parameters()
)
model = fleet.distributed_model(model)
optimizer = fleet.distributed_optimizer(optimizer)
output_dir = "{}/mp_00_sharding_00_pp_{:0>2d}".format(
"./pp_transformer_vp", pp_id
)
# create the output dir if it does not already exist
os.makedirs(output_dir, exist_ok=True)
for step_id in range(2):
x_data = np.random.randint(0, vocab_size, size=[batch_size, length])
x = paddle.to_tensor(x_data)
x.stop_gradient = True
loss = model.train_batch([x, x], optimizer, scheduler)
paddle.save(
model.state_dict(),
os.path.join(output_dir, "model.pdparams"),
)
paddle.save(
optimizer.state_dict(),
os.path.join(output_dir, "model_state.pdopt"),
)
meta_dict = {
"epoch": 0,
"step": 2,
"cuda_rng_state": paddle.get_cuda_rng_state(),
}
paddle.save(meta_dict, os.path.join(output_dir, "meta_state.pdopt"))
if __name__ == "__main__":
unittest.main()
......@@ -120,11 +120,10 @@ class CriterionPipe(Layer):
class ModelPipe(PipelineLayer):
def __init__(self, topology):
def __init__(self, topology, transformer_layer_num: int = 8):
self.descs = []
self.descs.append(LayerDesc(EmbeddingPipe))
for x in range(8):
for x in range(transformer_layer_num):
self.descs.append(LayerDesc(TransformerNetPipe))
self.descs.append(lambda x: x[0])
......
# Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import shutil
import unittest
from test_parallel_dygraph_dataparallel import TestMultipleGpus
import paddle
from paddle.distributed.fleet.utils.pp_parallel_adaptor import (
ParallelConfig,
PipeLineModelAdaptor,
adaptor_from_args,
parse_args,
)
class TestPPAdaptor(TestMultipleGpus):
def test_parse_args(self):
args = parse_args()
self.assertEqual(args.src_mp, args.dst_mp)
adaptor = adaptor_from_args(args)
self.assertTrue(adaptor is not None)
def test_hybrid_parallel_transformer_unbalanced_data(self):
print(f"pwd {os.getcwd()}")
self.run_mnist_2gpu('hybrid_parallel_pp_transformer_save.py')
self.run_mnist_2gpu(
'hybrid_parallel_pp_transformer_save_with_virtual_stage.py'
)
# test pp adaptor
dir1 = "./pp_transformer"
p_config1 = ParallelConfig(mp=1, pp=2, vpp=1, sharding=1)
dir2 = "./pp_transformer_vp"
p_config2 = ParallelConfig(mp=1, pp=2, vpp=2, sharding=1)
pp_to_vp = PipeLineModelAdaptor(
src_parallel_config=p_config1,
dst_parallel_config=p_config2,
transformer_layer_num=8,
segment_method="layer",
)
vp_to_pp = PipeLineModelAdaptor(
src_parallel_config=p_config2,
dst_parallel_config=p_config1,
transformer_layer_num=8,
segment_method="layer",
)
def check_converted_model(converted_model_dir, expected_model_dir):
# for compatibility, converted_model_dir may contain more keys than the
# expected model, which does not hinder model recovery
for i in range(p_config1.pp):
sub_converted_model_dir = (
"{}/mp_00_sharding_00_pp_{:0>2d}".format(
converted_model_dir, i
)
)
sub_expected_model_dir = (
"{}/mp_00_sharding_00_pp_{:0>2d}".format(
expected_model_dir, i
)
)
print(
f"converted_model_dir: {sub_converted_model_dir}; expected_model_dir: {sub_expected_model_dir}"
)
def check_names(dict_1, dict_2):
for (k, v) in dict_2.items():
self.assertTrue(k in dict_1)
self.assertEqual(
getattr(v, "name", ""),
getattr(dict_1[k], "name", ""),
)
# check param
params_1 = paddle.load(
f"{sub_converted_model_dir}/model.pdparams"
)
params_2 = paddle.load(
f"{sub_expected_model_dir}/model.pdparams"
)
check_names(params_1, params_2)
del params_1
del params_2
# check opt
opt_1 = paddle.load(
f"{sub_converted_model_dir}/model_state.pdopt"
)
opt_2 = paddle.load(
f"{sub_expected_model_dir}/model_state.pdopt"
)
check_names(opt_1, opt_2)
# check master weights
if "master_weights" in opt_2:
self.assertTrue("master_weights" in opt_1)
check_names(
opt_2["master_weights"], opt_1["master_weights"]
)
def create_dir_if_nonexist(dir: str):
if not os.path.exists(dir):
os.makedirs(dir)
# check pp to vp
tmp_dir1 = "./tmp_pp_to_vp"
create_dir_if_nonexist(tmp_dir1)
pp_to_vp.apply(dir1, tmp_dir1)
# browse the converted model
pp_to_vp.peek_model(tmp_dir1)
# check
check_converted_model(tmp_dir1, dir2)
# check vp to pp
tmp_dir2 = "./tmp_vp_to_pp"
create_dir_if_nonexist(tmp_dir2)
vp_to_pp.apply(dir2, tmp_dir2)
vp_to_pp.peek_model(tmp_dir2)
check_converted_model(tmp_dir2, dir1)
# check uniform segment
tmp_dir3 = "./tmp_vp_to_pp_uniform"
create_dir_if_nonexist(tmp_dir3)
vp_to_pp_uniform = PipeLineModelAdaptor(
src_parallel_config=p_config2,
dst_parallel_config=p_config1,
transformer_layer_num=8,
segment_method="uniform",
)
vp_to_pp_uniform.apply(dir2, tmp_dir3)
vp_to_pp_uniform.peek_model(tmp_dir3)
tmp_dir4 = "./tmp_pp_to_pp_uniform"
create_dir_if_nonexist(tmp_dir4)
pp_to_pp_uniform = PipeLineModelAdaptor(
src_parallel_config=p_config1,
dst_parallel_config=p_config1,
transformer_layer_num=8,
segment_method="uniform",
)
pp_to_pp_uniform.apply(dir1, tmp_dir4)
pp_to_pp_uniform.peek_model(tmp_dir4)
check_converted_model(tmp_dir3, tmp_dir4)
# rm dirs
for d in [dir1, dir2, tmp_dir1, tmp_dir2, tmp_dir3, tmp_dir4]:
shutil.rmtree(d, ignore_errors=True)
if __name__ == "__main__":
unittest.main()
......@@ -14,6 +14,7 @@ test_dygraph_sharding_stage3_for_eager,,,350,DIST,../../dist_test.sh,2,,http_pro
test_communicator_half_async,,,120,DIST,test_runner.py,2,,FLAGS_communicator_send_queue_size=1;FLAGS_communicator_max_merge_var_num=1;http_proxy=;https_proxy=;PYTHONPATH=../..,WITH_NCCL
test_parallel_dygraph_pipeline_parallel,,GPU,500,DIST,../../dist_test.sh,2,,http_proxy=;https_proxy=;PYTHONPATH=../..,
test_parallel_dygraph_pipeline_parallel_with_virtual_stage,,GPU,500,DIST,../../dist_test.sh,2,,http_proxy=;https_proxy=;PYTHONPATH=../..,
test_parallel_dygraph_pp_adaptor,,GPU,500,DIST,../../dist_test.sh,2,,http_proxy=;https_proxy=;PYTHONPATH=../..,
test_fleet_localsgd_meta_optimizer,LINUX,GPU;XPU,,,test_runner.py,2,,http_proxy=;https_proxy=;PYTHONPATH=../..,
test_parallel_class_center_sample,,GPU,120,DIST,../../dist_test.sh,2,,http_proxy=;https_proxy=;PYTHONPATH=../..,WITH_NCCL
test_pipeline,,,120,DIST,../../dist_test.sh,2,,http_proxy=;https_proxy=;PYTHONPATH=../..,
......
......@@ -1071,11 +1071,18 @@ def load(path, **configs):
# paddle2.0: paddle.save/load
if "StructuredToParameterName@@" in load_result:
for key in load_result["StructuredToParameterName@@"]:
for (key, name) in load_result[
"StructuredToParameterName@@"
].items():
if isinstance(load_result[key], np.ndarray):
load_result[key] = _ndarray_to_tensor(
load_result[key], config.return_numpy
)
# the default name is "generatedxxx", assigned in Tensor init when no name is set
if not config.return_numpy and getattr(
load_result[key], "name", ""
):
load_result[key].name = name
if (
not config.keep_name_table
......
......@@ -76,7 +76,7 @@ if(WITH_DISTRIBUTE AND WITH_GPU)
py_test_modules(test_pass_quantization MODULES test_pass_quantization)
set_tests_properties(test_pass_quantization PROPERTIES TIMEOUT 60)
py_test_modules(test_tuning_recompute MODULES test_tuning_recompute)
set_tests_properties(test_tuning_recompute PROPERTIES TIMEOUT 240)
set_tests_properties(test_tuning_recompute PROPERTIES TIMEOUT 300)
py_test_modules(test_fused_linear_pass MODULES test_fused_linear_pass)
set_tests_properties(test_fused_linear_pass PROPERTIES TIMEOUT 20)
py_test_modules(test_align_tool MODULES test_align_tool)
......
......@@ -29,6 +29,6 @@ endif()
foreach(TEST_OP ${TEST_OPS})
py_test_modules(${TEST_OP} MODULES ${TEST_OP})
list(APPEND DIST_TEST_OPS ${TEST_OP})
set_tests_properties(${TEST_OP} PROPERTIES TIMEOUT 120)
set_tests_properties(${TEST_OP} PROPERTIES TIMEOUT 200)
set_tests_properties(${TEST_OP} PROPERTIES LABELS "RUN_TYPE=DIST")
endforeach()
......@@ -197,7 +197,7 @@ if(WITH_GPU AND TENSORRT_FOUND)
set_tests_properties(test_trt_fc_fuse_quant_dequant_pass PROPERTIES TIMEOUT
100)
set_tests_properties(test_trt_conv_quant_dequant_pass PROPERTIES TIMEOUT 100)
set_tests_properties(test_trt_matmul_quant_dequant PROPERTIES TIMEOUT 100)
set_tests_properties(test_trt_matmul_quant_dequant PROPERTIES TIMEOUT 180)
set_tests_properties(test_trt_conv3d_op PROPERTIES TIMEOUT 60)
set_tests_properties(test_trt_conv3d_transpose_op PROPERTIES TIMEOUT 60)
set_tests_properties(test_trt_nearest_interp_v2_op PROPERTIES TIMEOUT 30)
......