From 3516690287af487b481bd6b46ef8a841fb0758c6 Mon Sep 17 00:00:00 2001
From: kuizhiqing
Date: Thu, 11 Aug 2022 14:41:31 +0800
Subject: [PATCH] launch support ip port (#45052)

---
 .../distributed/launch/context/__init__.py    | 16 +-----
 .../distributed/launch/context/args_envs.py   | 12 ++++
 .../launch/controllers/collective.py          | 56 +++++++++++++++++++
 .../distributed/launch/plugins/__init__.py    |  5 ++
 .../unittests/mlu/test_launch_nproc_mlu.sh    |  6 +-
 .../unittests/test_fleet_launch_nproc.sh      | 10 ++--
 6 files changed, 84 insertions(+), 21 deletions(-)

diff --git a/python/paddle/distributed/launch/context/__init__.py b/python/paddle/distributed/launch/context/__init__.py
index 921f653b48a..b067973d374 100644
--- a/python/paddle/distributed/launch/context/__init__.py
+++ b/python/paddle/distributed/launch/context/__init__.py
@@ -51,24 +51,14 @@ class Context(object):
         if self.args.legacy:
             return True
 
+        if self.args.master:
+            return False
+
         if len(self.unknown_args) > 0:
             self.logger.warning("Compatible mode enable with args {}".format(
                 self.unknown_args))
             return True
 
-        legacy_env_list = [
-            'FLAGS_START_PORT',
-        ]
-
-        for env in legacy_env_list:
-            if env in self.envs:
-                self.logger.warning(
-                    "ENV {} is deprecated, legacy launch enable".format(env))
-                return True
-
-        if self.args.master:
-            return False
-
         return False
 
     def get_envs(self):
diff --git a/python/paddle/distributed/launch/context/args_envs.py b/python/paddle/distributed/launch/context/args_envs.py
index b44065c6700..104ad1b789f 100644
--- a/python/paddle/distributed/launch/context/args_envs.py
+++ b/python/paddle/distributed/launch/context/args_envs.py
@@ -35,6 +35,8 @@ env_args_mapping = {
     'PADDLE_TRAINERS_ENDPOINTS': 'trainers',
     'PADDLE_GLOO_PORT': 'gloo_port',
     'PADDLE_WITH_GLOO': 'with_gloo',
+    'PADDLE_START_PORT': 'start_port',
+    'PADDLE_IPS': 'ips',
 }
 
 
@@ -105,6 +107,16 @@ def parse_args():
 
     base_group.add_argument("--host", type=str, default=None, help="host ip")
 
+    base_group.add_argument("--ips",
+                            type=str,
+                            default=None,
+                            help="node ips, e.g. 10.10.1.1,10.10.1.2")
+
+    base_group.add_argument("--start_port",
+                            type=int,
+                            default=6070,
+                            help="fixed start port")
+
     base_group.add_argument("training_script",
                             type=str,
                             help="the full path of py script,"
diff --git a/python/paddle/distributed/launch/controllers/collective.py b/python/paddle/distributed/launch/controllers/collective.py
index 873cfe09ac8..e155d31b459 100644
--- a/python/paddle/distributed/launch/controllers/collective.py
+++ b/python/paddle/distributed/launch/controllers/collective.py
@@ -34,6 +34,62 @@ class CollectiveController(Controller):
         return False
 
     def build_pod(self):
+        if self.ctx.args.master is None and self.ctx.args.start_port and self.ctx.args.ips:
+            self._build_pod_with_args()
+        else:
+            self._build_pod_with_master()
+
+    def _build_pod_with_args(self):
+        self.pod.replicas = self.pod_replicas()
+
+        start_port = int(self.ctx.args.start_port)
+        ips = self.ctx.args.ips.split(',')
+
+        job_endpoints = [
+            f"{h}:{p+start_port}" for h in ips for p in range(self.pod.replicas)
+        ]
+
+        self.ctx.logger.debug("job endpoints: {}".format(job_endpoints))
+
+        rank_offset = self.pod.replicas * ips.index(
+            self.ctx.node.ip) if self.ctx.node.ip in ips else 0
+
+        self.save_pod_log(job_endpoints)
+
+        selected_dev_key = self.ctx.node.device.get_selected_device_key()
+        selected_dev_list = self.ctx.node.device.get_selected_devices(
+            self.ctx.args.devices)
+
+        for i in range(self.pod.replicas):
+            e = {
+                "PADDLE_GLOBAL_SIZE": "{}".format(len(job_endpoints)),
+                "PADDLE_LOCAL_SIZE": "{}".format(self.pod.replicas),
+                "PADDLE_GLOBAL_RANK": "{}".format(i + rank_offset),
+                "PADDLE_LOCAL_RANK": "{}".format(i),
+                "PADDLE_NNODES": "{}".format(self.job.replicas),
+                ## compatible env
+                "PADDLE_TRAINER_ENDPOINTS": ",".join(job_endpoints),
+                "PADDLE_CURRENT_ENDPOINT": job_endpoints[i + rank_offset],
+                "PADDLE_TRAINER_ID": "{}".format(i + rank_offset),
+                "PADDLE_TRAINERS_NUM": "{}".format(len(job_endpoints)),
+                "PADDLE_RANK_IN_NODE": str(i),
+            }
+            if len(selected_dev_list) > 0:
+                if self.ctx.node.device.dtype == DeviceType.CUSTOM_DEVICE:
+                    e.update(self.ctx.node.device.get_custom_device_envs())
+                if self.pod.replicas == 1:
+                    e.update({selected_dev_key: ",".join(selected_dev_list)})
+                else:
+                    e.update({selected_dev_key: selected_dev_list[i]})
+            else:
+                e.update({'PADDLE_DISTRI_BACKEND': 'gloo'})
+
+            log_file = f"workerlog.{i}"
+            self.add_container(envs=e, log_file=log_file)
+
+        return True
+
+    def _build_pod_with_master(self):
         self.pod.replicas = self.pod_replicas()
 
         # rank will be reset when restart
diff --git a/python/paddle/distributed/launch/plugins/__init__.py b/python/paddle/distributed/launch/plugins/__init__.py
index 946768db32c..c82fa55de43 100644
--- a/python/paddle/distributed/launch/plugins/__init__.py
+++ b/python/paddle/distributed/launch/plugins/__init__.py
@@ -37,6 +37,11 @@ def process_args(ctx):
                 f'Device not found {d} from {argdev} for setting {ctx.node.device.labels}'
             )
 
+    if ctx.args.ips:
+        ips = ctx.args.ips.split(',')
+        if '127.0.0.1' in ips and len(ips) != 1:
+            raise ValueError("127.0.0.1 in ips is not allowed in multi-nodes.")
+
 
 def collective_compatible(ctx):
     if 'PADDLE_TRAINER_ENDPOINTS' in ctx.envs:
diff --git a/python/paddle/fluid/tests/unittests/mlu/test_launch_nproc_mlu.sh b/python/paddle/fluid/tests/unittests/mlu/test_launch_nproc_mlu.sh
index 722590dc87f..8d8ad713ecc 100644
--- a/python/paddle/fluid/tests/unittests/mlu/test_launch_nproc_mlu.sh
+++ b/python/paddle/fluid/tests/unittests/mlu/test_launch_nproc_mlu.sh
@@ -15,7 +15,7 @@
 # limitations under the License.
 
 set -e
-export FLAGS_START_PORT=35789
+export PADDLE_START_PORT=35789
 
 export MLU_VISIBLE_DEVICES=0,1
 
@@ -23,7 +23,7 @@ function test_nproc_0(){
     mlus=$1
     file_0="fleet_nproc_0.check_0.log"
     rm -f ${file_0}
-    distributed_args="--log_dir=testlog --nproc_per_node=1"
+    distributed_args="--log_dir=testlog --nproc_per_node=1 --ips=127.0.0.1"
     python -m paddle.distributed.launch ${distributed_args} nproc_process_mlu.py fleet_nproc_0
 
     str0="selected_mlus:${mlus} worker_endpoints:127.0.0.1:35789 trainers_num:1 current_endpoint:127.0.0.1:35789 trainer_id:0"
@@ -44,7 +44,7 @@ function test_nproc_1(){
     file_1="fleet_nproc_1.check_1.log"
     rm -f ${file_0} ${file_1}
 
-    distributed_args="--log_dir=testlog --nproc_per_node=2"
+    distributed_args="--log_dir=testlog --nproc_per_node=2 --ips=127.0.0.1"
     python -m paddle.distributed.launch ${distributed_args} nproc_process_mlu.py fleet_nproc_1
 
     str0="selected_mlus:0 worker_endpoints:127.0.0.1:35789,127.0.0.1:35790 trainers_num:2 current_endpoint:127.0.0.1:35789 trainer_id:0"
diff --git a/python/paddle/fluid/tests/unittests/test_fleet_launch_nproc.sh b/python/paddle/fluid/tests/unittests/test_fleet_launch_nproc.sh
index 89f696dee47..63fce18683c 100644
--- a/python/paddle/fluid/tests/unittests/test_fleet_launch_nproc.sh
+++ b/python/paddle/fluid/tests/unittests/test_fleet_launch_nproc.sh
@@ -15,7 +15,7 @@
 # limitations under the License.
 
 set -e
-export FLAGS_START_PORT=35789
+export PADDLE_START_PORT=35789
 
 #local_ip=`ip route get 1 | awk '{print $NF;exit}'`
 file_0="fleet_nproc_0.check_0.log"
@@ -23,7 +23,7 @@
 function test_nproc_0(){
     gpus=$1
     rm -f ${file_0}
-    distributed_args="--log_dir=testlog --nproc_per_node=1"
+    distributed_args="--log_dir=testlog --nproc_per_node=1 --ips=127.0.0.1"
 
     # nproc_per_node=1, each with 2 gpus
     python -m paddle.distributed.launch ${distributed_args} nproc_process.py fleet_nproc_0
@@ -62,7 +62,7 @@ function test_nproc_1_gpu(){
     file_1="fleet_nproc_1.check_1.log"
     rm -f ${file_0} ${file_1}
 
-    distributed_args="--log_dir=testlog --nproc_per_node=2"
+    distributed_args="--log_dir=testlog --nproc_per_node=2 --ips=127.0.0.1"
     python -m paddle.distributed.launch ${distributed_args} nproc_process.py fleet_nproc_1
 
     str0="selected_devices:0 worker_endpoints:127.0.0.1:35789,127.0.0.1:35790 trainers_num:2 current_endpoint:127.0.0.1:35789 trainer_id:0"
@@ -94,7 +94,7 @@ function test_nproc_1_cpu(){
     file_1="fleet_nproc_1.check_1.log"
     rm -f ${file_0} ${file_1}
 
-    distributed_args="--log_dir=testlog --nproc_per_node=2"
+    distributed_args="--log_dir=testlog --nproc_per_node=2 --ips=127.0.0.1"
    python -m paddle.distributed.launch ${distributed_args} nproc_process.py fleet_nproc_1
 
     str0="selected_devices: worker_endpoints:127.0.0.1:35789,127.0.0.1:35790 trainers_num:2 current_endpoint:127.0.0.1:35789 trainer_id:0"
@@ -127,7 +127,7 @@ function test_nproc_1_xpu(){
     file_1="fleet_nproc_1.check_1.log"
     rm -f ${file_0} ${file_1}
 
-    distributed_args="--log_dir=testlog --nproc_per_node=2"
+    distributed_args="--log_dir=testlog --nproc_per_node=2 --ips=127.0.0.1"
     python -m paddle.distributed.launch ${distributed_args} nproc_process.py fleet_nproc_1
 
     str0="selected_devices:0 worker_endpoints:127.0.0.1:35789,127.0.0.1:35790 trainers_num:2 current_endpoint:127.0.0.1:35789 trainer_id:0"
-- 
GitLab
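
The standalone Python sketch below (not part of the patch) walks through the endpoint and rank arithmetic that _build_pod_with_args relies on when --ips and --start_port are given without --master: every node derives the same global endpoint list from the shared arguments and locates its own slice by its position in --ips, so no master-based rendezvous is needed. Helper names and the example values are illustrative assumptions, not launcher API.

# Standalone sketch of the --ips/--start_port endpoint and rank arithmetic.
# Function names and example values are illustrative only.


def build_job_endpoints(ips, start_port, nproc_per_node):
    # Every node computes the same flat list: node 0's ports first,
    # then node 1's, and so on.
    return [
        "{}:{}".format(ip, start_port + p)
        for ip in ips
        for p in range(nproc_per_node)
    ]


def node_rank_offset(ips, node_ip, nproc_per_node):
    # Offset of this node's first global rank; an unknown ip falls back to 0.
    return ips.index(node_ip) * nproc_per_node if node_ip in ips else 0


if __name__ == "__main__":
    ips = ["10.10.1.1", "10.10.1.2"]  # --ips=10.10.1.1,10.10.1.2
    start_port = 6070                 # --start_port (patch default)
    nproc = 2                         # --nproc_per_node=2

    endpoints = build_job_endpoints(ips, start_port, nproc)
    # ['10.10.1.1:6070', '10.10.1.1:6071', '10.10.1.2:6070', '10.10.1.2:6071']

    offset = node_rank_offset(ips, "10.10.1.2", nproc)  # second node -> 2
    for local_rank in range(nproc):
        print("local rank", local_rank,
              "global rank", local_rank + offset,
              "endpoint", endpoints[local_rank + offset])

Assuming a placeholder entry script train.py, a two-node job in this mode is started by running the same command on each machine, e.g. python -m paddle.distributed.launch --ips=10.10.1.1,10.10.1.2 --start_port=6070 --nproc_per_node=2 train.py; the single-node tests in this patch use the same flags with --ips=127.0.0.1.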