# __init__.py (2.7 KB) -- last committed by kuizhiqing
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
# 
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# 
#     http://www.apache.org/licenses/LICENSE-2.0
# 
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import signal
import os, sys

from .manager import ElasticManager
from .manager import ElasticStatus
from .manager import ELASTIC_EXIT_CODE
from .manager import ElasticLevel
from .collective import CollectiveLauncher

from paddle.distributed.fleet.launch_utils import DistributeMode


def enable_elastic(args, distribute_mode):
    """Return whether the elastic launch path should be used.

    Elastic mode needs three settings. Each one is taken from ``args`` or,
    when that attribute is falsy, from its environment-variable fallback:

    * ``args.elastic_server`` / ``PADDLE_ELASTIC_SERVER``
    * ``args.job_id``         / ``PADDLE_ELASTIC_JOB_ID``
    * ``args.np``             / ``PADDLE_ELASTIC_NP``

    The settings are checked in that order. Checking stops at the first
    missing one, so later attributes are not read once a check fails.

    Args:
        args: parsed launch arguments exposing ``elastic_server``,
            ``job_id`` and ``np`` attributes.
        distribute_mode: accepted for interface compatibility; it is not
            consulted by the current checks.

    Returns:
        True if all three settings are present (non-empty), otherwise False.
    """
    # (attribute on args, environment-variable fallback), in check order.
    required = (
        ('elastic_server', 'PADDLE_ELASTIC_SERVER'),
        ('job_id', 'PADDLE_ELASTIC_JOB_ID'),
        ('np', 'PADDLE_ELASTIC_NP'),
    )
    # getattr inside the generator keeps attribute access lazy, so `all`
    # short-circuits exactly like the original sequence of early returns.
    return all(getattr(args, attr) or os.getenv(env) for attr, env in required)


def _split_server_address(server):
    """Split an etcd address of the form ``'host:port'`` into ``(host, port)``.

    The split happens on the last ``':'``, so a bracketed IPv6 host such as
    ``'[::1]:2379'`` keeps its inner colons.

    Raises:
        ValueError: if ``server`` is empty/None, has no ``host:`` part, or the
            port is not an integer.
    """
    host, sep, port = (server or '').rpartition(':')
    if not sep or not host:
        raise ValueError(
            'invalid elastic server address {!r}, expected "host:port"'.format(
                server))
    try:
        return host, int(port)
    except ValueError:
        raise ValueError(
            'invalid port in elastic server address {!r}'.format(server)) from None


def launch_elastic(args, distribute_mode):
    """Run this node under elastic supervision until the job finishes.

    The etcd address is read from ``args.elastic_server``, or from the
    ``PADDLE_ELASTIC_SERVER`` environment variable when the argument is
    falsy, in ``host:port`` form. An ``ElasticManager`` is built on that
    client and its ``signal_handler`` is installed for SIGTERM, SIGABRT and
    SIGINT. The function then loops: wait for all nodes, run the pre-hook,
    launch with ``CollectiveLauncher``, and watch the result.

    Every path out of the loop ends in ``sys.exit``:

    * ``ElasticStatus.ERROR``       -> exit code 3
    * ``ElasticStatus.RESTART``     -> ``ELASTIC_EXIT_CODE``
    * ``ElasticStatus.COMPLETED`` / ``ElasticStatus.EXIT`` -> leave the loop,
      then exit with ``128 + elastic.sigint`` when ``elastic.sigint`` is
      positive (presumably the number of a received signal -- confirm in
      ``ElasticManager``), else 0
    * ``ElasticStatus.HOLD`` or any other status -> run another iteration

    Args:
        args: parsed launch arguments; ``elastic_server`` is read here and the
            whole object is passed to ``ElasticManager``.
        distribute_mode: accepted for interface compatibility; not used here.

    Raises:
        ValueError: if the server address is missing or not ``host:port``.
    """
    server = args.elastic_server or os.getenv('PADDLE_ELASTIC_SERVER')
    host, port = _split_server_address(server)

    # Imported here rather than at module top, so etcd3 is only needed when
    # the elastic path is actually taken.
    import etcd3
    etcd_client = etcd3.client(host=host, port=port)
    elastic = ElasticManager(args, etcd_client)

    for signum in (signal.SIGTERM, signal.SIGABRT, signal.SIGINT):
        signal.signal(signum, elastic.signal_handler)

    while True:
        # wait for all nodes to be ready to run
        elastic.wait()
        # execute the pre-hook action, e.g. run a shell command
        elastic.pre_hook()
        # run this node with the collective launcher
        elastic.run(CollectiveLauncher)

        # keep watching this node's health and get notified of other nodes'
        # failures
        status = elastic.watch()
        if status in (ElasticStatus.COMPLETED, ElasticStatus.EXIT):
            break
        if status == ElasticStatus.ERROR:
            sys.exit(3)
        if status == ElasticStatus.RESTART:
            sys.exit(ELASTIC_EXIT_CODE)
        # ElasticStatus.HOLD (or an unrecognized status): go round again.

    sigint = int(elastic.sigint)
    sys.exit(128 + sigint if sigint > 0 else 0)