diff --git a/python/paddle/distributed/fleet/elastic/__init__.py b/python/paddle/distributed/fleet/elastic/__init__.py
index 127aba7d93717c6ad0ad055d451647bfe319d9b4..503d2966a80e7fd22b3a762d33b71f65d13eaa6c 100644
--- a/python/paddle/distributed/fleet/elastic/__init__.py
+++ b/python/paddle/distributed/fleet/elastic/__init__.py
@@ -18,14 +18,20 @@ import os, sys
 from .manager import ElasticManager
 from .manager import ElasticStatus
 from .manager import ELASTIC_EXIT_CODE
+from .manager import ElasticLevel
 from .collective import CollectiveLauncher
 
 from paddle.distributed.fleet.launch_utils import DistributeMode
 
 
 def enable_elastic(args, distribute_mode):
-    if distribute_mode != DistributeMode.COLLECTIVE:
-        return False
+    #elastic_level = os.getenv('PADDLE_ELASTIC_FAULT_TOLERANC_LEVEL')
+    #if not elastic_level and (elastic_level != ElasticLevel.FAULT_TOLERANCE and
+    #        elastic_level != ElasticLevel.ELASTIC):
+    #    return False
+
+    #if distribute_mode != DistributeMode.COLLECTIVE:
+    #    return False
 
     if not args.elastic_server and not os.getenv('PADDLE_ELASTIC_SERVER'):
         return False
diff --git a/python/paddle/distributed/fleet/elastic/collective.py b/python/paddle/distributed/fleet/elastic/collective.py
index d9c2735c4bd0191ac4c71e3a4301addd0192dc93..82055314b0dc8c7820f159fb641e17b37a50b42e 100644
--- a/python/paddle/distributed/fleet/elastic/collective.py
+++ b/python/paddle/distributed/fleet/elastic/collective.py
@@ -30,42 +30,10 @@ class CollectiveLauncher(LauncherInterface):
     def launch(self):
         logger.info("collective lauchner launch ...")
         args = self.args
