Unverified commit 67f87d6d authored by danleifeng and committed by GitHub

[cherry-pick 1.8] Fix port conflicts when using paddlecloud to launch simulated multi-node training (#27117)

* add DISTRIBUTED_TRAINER_ENDPOINTS env for cloud
Parent 80b08609
......@@ -19,7 +19,7 @@ from paddle.distributed.utils import get_cluster, logger
def get_cloud_cluster(args_node_ips, args_node_ip, args_port, selected_gpus):
"""
args_node_ips, args_node_ip:string
args_node_ips:string, args_node_ip:string, args_port: int, selected_gpus:list
"""
#you can automatically get ip info while using paddlecloud multi nodes mode.
node_ips = os.getenv("PADDLE_TRAINERS")
......@@ -31,6 +31,9 @@ def get_cloud_cluster(args_node_ips, args_node_ip, args_port, selected_gpus):
node_rank = os.getenv("PADDLE_TRAINER_ID")
assert node_rank is not None, "PADDLE_TRAINER_ID should not be None"
paddle_ports_num = int(os.getenv("TRAINER_PORTS_NUM"))
assert paddle_ports_num is not None, "TRAINER_PORTS_NUM should not be None"
node_ips = node_ips.split(",")
num_nodes = len(node_ips)
node_rank = int(node_rank)
......@@ -47,32 +50,47 @@ automatically got from PADDLE_TRAINERS(multi nodes) or POD_IP(single node).\
Your input cluster_node_ips: {} doesn't equals to IPs: {} from \
paddlecloud environment.".format(args_node_ips, node_ips))
started_port = args_port
print("num_nodes:", num_nodes)
if num_nodes > 1:
try:
paddle_port = int(os.getenv("PADDLE_PORT", ""))
paddle_port_num = int(os.getenv("TRAINER_PORTS_NUM", ""))
if paddle_port_num >= len(
selected_gpus) and paddle_port != args_port:
logger.warning("Use Cloud specified port:{}.".format(
paddle_port))
started_port = paddle_port
except Exception as e:
print(e)
pass
if started_port is None:
started_port = 6170
logger.debug("parsed from args:node_ips:{} \
node_ip:{} node_rank:{} started_port:{}"
.format(node_ips, node_ip, node_rank, started_port))
ports = [x for x in range(started_port, started_port + len(selected_gpus))]
cluster, pod = get_cluster(node_ips, node_ip, ports, selected_gpus)
# DISTRIBUTED_TRAINER_ENDPOINTS: new environment since paddlecloud 1.8.4
# e.g: DISTRIBUTED_TRAINER_ENDPOINTS="ip1:port1,ip1:port2,ip1:port3,ip1:port4,ip2:port5,ip2:port6,ip2:port7,ip2:port8"
trainer_endpoints = os.getenv("DISTRIBUTED_TRAINER_ENDPOINTS")
if trainer_endpoints is None:
started_port = args_port
if num_nodes > 1:
try:
paddle_port = int(os.getenv("PADDLE_PORT", ""))
if paddle_ports_num >= len(
selected_gpus) and paddle_port != args_port:
logger.warning("Use Cloud specified port:{}.".format(
paddle_port))
started_port = paddle_port
except Exception as e:
print(e)
pass
if started_port is None:
started_port = 6170
ports = [
x for x in range(started_port, started_port + len(selected_gpus))
]
trainer_endpoints = []
for ip in node_ips:
trainer_endpoints.append(["%s:%d" % (ip, port) for port in ports])
else:
trainer_endpoints_ori = trainer_endpoints.split(",")
trainer_endpoints = []
assert num_nodes * paddle_ports_num == len(trainer_endpoints_ori)
for i in range(num_nodes):
trainer_endpoints.append(trainer_endpoints_ori[
i * paddle_ports_num:(i + 1) * paddle_ports_num])
logger.debug("parsed from args: node_ips:{} \
node_ip:{} node_rank:{} trainer_endpoints:{}"
.format(node_ips, node_ip, node_rank, trainer_endpoints))
cluster, pod = get_cluster(node_ips, node_ip, trainer_endpoints,
selected_gpus)
return cluster, cluster.pods[node_rank]
......
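For reference, the following is a minimal standalone sketch (the IP addresses and per-node port count are made up for illustration) of how the flat DISTRIBUTED_TRAINER_ENDPOINTS string is chunked into one endpoint list per node, mirroring the branch added above:

    import os

    # Illustrative value following the format shown in the diff comment above.
    os.environ["DISTRIBUTED_TRAINER_ENDPOINTS"] = (
        "10.0.0.1:6170,10.0.0.1:6171,10.0.0.2:6170,10.0.0.2:6171")
    node_ips = ["10.0.0.1", "10.0.0.2"]
    paddle_ports_num = 2  # ports reserved per node by the cloud scheduler

    flat = os.getenv("DISTRIBUTED_TRAINER_ENDPOINTS").split(",")
    assert len(node_ips) * paddle_ports_num == len(flat)
    trainer_endpoints = [
        flat[i * paddle_ports_num:(i + 1) * paddle_ports_num]
        for i in range(len(node_ips))
    ]
    # trainer_endpoints ->
    # [['10.0.0.1:6170', '10.0.0.1:6171'], ['10.0.0.2:6170', '10.0.0.2:6171']]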
......@@ -162,7 +162,10 @@ def get_cluster_from_args(args, selected_gpus):
x for x in range(started_port, started_port + len(selected_gpus))
]
return get_cluster(node_ips, node_ip, free_ports, selected_gpus)
trainer_endpoints = []
for ip in node_ips:
trainer_endpoints.append(["%s:%d" % (ip, port) for port in free_ports])
return get_cluster(node_ips, node_ip, trainer_endpoints, selected_gpus)
def get_gpus(selected_gpus):
......
......@@ -227,18 +227,23 @@ def get_logger(log_level, name="root"):
return logger
def get_cluster(node_ips, node_ip, paddle_ports, selected_gpus):
assert type(paddle_ports) is list, "paddle_ports must be list"
def get_cluster(node_ips, node_ip, trainer_endpoints, selected_gpus):
assert type(trainer_endpoints) is list, "trainer_endpoints must be list"
cluster = Cluster(hdfs=None)
trainer_rank = 0
for node_rank, ip in enumerate(node_ips):
pod = Pod()
pod.rank = node_rank
pod.addr = ip
cur_node_endpoints = trainer_endpoints[node_rank]
# when use paddlecloud, endpoints may > selected_gpus(user_defined)
assert len(cur_node_endpoints) >= len(
selected_gpus
), "current trainer_endpoints size should be greater equal than selected_gpus size."
for i in range(len(selected_gpus)):
trainer = Trainer()
trainer.gpus.append(selected_gpus[i])
trainer.endpoint = "%s:%d" % (ip, paddle_ports[i])
trainer.endpoint = "%s" % (cur_node_endpoints[i])
trainer.rank = trainer_rank
trainer_rank += 1
......
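A usage sketch of the updated get_cluster signature (the import path comes from the diff header at the top of this commit; the IPs, ports, and GPU ids are made up, and paddle 1.8 must be installed for it to run): each node now supplies its own list of "ip:port" endpoints, which may be longer than selected_gpus, instead of sharing one port range.

    from paddle.distributed.utils import get_cluster

    node_ips = ["10.0.0.1", "10.0.0.2"]
    node_ip = "10.0.0.1"
    selected_gpus = [0, 1]
    # One endpoint list per node; extra cloud-assigned ports beyond
    # len(selected_gpus) are left unused by the loop above.
    trainer_endpoints = [
        ["10.0.0.1:6170", "10.0.0.1:6171", "10.0.0.1:6172"],
        ["10.0.0.2:6170", "10.0.0.2:6171", "10.0.0.2:6172"],
    ]
    cluster, pod = get_cluster(node_ips, node_ip, trainer_endpoints,
                               selected_gpus)
    # cluster describes all nodes; pod is the entry for the local node_ip.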
......@@ -48,9 +48,9 @@ if [ -f $file_1 ]; then
rm $file_1
fi
# test use DISTRIBUTED_TRAINER_ENDPOINTS env in paddlecloud
unset PADDLE_PORT
unset TRAINER_PORTS_NUM
export DISTRIBUTED_TRAINER_ENDPOINTS=127.0.0.1:6170,127.0.0.1:6171,127.0.0.2:6170,127.0.0.2:6171
echo ""
echo "paddle.distributed.launch async poll process test"
......