未验证 提交 961d6cce 编写于 作者: C caozhou 提交者: GitHub

[Auto Parallel]Generate default cluster (#44150)

* generate default cluster

* add unittest
上级 07c729aa
...@@ -16,6 +16,7 @@ import os ...@@ -16,6 +16,7 @@ import os
import json import json
from enum import IntEnum from enum import IntEnum
from enum import unique from enum import unique
import paddle
@unique @unique
...@@ -138,7 +139,7 @@ class Device: ...@@ -138,7 +139,7 @@ class Device:
class Link: class Link:
default_hop = 1 default_hop = 1
default_nic_bandwith = 24 default_nic_bandwidth = 24
def __init__(self, source, target): def __init__(self, source, target):
self._src = source self._src = source
...@@ -411,6 +412,174 @@ class Cluster: ...@@ -411,6 +412,174 @@ class Cluster:
self._alpha_latency = None self._alpha_latency = None
self._rank_to_device_id = {} self._rank_to_device_id = {}
self._device_id_to_rank = {} self._device_id_to_rank = {}
# This property only be valid when the cluster consists of machines,
# which have the same number accelerators.
self._num_devices_per_machine = None
def gen_default_config_cluster(self,
gpu_model="V100",
cpu_model="6271C",
node_count=1,
device_count=1,
gpu_memory=32,
cpu_memory=503,
inter_bandwidth=24,
intra_bandwidth=235,
gpu_dp_gflops=7800,
gpu_sp_gflops=15700,
cpu_dp_gflops=75,
cpu_sp_gflops=150):
"""Generate cluster by default config."""
gpu_models = ["V100", "A100", "H100", "A2", "A10", "A16", "A30", "A40"]
xpu_models = ["XPU"]
npu_models = ["NPU"]
dcu_models = ["DCU"]
all_gpu_models = gpu_models + xpu_models + npu_models + dcu_models
assert gpu_model in all_gpu_models
self._num_devices_per_machine = device_count
def _convert_to_type(gpu_model):
type = None
if gpu_model in gpu_models:
type = "GPU"
elif gpu_model in xpu_models:
type = "XPU"
elif gpu_model in npu_models:
type = "NPU"
elif gpu_model in dcu_models:
type = "DCU"
assert type is not None
return type
def _convert_to_model(gpu_model, gpu_memory):
model = None
if gpu_model == "V100":
model = "Tesla V100-SXM2-" + str(gpu_memory) + "GB"
assert model is not None
return model
def _convert_to_cpu_info(cpu_model):
arch, vendor, model = None, None, None
if cpu_model == "6271C":
arch = "x86_64"
vendor = "GenuineIntel"
model = "Intel(R) Xeon(R) Gold 6271C CPU @ 2.60G"
elif cpu_model == "6148":
arch = "x86_64"
vendor = "GenuineIntel"
model = "Intel(R) Xeon(R) Gold 6148 CPU @ 2.40G"
assert arch is not None
assert vendor is not None
assert model is not None
return arch, vendor, model
cluster_info = {}
cluster_info["machines"] = []
global_id = 0
global_id_to_device_type = {}
global_id_to_node = {}
# NOTE: It will support NPU, XPU, DCU models in the future, it is just a fake value now
for i in range(node_count):
machine = {}
# NOTE: The hostname is host_0, host_1, ...
machine["hostname"] = "host_" + str(i)
# NOTE: The addr is localhost, if need actual addr, it should be reset manually
machine["addr"] = "127.0.0.1"
# NOTE: The port is a default value
machine["port"] = 60009
machine["links"] = []
devices = []
local_id = 0
for j in range(device_count):
device = {}
global_id = global_id if i == 0 and j == 0 else global_id + 1
local_id += 1
type = _convert_to_type(gpu_model)
model = _convert_to_model(gpu_model, gpu_memory)
dp_gflops = gpu_dp_gflops
sp_gflops = gpu_dp_gflops
memory = gpu_memory
device["global_id"] = global_id
device["local_id"] = local_id
device["type"] = type
device["model"] = model
device["memory"] = memory
device["sp_gflops"] = sp_gflops
device["dp_gflops"] = dp_gflops
global_id_to_device_type[global_id] = type
global_id_to_node[global_id] = i
devices.append(device)
# add cpu device and nic device, just one cpu
cpu_device = {}
arch, vendor, model = _convert_to_cpu_info(cpu_model)
sp_gflops = cpu_sp_gflops
dp_gflops = cpu_dp_gflops
global_id += 1
local_id = 0
memory = cpu_memory
type = "CPU"
cpu_device["arch"] = arch
cpu_device["vendor"] = vendor
cpu_device["model"] = model
cpu_device["sp_gflops"] = sp_gflops
cpu_device["dp_gflops"] = dp_gflops
cpu_device["global_id"] = global_id
cpu_device["local_id"] = local_id
cpu_device["memory"] = memory
cpu_device["type"] = type
global_id_to_node[global_id] = i
global_id_to_device_type[global_id] = type
devices.append(cpu_device)
nic_device = {}
global_id += 1
# add NIC
type = "NIC"
width = 12.5
ip = "127.0.0.1"
local_id = 0
nic_device["type"] = type
nic_device["local_id"] = type
nic_device["global_id"] = global_id
global_id_to_device_type[global_id] = type
global_id_to_node[global_id] = i
devices.append(nic_device)
machine["devices"] = devices
cluster_info["machines"].append(machine)
# build link
for i in range(0, global_id + 1):
for j in range(0, global_id + 1):
if i == j:
continue
node_id_i = global_id_to_node[i]
node_id_j = global_id_to_node[j]
device_type_i = global_id_to_device_type[i]
device_type_j = global_id_to_device_type[j]
link = {}
source_global_id = i
target_global_id = j
link["source_global_id"] = source_global_id
link["target_global_id"] = target_global_id
# the same node and device_type, set intra_bandwidth, NVL
if node_id_i == node_id_j and device_type_i == device_type_j:
link["type"] = "NVL"
link["bandwidth"] = intra_bandwidth
else:
link["type"] = "PHB"
link["bandwidth"] = inter_bandwidth
cluster_info["machines"][node_id_i]["links"].append(link)
self._build_from_dict(cluster_info)
@property @property
def rank_to_device_id(self): def rank_to_device_id(self):
...@@ -473,9 +642,7 @@ class Cluster: ...@@ -473,9 +642,7 @@ class Cluster:
device = machine.devices[device_global_id] device = machine.devices[device_global_id]
return device return device
def build_from_file(self, json_file_path): def _build_from_dict(self, cluster_info):
with open(json_file_path) as json_file:
cluster_info = json.load(json_file)
machines_info = cluster_info["machines"] machines_info = cluster_info["machines"]
for machine_info in machines_info: for machine_info in machines_info:
machine_id = self._generate_machine_id() machine_id = self._generate_machine_id()
...@@ -533,6 +700,11 @@ class Cluster: ...@@ -533,6 +700,11 @@ class Cluster:
else: else:
self._alpha_latecy = None self._alpha_latecy = None
def build_from_file(self, json_file_path):
with open(json_file_path) as json_file:
cluster_info = json.load(json_file)
self._build_from_dict(cluster_info)
def _generate_machine_id(self): def _generate_machine_id(self):
cur_machine_id = self._num_machines cur_machine_id = self._num_machines
self._num_machines += 1 self._num_machines += 1
...@@ -556,7 +728,7 @@ class Cluster: ...@@ -556,7 +728,7 @@ class Cluster:
bandwidth = None bandwidth = None
# None means the source and target are not connected directly, set NIC in default # None means the source and target are not connected directly, set NIC in default
if link is None: if link is None:
bandwidth = Link.default_nic_bandwith bandwidth = Link.default_nic_bandwidth
else: else:
bandwidth = link.bandwidth bandwidth = link.bandwidth
...@@ -608,6 +780,15 @@ class Cluster: ...@@ -608,6 +780,15 @@ class Cluster:
assert count > 0 assert count > 0
return count return count
def get_num_machines(self):
return len(self._machines)
def get_num_devices_per_machine(self):
# Only return the number of accelerators of each machine.
# All machines must has the same number of devices and same type of devices.
assert self._num_devices_per_machine
return self._num_devices_per_machine
def __str__(self): def __str__(self):
str = "" str = ""
for machine in self.machines.values(): for machine in self.machines.values():
...@@ -616,3 +797,29 @@ class Cluster: ...@@ -616,3 +797,29 @@ class Cluster:
def __repr__(self): def __repr__(self):
return self.__str__() return self.__str__()
def get_default_cluster():
cluster = Cluster()
local_device_count = os.getenv("PADDLE_LOCAL_SIZE")
if local_device_count is None:
local_device_count = 1
else:
local_device_count = int(local_device_count)
global_device_count = os.getenv("PADDLE_GLOBAL_SIZE")
if global_device_count is None:
node_count = 1
else:
global_device_count = int(global_device_count)
assert global_device_count % local_device_count == 0
node_count = int(global_device_count) // local_device_count
print("Node Count: ",
node_count,
"Local Device Size: ",
local_device_count,
"World size: ",
paddle.distributed.get_world_size(),
flush=True)
cluster.gen_default_config_cluster(node_count=node_count,
device_count=local_device_count)
return cluster
...@@ -19,6 +19,7 @@ import json ...@@ -19,6 +19,7 @@ import json
import paddle import paddle
from paddle.distributed.auto_parallel.cluster import Cluster from paddle.distributed.auto_parallel.cluster import Cluster
from paddle.distributed.auto_parallel.cluster import get_default_cluster
cluster_json = """ cluster_json = """
{ {
...@@ -1997,6 +1998,10 @@ class TestCluster(unittest.TestCase): ...@@ -1997,6 +1998,10 @@ class TestCluster(unittest.TestCase):
self.assertTrue(devices == [0, 1, 2, 3]) self.assertTrue(devices == [0, 1, 2, 3])
self.assertTrue(involved_machine_count == 1) self.assertTrue(involved_machine_count == 1)
# Remove unnecessary files
if os.path.exists(cluster_json_path):
os.remove(cluster_json_path)
def test_multi_machine(self): def test_multi_machine(self):
# Build cluster # Build cluster
cluster_json_path = os.path.join(self.temp_dir.name, cluster_json_path = os.path.join(self.temp_dir.name,
...@@ -2022,6 +2027,17 @@ class TestCluster(unittest.TestCase): ...@@ -2022,6 +2027,17 @@ class TestCluster(unittest.TestCase):
if os.path.exists(cluster_json_path): if os.path.exists(cluster_json_path):
os.remove(cluster_json_path) os.remove(cluster_json_path)
def test_default_config_cluster(self):
cluster = Cluster()
cluster.gen_default_config_cluster(device_count=8)
# check machines and devices
self.assertTrue(cluster.get_num_machines() == 1)
self.assertTrue(cluster.get_num_devices_per_machine() == 8)
def test_default_cluster(self):
cluster = get_default_cluster()
self.assertTrue(isinstance(cluster, Cluster))
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册