未验证 提交 5440d2f9 编写于 作者: X xiayanming 提交者: GitHub

[Auto Parallel] elastic support auto parallel re-launch (#37523)

* [Auto Parallel] elastic support auto parallel re-launch

* [Auto Parallel] elastic support auto parallel re-launch

* fix ci issue

* fix ci issue

* fix rank mapping unittest

* fix rank mapping unittest

* fix ci issue

* fix ci issue

* fix ci issue

* fix ci issue

* fix ci issue

* fix ci issue

* fix ci issue

* fix ci issue

* fix ci issue

* fix ci issue

* fix ci issue

* fix ci issue

* fix ci issue
上级 3d2ec707
...@@ -18,14 +18,20 @@ import os, sys ...@@ -18,14 +18,20 @@ import os, sys
from .manager import ElasticManager from .manager import ElasticManager
from .manager import ElasticStatus from .manager import ElasticStatus
from .manager import ELASTIC_EXIT_CODE from .manager import ELASTIC_EXIT_CODE
from .manager import ElasticLevel
from .collective import CollectiveLauncher from .collective import CollectiveLauncher
from paddle.distributed.fleet.launch_utils import DistributeMode from paddle.distributed.fleet.launch_utils import DistributeMode
def enable_elastic(args, distribute_mode): def enable_elastic(args, distribute_mode):
if distribute_mode != DistributeMode.COLLECTIVE: #elastic_level = os.getenv('PADDLE_ELASTIC_FAULT_TOLERANC_LEVEL')
return False #if not elastic_level and (elastic_level != ElasticLevel.FAULT_TOLERANCE and
# elastic_level != ElasticLevel.ELASTIC):
# return False
#if distribute_mode != DistributeMode.COLLECTIVE:
# return False
if not args.elastic_server and not os.getenv('PADDLE_ELASTIC_SERVER'): if not args.elastic_server and not os.getenv('PADDLE_ELASTIC_SERVER'):
return False return False
......
...@@ -30,42 +30,10 @@ class CollectiveLauncher(LauncherInterface): ...@@ -30,42 +30,10 @@ class CollectiveLauncher(LauncherInterface):
def launch(self): def launch(self):
logger.info("collective lauchner launch ...") logger.info("collective lauchner launch ...")
args = self.args args = self.args
# parse arguments, used for cloud-single-machine and local self.tmp_dir = tempfile.mkdtemp()
(device_mode, global_envs = paddle.distributed.fleet.launch.get_global_envs(
devices_per_proc) = launch_utils.get_device_proc_info(args) args, self.tmp_dir)
trainers_num = cloud_utils.get_trainers_num() cluster, pod = paddle.distributed.fleet.launch.get_cluster_info(args)
logger.debug("parsed from args trainerss_num:{} mode:{} devices:{}".
format(trainers_num, device_mode, devices_per_proc))
cluster = None
pod = None
start_port = 6170
if os.environ.get('FLAGS_START_PORT') is not None:
start_port = os.environ.get('FLAGS_START_PORT')
if cloud_utils.use_paddlecloud() and trainers_num != 1:
cluster, pod = cloud_utils.get_cloud_cluster(
args.ips, device_mode, devices_per_proc, start_port)
logger.debug("get cluster from cloud:{}".format(cluster))
elif device_mode == DeviceMode.ASCEND_NPU:
# for ascend
cluster, pod = ascend_utils.get_cloud_cluster(
rank_table_file=os.getenv("RANK_TABLE_FILE", None),
device_mode=device_mode,
start_port=start_port)
else:
# trainers_num = 1 or not use paddlecloud ips="a,b"
cluster, pod = paddle.distributed.fleet.launch.get_cluster_from_args(
args, device_mode, devices_per_proc)
logger.debug("get cluster from args:{}".format(cluster))
global_envs = copy.copy(os.environ.copy())
self.gloo_rendezvous_dir = tempfile.mkdtemp()
# add gloo env
global_envs["PADDLE_WITH_GLOO"] = str(
os.getenv("PADDLE_WITH_GLOO", "0"))
global_envs["PADDLE_GLOO_RENDEZVOUS"] = "3"
global_envs["PADDLE_GLOO_FS_PATH"] = self.gloo_rendezvous_dir
self.procs = start_local_trainers( self.procs = start_local_trainers(
cluster, cluster,
...@@ -82,8 +50,8 @@ class CollectiveLauncher(LauncherInterface): ...@@ -82,8 +50,8 @@ class CollectiveLauncher(LauncherInterface):
logger.info("collective lauchner stop ...") logger.info("collective lauchner stop ...")
if not self._terminate_procs(): if not self._terminate_procs():
logger.error("kill process failed") logger.error("kill process failed")
if os.path.exists(self.gloo_rendezvous_dir): if os.path.exists(self.tmp_dir):
shutil.rmtree(self.gloo_rendezvous_dir) shutil.rmtree(self.tmp_dir)
def watch(self): def watch(self):
logger.debug("collective lauchner watch ...") logger.debug("collective lauchner watch ...")
......
...@@ -35,6 +35,7 @@ ch.setFormatter(formatter) ...@@ -35,6 +35,7 @@ ch.setFormatter(formatter)
logger.addHandler(ch) logger.addHandler(ch)
ELASTIC_EXIT_CODE = 101 ELASTIC_EXIT_CODE = 101
ELASTIC_AUTO_PARALLEL_EXIT_CODE = 102
# wait for timeout, unit: seconds # wait for timeout, unit: seconds
ELASTIC_TIMEOUT = 2 * 60 ELASTIC_TIMEOUT = 2 * 60
...@@ -103,6 +104,9 @@ class LauncherInterface(object): ...@@ -103,6 +104,9 @@ class LauncherInterface(object):
if ret is None: if ret is None:
alive = True alive = True
elif ret != 0: elif ret != 0:
if ret == ELASTIC_AUTO_PARALLEL_EXIT_CODE:
logger.info("return form elastic auto parallel re-launch")
return ret
logger.error("ABORT!!! ABORT!!! ABORT!!!") logger.error("ABORT!!! ABORT!!! ABORT!!!")
logger.error( logger.error(
"ERROR rank {} error with exit code {}, check log for detail.". "ERROR rank {} error with exit code {}, check log for detail.".
...@@ -232,6 +236,7 @@ class ElasticManager(object): ...@@ -232,6 +236,7 @@ class ElasticManager(object):
six.ensure_str(i[0]) six.ensure_str(i[0])
for i in self.etcd.get_prefix(self.node_prefix) for i in self.etcd.get_prefix(self.node_prefix)
] ]
self.hosts = list(set(self.hosts)) if self.hosts else self.hosts
logger.info( logger.info(
f"host_call_back curr_host={self.curr_host}, hosts:{self.hosts}") f"host_call_back curr_host={self.curr_host}, hosts:{self.hosts}")
self.need_sync = True self.need_sync = True
...@@ -251,6 +256,7 @@ class ElasticManager(object): ...@@ -251,6 +256,7 @@ class ElasticManager(object):
six.ensure_str(i[0]) six.ensure_str(i[0])
for i in self.etcd.get_prefix(self.node_prefix) for i in self.etcd.get_prefix(self.node_prefix)
] ]
hosts = list(set(hosts)) if hosts else hosts
logger.info( logger.info(
f"[lease_heartbeat] curr_host={self.curr_host}, hosts={hosts}" f"[lease_heartbeat] curr_host={self.curr_host}, hosts={hosts}"
) )
...@@ -335,6 +341,7 @@ class ElasticManager(object): ...@@ -335,6 +341,7 @@ class ElasticManager(object):
if not self.args.elastic_pre_hook: if not self.args.elastic_pre_hook:
logger.info("skip pre_hook") logger.info("skip pre_hook")
return return
logger.info("execute pre_hook...")
current_env = copy.copy(os.environ.copy()) current_env = copy.copy(os.environ.copy())
out, err = subprocess.Popen( out, err = subprocess.Popen(
self.args.elastic_pre_hook, self.args.elastic_pre_hook,
...@@ -391,6 +398,7 @@ class ElasticManager(object): ...@@ -391,6 +398,7 @@ class ElasticManager(object):
six.ensure_str(i[0]) six.ensure_str(i[0])
for i in self.etcd.get_prefix(self.node_prefix) for i in self.etcd.get_prefix(self.node_prefix)
] ]
self.hosts = list(set(self.hosts)) if self.hosts else self.hosts
if self.elastic_level == ElasticLevel.FAULT_TOLERANCE: if self.elastic_level == ElasticLevel.FAULT_TOLERANCE:
if len(self.hosts) == self.np: if len(self.hosts) == self.np:
...@@ -430,6 +438,9 @@ class ElasticManager(object): ...@@ -430,6 +438,9 @@ class ElasticManager(object):
def _update_fault_tolrance(self): def _update_fault_tolrance(self):
rank = int(os.getenv('PADDLE_TRAINER_ID', -1)) rank = int(os.getenv('PADDLE_TRAINER_ID', -1))
logger.debug(
f"self.curr_host={self.curr_host}, self.dist_endpoints={self.dist_endpoints}"
)
if self.curr_host in self.dist_endpoints: if self.curr_host in self.dist_endpoints:
os.environ['DISTRIBUTED_TRAINER_ENDPOINTS'] = self.dist_endpoints os.environ['DISTRIBUTED_TRAINER_ENDPOINTS'] = self.dist_endpoints
os.environ['PADDLE_TRAINERS'] = self.trainers os.environ['PADDLE_TRAINERS'] = self.trainers
...@@ -550,7 +561,6 @@ class ElasticManager(object): ...@@ -550,7 +561,6 @@ class ElasticManager(object):
self.hosts)) self.hosts))
idx += 1 idx += 1
time.sleep(2) time.sleep(2)
return return
def run(self, launcher): def run(self, launcher):
...@@ -571,6 +581,11 @@ class ElasticManager(object): ...@@ -571,6 +581,11 @@ class ElasticManager(object):
if ret is not None: # self terminated if ret is not None: # self terminated
logger.info('job exit with code {}'.format(ret)) logger.info('job exit with code {}'.format(ret))
if ret == ELASTIC_AUTO_PARALLEL_EXIT_CODE:
logger.info('job re-launch for auto parallel')
self.launcher.stop()
return ElasticStatus.HOLD
# process is completed if ret >= 0 or error else # process is completed if ret >= 0 or error else
completed = True if ret == 0 else False completed = True if ret == 0 else False
self.exit(completed=completed) self.exit(completed=completed)
......
...@@ -65,6 +65,7 @@ import os ...@@ -65,6 +65,7 @@ import os
import time import time
import six import six
import copy import copy
import pathlib
import argparse import argparse
from argparse import ArgumentParser, REMAINDER from argparse import ArgumentParser, REMAINDER
import paddle import paddle
...@@ -283,7 +284,7 @@ def cpuonly_check(args): ...@@ -283,7 +284,7 @@ def cpuonly_check(args):
return True return True
def launch_collective(args): def get_cluster_info(args):
# parse arguments, used for cloud-single-machine and local # parse arguments, used for cloud-single-machine and local
if args.backend == 'gloo': cpuonly_check(args) if args.backend == 'gloo': cpuonly_check(args)
(device_mode, devices_per_proc) = launch_utils.get_device_proc_info(args) (device_mode, devices_per_proc) = launch_utils.get_device_proc_info(args)
...@@ -316,14 +317,23 @@ def launch_collective(args): ...@@ -316,14 +317,23 @@ def launch_collective(args):
cluster, pod = get_cluster_from_args(args, device_mode, cluster, pod = get_cluster_from_args(args, device_mode,
devices_per_proc) devices_per_proc)
logger.debug("get cluster from args:{}".format(cluster)) logger.debug("get cluster from args:{}".format(cluster))
return cluster, pod
def get_global_envs(args, tmp_dir):
global_envs = copy.copy(os.environ.copy()) global_envs = copy.copy(os.environ.copy())
gloo_rendezvous_dir = tempfile.mkdtemp()
# add gloo env # add gloo env
global_envs["PADDLE_WITH_GLOO"] = str(os.getenv("PADDLE_WITH_GLOO", "0")) global_envs["PADDLE_WITH_GLOO"] = str(os.getenv("PADDLE_WITH_GLOO", "0"))
global_envs["PADDLE_GLOO_RENDEZVOUS"] = "3" global_envs["PADDLE_GLOO_RENDEZVOUS"] = "3"
global_envs["PADDLE_GLOO_FS_PATH"] = gloo_rendezvous_dir global_envs["PADDLE_GLOO_FS_PATH"] = tmp_dir
global_envs["PADDLE_DISTRI_BACKEND"] = args.backend global_envs["PADDLE_DISTRI_BACKEND"] = args.backend
return global_envs
def launch_collective(args):
tmp_dir = tempfile.mkdtemp()
cluster, pod = get_cluster_info(args)
global_envs = get_global_envs(args, tmp_dir)
procs = start_local_trainers( procs = start_local_trainers(
cluster, cluster,
...@@ -352,8 +362,8 @@ def launch_collective(args): ...@@ -352,8 +362,8 @@ def launch_collective(args):
terminate_local_procs(procs) terminate_local_procs(procs)
exit(1) exit(1)
if os.path.exists(gloo_rendezvous_dir): if os.path.exists(tmp_dir):
shutil.rmtree(gloo_rendezvous_dir) shutil.rmtree(tmp_dir)
def launch_ps(args, distribute_mode): def launch_ps(args, distribute_mode):
......
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import os
import time
import json
import unittest
import argparse
import tempfile
import traceback
from warnings import catch_warnings
from paddle.distributed.fleet.elastic.collective import CollectiveLauncher
from paddle.distributed.fleet.launch import launch_collective
fake_python_code = """
print("test")
"""
class TestCollectiveLauncher(unittest.TestCase):
def setUp(self):
file_dir = os.path.dirname(os.path.abspath(__file__))
self.code_path = os.path.join(file_dir, "fake_python_for_elastic.py")
with open(self.code_path, "w") as f:
f.write(fake_python_code)
def test_launch(self):
class Argument:
elastic_server = "127.0.0.1:2379"
job_id = "test_job_id_123"
np = "1"
gpus = "0"
nproc_per_node = 1
host = None
curr_host = None
ips = "127.0.0.1"
scale = None
force = None
backend = 'gloo'
enable_auto_mapping = False
run_mode = "cpuonly"
servers = None
rank_mapping_path = None
training_script = "fake_python_for_elastic.py"
training_script_args = ["--use_amp false"]
log_dir = None
args = Argument()
launch = CollectiveLauncher(args)
try:
args.backend = "gloo"
launch.launch()
launch.stop()
except Exception as e:
pass
try:
args.backend = "gloo"
launch_collective(args)
except Exception as e:
pass
def test_stop(self):
class Argument:
elastic_server = "127.0.0.1:2379"
job_id = "test_job_id_123"
np = "1"
gpus = "0"
nproc_per_node = 1
host = None
curr_host = None
ips = "127.0.0.1"
scale = None
force = None
backend = 'gloo'
enable_auto_mapping = False
run_mode = "cpuonly"
servers = None
rank_mapping_path = None
training_script = "fake_python_for_elastic.py"
training_script_args = ["--use_amp false"]
log_dir = None
args = Argument()
try:
launch = CollectiveLauncher(args)
launch.tmp_dir = tempfile.mkdtemp()
launch.stop()
except Exception as e:
pass
if __name__ == "__main__":
unittest.main()
...@@ -20,7 +20,9 @@ import unittest ...@@ -20,7 +20,9 @@ import unittest
import argparse import argparse
from paddle.distributed.fleet.elastic.manager import ElasticManager from paddle.distributed.fleet.elastic.manager import ElasticManager
from paddle.distributed.fleet.elastic.manager import LauncherInterface
from paddle.distributed.fleet.elastic.manager import ELASTIC_TIMEOUT from paddle.distributed.fleet.elastic.manager import ELASTIC_TIMEOUT
from paddle.distributed.fleet.elastic.manager import ELASTIC_AUTO_PARALLEL_EXIT_CODE
class MockLease(): class MockLease():
...@@ -347,6 +349,47 @@ class TestElasticManager(unittest.TestCase): ...@@ -347,6 +349,47 @@ class TestElasticManager(unittest.TestCase):
args.elastic_pre_hook = "hostname" args.elastic_pre_hook = "hostname"
elastic.pre_hook() elastic.pre_hook()
def test_watch(self):
class Argument:
elastic_server = "127.0.0.1:2379"
job_id = "test_job_id_123"
np = "2"
gpus = "0"
nproc_per_node = 1
host = None
curr_host = None
ips = None
scale = None
force = None
backend = 'gloo'
elastic_pre_hook = None
class ElasticLauncher:
def watch(self):
return ELASTIC_AUTO_PARALLEL_EXIT_CODE
def stop(self):
pass
args = Argument()
elastic = ElasticManager(args, self.etcd_client)
elastic.stopped = False
elastic.launcher = ElasticLauncher()
elastic.watch()
def test_launcher_interface_check_procs(self):
class Proc:
def poll(self):
return ELASTIC_AUTO_PARALLEL_EXIT_CODE
class ProcList:
def __init__(self):
self.proc = Proc()
launch = LauncherInterface(None)
launch.procs = [ProcList()]
launch._check_procs()
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册