diff --git a/python/paddle/distributed/cloud_utils.py b/python/paddle/distributed/cloud_utils.py
index 345b783d60bb79e99c98c4e9d212aa11cbe91dcc..5b7268e4b64fe34e6376819a7ac5659d1a5f5959 100644
--- a/python/paddle/distributed/cloud_utils.py
+++ b/python/paddle/distributed/cloud_utils.py
@@ -19,7 +19,7 @@ from paddle.distributed.utils import get_cluster, logger
 
 def get_cloud_cluster(args_node_ips, args_node_ip, args_port, selected_gpus):
     """
-    args_node_ips, args_node_ip:string
+    args_node_ips:string, args_node_ip:string, args_port:int, selected_gpus:list
     """
     #you can automatically get ip info while using paddlecloud multi nodes mode.
     node_ips = os.getenv("PADDLE_TRAINERS")
@@ -31,6 +31,9 @@ def get_cloud_cluster(args_node_ips, args_node_ip, args_port, selected_gpus):
     node_rank = os.getenv("PADDLE_TRAINER_ID")
     assert node_rank is not None, "PADDLE_TRAINER_ID should not be None"
 
+    paddle_ports_num = int(os.getenv("TRAINER_PORTS_NUM"))
+    assert paddle_ports_num is not None, "TRAINER_PORTS_NUM should not be None"
+
     node_ips = node_ips.split(",")
     num_nodes = len(node_ips)
     node_rank = int(node_rank)
@@ -47,32 +50,47 @@ automatically got from PADDLE_TRAINERS(multi nodes) or POD_IP(single node).\
 Your input cluster_node_ips: {} doesn't equals to IPs: {} from \
 paddlecloud environment.".format(args_node_ips, node_ips))
 
-    started_port = args_port
-    print("num_nodes:", num_nodes)
-    if num_nodes > 1:
-        try:
-            paddle_port = int(os.getenv("PADDLE_PORT", ""))
-            paddle_port_num = int(os.getenv("TRAINER_PORTS_NUM", ""))
-
-            if paddle_port_num >= len(
-                    selected_gpus) and paddle_port != args_port:
-                logger.warning("Use Cloud specified port:{}.".format(
-                    paddle_port))
-                started_port = paddle_port
-
-        except Exception as e:
-            print(e)
-            pass
-
-    if started_port is None:
-        started_port = 6170
-
-    logger.debug("parsed from args:node_ips:{} \
-        node_ip:{} node_rank:{} started_port:{}"
-                 .format(node_ips, node_ip, node_rank, started_port))
-
-    ports = [x for x in range(started_port, started_port + len(selected_gpus))]
-    cluster, pod = get_cluster(node_ips, node_ip, ports, selected_gpus)
+    # DISTRIBUTED_TRAINER_ENDPOINTS: new environment since paddlecloud 1.8.4
+    # e.g: DISTRIBUTED_TRAINER_ENDPOINTS="ip1:port1,ip1:port2,ip1:port3,ip1:port4,ip2:port5,ip2:port6,ip2:port7,ip2:port8"
+    trainer_endpoints = os.getenv("DISTRIBUTED_TRAINER_ENDPOINTS")
+    if trainer_endpoints is None:
+        started_port = args_port
+        if num_nodes > 1:
+            try:
+                paddle_port = int(os.getenv("PADDLE_PORT", ""))
+
+                if paddle_ports_num >= len(
+                        selected_gpus) and paddle_port != args_port:
+                    logger.warning("Use Cloud specified port:{}.".format(
+                        paddle_port))
+                    started_port = paddle_port
+
+            except Exception as e:
+                print(e)
+                pass
+
+        if started_port is None:
+            started_port = 6170
+        ports = [
+            x for x in range(started_port, started_port + len(selected_gpus))
+        ]
+        trainer_endpoints = []
+        for ip in node_ips:
+            trainer_endpoints.append(["%s:%d" % (ip, port) for port in ports])
+    else:
+        trainer_endpoints_ori = trainer_endpoints.split(",")
+        trainer_endpoints = []
+        assert num_nodes * paddle_ports_num == len(trainer_endpoints_ori)
+        for i in range(num_nodes):
+            trainer_endpoints.append(trainer_endpoints_ori[
+                i * paddle_ports_num:(i + 1) * paddle_ports_num])
+
+    logger.debug("parsed from args: node_ips:{} \
+        node_ip:{} node_rank:{} trainer_endpoints:{}"
+                 .format(node_ips, node_ip, node_rank, trainer_endpoints))
+
+    cluster, pod = get_cluster(node_ips, node_ip, trainer_endpoints,
+                               selected_gpus)
 
     return cluster, cluster.pods[node_rank]
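The core of this patch is the branch added above: when paddlecloud (1.8.4+) exports DISTRIBUTED_TRAINER_ENDPOINTS, the flat comma-separated list is chunked into TRAINER_PORTS_NUM endpoints per node; otherwise endpoints are still synthesized from a start port. A minimal standalone sketch of the chunking step; the helper name `split_endpoints_by_node` and the example values are illustrative, not part of the patch:

```python
def split_endpoints_by_node(endpoints_str, num_nodes, ports_per_node):
    """Chunk an "ip:port,ip:port,..." string into one endpoint list per node.

    Mirrors the else-branch added to get_cloud_cluster(); the helper name
    and standalone form are illustrative only.
    """
    endpoints = endpoints_str.split(",")
    assert num_nodes * ports_per_node == len(endpoints), \
        "expected num_nodes * TRAINER_PORTS_NUM endpoints"
    return [
        endpoints[i * ports_per_node:(i + 1) * ports_per_node]
        for i in range(num_nodes)
    ]


# Two nodes with four ports each, as in the DISTRIBUTED_TRAINER_ENDPOINTS
# example in the code comment above.
eps = "ip1:6170,ip1:6171,ip1:6172,ip1:6173,ip2:6170,ip2:6171,ip2:6172,ip2:6173"
print(split_endpoints_by_node(eps, num_nodes=2, ports_per_node=4))
# [['ip1:6170', 'ip1:6171', 'ip1:6172', 'ip1:6173'],
#  ['ip2:6170', 'ip2:6171', 'ip2:6172', 'ip2:6173']]
```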
diff --git a/python/paddle/distributed/fleet/cloud_utils.py b/python/paddle/distributed/fleet/cloud_utils.py
index 49d66118d902e43f7ee0c4003c516081092b2a97..a1203bed85cadd859132ad67159b604c7b78916b 100644
--- a/python/paddle/distributed/fleet/cloud_utils.py
+++ b/python/paddle/distributed/fleet/cloud_utils.py
@@ -19,7 +19,7 @@ from paddle.distributed.fleet.launch_utils import get_cluster, logger
 
 def get_cloud_cluster(args_node_ips, selected_gpus, args_port=6170):
     """
-    args_node_ips, args_node_ip:string
+    args_node_ips:string, selected_gpus:list, args_port:int
     """
     #you can automatically get ip info while using paddlecloud multi nodes mode.
     node_ips = os.getenv("PADDLE_TRAINERS")
@@ -31,6 +31,9 @@ def get_cloud_cluster(args_node_ips, selected_gpus, args_port=6170):
     node_rank = os.getenv("PADDLE_TRAINER_ID")
     assert node_rank is not None, "PADDLE_TRAINER_ID should not be None"
 
+    paddle_ports_num = int(os.getenv("TRAINER_PORTS_NUM"))
+    assert paddle_ports_num is not None, "TRAINER_PORTS_NUM should not be None"
+
     node_ips = node_ips.split(",")
     num_nodes = len(node_ips)
     node_rank = int(node_rank)
@@ -42,32 +45,47 @@ automatically got from PADDLE_TRAINERS(multi nodes) or POD_IP(single node).\
 Your input cluster_node_ips: {} doesn't equals to IPs: {} from \
 paddlecloud environment.".format(args_node_ips, node_ips))
 
-    started_port = args_port
-    print("num_nodes:", num_nodes)
-    if num_nodes > 1:
-        try:
-            paddle_port = int(os.getenv("PADDLE_PORT", ""))
-            paddle_port_num = int(os.getenv("TRAINER_PORTS_NUM", ""))
-
-            if paddle_port_num >= len(
-                    selected_gpus) and paddle_port != args_port:
-                logger.warning("Use Cloud specified port:{}.".format(
-                    paddle_port))
-                started_port = paddle_port
-
-        except Exception as e:
-            print(e)
-            pass
-
-    if started_port is None:
-        started_port = 6170
-
-    logger.debug("parsed from args:node_ips:{} \
-        node_ip:{} node_rank:{} started_port:{}"
-                 .format(node_ips, node_ip, node_rank, started_port))
-
-    ports = [x for x in range(started_port, started_port + len(selected_gpus))]
-    cluster, pod = get_cluster(node_ips, node_ip, ports, selected_gpus)
+    # DISTRIBUTED_TRAINER_ENDPOINTS: new environment since paddlecloud 1.8.4
+    # e.g: DISTRIBUTED_TRAINER_ENDPOINTS="ip1:port1,ip1:port2,ip1:port3,ip1:port4,ip2:port5,ip2:port6,ip2:port7,ip2:port8"
+    trainer_endpoints = os.getenv("DISTRIBUTED_TRAINER_ENDPOINTS")
+    if trainer_endpoints is None:
+        started_port = args_port
+        if num_nodes > 1:
+            try:
+                paddle_port = int(os.getenv("PADDLE_PORT", ""))
+
+                if paddle_ports_num >= len(
+                        selected_gpus) and paddle_port != args_port:
+                    logger.warning("Use Cloud specified port:{}.".format(
+                        paddle_port))
+                    started_port = paddle_port
+
+            except Exception as e:
+                print(e)
+                pass
+
+        if started_port is None:
+            started_port = 6170
+        ports = [
+            x for x in range(started_port, started_port + len(selected_gpus))
+        ]
+        trainer_endpoints = []
+        for ip in node_ips:
+            trainer_endpoints.append(["%s:%d" % (ip, port) for port in ports])
+    else:
+        trainer_endpoints_ori = trainer_endpoints.split(",")
+        trainer_endpoints = []
+        assert num_nodes * paddle_ports_num == len(trainer_endpoints_ori)
+        for i in range(num_nodes):
+            trainer_endpoints.append(trainer_endpoints_ori[
+                i * paddle_ports_num:(i + 1) * paddle_ports_num])
+
+    logger.debug("parsed from args: node_ips:{} \
+        node_ip:{} node_rank:{} trainer_endpoints:{}"
+                 .format(node_ips, node_ip, node_rank, trainer_endpoints))
+
+    cluster, pod = get_cluster(node_ips, node_ip, trainer_endpoints,
+                               selected_gpus)
 
     return cluster, cluster.pods[node_rank]
 
 
@@ -75,7 +93,8 @@ def use_paddlecloud():
     node_ips = os.getenv("PADDLE_TRAINERS")
     node_ip = os.getenv("POD_IP")
     node_rank = os.getenv("PADDLE_TRAINER_ID")
-    if node_ips is None or node_ip is None or node_rank is None:
+    paddle_ports_num = os.getenv("TRAINER_PORTS_NUM")
+    if node_ips is None or node_ip is None or node_rank is None or paddle_ports_num is None:
         return False
     else:
         return True
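The use_paddlecloud() hunk only widens the detection check: TRAINER_PORTS_NUM must now be present alongside the other paddlecloud variables before the cloud code path is taken. A hedged standalone equivalent of that check (the function name here is illustrative):

```python
import os


def running_on_paddlecloud():
    """Illustrative equivalent of the patched use_paddlecloud(): all four
    environment variables must be set for the paddlecloud path to be taken."""
    required = ("PADDLE_TRAINERS", "POD_IP", "PADDLE_TRAINER_ID",
                "TRAINER_PORTS_NUM")
    return all(os.getenv(name) is not None for name in required)
```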
diff --git a/python/paddle/distributed/fleet/launch.py b/python/paddle/distributed/fleet/launch.py
index 7778acaf83b310cfa9a04059ce6d3be2d5326089..6dba385c569be75b5b83e0a63e560ffa8ab73696 100644
--- a/python/paddle/distributed/fleet/launch.py
+++ b/python/paddle/distributed/fleet/launch.py
@@ -157,17 +157,20 @@ def get_cluster_from_args(args, gpus):
 
     free_ports = [x for x in range(start_port, start_port + len(gpus))]
 
-    return get_cluster(node_ips, node_ip, free_ports, gpus)
+    trainer_endpoints = []
+    for ip in node_ips:
+        trainer_endpoints.append(["%s:%d" % (ip, port) for port in free_ports])
+    return get_cluster(node_ips, node_ip, trainer_endpoints, gpus)
 
 
 def get_gpus(gpus):
     if gpus is None:
         gpus_num = fluid.core.get_cuda_device_count()
-        gpus = [str(x) for x in range(0, gpus_num)]
+        res_gpus = [str(x) for x in range(0, gpus_num)]
     else:
         cuda_visible_devices = os.getenv("CUDA_VISIBLE_DEVICES")
         if cuda_visible_devices is None or cuda_visible_devices == "":
-            gpus = [x.strip() for x in gpus.split(',')]
+            res_gpus = [x.strip() for x in gpus.split(',')]
         else:
             # change gpus into relative values
             # e.g. CUDA_VISIBLE_DEVICES=4,5,6,7; args.gpus=4,5,6,7;
@@ -177,12 +180,16 @@ def get_gpus(gpus):
                 assert x in cuda_visible_devices_list, "Can't find "\
                     "your gpus %s in CUDA_VISIBLE_DEVICES[%s]."\
                     % (x, cuda_visible_devices)
-            gpus = [
+            res_gpus = [
                 cuda_visible_devices_list.index(x.strip())
                 for x in gpus.split(',')
             ]
+            logger.info("Change selected_gpus into relative values. --gpus:{} "
+                        "will change into relative_gpus:{} according to your "
+                        "CUDA_VISIBLE_DEVICES:{}".format(
+                            gpus, res_gpus, cuda_visible_devices_list))
 
-    return gpus
+    return res_gpus
 
 
 def launch_collective(args):
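In get_gpus() the patch stops reassigning the incoming argument and instead returns a separate res_gpus list, logging how absolute device ids are remapped to indices relative to CUDA_VISIBLE_DEVICES. A standalone sketch of that remapping under the same rules; the helper name and example values are illustrative, the patch keeps this logic inline:

```python
import os


def to_relative_gpu_ids(requested, visible=None):
    """Map absolute GPU ids (e.g. "4,5,6,7") to indices relative to
    CUDA_VISIBLE_DEVICES, mirroring the remapping in the patched get_gpus()."""
    if visible is None:
        visible = os.getenv("CUDA_VISIBLE_DEVICES", "")
    if visible == "":
        return [x.strip() for x in requested.split(",")]
    visible_list = visible.split(",")
    for x in requested.split(","):
        assert x.strip() in visible_list, \
            "Can't find gpu %s in CUDA_VISIBLE_DEVICES[%s]" % (x, visible)
    return [visible_list.index(x.strip()) for x in requested.split(",")]


# CUDA_VISIBLE_DEVICES=4,5,6,7 with --gpus 4,5,6,7 resolves to [0, 1, 2, 3].
print(to_relative_gpu_ids("4,5,6,7", visible="4,5,6,7"))
```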
diff --git a/python/paddle/distributed/fleet/launch_utils.py b/python/paddle/distributed/fleet/launch_utils.py
index 0e995200dde035842d89d9c503566b7b70ee67b7..b6f4c75a276920f966a6b324a9bea16148bf337c 100644
--- a/python/paddle/distributed/fleet/launch_utils.py
+++ b/python/paddle/distributed/fleet/launch_utils.py
@@ -227,18 +227,23 @@ def get_logger(log_level=20, name="root"):
     return logger
 
 
-def get_cluster(node_ips, node_ip, paddle_ports, selected_gpus):
-    assert type(paddle_ports) is list, "paddle_ports must be list"
+def get_cluster(node_ips, node_ip, trainer_endpoints, selected_gpus):
+    assert type(trainer_endpoints) is list, "trainer_endpoints must be list"
     cluster = Cluster(hdfs=None)
     trainer_rank = 0
     for node_rank, ip in enumerate(node_ips):
         pod = Pod()
         pod.rank = node_rank
         pod.addr = ip
+        cur_node_endpoints = trainer_endpoints[node_rank]
+        # when using paddlecloud, endpoints may be > selected_gpus (user defined)
+        assert len(cur_node_endpoints) >= len(
+            selected_gpus
+        ), "current trainer_endpoints size should be greater than or equal to selected_gpus size."
         for i in range(len(selected_gpus)):
             trainer = Trainer()
             trainer.gpus.append(selected_gpus[i])
-            trainer.endpoint = "%s:%d" % (ip, paddle_ports[i])
+            trainer.endpoint = "%s" % (cur_node_endpoints[i])
             trainer.rank = trainer_rank
             trainer_rank += 1
 
@@ -424,10 +429,6 @@ def start_local_trainers(cluster,
                      len(pod.trainers),
                      pretty_print_envs(proc_env, ("Distributed Envs",
                                                   "Value"))))
-        logger.info(
-            "More details for debug about commands and environments are written in {}/run.sh".
-            format(log_dir))
-
         fn = None
         if log_dir is not None:
             os.system("mkdir -p {}".format(log_dir))
diff --git a/python/paddle/distributed/launch.py b/python/paddle/distributed/launch.py
index e2ab321f9aebddd437c92ded9e6005495f760096..9b969cf3002379058b9cff0d604d2db750573028 100644
--- a/python/paddle/distributed/launch.py
+++ b/python/paddle/distributed/launch.py
@@ -160,18 +160,21 @@ def get_cluster_from_args(args, selected_gpus):
         x for x in range(started_port, started_port + len(selected_gpus))
     ]
 
-    return get_cluster(node_ips, node_ip, free_ports, selected_gpus)
+    trainer_endpoints = []
+    for ip in node_ips:
+        trainer_endpoints.append(["%s:%d" % (ip, port) for port in free_ports])
+    return get_cluster(node_ips, node_ip, trainer_endpoints, selected_gpus)
 
 
 def get_gpus(selected_gpus):
     if selected_gpus is None:
         from paddle.fluid import core
         gpus_num = core.get_cuda_device_count()
-        selected_gpus = [str(x) for x in range(0, gpus_num)]
+        gpus = [str(x) for x in range(0, gpus_num)]
     else:
         cuda_visible_devices = os.getenv("CUDA_VISIBLE_DEVICES")
         if cuda_visible_devices is None or cuda_visible_devices == "":
-            selected_gpus = [x.strip() for x in selected_gpus.split(',')]
+            gpus = [x.strip() for x in selected_gpus.split(',')]
         else:
             # change selected_gpus into relative values
             # e.g. CUDA_VISIBLE_DEVICES=4,5,6,7; args.selected_gpus=4,5,6,7;
@@ -181,12 +184,16 @@ def get_gpus(selected_gpus):
                 assert x in cuda_visible_devices_list, "Can't find "\
                     "your selected_gpus %s in CUDA_VISIBLE_DEVICES[%s]."\
                     % (x, cuda_visible_devices)
-            selected_gpus = [
+            gpus = [
                 cuda_visible_devices_list.index(x.strip())
                 for x in selected_gpus.split(',')
             ]
+            logger.info("Change selected_gpus into relative values. --selected_gpus:{} "
+                        "will change into relative_selected_gpus:{} according to your "
+                        "CUDA_VISIBLE_DEVICES:{}".format(
+                            selected_gpus, gpus, cuda_visible_devices_list))
 
-    return selected_gpus
+    return gpus
 
 
 def get_cluster_and_pod(args):
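Both get_cluster() implementations now consume a per-node list of endpoint lists instead of a flat port list: each node takes its node_rank-th chunk, each selected GPU is paired with the i-th endpoint of that chunk, and global trainer ranks are assigned in order. A simplified sketch of that assignment using plain tuples instead of Paddle's Cluster/Pod/Trainer objects (all names here are illustrative):

```python
def assign_trainers(node_ips, trainer_endpoints, selected_gpus):
    """Sketch of the rank/endpoint assignment done by the patched
    get_cluster(); plain tuples stand in for Cluster/Pod/Trainer."""
    trainers = []
    trainer_rank = 0
    for node_rank, ip in enumerate(node_ips):
        cur_node_endpoints = trainer_endpoints[node_rank]
        # paddlecloud may export more endpoints per node than GPUs in use
        assert len(cur_node_endpoints) >= len(selected_gpus)
        for i, gpu in enumerate(selected_gpus):
            trainers.append((trainer_rank, ip, gpu, cur_node_endpoints[i]))
            trainer_rank += 1
    return trainers


# Two nodes with two GPUs each -> global ranks 0..3.
for t in assign_trainers(
        ["10.0.0.1", "10.0.0.2"],
        [["10.0.0.1:6170", "10.0.0.1:6171"],
         ["10.0.0.2:6170", "10.0.0.2:6171"]],
        selected_gpus=[0, 1]):
    print(t)
```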
diff --git a/python/paddle/distributed/utils.py b/python/paddle/distributed/utils.py
index 1fa307c4d1b89d4033a8f8346b254177053e9dc0..be144a55b86200042f4d03b112071a374612b3a5 100644
--- a/python/paddle/distributed/utils.py
+++ b/python/paddle/distributed/utils.py
@@ -227,18 +227,23 @@ def get_logger(log_level, name="root"):
     return logger
 
 
-def get_cluster(node_ips, node_ip, paddle_ports, selected_gpus):
-    assert type(paddle_ports) is list, "paddle_ports must be list"
+def get_cluster(node_ips, node_ip, trainer_endpoints, selected_gpus):
+    assert type(trainer_endpoints) is list, "trainer_endpoints must be list"
     cluster = Cluster(hdfs=None)
     trainer_rank = 0
     for node_rank, ip in enumerate(node_ips):
         pod = Pod()
         pod.rank = node_rank
         pod.addr = ip
+        cur_node_endpoints = trainer_endpoints[node_rank]
+        # when using paddlecloud, endpoints may be > selected_gpus (user defined)
+        assert len(cur_node_endpoints) >= len(
+            selected_gpus
+        ), "current trainer_endpoints size should be greater than or equal to selected_gpus size."
         for i in range(len(selected_gpus)):
             trainer = Trainer()
             trainer.gpus.append(selected_gpus[i])
-            trainer.endpoint = "%s:%d" % (ip, paddle_ports[i])
+            trainer.endpoint = "%s" % (cur_node_endpoints[i])
             trainer.rank = trainer_rank
             trainer_rank += 1
 
@@ -253,7 +258,8 @@ def terminate_local_procs(procs):
     for p in procs:
         if p.proc.poll() is None:
             p.proc.terminate()
-            p.log_fn.close()
+            if p.log_fn:
+                p.log_fn.close()
             logger.debug("terminate process id:{}".format(p.proc.pid))
 
     #wait all process terminiated
diff --git a/python/paddle/fluid/tests/unittests/test_fleet_launch.sh b/python/paddle/fluid/tests/unittests/test_fleet_launch.sh
index c5edc96963408bf1fad793f7271d75159934f019..e717962ead2e2da30092b12379bf36f368e8a735 100644
--- a/python/paddle/fluid/tests/unittests/test_fleet_launch.sh
+++ b/python/paddle/fluid/tests/unittests/test_fleet_launch.sh
@@ -79,9 +79,9 @@ if [ -f $file_1 ]; then
     rm $file_1
 fi
 
-
+# test using the DISTRIBUTED_TRAINER_ENDPOINTS env from paddlecloud
 unset PADDLE_PORT
-unset TRAINER_PORTS_NUM
+export DISTRIBUTED_TRAINER_ENDPOINTS=127.0.0.1:6170,127.0.0.1:6171,127.0.0.2:6170,127.0.0.2:6171
 
 echo ""
 echo "paddle.distributed.launch async poll process test"
diff --git a/python/paddle/fluid/tests/unittests/test_launch.sh b/python/paddle/fluid/tests/unittests/test_launch.sh
index 98c907a551965331f79d1635362213b43d867002..958d78246627d4cd2f826f74aeccff5ffe254034 100644
--- a/python/paddle/fluid/tests/unittests/test_launch.sh
+++ b/python/paddle/fluid/tests/unittests/test_launch.sh
@@ -48,9 +48,9 @@ if [ -f $file_1 ]; then
     rm $file_1
 fi
 
-
+# test using the DISTRIBUTED_TRAINER_ENDPOINTS env from paddlecloud
 unset PADDLE_PORT
-unset TRAINER_PORTS_NUM
+export DISTRIBUTED_TRAINER_ENDPOINTS=127.0.0.1:6170,127.0.0.1:6171,127.0.0.2:6170,127.0.0.2:6171
 
 echo ""
 echo "paddle.distributed.launch async poll process test"
diff --git a/python/paddle/tests/test_dist_hapi_model.py b/python/paddle/tests/test_dist_hapi_model.py
index e75e08e3749e6ce629e88c486e4f87d9109dc709..db5b63c5ae0e29fa6f1274befd277c4e46c3a1b1 100644
--- a/python/paddle/tests/test_dist_hapi_model.py
+++ b/python/paddle/tests/test_dist_hapi_model.py
@@ -37,7 +37,11 @@ def get_cluster_from_args(selected_gpus):
     free_ports = find_free_ports(len(selected_gpus))
     if free_ports is not None:
         free_ports = list(free_ports)
-    return get_cluster(node_ips, node_ip, free_ports, selected_gpus)
+
+    trainer_endpoints = []
+    for ip in node_ips:
+        trainer_endpoints.append(["%s:%d" % (ip, port) for port in free_ports])
+    return get_cluster(node_ips, node_ip, trainer_endpoints, selected_gpus)
 
 
 def get_gpus(selected_gpus):
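The updated shell tests exercise the DISTRIBUTED_TRAINER_ENDPOINTS code path by exporting that variable instead of unsetting TRAINER_PORTS_NUM. For a rough picture of what a single-node paddlecloud-style environment looks like before the patched get_cloud_cluster() is called, here is a hedged sketch; the addresses, ports, and GPU ids are examples only and assume a Paddle build that includes the patched modules:

```python
import os

# Illustrative paddlecloud-style environment (values are examples only).
os.environ["PADDLE_TRAINERS"] = "127.0.0.1"
os.environ["POD_IP"] = "127.0.0.1"
os.environ["PADDLE_TRAINER_ID"] = "0"
os.environ["TRAINER_PORTS_NUM"] = "2"
os.environ["DISTRIBUTED_TRAINER_ENDPOINTS"] = "127.0.0.1:6170,127.0.0.1:6171"

from paddle.distributed.cloud_utils import get_cloud_cluster

cluster, pod = get_cloud_cluster(
    args_node_ips="127.0.0.1",
    args_node_ip="127.0.0.1",
    args_port=6170,
    selected_gpus=["0", "1"])
# Expect one trainer per selected GPU, bound to the endpoints taken from
# DISTRIBUTED_TRAINER_ENDPOINTS rather than ports derived from args_port.
print([t.endpoint for t in pod.trainers])
```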