diff --git a/python/paddle/distributed/launch/context/__init__.py b/python/paddle/distributed/launch/context/__init__.py
index 921f653b48a6da088070ef121bad5342f0664eeb..b067973d3749d726bc11929a8f4453bbca70c9fa 100644
--- a/python/paddle/distributed/launch/context/__init__.py
+++ b/python/paddle/distributed/launch/context/__init__.py
@@ -51,24 +51,14 @@ class Context(object):
         if self.args.legacy:
             return True
 
+        if self.args.master:
+            return False
+
         if len(self.unknown_args) > 0:
             self.logger.warning("Compatible mode enable with args {}".format(
                 self.unknown_args))
             return True
 
-        legacy_env_list = [
-            'FLAGS_START_PORT',
-        ]
-
-        for env in legacy_env_list:
-            if env in self.envs:
-                self.logger.warning(
-                    "ENV {} is deprecated, legacy launch enable".format(env))
-                return True
-
-        if self.args.master:
-            return False
-
         return False
 
     def get_envs(self):
diff --git a/python/paddle/distributed/launch/context/args_envs.py b/python/paddle/distributed/launch/context/args_envs.py
index b44065c670005cb1a9fdbc038345d58cb6f8863d..104ad1b789f54b2959b34acfe5bef5be91f52944 100644
--- a/python/paddle/distributed/launch/context/args_envs.py
+++ b/python/paddle/distributed/launch/context/args_envs.py
@@ -35,6 +35,8 @@ env_args_mapping = {
     'PADDLE_TRAINERS_ENDPOINTS': 'trainers',
     'PADDLE_GLOO_PORT': 'gloo_port',
     'PADDLE_WITH_GLOO': 'with_gloo',
+    'PADDLE_START_PORT': 'start_port',
+    'PADDLE_IPS': 'ips',
 }
 
 
@@ -105,6 +107,16 @@ def parse_args():
 
     base_group.add_argument("--host", type=str, default=None, help="host ip")
 
+    base_group.add_argument("--ips",
+                            type=str,
+                            default=None,
+                            help="ips of all nodes, e.g. 10.10.1.1,10.10.1.2")
+
+    base_group.add_argument("--start_port",
+                            type=int,
+                            default=6070,
+                            help="the start port used to assign endpoints in order")
+
     base_group.add_argument("training_script",
                             type=str,
                             help="the full path of py script,"
diff --git a/python/paddle/distributed/launch/controllers/collective.py b/python/paddle/distributed/launch/controllers/collective.py
index 873cfe09ac8b89ed486f2104607ad82675ebbe7b..e155d31b459ff51a610a56d572c64d63726dadcd 100644
--- a/python/paddle/distributed/launch/controllers/collective.py
+++ b/python/paddle/distributed/launch/controllers/collective.py
@@ -34,6 +34,62 @@ class CollectiveController(Controller):
         return False
 
     def build_pod(self):
+        if self.ctx.args.master is None and self.ctx.args.start_port and self.ctx.args.ips:
+            return self._build_pod_with_args()
+        else:
+            return self._build_pod_with_master()
+
+    def _build_pod_with_args(self):
+        self.pod.replicas = self.pod_replicas()
+
+        start_port = int(self.ctx.args.start_port)
+        ips = self.ctx.args.ips.split(',')
+
+        job_endpoints = [
+            f"{h}:{p+start_port}" for h in ips for p in range(self.pod.replicas)
+        ]
+
+        self.ctx.logger.debug("job endpoints: {}".format(job_endpoints))
+
+        rank_offset = (ips.index(self.ctx.node.ip) * self.pod.replicas
+                       if self.ctx.node.ip in ips else 0)
+
+        self.save_pod_log(job_endpoints)
+
+        selected_dev_key = self.ctx.node.device.get_selected_device_key()
+        selected_dev_list = self.ctx.node.device.get_selected_devices(
+            self.ctx.args.devices)
+
+        for i in range(self.pod.replicas):
+            e = {
+                "PADDLE_GLOBAL_SIZE": "{}".format(len(job_endpoints)),
+                "PADDLE_LOCAL_SIZE": "{}".format(self.pod.replicas),
+                "PADDLE_GLOBAL_RANK": "{}".format(i + rank_offset),
+                "PADDLE_LOCAL_RANK": "{}".format(i),
+                "PADDLE_NNODES": "{}".format(self.job.replicas),
+                ## compatible env
+                "PADDLE_TRAINER_ENDPOINTS": ",".join(job_endpoints),
+                "PADDLE_CURRENT_ENDPOINT": job_endpoints[i + rank_offset],
+                "PADDLE_TRAINER_ID": "{}".format(i + rank_offset),
+                "PADDLE_TRAINERS_NUM": "{}".format(len(job_endpoints)),
+                "PADDLE_RANK_IN_NODE": str(i),
+            }
+            if len(selected_dev_list) > 0:
+                if self.ctx.node.device.dtype == DeviceType.CUSTOM_DEVICE:
+                    e.update(self.ctx.node.device.get_custom_device_envs())
+                if self.pod.replicas == 1:
+                    e.update({selected_dev_key: ",".join(selected_dev_list)})
+                else:
+                    e.update({selected_dev_key: selected_dev_list[i]})
+            else:
+                e.update({'PADDLE_DISTRI_BACKEND': 'gloo'})
+
+            log_file = f"workerlog.{i}"
+            self.add_container(envs=e, log_file=log_file)
+
+        return True
+
+    def _build_pod_with_master(self):
         self.pod.replicas = self.pod_replicas()
 
         # rank will be reset when restart
diff --git a/python/paddle/distributed/launch/plugins/__init__.py b/python/paddle/distributed/launch/plugins/__init__.py
index 946768db32ca92f5d39bfc4627dc7d17307031c4..c82fa55de430a7849ae5408a10758af7f579ad52 100644
--- a/python/paddle/distributed/launch/plugins/__init__.py
+++ b/python/paddle/distributed/launch/plugins/__init__.py
@@ -37,6 +37,11 @@ def process_args(ctx):
             f'Device not found {d} from {argdev} for setting {ctx.node.device.labels}'
         )
 
+    if ctx.args.ips:
+        ips = ctx.args.ips.split(',')
+        if '127.0.0.1' in ips and len(ips) != 1:
+            raise ValueError("127.0.0.1 is not allowed in --ips for multi-node jobs.")
+
 
 def collective_compatible(ctx):
     if 'PADDLE_TRAINER_ENDPOINTS' in ctx.envs:
diff --git a/python/paddle/fluid/tests/unittests/mlu/test_launch_nproc_mlu.sh b/python/paddle/fluid/tests/unittests/mlu/test_launch_nproc_mlu.sh
index 722590dc87f09f67823e3eb9d95b69c9a0d29c6c..8d8ad713eccfbfec94cbf0f23de3254270060546 100644
--- a/python/paddle/fluid/tests/unittests/mlu/test_launch_nproc_mlu.sh
+++ b/python/paddle/fluid/tests/unittests/mlu/test_launch_nproc_mlu.sh
@@ -15,7 +15,7 @@
 # limitations under the License.
 
 set -e
-export FLAGS_START_PORT=35789
+export PADDLE_START_PORT=35789
 
 export MLU_VISIBLE_DEVICES=0,1
 
@@ -23,7 +23,7 @@ function test_nproc_0(){
     mlus=$1
    file_0="fleet_nproc_0.check_0.log"
     rm -f ${file_0}
-    distributed_args="--log_dir=testlog --nproc_per_node=1"
+    distributed_args="--log_dir=testlog --nproc_per_node=1 --ips=127.0.0.1"
     python -m paddle.distributed.launch ${distributed_args} nproc_process_mlu.py fleet_nproc_0
 
     str0="selected_mlus:${mlus} worker_endpoints:127.0.0.1:35789 trainers_num:1 current_endpoint:127.0.0.1:35789 trainer_id:0"
@@ -44,7 +44,7 @@ function test_nproc_1(){
     file_1="fleet_nproc_1.check_1.log"
     rm -f ${file_0} ${file_1}
 
-    distributed_args="--log_dir=testlog --nproc_per_node=2"
+    distributed_args="--log_dir=testlog --nproc_per_node=2 --ips=127.0.0.1"
     python -m paddle.distributed.launch ${distributed_args} nproc_process_mlu.py fleet_nproc_1
 
     str0="selected_mlus:0 worker_endpoints:127.0.0.1:35789,127.0.0.1:35790 trainers_num:2 current_endpoint:127.0.0.1:35789 trainer_id:0"
diff --git a/python/paddle/fluid/tests/unittests/test_fleet_launch_nproc.sh b/python/paddle/fluid/tests/unittests/test_fleet_launch_nproc.sh
index 89f696dee471a0889f447d5c633f50e622e8c52c..63fce18683c04341e01d792d0eab0d44824d952a 100644
--- a/python/paddle/fluid/tests/unittests/test_fleet_launch_nproc.sh
+++ b/python/paddle/fluid/tests/unittests/test_fleet_launch_nproc.sh
@@ -15,7 +15,7 @@
 # limitations under the License.
 
 set -e
-export FLAGS_START_PORT=35789
+export PADDLE_START_PORT=35789
 
 #local_ip=`ip route get 1 | awk '{print $NF;exit}'`
 file_0="fleet_nproc_0.check_0.log"
@@ -23,7 +23,7 @@ file_0="fleet_nproc_0.check_0.log"
 function test_nproc_0(){
     gpus=$1
     rm -f ${file_0}
-    distributed_args="--log_dir=testlog --nproc_per_node=1"
+    distributed_args="--log_dir=testlog --nproc_per_node=1 --ips=127.0.0.1"
 
     # nproc_per_node=1, each with 2 gpus
     python -m paddle.distributed.launch ${distributed_args} nproc_process.py fleet_nproc_0
@@ -62,7 +62,7 @@ function test_nproc_1_gpu(){
     file_1="fleet_nproc_1.check_1.log"
     rm -f ${file_0} ${file_1}
 
-    distributed_args="--log_dir=testlog --nproc_per_node=2"
+    distributed_args="--log_dir=testlog --nproc_per_node=2 --ips=127.0.0.1"
     python -m paddle.distributed.launch ${distributed_args} nproc_process.py fleet_nproc_1
 
     str0="selected_devices:0 worker_endpoints:127.0.0.1:35789,127.0.0.1:35790 trainers_num:2 current_endpoint:127.0.0.1:35789 trainer_id:0"
@@ -94,7 +94,7 @@ function test_nproc_1_cpu(){
     file_1="fleet_nproc_1.check_1.log"
     rm -f ${file_0} ${file_1}
 
-    distributed_args="--log_dir=testlog --nproc_per_node=2"
+    distributed_args="--log_dir=testlog --nproc_per_node=2 --ips=127.0.0.1"
     python -m paddle.distributed.launch ${distributed_args} nproc_process.py fleet_nproc_1
 
     str0="selected_devices: worker_endpoints:127.0.0.1:35789,127.0.0.1:35790 trainers_num:2 current_endpoint:127.0.0.1:35789 trainer_id:0"
@@ -127,7 +127,7 @@ function test_nproc_1_xpu(){
     file_1="fleet_nproc_1.check_1.log"
     rm -f ${file_0} ${file_1}
 
-    distributed_args="--log_dir=testlog --nproc_per_node=2"
+    distributed_args="--log_dir=testlog --nproc_per_node=2 --ips=127.0.0.1"
     python -m paddle.distributed.launch ${distributed_args} nproc_process.py fleet_nproc_1
 
     str0="selected_devices:0 worker_endpoints:127.0.0.1:35789,127.0.0.1:35790 trainers_num:2 current_endpoint:127.0.0.1:35789 trainer_id:0"
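Usage note: a minimal sketch of the argument-only launch path added above, taken when --master is unset and --ips is given. The entry script train.py and the node ips are placeholders; the flags, the 6070 default for --start_port, and the PADDLE_START_PORT/PADDLE_IPS env aliases all come from this diff.

    # Single node, 2 workers: endpoints become 127.0.0.1:6070 and 127.0.0.1:6071.
    python -m paddle.distributed.launch --nproc_per_node=2 --ips=127.0.0.1 train.py

    # Two nodes: run the same command on each node; the rank offset is derived
    # from the position of the local ip in --ips.
    python -m paddle.distributed.launch --nproc_per_node=2 \
        --start_port=6070 --ips=10.10.1.1,10.10.1.2 train.py

    # Equivalent via the environment mapping added in args_envs.py.
    PADDLE_IPS=10.10.1.1,10.10.1.2 PADDLE_START_PORT=6070 \
        python -m paddle.distributed.launch --nproc_per_node=2 train.py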