diff --git a/python/paddle/distributed/fleet/__init__.py b/python/paddle/distributed/fleet/__init__.py index 6a96a102e1433ddc6eb9e74128edf824a75cf36c..403a02496afaab15d548ed18bf88b2db9e50f7b3 100644 --- a/python/paddle/distributed/fleet/__init__.py +++ b/python/paddle/distributed/fleet/__init__.py @@ -21,7 +21,7 @@ from .dataset import * from .data_generator import MultiSlotDataGenerator, MultiSlotStringDataGenerator from . import metrics from .base.topology import CommunicateTopology, HybridCommunicateGroup -from .meta_parallel import random, layers +from .meta_parallel import * __all__ = [ "DistributedStrategy", "UtilBase", "UserDefinedRoleMaker", diff --git a/python/paddle/distributed/fleet/base/topology.py b/python/paddle/distributed/fleet/base/topology.py index da0fe8aee60e8b3fa82b8165ed0a0f2234e9c3a9..8f38ba447fcb3d59a2c609dacff7c921f01935fd 100644 --- a/python/paddle/distributed/fleet/base/topology.py +++ b/python/paddle/distributed/fleet/base/topology.py @@ -120,6 +120,7 @@ class HybridCommunicateGroup(object): self._data_parallel_id = self._get_data_parallel_id() self._model_parallel_id = self._get_model_parallel_id() + self.stage_id = self._get_pipe_parallel_id() assert self._check_vaild_topo( ), "Here is an unreasonable topogy setting. world_size: {}, but" \ @@ -132,15 +133,22 @@ class HybridCommunicateGroup(object): # create comm group for model parallel self._mp_group, self._mp_comm_group = self._set_comm_group("model") + # create comm group for pipe parallel + self._pp_group, self._pp_comm_group = self._set_comm_group("pipe") + # create global group for check inf_nan / clip global norm self._check_group, self._check_comm_group = self._set_check_group( "data") + # create p2p group + self.is_first_stage = (self.stage_id == 0) + self.is_last_stage = (self.stage_id == (self._pp_degree - 1)) + debug_str = "HybridParallelInfo: rank_id: %d, dp_degree: %d, " \ - "mp_degree: %d, pp_degree: %d\n" % (self.global_rank, self._dp_degree, + "mp_degree: %d, pp_degree: %d" % (self.global_rank, self._dp_degree, self._mp_degree,self._pp_degree) - debug_str += "dp_group: %s, mp_group: %s, check/clip group: %s" % ( - self._dp_group, self._mp_group, self._check_group) + debug_str += "dp_group: %s, mp_group: %s, pp_group: %s, check/clip group: %s" % ( + self._dp_group, self._mp_group, self._pp_group, self._check_group) logger.info(debug_str) global _HYBRID_PARALLEL_GROUP @@ -229,6 +237,19 @@ class HybridCommunicateGroup(object): def get_model_parallel_group_src_rank(self): return self._mp_comm_group.ranks[0] + # pipeline parallel message + def _get_pipe_parallel_id(self): + return self._topo.get_coord(self.global_rank).pipe + + def get_stage_id(self): + return self.stage_id + + def get_pipe_parallel_world_size(self): + return self._pp_degree + + def get_pipe_parallel_group(self): + return self._pp_comm_group + # check parallel group def get_check_parallel_group(self): return self._check_comm_group diff --git a/python/paddle/distributed/fleet/meta_optimizers/dygraph_optimizer/hybrid_parallel_gradscaler.py b/python/paddle/distributed/fleet/meta_optimizers/dygraph_optimizer/hybrid_parallel_gradscaler.py index 97b0e7306c13d5b28a11f32440362615a921d394..11bb897a678b7ca46359af37cf150d14ba67fde2 100644 --- a/python/paddle/distributed/fleet/meta_optimizers/dygraph_optimizer/hybrid_parallel_gradscaler.py +++ b/python/paddle/distributed/fleet/meta_optimizers/dygraph_optimizer/hybrid_parallel_gradscaler.py @@ -66,7 +66,7 @@ class HybridParallelGradScaler: self._found_inf) # allreduce_max found_inf in check_group if self._is_mp: - self._found_inf = paddle.cast(self._found_inf, dtype="int64") + self._found_inf = paddle.cast(self._found_inf, dtype="int32") paddle.distributed.all_reduce( self._found_inf, op=paddle.distributed.ReduceOp.MAX, diff --git a/python/paddle/distributed/fleet/meta_parallel/__init__.py b/python/paddle/distributed/fleet/meta_parallel/__init__.py index 7ecb97bf8234ab56e01c7fae1d46c5e77f9815ea..81fb9a6ea6d4e365b999d66f0ff0e97b6c8ac56d 100644 --- a/python/paddle/distributed/fleet/meta_parallel/__init__.py +++ b/python/paddle/distributed/fleet/meta_parallel/__init__.py @@ -12,5 +12,5 @@ # See the License for the specific language governing permissions and # limitations under the License. -from .mp_utils import * +from .parallel_layers import * from .model_parallel import ModelParallel diff --git a/python/paddle/distributed/fleet/meta_parallel/mp_utils/__init__.py b/python/paddle/distributed/fleet/meta_parallel/parallel_layers/__init__.py similarity index 92% rename from python/paddle/distributed/fleet/meta_parallel/mp_utils/__init__.py rename to python/paddle/distributed/fleet/meta_parallel/parallel_layers/__init__.py index a7da28700bceb0dc294ddbb7273b8fb1453f16b8..c4ec61e84ffa5c07aeb4d12784ebdc03ed2d8050 100644 --- a/python/paddle/distributed/fleet/meta_parallel/mp_utils/__init__.py +++ b/python/paddle/distributed/fleet/meta_parallel/parallel_layers/__init__.py @@ -12,5 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from .layers import * +from .mp_layers import * +from .pp_layers import * from .random import * diff --git a/python/paddle/distributed/fleet/meta_parallel/mp_utils/layers_help.py b/python/paddle/distributed/fleet/meta_parallel/parallel_layers/layers_help.py similarity index 100% rename from python/paddle/distributed/fleet/meta_parallel/mp_utils/layers_help.py rename to python/paddle/distributed/fleet/meta_parallel/parallel_layers/layers_help.py diff --git a/python/paddle/distributed/fleet/meta_parallel/mp_utils/layers.py b/python/paddle/distributed/fleet/meta_parallel/parallel_layers/mp_layers.py similarity index 100% rename from python/paddle/distributed/fleet/meta_parallel/mp_utils/layers.py rename to python/paddle/distributed/fleet/meta_parallel/parallel_layers/mp_layers.py diff --git a/python/paddle/distributed/fleet/meta_parallel/parallel_layers/pp_layers.py b/python/paddle/distributed/fleet/meta_parallel/parallel_layers/pp_layers.py new file mode 100644 index 0000000000000000000000000000000000000000..e2db689eb76740b385a1e3340bc802a2425c0946 --- /dev/null +++ b/python/paddle/distributed/fleet/meta_parallel/parallel_layers/pp_layers.py @@ -0,0 +1,156 @@ +# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import math +import paddle +from paddle.fluid.dygraph.layers import Layer +from ...utils.log_util import logger, layer_to_str + +__all__ = ['LayerDesc', 'PipelineLayer'] + + +class SegmentLayers(object): + def __init__(self, layers_desc, num_parts, method="uniform"): + self._layers_desc = layers_desc + self.method = method + self.num_parts = num_parts + self.num_items = len(layers_desc) + assert self.num_items >= self.num_parts, "layer number should be greater than number of segments" + + def do_segment(self): + if self.method == "uniform": + return self.uniform(self.num_items, self.num_parts) + + def uniform(self, num_items, num_parts): + result = [0 for _ in range(num_parts + 1)] + part_size = math.floor(num_items / num_parts) + for i in range(num_parts): + result[i] = int(min(part_size * i, num_items)) + result[num_parts] = num_items + return result + + +class LayerDesc(object): + def __init__(self, layer_func, *inputs, **kwargs): + self.layer_func = layer_func + self.inputs = inputs + self.kwargs = kwargs + + if not issubclass(layer_func, Layer): + raise TypeError( + "The input(layer_func) should be a derived class of Layer.") + + def build_layer(self): + return self.layer_func(*self.inputs, **self.kwargs) + + def __repr__(self): + return layer_to_str(self.layer_func.__name__, *self.inputs, + **self.kwargs) + + +class PipelineLayer(Layer): + def __init__(self, + layers, + num_stages=None, + topology=None, + loss_fn=None, + seg_method="uniform"): + super(PipelineLayer, self).__init__() + if num_stages is None and topology is None: + raise ValueError("should provide num_stages or topology") + + # lazy import + import paddle.distributed as dist + from paddle.distributed import fleet + + self.device_id = dist.ParallelEnv().device_id + self.layers = layers + self._loss_fn = loss_fn + self._topo = topology + word_size = dist.get_world_size() + self.global_rank = dist.get_rank() + + if self._topo: + self._stage_id = self._topo.get_coord(self.global_rank).pipe + self._num_stages = self._topo.get_dim_size("pipe") + if num_stages: + assert self._num_stages == num_stages, "num_stages should be equal to be %d" % ( + self._num_stages) + else: + # construct default topology + if word_size % num_stages != 0: + raise ValueError("should provide correct num_stages({}) " + "which can be divided by word_size({})".format( + num_stages, word_size)) + dp_num = word_size // num_stages + self._topo = fleet.CommunicateTopology(["data", "pipe", "model"], + [dp_num, num_stages, 1]) + self._stage_id = self._topo.get_coord(self.global_rank).pipe + self._num_stages = self._topo.get_dim_size("pipe") + + # initialize segment + self._layers_desc = list(self.layers) + self._num_layers = len(self._layers_desc) + self._start_pos = 0 + self._end_pos = self._num_layers - 1 + self._segment_network(seg_method) + + # construct layer + self.run_function = [] + self._build_layer() + self.to(paddle.CUDAPlace(self.device_id)) + + def _segment_network(self, seg_method): + logger.info("start segment network..") + seg = SegmentLayers( + self._layers_desc, num_parts=self._num_stages, method=seg_method) + self.segment_parts = seg.do_segment() + + self._start_pos = self.segment_parts[self._stage_id] + self._end_pos = self.segment_parts[self._stage_id + 1] + + # print information for debug + for stage in range(self._num_stages): + start = self.segment_parts[stage] + end = self.segment_parts[stage + 1] + logger.info("stage={}, global_rank={} ,layer_number={}".format( + stage, self.global_rank, end - start)) + + for index, layer in enumerate(self._layers_desc[start:end]): + logger.info("{}: {}".format(index + start, str(layer))) + + if self._loss_fn: + try: + logger.info("loss: {}".format(self._loss_fn.__name__)) + except AttributeError: + logger.info("loss: {}".format(self._loss_fn.__class__.__name__)) + + def _build_layer(self): + start = self._start_pos + end = self._end_pos + for index, layer in enumerate(self._layers_desc[start:end]): + layer_index = start + index + if isinstance(layer, Layer): + self.run_function.append(layer) + self.add_sublayer(str(layer_index), layer) + elif isinstance(layer, LayerDesc): + model = layer.build_layer() + self.run_function.append(model) + self.add_sublayer(str(layer_index), model) + else: + self.run_function.append(layer) + + def forward(self, input): + for layer in self.run_function: + input = layer(input) + return input diff --git a/python/paddle/distributed/fleet/meta_parallel/mp_utils/random.py b/python/paddle/distributed/fleet/meta_parallel/parallel_layers/random.py similarity index 100% rename from python/paddle/distributed/fleet/meta_parallel/mp_utils/random.py rename to python/paddle/distributed/fleet/meta_parallel/parallel_layers/random.py diff --git a/python/paddle/distributed/fleet/utils/log_util.py b/python/paddle/distributed/fleet/utils/log_util.py index 906891961dd455f7dae41ad03432a2bc385da240..12c0bf699c1e607d60a6441e45ee222a6030ce00 100644 --- a/python/paddle/distributed/fleet/utils/log_util.py +++ b/python/paddle/distributed/fleet/utils/log_util.py @@ -36,3 +36,16 @@ class LoggerFactory: logger = LoggerFactory.build_logger(name="HybridParallel", level=logging.INFO) + + +def layer_to_str(base, *args, **kwargs): + name = base + "(" + if args: + name += ", ".join(str(arg) for arg in args) + if kwargs: + name += ", " + if kwargs: + name += ", ".join("{}={}".format(key, str(value)) + for key, value in kwargs.items()) + name += ")" + return name diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt index 89950fd62e2648ed8a07dba7eb546eedf3bdf0c6..2e68dd899ee7be0a92b005e4626440e977f57226 100644 --- a/python/paddle/fluid/tests/unittests/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt @@ -22,7 +22,7 @@ list(APPEND DIST_TEST_OPS test_gen_nccl_id_op) list(APPEND DIST_TEST_OPS test_parallel_dygraph_unused_variables) list(APPEND DIST_TEST_OPS test_parallel_dygraph_control_flow) list(APPEND DIST_TEST_OPS test_parallel_dygraph_dataparallel) -#list(APPEND DIST_TEST_OPS test_parallel_dygraph_hybrid_parallel) +list(APPEND DIST_TEST_OPS test_parallel_dygraph_pipeline_layer) set(MIXED_DIST_TEST_OPS ${DIST_TEST_OPS}) #remove distribute unittests. list(APPEND MIXED_DIST_TEST_OPS test_dgc_op) @@ -173,6 +173,7 @@ if ((NOT WITH_GPU) AND (NOT WITH_ROCM)) LIST(REMOVE_ITEM TEST_OPS test_parallel_dygraph_sync_batch_norm) list(REMOVE_ITEM TEST_OPS test_parallel_dygraph_control_flow) list(REMOVE_ITEM TEST_OPS test_parallel_dygraph_dataparallel) + list(REMOVE_ITEM TEST_OPS test_parallel_dygraph_pipeline_layer) LIST(REMOVE_ITEM TEST_OPS test_imperative_auto_mixed_precision) LIST(REMOVE_ITEM TEST_OPS test_fleet_base_single) elseif(WITH_GPU) @@ -857,7 +858,7 @@ if(WITH_DISTRIBUTE AND WITH_GPU AND WITH_NCCL) set_tests_properties(test_parallel_dygraph_dataparallel PROPERTIES TIMEOUT 120) set_tests_properties(test_parallel_dygraph_unused_variables PROPERTIES TIMEOUT 120) set_tests_properties(test_parallel_dygraph_control_flow PROPERTIES TIMEOUT 120) - #set_tests_properties(test_parallel_dygraph_hybrid_parallel PROPERTIES TIMEOUT 200 LABELS "RUN_TYPE=DIST") + set_tests_properties(test_parallel_dygraph_pipeline_layer PROPERTIES TIMEOUT 120) if(${NCCL_VERSION} VERSION_GREATER_EQUAL 2212) set_tests_properties(test_parallel_dygraph_sparse_embedding PROPERTIES TIMEOUT 120) set_tests_properties(test_parallel_dygraph_transformer PROPERTIES TIMEOUT 120) diff --git a/python/paddle/fluid/tests/unittests/hybrid_parallel_pp_layer.py b/python/paddle/fluid/tests/unittests/hybrid_parallel_pp_layer.py new file mode 100644 index 0000000000000000000000000000000000000000..3130cbf458467acfc70d38a438aa845c40584469 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/hybrid_parallel_pp_layer.py @@ -0,0 +1,148 @@ +# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import numpy as np +import os +import paddle +from paddle.distributed import fleet +import copy +from paddle.fluid.dygraph.container import Sequential +import paddle.nn as nn +from paddle.fluid.dygraph.layers import Layer +from paddle.distributed.fleet.meta_parallel import LayerDesc, PipelineLayer +import paddle.nn.functional as F +import unittest + + +class AlexNet(Layer): + def __init__(self, num_classes=10): + super(AlexNet, self).__init__() + self.features = Sequential( + nn.Conv2D( + 3, 64, kernel_size=11, stride=4, padding=5), + nn.ReLU(), + nn.MaxPool2D( + kernel_size=2, stride=2), + nn.Conv2D( + 64, 192, kernel_size=5, padding=2), + nn.ReLU(), + nn.MaxPool2D( + kernel_size=2, stride=2), + nn.Conv2D( + 192, 384, kernel_size=3, padding=1), + nn.ReLU(), + nn.Conv2D( + 384, 256, kernel_size=3, padding=1), + nn.ReLU(), + nn.Conv2D( + 256, 256, kernel_size=3, padding=1), + nn.ReLU(), + nn.MaxPool2D( + kernel_size=2, stride=2), ) + self.classifier = nn.Linear(256, num_classes) + self.loss_fn = nn.loss.CrossEntropyLoss() + + def forward(self, x, y): + x = self.features(x) + x.flatten() + + x = self.classifier(x) + return self.loss_fn(x, y) + + +class AlexNetPipe(AlexNet): + def to_layers(self): + feat = [self.features[i] for i in range(len(self.features))] + loss_fn = [lambda x: x.flatten(), self.classifier] + feat.extend(loss_fn) + return feat + + +class AlexNetPipeDesc(PipelineLayer): + def __init__(self, num_classes=10, **kwargs): + self.num_classes = num_classes + decs = [ + LayerDesc( + nn.Conv2D, 3, 64, kernel_size=11, stride=4, padding=5), + LayerDesc(nn.ReLU), + LayerDesc( + nn.MaxPool2D, kernel_size=2, stride=2), + LayerDesc( + nn.Conv2D, 64, 192, kernel_size=5, padding=2), + F.relu, + LayerDesc( + nn.MaxPool2D, kernel_size=2, stride=2), + LayerDesc( + nn.Conv2D, 192, 384, kernel_size=3, padding=1), + F.relu, + LayerDesc( + nn.Conv2D, 384, 256, kernel_size=3, padding=1), + F.relu, + LayerDesc( + nn.Conv2D, 256, 256, kernel_size=3, padding=1), + F.relu, + LayerDesc( + nn.MaxPool2D, kernel_size=2, stride=2), + lambda x: x.flatten(), + LayerDesc(nn.Linear, 256, self.num_classes), # classifier + ] + super(AlexNetPipeDesc, self).__init__( + layers=decs, loss_fn=nn.CrossEntropyLoss(), **kwargs) + + +class TestPipeLayerAPI(unittest.TestCase): + def setUp(self): + strategy = fleet.DistributedStrategy() + self.model_parallel_size = 2 + strategy.hybrid_configs = { + "dp_degree": 1, + "mp_degree": 1, + "pp_degree": self.model_parallel_size + } + fleet.init(is_collective=True, strategy=strategy) + self.hcg = fleet.get_hybrid_communicate_group() + + def test_pipelayer_desc(self): + pipe_model = AlexNetPipeDesc(num_stages=self.model_parallel_size) + np.testing.assert_array_equal(len(pipe_model.parameters()), 6) + + def test_pipelayer_sequential(self): + init_net = AlexNetPipe() + pipe_model = PipelineLayer( + layers=init_net.to_layers(), + num_stages=self.model_parallel_size, + loss_fn=nn.CrossEntropyLoss()) + stage_id = self.hcg.get_stage_id() + init_parameters = init_net.parameters() + pipe_parameters = pipe_model.parameters() + part_number = len(init_parameters) // 2 + + if stage_id == 0: + for idx in range(part_number): + param_a = init_parameters[idx] + param_b = pipe_parameters[idx] + np.testing.assert_array_equal(param_a.name, param_b.name) + np.testing.assert_allclose(param_a.numpy(), param_b.numpy()) + + elif stage_id == 1: + for idx in range(part_number): + param_a = init_parameters[idx + part_number] + param_b = pipe_parameters[idx] + + np.testing.assert_array_equal(param_a.name, param_b.name) + np.testing.assert_allclose(param_a.numpy(), param_b.numpy()) + + +if __name__ == '__main__': + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_parallel_dygraph_pipeline_layer.py b/python/paddle/fluid/tests/unittests/test_parallel_dygraph_pipeline_layer.py new file mode 100644 index 0000000000000000000000000000000000000000..f3b89d694f70b96df70f4923b5af3433c7e2e26c --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_parallel_dygraph_pipeline_layer.py @@ -0,0 +1,29 @@ +# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function + +import unittest +import paddle.fluid as fluid + +from test_parallel_dygraph_dataparallel import TestMultipleGpus + + +class TestHybridPipeParallel(TestMultipleGpus): + def test_hybrid_parallel_pp_layer(self): + self.run_mnist_2gpu('hybrid_parallel_pp_layer.py') + + +if __name__ == "__main__": + unittest.main() diff --git a/python/setup.py.in b/python/setup.py.in index d9b195a74ea0461b3ebda311c25c86eed60dfd2f..3458a42d2d907b018ddda46408ff3149e4c4d315 100644 --- a/python/setup.py.in +++ b/python/setup.py.in @@ -159,7 +159,7 @@ packages=['paddle', 'paddle.distributed.fleet.proto', 'paddle.distributed.fleet.utils', 'paddle.distributed.fleet.meta_parallel', - 'paddle.distributed.fleet.meta_parallel.mp_utils', + 'paddle.distributed.fleet.meta_parallel.parallel_layers', 'paddle.framework', 'paddle.jit', 'paddle.jit.dy2static',