-        # parse arguments, used for cloud-single-machine and local
-        (device_mode,
-         devices_per_proc) = launch_utils.get_device_proc_info(args)
-        trainers_num = cloud_utils.get_trainers_num()
-        logger.debug("parsed from args trainerss_num:{} mode:{} devices:{}".
-                     format(trainers_num, device_mode, devices_per_proc))
-
-        cluster = None
-        pod = None
-
-        start_port = 6170
-        if os.environ.get('FLAGS_START_PORT') is not None:
-            start_port = os.environ.get('FLAGS_START_PORT')
-        if cloud_utils.use_paddlecloud() and trainers_num != 1:
-            cluster, pod = cloud_utils.get_cloud_cluster(
-                args.ips, device_mode, devices_per_proc, start_port)
-            logger.debug("get cluster from cloud:{}".format(cluster))
-        elif device_mode == DeviceMode.ASCEND_NPU:
-            # for ascend
-            cluster, pod = ascend_utils.get_cloud_cluster(
-                rank_table_file=os.getenv("RANK_TABLE_FILE", None),
-                device_mode=device_mode,
-                start_port=start_port)
-        else:
-            # trainers_num = 1 or not use paddlecloud ips="a,b"
-            cluster, pod = paddle.distributed.fleet.launch.get_cluster_from_args(
-                args, device_mode, devices_per_proc)
-            logger.debug("get cluster from args:{}".format(cluster))
-
-        global_envs = copy.copy(os.environ.copy())
-        self.gloo_rendezvous_dir = tempfile.mkdtemp()
-        # add gloo env
-        global_envs["PADDLE_WITH_GLOO"] = str(
-            os.getenv("PADDLE_WITH_GLOO", "0"))
-        global_envs["PADDLE_GLOO_RENDEZVOUS"] = "3"
-        global_envs["PADDLE_GLOO_FS_PATH"] = self.gloo_rendezvous_dir
+        self.tmp_dir = tempfile.mkdtemp()
+        global_envs = paddle.distributed.fleet.launch.get_global_envs(
+            args, self.tmp_dir)
+        cluster, pod = paddle.distributed.fleet.launch.get_cluster_info(args)
 
         self.procs = start_local_trainers(
             cluster,
@@ -82,8 +50,8 @@ class CollectiveLauncher(LauncherInterface):
         logger.info("collective lauchner stop ...")
         if not self._terminate_procs():
             logger.error("kill process failed")
-        if os.path.exists(self.gloo_rendezvous_dir):
-            shutil.rmtree(self.gloo_rendezvous_dir)
+        if os.path.exists(self.tmp_dir):
+            shutil.rmtree(self.tmp_dir)
 
     def watch(self):
         logger.debug("collective lauchner watch ...")
diff --git a/python/paddle/distributed/fleet/elastic/manager.py b/python/paddle/distributed/fleet/elastic/manager.py
index 391c563166318092687bfe8c3b714c203241c540..1716e332c8286178e6381ba5ee7d4569ff271b8a 100644
--- a/python/paddle/distributed/fleet/elastic/manager.py
+++ b/python/paddle/distributed/fleet/elastic/manager.py
@@ -35,6 +35,7 @@ ch.setFormatter(formatter)
 logger.addHandler(ch)
 
 ELASTIC_EXIT_CODE = 101
+ELASTIC_AUTO_PARALLEL_EXIT_CODE = 102
 
 # wait for timeout, unit: seconds
 ELASTIC_TIMEOUT = 2 * 60
@@ -103,6 +104,9 @@ class LauncherInterface(object):
             if ret is None:
                 alive = True
             elif ret != 0:
+                if ret == ELASTIC_AUTO_PARALLEL_EXIT_CODE:
+                    logger.info("return from elastic auto parallel re-launch")
+                    return ret
                 logger.error("ABORT!!! ABORT!!! ABORT!!!")
                 logger.error(
                     "ERROR rank {} error with exit code {}, check log for detail.".
@@ -232,6 +236,7 @@ class ElasticManager(object):
                 six.ensure_str(i[0])
                 for i in self.etcd.get_prefix(self.node_prefix)
             ]
+            self.hosts = list(set(self.hosts)) if self.hosts else self.hosts
             logger.info(
                 f"host_call_back curr_host={self.curr_host}, hosts:{self.hosts}")
             self.need_sync = True
@@ -251,6 +256,7 @@ class ElasticManager(object):
                         six.ensure_str(i[0])
                         for i in self.etcd.get_prefix(self.node_prefix)
                     ]
+                    hosts = list(set(hosts)) if hosts else hosts
                     logger.info(
                         f"[lease_heartbeat] curr_host={self.curr_host}, hosts={hosts}"
                     )
@@ -335,6 +341,7 @@ class ElasticManager(object):
         if not self.args.elastic_pre_hook:
             logger.info("skip pre_hook")
             return
+        logger.info("execute pre_hook...")
         current_env = copy.copy(os.environ.copy())
         out, err = subprocess.Popen(
             self.args.elastic_pre_hook,
@@ -391,6 +398,7 @@ class ElasticManager(object):
                 six.ensure_str(i[0])
                 for i in self.etcd.get_prefix(self.node_prefix)
             ]
+            self.hosts = list(set(self.hosts)) if self.hosts else self.hosts
 
         if self.elastic_level == ElasticLevel.FAULT_TOLERANCE:
             if len(self.hosts) == self.np:
@@ -430,6 +438,9 @@ class ElasticManager(object):
 
     def _update_fault_tolrance(self):
         rank = int(os.getenv('PADDLE_TRAINER_ID', -1))
+        logger.debug(
+            f"self.curr_host={self.curr_host}, self.dist_endpoints={self.dist_endpoints}"
+        )
         if self.curr_host in self.dist_endpoints:
             os.environ['DISTRIBUTED_TRAINER_ENDPOINTS'] = self.dist_endpoints
             os.environ['PADDLE_TRAINERS'] = self.trainers
@@ -550,7 +561,6 @@ class ElasticManager(object):
                                                                    self.hosts))
             idx += 1
             time.sleep(2)
-
         return
 
     def run(self, launcher):
