Unverified commit 55428765, authored by Jianghai, committed by GitHub

[Auto Parallel] Add cluster partition and dm to pm (#48320)

* add cluster_partition and device_meshes to process_meshes funcs

* add unit tests
Parent 2e7c172c
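A minimal usage sketch of the two utilities added here (the import path is taken from the new tests below; the result values are worked out by hand from the code in this diff and are illustrative only, not asserted by this PR):

from paddle.distributed.auto_parallel.tuner.rule_based_tuner import (
    ClusterPartitionUtil,
    convert_to_process_meshes,
)

# Partition a 4-node cluster with 8 devices per node into candidate device meshes.
device_meshes = ClusterPartitionUtil.partition_cluster(4, 8)
# -> [[[4, 8]], [[2, 8], [2, 8]]]

# Convert one [n, m] device mesh into candidate process meshes.
process_meshes = convert_to_process_meshes([4, 8])
# -> [[32]]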
@@ -12,8 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import math
from abc import abstractmethod
from ..graph import Graph
_PATTERNS = {}
@@ -504,6 +505,100 @@ class OperatorClusteringUtil:
return True
class ClusterPartitionUtil:
    @staticmethod
    def factorization(num):
        # Enumerate ordered factor pairs [i, num // i] for i up to sqrt(num).
        factors = []
        for i in range(1, int(math.floor(math.sqrt(num))) + 1):
            if num % i == 0:
                factors.append([i, int(num / i)])
        return factors
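    # Illustrative values, computed by hand from the loop above:
    # factorization(8) -> [[1, 8], [2, 4]]
    # factorization(5) -> [[1, 5]]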
    @staticmethod
    def complete_meshes(partitions: list, num: int):
        # A single factor pair means num is prime (or 1): fall back to
        # partitioning num - 1 nodes and keep one node as a remainder mesh.
        if len(partitions) == 1:
            partitions = ClusterPartitionUtil.factorization(num - 1)
            partitions.append([1])
        return partitions
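    # Illustrative value, computed by hand from the code above:
    # complete_meshes([[1, 5]], 5) -> [[1, 4], [2, 2], [1]]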
    @staticmethod
    def partition_cluster(
        n: int,
        m: int,
        filter=[
            complete_meshes.__func__,
        ],
    ) -> list:
        """
        Partition cluster into possible device meshes.

        Args:
            n (int): The number of nodes.
            m (int): The number of single devices on each node.
            filter (list): Functions for filtering useful meshes.

        Returns:
            device_meshes (list): The possible device meshes.
        """
        partition_result = ClusterPartitionUtil.factorization(n)
        for func in filter:
            partition_result = func(partition_result, n)
        device_meshes = []
        if n == 1:
            # A single node: split its m devices into rows of shape [1, x].
            partition_result = ClusterPartitionUtil.factorization(m)
            for partition in partition_result:
                device_mesh = []
                for i in range(partition[0]):
                    device_mesh.append([1, partition[1]])
                device_meshes.append(device_mesh)
        else:
            # A trailing [1] partition marks a leftover node (n was prime);
            # fold it into the last row of each mesh.
            increment = 1 if partition_result[-1] == [1] else 0
            for partition in partition_result:
                if len(partition) < 2:
                    continue
                device_mesh = []
                for i in range(partition[0]):
                    device_mesh.append([partition[1], m])
                device_mesh[-1][0] += increment
                device_meshes.append(device_mesh)
        return device_meshes
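# Illustrative results of ClusterPartitionUtil.partition_cluster, computed by
# hand from the code above (not asserted anywhere in this PR):
#   partition_cluster(4, 8) -> [[[4, 8]], [[2, 8], [2, 8]]]
#   partition_cluster(5, 8) -> [[[5, 8]], [[2, 8], [3, 8]]]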
def convert_to_process_meshes(device_mesh: list) -> list:
    """
    Convert one device mesh into possible process meshes.

    Args:
        device_mesh (list): [n, m], one device mesh.

    Returns:
        process_meshes (list): The possible process meshes.
    """
    n, m = device_mesh[0], device_mesh[1]
    factors = (
        ClusterPartitionUtil.factorization(m)
        if n == 1
        else ClusterPartitionUtil.factorization(n)
    )
    process_meshes = []
    if n == 1:
        for factor in factors:
            if factor[0] == 1:
                process_meshes.append([factor[1]])
                continue
            process_meshes.append(factor)
    else:
        for factor in factors:
            mul1, mul2 = factor[0], factor[1]
            if mul1 == 1:
                # mul2 == n here, so this flattens all n * m devices
                # into a single 1-D process mesh.
                process_meshes.append([m * mul2])
            elif mul1 != mul2:
                process_meshes.append([int(n / mul2), m * mul2])
                process_meshes.append([int(n / mul1), m * mul1])
    return process_meshes
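# Illustrative results of convert_to_process_meshes, computed by hand from the
# code above:
#   convert_to_process_meshes([1, 8])  -> [[8], [2, 4]]
#   convert_to_process_meshes([15, 8]) -> [[120], [3, 40], [5, 24]]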
class RuleBasedTuner:
    def __init__(self, dist_context, mode="train"):
        self._dist_context = dist_context
......
@@ -121,4 +121,7 @@ if(WITH_DISTRIBUTE AND WITH_GPU)
  py_test_modules(test_group_operators MODULES test_group_operators)
  py_test_modules(test_pattern MODULES test_pattern)
  py_test_modules(test_pattern_match MODULES test_pattern_match)
  py_test_modules(test_cluster_partition MODULES test_cluster_partition)
  py_test_modules(test_convert_to_process_meshes MODULES
                  test_convert_to_process_meshes)
endif()
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
class TestClusterPartition(unittest.TestCase):
    def test_cluster_partition(self):
        clusters = [(5, 8), (1, 8), (4, 8), (16, 8)]
        from paddle.distributed.auto_parallel.tuner.rule_based_tuner import (
            ClusterPartitionUtil,
        )

        device_meshes = []
        for cluster in clusters:
            n = cluster[0]
            m = cluster[1]
            device_mesh = ClusterPartitionUtil.partition_cluster(n, m)
            device_meshes.append(device_mesh)


if __name__ == "__main__":
    unittest.main()
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
class TestConvertToProcessMeshes(unittest.TestCase):
    def test_convert_to_process_meshes(self):
        device_meshes = [[1, 8], [4, 8], [15, 8]]
        from paddle.distributed.auto_parallel.tuner.rule_based_tuner import (
            convert_to_process_meshes,
        )

        process_meshes = []
        for device_mesh in device_meshes:
            process_mesh = convert_to_process_meshes(device_mesh)
            process_meshes.append(process_mesh)


if __name__ == "__main__":
    unittest.main()