From db7275514fd4d2c6e3a6d41b09bb24788d68fdb1 Mon Sep 17 00:00:00 2001
From: Yulong Ao
Date: Wed, 24 Nov 2021 14:36:05 +0800
Subject: [PATCH] [Auto Parallel] Add the unified cluster representation (#37091)

* [Auto Parallel] Add the unified cluster representation
* Add the local id for devices
* Add some comments
---
 .../distributed/auto_parallel/cluster.py      | 361 +++++++++++++++
 .../unittests/test_auto_parallel_cluster.py   | 430 ++++++++++++++++++
 2 files changed, 791 insertions(+)
 create mode 100644 python/paddle/distributed/auto_parallel/cluster.py
 create mode 100644 python/paddle/fluid/tests/unittests/test_auto_parallel_cluster.py

diff --git a/python/paddle/distributed/auto_parallel/cluster.py b/python/paddle/distributed/auto_parallel/cluster.py
new file mode 100644
index 0000000000..d65612fc6e
--- /dev/null
+++ b/python/paddle/distributed/auto_parallel/cluster.py
@@ -0,0 +1,361 @@
+# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import json
+from enum import IntEnum
+from enum import unique
+
+
+@unique
+class DeviceType(IntEnum):
+    UNKNOWN = 0
+    CPU = 1
+    GPU = 2
+    XPU = 3
+    NPU = 4
+    DCU = 5
+    NIC = 6
+
+
+@unique
+class LinkType(IntEnum):
+    UNKNOWN = 0
+    LOC = 1
+    SYS = 2
+    PHB = 3
+    PIX = 4
+    PIB = 5
+    NVL = 6
+    NVB = 7
+    NET = 8
+
+
+class Device:
+    def __init__(self, global_id, local_id, machine):
+        self._global_id = global_id
+        self._local_id = local_id
+        self._machine = machine
+        self._type = None
+        # Different devices have different models, such as
+        # "Tesla V100-SXM2-32GB" and "A100-SXM4-40GB" etc.
+        self._model = None
+        # Double precision GFLOPS
+        self._dp_gflops = None
+        # Single precision GFLOPS
+        self._sp_gflops = None
+        # Memory is stored in GB
+        self._memory = None
+
+    @property
+    def global_id(self):
+        return self._global_id
+
+    @global_id.setter
+    def global_id(self, value):
+        self._global_id = value
+
+    @property
+    def local_id(self):
+        return self._local_id
+
+    @local_id.setter
+    def local_id(self, value):
+        self._local_id = value
+
+    @property
+    def machine(self):
+        return self._machine
+
+    @machine.setter
+    def machine(self, value):
+        self._machine = value
+
+    @property
+    def type(self):
+        return self._type
+
+    @type.setter
+    def type(self, value):
+        self._type = value
+
+    @property
+    def model(self):
+        return self._model
+
+    @model.setter
+    def model(self, value):
+        self._model = value
+
+    @property
+    def dp_gflops(self):
+        return self._dp_gflops
+
+    @dp_gflops.setter
+    def dp_gflops(self, value):
+        self._dp_gflops = value
+
+    @property
+    def sp_gflops(self):
+        return self._sp_gflops
+
+    @sp_gflops.setter
+    def sp_gflops(self, value):
+        self._sp_gflops = value
+
+    @property
+    def memory(self):
+        return self._memory
+
+    @memory.setter
+    def memory(self, value):
+        self._memory = value
+
+    def __str__(self):
+        str = ""
+        str += "global_id: {}, local_id: {}, machine_id: {}, type: {}, model: {}, dp_gflops: {}, sp_gflops: {}, memory: {}".format(
+            self.global_id, self.local_id, self.machine.id, self.type.name,
+            self.model, self.dp_gflops, self.sp_gflops, self.memory)
+        return str
+
+    def __repr__(self):
+        return self.__str__()
+
+
+class Link:
+    def __init__(self, source, target):
+        self._src = source
+        self._tgt = target
+        self._type = None
+        # Bandwidth is stored in GB/s
+        self._bandwidth = None
+        # Latency is stored in milliseconds
+        self._latency = None
+
+    @property
+    def source(self):
+        return self._src
+
+    @source.setter
+    def source(self, value):
+        self._src = value
+
+    @property
+    def target(self):
+        return self._tgt
+
+    @target.setter
+    def target(self, value):
+        self._tgt = value
+
+    @property
+    def type(self):
+        return self._type
+
+    @type.setter
+    def type(self, value):
+        self._type = value
+
+    @property
+    def bandwidth(self):
+        return self._bandwidth
+
+    @bandwidth.setter
+    def bandwidth(self, value):
+        self._bandwidth = value
+
+    @property
+    def latency(self):
+        return self._latency
+
+    @latency.setter
+    def latency(self, value):
+        self._latency = value
+
+    def __str__(self):
+        str = ""
+        str += "source_global_id: {}, target_global_id: {}, type: {}, bandwidth: {}, latency: {}".format(
+            self.source.global_id, self.target.global_id, self.type,
+            self.bandwidth, self.latency)
+        return str
+
+    def __repr__(self):
+        return self.__str__()
+
+
+class Machine:
+    def __init__(self, id):
+        self._id = id
+        self._hostname = None
+        self._addr = None
+        self._port = None
+        self._devices = {}
+        self._links = {}
+
+    @property
+    def id(self):
+        return self._id
+
+    @id.setter
+    def id(self, value):
+        self._id = value
+
+    @property
+    def hostname(self):
+        return self._hostname
+
+    @hostname.setter
+    def hostname(self, value):
+        self._hostname = value
+
+    @property
+    def addr(self):
+        return self._addr
+
+    @addr.setter
+    def addr(self, value):
+        self._addr = value
+
+    @property
+    def port(self):
+        return self._port
+
+    @port.setter
+    def port(self, value):
+        self._port = value
+
+    @property
+    def devices(self):
+        return self._devices
+
+    @property
+    def links(self):
+        return self._links
+
+    def add_device(self, device):
+        # Use the device global_id as the key
+        self._devices[device.global_id] = device
+
+    def add_link(self, link):
+        # Use the source device global_id and target device global_id as the key
+        self._links[(link.source.global_id, link.target.global_id)] = link
+
+    def __str__(self):
+        str = ""
+        for device in self.devices.values():
+            str += ", device: {}".format(device)
+        for link in self.links.values():
+            str += ", link: {}".format(link)
+        return str
+
+    def __repr__(self):
+        return self.__str__()
+
+
+class Cluster:
+    """
+    The cluster is an abstraction of the hardware resources used for training, which contains the
+    cluster topology and related hardware information. It serves the task mapping, cost model and auto searching.
+    """
+
+    def __init__(self):
+        # Used to compute machine id
+        self._num_machines = 0
+        # Store all machines within the cluster
+        self._machines = {}
+        # Cluster graph topology
+        self._topology = None
+
+    @property
+    def machines(self):
+        return self._machines
+
+    def add_machine(self, machine):
+        assert isinstance(machine, Machine)
+        self._machines[machine.id] = machine
+
+    def add_device(self, device):
+        assert isinstance(device, Device)
+        device.machine.add_device(device)
+
+    def add_link(self, link):
+        assert isinstance(link, Link)
+        # Only add the link to the source machine
+        link.source.machine.add_link(link)
+
+    def get_device(self, device_global_id):
+        device = None
+        for machine in self.machines.values():
+            if device_global_id in machine.devices.keys():
+                device = machine.devices[device_global_id]
+        return device
+
+    def build_from_file(self, json_file_path):
+        with open(json_file_path) as json_file:
+            cluster_info = json.load(json_file)
+        machines_info = cluster_info["machines"]
+        for machine_info in machines_info:
+            machine_id = self._generate_machine_id()
+            machine = Machine(machine_id)
+            machine.hostname = machine_info.get("hostname")
+            machine.addr = machine_info.get("addr")
+            machine.port = machine_info.get("port")
+            devices_info = machine_info.get("devices", [])
+            for device_info in devices_info:
+                device_global_id = device_info.get("global_id")
+                device_local_id = device_info.get("local_id")
+                device = Device(device_global_id, device_local_id, machine)
+                device_type = device_info.get("type", None)
+                if device_type is not None:
+                    device_type = DeviceType[device_type]
+                else:
+                    device_type = DeviceType.UNKNOWN
+                device.type = device_type
+                device.model = device_info.get("model", None)
+                device.dp_gflops = float(device_info.get("dp_gflops", 0))
+                device.sp_gflops = float(device_info.get("sp_gflops", 0))
+                device.memory = float(device_info.get("memory", 0))
+                self.add_device(device)
+            self.add_machine(machine)
+        for machine_info in machines_info:
+            links_info = machine_info.get("links", [])
+            for link_info in links_info:
+                source_global_id = link_info.get("source_global_id")
+                target_global_id = link_info.get("target_global_id")
+                source = self.get_device(source_global_id)
+                target = self.get_device(target_global_id)
+                link = Link(source, target)
+                link_type = link_info.get("type", None)
+                if link_type is not None:
+                    link_type = LinkType[link_type]
+                else:
+                    link_type = LinkType.UNKNOWN
+                link.type = link_type
+                link.bandwidth = float(link_info.get("bandwidth", 0))
+                link.latency = float(link_info.get("latency", 0))
+                self.add_link(link)
+
+    def _generate_machine_id(self):
+        cur_machine_id = self._num_machines
+        self._num_machines += 1
+        return cur_machine_id
+
+    def __str__(self):
+        str = ""
+        for machine in self.machines.values():
+            str += "machine: {}\n".format(machine)
+        return str
+
+    def __repr__(self):
+        return self.__str__()
diff --git a/python/paddle/fluid/tests/unittests/test_auto_parallel_cluster.py b/python/paddle/fluid/tests/unittests/test_auto_parallel_cluster.py
new file mode 100644
index 0000000000..d3942716f5
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/test_auto_parallel_cluster.py
@@ -0,0 +1,430 @@
+# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function
+
+import unittest
+import os
+import json
+from paddle.distributed.auto_parallel.cluster import Cluster
+from paddle.distributed.auto_parallel.cluster import DeviceType
+from paddle.distributed.auto_parallel.cluster import LinkType
+
+cluster_json = """
+{
+    "machines": [
+        {
+            "hostname": "machine0",
+            "addr": "0.0.0.1",
+            "port": "768",
+            "devices": [
+                {
+                    "global_id": 0,
+                    "local_id": 0,
+                    "type": "GPU",
+                    "model": "A100-SXM4-40GB",
+                    "sp_gflops": 19500,
+                    "dp_gflops": 9700,
+                    "memory": 40
+                },
+                {
+                    "global_id": 1,
+                    "local_id": 1,
+                    "type": "GPU",
+                    "model": "A100-SXM4-40GB",
+                    "sp_gflops": 19500,
+                    "dp_gflops": 9700,
+                    "memory": 40
+                },
+                {
+                    "global_id": 2,
+                    "local_id": 0,
+                    "type": "CPU",
+                    "model": "Intel(R) Xeon(R) Gold 6148 CPU @ 2.40GH",
+                    "arch": "x86_64",
+                    "vendor": "GenuineIntel",
+                    "sp_gflops": 150,
+                    "dp_gflops": 75,
+                    "memory": 1510
+                },
+                {
+                    "global_id": 3,
+                    "local_id": 0,
+                    "type": "NIC"
+                }
+            ],
+            "links": [
+                {
+                    "source_global_id": 0,
+                    "target_global_id": 1,
+                    "type": "NVL",
+                    "bandwidth": 252
+                },
+                {
+                    "source_global_id": 0,
+                    "target_global_id": 2,
+                    "type": "PHB",
+                    "bandwidth": 12
+                },
+                {
+                    "source_global_id": 1,
+                    "target_global_id": 2,
+                    "type": "PHB",
+                    "bandwidth": 12
+                },
+                {
+                    "source_global_id": 0,
+                    "target_global_id": 3,
+                    "type": "NET",
+                    "bandwidth": 1
+                },
+                {
+                    "source_global_id": 1,
+                    "target_global_id": 3,
+                    "type": "NET",
+                    "bandwidth": 1
+                },
+                {
+                    "source_global_id": 2,
+                    "target_global_id": 3,
+                    "type": "NET",
+                    "bandwidth": 1
+                },
+                {
+                    "source_global_id": 3,
+                    "target_global_id": 7,
+                    "type": "NET",
+                    "bandwidth": 1
+                }
+            ]
+        },
+        {
+            "hostname": "machine1",
+            "addr": "0.0.0.2",
+            "port": "768",
+            "devices": [
+                {
+                    "global_id": 4,
+                    "local_id": 0,
+                    "type": "GPU",
+                    "model": "Tesla V100-SXM2-32GB",
+                    "sp_gflops": 15700,
+                    "dp_gflops": 7800,
+                    "memory": 32
+                },
+                {
+                    "global_id": 5,
+                    "local_id": 1,
+                    "type": "GPU",
+                    "model": "Tesla V100-SXM2-32GB",
+                    "sp_gflops": 15700,
+                    "dp_gflops": 7800,
+                    "memory": 32
+                },
+                {
+                    "global_id": 6,
+                    "local_id": 0,
+                    "type": "CPU",
+                    "model": "Intel(R) Xeon(R) Gold 6271C CPU @ 2.60G",
+                    "arch": "x86_64",
+                    "vendor": "GenuineIntel",
+                    "sp_gflops": 150,
+                    "dp_gflops": 75,
+                    "memory": "503"
+                },
+                {
+                    "global_id": 7,
+                    "local_id": 0,
+                    "type": "NIC"
+                }
+            ],
+            "links": [
+                {
+                    "source_global_id": 4,
+                    "target_global_id": 5,
+                    "type": "NVL",
+                    "bandwidth": 42
+                },
+                {
+                    "source_global_id": 4,
+                    "target_global_id": 6,
+                    "type": "PHB",
+                    "bandwidth": 12
+                },
+                {
+                    "source_global_id": 5,
+                    "target_global_id": 6,
+                    "type": "PHB",
"PHB", + "bandwidth": 12 + }, + { + "source_global_id": 4, + "target_global_id": 7, + "type": "NET", + "bandwidth": 1 + }, + { + "source_global_id": 5, + "target_global_id": 7, + "type": "NET", + "bandwidth": 1 + }, + { + "source_global_id": 6, + "target_global_id": 7, + "type": "NET", + "bandwidth": 1 + }, + { + "source_global_id": 7, + "target_global_id": 3, + "type": "NET", + "bandwidth": 1 + } + ] + } + ] +} +""" + + +class TestAutoParallelCluster(unittest.TestCase): + def test_cluster(self): + cluster_json_file = "" + cluster_json_object = json.loads(cluster_json) + with open("./auto_parallel_cluster.json", "w") as cluster_json_file: + json.dump(cluster_json_object, cluster_json_file) + + cluster = Cluster() + cluster.build_from_file("./auto_parallel_cluster.json") + os.remove("./auto_parallel_cluster.json") + self.assertEqual(len(cluster.machines), 2) + + # machine0 + machine0 = cluster.machines[0] + self.assertEqual(machine0.id, 0) + self.assertEqual(machine0.hostname, "machine0") + self.assertEqual(machine0.addr, "0.0.0.1") + self.assertEqual(machine0.port, "768") + self.assertEqual(len(machine0.devices), 4) + self.assertEqual(len(machine0.links), 7) + + # device0 + device0_machine0 = machine0.devices[0] + self.assertEqual(device0_machine0.global_id, 0) + self.assertEqual(device0_machine0.local_id, 0) + self.assertEqual(device0_machine0.type, DeviceType.GPU) + self.assertEqual(device0_machine0.model, "A100-SXM4-40GB") + self.assertAlmostEqual(device0_machine0.sp_gflops, 19500) + self.assertAlmostEqual(device0_machine0.dp_gflops, 9700) + self.assertAlmostEqual(device0_machine0.memory, 40) + + # device0, link0 + link0_machine0 = machine0.links[(0, 1)] + self.assertEqual(link0_machine0.source.global_id, 0) + self.assertEqual(link0_machine0.target.global_id, 1) + self.assertEqual(link0_machine0.type, LinkType.NVL) + self.assertAlmostEqual(link0_machine0.bandwidth, 252) + self.assertAlmostEqual(link0_machine0.latency, 0) + + # device 0, link 1 + link1_machine0 = machine0.links[(0, 2)] + self.assertEqual(link1_machine0.source.global_id, 0) + self.assertEqual(link1_machine0.target.global_id, 2) + self.assertEqual(link1_machine0.type, LinkType.PHB) + self.assertAlmostEqual(link1_machine0.bandwidth, 12) + self.assertAlmostEqual(link1_machine0.latency, 0) + + # device0, link2 + link2_machine0 = machine0.links[(0, 3)] + self.assertEqual(link2_machine0.source.global_id, 0) + self.assertEqual(link2_machine0.target.global_id, 3) + self.assertEqual(link2_machine0.type, LinkType.NET) + self.assertAlmostEqual(link2_machine0.bandwidth, 1) + self.assertAlmostEqual(link2_machine0.latency, 0) + + # device1 + device1_machine0 = machine0.devices[1] + self.assertEqual(device1_machine0.global_id, 1) + self.assertEqual(device1_machine0.local_id, 1) + self.assertEqual(device1_machine0.type, DeviceType.GPU) + self.assertEqual(device1_machine0.model, "A100-SXM4-40GB") + self.assertAlmostEqual(device1_machine0.sp_gflops, 19500) + self.assertAlmostEqual(device1_machine0.dp_gflops, 9700) + self.assertAlmostEqual(device1_machine0.memory, 40) + + # device1, link0 + link0_machine0 = machine0.links[(1, 2)] + self.assertEqual(link0_machine0.source.global_id, 1) + self.assertEqual(link0_machine0.target.global_id, 2) + self.assertEqual(link0_machine0.type, LinkType.PHB) + self.assertAlmostEqual(link0_machine0.bandwidth, 12) + self.assertAlmostEqual(link0_machine0.latency, 0) + + # device1, link1 + link1_machine0 = machine0.links[(1, 3)] + self.assertEqual(link1_machine0.source.global_id, 1) + 
+        self.assertEqual(link1_machine0.target.global_id, 3)
+        self.assertEqual(link1_machine0.type, LinkType.NET)
+        self.assertAlmostEqual(link1_machine0.bandwidth, 1)
+        self.assertAlmostEqual(link1_machine0.latency, 0)
+
+        # device2
+        device2_machine0 = machine0.devices[2]
+        self.assertEqual(device2_machine0.global_id, 2)
+        self.assertEqual(device2_machine0.local_id, 0)
+        self.assertEqual(device2_machine0.type, DeviceType.CPU)
+        self.assertEqual(device2_machine0.model,
+                         "Intel(R) Xeon(R) Gold 6148 CPU @ 2.40GH")
+        self.assertAlmostEqual(device2_machine0.sp_gflops, 150)
+        self.assertAlmostEqual(device2_machine0.dp_gflops, 75)
+        self.assertAlmostEqual(device2_machine0.memory, 1510)
+
+        # device2, link0
+        link0_machine0 = machine0.links[(2, 3)]
+        self.assertEqual(link0_machine0.source.global_id, 2)
+        self.assertEqual(link0_machine0.target.global_id, 3)
+        self.assertEqual(link0_machine0.type, LinkType.NET)
+        self.assertAlmostEqual(link0_machine0.bandwidth, 1)
+        self.assertAlmostEqual(link0_machine0.latency, 0)
+
+        # device3
+        device3_machine0 = machine0.devices[3]
+        self.assertEqual(device3_machine0.global_id, 3)
+        self.assertEqual(device3_machine0.local_id, 0)
+        self.assertEqual(device3_machine0.type, DeviceType.NIC)
+        self.assertAlmostEqual(device3_machine0.model, None)
+        self.assertAlmostEqual(device3_machine0.sp_gflops, 0)
+        self.assertAlmostEqual(device3_machine0.dp_gflops, 0)
+        self.assertAlmostEqual(device3_machine0.memory, 0)
+
+        # device3, link0
+        link0_machine0 = machine0.links[(3, 7)]
+        self.assertEqual(link0_machine0.source.global_id, 3)
+        self.assertEqual(link0_machine0.target.global_id, 7)
+        self.assertEqual(link0_machine0.type, LinkType.NET)
+        self.assertAlmostEqual(link0_machine0.bandwidth, 1)
+        self.assertAlmostEqual(link0_machine0.latency, 0)
+
+        # machine1
+        machine1 = cluster.machines[1]
+        self.assertEqual(machine1.id, 1)
+        self.assertEqual(machine1.hostname, "machine1")
+        self.assertEqual(machine1.addr, "0.0.0.2")
+        self.assertEqual(machine1.port, "768")
+        self.assertEqual(len(machine1.devices), 4)
+        self.assertEqual(len(machine1.links), 7)
+
+        # device4
+        device4_machine1 = machine1.devices[4]
+        self.assertEqual(device4_machine1.global_id, 4)
+        self.assertEqual(device4_machine1.local_id, 0)
+        self.assertEqual(device4_machine1.type, DeviceType.GPU)
+        self.assertEqual(device4_machine1.model, "Tesla V100-SXM2-32GB")
+        self.assertAlmostEqual(device4_machine1.sp_gflops, 15700)
+        self.assertAlmostEqual(device4_machine1.dp_gflops, 7800)
+        self.assertAlmostEqual(device4_machine1.memory, 32)
+
+        # device4, link0
+        link0_machine1 = machine1.links[(4, 5)]
+        self.assertEqual(link0_machine1.source.global_id, 4)
+        self.assertEqual(link0_machine1.target.global_id, 5)
+        self.assertEqual(link0_machine1.type, LinkType.NVL)
+        self.assertAlmostEqual(link0_machine1.bandwidth, 42)
+        self.assertAlmostEqual(link0_machine1.latency, 0)
+
+        # device4, link1
+        link1_machine1 = machine1.links[(4, 6)]
+        self.assertEqual(link1_machine1.source.global_id, 4)
+        self.assertEqual(link1_machine1.target.global_id, 6)
+        self.assertEqual(link1_machine1.type, LinkType.PHB)
+        self.assertAlmostEqual(link1_machine1.bandwidth, 12)
+        self.assertAlmostEqual(link1_machine1.latency, 0)
+
+        # device4, link2
+        link2_machine1 = machine1.links[(4, 7)]
+        self.assertEqual(link2_machine1.source.global_id, 4)
+        self.assertEqual(link2_machine1.target.global_id, 7)
+        self.assertEqual(link2_machine1.type, LinkType.NET)
+        self.assertAlmostEqual(link2_machine1.bandwidth, 1)
+        self.assertAlmostEqual(link2_machine1.latency, 0)
+
+        # device5
+        device5_machine1 = machine1.devices[5]
+        self.assertEqual(device5_machine1.global_id, 5)
+        self.assertEqual(device5_machine1.local_id, 1)
+        self.assertEqual(device5_machine1.type, DeviceType.GPU)
+        self.assertEqual(device5_machine1.model, "Tesla V100-SXM2-32GB")
+        self.assertAlmostEqual(device5_machine1.sp_gflops, 15700)
+        self.assertAlmostEqual(device5_machine1.dp_gflops, 7800)
+        self.assertAlmostEqual(device5_machine1.memory, 32)
+
+        # device5, link0
+        link0_machine1 = machine1.links[(5, 6)]
+        self.assertEqual(link0_machine1.source.global_id, 5)
+        self.assertEqual(link0_machine1.target.global_id, 6)
+        self.assertEqual(link0_machine1.type, LinkType.PHB)
+        self.assertAlmostEqual(link0_machine1.bandwidth, 12)
+        self.assertAlmostEqual(link0_machine1.latency, 0)
+
+        # device5, link1
+        link1_machine1 = machine1.links[(5, 7)]
+        self.assertEqual(link1_machine1.source.global_id, 5)
+        self.assertEqual(link1_machine1.target.global_id, 7)
+        self.assertEqual(link1_machine1.type, LinkType.NET)
+        self.assertAlmostEqual(link1_machine1.bandwidth, 1)
+        self.assertAlmostEqual(link1_machine1.latency, 0)
+
+        # device6
+        device6_machine1 = machine1.devices[6]
+        self.assertEqual(device6_machine1.global_id, 6)
+        self.assertEqual(device6_machine1.local_id, 0)
+        self.assertEqual(device6_machine1.type, DeviceType.CPU)
+        self.assertEqual(device6_machine1.model,
+                         "Intel(R) Xeon(R) Gold 6271C CPU @ 2.60G")
+        self.assertAlmostEqual(device6_machine1.sp_gflops, 150)
+        self.assertAlmostEqual(device6_machine1.dp_gflops, 75)
+        self.assertAlmostEqual(device6_machine1.memory, 503)
+
+        # device6, link0
+        link0_machine1 = machine1.links[(6, 7)]
+        self.assertEqual(link0_machine1.source.global_id, 6)
+        self.assertEqual(link0_machine1.target.global_id, 7)
+        self.assertEqual(link0_machine1.type, LinkType.NET)
+        self.assertAlmostEqual(link0_machine1.bandwidth, 1)
+        self.assertAlmostEqual(link0_machine1.latency, 0)
+
+        # device7
+        device7_machine1 = machine1.devices[7]
+        self.assertEqual(device7_machine1.global_id, 7)
+        self.assertEqual(device7_machine1.local_id, 0)
+        self.assertEqual(device7_machine1.type, DeviceType.NIC)
+        self.assertAlmostEqual(device7_machine1.model, None)
+        self.assertAlmostEqual(device7_machine1.sp_gflops, 0)
+        self.assertAlmostEqual(device7_machine1.dp_gflops, 0)
+        self.assertAlmostEqual(device7_machine1.memory, 0)
+
+        # device7, link0
+        link0_machine1 = machine1.links[(7, 3)]
+        self.assertEqual(link0_machine1.source.global_id, 7)
+        self.assertEqual(link0_machine1.target.global_id, 3)
+        self.assertEqual(link0_machine1.type, LinkType.NET)
+        self.assertAlmostEqual(link0_machine1.bandwidth, 1)
+        self.assertAlmostEqual(link0_machine1.latency, 0)
+
+        str = "cluster: {}".format(cluster)
+
+
+if __name__ == '__main__':
+    unittest.main()
-- 
GitLab
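
A minimal usage sketch of the Cluster API added by this patch, based only on the code above. The JSON path is a placeholder; the file is expected to follow the same schema as the cluster_json string embedded in the unit test:

    from paddle.distributed.auto_parallel.cluster import Cluster

    cluster = Cluster()
    # Parse the machines, devices and links described in the JSON file.
    cluster.build_from_file("/path/to/cluster.json")

    # Machines are keyed by machine id, devices by their global id.
    machine0 = cluster.machines[0]
    gpu0 = cluster.get_device(0)

    # Links are stored on the source machine, keyed by
    # (source_global_id, target_global_id).
    for (src_id, tgt_id), link in machine0.links.items():
        print(src_id, tgt_id, link.type.name, link.bandwidth, link.latency)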