@@ -571,6 +581,11 @@ class ElasticManager(object):
 
             if ret is not None:  # self terminated
                 logger.info('job exit with code {}'.format(ret))
+                if ret == ELASTIC_AUTO_PARALLEL_EXIT_CODE:
+                    logger.info('job re-launch for auto parallel')
+                    self.launcher.stop()
+                    return ElasticStatus.HOLD
+
                 # process is completed if ret >= 0 or error else
                 completed = True if ret == 0 else False
                 self.exit(completed=completed)
diff --git a/python/paddle/distributed/fleet/launch.py b/python/paddle/distributed/fleet/launch.py
index 2595512789bb6b6d24ed9a69799f2f46345341df..0aae3331793ca7ebc4ecd16805b828fa542775f3 100644
--- a/python/paddle/distributed/fleet/launch.py
+++ b/python/paddle/distributed/fleet/launch.py
@@ -65,6 +65,7 @@ import os
 import time
 import six
 import copy
+import pathlib
 import argparse
 from argparse import ArgumentParser, REMAINDER
 import paddle
@@ -283,7 +284,7 @@ def cpuonly_check(args):
     return True
 
 
-def launch_collective(args):
+def get_cluster_info(args):
     # parse arguments, used for cloud-single-machine and local
     if args.backend == 'gloo': cpuonly_check(args)
     (device_mode, devices_per_proc) = launch_utils.get_device_proc_info(args)
@@ -316,14 +317,23 @@ def launch_collective(args):
         cluster, pod = get_cluster_from_args(args, device_mode,
                                              devices_per_proc)
         logger.debug("get cluster from args:{}".format(cluster))
+    return cluster, pod
 
+
+def get_global_envs(args, tmp_dir):
     global_envs = copy.copy(os.environ.copy())
-    gloo_rendezvous_dir = tempfile.mkdtemp()
     # add gloo env
     global_envs["PADDLE_WITH_GLOO"] = str(os.getenv("PADDLE_WITH_GLOO", "0"))
     global_envs["PADDLE_GLOO_RENDEZVOUS"] = "3"
-    global_envs["PADDLE_GLOO_FS_PATH"] = gloo_rendezvous_dir
+    global_envs["PADDLE_GLOO_FS_PATH"] = tmp_dir
     global_envs["PADDLE_DISTRI_BACKEND"] = args.backend
