Unverified commit 7de3f81c, authored by Bo Liu, committed by GitHub

Add lazy distributed launch with rank mapping (#36570)

Parent ff3018d7
......@@ -65,6 +65,7 @@ import os
import time
import six
import copy
import argparse
from argparse import ArgumentParser, REMAINDER
import paddle
import paddle.fluid as fluid
......@@ -162,6 +163,31 @@ see: http://www.paddlepaddle.org/documentation/docs/zh/1.6/user_guides/howto/tra
type=str,
default="127.0.0.1",
help="Paddle cluster nodes ips, such as 192.168.0.16,192.168.0.17..")
collective_group.add_argument(
"--rank_mapping_file",
type=argparse.FileType('r'),
default=sys.stdin,
help="This rank mapping information in json format is used specifically "
"for lazy launch for auto parallel. Some of the ranks in each node "
"may not be used, and the indices of rank should be kept the same "
"as the indices of sub-task splited by auto parallel. "
" { "
" \"ip_ranks\": [ "
" { "
" \"ip\": \"127.0.0.1\", "
" \"ranks\": [0,1] "
" }, "
" { "
" \"ip\": \"127.0.0.2\", "
" \"ranks\": [2,3,4] "
" } "
" ] "
" } ")
collective_group.add_argument(
"--enable_auto_mapping",
type=bool,
default=False,
help="Set true to enable the lazy launch for auto-parallel scenario.")
ps_group = parser.add_argument_group("Parameter-Server Parameters")
# for parameter server
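To illustrate how the two new flags are consumed, here is a minimal, self-contained sketch that mirrors the arguments added above; it is not the launcher itself, and it assumes a rank_mapping_file.json matching the JSON schema from the help text exists in the working directory:

import argparse
import json
import sys

parser = argparse.ArgumentParser()
parser.add_argument("--rank_mapping_file", type=argparse.FileType('r'), default=sys.stdin)
parser.add_argument("--enable_auto_mapping", type=bool, default=False)

# NOTE: argparse's type=bool converts any non-empty string (even "false") to True,
# so only pass this flag when auto mapping should actually be enabled.
args = parser.parse_args(["--rank_mapping_file", "rank_mapping_file.json",
                          "--enable_auto_mapping", "true"])

mapping = json.load(args.rank_mapping_file)
for entry in mapping["ip_ranks"]:
    print(entry["ip"], entry["ranks"])  # e.g. 127.0.0.1 [0, 1]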
......@@ -261,21 +287,25 @@ def launch_collective(args):
start_port = 6170
if os.environ.get('FLAGS_START_PORT') is not None:
start_port = os.environ.get('FLAGS_START_PORT')
if cloud_utils.use_paddlecloud() and trainers_num != 1:
cluster, pod = cloud_utils.get_cloud_cluster(
args.ips, device_mode, devices_per_proc, start_port)
logger.debug("get cluster from cloud:{}".format(cluster))
elif device_mode == DeviceMode.ASCEND_NPU:
# for ascend
cluster, pod = ascend_utils.get_cloud_cluster(
rank_table_file=os.getenv("RANK_TABLE_FILE", None),
device_mode=device_mode,
start_port=start_port)
# lazy launch for auto-parallel
if args.enable_auto_mapping:
cluster, pod = get_mapped_cluster_from_args(args, device_mode)
else:
# trainers_num = 1 or not use paddlecloud ips="a,b"
cluster, pod = get_cluster_from_args(args, device_mode,
devices_per_proc)
logger.debug("get cluster from args:{}".format(cluster))
# for ascend
if device_mode == DeviceMode.ASCEND_NPU:
cluster, pod = ascend_utils.get_cloud_cluster(
rank_table_file=os.getenv("RANK_TABLE_FILE", None),
device_mode=device_mode,
start_port=start_port)
elif cloud_utils.use_paddlecloud() and trainers_num != 1:
cluster, pod = cloud_utils.get_cloud_cluster(
args.ips, device_mode, devices_per_proc, start_port)
logger.debug("get cluster from cloud:{}".format(cluster))
else:
# trainers_num = 1 or not use paddlecloud ips="a,b"
cluster, pod = get_cluster_from_args(args, device_mode,
devices_per_proc)
logger.debug("get cluster from args:{}".format(cluster))
global_envs = copy.copy(os.environ.copy())
gloo_rendezvous_dir = tempfile.mkdtemp()
......
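Schematically, the cluster source is now chosen in the following priority order; the sketch below only restates the branch above for readability and is not additional launcher code:

def choose_cluster_source(enable_auto_mapping, is_ascend_npu, on_paddlecloud, trainers_num):
    # Mirrors the branch order in launch_collective after this change.
    if enable_auto_mapping:
        return "mapped cluster built from --rank_mapping_file"
    if is_ascend_npu:
        return "ascend cluster built from RANK_TABLE_FILE"
    if on_paddlecloud and trainers_num != 1:
        return "cluster fetched from paddlecloud"
    return "cluster built from --ips and local devices"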
......@@ -27,6 +27,7 @@ import socket
import warnings
import six
import struct
import json
import paddle
import paddle.fluid as fluid
......@@ -527,8 +528,9 @@ def start_local_trainers(cluster,
pretty_print_envs(proc_env, ("Distributed Envs",
"Value"))))
logger.info(
"details abouts PADDLE_TRAINER_ENDPOINTS can be found in {}/endpoints.log, and detail running logs maybe found in {}/workerlog.0".
format(log_dir, log_dir))
"details about PADDLE_TRAINER_ENDPOINTS can be found in "
"{}/endpoints.log, and detail running logs maybe found in "
"{}/workerlog.0".format(log_dir, log_dir))
fn = None
pre_fn = None if os.name == 'nt' else os.setsid
if log_dir is not None:
......@@ -805,6 +807,97 @@ def cloud_ps_heter_env_set(args):
pretty_print_envs(environs)))
def get_mapped_cluster(node_ips, node_ip, trainer_endpoints, device_mode,
node_mapping_ranks):
assert type(trainer_endpoints) is list, "trainer_endpoints must be list"
assert device_mode == DeviceMode.GPU, \
"Only support get mapped cluster for gpu now."
cluster = Cluster(hdfs=None)
for node_rank, ip in enumerate(node_ips):
pod = Pod()
pod.rank = node_rank
pod.addr = ip
pod.device_mode = device_mode
cur_node_endpoints = trainer_endpoints[node_rank]
# choose rank from global mapped ranks and set it to the trainer.
ranks_per_node = node_mapping_ranks[node_rank]
for i in range(len(ranks_per_node)):
trainer = Trainer()
# change global rank(mapped) to local rank within each node.
# e.g. mapped ranks of node: 3,4,7 -> 0,1,2
local_rank = ranks_per_node.index(ranks_per_node[i])
trainer.accelerators.append(local_rank)
trainer.endpoint = "%s" % (cur_node_endpoints[i])
# global mapped ranks
trainer.rank = ranks_per_node[i]
pod.trainers.append(trainer)
cluster.pods.append(pod)
pod_rank = node_ips.index(node_ip)
return cluster, cluster.pods[pod_rank]
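A standalone sketch of the per-node rank remapping that get_mapped_cluster performs; map_node_ranks is a hypothetical helper used only for illustration, not part of this change:

def map_node_ranks(ranks_per_node):
    # Globally mapped ranks of one node, e.g. [3, 4, 7], become local
    # accelerator ids [0, 1, 2]; the global rank is kept as the trainer rank.
    return [(local_rank, global_rank)
            for local_rank, global_rank in enumerate(ranks_per_node)]

assert map_node_ranks([3, 4, 7]) == [(0, 3), (1, 4), (2, 7)]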
def get_mapped_cluster_from_args(args, device_mode):
assert device_mode == DeviceMode.GPU, \
"Only support get mapped cluster for gpu now."
gpus_num = fluid.core.get_cuda_device_count()
# parse ip-ranks json file
json_data = None
with args.rank_mapping_file as json_file:
json_data = json.load(json_file)
node_ips = []
node_ranks_mapping = []
ip_ranks_list = json_data['ip_ranks']
for ip_ranks in ip_ranks_list:
node_ips.append(ip_ranks['ip'])
node_ranks_mapping.append(ip_ranks['ranks'])
if len(node_ips) == 1:
node_ip = node_ips[0]
else:
if args.host:
node_ip = args.host
else:
_, node_ip = get_host_name_ip()
assert node_ip in node_ips, \
"Can't find your local ip {%s} in node_ips: {%s}" % (node_ip, node_ips)
node_rank = node_ips.index(node_ip)
assert len(node_ranks_mapping[node_rank]) <= gpus_num, \
"number of ranks mapped to one node should not exceed the avaiable ones."
assert len(node_ranks_mapping) == len(node_ips), \
"ranks length should be equal to ips length."
logger.debug("parsed from args: node_ips:{} node_ip:{} "
"node_rank:{} node_ranks_mapping:{}".format(
node_ips, node_ip, node_rank, node_ranks_mapping[
node_rank]))
# NOTE: the number of globally mapped ranks may differ across nodes.
free_ports = []
trainer_endpoints = []
for ip in node_ips:
node_rank = node_ips.index(ip)
if os.environ.get('FLAGS_START_PORT') is not None:
start_port = int(os.environ.get('FLAGS_START_PORT'))
free_ports = [
x
for x in range(start_port, start_port + len(node_ranks_mapping[
node_rank]))
]
else:
free_ports = find_free_ports(len(node_ranks_mapping[node_rank]))
trainer_endpoints.append(["%s:%d" % (ip, port) for port in free_ports])
return get_mapped_cluster(node_ips, node_ip, trainer_endpoints, device_mode,
node_ranks_mapping)
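When FLAGS_START_PORT is set, every node is assigned consecutive ports starting at that port, one per mapped rank. A minimal sketch of the resulting endpoint layout for the sample JSON from the help text (this re-implements the loop above for illustration only; the values are sample data, not parsed from a real file):

import os

node_ips = ["127.0.0.1", "127.0.0.2"]
node_ranks_mapping = [[0, 1], [2, 3, 4]]
start_port = int(os.environ.get("FLAGS_START_PORT", 35789))

trainer_endpoints = [
    ["%s:%d" % (ip, start_port + i) for i in range(len(ranks))]
    for ip, ranks in zip(node_ips, node_ranks_mapping)
]
print(trainer_endpoints)
# [['127.0.0.1:35789', '127.0.0.1:35790'],
#  ['127.0.0.2:35789', '127.0.0.2:35790', '127.0.0.2:35791']]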
class ParameterServerLauncher(object):
def __init__(self, args, distribute_mode):
self.args = args
......
......@@ -58,6 +58,7 @@ list(APPEND MIXED_DIST_TEST_OPS test_fleet_run_random_port)
list(APPEND MIXED_DIST_TEST_OPS test_fleet_launch_async)
list(APPEND MIXED_DIST_TEST_OPS test_fleet_launch_cloud)
list(APPEND MIXED_DIST_TEST_OPS test_fleet_launch_ascend)
list(APPEND MIXED_DIST_TEST_OPS test_fleet_launch_rank_mapping)
list(APPEND MIXED_DIST_TEST_OPS test_ascend_group)
list(APPEND MIXED_DIST_TEST_OPS test_fleet_launch_nproc)
list(APPEND MIXED_DIST_TEST_OPS test_fleet_api_input)
......@@ -655,6 +656,7 @@ if(WITH_DISTRIBUTE)
bash_test_modules(test_fleet_launch_async START_BASH test_fleet_launch_async.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR})
bash_test_modules(test_fleet_launch_cloud START_BASH test_fleet_launch_cloud.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR})
bash_test_modules(test_fleet_launch_nproc START_BASH test_fleet_launch_nproc.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR})
bash_test_modules(test_fleet_launch_rank_mapping START_BASH test_fleet_launch_rank_mapping.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR})
if(WITH_ASCEND OR WITH_ASCEND_CL)
bash_test_modules(test_fleet_launch_ascend START_BASH test_fleet_launch_ascend.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR})
bash_test_modules(test_ascend_group START_BASH test_ascend_group.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR})
......
#!/bin/bash
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
set -e
# use single node
echo "begin test"
RANK_MAPPING_FILE_NAME="rank_mapping_file.json"
cat > ${RANK_MAPPING_FILE_NAME} <<EOF
{
"ip_ranks": [
{
"ip": "127.0.0.1",
"ranks": [0,1]
}
]
}
EOF
export FLAGS_START_PORT=35789
distributed_args="--rank_mapping_file ${RANK_MAPPING_FILE_NAME} --enable_auto_mapping true --log_dir=testlog"
CUDA_VISIBLE_DEVICES=0,1 python -m paddle.distributed.fleet.launch ${distributed_args} multi_process.py fleetlaunchcloud_rank_mapping
str1="selected_gpus:0 worker_endpoints:127.0.0.1:35789,127.0.0.1:35790 trainers_num:2 current_endpoint:127.0.0.1:35789 trainer_id:0"
str2="selected_gpus:1 worker_endpoints:127.0.0.1:35789,127.0.0.1:35790 trainers_num:2 current_endpoint:127.0.0.1:35790 trainer_id:1"
file_0="multi_process_fleetlaunchcloud_rank_mapping.check_0.log"
file_1="multi_process_fleetlaunchcloud_rank_mapping.check_1.log"
echo "paddlecloud params test"
if grep -q "$str1" "$file_0"; then
echo "find trainer 0"
else
echo "not find trainer 0"
exit -1
fi
if grep -q "$str2" "$file_1"; then
echo "find trainer 1"
else
echo "not find trainer 1"
exit -1
fi
# test async poll process
if [ -f $file_0 ]; then
rm $file_0
fi
if [ -f $file_1 ]; then
rm $file_1
fi