Unverified commit 294dfd23, authored by ShenLiang, committed by GitHub

[HybridParallel]Add SharedLayerDesc for PipelineParallel (#33578)

* add pplayer

* add sharedlayerdesc
Parent 07197fb9
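In short, this change lets a PipelineLayer be described with SharedLayerDesc entries that reuse one parameter across pipeline stages, the classic tied input-embedding / output-projection case. A minimal sketch of the intended usage (illustration only, not part of the diff; EmbeddingPipe, MatmulNet, BiasNet, LossNet and _logits_helper are the helpers defined in the new hybrid_parallel_shared_weight.py test at the bottom of this commit, and fleet.init is assumed to have set up a two-stage pipeline as in that test):

    import paddle
    import paddle.distributed.fleet as fleet
    from paddle.distributed.fleet.meta_parallel import (LayerDesc,
                                                        PipelineLayer,
                                                        SharedLayerDesc)

    hcg = fleet.get_hybrid_communicate_group()
    descs = [
        # Both entries use the key 'embed': each stage that owns one of them
        # builds EmbeddingPipe once, and PipelineLayer keeps the attribute
        # 'embedding_weight' identical on the first and the last stage
        # (broadcast at construction, gradient allreduce every step).
        SharedLayerDesc(
            'embed', EmbeddingPipe, shared_weight_attr='embedding_weight'),
        LayerDesc(MatmulNet),
        LayerDesc(BiasNet),
        SharedLayerDesc(
            'embed',
            EmbeddingPipe,
            forward_func=_logits_helper,
            shared_weight_attr='embedding_weight'),
    ]
    model = PipelineLayer(
        layers=descs, loss_fn=LossNet(), topology=hcg.topology())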
@@ -267,7 +267,9 @@ def new_group(ranks=None, backend=None):
    # TODO(shenliang03): This is a temporary solution to solve the problem of
    # hang caused by cross-creation of new_group
-   tmp = fill_constant([0], dtype="int32", value="1")
+   tmp = paddle.to_tensor(
+       [1], dtype="int32") if in_dygraph_mode() else fill_constant(
+           [0], dtype="int32", value="1")
    paddle.distributed.all_reduce(tmp, use_calc_stream=True)
    paddle.distributed.wait(tmp)
    return gp
......
@@ -107,6 +107,11 @@ class CommunicateTopology(object):
        return all_result
def get_rank_from_stage(self, global_rank, **kwargs):
coord = self.get_coord(global_rank)
tf = coord._replace(**kwargs)._asdict()
return self.get_rank(**tf)

class HybridCommunicateGroup(object):
    def __init__(self, topology):
@@ -254,7 +259,6 @@ class HybridCommunicateGroup(object):
    def get_check_parallel_group(self):
        return self._check_comm_group

-   def get_rank_from_stage(self, stage_id):
-       coord = self._topo.get_coord(self.global_rank)
-       tf = coord._replace(pipe=stage_id)._asdict()
-       return self._topo.get_rank(**tf)
+   def get_rank_from_stage(self, stage_id, **kwargs):
+       return self._topo.get_rank_from_stage(
+           self.global_rank, pipe=stage_id, **kwargs)
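The helper moved onto CommunicateTopology takes a rank's coordinate, overrides whichever axes are passed as keyword arguments, and maps the result back to a global rank, so HybridCommunicateGroup can now ask for a peer on another pipeline stage while also pinning the data and model coordinates. A small illustration (sketch only; the constructor arguments shown here are assumed to match the axis names used elsewhere in this file):

    from paddle.distributed.fleet.base.topology import CommunicateTopology

    # Assumed 2 x 2 x 1 (data x pipe x model) topology, four ranks in total.
    topo = CommunicateTopology(
        hybrid_group_names=["data", "pipe", "model"], dims=[2, 2, 1])

    # Keep rank 0's data/model coordinate but jump to pipeline stage 1.
    peer = topo.get_rank_from_stage(0, pipe=1)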
@@ -17,6 +17,7 @@ from .parallel_layers import ColumnParallelLinear  # noqa: F401
from .parallel_layers import RowParallelLinear  # noqa: F401
from .parallel_layers import ParallelCrossEntropy  # noqa: F401
from .parallel_layers import LayerDesc  # noqa: F401
from .parallel_layers import SharedLayerDesc # noqa: F401
from .parallel_layers import PipelineLayer  # noqa: F401
from .parallel_layers import RNGStatesTracker  # noqa: F401
from .parallel_layers import model_parallel_random_seed  # noqa: F401
......
@@ -17,6 +17,7 @@ from .mp_layers import ColumnParallelLinear  # noqa: F401
from .mp_layers import RowParallelLinear  # noqa: F401
from .mp_layers import ParallelCrossEntropy  # noqa: F401
from .pp_layers import LayerDesc  # noqa: F401
from .pp_layers import SharedLayerDesc # noqa: F401
from .pp_layers import PipelineLayer  # noqa: F401
from .random import RNGStatesTracker  # noqa: F401
from .random import model_parallel_random_seed  # noqa: F401
......
@@ -15,6 +15,7 @@ import math
import paddle
from paddle.fluid.dygraph.layers import Layer
from ...utils.log_util import logger, layer_to_str
from functools import partial
__all__ = []

@@ -58,6 +59,20 @@ class LayerDesc(object):
            **self.kwargs)
class SharedLayerDesc(LayerDesc):
def __init__(self,
key,
layer_func,
forward_func=None,
shared_weight_attr='weight',
*inputs,
**kwargs):
super(SharedLayerDesc, self).__init__(layer_func, *inputs, **kwargs)
self.layer_name = key
self.forward_func = forward_func
self.shared_weight_attr = shared_weight_attr
class PipelineLayer(Layer):
    def __init__(self,
                 layers,
@@ -104,11 +119,86 @@ class PipelineLayer(Layer):
        self._start_pos = 0
        self._end_pos = self._num_layers - 1
        self._segment_network(seg_method)
self.shared_layers = paddle.nn.LayerDict()
self.shared_weight_attrs = {}
        # construct layer
        self.run_function = []
        self._build_layer()
self.shared_comm = self._construct_shared_comm()
self._synchronize_shared_weights()
def get_stage_from_index(self, layer_idx):
assert 0 <= layer_idx < self._num_layers, "layer_idx is out of bound"
for stage in range(self._topo.get_dim('pipe')):
if self.segment_parts[stage] <= layer_idx < self.segment_parts[stage
+ 1]:
return stage
def _construct_shared_comm(self):
shared_comm = {}
if self._topo.get_dim("pipe") == 1:
return
layers_desc = self._layers_desc
shared_layer_names = set(
s.layer_name for s in layers_desc if isinstance(s, SharedLayerDesc))
for key in shared_layer_names:
shared_layers = []
for idx, layer in enumerate(layers_desc):
if isinstance(layer,
SharedLayerDesc) and layer.layer_name == key:
shared_layers.append(idx)
shared_stages = set(
self.get_stage_from_index(idx) for idx in shared_layers)
self._dp_degree = self._topo.get_dim('data')
self._mp_degree = self._topo.get_dim('model')
shared_ranks = []
for dp in range(self._dp_degree):
for mp in range(self._mp_degree):
shared_ranks = []
for s in sorted(shared_stages):
shared_ranks.append(
self._topo.get_rank_from_stage(
self.global_rank, pipe=s, data=dp, model=mp))
group = paddle.distributed.new_group(ranks=shared_ranks)
if self.global_rank in shared_ranks:
assert key in self.shared_layers
if key in self.shared_layers:
shared_comm[key] = {
'ranks': shared_ranks,
'group': group,
'weight_attr': self.shared_weight_attrs[key],
'layer': self.shared_layers[key],
}
return shared_comm
def _synchronize_shared_weights(self):
for key, comm in self.shared_comm.items():
with paddle.framework.no_grad():
paddle.distributed.broadcast(
getattr(comm['layer'], comm['weight_attr']),
src=min(comm['ranks']),
group=comm['group'])
def allreduce_shared_weight_gradients(self):
for key, comm in self.shared_comm.items():
param = getattr(self.shared_layers[key], comm['weight_attr'])
# need use trace_op to allreduce weight
with paddle.framework.no_grad():
paddle.fluid.framework._dygraph_tracer().trace_op(
type="c_allreduce_sum",
inputs={'X': param._grad_ivar()},
outputs={'Out': param._grad_ivar()},
attrs={
'ring_id': comm['group'].id,
'use_calc_stream': True
})
    def _segment_network(self, seg_method):
        logger.info("start segment network..")
        seg = SegmentLayers(
@@ -142,6 +232,21 @@ class PipelineLayer(Layer):
            if isinstance(layer, Layer):
                self.run_function.append(layer)
                self.add_sublayer(str(layer_index), layer)
elif isinstance(layer, SharedLayerDesc):
if layer.layer_name not in self.shared_layers:
self.shared_layers[layer.layer_name] = layer.build_layer()
self.shared_weight_attrs[
layer.layer_name] = layer.shared_weight_attr
if layer.forward_func is None:
self.run_function.append(self.shared_layers[
layer.layer_name])
else:
self.run_function.append(
partial(layer.forward_func, self.shared_layers[
layer.layer_name]))
            elif isinstance(layer, LayerDesc):
                model = layer.build_layer()
                self.run_function.append(model)
......
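Taken together, the PipelineLayer changes build each shared layer once per stage, broadcast its weight from the smallest participating rank at construction time (_synchronize_shared_weights), and re-synchronize after every backward pass by all-reducing the weight gradient over the group set up in _construct_shared_comm. A rough conceptual equivalent of the trace_op call in allreduce_shared_weight_gradients, expressed with the public collective API (sketch only; the committed code goes through the dygraph tracer so the allreduce is recorded like any other imperative op):

    # Sum the shared parameter's gradient across every rank holding a copy,
    # so all copies apply the same update at the next optimizer step.
    with paddle.framework.no_grad():
        grad = param._grad_ivar()  # the same tensor the traced op reads/writes
        paddle.distributed.all_reduce(
            grad, group=comm['group'], use_calc_stream=True)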
@@ -138,6 +138,8 @@ class PipelineParallel(MetaParallelBase):
                self._backward(cache_id=backward_steps)
                backward_steps += 1
self._layers.allreduce_shared_weight_gradients()
        # optimizer
        self._step()
        self.train_loss = self._reduce_final_loss()
......
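Within train_batch the new hook runs once per global batch, after all micro-batch backward passes and before the optimizer step, so every copy of a shared weight sees the same accumulated gradient. A simplified, hypothetical outline of that ordering (the real scheduler interleaves forward and backward per micro-batch):

    def train_batch_outline(layers, micro_batches, optimizer):
        # Hypothetical simplification of PipelineParallel.train_batch.
        for inputs in micro_batches:
            loss = layers(inputs)                     # forward on this stage
            loss.backward()                           # accumulate gradients
        layers.allreduce_shared_weight_gradients()    # sync tied-weight grads
        optimizer.step()                              # identical update everywhere
        optimizer.clear_grad()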
@@ -270,8 +270,8 @@ class TestDistTraning(unittest.TestCase):
        np.testing.assert_allclose(loss_a.numpy(), loss_b.numpy())

    def test_parallel_cross_entropy(self):
-        batch_size = 2
-        seq_length = 1
+        batch_size = 8
+        seq_length = 16
        class_size_per_card = 2
        vocab_size = class_size_per_card * self.model_parallel_size
        seed = 1025
......
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import division
from __future__ import print_function
import unittest
import paddle
import numpy as np
import random
import paddle
import paddle.distributed as dist
import paddle.distributed.fleet as fleet
from paddle.fluid.dygraph.container import Sequential
from paddle.distributed.fleet.meta_parallel import PipelineLayer
from paddle.fluid.dygraph.layers import Layer
import paddle.nn as nn
import paddle.fluid as fluid
from paddle.distributed.fleet.meta_parallel import LayerDesc, SharedLayerDesc
def print_hook_fn(grad):
print(grad)
def set_random_seed(seed, dp_id, rank_id):
"""Set random seed for reproducability."""
random.seed(seed)
np.random.seed(seed + dp_id)
paddle.seed(seed + dp_id)
batch_size = 8
micro_batch_size = 2
vocab_size = 128
hidden_size = 16
class SimpleNet(Layer):
def __init__(self):
super(SimpleNet, self).__init__()
self.word_embeddings = nn.Embedding(vocab_size, hidden_size)
self.softmax_weight = self.create_parameter(
shape=[hidden_size, vocab_size])
self.softmax_bias = self.create_parameter(
shape=[vocab_size], is_bias=False)
def forward(self, x1, x2, y1):
x_emb = self.word_embeddings(x1)
fc = fluid.layers.matmul(x_emb, self.softmax_weight)
fc = fluid.layers.elementwise_add(fc, self.softmax_bias)
projection = fluid.layers.reshape(fc, shape=[-1, vocab_size])
projection = paddle.matmul(projection, self.word_embeddings.weight)
loss = fluid.layers.softmax_with_cross_entropy(
logits=projection, label=y1, soft_label=False)
return loss.mean()
class EmbeddingPipe(Layer):
def __init__(self):
super(EmbeddingPipe, self).__init__()
self.word_embeddings = nn.Embedding(vocab_size, hidden_size)
@property
def embedding_weight(self):
return self.word_embeddings.weight
def forward(self, args):
x1, x2 = args
x_emb = self.word_embeddings(x1)
return x_emb, x2
class MatmulNet(Layer):
def __init__(self):
super(MatmulNet, self).__init__()
self.softmax_weight = self.create_parameter(
shape=[hidden_size, vocab_size])
def forward(self, args):
x1, x2 = args
fc = fluid.layers.matmul(x1, self.softmax_weight)
return fc, x2
class BiasNet(Layer):
def __init__(self):
super(BiasNet, self).__init__()
self.softmax_bias = self.create_parameter(shape=[vocab_size])
def forward(self, args):
fc, x2 = args
fc = fluid.layers.elementwise_add(fc, self.softmax_bias)
projection = fluid.layers.reshape(fc, shape=[-1, vocab_size])
return projection, x2
class LossNet(Layer):
def __init__(self):
super(LossNet, self).__init__()
def forward(self, args, y1):
projection = args
loss = fluid.layers.softmax_with_cross_entropy(
logits=projection, label=y1[0], soft_label=False)
return loss.mean()
class SimpleNetPipe(PipelineLayer):
def __init__(self, **kwargs):
self.descs = []
self.descs.append(
SharedLayerDesc(
'embed', EmbeddingPipe, shared_weight_attr='embedding_weight'))
self.descs.append(LayerDesc(MatmulNet))
self.descs.append(LayerDesc(BiasNet))
def _logits_helper(embedding, output):
return paddle.matmul(output[0], embedding.embedding_weight)
self.descs.append(
SharedLayerDesc(
'embed',
EmbeddingPipe,
forward_func=_logits_helper,
shared_weight_attr='embedding_weight'))
super(SimpleNetPipe, self).__init__(
layers=self.descs, loss_fn=LossNet(), **kwargs)
class TestDistEmbeddingTraning(unittest.TestCase):
def setUp(self):
strategy = fleet.DistributedStrategy()
self.model_parallel_size = 1
self.data_parallel_size = 1
self.pipeline_parallel_size = 2
strategy.hybrid_configs = {
"dp_degree": self.data_parallel_size,
"mp_degree": self.model_parallel_size,
"pp_degree": self.pipeline_parallel_size,
}
strategy.pipeline_configs = {
"accumulate_steps": batch_size // micro_batch_size,
"micro_batch_size": micro_batch_size
}
fleet.init(is_collective=True, strategy=strategy)
def test_pp_model(self):
hcg = fleet.get_hybrid_communicate_group()
word_size = hcg.get_model_parallel_world_size()
dp_id = hcg.get_data_parallel_rank()
pp_id = hcg.get_stage_id()
rank_id = dist.get_rank()
set_random_seed(1024, dp_id, rank_id)
#construct model a
model_a = SimpleNet()
scheduler_a = paddle.optimizer.lr.PiecewiseDecay(
boundaries=[2, 3, 4], values=[0.01, 0.02, 0.03, 0.04], verbose=True)
optimizer_a = paddle.optimizer.SGD(learning_rate=scheduler_a,
parameters=model_a.parameters())
model_b = SimpleNetPipe(topology=hcg.topology())
scheduler_b = paddle.optimizer.lr.PiecewiseDecay(
boundaries=[2, 3, 4], values=[0.01, 0.02, 0.03, 0.04], verbose=True)
optimizer_b = paddle.optimizer.SGD(learning_rate=scheduler_b,
parameters=model_b.parameters())
model_b = fleet.distributed_model(model_b)
optimizer_b = fleet.distributed_optimizer(optimizer_b)
param_len = len(model_a.parameters())
parameters = []
for param in model_a.parameters():
parameters.append(param.numpy())
model_b_params = model_b.parameters()
if pp_id == 0:
model_b_params[0].set_value(parameters[2])
model_b_params[1].set_value(parameters[0])
else:
model_b_params[0].set_value(parameters[2])
model_b_params[1].set_value(parameters[1])
for step in range(5):
x1_data = np.random.randint(0, vocab_size, size=[batch_size, 1])
x2_data = np.random.randint(0, vocab_size, size=[batch_size, 1])
y1_data = np.random.randint(0, hidden_size, size=[batch_size, 1])
x1 = paddle.to_tensor(x1_data)
x2 = paddle.to_tensor(x2_data)
y1 = paddle.to_tensor(y1_data)
x1.stop_gradient = True
x2.stop_gradient = True
y1.stop_gradient = True
loss_a = model_a(x1, x2, y1)
loss_a.backward()
optimizer_a.step()
optimizer_a.clear_grad()
scheduler_a.step()
loss_b = model_b.train_batch([(x1, x2), (y1, )], optimizer_b,
scheduler_b)
print("loss", loss_a.numpy(), loss_b.numpy())
np.testing.assert_allclose(loss_a.numpy(), loss_b.numpy())
if __name__ == "__main__":
unittest.main()
@@ -27,6 +27,9 @@ class TestHybridPipeParallel(TestMultipleGpus):
    def test_hybrid_parallel_pp_tuple_inputs(self):
        self.run_mnist_2gpu('hybrid_parallel_pp_embedding.py')
def test_hybrid_parallel_pp_tuple_inputs(self):
self.run_mnist_2gpu('hybrid_parallel_shared_weight.py')

if __name__ == "__main__":
    unittest.main()