diff --git a/python/paddle/distributed/elastic.py b/python/paddle/distributed/elastic.py
index 3e4fea5e6f34d7eb7c8eb4ea8fe40a4939a81f68..e6f21f6603d8dad6c802663e9ea6940d7eb0826e 100644
--- a/python/paddle/distributed/elastic.py
+++ b/python/paddle/distributed/elastic.py
@@ -37,6 +37,9 @@ class Command(object):
             return True
         return False
 
+    def clean(self):
+        self.etcd.delete_prefix(self.prefix)
+
     def close(self):
         self.etcd.close()
 
@@ -53,13 +56,6 @@ if __name__ == '__main__':
     args = parser.parse_args()
 
     server = args.elastic_server or os.getenv('PADDLE_ELASTIC_SERVER')
-    # compatible with kuberntes service discovery
-    if not server and os.getenv(
-            'PADDLE_ELASTIC_ETCD_SERVICE_HOST') and os.getenv(
-                'PADDLE_ELASTIC_ETCD_SERVICE_PORT'):
-        server = '{}:{}'.format(
-            os.getenv('PADDLE_ELASTIC_ETCD_SERVICE_HOST'),
-            os.getenv('PADDLE_ELASTIC_ETCD_SERVICE_PORT'))
     name = args.job_id or os.getenv('PADDLE_ELASTIC_JOB_ID')
 
     np = args.np or int(os.getenv('PADDLE_ELASTIC_NP', 0))
@@ -69,6 +65,9 @@ if __name__ == '__main__':
     if args.action == "scale":
         cmd.scale_np(np)
 
+    if args.action == "clean":
+        cmd.clean()
+
     print("action {} done".format(args.action))
 
     cmd.close()
diff --git a/python/paddle/distributed/fleet/elastic/__init__.py b/python/paddle/distributed/fleet/elastic/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..1ac81729d5430a5b5174e6b07bda1ca0d0f5a971
--- /dev/null
+++ b/python/paddle/distributed/fleet/elastic/__init__.py
@@ -0,0 +1,74 @@
+# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import signal
+import os, sys
+
+from .manager import ElasticManager
+from .manager import ElasticStatus
+from .manager import ELASTIC_EXIT_CODE
+from .collective import CollectiveLauncher
+
+from paddle.distributed.fleet.launch_utils import DistributeMode
+
+
+def enable_elastic(args, distribute_mode):
+    if distribute_mode != DistributeMode.COLLECTIVE:
+        return False
+
+    if not args.elastic_server and not os.getenv('PADDLE_ELASTIC_SERVER'):
+        return False
+
+    if not args.job_id and not os.getenv('PADDLE_ELASTIC_JOB_ID'):
+        return False
+
+    if not args.np and not int(os.getenv('PADDLE_ELASTIC_NP', 0)):
+        return False
+
+    return True
+
+
+def launch_elastic(args, distribute_mode):
+
+    elastic = ElasticManager(args)
+
+    signal.signal(signal.SIGTERM, elastic.signal_handler)
+    signal.signal(signal.SIGABRT, elastic.signal_handler)
+    signal.signal(signal.SIGINT, elastic.signal_handler)
+
+    while True:
+
+        # wait for all nodes ready to run
+        elastic.wait()
+
+        # run self with specified launcher
+        elastic.run(CollectiveLauncher)
+
+        # keep watching this node's health and get notified of other nodes' failures
+        ret = elastic.watch()
+        if ret == ElasticStatus.COMPLETED:
+            break
+        if ret == ElasticStatus.HOLD:
+            continue
+        if ret == ElasticStatus.EXIT:
+            break
+        if ret == ElasticStatus.ERROR:
+            sys.exit(3)
+        if ret == ElasticStatus.RESTART:
+            sys.exit(ELASTIC_EXIT_CODE)
+
+    if int(elastic.sigint) > 0:
+        sys.exit(128 + int(elastic.sigint))
+    else:
+        sys.exit(0)
diff --git a/python/paddle/distributed/fleet/elastic/collective.py b/python/paddle/distributed/fleet/elastic/collective.py
new file mode 100644
index 0000000000000000000000000000000000000000..94fe6a54b5809b2daee9a9335421912bfddaa108
--- /dev/null
+++ b/python/paddle/distributed/fleet/elastic/collective.py
@@ -0,0 +1,93 @@
+# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from paddle.distributed.fleet import launch_utils
+import paddle.distributed.fleet.cloud_utils as cloud_utils
+import paddle.distributed.fleet.ascend_utils as ascend_utils
+
+from paddle.distributed.fleet.launch_utils import *
+
+from paddle.distributed.fleet.elastic.manager import LauncherInterface
+
+
+class CollectiveLauncher(LauncherInterface):
+    def __init__(self, args):
+        self.args = args
+        self.procs = []
+
+    def launch(self):
+        logger.info("collective launcher launch ...")
+        args = self.args
+        # parse arguments, used for cloud-single-machine and local
+        (device_mode,
+         devices_per_proc) = launch_utils.get_device_proc_info(args)
+        trainers_num = cloud_utils.get_trainers_num()
+        logger.debug("parsed from args trainers_num:{} mode:{} devices:{}".
+                     format(trainers_num, device_mode, devices_per_proc))
+
+        cluster = None
+        pod = None
+
+        start_port = 6170
+        if os.environ.get('FLAGS_START_PORT') is not None:
+            start_port = os.environ.get('FLAGS_START_PORT')
+        if cloud_utils.use_paddlecloud() and trainers_num != 1:
+            cluster, pod = cloud_utils.get_cloud_cluster(
+                args.ips, device_mode, devices_per_proc, start_port)
+            logger.debug("get cluster from cloud:{}".format(cluster))
+        elif device_mode == DeviceMode.ASCEND_NPU:
+            # for ascend
+            cluster, pod = ascend_utils.get_cloud_cluster(
+                rank_table_file=os.getenv("RANK_TABLE_FILE", None),
+                device_mode=device_mode,
+                start_port=start_port)
+        else:
+            # trainers_num = 1 or not use paddlecloud ips="a,b"
+            cluster, pod = paddle.distributed.fleet.launch.get_cluster_from_args(
+                args, device_mode, devices_per_proc)
+            logger.debug("get cluster from args:{}".format(cluster))
+
+        global_envs = copy.copy(os.environ.copy())
+        self.gloo_rendezvous_dir = tempfile.mkdtemp()
+        # add gloo env
+        global_envs["PADDLE_WITH_GLOO"] = str(
+            os.getenv("PADDLE_WITH_GLOO", "0"))
+        global_envs["PADDLE_GLOO_RENDEZVOUS"] = "3"
+        global_envs["PADDLE_GLOO_FS_PATH"] = self.gloo_rendezvous_dir
+
+        self.procs = start_local_trainers(
+            cluster,
+            pod,
+            training_script=args.training_script,
+            training_script_args=args.training_script_args,
+            log_dir=args.log_dir,
+            envs=global_envs)
+
+        for idx, proc in enumerate(self.procs):
+            logger.info("launch proc_id:{} idx:{}".format(proc.proc.pid, idx))
+
+    def stop(self):
+        logger.info("collective launcher stop ...")
+        if not self._terminate_procs():
+            logger.error("kill process failed")
+        if os.path.exists(self.gloo_rendezvous_dir):
+            shutil.rmtree(self.gloo_rendezvous_dir)
+
+    def watch(self):
+        logger.debug("collective launcher watch ...")
+        for p in self.procs:
+            if p.log_fn and p.local_rank == 0:
+                pull_worker_log(p)
+        ret = self._check_procs()
+        return ret
diff --git a/python/paddle/distributed/fleet/elastic.py b/python/paddle/distributed/fleet/elastic/manager.py
similarity index 100%
rename from python/paddle/distributed/fleet/elastic.py
rename to python/paddle/distributed/fleet/elastic/manager.py
diff --git a/python/paddle/distributed/fleet/launch.py b/python/paddle/distributed/fleet/launch.py
index f407892e79acf60b540d160077a3cc18cc7148d2..7704a87e234f6b5e3e84e53062233ba737286a63 100644
--- a/python/paddle/distributed/fleet/launch.py
+++ b/python/paddle/distributed/fleet/launch.py
@@ -69,17 +69,13 @@ from argparse import ArgumentParser, REMAINDER
 import paddle
 import paddle.fluid as fluid
 from paddle.distributed.fleet import launch_utils
-import signal
 
 # TODO(danleifeng): Don't import * from a module
 from paddle.distributed.fleet.launch_utils import *
 import paddle.distributed.fleet.cloud_utils as cloud_utils
 import paddle.distributed.fleet.ascend_utils as ascend_utils
 
-from paddle.distributed.fleet.elastic import ElasticManager
-from paddle.distributed.fleet.elastic import LauncherInterface
-from paddle.distributed.fleet.elastic import ElasticStatus
-from paddle.distributed.fleet.elastic import ELASTIC_EXIT_CODE
+from paddle.distributed.fleet.elastic import enable_elastic, launch_elastic
 
 __all__ = []
 
@@ -235,76 +231,65 @@ def get_cluster_from_args(args, device_mode, devices_per_proc):
                        devices_per_proc)
 
 
-class CollectiveLauncher(LauncherInterface):
-    def __init__(self, args):
-        self.args = args
-        self.procs = []
+def launch_collective(args):
+    # parse arguments, used for cloud-single-machine and local
+    (device_mode, devices_per_proc) = launch_utils.get_device_proc_info(args)
+    trainers_num = cloud_utils.get_trainers_num()
+    logger.debug("parsed from args trainers_num:{} mode:{} devices:{}".format(
+        trainers_num, device_mode, devices_per_proc))
+
+    cluster = None
+    pod = None
+
+    start_port = 6170
+    if os.environ.get('FLAGS_START_PORT') is not None:
+        start_port = os.environ.get('FLAGS_START_PORT')
+    if cloud_utils.use_paddlecloud() and trainers_num != 1:
+        cluster, pod = cloud_utils.get_cloud_cluster(
+            args.ips, device_mode, devices_per_proc, start_port)
+        logger.debug("get cluster from cloud:{}".format(cluster))
+    elif device_mode == DeviceMode.ASCEND_NPU:
+        # for ascend
+        cluster, pod = ascend_utils.get_cloud_cluster(
+            rank_table_file=os.getenv("RANK_TABLE_FILE", None),
+            device_mode=device_mode,
+            start_port=start_port)
+    else:
+        # trainers_num = 1 or not use paddlecloud ips="a,b"
+        cluster, pod = get_cluster_from_args(args, device_mode,
+                                             devices_per_proc)
+        logger.debug("get cluster from args:{}".format(cluster))
+
+    global_envs = copy.copy(os.environ.copy())
+    gloo_rendezvous_dir = tempfile.mkdtemp()
+    # add gloo env
+    global_envs["PADDLE_WITH_GLOO"] = str(os.getenv("PADDLE_WITH_GLOO", "0"))
+    global_envs["PADDLE_GLOO_RENDEZVOUS"] = "3"
+    global_envs["PADDLE_GLOO_FS_PATH"] = gloo_rendezvous_dir
+
+    procs = start_local_trainers(
+        cluster,
+        pod,
+        training_script=args.training_script,
+        training_script_args=args.training_script_args,
+        log_dir=args.log_dir,
+        envs=global_envs)
+
+    for idx, proc in enumerate(procs):
+        print("launch proc_id:{} idx:{}".format(proc.proc.pid, idx))
 
-    def launch(self):
-        logger.info("collective lauchner launch ...")
-        args = self.args
-        # parse arguments, used for cloud-single-machine and local
-        (device_mode,
-         devices_per_proc) = launch_utils.get_device_proc_info(args)
-        trainers_num = cloud_utils.get_trainers_num()
-        logger.debug("parsed from args trainerss_num:{} mode:{} devices:{}".
-                     format(trainers_num, device_mode, devices_per_proc))
+    while True:
+        alive = watch_local_trainers(procs, cluster.trainers_nranks())
 
-        cluster = None
-        pod = None
+        if not alive:
+            logger.info("Local processes completed.")
+            logger.debug("POD info:{}".format(pod))
+            break
 
-        start_port = 6170
-        if os.environ.get('FLAGS_START_PORT') is not None:
-            start_port = os.environ.get('FLAGS_START_PORT')
-        if cloud_utils.use_paddlecloud() and trainers_num != 1:
-            cluster, pod = cloud_utils.get_cloud_cluster(
-                args.ips, device_mode, devices_per_proc, start_port)
-            logger.debug("get cluster from cloud:{}".format(cluster))
-        elif device_mode == DeviceMode.ASCEND_NPU:
-            # for ascend
-            cluster, pod = ascend_utils.get_cloud_cluster(
-                rank_table_file=os.getenv("RANK_TABLE_FILE", None),
-                device_mode=device_mode,
-                start_port=start_port)
-        else:
-            # trainers_num = 1 or not use paddlecloud ips="a,b"
-            cluster, pod = get_cluster_from_args(args, device_mode,
-                                                 devices_per_proc)
-            logger.debug("get cluster from args:{}".format(cluster))
-
-        global_envs = copy.copy(os.environ.copy())
-        self.gloo_rendezvous_dir = tempfile.mkdtemp()
-        # add gloo env
-        global_envs["PADDLE_WITH_GLOO"] = str(
-            os.getenv("PADDLE_WITH_GLOO", "0"))
-        global_envs["PADDLE_GLOO_RENDEZVOUS"] = "3"
-        global_envs["PADDLE_GLOO_FS_PATH"] = self.gloo_rendezvous_dir
-
-        self.procs = start_local_trainers(
-            cluster,
-            pod,
-            training_script=args.training_script,
-            training_script_args=args.training_script_args,
-            log_dir=args.log_dir,
-            envs=global_envs)
-
-        for idx, proc in enumerate(self.procs):
-            logger.info("launch proc_id:{} idx:{}".format(proc.proc.pid, idx))
-
-    def stop(self):
-        logger.info("collective lauchner stop ...")
-        if not self._terminate_procs():
-            logger.error("kill process failed")
-        if os.path.exists(self.gloo_rendezvous_dir):
-            shutil.rmtree(self.gloo_rendezvous_dir)
-
-    def watch(self):
-        logger.debug("collective lauchner watch ...")
-        for p in self.procs:
-            if p.log_fn and p.local_rank == 0:
-                pull_worker_log(p)
-        ret = self._check_procs()
-        return ret
+        time.sleep(3)
+
+    if os.path.exists(gloo_rendezvous_dir):
+        shutil.rmtree(gloo_rendezvous_dir)
 
 
 def launch_ps(args, distribute_mode):
@@ -399,42 +384,15 @@ def launch():
     _print_arguments(args)
 
     distribute_mode = which_distributed_mode(args)
 
-    # TODO(kuizhiqing) support ps later
-    if not distribute_mode == DistributeMode.COLLECTIVE:
-        launch_ps(args, distribute_mode)
-        return
-
-    elastic = ElasticManager(args)
-    signal.signal(signal.SIGTERM, elastic.signal_handler)
-    signal.signal(signal.SIGABRT, elastic.signal_handler)
-    signal.signal(signal.SIGINT, elastic.signal_handler)
-
-    while True:
-
-        # wait for all nodes ready to run
-        elastic.wait()
-
-        # run self with specified launcher
-        elastic.run(CollectiveLauncher)
-
-        # keep wathing the health status of self and being notified for other's failure
-        ret = elastic.watch()
-        if ret == ElasticStatus.COMPLETED:
-            break
-        if ret == ElasticStatus.HOLD:
-            continue
-        if ret == ElasticStatus.EXIT:
-            break
-        if ret == ElasticStatus.ERROR:
-            sys.exit(3)
-        if ret == ElasticStatus.RESTART:
-            sys.exit(ELASTIC_EXIT_CODE)
+    if enable_elastic(args, distribute_mode):
+        launch_elastic(args, distribute_mode)
+        return
 
-    if int(elastic.sigint) > 0:
-        sys.exit(128 + int(elastic.sigint))
+    if distribute_mode == DistributeMode.COLLECTIVE:
+        launch_collective(args)
     else:
-        sys.exit(0)
+        launch_ps(args, distribute_mode)
 
 
 if __name__ == "__main__":
diff --git a/python/paddle/fluid/tests/unittests/test_fleet_launch_elastic.sh b/python/paddle/fluid/tests/unittests/test_fleet_launch_elastic.sh
index 105ed1356ede3aa593e64d4b8be1e59dbe953ff8..8b618195f55ea089c9801bb9bdce5c033e884b30 100644
--- a/python/paddle/fluid/tests/unittests/test_fleet_launch_elastic.sh
+++ b/python/paddle/fluid/tests/unittests/test_fleet_launch_elastic.sh
@@ -17,6 +17,15 @@ echo "begin test elastic"
 unset GREP_OPTIONS
 rm -rf log
 
+pids=`ps -ef | grep "python -m paddle.distributed.launch elastic_demo.[py]" | awk '{print $2}'`
+if [ -n "$pids" ]; then
+    echo $pids | xargs kill -9
+fi
+pids=`ps -ef | grep "/usr/bin/python -u elastic_demo.[py]" | awk '{print $2}'`
+if [ -n "$pids" ]; then
+    echo $pids | xargs kill -9
+fi
+
 python -m pip install --no-cache-dir etcd3 -i https://mirror.baidu.com/pypi/simple
 
 # common env
@@ -115,6 +124,8 @@ do
     fi
 done
 
+> $lw0
+
 # rerun node 1
 export NVIDIA_VISIBLE_DEVICES=1
 export CUDA_VISIBLE_DEVICES=1
@@ -144,5 +155,54 @@ done
 
 check_env
 
+> log_0.log
+
+for i in {1..10}
+do
+    ## kill with -9
+    kill -9 $p0
+    sleep 1
+    if [ `ps -p $p0 | wc -l` == "2" ]; then
+        echo "force stop node 0 error"
+        exit -1
+    else
+        echo "force stop node 0 ok"
+        break
+    fi
+done
+
+> $lw0
+
+# rerun node 0
+export NVIDIA_VISIBLE_DEVICES=0
+export CUDA_VISIBLE_DEVICES=0
+export DISTRIBUTED_TRAINER_ENDPOINTS=10.10.10.10:8001,10.10.10.3:8001
+export PADDLE_TRAINERS=10.10.10.10,10.10.10.3
+export TRAINER_PORTS_NUM=1
+export POD_IP=10.10.10.10
+export PADDLE_TRAINER_ID=0
+export PADDLE_TRAINERS_NUM=2
+
+python -m paddle.distributed.launch elastic_demo.py &> log_0.log &
+p0=$!
+
+for i in {1..10}
+do
+    if grep "INFO:ELASTIC:ready with hosts" log_1.log | grep -q '10.10.10.10'; then
+        echo "rerun node 0 ok"
+        break
+    else
+        sleep 1
+    fi
+    if [ $i -eq 10 ]; then
+        echo "rerun node 0 error"
+        exit -1
+    fi
+done
+
+check_env
+
+echo "All check done"
+
 sleep 3
 kill $p0 $p1
diff --git a/python/setup.py.in b/python/setup.py.in
index 0db6c0c27d743d6d663e97f5d42b78b7036a6b8b..64b5dd4cd21063c7db08a51b65552d0d587f5953 100644
--- a/python/setup.py.in
+++ b/python/setup.py.in
@@ -149,6 +149,7 @@ packages=['paddle',
           'paddle.incubate.operators',
           'paddle.distributed.fleet',
           'paddle.distributed.fleet.base',
+          'paddle.distributed.fleet.elastic',
           'paddle.distributed.fleet.meta_optimizers',
           'paddle.distributed.fleet.meta_optimizers.sharding',
           'paddle.distributed.fleet.meta_optimizers.ascend',
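
For context, the new `clean` action in `python/paddle/distributed/elastic.py` simply removes every etcd key under the job's prefix: `Command.clean()` wraps `etcd.delete_prefix(self.prefix)`, and `close()` closes the client. The sketch below shows the equivalent calls made directly with the `etcd3` client that the test script installs; it is an illustration only, and the endpoint and key prefix are placeholder assumptions, since the real prefix is assembled inside `Command` from the server address and job id passed to the tool and is not part of this diff.

    # Minimal sketch of what `Command.clean()` amounts to, using the etcd3
    # client directly. The endpoint and prefix are assumptions for
    # illustration; the real values are derived from the elastic server
    # address and job id given to paddle.distributed.elastic.
    import etcd3

    client = etcd3.client(host="127.0.0.1", port=2379)  # assumed etcd endpoint
    prefix = "/paddle/elastic/my_job"                    # hypothetical job prefix
    client.delete_prefix(prefix)                         # same call clean() issues
    client.close()                                       # same call close() issues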