未验证 提交 35166902 编写于 作者: K kuizhiqing 提交者: GitHub

launch suport ip port (#45052)

上级 27e3b06f
...@@ -51,24 +51,14 @@ class Context(object): ...@@ -51,24 +51,14 @@ class Context(object):
if self.args.legacy: if self.args.legacy:
return True return True
if self.args.master:
return False
if len(self.unknown_args) > 0: if len(self.unknown_args) > 0:
self.logger.warning("Compatible mode enable with args {}".format( self.logger.warning("Compatible mode enable with args {}".format(
self.unknown_args)) self.unknown_args))
return True return True
legacy_env_list = [
'FLAGS_START_PORT',
]
for env in legacy_env_list:
if env in self.envs:
self.logger.warning(
"ENV {} is deprecated, legacy launch enable".format(env))
return True
if self.args.master:
return False
return False return False
def get_envs(self): def get_envs(self):
......
...@@ -35,6 +35,8 @@ env_args_mapping = { ...@@ -35,6 +35,8 @@ env_args_mapping = {
'PADDLE_TRAINERS_ENDPOINTS': 'trainers', 'PADDLE_TRAINERS_ENDPOINTS': 'trainers',
'PADDLE_GLOO_PORT': 'gloo_port', 'PADDLE_GLOO_PORT': 'gloo_port',
'PADDLE_WITH_GLOO': 'with_gloo', 'PADDLE_WITH_GLOO': 'with_gloo',
'PADDLE_START_PORT': 'start_port',
'PADDLE_IPS': 'ips',
} }
...@@ -105,6 +107,16 @@ def parse_args(): ...@@ -105,6 +107,16 @@ def parse_args():
base_group.add_argument("--host", type=str, default=None, help="host ip") base_group.add_argument("--host", type=str, default=None, help="host ip")
base_group.add_argument("--ips",
type=str,
default=None,
help="nodes ips, e.g. 10.10.1.1,10.10.1.2")
base_group.add_argument("--start_port",
type=int,
default=6070,
help="fix port start with")
base_group.add_argument("training_script", base_group.add_argument("training_script",
type=str, type=str,
help="the full path of py script," help="the full path of py script,"
......
...@@ -34,6 +34,62 @@ class CollectiveController(Controller): ...@@ -34,6 +34,62 @@ class CollectiveController(Controller):
return False return False
def build_pod(self): def build_pod(self):
if self.ctx.args.master is None and self.ctx.args.start_port and self.ctx.args.ips:
self._build_pod_with_args()
else:
self._build_pod_with_master()
def _build_pod_with_args(self):
self.pod.replicas = self.pod_replicas()
start_port = int(self.ctx.args.start_port)
ips = self.ctx.args.ips.split(',')
job_endpoints = [
f"{h}:{p+start_port}" for h in ips for p in range(self.pod.replicas)
]
self.ctx.logger.debug("job endpoints: {}".format(job_endpoints))
rank_offset = ips.index(
self.ctx.node.ip) if self.ctx.node.ip in ips else 0
self.save_pod_log(job_endpoints)
selected_dev_key = self.ctx.node.device.get_selected_device_key()
selected_dev_list = self.ctx.node.device.get_selected_devices(
self.ctx.args.devices)
for i in range(self.pod.replicas):
e = {
"PADDLE_GLOBAL_SIZE": "{}".format(len(job_endpoints)),
"PADDLE_LOCAL_SIZE": "{}".format(self.pod.replicas),
"PADDLE_GLOBAL_RANK": "{}".format(i + rank_offset),
"PADDLE_LOCAL_RANK": "{}".format(i),
"PADDLE_NNODES": "{}".format(self.job.replicas),
## compatible env
"PADDLE_TRAINER_ENDPOINTS": ",".join(job_endpoints),
"PADDLE_CURRENT_ENDPOINT": job_endpoints[i + rank_offset],
"PADDLE_TRAINER_ID": "{}".format(i + rank_offset),
"PADDLE_TRAINERS_NUM": "{}".format(len(job_endpoints)),
"PADDLE_RANK_IN_NODE": str(i),
}
if len(selected_dev_list) > 0:
if self.ctx.node.device.dtype == DeviceType.CUSTOM_DEVICE:
e.update(self.ctx.node.device.get_custom_device_envs())
if self.pod.replicas == 1:
e.update({selected_dev_key: ",".join(selected_dev_list)})
else:
e.update({selected_dev_key: selected_dev_list[i]})
else:
e.update({'PADDLE_DISTRI_BACKEND': 'gloo'})
log_file = f"workerlog.{i}"
self.add_container(envs=e, log_file=log_file)
return True
def _build_pod_with_master(self):
self.pod.replicas = self.pod_replicas() self.pod.replicas = self.pod_replicas()
# rank will be reset when restart # rank will be reset when restart
......
...@@ -37,6 +37,11 @@ def process_args(ctx): ...@@ -37,6 +37,11 @@ def process_args(ctx):
f'Device not found {d} from {argdev} for setting {ctx.node.device.labels}' f'Device not found {d} from {argdev} for setting {ctx.node.device.labels}'
) )
if ctx.args.ips:
ips = ctx.args.ips.split(',')
if '127.0.0.1' in ips and len(ips) != 1:
raise "127.0.0.1 in ips is not allowed in multi-nodes."
def collective_compatible(ctx): def collective_compatible(ctx):
if 'PADDLE_TRAINER_ENDPOINTS' in ctx.envs: if 'PADDLE_TRAINER_ENDPOINTS' in ctx.envs:
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
# limitations under the License. # limitations under the License.
set -e set -e
export FLAGS_START_PORT=35789 export PADDLE_START_PORT=35789
export MLU_VISIBLE_DEVICES=0,1 export MLU_VISIBLE_DEVICES=0,1
...@@ -23,7 +23,7 @@ function test_nproc_0(){ ...@@ -23,7 +23,7 @@ function test_nproc_0(){
mlus=$1 mlus=$1
file_0="fleet_nproc_0.check_0.log" file_0="fleet_nproc_0.check_0.log"
rm -f ${file_0} rm -f ${file_0}
distributed_args="--log_dir=testlog --nproc_per_node=1" distributed_args="--log_dir=testlog --nproc_per_node=1 --ips=127.0.0.1"
python -m paddle.distributed.launch ${distributed_args} nproc_process_mlu.py fleet_nproc_0 python -m paddle.distributed.launch ${distributed_args} nproc_process_mlu.py fleet_nproc_0
str0="selected_mlus:${mlus} worker_endpoints:127.0.0.1:35789 trainers_num:1 current_endpoint:127.0.0.1:35789 trainer_id:0" str0="selected_mlus:${mlus} worker_endpoints:127.0.0.1:35789 trainers_num:1 current_endpoint:127.0.0.1:35789 trainer_id:0"
...@@ -44,7 +44,7 @@ function test_nproc_1(){ ...@@ -44,7 +44,7 @@ function test_nproc_1(){
file_1="fleet_nproc_1.check_1.log" file_1="fleet_nproc_1.check_1.log"
rm -f ${file_0} ${file_1} rm -f ${file_0} ${file_1}
distributed_args="--log_dir=testlog --nproc_per_node=2" distributed_args="--log_dir=testlog --nproc_per_node=2 --ips=127.0.0.1"
python -m paddle.distributed.launch ${distributed_args} nproc_process_mlu.py fleet_nproc_1 python -m paddle.distributed.launch ${distributed_args} nproc_process_mlu.py fleet_nproc_1
str0="selected_mlus:0 worker_endpoints:127.0.0.1:35789,127.0.0.1:35790 trainers_num:2 current_endpoint:127.0.0.1:35789 trainer_id:0" str0="selected_mlus:0 worker_endpoints:127.0.0.1:35789,127.0.0.1:35790 trainers_num:2 current_endpoint:127.0.0.1:35789 trainer_id:0"
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
# limitations under the License. # limitations under the License.
set -e set -e
export FLAGS_START_PORT=35789 export PADDLE_START_PORT=35789
#local_ip=`ip route get 1 | awk '{print $NF;exit}'` #local_ip=`ip route get 1 | awk '{print $NF;exit}'`
file_0="fleet_nproc_0.check_0.log" file_0="fleet_nproc_0.check_0.log"
...@@ -23,7 +23,7 @@ file_0="fleet_nproc_0.check_0.log" ...@@ -23,7 +23,7 @@ file_0="fleet_nproc_0.check_0.log"
function test_nproc_0(){ function test_nproc_0(){
gpus=$1 gpus=$1
rm -f ${file_0} rm -f ${file_0}
distributed_args="--log_dir=testlog --nproc_per_node=1" distributed_args="--log_dir=testlog --nproc_per_node=1 --ips=127.0.0.1"
# nproc_per_node=1, each with 2 gpus # nproc_per_node=1, each with 2 gpus
python -m paddle.distributed.launch ${distributed_args} nproc_process.py fleet_nproc_0 python -m paddle.distributed.launch ${distributed_args} nproc_process.py fleet_nproc_0
...@@ -62,7 +62,7 @@ function test_nproc_1_gpu(){ ...@@ -62,7 +62,7 @@ function test_nproc_1_gpu(){
file_1="fleet_nproc_1.check_1.log" file_1="fleet_nproc_1.check_1.log"
rm -f ${file_0} ${file_1} rm -f ${file_0} ${file_1}
distributed_args="--log_dir=testlog --nproc_per_node=2" distributed_args="--log_dir=testlog --nproc_per_node=2 --ips=127.0.0.1"
python -m paddle.distributed.launch ${distributed_args} nproc_process.py fleet_nproc_1 python -m paddle.distributed.launch ${distributed_args} nproc_process.py fleet_nproc_1
str0="selected_devices:0 worker_endpoints:127.0.0.1:35789,127.0.0.1:35790 trainers_num:2 current_endpoint:127.0.0.1:35789 trainer_id:0" str0="selected_devices:0 worker_endpoints:127.0.0.1:35789,127.0.0.1:35790 trainers_num:2 current_endpoint:127.0.0.1:35789 trainer_id:0"
...@@ -94,7 +94,7 @@ function test_nproc_1_cpu(){ ...@@ -94,7 +94,7 @@ function test_nproc_1_cpu(){
file_1="fleet_nproc_1.check_1.log" file_1="fleet_nproc_1.check_1.log"
rm -f ${file_0} ${file_1} rm -f ${file_0} ${file_1}
distributed_args="--log_dir=testlog --nproc_per_node=2" distributed_args="--log_dir=testlog --nproc_per_node=2 --ips=127.0.0.1"
python -m paddle.distributed.launch ${distributed_args} nproc_process.py fleet_nproc_1 python -m paddle.distributed.launch ${distributed_args} nproc_process.py fleet_nproc_1
str0="selected_devices: worker_endpoints:127.0.0.1:35789,127.0.0.1:35790 trainers_num:2 current_endpoint:127.0.0.1:35789 trainer_id:0" str0="selected_devices: worker_endpoints:127.0.0.1:35789,127.0.0.1:35790 trainers_num:2 current_endpoint:127.0.0.1:35789 trainer_id:0"
...@@ -127,7 +127,7 @@ function test_nproc_1_xpu(){ ...@@ -127,7 +127,7 @@ function test_nproc_1_xpu(){
file_1="fleet_nproc_1.check_1.log" file_1="fleet_nproc_1.check_1.log"
rm -f ${file_0} ${file_1} rm -f ${file_0} ${file_1}
distributed_args="--log_dir=testlog --nproc_per_node=2" distributed_args="--log_dir=testlog --nproc_per_node=2 --ips=127.0.0.1"
python -m paddle.distributed.launch ${distributed_args} nproc_process.py fleet_nproc_1 python -m paddle.distributed.launch ${distributed_args} nproc_process.py fleet_nproc_1
str0="selected_devices:0 worker_endpoints:127.0.0.1:35789,127.0.0.1:35790 trainers_num:2 current_endpoint:127.0.0.1:35789 trainer_id:0" str0="selected_devices:0 worker_endpoints:127.0.0.1:35789,127.0.0.1:35790 trainers_num:2 current_endpoint:127.0.0.1:35789 trainer_id:0"
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册