未验证 提交 f5aca8fb 编写于 作者: G gongweibao 提交者: GitHub

Pass device_ids info from launch to trainer. (#30632)

Pass device_ids info from launch to trainer
上级 d2404da7
...@@ -44,7 +44,13 @@ node_num=fleet.node_num ...@@ -44,7 +44,13 @@ node_num=fleet.node_num
rank=fleet.worker_index rank=fleet.worker_index
nranks=fleet.worker_num nranks=fleet.worker_num
world_size=fleet.worker_num world_size=fleet.worker_num
rank_in_node=fleet.rank_in_node # device id in current trainer
local_device_ids=fleet.local_device_ids
# device ids in world
world_device_ids=fleet.world_device_ids
# rank in node
local_rank=fleet.local_rank
rank_in_node=local_rank
is_worker = fleet.is_worker is_worker = fleet.is_worker
worker_endpoints = fleet.worker_endpoints worker_endpoints = fleet.worker_endpoints
server_num = fleet.server_num server_num = fleet.server_num
......
...@@ -291,8 +291,14 @@ class Fleet(object): ...@@ -291,8 +291,14 @@ class Fleet(object):
def node_num(self): def node_num(self):
return self._role_maker._get_node_num() return self._role_maker._get_node_num()
def rank_in_node(self): def local_rank(self):
return self._role_maker._get_rank_in_node() return self._role_maker._get_local_rank()
def local_device_ids(self):
    """Return the accelerator/device ids assigned to the current trainer.

    Thin delegate: the role maker is the single source of truth for
    launch-time environment info (it reads PADDLE_LOCAL_DEVICE_IDS).
    """
    role_maker = self._role_maker
    return role_maker._get_local_device_ids()
def world_device_ids(self):
    """Return the device ids of every trainer in the whole job.

    Thin delegate: the role maker is the single source of truth for
    launch-time environment info (it reads PADDLE_WORLD_DEVICE_IDS).
    """
    role_maker = self._role_maker
    return role_maker._get_world_device_ids()
def is_worker(self): def is_worker(self):
""" """
......
...@@ -622,10 +622,20 @@ class PaddleCloudRoleMaker(RoleMakerBase): ...@@ -622,10 +622,20 @@ class PaddleCloudRoleMaker(RoleMakerBase):
self._generate_role() self._generate_role()
return self._nodes_num return self._nodes_num
def _get_rank_in_node(self): def _get_local_rank(self):
if not self._role_is_generated: if not self._role_is_generated:
self._generate_role() self._generate_role()
return self._rank_in_node return self._local_rank
def _get_local_device_ids(self):
if not self._role_is_generated:
self._generate_role()
return self._local_device_ids
def _get_world_device_ids(self):
if not self._role_is_generated:
self._generate_role()
return self._world_device_ids
def _get_trainer_endpoints(self): def _get_trainer_endpoints(self):
""" """
...@@ -787,7 +797,9 @@ class PaddleCloudRoleMaker(RoleMakerBase): ...@@ -787,7 +797,9 @@ class PaddleCloudRoleMaker(RoleMakerBase):
self._trainers_num = len(self._worker_endpoints) self._trainers_num = len(self._worker_endpoints)
self._nodes_num = len( self._nodes_num = len(
set([x.split(':')[0] for x in self._worker_endpoints])) set([x.split(':')[0] for x in self._worker_endpoints]))
self._rank_in_node = os.getenv("PADDLE_RANK_IN_NODE") self._local_rank = os.getenv("PADDLE_RANK_IN_NODE")
self._local_device_ids=os.getenv("PADDLE_LOCAL_DEVICE_IDS")
self._world_device_ids=os.getenv("PADDLE_WORLD_DEVICE_IDS")
def _gloo_init(self): def _gloo_init(self):
# PADDLE_WITH_GLOO 1: trainer barrier, 2: all barrier # PADDLE_WITH_GLOO 1: trainer barrier, 2: all barrier
......
...@@ -98,6 +98,13 @@ class Cluster(object): ...@@ -98,6 +98,13 @@ class Cluster(object):
r.append(t.endpoint) r.append(t.endpoint)
return r return r
def world_device_ids(self):
    """Collect the accelerator-id list of every trainer across all pods.

    Returns a flat list, one entry per trainer, in pod order then
    trainer order — each entry is that trainer's ``accelerators`` value.
    """
    return [
        trainer.accelerators
        for pod in self.pods
        for trainer in pod.trainers
    ]
def pods_endpoints(self): def pods_endpoints(self):
r = [] r = []
for pod in self.pods: for pod in self.pods:
...@@ -452,6 +459,8 @@ def start_local_trainers(cluster, ...@@ -452,6 +459,8 @@ def start_local_trainers(cluster,
current_env.pop("http_proxy", None) current_env.pop("http_proxy", None)
current_env.pop("https_proxy", None) current_env.pop("https_proxy", None)
ids=cluster.world_device_ids()
res = [':'.join(ele) for ele in ids]
procs = [] procs = []
for idx, t in enumerate(pod.trainers): for idx, t in enumerate(pod.trainers):
proc_env = { proc_env = {
...@@ -459,7 +468,9 @@ def start_local_trainers(cluster, ...@@ -459,7 +468,9 @@ def start_local_trainers(cluster,
"PADDLE_CURRENT_ENDPOINT": "%s" % t.endpoint, "PADDLE_CURRENT_ENDPOINT": "%s" % t.endpoint,
"PADDLE_TRAINERS_NUM": "%d" % cluster.trainers_nranks(), "PADDLE_TRAINERS_NUM": "%d" % cluster.trainers_nranks(),
"PADDLE_TRAINER_ENDPOINTS": ",".join(cluster.trainers_endpoints()), "PADDLE_TRAINER_ENDPOINTS": ",".join(cluster.trainers_endpoints()),
"PADDLE_RANK_IN_NODE": str(idx) "PADDLE_RANK_IN_NODE": str(idx),
"PADDLE_LOCAL_DEVICE_IDS":",".join(t.accelerators),
"PADDLE_WORLD_DEVICE_IDS":",".join(res),
} }
if len(t.accelerators) > 0 and pod.device_mode==DeviceMode.GPU: if len(t.accelerators) > 0 and pod.device_mode==DeviceMode.GPU:
......
...@@ -150,7 +150,7 @@ class AscendOptimizer(Optimizer): ...@@ -150,7 +150,7 @@ class AscendOptimizer(Optimizer):
# Config about Graph Engine can be found in https://support.huaweicloud.com/ # Config about Graph Engine can be found in https://support.huaweicloud.com/
config = { config = {
"ge.exec.deviceId": str(fleet.rank_in_node()), "ge.exec.deviceId": str(fleet.local_device_ids()),
"ge.graphRunMode": "1", "ge.graphRunMode": "1",
"ge.exec.precision_mode": "must_keep_origin_dtype", "ge.exec.precision_mode": "must_keep_origin_dtype",
# if multi mode # if multi mode
......
...@@ -23,9 +23,11 @@ def train(prefix): ...@@ -23,9 +23,11 @@ def train(prefix):
current_endpoint = os.getenv("PADDLE_CURRENT_ENDPOINT") current_endpoint = os.getenv("PADDLE_CURRENT_ENDPOINT")
worker_endpoints = worker_endpoints_env worker_endpoints = worker_endpoints_env
trainers_num = len(worker_endpoints.split(',')) trainers_num = len(worker_endpoints.split(','))
device_ids=os.getenv("PADDLE_WORLD_DEVICE_IDS")
current_device_id=os.getenv("PADDLE_LOCAL_DEVICE_IDS")
details = "selected_accelerators:{} worker_endpoints:{} trainers_num:{} current_endpoint:{} trainer_id:{}"\ details = "selected_accelerators:{} worker_endpoints:{} trainers_num:{} current_endpoint:{} trainer_id:{} device_ids:{} device_id:{}"\
.format(selected_accelerators, worker_endpoints, trainers_num, current_endpoint,trainer_id) .format(selected_accelerators, worker_endpoints, trainers_num, current_endpoint,trainer_id,device_ids, current_device_id)
print(details) print(details)
with open("multi_process_{}.check_{}.log".format(prefix, trainer_id), "w") as f: with open("multi_process_{}.check_{}.log".format(prefix, trainer_id), "w") as f:
......
...@@ -30,8 +30,8 @@ export TRAINER_PORTS_NUM=2 ...@@ -30,8 +30,8 @@ export TRAINER_PORTS_NUM=2
distributed_args="--ips=${cluster_node_ips} --ascend_npus=0,1 --log_dir=testlog" distributed_args="--ips=${cluster_node_ips} --ascend_npus=0,1 --log_dir=testlog"
python -m paddle.distributed.fleet.launch ${distributed_args} ascend_multi_process_collective.py fleetlaunchascend python -m paddle.distributed.fleet.launch ${distributed_args} ascend_multi_process_collective.py fleetlaunchascend
str1="selected_accelerators:0 worker_endpoints:127.0.0.1:35789,127.0.0.1:35790,127.0.0.2:35789,127.0.0.2:35790 trainers_num:4 current_endpoint:127.0.0.1:35789 trainer_id:0" str1="selected_accelerators:0 worker_endpoints:127.0.0.1:35789,127.0.0.1:35790,127.0.0.2:35789,127.0.0.2:35790 trainers_num:4 current_endpoint:127.0.0.1:35789 trainer_id:0 device_ids:0,1,0,1 device_id:0"
str2="selected_accelerators:1 worker_endpoints:127.0.0.1:35789,127.0.0.1:35790,127.0.0.2:35789,127.0.0.2:35790 trainers_num:4 current_endpoint:127.0.0.1:35790 trainer_id:1" str2="selected_accelerators:1 worker_endpoints:127.0.0.1:35789,127.0.0.1:35790,127.0.0.2:35789,127.0.0.2:35790 trainers_num:4 current_endpoint:127.0.0.1:35790 trainer_id:1 device_ids:0,1,0,1 device_id:1"
file_0="multi_process_fleetlaunchascend.check_0.log" file_0="multi_process_fleetlaunchascend.check_0.log"
file_1="multi_process_fleetlaunchascend.check_1.log" file_1="multi_process_fleetlaunchascend.check_1.log"
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册