diff --git a/python/paddle/distributed/collective.py b/python/paddle/distributed/collective.py
index 0ffb1d9f881ba159cf6a96359ba4d1a7258103d7..1a09cf5394fba8f278a33e6aebd09326d1775144 100644
--- a/python/paddle/distributed/collective.py
+++ b/python/paddle/distributed/collective.py
@@ -267,7 +267,9 @@ def new_group(ranks=None, backend=None):
 
     # TODO(shenliang03): This is a temporary solution to solve the problem of
    # hang caused by cross-creation of new_group
-    tmp = fill_constant([0], dtype="int32", value="1")
+    tmp = paddle.to_tensor(
+        [1], dtype="int32") if in_dygraph_mode() else fill_constant(
+            [0], dtype="int32", value="1")
     paddle.distributed.all_reduce(tmp, use_calc_stream=True)
     paddle.distributed.wait(tmp)
     return gp
diff --git a/python/paddle/distributed/fleet/base/topology.py b/python/paddle/distributed/fleet/base/topology.py
index 04d8417fdcbf3f1ef23db09caf1cc417672b8358..850f3581421705cc9f1fb4756153bb7fd1869524 100644
--- a/python/paddle/distributed/fleet/base/topology.py
+++ b/python/paddle/distributed/fleet/base/topology.py
@@ -107,6 +107,11 @@ class CommunicateTopology(object):
 
         return all_result
 
+    def get_rank_from_stage(self, global_rank, **kwargs):
+        coord = self.get_coord(global_rank)
+        tf = coord._replace(**kwargs)._asdict()
+        return self.get_rank(**tf)
+
 
 class HybridCommunicateGroup(object):
     def __init__(self, topology):
@@ -254,7 +259,6 @@ class HybridCommunicateGroup(object):
     def get_check_parallel_group(self):
         return self._check_comm_group
 
-    def get_rank_from_stage(self, stage_id):
-        coord = self._topo.get_coord(self.global_rank)
-        tf = coord._replace(pipe=stage_id)._asdict()
-        return self._topo.get_rank(**tf)
+    def get_rank_from_stage(self, stage_id, **kwargs):
+        return self._topo.get_rank_from_stage(
+            self.global_rank, pipe=stage_id, **kwargs)
diff --git a/python/paddle/distributed/fleet/meta_parallel/__init__.py b/python/paddle/distributed/fleet/meta_parallel/__init__.py
index 0750c2c250e2bb90f7384d644661d6eb059bb22a..4e32ff5723c4181892620281e21232e7f2267f08 100644
--- a/python/paddle/distributed/fleet/meta_parallel/__init__.py
+++ b/python/paddle/distributed/fleet/meta_parallel/__init__.py
@@ -17,6 +17,7 @@ from .parallel_layers import ColumnParallelLinear  # noqa: F401
 from .parallel_layers import RowParallelLinear  # noqa: F401
 from .parallel_layers import ParallelCrossEntropy  # noqa: F401
 from .parallel_layers import LayerDesc  # noqa: F401
+from .parallel_layers import SharedLayerDesc  # noqa: F401
 from .parallel_layers import PipelineLayer  # noqa: F401
 from .parallel_layers import RNGStatesTracker  # noqa: F401
 from .parallel_layers import model_parallel_random_seed  # noqa: F401
diff --git a/python/paddle/distributed/fleet/meta_parallel/parallel_layers/__init__.py b/python/paddle/distributed/fleet/meta_parallel/parallel_layers/__init__.py
index 72da962b8914eb2d5eb92e40960c60a4703c6d52..fd9778574907373d16512c0a12452dbb828402db 100644
--- a/python/paddle/distributed/fleet/meta_parallel/parallel_layers/__init__.py
+++ b/python/paddle/distributed/fleet/meta_parallel/parallel_layers/__init__.py
@@ -17,6 +17,7 @@ from .mp_layers import ColumnParallelLinear  # noqa: F401
 from .mp_layers import RowParallelLinear  # noqa: F401
 from .mp_layers import ParallelCrossEntropy  # noqa: F401
 from .pp_layers import LayerDesc  # noqa: F401
