From 7de3f81cd0f5fed290e8719f8355df5f91471ec7 Mon Sep 17 00:00:00 2001
From: Bo Liu
Date: Thu, 28 Oct 2021 11:03:28 +0800
Subject: [PATCH] Add lazy distributed launch with rank mapping (#36570)

---
 python/paddle/distributed/fleet/launch.py     | 58 ++++++++---
 .../paddle/distributed/fleet/launch_utils.py  | 97 ++++++++++++++++++-
 .../fluid/tests/unittests/CMakeLists.txt      |  2 +
 .../test_fleet_launch_rank_mapping.sh         | 64 ++++++++++++
 4 files changed, 205 insertions(+), 16 deletions(-)
 create mode 100755 python/paddle/fluid/tests/unittests/test_fleet_launch_rank_mapping.sh

diff --git a/python/paddle/distributed/fleet/launch.py b/python/paddle/distributed/fleet/launch.py
index b12a392501a..946c8986699 100644
--- a/python/paddle/distributed/fleet/launch.py
+++ b/python/paddle/distributed/fleet/launch.py
@@ -65,6 +65,7 @@ import os
 import time
 import six
 import copy
+import argparse
 from argparse import ArgumentParser, REMAINDER
 import paddle
 import paddle.fluid as fluid
@@ -162,6 +163,31 @@ see: http://www.paddlepaddle.org/documentation/docs/zh/1.6/user_guides/howto/tra
         type=str,
         default="127.0.0.1",
         help="Paddle cluster nodes ips, such as 192.168.0.16,192.168.0.17..")
+    collective_group.add_argument(
+        "--rank_mapping_file",
+        type=argparse.FileType('r'),
+        default=sys.stdin,
+        help="This rank mapping information in JSON format is used "
+        "specifically by the lazy launch mode for auto parallel. Some of "
+        "the ranks in each node may not be used, and the indices of the "
+        "ranks should be kept the same as the indices of the sub-tasks "
+        "split by auto parallel. "
+        " { "
+        "   \"ip_ranks\": [ "
+        "     { "
+        "       \"ip\": \"127.0.0.1\", "
+        "       \"ranks\": [0,1] "
+        "     }, "
+        "     { "
+        "       \"ip\": \"127.0.0.2\", "
+        "       \"ranks\": [2,3,4] "
+        "     } "
+        "   ] "
+        " } ")
+    collective_group.add_argument(
+        "--enable_auto_mapping",
+        type=bool,
+        default=False,
+        help="Set to true to enable lazy launch for the auto-parallel scenario.")

     ps_group = parser.add_argument_group("Parameter-Server Parameters")
     # for parameter server
@@ -261,21 +287,25 @@ def launch_collective(args):
     start_port = 6170
     if os.environ.get('FLAGS_START_PORT') is not None:
         start_port = os.environ.get('FLAGS_START_PORT')
-    if cloud_utils.use_paddlecloud() and trainers_num != 1:
-        cluster, pod = cloud_utils.get_cloud_cluster(
-            args.ips, device_mode, devices_per_proc, start_port)
-        logger.debug("get cluster from cloud:{}".format(cluster))
-    elif device_mode == DeviceMode.ASCEND_NPU:
-        # for ascend
-        cluster, pod = ascend_utils.get_cloud_cluster(
-            rank_table_file=os.getenv("RANK_TABLE_FILE", None),
-            device_mode=device_mode,
-            start_port=start_port)
+    # lazy launch for auto-parallel
+    if args.enable_auto_mapping:
+        cluster, pod = get_mapped_cluster_from_args(args, device_mode)
     else:
-        # trainers_num = 1 or not use paddlecloud ips="a,b"
-        cluster, pod = get_cluster_from_args(args, device_mode,
-                                             devices_per_proc)
-        logger.debug("get cluster from args:{}".format(cluster))
+        # for ascend
+        if device_mode == DeviceMode.ASCEND_NPU:
+            cluster, pod = ascend_utils.get_cloud_cluster(
+                rank_table_file=os.getenv("RANK_TABLE_FILE", None),
+                device_mode=device_mode,
+                start_port=start_port)
+        elif cloud_utils.use_paddlecloud() and trainers_num != 1:
+            cluster, pod = cloud_utils.get_cloud_cluster(
+                args.ips, device_mode, devices_per_proc, start_port)
+            logger.debug("get cluster from cloud:{}".format(cluster))
+        else:
+            # trainers_num = 1 or not use paddlecloud ips="a,b"
+            cluster, pod = get_cluster_from_args(args, device_mode,
+                                                 devices_per_proc)
+            logger.debug("get cluster from args:{}".format(cluster))

     global_envs = copy.copy(os.environ.copy())
     gloo_rendezvous_dir = tempfile.mkdtemp()
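To make the new flags concrete, here is a minimal sketch (not part of the patch) of preparing a rank mapping file and starting the launcher with it. The file name, IPs, rank layout, and the train.py entry script are illustrative assumptions; the JSON schema follows the --rank_mapping_file help string above.

    import json

    # Hypothetical two-node layout mirroring the schema from the help text;
    # the IPs, ranks, and file name are illustrative only.
    rank_mapping = {
        "ip_ranks": [
            {"ip": "127.0.0.1", "ranks": [0, 1]},
            {"ip": "127.0.0.2", "ranks": [2, 3, 4]},
        ]
    }
    with open("rank_mapping_file.json", "w") as f:
        json.dump(rank_mapping, f, indent=4)

    # A collective job could then be launched along these lines
    # (train.py is a placeholder training script):
    #
    #   python -m paddle.distributed.fleet.launch \
    #       --rank_mapping_file rank_mapping_file.json \
    #       --enable_auto_mapping true \
    #       train.py

One caveat worth noting: because --enable_auto_mapping is declared with type=bool, argparse applies bool() to the raw string, so any non-empty value, including "false", parses as True. The flag is effectively enable-only; omit it to keep the default of False.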
args:{}".format(cluster)) global_envs = copy.copy(os.environ.copy()) gloo_rendezvous_dir = tempfile.mkdtemp() diff --git a/python/paddle/distributed/fleet/launch_utils.py b/python/paddle/distributed/fleet/launch_utils.py index 3aced0ab996..b4ebe9ef125 100644 --- a/python/paddle/distributed/fleet/launch_utils.py +++ b/python/paddle/distributed/fleet/launch_utils.py @@ -27,6 +27,7 @@ import socket import warnings import six import struct +import json import paddle import paddle.fluid as fluid @@ -527,8 +528,9 @@ def start_local_trainers(cluster, pretty_print_envs(proc_env, ("Distributed Envs", "Value")))) logger.info( - "details abouts PADDLE_TRAINER_ENDPOINTS can be found in {}/endpoints.log, and detail running logs maybe found in {}/workerlog.0". - format(log_dir, log_dir)) + "details about PADDLE_TRAINER_ENDPOINTS can be found in " + "{}/endpoints.log, and detail running logs maybe found in " + "{}/workerlog.0".format(log_dir, log_dir)) fn = None pre_fn = None if os.name == 'nt' else os.setsid if log_dir is not None: @@ -805,6 +807,97 @@ def cloud_ps_heter_env_set(args): pretty_print_envs(environs))) +def get_mapped_cluster(node_ips, node_ip, trainer_endpoints, device_mode, + node_mapping_ranks): + assert type(trainer_endpoints) is list, "trainer_endpoints must be list" + assert device_mode == DeviceMode.GPU, \ + "Only support get mapped cluster for gpu now." + cluster = Cluster(hdfs=None) + for node_rank, ip in enumerate(node_ips): + pod = Pod() + pod.rank = node_rank + pod.addr = ip + pod.device_mode = device_mode + cur_node_endpoints = trainer_endpoints[node_rank] + + # choose rank from global mapped ranks and set it to the trainer. + ranks_per_node = node_mapping_ranks[node_rank] + for i in range(len(ranks_per_node)): + trainer = Trainer() + # change global rank(mapped) to local rank within each node. + # e.g. mapped ranks of node: 3,4,7 -> 0,1,2 + local_rank = ranks_per_node.index(ranks_per_node[i]) + trainer.accelerators.append(local_rank) + trainer.endpoint = "%s" % (cur_node_endpoints[i]) + # global mapped ranks + trainer.rank = ranks_per_node[i] + + pod.trainers.append(trainer) + cluster.pods.append(pod) + + pod_rank = node_ips.index(node_ip) + return cluster, cluster.pods[pod_rank] + + +def get_mapped_cluster_from_args(args, device_mode): + assert device_mode == DeviceMode.GPU, \ + "Only support get mapped cluster for gpu now." + gpus_num = fluid.core.get_cuda_device_count() + + # parse ip-ranks json file + json_data = None + with args.rank_mapping_file as json_file: + json_data = json.load(json_file) + + node_ips = [] + node_ranks_mapping = [] + ip_ranks_list = json_data['ip_ranks'] + for ip_ranks in ip_ranks_list: + node_ips.append(ip_ranks['ip']) + node_ranks_mapping.append(ip_ranks['ranks']) + + if len(node_ips) == 1: + node_ip = node_ips[0] + else: + if args.host: + node_ip = args.host + else: + _, node_ip = get_host_name_ip() + + assert node_ip in node_ips, \ + "Can't find your local ip {%s} in node_ips: {%s}" % (node_ip, node_ips) + node_rank = node_ips.index(node_ip) + + assert len(node_ranks_mapping[node_rank]) <= gpus_num, \ + "number of ranks mapped to one node should not exceed the avaiable ones." + assert len(node_ranks_mapping) == len(node_ips), \ + "ranks length should be equal to ips length." + + logger.debug("parsed from args: node_ips:{} node_ip:{} " + "node_rank:{} node_ranks_mapping:{}".format( + node_ips, node_ip, node_rank, node_ranks_mapping[ + node_rank])) + + # NOTE: there are different number of global mapped ranks on each node. 
diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt
index 34ba1d19b80..4edc675acc7 100644
--- a/python/paddle/fluid/tests/unittests/CMakeLists.txt
+++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt
@@ -58,6 +58,7 @@ list(APPEND MIXED_DIST_TEST_OPS test_fleet_run_random_port)
 list(APPEND MIXED_DIST_TEST_OPS test_fleet_launch_async)
 list(APPEND MIXED_DIST_TEST_OPS test_fleet_launch_cloud)
 list(APPEND MIXED_DIST_TEST_OPS test_fleet_launch_ascend)
+list(APPEND MIXED_DIST_TEST_OPS test_fleet_launch_rank_mapping)
 list(APPEND MIXED_DIST_TEST_OPS test_ascend_group)
 list(APPEND MIXED_DIST_TEST_OPS test_fleet_launch_nproc)
 list(APPEND MIXED_DIST_TEST_OPS test_fleet_api_input)
@@ -655,6 +656,7 @@ if(WITH_DISTRIBUTE)
         bash_test_modules(test_fleet_launch_async START_BASH test_fleet_launch_async.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR})
         bash_test_modules(test_fleet_launch_cloud START_BASH test_fleet_launch_cloud.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR})
         bash_test_modules(test_fleet_launch_nproc START_BASH test_fleet_launch_nproc.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR})
+        bash_test_modules(test_fleet_launch_rank_mapping START_BASH test_fleet_launch_rank_mapping.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR})
         if(WITH_ASCEND OR WITH_ASCEND_CL)
             bash_test_modules(test_fleet_launch_ascend START_BASH test_fleet_launch_ascend.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR})
             bash_test_modules(test_ascend_group START_BASH test_ascend_group.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR})
diff --git a/python/paddle/fluid/tests/unittests/test_fleet_launch_rank_mapping.sh b/python/paddle/fluid/tests/unittests/test_fleet_launch_rank_mapping.sh
new file mode 100755
index 00000000000..eb84f9f6e84
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/test_fleet_launch_rank_mapping.sh
@@ -0,0 +1,64 @@
+#!/bin/bash
+
+# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+set -e
+
+# use single node
+echo "begin test"
+
+RANK_MAPPING_FILE_NAME="rank_mapping_file.json"
+cat > ${RANK_MAPPING_FILE_NAME} <
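The test script is cut off above, in the middle of the heredoc that fills ${RANK_MAPPING_FILE_NAME}; the remainder of the 64-line file is not recoverable here. Judging from the "use single node" comment and the --rank_mapping_file schema, the heredoc presumably writes a single-entry mapping roughly like the following (an assumption, not the verbatim test content):

    {
        "ip_ranks": [
            {
                "ip": "127.0.0.1",
                "ranks": [0, 1]
            }
        ]
    }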