diff --git a/python/paddle/distributed/auto_parallel/tuner/rule_based_tuner.py b/python/paddle/distributed/auto_parallel/tuner/rule_based_tuner.py
index e00efcb15323a06b69829f1dc8067191cdab4980..1e94adc8de0d34b6c58c4a33b7249f94b02f4751 100644
--- a/python/paddle/distributed/auto_parallel/tuner/rule_based_tuner.py
+++ b/python/paddle/distributed/auto_parallel/tuner/rule_based_tuner.py
@@ -12,8 +12,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from abc import abstractmethod
+import math
+from abc import abstractmethod
 
 from ..graph import Graph
 
 _PATTERNS = {}
@@ -504,6 +505,100 @@ class OperatorClusteringUtil:
         return True
 
 
+class ClusterPartitionUtil:
+    @staticmethod
+    def factorization(num):
+        factors = []
+        for i in range(1, int(math.floor(math.sqrt(num))) + 1):
+            if num % i == 0:
+                factors.append([i, int(num / i)])
+        return factors
+
+    @staticmethod
+    def complete_meshes(partitions: list, num: int):
+        if len(partitions) == 1:
+            partitions = ClusterPartitionUtil.factorization(num - 1)
+            partitions.append([1])
+        return partitions
+
+    @staticmethod
+    def partition_cluster(
+        n: int,
+        m: int,
+        filter=[
+            complete_meshes.__func__,
+        ],
+    ) -> list:
+        """
+        Partition cluster into possible device meshes.
+
+        Args:
+            n (int): The number of nodes.
+            m (int): The number of single devices on each node.
+            filter (list): Functions for filtering useful meshes.
+
+        Returns:
+            device_meshes (list): The possible device meshes.
+        """
+        partition_result = ClusterPartitionUtil.factorization(n)
+        for func in filter:
+            partition_result = func(partition_result, n)
+        device_meshes = []
+        if n == 1:
+            partition_result = ClusterPartitionUtil.factorization(m)
+            for partition in partition_result:
+                device_mesh = []
+                for i in range(partition[0]):
+                    device_mesh.append([1, partition[1]])
+                device_meshes.append(device_mesh)
+        else:
+            increment = 1 if partition_result[-1] == [1] else 0
+            for partition in partition_result:
+                if len(partition) < 2:
+                    continue
+                device_mesh = []
+                for i in range(partition[0]):
+                    device_mesh.append([partition[1], m])
+                device_mesh[-1][0] += increment
+                device_meshes.append(device_mesh)
+
+        return device_meshes
+
+
+def convert_to_process_meshes(device_mesh: list) -> list:
+    """
+    Convert a device mesh into possible process meshes.
+
+    Args:
+        device_mesh (list): [n, m], one device mesh.
+
+    Returns:
+        process_meshes (list): Possible process meshes.
+    """
+    n, m = device_mesh[0], device_mesh[1]
+    factors = (
+        ClusterPartitionUtil.factorization(m)
+        if n == 1
+        else ClusterPartitionUtil.factorization(n)
+    )
+    process_meshes = []
+    if n == 1:
+        for factor in factors:
+            if factor[0] == 1:
+                process_meshes.append([factor[1]])
+                continue
+            process_meshes.append(factor)
+    else:
+        for factor in factors:
+            mul1, mul2 = factor[0], factor[1]
+            if mul1 == 1:
+                process_meshes.append([m * mul2])
+            elif mul1 != mul2:
+                process_meshes.append([int(n / mul2), m * mul2])
+                process_meshes.append([int(n / mul1), m * mul1])
+    return process_meshes
+
+
 class RuleBasedTuner:
     def __init__(self, dist_context, mode="train"):
         self._dist_context = dist_context
diff --git a/python/paddle/fluid/tests/unittests/auto_parallel/CMakeLists.txt b/python/paddle/fluid/tests/unittests/auto_parallel/CMakeLists.txt
index 18fad917b683979571de47232ee9d2aece2d4aaf..d13e9b69b578a1dc29ae3f6a998d803f55301c14 100644
--- a/python/paddle/fluid/tests/unittests/auto_parallel/CMakeLists.txt
+++ b/python/paddle/fluid/tests/unittests/auto_parallel/CMakeLists.txt
@@ -121,4 +121,7 @@ if(WITH_DISTRIBUTE AND WITH_GPU)
   py_test_modules(test_group_operators MODULES test_group_operators)
   py_test_modules(test_pattern MODULES test_pattern)
   py_test_modules(test_pattern_match MODULES test_pattern_match)
+  py_test_modules(test_cluster_partition MODULES test_cluster_partition)
+  py_test_modules(test_convert_to_process_meshes MODULES
+                  test_convert_to_process_meshes)
 endif()
diff --git a/python/paddle/fluid/tests/unittests/auto_parallel/test_cluster_partition.py b/python/paddle/fluid/tests/unittests/auto_parallel/test_cluster_partition.py
new file mode 100644
index 0000000000000000000000000000000000000000..2223724c2953976d66088de85a7ef7f6d760ef13
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/auto_parallel/test_cluster_partition.py
@@ -0,0 +1,34 @@
+# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+
+
+class TestClusterPartition(unittest.TestCase):
+    def test_cluster_partition(self):
+        clusters = [(5, 8), (1, 8), (4, 8), (16, 8)]
+        from paddle.distributed.auto_parallel.tuner.rule_based_tuner import (
+            ClusterPartitionUtil,
+        )
+
+        device_meshes = []
+        for cluster in clusters:
+            n = cluster[0]
+            m = cluster[1]
+            device_mesh = ClusterPartitionUtil.partition_cluster(n, m)
+            device_meshes.append(device_mesh)
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/python/paddle/fluid/tests/unittests/auto_parallel/test_convert_to_process_meshes.py b/python/paddle/fluid/tests/unittests/auto_parallel/test_convert_to_process_meshes.py
new file mode 100644
index 0000000000000000000000000000000000000000..120a7ba438a4066aa992fb777136dc22fbfd4566
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/auto_parallel/test_convert_to_process_meshes.py
@@ -0,0 +1,32 @@
+# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+
+
+class TestConvertToProcessMeshes(unittest.TestCase):
+    def test_convert_to_process_meshes(self):
+        device_meshes = [[1, 8], [4, 8], [15, 8]]
+        from paddle.distributed.auto_parallel.tuner.rule_based_tuner import (
+            convert_to_process_meshes,
+        )
+
+        process_meshes = []
+        for device_mesh in device_meshes:
+            process_mesh = convert_to_process_meshes(device_mesh)
+            process_meshes.append(process_mesh)
+
+
+if __name__ == "__main__":
+    unittest.main()
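
A quick usage sketch for reviewers (not part of the patch). The new unit tests only smoke-test the calls without asserting results, so this sketch shows the kind of output to expect; it assumes a Paddle build with this diff applied, and the values in the comments are hand-traced from the logic in the diff above.

```python
# Usage sketch (not part of the patch). Assumes this diff is applied;
# expected values are hand-traced from the new helpers' logic.
from paddle.distributed.auto_parallel.tuner.rule_based_tuner import (
    ClusterPartitionUtil,
    convert_to_process_meshes,
)

# 4 nodes x 8 devices each: factorization(4) = [[1, 4], [2, 2]], so the
# cluster stays as one 4x8 device mesh or splits into two 2x8 meshes.
print(ClusterPartitionUtil.partition_cluster(4, 8))
# [[[4, 8]], [[2, 8], [2, 8]]]

# 5 is prime, so complete_meshes() falls back to factorizing 4 nodes and
# the leftover node is absorbed into the last mesh via `increment`.
print(ClusterPartitionUtil.partition_cluster(5, 8))
# [[[5, 8]], [[2, 8], [3, 8]]]

# A single 1x8 device mesh maps to a 1-D [8] or a 2-D [2, 4] process mesh.
print(convert_to_process_meshes([1, 8]))
# [[8], [2, 4]]
```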