Unverified commit 389a9a7e, authored by danleifeng and committed by GitHub

Fix port conflicts when using paddlecloud to launch simulated multi-node jobs (#26191)

* fix port conflicts when launching multiple nodes in paddlecloud; test=develop

* add DISTRIBUTED_TRAINER_ENDPOINTS env for cloud;test=develop
Parent: dae62556
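The core of this change: paddlecloud (since 1.8.4) can pre-assign every trainer endpoint through the new DISTRIBUTED_TRAINER_ENDPOINTS environment variable, so the launcher no longer derives ports from PADDLE_PORT on its own and simulated multi-node runs on one host no longer collide. Below is a minimal, self-contained sketch of that grouping step; the helper name group_endpoints_by_node and the sample IPs/ports are illustrative only, not part of the patch.

# Sketch of the endpoint-grouping scheme introduced by this commit (assumed
# helper, not Paddle code): paddlecloud reserves TRAINER_PORTS_NUM endpoints
# per node, and the launcher slices the flat comma-separated list into one
# endpoint group per node.
import os


def group_endpoints_by_node(num_nodes, ports_per_node, endpoints_env):
    """Split "ip:port,ip:port,..." into one list of endpoints per node."""
    endpoints = endpoints_env.split(",")
    assert num_nodes * ports_per_node == len(endpoints)
    return [
        endpoints[i * ports_per_node:(i + 1) * ports_per_node]
        for i in range(num_nodes)
    ]


if __name__ == "__main__":
    # Example values mirroring the comment in the patch.
    os.environ["DISTRIBUTED_TRAINER_ENDPOINTS"] = (
        "10.0.0.1:6170,10.0.0.1:6171,10.0.0.2:6170,10.0.0.2:6171")
    groups = group_endpoints_by_node(
        num_nodes=2,
        ports_per_node=2,
        endpoints_env=os.environ["DISTRIBUTED_TRAINER_ENDPOINTS"])
    print(groups)
    # [['10.0.0.1:6170', '10.0.0.1:6171'], ['10.0.0.2:6170', '10.0.0.2:6171']]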
@@ -19,7 +19,7 @@ from paddle.distributed.utils import get_cluster, logger
 def get_cloud_cluster(args_node_ips, args_node_ip, args_port, selected_gpus):
     """
-    args_node_ips, args_node_ip:string
+    args_node_ips:string, args_node_ip:string, args_port: int, selected_gpus:list
     """
     #you can automatically get ip info while using paddlecloud multi nodes mode.
     node_ips = os.getenv("PADDLE_TRAINERS")
@@ -31,6 +31,9 @@ def get_cloud_cluster(args_node_ips, args_node_ip, args_port, selected_gpus):
     node_rank = os.getenv("PADDLE_TRAINER_ID")
     assert node_rank is not None, "PADDLE_TRAINER_ID should not be None"
 
+    paddle_ports_num = int(os.getenv("TRAINER_PORTS_NUM"))
+    assert paddle_ports_num is not None, "TRAINER_PORTS_NUM should not be None"
+
     node_ips = node_ips.split(",")
     num_nodes = len(node_ips)
     node_rank = int(node_rank)
@@ -47,32 +50,47 @@ automatically got from PADDLE_TRAINERS(multi nodes) or POD_IP(single node).\
 Your input cluster_node_ips: {} doesn't equals to IPs: {} from \
 paddlecloud environment.".format(args_node_ips, node_ips))
 
-    started_port = args_port
-    print("num_nodes:", num_nodes)
-    if num_nodes > 1:
-        try:
-            paddle_port = int(os.getenv("PADDLE_PORT", ""))
-            paddle_port_num = int(os.getenv("TRAINER_PORTS_NUM", ""))
-
-            if paddle_port_num >= len(
-                    selected_gpus) and paddle_port != args_port:
-                logger.warning("Use Cloud specified port:{}.".format(
-                    paddle_port))
-                started_port = paddle_port
-
-        except Exception as e:
-            print(e)
-            pass
-
-    if started_port is None:
-        started_port = 6170
-
-    logger.debug("parsed from args:node_ips:{} \
-        node_ip:{} node_rank:{} started_port:{}"
-                 .format(node_ips, node_ip, node_rank, started_port))
-
-    ports = [x for x in range(started_port, started_port + len(selected_gpus))]
-    cluster, pod = get_cluster(node_ips, node_ip, ports, selected_gpus)
+    # DISTRIBUTED_TRAINER_ENDPOINTS: new environment since paddlecloud 1.8.4
+    # e.g: DISTRIBUTED_TRAINER_ENDPOINTS="ip1:port1,ip1:port2,ip1:port3,ip1:port4,ip2:port5,ip2:port6,ip2:port7,ip2:port8"
+    trainer_endpoints = os.getenv("DISTRIBUTED_TRAINER_ENDPOINTS")
+    if trainer_endpoints is None:
+        started_port = args_port
+        if num_nodes > 1:
+            try:
+                paddle_port = int(os.getenv("PADDLE_PORT", ""))
+
+                if paddle_ports_num >= len(
+                        selected_gpus) and paddle_port != args_port:
+                    logger.warning("Use Cloud specified port:{}.".format(
+                        paddle_port))
+                    started_port = paddle_port
+
+            except Exception as e:
+                print(e)
+                pass
+
+        if started_port is None:
+            started_port = 6170
+        ports = [
+            x for x in range(started_port, started_port + len(selected_gpus))
+        ]
+        trainer_endpoints = []
+        for ip in node_ips:
+            trainer_endpoints.append(["%s:%d" % (ip, port) for port in ports])
+    else:
+        trainer_endpoints_ori = trainer_endpoints.split(",")
+        trainer_endpoints = []
+        assert num_nodes * paddle_ports_num == len(trainer_endpoints_ori)
+        for i in range(num_nodes):
+            trainer_endpoints.append(trainer_endpoints_ori[
+                i * paddle_ports_num:(i + 1) * paddle_ports_num])
+
+    logger.debug("parsed from args: node_ips:{} \
+        node_ip:{} node_rank:{} trainer_endpoints:{}"
+                 .format(node_ips, node_ip, node_rank, trainer_endpoints))
+
+    cluster, pod = get_cluster(node_ips, node_ip, trainer_endpoints,
+                               selected_gpus)
     return cluster, cluster.pods[node_rank]
...
@@ -19,7 +19,7 @@ from paddle.distributed.fleet.launch_utils import get_cluster, logger
 def get_cloud_cluster(args_node_ips, selected_gpus, args_port=6170):
     """
-    args_node_ips, args_node_ip:string
+    args_node_ips:string, selected_gpus:list, args_port: int
     """
     #you can automatically get ip info while using paddlecloud multi nodes mode.
     node_ips = os.getenv("PADDLE_TRAINERS")
@@ -31,6 +31,9 @@ def get_cloud_cluster(args_node_ips, selected_gpus, args_port=6170):
     node_rank = os.getenv("PADDLE_TRAINER_ID")
     assert node_rank is not None, "PADDLE_TRAINER_ID should not be None"
 
+    paddle_ports_num = int(os.getenv("TRAINER_PORTS_NUM"))
+    assert paddle_ports_num is not None, "TRAINER_PORTS_NUM should not be None"
+
     node_ips = node_ips.split(",")
     num_nodes = len(node_ips)
     node_rank = int(node_rank)
@@ -42,32 +45,47 @@ automatically got from PADDLE_TRAINERS(multi nodes) or POD_IP(single node).\
 Your input cluster_node_ips: {} doesn't equals to IPs: {} from \
 paddlecloud environment.".format(args_node_ips, node_ips))
 
-    started_port = args_port
-    print("num_nodes:", num_nodes)
-    if num_nodes > 1:
-        try:
-            paddle_port = int(os.getenv("PADDLE_PORT", ""))
-            paddle_port_num = int(os.getenv("TRAINER_PORTS_NUM", ""))
-
-            if paddle_port_num >= len(
-                    selected_gpus) and paddle_port != args_port:
-                logger.warning("Use Cloud specified port:{}.".format(
-                    paddle_port))
-                started_port = paddle_port
-
-        except Exception as e:
-            print(e)
-            pass
-
-    if started_port is None:
-        started_port = 6170
-
-    logger.debug("parsed from args:node_ips:{} \
-        node_ip:{} node_rank:{} started_port:{}"
-                 .format(node_ips, node_ip, node_rank, started_port))
-
-    ports = [x for x in range(started_port, started_port + len(selected_gpus))]
-    cluster, pod = get_cluster(node_ips, node_ip, ports, selected_gpus)
+    # DISTRIBUTED_TRAINER_ENDPOINTS: new environment since paddlecloud 1.8.4
+    # e.g: DISTRIBUTED_TRAINER_ENDPOINTS="ip1:port1,ip1:port2,ip1:port3,ip1:port4,ip2:port5,ip2:port6,ip2:port7,ip2:port8"
+    trainer_endpoints = os.getenv("DISTRIBUTED_TRAINER_ENDPOINTS")
+    if trainer_endpoints is None:
+        started_port = args_port
+        if num_nodes > 1:
+            try:
+                paddle_port = int(os.getenv("PADDLE_PORT", ""))
+
+                if paddle_ports_num >= len(
+                        selected_gpus) and paddle_port != args_port:
+                    logger.warning("Use Cloud specified port:{}.".format(
+                        paddle_port))
+                    started_port = paddle_port
+
+            except Exception as e:
+                print(e)
+                pass
+
+        if started_port is None:
+            started_port = 6170
+        ports = [
+            x for x in range(started_port, started_port + len(selected_gpus))
+        ]
+        trainer_endpoints = []
+        for ip in node_ips:
+            trainer_endpoints.append(["%s:%d" % (ip, port) for port in ports])
+    else:
+        trainer_endpoints_ori = trainer_endpoints.split(",")
+        trainer_endpoints = []
+        assert num_nodes * paddle_ports_num == len(trainer_endpoints_ori)
+        for i in range(num_nodes):
+            trainer_endpoints.append(trainer_endpoints_ori[
+                i * paddle_ports_num:(i + 1) * paddle_ports_num])
+
+    logger.debug("parsed from args: node_ips:{} \
+        node_ip:{} node_rank:{} trainer_endpoints:{}"
+                 .format(node_ips, node_ip, node_rank, trainer_endpoints))
+
+    cluster, pod = get_cluster(node_ips, node_ip, trainer_endpoints,
+                               selected_gpus)
     return cluster, cluster.pods[node_rank]
@@ -75,7 +93,8 @@ def use_paddlecloud():
     node_ips = os.getenv("PADDLE_TRAINERS")
     node_ip = os.getenv("POD_IP")
     node_rank = os.getenv("PADDLE_TRAINER_ID")
-    if node_ips is None or node_ip is None or node_rank is None:
+    paddle_ports_num = os.getenv("TRAINER_PORTS_NUM")
+    if node_ips is None or node_ip is None or node_rank is None or paddle_ports_num is None:
         return False
     else:
         return True
...
@@ -157,17 +157,20 @@ def get_cluster_from_args(args, gpus):
     free_ports = [x for x in range(start_port, start_port + len(gpus))]
 
-    return get_cluster(node_ips, node_ip, free_ports, gpus)
+    trainer_endpoints = []
+    for ip in node_ips:
+        trainer_endpoints.append(["%s:%d" % (ip, port) for port in free_ports])
+    return get_cluster(node_ips, node_ip, trainer_endpoints, gpus)
 
 
 def get_gpus(gpus):
     if gpus is None:
         gpus_num = fluid.core.get_cuda_device_count()
-        gpus = [str(x) for x in range(0, gpus_num)]
+        res_gpus = [str(x) for x in range(0, gpus_num)]
     else:
         cuda_visible_devices = os.getenv("CUDA_VISIBLE_DEVICES")
         if cuda_visible_devices is None or cuda_visible_devices == "":
-            gpus = [x.strip() for x in gpus.split(',')]
+            res_gpus = [x.strip() for x in gpus.split(',')]
         else:
             # change gpus into relative values
             # e.g. CUDA_VISIBLE_DEVICES=4,5,6,7; args.gpus=4,5,6,7;
@@ -177,12 +180,16 @@ def get_gpus(gpus):
             assert x in cuda_visible_devices_list, "Can't find "\
                 "your gpus %s in CUDA_VISIBLE_DEVICES[%s]."\
                 % (x, cuda_visible_devices)
-            gpus = [
+            res_gpus = [
                 cuda_visible_devices_list.index(x.strip())
                 for x in gpus.split(',')
             ]
+            logger.info("Change selected_gpus into reletive values. --ips:{} "
+                        "will change into relative_ips:{} according to your "
+                        "CUDA_VISIBLE_DEVICES:{}".format(
+                            gpus, res_gpus, cuda_visible_devices_list))
 
-    return gpus
+    return res_gpus
 
 
 def launch_collective(args):
...
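For context on the get_gpus change above: when CUDA_VISIBLE_DEVICES is set, the user-supplied device ids are rewritten as indices relative to that list, and the patch now logs the remapping. A small standalone sketch of that mapping, with made-up values rather than Paddle internals:

# Relative-GPU mapping as performed by get_gpus (illustrative values).
cuda_visible_devices = "4,5,6,7"   # devices the container actually exposes
requested_gpus = "4,5,6,7"         # devices the user passed on the command line

cuda_visible_devices_list = cuda_visible_devices.split(',')
res_gpus = [
    cuda_visible_devices_list.index(x.strip())
    for x in requested_gpus.split(',')
]
print(res_gpus)  # [0, 1, 2, 3] -- physical ids re-expressed as relative ids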
@@ -227,18 +227,23 @@ def get_logger(log_level=20, name="root"):
     return logger
 
 
-def get_cluster(node_ips, node_ip, paddle_ports, selected_gpus):
-    assert type(paddle_ports) is list, "paddle_ports must be list"
+def get_cluster(node_ips, node_ip, trainer_endpoints, selected_gpus):
+    assert type(trainer_endpoints) is list, "trainer_endpoints must be list"
     cluster = Cluster(hdfs=None)
     trainer_rank = 0
     for node_rank, ip in enumerate(node_ips):
         pod = Pod()
         pod.rank = node_rank
         pod.addr = ip
+        cur_node_endpoints = trainer_endpoints[node_rank]
+        # when use paddlecloud, endpoints may > selected_gpus(user_defined)
+        assert len(cur_node_endpoints) >= len(
+            selected_gpus
+        ), "current trainer_endpoints size should be greater equal than selected_gpus size."
         for i in range(len(selected_gpus)):
             trainer = Trainer()
             trainer.gpus.append(selected_gpus[i])
-            trainer.endpoint = "%s:%d" % (ip, paddle_ports[i])
+            trainer.endpoint = "%s" % (cur_node_endpoints[i])
             trainer.rank = trainer_rank
             trainer_rank += 1
@@ -424,10 +429,6 @@ def start_local_trainers(cluster,
                 len(pod.trainers),
                 pretty_print_envs(proc_env, ("Distributed Envs",
                                              "Value"))))
-        logger.info(
-            "More details for debug about commands and environments are written in {}/run.sh".
-            format(log_dir))
-
         fn = None
         if log_dir is not None:
             os.system("mkdir -p {}".format(log_dir))
...
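The reworked get_cluster above no longer receives a flat port list: trainer_endpoints is now a nested list with one sub-list of "ip:port" strings per node, and local trainer i on a node is bound to that node's i-th endpoint. A minimal standalone sketch of that contract follows; the IPs, ports and GPU ids are made-up example values, and the loop only mirrors the assignment logic rather than the Paddle implementation.

# Shape of the trainer_endpoints argument expected by the updated get_cluster.
node_ips = ["10.0.0.1", "10.0.0.2"]
selected_gpus = ["0", "1"]
trainer_endpoints = [
    ["10.0.0.1:6170", "10.0.0.1:6171"],  # endpoints reserved for node 0
    ["10.0.0.2:6170", "10.0.0.2:6171"],  # endpoints reserved for node 1
]

for node_rank, ip in enumerate(node_ips):
    cur_node_endpoints = trainer_endpoints[node_rank]
    # paddlecloud may reserve more endpoints than selected_gpus; extras stay unused
    assert len(cur_node_endpoints) >= len(selected_gpus)
    for i, gpu in enumerate(selected_gpus):
        print("node %d, gpu %s -> %s" % (node_rank, gpu, cur_node_endpoints[i]))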
@@ -160,18 +160,21 @@ def get_cluster_from_args(args, selected_gpus):
         x for x in range(started_port, started_port + len(selected_gpus))
     ]
 
-    return get_cluster(node_ips, node_ip, free_ports, selected_gpus)
+    trainer_endpoints = []
+    for ip in node_ips:
+        trainer_endpoints.append(["%s:%d" % (ip, port) for port in free_ports])
+    return get_cluster(node_ips, node_ip, trainer_endpoints, selected_gpus)
 
 
 def get_gpus(selected_gpus):
     if selected_gpus is None:
         from paddle.fluid import core
         gpus_num = core.get_cuda_device_count()
-        selected_gpus = [str(x) for x in range(0, gpus_num)]
+        gpus = [str(x) for x in range(0, gpus_num)]
     else:
         cuda_visible_devices = os.getenv("CUDA_VISIBLE_DEVICES")
         if cuda_visible_devices is None or cuda_visible_devices == "":
-            selected_gpus = [x.strip() for x in selected_gpus.split(',')]
+            gpus = [x.strip() for x in selected_gpus.split(',')]
         else:
             # change selected_gpus into relative values
             # e.g. CUDA_VISIBLE_DEVICES=4,5,6,7; args.selected_gpus=4,5,6,7;
@@ -181,12 +184,16 @@ def get_gpus(selected_gpus):
             assert x in cuda_visible_devices_list, "Can't find "\
                 "your selected_gpus %s in CUDA_VISIBLE_DEVICES[%s]."\
                 % (x, cuda_visible_devices)
-            selected_gpus = [
+            gpus = [
                 cuda_visible_devices_list.index(x.strip())
                 for x in selected_gpus.split(',')
             ]
+            logger.info("Change selected_gpus into reletive values. --ips:{} "
+                        "will change into relative_ips:{} according to your "
+                        "CUDA_VISIBLE_DEVICES:{}".format(
+                            selected_gpus, gpus, cuda_visible_devices_list))
 
-    return selected_gpus
+    return gpus
 
 
 def get_cluster_and_pod(args):
...
@@ -227,18 +227,23 @@ def get_logger(log_level, name="root"):
     return logger
 
 
-def get_cluster(node_ips, node_ip, paddle_ports, selected_gpus):
-    assert type(paddle_ports) is list, "paddle_ports must be list"
+def get_cluster(node_ips, node_ip, trainer_endpoints, selected_gpus):
+    assert type(trainer_endpoints) is list, "trainer_endpoints must be list"
     cluster = Cluster(hdfs=None)
     trainer_rank = 0
     for node_rank, ip in enumerate(node_ips):
         pod = Pod()
         pod.rank = node_rank
         pod.addr = ip
+        cur_node_endpoints = trainer_endpoints[node_rank]
+        # when use paddlecloud, endpoints may > selected_gpus(user_defined)
+        assert len(cur_node_endpoints) >= len(
+            selected_gpus
+        ), "current trainer_endpoints size should be greater equal than selected_gpus size."
         for i in range(len(selected_gpus)):
             trainer = Trainer()
             trainer.gpus.append(selected_gpus[i])
-            trainer.endpoint = "%s:%d" % (ip, paddle_ports[i])
+            trainer.endpoint = "%s" % (cur_node_endpoints[i])
             trainer.rank = trainer_rank
             trainer_rank += 1
@@ -253,7 +258,8 @@ def terminate_local_procs(procs):
     for p in procs:
         if p.proc.poll() is None:
             p.proc.terminate()
-            p.log_fn.close()
+            if p.log_fn:
+                p.log_fn.close()
             logger.debug("terminate process id:{}".format(p.proc.pid))
 
     #wait all process terminiated
...
@@ -79,9 +79,9 @@ if [ -f $file_1 ]; then
     rm $file_1
 fi
 
+# test use DISTRIBUTED_TRAINER_ENDPOINTS env in paddlecloud
 unset PADDLE_PORT
-unset TRAINER_PORTS_NUM
+export DISTRIBUTED_TRAINER_ENDPOINTS=127.0.0.1:6170,127.0.0.1:6171,127.0.0.2:6170,127.0.0.2:6171
 
 echo ""
 echo "paddle.distributed.launch async poll process test"
...
@@ -48,9 +48,9 @@ if [ -f $file_1 ]; then
    rm $file_1
 fi
 
+# test use DISTRIBUTED_TRAINER_ENDPOINTS env in paddlecloud
 unset PADDLE_PORT
-unset TRAINER_PORTS_NUM
+export DISTRIBUTED_TRAINER_ENDPOINTS=127.0.0.1:6170,127.0.0.1:6171,127.0.0.2:6170,127.0.0.2:6171
 
 echo ""
 echo "paddle.distributed.launch async poll process test"
...
@@ -37,7 +37,11 @@ def get_cluster_from_args(selected_gpus):
     free_ports = find_free_ports(len(selected_gpus))
     if free_ports is not None:
         free_ports = list(free_ports)
-    return get_cluster(node_ips, node_ip, free_ports, selected_gpus)
+
+    trainer_endpoints = []
+    for ip in node_ips:
+        trainer_endpoints.append(["%s:%d" % (ip, port) for port in free_ports])
+    return get_cluster(node_ips, node_ip, trainer_endpoints, selected_gpus)
 
 
 def get_gpus(selected_gpus):
...