+    return global_envs
+
+
+def launch_collective(args):
+    tmp_dir = tempfile.mkdtemp()
+    cluster, pod = get_cluster_info(args)
+    global_envs = get_global_envs(args, tmp_dir)
 
     procs = start_local_trainers(
         cluster,
@@ -352,8 +362,8 @@ def launch_collective(args):
             terminate_local_procs(procs)
             exit(1)
 
-    if os.path.exists(gloo_rendezvous_dir):
-        shutil.rmtree(gloo_rendezvous_dir)
+    if os.path.exists(tmp_dir):
+        shutil.rmtree(tmp_dir)
 
 
 def launch_ps(args, distribute_mode):
diff --git a/python/paddle/fluid/tests/unittests/test_fleet_elastic_collective.py b/python/paddle/fluid/tests/unittests/test_fleet_elastic_collective.py
new file mode 100644
index 0000000000000000000000000000000000000000..2d2f019c5ed09df3b63a80cd47e6245823634dcc
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/test_fleet_elastic_collective.py
@@ -0,0 +1,111 @@
+# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function
+
+import os
+import time
+import json
+import unittest
+import argparse
+import tempfile
+import traceback
+from warnings import catch_warnings
+
+from paddle.distributed.fleet.elastic.collective import CollectiveLauncher
+from paddle.distributed.fleet.launch import launch_collective
+
+fake_python_code = """
+print("test")
+"""
+
+
+class TestCollectiveLauncher(unittest.TestCase):
+    def setUp(self):
+        file_dir = os.path.dirname(os.path.abspath(__file__))
+
+        self.code_path = os.path.join(file_dir, "fake_python_for_elastic.py")
+        with open(self.code_path, "w") as f:
+            f.write(fake_python_code)
+
+    def test_launch(self):
+        class Argument:
+            elastic_server = "127.0.0.1:2379"
+            job_id = "test_job_id_123"
+            np = "1"
+            gpus = "0"
+            nproc_per_node = 1
+            host = None
+            curr_host = None
+            ips = "127.0.0.1"
+            scale = None
+            force = None
+            backend = 'gloo'
+            enable_auto_mapping = False
+            run_mode = "cpuonly"
+            servers = None
+            rank_mapping_path = None
+            training_script = "fake_python_for_elastic.py"
+            training_script_args = ["--use_amp false"]
+            log_dir = None
+
+        args = Argument()
+
+        launch = CollectiveLauncher(args)
+
+        try:
+            args.backend = "gloo"
+            launch.launch()
+            launch.stop()
+        except Exception as e:
+            pass
+
+        try:
+            args.backend = "gloo"
+            launch_collective(args)
+        except Exception as e:
+            pass
+
+    def test_stop(self):
+        class Argument:
+            elastic_server = "127.0.0.1:2379"
+            job_id = "test_job_id_123"
+            np = "1"
+            gpus = "0"
+            nproc_per_node = 1
+            host = None
+            curr_host = None
+            ips = "127.0.0.1"
+            scale = None
+            force = None
+            backend = 'gloo'
+            enable_auto_mapping = False
+            run_mode = "cpuonly"
+            servers = None
+            rank_mapping_path = None
+            training_script = "fake_python_for_elastic.py"
+            training_script_args = ["--use_amp false"]
+            log_dir = None
+
+        args = Argument()
+        try:
+            launch = CollectiveLauncher(args)
+            launch.tmp_dir = tempfile.mkdtemp()
+            launch.stop()
+        except Exception as e:
+            pass
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/python/paddle/fluid/tests/unittests/test_fleet_elastic_manager.py b/python/paddle/fluid/tests/unittests/test_fleet_elastic_manager.py
index 149304e505c12372476fbb24d836d750e1506b35..6dc9f69d03f7ce8d7e5d70f63761f9b2e77eef2f 100644
--- a/python/paddle/fluid/tests/unittests/test_fleet_elastic_manager.py
+++ b/python/paddle/fluid/tests/unittests/test_fleet_elastic_manager.py
@@ -20,7 +20,9 @@ import unittest
 import argparse
 
 from paddle.distributed.fleet.elastic.manager import ElasticManager
+from paddle.distributed.fleet.elastic.manager import LauncherInterface
 from paddle.distributed.fleet.elastic.manager import ELASTIC_TIMEOUT
+from paddle.distributed.fleet.elastic.manager import ELASTIC_AUTO_PARALLEL_EXIT_CODE
 
 
 class MockLease():
@@ -347,6 +349,47 @@ class TestElasticManager(unittest.TestCase):
         args.elastic_pre_hook = "hostname"
         elastic.pre_hook()
 
+    def test_watch(self):
+        class Argument:
+            elastic_server = "127.0.0.1:2379"
+            job_id = "test_job_id_123"
+            np = "2"
+            gpus = "0"
+            nproc_per_node = 1
+            host = None
+            curr_host = None
+            ips = None
+            scale = None
+            force = None
+            backend = 'gloo'
+            elastic_pre_hook = None
+
+        class ElasticLauncher:
+            def watch(self):
+                return ELASTIC_AUTO_PARALLEL_EXIT_CODE
+
+            def stop(self):
+                pass
+
+        args = Argument()
+        elastic = ElasticManager(args, self.etcd_client)
+        elastic.stopped = False
+        elastic.launcher = ElasticLauncher()
+        elastic.watch()
+
+    def test_launcher_interface_check_procs(self):
+        class Proc:
+            def poll(self):
+                return ELASTIC_AUTO_PARALLEL_EXIT_CODE
+
+        class ProcList:
+            def __init__(self):
+                self.proc = Proc()
+
+        launch = LauncherInterface(None)
+        launch.procs = [ProcList()]
+        launch._check_procs()
+
 
 if __name__ == "__main__":
     unittest.main()