+from .pp_layers import SharedLayerDesc  # noqa: F401
 from .pp_layers import PipelineLayer  # noqa: F401
 from .random import RNGStatesTracker  # noqa: F401
 from .random import model_parallel_random_seed  # noqa: F401
diff --git a/python/paddle/distributed/fleet/meta_parallel/parallel_layers/pp_layers.py b/python/paddle/distributed/fleet/meta_parallel/parallel_layers/pp_layers.py
index 77be62ae6cf4b464b6bdc8a1e72f0b2f9c414e6c..b31b2939695b33e82b2339126df102fdeb93eeec 100644
--- a/python/paddle/distributed/fleet/meta_parallel/parallel_layers/pp_layers.py
+++ b/python/paddle/distributed/fleet/meta_parallel/parallel_layers/pp_layers.py
@@ -15,6 +15,7 @@ import math
 import paddle
 from paddle.fluid.dygraph.layers import Layer
 from ...utils.log_util import logger, layer_to_str
+from functools import partial
 
 __all__ = []
 
@@ -58,6 +59,20 @@
                             **self.kwargs)
 
 
+class SharedLayerDesc(LayerDesc):
+    def __init__(self,
+                 key,
+                 layer_func,
+                 forward_func=None,
+                 shared_weight_attr='weight',
+                 *inputs,
+                 **kwargs):
+        super(SharedLayerDesc, self).__init__(layer_func, *inputs, **kwargs)
+        self.layer_name = key
+        self.forward_func = forward_func
+        self.shared_weight_attr = shared_weight_attr
+
+
 class PipelineLayer(Layer):
     def __init__(self,
                  layers,
@@ -104,11 +119,86 @@ class PipelineLayer(Layer):
 
         self._start_pos = 0
         self._end_pos = self._num_layers - 1
         self._segment_network(seg_method)
+        self.shared_layers = paddle.nn.LayerDict()
+        self.shared_weight_attrs = {}
 
         # construct layer
         self.run_function = []
         self._build_layer()
+        self.shared_comm = self._construct_shared_comm()
+        self._synchronize_shared_weights()
+
+    def get_stage_from_index(self, layer_idx):
+        assert 0 <= layer_idx < self._num_layers, "layer_idx is out of bound"
+        for stage in range(self._topo.get_dim('pipe')):
+            if self.segment_parts[stage] <= layer_idx < self.segment_parts[stage
+                                                                            + 1]:
+                return stage
+
+    def _construct_shared_comm(self):
+        shared_comm = {}
+        if self._topo.get_dim("pipe") == 1:
+            return
+
+        layers_desc = self._layers_desc
+        shared_layer_names = set(
+            s.layer_name for s in layers_desc if isinstance(s, SharedLayerDesc))
+        for key in shared_layer_names:
+            shared_layers = []
+            for idx, layer in enumerate(layers_desc):
+                if isinstance(layer,
+                              SharedLayerDesc) and layer.layer_name == key:
+                    shared_layers.append(idx)
+
+            shared_stages = set(
+                self.get_stage_from_index(idx) for idx in shared_layers)
+            self._dp_degree = self._topo.get_dim('data')
+            self._mp_degree = self._topo.get_dim('model')
+
+            shared_ranks = []
+            for dp in range(self._dp_degree):
+                for mp in range(self._mp_degree):
+                    shared_ranks = []
+                    for s in sorted(shared_stages):
+                        shared_ranks.append(
+                            self._topo.get_rank_from_stage(
+                                self.global_rank, pipe=s, data=dp, model=mp))
+
+                    group = paddle.distributed.new_group(ranks=shared_ranks)
+                    if self.global_rank in shared_ranks:
+                        assert key in self.shared_layers
+                        if key in self.shared_layers:
+                            shared_comm[key] = {
+                                'ranks': shared_ranks,
+                                'group': group,
+                                'weight_attr': self.shared_weight_attrs[key],
+                                'layer': self.shared_layers[key],
+                            }
+        return shared_comm
+
+    def _synchronize_shared_weights(self):
+        for key, comm in self.shared_comm.items():
+            with paddle.framework.no_grad():
+                paddle.distributed.broadcast(
+                    getattr(comm['layer'], comm['weight_attr']),
+                    src=min(comm['ranks']),
+                    group=comm['group'])
+
+    def allreduce_shared_weight_gradients(self):
+        for key, comm in self.shared_comm.items():
+            param = getattr(self.shared_layers[key], comm['weight_attr'])
+            # need to use trace_op to allreduce the shared weight gradient
+            with paddle.framework.no_grad():
+                paddle.fluid.framework._dygraph_tracer().trace_op(
+                    type="c_allreduce_sum",
+                    inputs={'X': param._grad_ivar()},
+                    outputs={'Out': param._grad_ivar()},
+                    attrs={
+                        'ring_id': comm['group'].id,
+                        'use_calc_stream': True
+                    })
+
     def _segment_network(self, seg_method):
         logger.info("start segment network..")
         seg = SegmentLayers(
@@ -142,6 +232,21 @@
             if isinstance(layer, Layer):
                 self.run_function.append(layer)
                 self.add_sublayer(str(layer_index), layer)
+            elif isinstance(layer, SharedLayerDesc):
+                if layer.layer_name not in self.shared_layers:
+                    self.shared_layers[layer.layer_name] = layer.build_layer()
+                    self.shared_weight_attrs[
+                        layer.layer_name] = layer.shared_weight_attr
+
+                if layer.forward_func is None:
+                    self.run_function.append(self.shared_layers[
+                        layer.layer_name])
+
+                else:
+                    self.run_function.append(
+                        partial(layer.forward_func, self.shared_layers[
+                            layer.layer_name]))
+
             elif isinstance(layer, LayerDesc):
                 model = layer.build_layer()
                 self.run_function.append(model)
diff --git a/python/paddle/distributed/fleet/meta_parallel/pipeline_parallel.py b/python/paddle/distributed/fleet/meta_parallel/pipeline_parallel.py
index 54324b389336d0f423a79e177a04a0381d2779b9..0bb6315290ed724cc8494fbb656da352c5dcd468 100644
--- a/python/paddle/distributed/fleet/meta_parallel/pipeline_parallel.py
+++ b/python/paddle/distributed/fleet/meta_parallel/pipeline_parallel.py
@@ -138,6 +138,8 @@ class PipelineParallel(MetaParallelBase):
             self._backward(cache_id=backward_steps)
             backward_steps += 1
 
+        self._layers.allreduce_shared_weight_gradients()
+
         # optimizer
         self._step()
         self.train_loss = self._reduce_final_loss()
diff --git a/python/paddle/fluid/tests/unittests/hybrid_parallel_mp_layers.py b/python/paddle/fluid/tests/unittests/hybrid_parallel_mp_layers.py
index 23dae3173869183fa460aa020f9544c07df60b3f..317eb14ad069e2ad76bce3a3de1f8f41f129697f 100644
--- a/python/paddle/fluid/tests/unittests/hybrid_parallel_mp_layers.py
+++ b/python/paddle/fluid/tests/unittests/hybrid_parallel_mp_layers.py
@@ -270,8 +270,8 @@ class TestDistTraning(unittest.TestCase):
         np.testing.assert_allclose(loss_a.numpy(), loss_b.numpy())
 
     def test_parallel_cross_entropy(self):
-        batch_size = 2
-        seq_length = 1
+        batch_size = 8
+        seq_length = 16
         class_size_per_card = 2
         vocab_size = class_size_per_card * self.model_parallel_size
         seed = 1025
diff --git a/python/paddle/fluid/tests/unittests/hybrid_parallel_shared_weight.py b/python/paddle/fluid/tests/unittests/hybrid_parallel_shared_weight.py
new file mode 100644
index 0000000000000000000000000000000000000000..9253f737bf942934cee1f9f203b8b979a5af2c7b
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/hybrid_parallel_shared_weight.py
@@ -0,0 +1,233 @@
+# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import division
+from __future__ import print_function
+
+import unittest
+import paddle
+import numpy as np
+import random
+import paddle
+import paddle.distributed as dist
+import paddle.distributed.fleet as fleet
+from paddle.fluid.dygraph.container import Sequential
+from paddle.distributed.fleet.meta_parallel import PipelineLayer
+from paddle.fluid.dygraph.layers import Layer
+import paddle.nn as nn
+import paddle.fluid as fluid
+from paddle.distributed.fleet.meta_parallel import LayerDesc, SharedLayerDesc
+
+
+def print_hook_fn(grad):
+    print(grad)
+
+
+def set_random_seed(seed, dp_id, rank_id):
+    """Set random seed for reproducibility."""
+    random.seed(seed)
+    np.random.seed(seed + dp_id)
+    paddle.seed(seed + dp_id)
+
+
+batch_size = 8
+micro_batch_size = 2
+vocab_size = 128
+hidden_size = 16
+
+
+class SimpleNet(Layer):
+    def __init__(self):
+        super(SimpleNet, self).__init__()
+        self.word_embeddings = nn.Embedding(vocab_size, hidden_size)
+
+        self.softmax_weight = self.create_parameter(
+            shape=[hidden_size, vocab_size])
+        self.softmax_bias = self.create_parameter(
+            shape=[vocab_size], is_bias=False)
+
+    def forward(self, x1, x2, y1):
+        x_emb = self.word_embeddings(x1)
+        fc = fluid.layers.matmul(x_emb, self.softmax_weight)
+        fc = fluid.layers.elementwise_add(fc, self.softmax_bias)
+        projection = fluid.layers.reshape(fc, shape=[-1, vocab_size])
+
+        projection = paddle.matmul(projection, self.word_embeddings.weight)
+
+        loss = fluid.layers.softmax_with_cross_entropy(
+            logits=projection, label=y1, soft_label=False)
+        return loss.mean()
+
+
+class EmbeddingPipe(Layer):
+    def __init__(self):
+        super(EmbeddingPipe, self).__init__()
+        self.word_embeddings = nn.Embedding(vocab_size, hidden_size)
+
+    @property
+    def embedding_weight(self):
+        return self.word_embeddings.weight
+
+    def forward(self, args):
+        x1, x2 = args
+        x_emb = self.word_embeddings(x1)
+        return x_emb, x2
+
+
+class MatmulNet(Layer):
+    def __init__(self):
+        super(MatmulNet, self).__init__()
+        self.softmax_weight = self.create_parameter(
+            shape=[hidden_size, vocab_size])
+
+    def forward(self, args):
+        x1, x2 = args
+        fc = fluid.layers.matmul(x1, self.softmax_weight)
+
+        return fc, x2
+
+
+class BiasNet(Layer):
+    def __init__(self):
+        super(BiasNet, self).__init__()
+        self.softmax_bias = self.create_parameter(shape=[vocab_size])
+
+    def forward(self, args):
+        fc, x2 = args
+        fc = fluid.layers.elementwise_add(fc, self.softmax_bias)
+        projection = fluid.layers.reshape(fc, shape=[-1, vocab_size])
+        return projection, x2
+
+
+class LossNet(Layer):
+    def __init__(self):
+        super(LossNet, self).__init__()
+
+    def forward(self, args, y1):
+        projection = args
+        loss = fluid.layers.softmax_with_cross_entropy(
+            logits=projection, label=y1[0], soft_label=False)
+        return loss.mean()
+
+
+class SimpleNetPipe(PipelineLayer):
+    def __init__(self, **kwargs):
+        self.descs = []
+        self.descs.append(
+            SharedLayerDesc(
+                'embed', EmbeddingPipe, shared_weight_attr='embedding_weight'))
+        self.descs.append(LayerDesc(MatmulNet))
+
+        self.descs.append(LayerDesc(BiasNet))
+
+        def _logits_helper(embedding, output):
+            return paddle.matmul(output[0], embedding.embedding_weight)
+
+        self.descs.append(
+            SharedLayerDesc(
+                'embed',
+                EmbeddingPipe,
+                forward_func=_logits_helper,
+                shared_weight_attr='embedding_weight'))
+
+        super(SimpleNetPipe, self).__init__(
+            layers=self.descs, loss_fn=LossNet(), **kwargs)
+
+
+class TestDistEmbeddingTraning(unittest.TestCase):
+    def setUp(self):
+        strategy = fleet.DistributedStrategy()
+        self.model_parallel_size = 1
+        self.data_parallel_size = 1
+        self.pipeline_parallel_size = 2
+        strategy.hybrid_configs = {
+            "dp_degree": self.data_parallel_size,
+            "mp_degree": self.model_parallel_size,
+            "pp_degree": self.pipeline_parallel_size,
+        }
+        strategy.pipeline_configs = {
+            "accumulate_steps": batch_size // micro_batch_size,
+            "micro_batch_size": micro_batch_size
+        }
+        fleet.init(is_collective=True, strategy=strategy)
+
+    def test_pp_model(self):
+        hcg = fleet.get_hybrid_communicate_group()
+        world_size = hcg.get_model_parallel_world_size()
+        dp_id = hcg.get_data_parallel_rank()
+        pp_id = hcg.get_stage_id()
+        rank_id = dist.get_rank()
+        set_random_seed(1024, dp_id, rank_id)
+
+        # construct model a
+        model_a = SimpleNet()
+        scheduler_a = paddle.optimizer.lr.PiecewiseDecay(
+            boundaries=[2, 3, 4], values=[0.01, 0.02, 0.03, 0.04], verbose=True)
+        optimizer_a = paddle.optimizer.SGD(learning_rate=scheduler_a,
+                                           parameters=model_a.parameters())
+
+        model_b = SimpleNetPipe(topology=hcg.topology())
+
+        scheduler_b = paddle.optimizer.lr.PiecewiseDecay(
+            boundaries=[2, 3, 4], values=[0.01, 0.02, 0.03, 0.04], verbose=True)
+        optimizer_b = paddle.optimizer.SGD(learning_rate=scheduler_b,
+                                           parameters=model_b.parameters())
+        model_b = fleet.distributed_model(model_b)
+        optimizer_b = fleet.distributed_optimizer(optimizer_b)
+
+        param_len = len(model_a.parameters())
+
+        parameters = []
+        for param in model_a.parameters():
+            parameters.append(param.numpy())
+
+        model_b_params = model_b.parameters()
+
+        if pp_id == 0:
+            model_b_params[0].set_value(parameters[2])
+            model_b_params[1].set_value(parameters[0])
+
+        else:
+            model_b_params[0].set_value(parameters[2])
+            model_b_params[1].set_value(parameters[1])
+
+        for step in range(5):
+            x1_data = np.random.randint(0, vocab_size, size=[batch_size, 1])
+            x2_data = np.random.randint(0, vocab_size, size=[batch_size, 1])
+            y1_data = np.random.randint(0, hidden_size, size=[batch_size, 1])
+
+            x1 = paddle.to_tensor(x1_data)
+            x2 = paddle.to_tensor(x2_data)
+            y1 = paddle.to_tensor(y1_data)
+
+            x1.stop_gradient = True
+            x2.stop_gradient = True
+            y1.stop_gradient = True
+
+            loss_a = model_a(x1, x2, y1)
+            loss_a.backward()
+
+            optimizer_a.step()
+            optimizer_a.clear_grad()
+            scheduler_a.step()
+
+            loss_b = model_b.train_batch([(x1, x2), (y1, )], optimizer_b,
+                                         scheduler_b)
+
+            print("loss", loss_a.numpy(), loss_b.numpy())
+            np.testing.assert_allclose(loss_a.numpy(), loss_b.numpy())
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/python/paddle/fluid/tests/unittests/test_parallel_dygraph_pipeline_parallel.py b/python/paddle/fluid/tests/unittests/test_parallel_dygraph_pipeline_parallel.py
index 1d06e168208b279ccfba753452b1a82e5034975f..ef8ee2e4ad4454d7cdef9350c53a19772a800f70 100644
--- a/python/paddle/fluid/tests/unittests/test_parallel_dygraph_pipeline_parallel.py
+++ b/python/paddle/fluid/tests/unittests/test_parallel_dygraph_pipeline_parallel.py
@@ -27,6 +27,9 @@ class TestHybridPipeParallel(TestMultipleGpus):
     def test_hybrid_parallel_pp_tuple_inputs(self):
         self.run_mnist_2gpu('hybrid_parallel_pp_embedding.py')
 
+    def test_hybrid_parallel_shared_weight(self):
+        self.run_mnist_2gpu('hybrid_parallel_shared_weight.py')
+
 
 if __name__ == "__main__":
     unittest.main()