未验证 提交 605552a9 编写于 作者: C caozhou 提交者: GitHub

[Auto Parallel]update cluster (#41722)

* update cluster
上级 42abcc08
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
......@@ -43,6 +43,8 @@ class LinkType(IntEnum):
class Device:
NON_ACCELERATOR_TYPE = [DeviceType.CPU, DeviceType.NIC, DeviceType.UNKNOWN]
def __init__(self, global_id, local_id, machine):
self._global_id = global_id
self._local_id = local_id
......@@ -134,6 +136,10 @@ class Device:
class Link:
default_hop = 1
default_nic_bandwith = 24
def __init__(self, source, target):
self._src = source
self._tgt = target
......@@ -142,6 +148,7 @@ class Link:
self._bandwidth = None
# latency is stored by millisecond
self._latency = None
self._hop = None
@property
def source(self):
......@@ -183,6 +190,14 @@ class Link:
def latency(self, value):
self._latency = value
@property
def hop(self):
    """Hop count recorded for this link (``None`` until one is assigned)."""
    return self._hop

@hop.setter
def hop(self, value):
    """Store ``value`` as the hop count of this link; no validation is done."""
    self._hop = value
def __str__(self):
str = ""
str += "source_global_id: {}, target_global_id: {}, type: {}, bandwidth: {}, latency: {}".format(
......@@ -202,6 +217,8 @@ class Machine:
self._port = None
self._devices = {}
self._links = {}
self._accelerators = {}
self._non_accelerator_cumulative_count = 0
@property
def id(self):
......@@ -243,14 +260,23 @@ class Machine:
def links(self):
return self._links
@property
def accelerators(self):
    """Mapping of device global id to device for this machine's accelerators.

    The underlying dict itself is returned, not a copy.
    """
    return self._accelerators
def add_device(self, device):
    """Register ``device`` on this machine, keyed by its global id.

    Every device is stored in ``self._devices``. A device whose type is not
    listed in ``Device.NON_ACCELERATOR_TYPE`` is also stored in
    ``self._accelerators`` under the same key.
    """
    key = device.global_id
    self._devices[key] = device
    if device.type in Device.NON_ACCELERATOR_TYPE:
        return
    self._accelerators[key] = device
def add_link(self, link):
    """Register ``link`` on this machine.

    The key is the pair (source device global id, target device global id);
    an existing entry under the same pair is overwritten.
    """
    endpoints = (link.source.global_id, link.target.global_id)
    self._links[endpoints] = link
def get_link(self, source_global_id, target_global_id):
    """Look up the link stored for the given (source, target) id pair.

    Returns:
        The stored link, or ``None`` when no link is registered for the pair.
    """
    endpoints = (source_global_id, target_global_id)
    return self._links.get(endpoints)
def __str__(self):
str = ""
for device in self.devices.values():
......@@ -263,6 +289,109 @@ class Machine:
return self.__str__()
class AlphaLatency:
    """Latency settings read from the ``alpha_latency`` entry of a cluster description.

    The input dict may contain the optional keys ``"base"``, ``"inter"``,
    ``"intra"`` and ``"switch"``:

    * ``"switch"`` is converted to ``float``.
    * ``"base"`` is a dict whose ``"ring"`` and ``"tree"`` entries are
      converted to ``float``; its ``"inter"`` entry is kept unconverted.
    * ``"inter"`` and ``"intra"`` are dicts whose ``"ring"`` and ``"tree"``
      entries are either a link-type name (``"NET"`` for inter, ``"NVL"`` or
      ``"PHB"`` for intra), which is mapped to the matching ``LinkType``
      member, or a number, which is converted to ``float``.

    Any missing key or entry leaves the corresponding property as ``None``.

    Raises:
        AssertionError: if ``alpha_latency`` is not a dict, or a string entry
            is not one of the accepted link-type names.
        TypeError: if a numeric entry cannot be converted to ``float``.
    """

    def __init__(self, alpha_latency):
        assert isinstance(alpha_latency, dict)
        self._base = alpha_latency.get("base", None)
        self._inter = alpha_latency.get("inter", None)
        self._intra = alpha_latency.get("intra", None)
        self._switch = self._to_float(
            alpha_latency.get("switch", None),
            "The switch latency must be float")

        # Every section is optional: treat an absent one as empty so that a
        # description containing only some sections does not fail on
        # ``None.get``.
        base = self._base if self._base is not None else {}
        inter = self._inter if self._inter is not None else {}
        intra = self._intra if self._intra is not None else {}

        base_ring = base.get("ring", None)
        base_tree = base.get("tree", None)
        # Kept exactly as given; nothing in this class converts or exposes it.
        self._base_inter = base.get("inter", None)
        self._base_ring = self._to_float(
            base_ring, "The base ring latency must be float.")
        self._base_tree = self._to_float(
            base_tree, "The base tree latency must be float.")

        inter_ring = inter.get("ring", None)
        inter_tree = inter.get("tree", None)
        intra_ring = intra.get("ring", None)
        intra_tree = intra.get("tree", None)
        self._inter_ring = self._to_link_type_or_float(
            inter_ring, ["NET"], "The inter ring latency must be float.")
        self._inter_tree = self._to_link_type_or_float(
            inter_tree, ["NET"], "The inter tree latency must be float.")
        self._intra_ring = self._to_link_type_or_float(
            intra_ring, ["NVL", "PHB"],
            "The intra ring latency must be float.")
        self._intra_tree = self._to_link_type_or_float(
            intra_tree, ["NVL", "PHB"],
            "The intra tree latency must be float.")

    @staticmethod
    def _to_float(value, error_message):
        """Return ``value`` as a float (``None`` passes through unchanged)."""
        if value is None:
            return None
        try:
            return float(value)
        except (TypeError, ValueError, OverflowError) as err:
            raise TypeError(error_message) from err

    @staticmethod
    def _to_link_type_or_float(value, allowed_names, error_message):
        """Map a link-type name to ``LinkType``, or a number to ``float``."""
        if value is None:
            return None
        if isinstance(value, str):
            # A string is always read as a link-type name, never as a number.
            assert value in allowed_names
            return LinkType[value]
        return AlphaLatency._to_float(value, error_message)

    @property
    def base_ring(self):
        """Float value of ``base["ring"]``, or ``None`` if absent."""
        return self._base_ring

    @property
    def base_tree(self):
        """Float value of ``base["tree"]``, or ``None`` if absent."""
        return self._base_tree

    @property
    def switch(self):
        """Float value of ``switch``, or ``None`` if absent."""
        return self._switch

    @property
    def inter_ring(self):
        """``LinkType`` member or float for ``inter["ring"]``, or ``None``."""
        return self._inter_ring

    @property
    def inter_tree(self):
        """``LinkType`` member or float for ``inter["tree"]``, or ``None``."""
        return self._inter_tree

    @property
    def intra_ring(self):
        """``LinkType`` member or float for ``intra["ring"]``, or ``None``."""
        return self._intra_ring

    @property
    def intra_tree(self):
        """``LinkType`` member or float for ``intra["tree"]``, or ``None``."""
        return self._intra_tree
class Cluster:
"""
The cluster is an abstract of the hardware resource for training, which contains the cluster topology and
......@@ -276,6 +405,18 @@ class Cluster:
self._machines = {}
# Cluster graph topology
self._topology = None
# Latency for communication cost model
self._alpha_latency = None
self._rank_to_device_id = {}
self._device_id_to_rank = {}
@property
def rank_to_device_id(self):
    """Dict mapping a rank id to a device global id (the live dict, not a copy)."""
    return self._rank_to_device_id
@property
def device_id_to_rank(self):
    """Dict mapping a device global id to a rank id (the live dict, not a copy)."""
    return self._device_id_to_rank
@property
def machines(self):
......@@ -285,6 +426,35 @@ class Cluster:
assert isinstance(machine, Machine)
self._machines[machine.id] = machine
# map rank to device id and map device id to rank
if machine.id != 0:
prev_machine = self._machines[machine.id - 1]
offset = prev_machine._non_accelerator_cumulative_count
for global_id in machine.devices:
if machine.devices[
global_id].type not in Device.NON_ACCELERATOR_TYPE:
rank_id = global_id - offset
self._rank_to_device_id[rank_id] = global_id
self._device_id_to_rank[global_id] = rank_id
machine._non_accelerator_cumulative_count = len(
machine.devices) - len(
machine.accelerators
) + prev_machine._non_accelerator_cumulative_count
else:
for global_id in machine.devices:
if machine.devices[
global_id].type not in Device.NON_ACCELERATOR_TYPE:
rank_id = global_id
self._rank_to_device_id[rank_id] = global_id
self._device_id_to_rank[global_id] = rank_id
machine.accelerators[global_id] = machine.devices[global_id]
machine._non_accelerator_cumulative_count = len(
machine.devices) - len(machine.accelerators)
@property
def alpha_latency(self):
    """Value stored in ``self._alpha_latency``, returned as-is."""
    return self._alpha_latency
def add_device(self, device):
assert isinstance(device, Device)
device.machine.add_device(device)
......@@ -344,8 +514,23 @@ class Cluster:
link.type = link_type
link.bandwidth = float(link_info.get("bandwidth", 0))
link.latency = float(link_info.get("latency", 0))
link.hop = link_info.get("hop", None)
if link.hop is None:
# Set the default of hop: If in the same machine, hop is 0. And if in the different machine, hop is 1.
source_machine = source.machine
target_machine = target.machine
if source_machine.id == target_machine.id:
link.hop = 0
else:
link.hop = Link.default_hop
self.add_link(link)
if "alpha_latency" in cluster_info:
self._alpha_latency = AlphaLatency(
cluster_info.get("alpha_latency"))
else:
self._alpha_latecy = None
def _generate_machine_id(self):
cur_machine_id = self._num_machines
self._num_machines += 1
......@@ -359,6 +544,68 @@ class Cluster:
devices.append(device)
return devices
def get_beta(self, source_device_id, target_device_id):
    """Return beta, the time needed to transfer one byte (us/B).

    The link is looked up on the machine that owns the source device. When
    that machine has no link for the pair, the devices are treated as not
    directly connected and ``Link.default_nic_bandwith`` is used instead.

    Returns:
        ``0`` when the bandwidth is zero, otherwise
        ``1 / (bandwidth * 1000**3 / 10**6)``.
    """
    convert_base = 1000
    source_machine = self.get_device(source_device_id).machine
    link = source_machine.get_link(source_device_id, target_device_id)

    # No direct link between the two devices: assume the default NIC.
    bandwidth = Link.default_nic_bandwith if link is None else link.bandwidth

    # A zero bandwidth would divide by zero below; report it as zero cost.
    if bandwidth == 0.:
        return 0
    return 1 / (bandwidth * (convert_base**3 / 10**6))
def get_hop(self, source_device_id, target_device_id):
    """Return the hop count between two devices.

    The link is looked up on the machine that owns the source device. Its
    ``hop`` is returned when a link exists; otherwise ``Link.default_hop``
    is returned.
    """
    source_machine = self.get_device(source_device_id).machine
    link = source_machine.get_link(source_device_id, target_device_id)
    return Link.default_hop if link is None else link.hop
def cross_machine(self, device_ids):
    """Tell whether the given devices are not all on one single machine.

    Returns:
        ``False`` when the devices belong to exactly one machine, ``True``
        otherwise (which includes an empty ``device_ids``).
    """
    owners = {self.get_device(dev_id).machine.id for dev_id in device_ids}
    return len(owners) != 1
def convert_rank_to_device_id(self, group_ranks):
    """Translate ranks into device global ids via ``self.rank_to_device_id``.

    Args:
        group_ranks: iterable of rank ids (the global rank ids used in paddle).

    Returns:
        A list of device ids in the same order as ``group_ranks``.

    Raises:
        KeyError: if a rank has no entry in the mapping.
    """
    return [self.rank_to_device_id[rank] for rank in group_ranks]
def get_involved_machine_count(self, device_ids):
    """Count the distinct machines that own the given devices.

    Raises:
        AssertionError: if no machine is involved (e.g. ``device_ids`` is empty).
    """
    owners = {self.get_device(dev_id).machine.id for dev_id in device_ids}
    count = len(owners)
    assert count > 0
    return count
def __str__(self):
str = ""
for machine in self.machines.values():
......
......@@ -18,4 +18,5 @@ if(WITH_DISTRIBUTE AND WITH_GPU)
py_test_modules(test_recorder MODULES test_recorder ENVS ${dist_ENVS})
py_test_modules(test_trial MODULES test_trial ENVS ${dist_ENVS})
py_test_modules(test_new_cost_model MODULES test_new_cost_model ENVS ${dist_ENVS})
py_test_modules(test_cluster MODULES test_cluster ENVS ${dist_ENVS})
endif()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册