# elastic.py (committed by kuizhiqing)
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import os


class Command(object):
    """Thin wrapper around an etcd client for administering one elastic job.

    Every key the class touches lives under ``/paddle/<name>``; the node
    count is stored at ``/paddle/<name>/np``.
    """

    def __init__(self, server, name):
        """Connect to etcd and derive the key paths for job ``name``.

        Args:
            server: etcd endpoint in ``host:port`` form.
            name: job id; appended to ``/paddle/`` to form the key prefix.

        Raises:
            ValueError: if ``server`` has no ``host:port`` shape or the port
                is not an integer.
        """
        # Imported here rather than at module level, so importing this
        # module does not itself require etcd3.
        import etcd3

        # Split on the last colon only, so a host that itself contains
        # colons does not break the unpacking.
        host, sep, port = server.rpartition(':')
        if not sep or not host:
            raise ValueError(
                "server must be in 'host:port' format, got {!r}".format(server)
            )
        self.etcd = etcd3.client(host=host, port=int(port))

        self.prefix = "/paddle/" + name
        self.node_prefix = self.prefix + '/nodes'
        self.np_path = self.prefix + '/np'

    def set_np(self, np):
        """Store ``np`` (rendered as text, then encoded to bytes) at the np key."""
        self.etcd.put(self.np_path, '{}'.format(np).encode('latin-1'))

    def scale_np(self, np):
        """Overwrite the stored node count, but only if one already exists.

        Returns:
            True if the np key held a value and was updated, False if the
            key was absent (nothing is written in that case).
        """
        # etcd.get yields a tuple whose first item is the value (None when
        # the key is missing). The check and the write are separate calls.
        current = self.etcd.get(self.np_path)[0]
        if current is None:
            return False
        self.set_np(np)
        return True

    def clean(self):
        """Delete every key under this job's prefix."""
        self.etcd.delete_prefix(self.prefix)

    def close(self):
        """Close the underlying etcd client."""
        self.etcd.close()


def resolve_np(arg_np, env_np):
    """Pick the node count from the CLI value, falling back to the env value.

    Both inputs may be ``None``, ``'MIN'`` or ``'MIN:MAX'``; only the ``MIN``
    part is used. A missing or zero CLI value defers to the env value, and
    0 is returned when neither supplies a number.

    Raises:
        ValueError: if the ``MIN`` part of the chosen value is not an integer.
    """

    def leading_int(spec):
        # Treat None / empty string as "not given" instead of crashing.
        if not spec:
            return 0
        return int(str(spec).split(":")[0])

    return leading_int(arg_np) or leading_int(env_np)


def main(argv=None):
    """Parse the command line and run the requested action against etcd.

    Args:
        argv: argument list to parse; ``None`` means ``sys.argv[1:]``.
    """
    parser = argparse.ArgumentParser(description='Elastic Command')
    parser.add_argument(
        "--elastic_server", type=str, help="etcd server host:port"
    )
    parser.add_argument("--job_id", type=str, help="job unique id")
    parser.add_argument(
        "--np",
        type=str,
        help="job pod/node number, need to be 'MIN' or 'MIN:MAX' format",
    )
    parser.add_argument("action", type=str, help="action to take")

    args = parser.parse_args(argv)

    # Command-line values take precedence over the environment.
    server = args.elastic_server or os.getenv('PADDLE_ELASTIC_SERVER')
    name = args.job_id or os.getenv('PADDLE_ELASTIC_JOB_ID')

    # Fail with a usage message rather than an AttributeError/TypeError
    # raised later while building the etcd client and key prefix.
    if not server:
        parser.error(
            "etcd server is required: pass --elastic_server or set "
            "PADDLE_ELASTIC_SERVER"
        )
    if not name:
        parser.error(
            "job id is required: pass --job_id or set PADDLE_ELASTIC_JOB_ID"
        )

    try:
        np = resolve_np(args.np, os.getenv('PADDLE_ELASTIC_NP'))
    except ValueError:
        parser.error("np must be in 'MIN' or 'MIN:MAX' format")

    cmd = Command(server, name)
    try:
        if args.action == "scale":
            cmd.scale_np(np)

        if args.action == "clean":
            cmd.clean()

        print("action {} done".format(args.action))
    finally:
        # Release the etcd connection even if the action raised.
        cmd.close()


if __name__ == '__main__':
    main()