Unverified commit 1f76a2f7, authored by kuizhiqing, committed by GitHub

Elastic as module (#34572)

Parent 91be8769
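A minimal usage sketch (not part of this commit's diff): the elastic path of paddle.distributed.launch is enabled when the etcd server, job id, and node count are supplied, either as launcher arguments or through the PADDLE_ELASTIC_* environment variables read in the code below. The etcd endpoint, job id, and training script here are placeholders.

# assumed invocation; only the variable names come from this diff
export PADDLE_ELASTIC_SERVER=10.10.10.1:2379   # etcd endpoint used for coordination
export PADDLE_ELASTIC_JOB_ID=elastic-demo      # unique job name
export PADDLE_ELASTIC_NP=2                     # expected number of nodes
python -m paddle.distributed.launch train.py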
@@ -37,6 +37,9 @@ class Command(object):
return True
return False
def clean(self):
self.etcd.delete_prefix(self.prefix)
def close(self):
self.etcd.close()
@@ -53,13 +56,6 @@ if __name__ == '__main__':
args = parser.parse_args()
server = args.elastic_server or os.getenv('PADDLE_ELASTIC_SERVER')
# compatible with kubernetes service discovery
if not server and os.getenv(
'PADDLE_ELASTIC_ETCD_SERVICE_HOST') and os.getenv(
'PADDLE_ELASTIC_ETCD_SERVICE_PORT'):
server = '{}:{}'.format(
os.getenv('PADDLE_ELASTIC_ETCD_SERVICE_HOST'),
os.getenv('PADDLE_ELASTIC_ETCD_SERVICE_PORT'))
name = args.job_id or os.getenv('PADDLE_ELASTIC_JOB_ID')
np = args.np or int(os.getenv('PADDLE_ELASTIC_NP', 0))
@@ -69,6 +65,9 @@ if __name__ == '__main__':
if args.action == "scale":
cmd.scale_np(np)
if args.action == "clean":
cmd.clean()
print("action {} done".format(args.action))
cmd.close()
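For illustration, the command helper above could be driven as follows; the script name elastic_ctl.py and the flag spellings are assumptions, while the action names "scale" and "clean" and the PADDLE_ELASTIC_* fallbacks do appear in this diff.

export PADDLE_ELASTIC_SERVER=10.10.10.1:2379   # fallback when --elastic_server is omitted
export PADDLE_ELASTIC_JOB_ID=elastic-demo
# ask the running job to rescale to 4 nodes (hypothetical entry point)
python elastic_ctl.py --np 4 scale
# remove the job's keys from etcd once it has finished
python elastic_ctl.py clean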
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import signal
import os, sys
from .manager import ElasticManager
from .manager import ElasticStatus
from .manager import ELASTIC_EXIT_CODE
from .collective import CollectiveLauncher
from paddle.distributed.fleet.launch_utils import DistributeMode
def enable_elastic(args, distribute_mode):
if distribute_mode != DistributeMode.COLLECTIVE:
return False
if not args.elastic_server and not os.getenv('PADDLE_ELASTIC_SERVER'):
return False
if not args.job_id and not os.getenv('PADDLE_ELASTIC_JOB_ID'):
return False
if not args.np and not int(os.getenv('PADDLE_ELASTIC_NP', 0)):
return False
return True
def launch_elastic(args, distribute_mode):
elastic = ElasticManager(args)
signal.signal(signal.SIGTERM, elastic.signal_handler)
signal.signal(signal.SIGABRT, elastic.signal_handler)
signal.signal(signal.SIGINT, elastic.signal_handler)
while True:
# wait for all nodes ready to run
elastic.wait()
# run self with specified launcher
elastic.run(CollectiveLauncher)
# keep watching the health status of self and get notified of others' failures
ret = elastic.watch()
if ret == ElasticStatus.COMPLETED:
break
if ret == ElasticStatus.HOLD:
continue
if ret == ElasticStatus.EXIT:
break
if ret == ElasticStatus.ERROR:
sys.exit(3)
if ret == ElasticStatus.RESTART:
sys.exit(ELASTIC_EXIT_CODE)
if int(elastic.sigint) > 0:
sys.exit(128 + int(elastic.sigint))
else:
sys.exit(0)
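The exit codes above are meant for an outer supervisor: 0 for a completed or cleanly exited job, 3 for an error, ELASTIC_EXIT_CODE to request a relaunch, and 128 plus the signal number on interruption. A rough supervisor sketch, assuming ELASTIC_EXIT_CODE is 101 (its actual value lives in manager.py and is not shown in this diff):

ELASTIC_EXIT_CODE=101   # assumption: keep in sync with paddle.distributed.fleet.elastic.manager
while true; do
    python -m paddle.distributed.launch train.py
    code=$?
    if [ "$code" -eq "$ELASTIC_EXIT_CODE" ]; then
        echo "elastic restart requested, relaunching ..."
        continue
    fi
    exit "$code"   # COMPLETED/EXIT (0), ERROR (3), or 128+signal
done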
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from paddle.distributed.fleet import launch_utils
import paddle.distributed.fleet.cloud_utils as cloud_utils
import paddle.distributed.fleet.ascend_utils as ascend_utils
from paddle.distributed.fleet.launch_utils import *
from paddle.distributed.fleet.elastic.manager import LauncherInterface
class CollectiveLauncher(LauncherInterface):
def __init__(self, args):
self.args = args
self.procs = []
def launch(self):
logger.info("collective lauchner launch ...")
args = self.args
# parse arguments, used for cloud-single-machine and local
(device_mode,
devices_per_proc) = launch_utils.get_device_proc_info(args)
trainers_num = cloud_utils.get_trainers_num()
logger.debug("parsed from args trainerss_num:{} mode:{} devices:{}".
format(trainers_num, device_mode, devices_per_proc))
cluster = None
pod = None
start_port = 6170
if os.environ.get('FLAGS_START_PORT') is not None:
start_port = os.environ.get('FLAGS_START_PORT')
if cloud_utils.use_paddlecloud() and trainers_num != 1:
cluster, pod = cloud_utils.get_cloud_cluster(
args.ips, device_mode, devices_per_proc, start_port)
logger.debug("get cluster from cloud:{}".format(cluster))
elif device_mode == DeviceMode.ASCEND_NPU:
# for ascend
cluster, pod = ascend_utils.get_cloud_cluster(
rank_table_file=os.getenv("RANK_TABLE_FILE", None),
device_mode=device_mode,
start_port=start_port)
else:
# trainers_num = 1 or not use paddlecloud ips="a,b"
cluster, pod = paddle.distributed.fleet.launch.get_cluster_from_args(
args, device_mode, devices_per_proc)
logger.debug("get cluster from args:{}".format(cluster))
global_envs = copy.copy(os.environ.copy())
self.gloo_rendezvous_dir = tempfile.mkdtemp()
# add gloo env
global_envs["PADDLE_WITH_GLOO"] = str(
os.getenv("PADDLE_WITH_GLOO", "0"))
global_envs["PADDLE_GLOO_RENDEZVOUS"] = "3"
global_envs["PADDLE_GLOO_FS_PATH"] = self.gloo_rendezvous_dir
self.procs = start_local_trainers(
cluster,
pod,
training_script=args.training_script,
training_script_args=args.training_script_args,
log_dir=args.log_dir,
envs=global_envs)
for idx, proc in enumerate(self.procs):
logger.info("launch proc_id:{} idx:{}".format(proc.proc.pid, idx))
def stop(self):
logger.info("collective lauchner stop ...")
if not self._terminate_procs():
logger.error("kill process failed")
if os.path.exists(self.gloo_rendezvous_dir):
shutil.rmtree(self.gloo_rendezvous_dir)
def watch(self):
logger.debug("collective lauchner watch ...")
for p in self.procs:
if p.log_fn and p.local_rank == 0:
pull_worker_log(p)
ret = self._check_procs()
return ret
@@ -69,17 +69,13 @@ from argparse import ArgumentParser, REMAINDER
import paddle
import paddle.fluid as fluid
from paddle.distributed.fleet import launch_utils
import signal
# TODO(danleifeng): Don't import * from a module
from paddle.distributed.fleet.launch_utils import *
import paddle.distributed.fleet.cloud_utils as cloud_utils
import paddle.distributed.fleet.ascend_utils as ascend_utils
from paddle.distributed.fleet.elastic import ElasticManager
from paddle.distributed.fleet.elastic import LauncherInterface
from paddle.distributed.fleet.elastic import ElasticStatus
from paddle.distributed.fleet.elastic import ELASTIC_EXIT_CODE
from paddle.distributed.fleet.elastic import enable_elastic, launch_elastic
__all__ = []
@@ -235,76 +231,65 @@ def get_cluster_from_args(args, device_mode, devices_per_proc):
devices_per_proc)
class CollectiveLauncher(LauncherInterface):
def __init__(self, args):
self.args = args
self.procs = []
def launch_collective(args):
# parse arguments, used for cloud-single-machine and local
(device_mode, devices_per_proc) = launch_utils.get_device_proc_info(args)
trainers_num = cloud_utils.get_trainers_num()
logger.debug("parsed from args trainerss_num:{} mode:{} devices:{}".format(
trainers_num, device_mode, devices_per_proc))
cluster = None
pod = None
start_port = 6170
if os.environ.get('FLAGS_START_PORT') is not None:
start_port = os.environ.get('FLAGS_START_PORT')
if cloud_utils.use_paddlecloud() and trainers_num != 1:
cluster, pod = cloud_utils.get_cloud_cluster(
args.ips, device_mode, devices_per_proc, start_port)
logger.debug("get cluster from cloud:{}".format(cluster))
elif device_mode == DeviceMode.ASCEND_NPU:
# for ascend
cluster, pod = ascend_utils.get_cloud_cluster(
rank_table_file=os.getenv("RANK_TABLE_FILE", None),
device_mode=device_mode,
start_port=start_port)
else:
# trainers_num = 1 or not use paddlecloud ips="a,b"
cluster, pod = get_cluster_from_args(args, device_mode,
devices_per_proc)
logger.debug("get cluster from args:{}".format(cluster))
global_envs = copy.copy(os.environ.copy())
gloo_rendezvous_dir = tempfile.mkdtemp()
# add gloo env
global_envs["PADDLE_WITH_GLOO"] = str(os.getenv("PADDLE_WITH_GLOO", "0"))
global_envs["PADDLE_GLOO_RENDEZVOUS"] = "3"
global_envs["PADDLE_GLOO_FS_PATH"] = gloo_rendezvous_dir
procs = start_local_trainers(
cluster,
pod,
training_script=args.training_script,
training_script_args=args.training_script_args,
log_dir=args.log_dir,
envs=global_envs)
for idx, proc in enumerate(procs):
print("launch proc_id:{} idx:{}".format(proc.proc.pid, idx))
def launch(self):
logger.info("collective lauchner launch ...")
args = self.args
# parse arguments, used for cloud-single-machine and local
(device_mode,
devices_per_proc) = launch_utils.get_device_proc_info(args)
trainers_num = cloud_utils.get_trainers_num()
logger.debug("parsed from args trainerss_num:{} mode:{} devices:{}".
format(trainers_num, device_mode, devices_per_proc))
while True:
alive = watch_local_trainers(procs, cluster.trainers_nranks())
cluster = None
pod = None
if not alive:
logger.info("Local processes completed.")
logger.debug("POD info:{}".format(pod))
break
start_port = 6170
if os.environ.get('FLAGS_START_PORT') is not None:
start_port = os.environ.get('FLAGS_START_PORT')
if cloud_utils.use_paddlecloud() and trainers_num != 1:
cluster, pod = cloud_utils.get_cloud_cluster(
args.ips, device_mode, devices_per_proc, start_port)
logger.debug("get cluster from cloud:{}".format(cluster))
elif device_mode == DeviceMode.ASCEND_NPU:
# for ascend
cluster, pod = ascend_utils.get_cloud_cluster(
rank_table_file=os.getenv("RANK_TABLE_FILE", None),
device_mode=device_mode,
start_port=start_port)
else:
# trainers_num = 1 or not use paddlecloud ips="a,b"
cluster, pod = get_cluster_from_args(args, device_mode,
devices_per_proc)
logger.debug("get cluster from args:{}".format(cluster))
global_envs = copy.copy(os.environ.copy())
self.gloo_rendezvous_dir = tempfile.mkdtemp()
# add gloo env
global_envs["PADDLE_WITH_GLOO"] = str(
os.getenv("PADDLE_WITH_GLOO", "0"))
global_envs["PADDLE_GLOO_RENDEZVOUS"] = "3"
global_envs["PADDLE_GLOO_FS_PATH"] = self.gloo_rendezvous_dir
self.procs = start_local_trainers(
cluster,
pod,
training_script=args.training_script,
training_script_args=args.training_script_args,
log_dir=args.log_dir,
envs=global_envs)
for idx, proc in enumerate(self.procs):
logger.info("launch proc_id:{} idx:{}".format(proc.proc.pid, idx))
def stop(self):
logger.info("collective lauchner stop ...")
if not self._terminate_procs():
logger.error("kill process failed")
if os.path.exists(self.gloo_rendezvous_dir):
shutil.rmtree(self.gloo_rendezvous_dir)
def watch(self):
logger.debug("collective lauchner watch ...")
for p in self.procs:
if p.log_fn and p.local_rank == 0:
pull_worker_log(p)
ret = self._check_procs()
return ret
time.sleep(3)
if os.path.exists(gloo_rendezvous_dir):
shutil.rmtree(gloo_rendezvous_dir)
def launch_ps(args, distribute_mode):
@@ -399,42 +384,15 @@ def launch():
_print_arguments(args)
distribute_mode = which_distributed_mode(args)
# TODO(kuizhiqing) support ps later
if not distribute_mode == DistributeMode.COLLECTIVE:
launch_ps(args, distribute_mode)
return
elastic = ElasticManager(args)
signal.signal(signal.SIGTERM, elastic.signal_handler)
signal.signal(signal.SIGABRT, elastic.signal_handler)
signal.signal(signal.SIGINT, elastic.signal_handler)
while True:
# wait for all nodes ready to run
elastic.wait()
# run self with specified launcher
elastic.run(CollectiveLauncher)
# keep watching the health status of self and get notified of others' failures
ret = elastic.watch()
if ret == ElasticStatus.COMPLETED:
break
if ret == ElasticStatus.HOLD:
continue
if ret == ElasticStatus.EXIT:
break
if ret == ElasticStatus.ERROR:
sys.exit(3)
if ret == ElasticStatus.RESTART:
sys.exit(ELASTIC_EXIT_CODE)
if enable_elastic(args, distribute_mode):
launch_elastic(args, distribute_mode)
return
if int(elastic.sigint) > 0:
sys.exit(128 + int(elastic.sigint))
if distribute_mode == DistributeMode.COLLECTIVE:
launch_collective(args)
else:
sys.exit(0)
launch_ps(args, distribute_mode)
if __name__ == "__main__":
......
@@ -17,6 +17,15 @@ echo "begin test elastic"
unset GREP_OPTIONS
rm -rf log
pids=`ps -ef | grep "python -m paddle.distributed.launch elastic_demo.[py]" | awk '{print $2}'`
if [ -n "$pids" ]; then
echo $pids | xargs kill -9
fi
pids=`ps -ef | grep "/usr/bin/python -u elastic_demo.[py]" | awk '{print $2}'`
if [ -n "$pids" ]; then
echo $pids | xargs kill -9
fi
python -m pip install --no-cache-dir etcd3 -i https://mirror.baidu.com/pypi/simple
# common env
@@ -115,6 +124,8 @@ do
fi
done
> $lw0
# rerun node 1
export NVIDIA_VISIBLE_DEVICES=1
export CUDA_VISIBLE_DEVICES=1
@@ -144,5 +155,54 @@ done
check_env
> log_0.log
for i in {1..10}
do
## kill with -9
kill -9 $p0
sleep 1
if [ `ps -p $p0 | wc -l` == "2" ]; then
echo "force stop node 0 error"
exit -1
else
echo "force stop node 0 ok"
break
fi
done
> $lw0
# rerun node 0
export NVIDIA_VISIBLE_DEVICES=0
export CUDA_VISIBLE_DEVICES=0
export DISTRIBUTED_TRAINER_ENDPOINTS=10.10.10.10:8001,10.10.10.3:8001
export PADDLE_TRAINERS=10.10.10.10,10.10.10.3
export TRAINER_PORTS_NUM=1
export POD_IP=10.10.10.10
export PADDLE_TRAINER_ID=0
export PADDLE_TRAINERS_NUM=2
python -m paddle.distributed.launch elastic_demo.py &> log_0.log &
p0=$!
for i in {1..10}
do
if grep "INFO:ELASTIC:ready with hosts" log_1.log | grep -q '10.10.10.10'; then
echo "rerun node 0 ok"
break
else
sleep 1
fi
if [ $i -eq 10 ]; then
echo "rerun node 0 error"
exit -1
fi
done
check_env
echo "All check done"
sleep 3
kill $p0 $p1
@@ -149,6 +149,7 @@ packages=['paddle',
'paddle.incubate.operators',
'paddle.distributed.fleet',
'paddle.distributed.fleet.base',
'paddle.distributed.fleet.elastic',
'paddle.distributed.fleet.meta_optimizers',
'paddle.distributed.fleet.meta_optimizers.sharding',
'paddle.distributed.fleet.meta_optimizers.ascend',
......