未验证 提交 db727551 编写于 作者: Y Yulong Ao 提交者: GitHub

[Auto Parallel] Add the unified cluster representation (#37091)

* [Auto Parallel]  Add the unified cluster representation

* Add the local id for devices

* Add some comments
上级 8b87d5eb
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import json
from enum import IntEnum
from enum import unique
@unique
class DeviceType(IntEnum):
UNKNOWN = 0
CPU = 1
GPU = 2
XPU = 3
NPU = 4
DCU = 5
NIC = 6
@unique
class LinkType(IntEnum):
UNKNOWN = 0
LOC = 1
SYS = 2
PHB = 3
PIX = 4
PIB = 5
NVL = 6
NVB = 7
NET = 8
class Device:
def __init__(self, global_id, local_id, machine):
self._global_id = global_id
self._local_id = local_id
self._machine = machine
self._type = None
# Different device have different models, such as
# "Tesla V100-SXM2-32GB" and "A100-SXM4-40GB" etc.
self._model = None
# Double precision GFLOPS
self._dp_gflops = None
# Single precision GFLOPS
self._sp_gflops = None
# Memory is stored by GB
self._memory = None
@property
def global_id(self):
return self._global_id
@global_id.setter
def global_id(self, value):
self._global_id = value
@property
def local_id(self):
return self._local_id
@local_id.setter
def local_id(self, value):
self._local_id = value
@property
def machine(self):
return self._machine
@machine.setter
def machine(self, value):
self._machine = value
@property
def type(self):
return self._type
@type.setter
def type(self, value):
self._type = value
@property
def model(self):
return self._model
@model.setter
def model(self, value):
self._model = value
@property
def dp_gflops(self):
return self._dp_gflops
@dp_gflops.setter
def dp_gflops(self, value):
self._dp_gflops = value
@property
def sp_gflops(self):
return self._sp_gflops
@sp_gflops.setter
def sp_gflops(self, value):
self._sp_gflops = value
@property
def memory(self):
return self._memory
@memory.setter
def memory(self, value):
self._memory = value
def __str__(self):
str = ""
str += "global_id: {}, local_id: {}, machine_id: {}, type: {}, model: {}, dp_flops: {}, sp_flops: {}, memory: {}".format(
self.global_id, self.local_id, self.machine.id, self.type.name,
self.model, self.dp_gflops, self.sp_gflops, self.memory)
return str
def __repr__(self):
return self.__str__()
class Link:
def __init__(self, source, target):
self._src = source
self._tgt = target
self._type = None
# bandwidth is stored by GB/s
self._bandwidth = None
# latency is stored by millisecond
self._latency = None
@property
def source(self):
return self._src
@source.setter
def source(self, value):
self._source = value
@property
def target(self):
return self._tgt
@target.setter
def target(self, value):
self._target = value
@property
def type(self):
return self._type
@type.setter
def type(self, value):
self._type = value
@property
def bandwidth(self):
return self._bandwidth
@bandwidth.setter
def bandwidth(self, value):
self._bandwidth = value
@property
def latency(self):
return self._latency
@latency.setter
def latency(self, value):
self._latency = value
def __str__(self):
str = ""
str += "source_global_id: {}, target_global_id: {}, type: {}, bandwidth: {}, latency: {}".format(
self.source.global_id, self.target.global_id, self.type,
self.bandwidth, self.latency)
return str
def __repr__(self):
return self.__str__()
class Machine:
def __init__(self, id):
self._id = id
self._hostname = None
self._addr = None
self._port = None
self._devices = {}
self._links = {}
@property
def id(self):
return self._id
@id.setter
def id(self, value):
self._id = value
@property
def hostname(self):
return self._hostname
@hostname.setter
def hostname(self, value):
self._hostname = value
@property
def addr(self):
return self._addr
@addr.setter
def addr(self, value):
self._addr = value
@property
def port(self):
return self._port
@port.setter
def port(self, value):
self._port = value
@property
def devices(self):
return self._devices
@property
def links(self):
return self._links
def add_device(self, device):
# Use the device global_id as the key
self._devices[device.global_id] = device
def add_link(self, link):
# Use the source device global_id and target device global_id as the key
self._links[(link.source.global_id, link.target.global_id)] = link
def __str__(self):
str = ""
for device in self.devices.values():
str += ", device: {}".format(device)
for link in self.links.values():
str += ", link: {}".format(link)
return str
def __repr__(self):
return self.__str__()
class Cluster:
"""
The cluster is an abstract of the hardware resource for training, which contains the cluster topology and
related hardware information. It will serve the task mapping, cost model and auto searching.
"""
def __init__(self):
# Used to compute machine id
self._num_machines = 0
# Store all machines within the cluster
self._machines = {}
# Cluster graph topology
self._topology = None
@property
def machines(self):
return self._machines
def add_machine(self, machine):
assert isinstance(machine, Machine)
self._machines[machine.id] = machine
def add_device(self, device):
assert isinstance(device, Device)
device.machine.add_device(device)
def add_link(self, link):
assert isinstance(link, Link)
# Only add the link to the source machine
link.source.machine.add_link(link)
def get_device(self, device_global_id):
device = None
for machine in self.machines.values():
if device_global_id in machine.devices.keys():
device = machine.devices[device_global_id]
return device
def build_from_file(self, json_file_path):
with open(json_file_path) as json_file:
cluster_info = json.load(json_file)
machines_info = cluster_info["machines"]
for machine_info in machines_info:
machine_id = self._generate_machine_id()
machine = Machine(machine_id)
machine.hostname = machine_info.get("hostname")
machine.addr = machine_info.get("addr")
machine.port = machine_info.get("port")
devices_info = machine_info.get("devices", [])
for device_info in devices_info:
device_global_id = device_info.get("global_id")
device_local_id = device_info.get("local_id")
device = Device(device_global_id, device_local_id, machine)
device_type = device_info.get("type", None)
if device_type is not None:
device_type = DeviceType[device_type]
else:
device_type = DeviceType.UNKNOWN
device.type = device_type
device.model = device_info.get("model", None)
device.dp_gflops = float(device_info.get("dp_gflops", 0))
device.sp_gflops = float(device_info.get("sp_gflops", 0))
device.memory = float(device_info.get("memory", 0))
self.add_device(device)
self.add_machine(machine)
for machine_info in machines_info:
links_info = machine_info.get("links", [])
for link_info in links_info:
source_global_id = link_info.get("source_global_id")
target_global_id = link_info.get("target_global_id")
source = self.get_device(source_global_id)
target = self.get_device(target_global_id)
link = Link(source, target)
link_type = link_info.get("type", None)
if link_type is not None:
link_type = LinkType[link_type]
else:
link_type = LinkType.UNKNOWN
link.type = link_type
link.bandwidth = float(link_info.get("bandwidth", 0))
link.latency = float(link_info.get("latency", 0))
self.add_link(link)
def _generate_machine_id(self):
cur_machine_id = self._num_machines
self._num_machines += 1
return cur_machine_id
def __str__(self):
str = ""
for machine in self.machines.values():
str += "machine: {}\n".format(machine)
return str
def __repr__(self):
return self.__str__()
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import unittest
import os
import json
from paddle.distributed.auto_parallel.cluster import Cluster
from paddle.distributed.auto_parallel.cluster import DeviceType
from paddle.distributed.auto_parallel.cluster import LinkType
cluster_json = """
{
"machines": [
{
"hostname": "machine0",
"addr": "0.0.0.1",
"port": "768",
"devices": [
{
"global_id": 0,
"local_id": 0,
"type": "GPU",
"model": "A100-SXM4-40GB",
"sp_gflops": 19500,
"dp_gflops": 9700,
"memory": 40
},
{
"global_id": 1,
"local_id": 1,
"type": "GPU",
"model": "A100-SXM4-40GB",
"sp_gflops": 19500,
"dp_gflops": 9700,
"memory": 40
},
{
"global_id": 2,
"local_id": 0,
"type": "CPU",
"model": "Intel(R) Xeon(R) Gold 6148 CPU @ 2.40GH",
"arch": "x86_64",
"vendor": "GenuineIntel",
"sp_gflops": 150,
"dp_gflops": 75,
"memory": 1510
},
{
"global_id": 3,
"local_id": 0,
"type": "NIC"
}
],
"links": [
{
"source_global_id": 0,
"target_global_id": 1,
"type": "NVL",
"bandwidth": 252
},
{
"source_global_id": 0,
"target_global_id": 2,
"type": "PHB",
"bandwidth": 12
},
{
"source_global_id": 1,
"target_global_id": 2,
"type": "PHB",
"bandwidth": 12
},
{
"source_global_id": 0,
"target_global_id": 3,
"type": "NET",
"bandwidth": 1
},
{
"source_global_id": 1,
"target_global_id": 3,
"type": "NET",
"bandwidth": 1
},
{
"source_global_id": 2,
"target_global_id": 3,
"type": "NET",
"bandwidth": 1
},
{
"source_global_id": 3,
"target_global_id": 7,
"type": "NET",
"bandwidth": 1
}
]
},
{
"hostname": "machine1",
"addr": "0.0.0.2",
"port": "768",
"devices": [
{
"global_id": 4,
"local_id": 0,
"type": "GPU",
"model": "Tesla V100-SXM2-32GB",
"sp_gflops": 15700,
"dp_gflops": 7800,
"memory": 32
},
{
"global_id": 5,
"local_id": 1,
"type": "GPU",
"model": "Tesla V100-SXM2-32GB",
"sp_gflops": 15700,
"dp_gflops": 7800,
"memory": 32
},
{
"global_id": 6,
"local_id": 0,
"type": "CPU",
"model": "Intel(R) Xeon(R) Gold 6271C CPU @ 2.60G",
"arch": "x86_64",
"vendor": "GenuineIntel",
"sp_gflops": 150,
"dp_gflops": 75,
"memory": "503"
},
{
"global_id": 7,
"local_id": 0,
"type": "NIC"
}
],
"links": [
{
"source_global_id": 4,
"target_global_id": 5,
"type": "NVL",
"bandwidth": 42
},
{
"source_global_id": 4,
"target_global_id": 6,
"type": "PHB",
"bandwidth": 12
},
{
"source_global_id": 5,
"target_global_id": 6,
"type": "PHB",
"bandwidth": 12
},
{
"source_global_id": 4,
"target_global_id": 7,
"type": "NET",
"bandwidth": 1
},
{
"source_global_id": 5,
"target_global_id": 7,
"type": "NET",
"bandwidth": 1
},
{
"source_global_id": 6,
"target_global_id": 7,
"type": "NET",
"bandwidth": 1
},
{
"source_global_id": 7,
"target_global_id": 3,
"type": "NET",
"bandwidth": 1
}
]
}
]
}
"""
class TestAutoParallelCluster(unittest.TestCase):
def test_cluster(self):
cluster_json_file = ""
cluster_json_object = json.loads(cluster_json)
with open("./auto_parallel_cluster.json", "w") as cluster_json_file:
json.dump(cluster_json_object, cluster_json_file)
cluster = Cluster()
cluster.build_from_file("./auto_parallel_cluster.json")
os.remove("./auto_parallel_cluster.json")
self.assertEqual(len(cluster.machines), 2)
# machine0
machine0 = cluster.machines[0]
self.assertEqual(machine0.id, 0)
self.assertEqual(machine0.hostname, "machine0")
self.assertEqual(machine0.addr, "0.0.0.1")
self.assertEqual(machine0.port, "768")
self.assertEqual(len(machine0.devices), 4)
self.assertEqual(len(machine0.links), 7)
# device0
device0_machine0 = machine0.devices[0]
self.assertEqual(device0_machine0.global_id, 0)
self.assertEqual(device0_machine0.local_id, 0)
self.assertEqual(device0_machine0.type, DeviceType.GPU)
self.assertEqual(device0_machine0.model, "A100-SXM4-40GB")
self.assertAlmostEqual(device0_machine0.sp_gflops, 19500)
self.assertAlmostEqual(device0_machine0.dp_gflops, 9700)
self.assertAlmostEqual(device0_machine0.memory, 40)
# device0, link0
link0_machine0 = machine0.links[(0, 1)]
self.assertEqual(link0_machine0.source.global_id, 0)
self.assertEqual(link0_machine0.target.global_id, 1)
self.assertEqual(link0_machine0.type, LinkType.NVL)
self.assertAlmostEqual(link0_machine0.bandwidth, 252)
self.assertAlmostEqual(link0_machine0.latency, 0)
# device 0, link 1
link1_machine0 = machine0.links[(0, 2)]
self.assertEqual(link1_machine0.source.global_id, 0)
self.assertEqual(link1_machine0.target.global_id, 2)
self.assertEqual(link1_machine0.type, LinkType.PHB)
self.assertAlmostEqual(link1_machine0.bandwidth, 12)
self.assertAlmostEqual(link1_machine0.latency, 0)
# device0, link2
link2_machine0 = machine0.links[(0, 3)]
self.assertEqual(link2_machine0.source.global_id, 0)
self.assertEqual(link2_machine0.target.global_id, 3)
self.assertEqual(link2_machine0.type, LinkType.NET)
self.assertAlmostEqual(link2_machine0.bandwidth, 1)
self.assertAlmostEqual(link2_machine0.latency, 0)
# device1
device1_machine0 = machine0.devices[1]
self.assertEqual(device1_machine0.global_id, 1)
self.assertEqual(device1_machine0.local_id, 1)
self.assertEqual(device1_machine0.type, DeviceType.GPU)
self.assertEqual(device1_machine0.model, "A100-SXM4-40GB")
self.assertAlmostEqual(device1_machine0.sp_gflops, 19500)
self.assertAlmostEqual(device1_machine0.dp_gflops, 9700)
self.assertAlmostEqual(device1_machine0.memory, 40)
# device1, link0
link0_machine0 = machine0.links[(1, 2)]
self.assertEqual(link0_machine0.source.global_id, 1)
self.assertEqual(link0_machine0.target.global_id, 2)
self.assertEqual(link0_machine0.type, LinkType.PHB)
self.assertAlmostEqual(link0_machine0.bandwidth, 12)
self.assertAlmostEqual(link0_machine0.latency, 0)
# device1, link1
link1_machine0 = machine0.links[(1, 3)]
self.assertEqual(link1_machine0.source.global_id, 1)
self.assertEqual(link1_machine0.target.global_id, 3)
self.assertEqual(link1_machine0.type, LinkType.NET)
self.assertAlmostEqual(link1_machine0.bandwidth, 1)
self.assertAlmostEqual(link1_machine0.latency, 0)
# device2
device2_machine0 = machine0.devices[2]
self.assertEqual(device2_machine0.global_id, 2)
self.assertEqual(device2_machine0.local_id, 0)
self.assertEqual(device2_machine0.type, DeviceType.CPU)
self.assertEqual(device2_machine0.model,
"Intel(R) Xeon(R) Gold 6148 CPU @ 2.40GH")
self.assertAlmostEqual(device2_machine0.sp_gflops, 150)
self.assertAlmostEqual(device2_machine0.dp_gflops, 75)
self.assertAlmostEqual(device2_machine0.memory, 1510)
# device2, link0
link0_machine0 = machine0.links[(2, 3)]
self.assertEqual(link0_machine0.source.global_id, 2)
self.assertEqual(link0_machine0.target.global_id, 3)
self.assertEqual(link0_machine0.type, LinkType.NET)
self.assertAlmostEqual(link0_machine0.bandwidth, 1)
self.assertAlmostEqual(link0_machine0.latency, 0)
# device3
device3_machine0 = machine0.devices[3]
self.assertEqual(device3_machine0.global_id, 3)
self.assertEqual(device3_machine0.local_id, 0)
self.assertEqual(device3_machine0.type, DeviceType.NIC)
self.assertAlmostEqual(device3_machine0.model, None)
self.assertAlmostEqual(device3_machine0.sp_gflops, 0)
self.assertAlmostEqual(device3_machine0.dp_gflops, 0)
self.assertAlmostEqual(device3_machine0.memory, 0)
link0_machine0 = machine0.links[(3, 7)]
# device3, link0
self.assertEqual(link0_machine0.source.global_id, 3)
self.assertEqual(link0_machine0.target.global_id, 7)
self.assertEqual(link0_machine0.type, LinkType.NET)
self.assertAlmostEqual(link0_machine0.bandwidth, 1)
self.assertAlmostEqual(link0_machine0.latency, 0)
# machine1
machine1 = cluster.machines[1]
self.assertEqual(machine1.id, 1)
self.assertEqual(machine1.hostname, "machine1")
self.assertEqual(machine1.addr, "0.0.0.2")
self.assertEqual(machine1.port, "768")
self.assertEqual(len(machine1.devices), 4)
self.assertEqual(len(machine1.links), 7)
# device4
device4_machine1 = machine1.devices[4]
self.assertEqual(device4_machine1.global_id, 4)
self.assertEqual(device4_machine1.local_id, 0)
self.assertEqual(device4_machine1.type, DeviceType.GPU)
self.assertEqual(device4_machine1.model, "Tesla V100-SXM2-32GB")
self.assertAlmostEqual(device4_machine1.sp_gflops, 15700)
self.assertAlmostEqual(device4_machine1.dp_gflops, 7800)
self.assertAlmostEqual(device4_machine1.memory, 32)
# device4, link0
link0_machine1 = machine1.links[(4, 5)]
self.assertEqual(link0_machine1.source.global_id, 4)
self.assertEqual(link0_machine1.target.global_id, 5)
self.assertEqual(link0_machine1.type, LinkType.NVL)
self.assertAlmostEqual(link0_machine1.bandwidth, 42)
self.assertAlmostEqual(link0_machine1.latency, 0)
# device 4, link 1
link1_machine1 = machine1.links[(4, 6)]
self.assertEqual(link1_machine1.source.global_id, 4)
self.assertEqual(link1_machine1.target.global_id, 6)
self.assertEqual(link1_machine1.type, LinkType.PHB)
self.assertAlmostEqual(link1_machine1.bandwidth, 12)
self.assertAlmostEqual(link1_machine1.latency, 0)
# device4, link2
link2_machine1 = machine1.links[(4, 7)]
self.assertEqual(link2_machine1.source.global_id, 4)
self.assertEqual(link2_machine1.target.global_id, 7)
self.assertEqual(link2_machine1.type, LinkType.NET)
self.assertAlmostEqual(link2_machine1.bandwidth, 1)
self.assertAlmostEqual(link2_machine1.latency, 0)
# device5
device5_machine1 = machine1.devices[5]
self.assertEqual(device5_machine1.global_id, 5)
self.assertEqual(device5_machine1.local_id, 1)
self.assertEqual(device5_machine1.type, DeviceType.GPU)
self.assertEqual(device4_machine1.model, "Tesla V100-SXM2-32GB")
self.assertAlmostEqual(device4_machine1.sp_gflops, 15700)
self.assertAlmostEqual(device4_machine1.dp_gflops, 7800)
self.assertAlmostEqual(device4_machine1.memory, 32)
# device5, link0
link0_machine1 = machine1.links[(5, 6)]
self.assertEqual(link0_machine1.source.global_id, 5)
self.assertEqual(link0_machine1.target.global_id, 6)
self.assertEqual(link0_machine1.type, LinkType.PHB)
self.assertAlmostEqual(link0_machine1.bandwidth, 12)
self.assertAlmostEqual(link0_machine1.latency, 0)
# device5, link1
link1_machine1 = machine1.links[(5, 7)]
self.assertEqual(link1_machine1.source.global_id, 5)
self.assertEqual(link1_machine1.target.global_id, 7)
self.assertEqual(link1_machine1.type, LinkType.NET)
self.assertAlmostEqual(link1_machine1.bandwidth, 1)
self.assertAlmostEqual(link1_machine1.latency, 0)
# device6
device6_machine1 = machine1.devices[6]
self.assertEqual(device6_machine1.global_id, 6)
self.assertEqual(device6_machine1.local_id, 0)
self.assertEqual(device6_machine1.type, DeviceType.CPU)
self.assertEqual(device6_machine1.model,
"Intel(R) Xeon(R) Gold 6271C CPU @ 2.60G")
self.assertAlmostEqual(device6_machine1.sp_gflops, 150)
self.assertAlmostEqual(device6_machine1.dp_gflops, 75)
self.assertAlmostEqual(device6_machine1.memory, 503)
# device6, link0
link0_machine1 = machine1.links[(6, 7)]
self.assertEqual(link0_machine1.source.global_id, 6)
self.assertEqual(link0_machine1.target.global_id, 7)
self.assertEqual(link0_machine1.type, LinkType.NET)
self.assertAlmostEqual(link0_machine1.bandwidth, 1)
self.assertAlmostEqual(link0_machine1.latency, 0)
# device7
device7_machine1 = machine1.devices[7]
self.assertEqual(device7_machine1.global_id, 7)
self.assertEqual(device7_machine1.local_id, 0)
self.assertEqual(device7_machine1.type, DeviceType.NIC)
self.assertAlmostEqual(device7_machine1.model, None)
self.assertAlmostEqual(device7_machine1.sp_gflops, 0)
self.assertAlmostEqual(device7_machine1.dp_gflops, 0)
self.assertAlmostEqual(device7_machine1.memory, 0)
# device3, link0
link0_machine1 = machine1.links[(7, 3)]
self.assertEqual(link0_machine1.source.global_id, 7)
self.assertEqual(link0_machine1.target.global_id, 3)
self.assertEqual(link0_machine1.type, LinkType.NET)
self.assertAlmostEqual(link0_machine1.bandwidth, 1)
self.assertAlmostEqual(link0_machine1.latency, 0)
str = "cluster: {}".format(cluster)
if __name__ == '__main__':
unittest.main()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册