# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Usage:
    Start monitor with one line command
    Example:
        python -m paddle_serving_server.monitor
"""
import os
import time
import argparse
import commands
import datetime


class Monitor(object):
    '''
    Monitor base class. It is used to monitor the remote model, pull and
    update the local model.

    Subclasses implement the storage-specific hooks ``_exist_remote_file``
    and ``_pull_remote_dir``; this class drives the polling loop and the
    local update steps.
    '''

    def __init__(self, interval):
        '''
        Args:
            interval: seconds to sleep between two polls of the remote
                donefile.
        '''
        self._remote_path = None
        self._remote_model_name = None
        self._remote_donefile_name = None
        self._local_path = None
        self._local_model_name = None
        self._local_donefile_name = None
        self._interval = interval
        # Timestamp of the remote donefile whose model has been fully
        # deployed locally; None until the first successful update.
        self._remote_donefile_timestamp = None
        self._local_tmp_path = None

    def set_remote_path(self, remote_path):
        self._remote_path = remote_path

    def set_remote_model_name(self, model_name):
        self._remote_model_name = model_name

    def set_remote_donefile_name(self, donefile_name):
        self._remote_donefile_name = donefile_name

    def set_local_path(self, local_path):
        self._local_path = local_path

    def set_local_model_name(self, model_name):
        self._local_model_name = model_name

    def set_local_donefile_name(self, donefile_name):
        self._local_donefile_name = donefile_name

    def set_local_tmp_path(self, tmp_path):
        self._local_tmp_path = tmp_path

    def _check_params(self):
        '''
        Raise an Exception naming the first required parameter that has not
        been set.
        '''
        required_params = [
            ('remote_path', self._remote_path),
            ('remote_model_name', self._remote_model_name),
            ('remote_donefile_name', self._remote_donefile_name),
            ('local_model_name', self._local_model_name),
            ('local_path', self._local_path),
            ('local_donefile_name', self._local_donefile_name),
            ('local_tmp_path', self._local_tmp_path),
        ]
        for name, value in required_params:
            if value is None:
                raise Exception('{} not set.'.format(name))

    @staticmethod
    def _log(message):
        '''Print an INFO line prefixed with the current local time.'''
        print('{} [INFO] {}'.format(
            datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), message))

    def run(self):
        '''
        Monitor the remote model by polling and update the local model.

        Loops forever. On every poll it checks the remote donefile. When the
        donefile's timestamp differs from the last deployed one, it does
        three things:

        1. Pulls the remote model into the tmp directory.
        2. Copies the model into the local model directory.
        3. Touches the local donefile.

        Exceptions raised by any step propagate to the caller.
        '''
        self._check_params()
        if not os.path.exists(self._local_tmp_path):
            os.makedirs(self._local_tmp_path)
        while True:
            [flag, timestamp] = self._exist_remote_file(
                self._remote_path, self._remote_donefile_name)
            if flag:
                if self._remote_donefile_timestamp is None or \
                        timestamp != self._remote_donefile_timestamp:
                    self._pull_remote_dir(self._remote_path,
                                          self._remote_model_name,
                                          self._local_tmp_path)
                    self._log('pull remote model')
                    self._update_local_model(
                        self._local_tmp_path, self._remote_model_name,
                        self._local_path, self._local_model_name)
                    self._log('update model')
                    self._update_local_donefile(self._local_path,
                                                self._local_model_name,
                                                self._local_donefile_name)
                    self._log('update local donefile')
                    # Record the version only after every step succeeded.
                    # Otherwise a failed pull/update would be treated as
                    # deployed and never retried when run() is restarted.
                    self._remote_donefile_timestamp = timestamp
            else:
                self._log('no donefile.')
            self._log('sleep {}s'.format(self._interval))
            time.sleep(self._interval)

    def _exist_remote_file(self, path, filename):
        '''
        Check whether ``filename`` exists under the remote ``path``.

        Must be overridden. ``run`` expects ``[True, timestamp]`` when the
        file exists and ``[False, None]`` otherwise.
        '''
        raise NotImplementedError('This function must be inherited.')

    def _pull_remote_dir(self, remote_path, dirname, local_tmp_path):
        '''
        Download ``remote_path/dirname`` into ``local_tmp_path``.

        Must be overridden. ``_update_local_model`` later reads the files
        from ``local_tmp_path/dirname``.
        '''
        raise NotImplementedError('This function must be inherited.')

    def _update_local_model(self, local_tmp_path, remote_model_name, local_path,
                            local_model_name):
        '''
        Copy the pulled model files into the local model directory.

        Same effect as
        ``cp -r <local_tmp_path>/<remote_model_name>/* <local_path>/<local_model_name>``,
        but without a shell, so paths containing spaces or shell
        metacharacters are handled safely.

        Raises:
            Exception: if the pulled directory is missing or empty, or if
                ``cp`` fails.
        '''
        import subprocess
        tmp_model_path = os.path.join(local_tmp_path, remote_model_name)
        local_model_path = os.path.join(local_path, local_model_name)
        try:
            names = os.listdir(tmp_model_path)
        except OSError:
            raise Exception('update local model failed.')
        # Mirror the shell's '*': hidden entries are skipped.
        sources = [
            os.path.join(tmp_model_path, name) for name in sorted(names)
            if not name.startswith('.')
        ]
        if not sources:
            raise Exception('update local model failed.')
        try:
            returncode = subprocess.call(['cp', '-r'] + sources +
                                         [local_model_path])
        except OSError:
            returncode = -1
        if returncode != 0:
            raise Exception('update local model failed.')

    def _update_local_donefile(self, local_path, local_model_name,
                               local_donefile_name):
        '''
        Touch ``<local_path>/<local_model_name>/<local_donefile_name>``.

        Creates the file if it is missing and refreshes its modification
        time, like ``touch``.

        Raises:
            Exception: if the file cannot be created or updated.
        '''
        donefile_path = os.path.join(local_path, local_model_name,
                                     local_donefile_name)
        try:
            with open(donefile_path, 'a'):
                os.utime(donefile_path, None)
        except (IOError, OSError):
            raise Exception('update local donefile failed.')


class HDFSMonitor(Monitor):
    ''' HDFS Monitor. '''

    def __init__(self, bin_path, interval=10):
        '''
        Args:
            bin_path: command used to invoke the HDFS client. It is split
                like a shell word list, so e.g. "hadoop --config conf" still
                works.
            interval: seconds between two polls of the remote donefile.
        '''
        super(HDFSMonitor, self).__init__(interval)
        self._hdfs_bin_path = bin_path

    def _hdfs_cmd(self, *dfs_args):
        '''Build the argv list ``<bin> dfs <dfs_args...>`` (no shell involved).'''
        import shlex
        return shlex.split(self._hdfs_bin_path) + ['dfs'] + list(dfs_args)

    def _exist_remote_file(self, path, filename):
        '''
        Run ``dfs -stat %Y`` on ``path/filename``.

        Returns:
            [True, <value printed by -stat, stripped>] when the command
            succeeds, otherwise [False, None].
        '''
        import subprocess
        remote_filepath = os.path.join(path, filename)
        try:
            proc = subprocess.Popen(
                self._hdfs_cmd('-stat', '%Y', remote_filepath),
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE)
        except OSError:
            # The client could not be started at all; the previous
            # shell-based call also reported this as "no donefile".
            return [False, None]
        stdout, _ = proc.communicate()
        if proc.returncode != 0:
            return [False, None]
        if not isinstance(stdout, str):
            stdout = stdout.decode('utf-8', 'replace')
        # Only stdout is used, so anything the client writes to stderr
        # (e.g. warnings) cannot leak into the timestamp.
        return [True, stdout.strip()]

    def _pull_remote_dir(self, remote_path, dirname, local_tmp_path):
        '''
        Download ``remote_path/dirname`` into ``local_tmp_path`` with
        ``dfs -get -f``.

        Raises:
            Exception: if the HDFS client cannot be run or exits non-zero.
        '''
        import subprocess
        remote_dirpath = os.path.join(remote_path, dirname)
        try:
            returncode = subprocess.call(
                self._hdfs_cmd('-get', '-f', remote_dirpath, local_tmp_path))
        except OSError:
            returncode = -1
        if returncode != 0:
            raise Exception('pull remote dir failed.')


class FTPMonitor(Monitor):
    ''' FTP Monitor. '''

    def __init__(self, ftp_ip, ftp_port, username="", password="", interval=10):
        '''
        Connect and log in to the FTP server.

        Args:
            ftp_ip: host of the FTP server.
            ftp_port: port of the FTP server.
            username: login user. An empty value lets ftplib log in
                anonymously.
            password: login password.
            interval: seconds between two polls of the remote donefile.
        '''
        super(FTPMonitor, self).__init__(interval)
        import ftplib
        self._ftp_ip = ftp_ip
        self._ftp_port = ftp_port
        # Kept so that wget downloads with the same account as the control
        # connection.
        self._username = username
        self._password = password
        self._ftp = ftplib.FTP()
        self._connect(ftp_ip, ftp_port, username, password)

    def _connect(self, ftp_ip, ftp_port, username, password):
        self._ftp.connect(ftp_ip, ftp_port)
        self._ftp.login(username, password)

    def _exist_remote_file(self, path, filename):
        '''
        Query the modification time of ``path/filename`` with MDTM.

        Returns:
            [True, <MDTM reply without its status code>] if the server
            answers, [False, None] on a permanent FTP error.
        '''
        import ftplib
        try:
            filepath = os.path.join(path, filename)
            # Strip the 3-digit reply code and its separator.
            timestamp = self._ftp.voidcmd('MDTM {}'.format(filepath))[4:].strip(
            )
            return [True, timestamp]
        except ftplib.error_perm:
            return [False, None]

    def _pull_remote_dir(self, remote_path, dirname, local_tmp_path):
        '''
        Recursively download ``remote_path/dirname`` into ``local_tmp_path``
        with wget.

        Raises:
            Exception: if wget cannot be run or exits with a non-zero status.
        '''
        import subprocess
        filepath = os.path.join(remote_path, dirname)
        # NOTE(review): with -nH, wget still mirrors the URL's directory
        # components. The files therefore land in
        # local_tmp_path/<remote_path>/<dirname>, while _update_local_model
        # reads local_tmp_path/<remote_model_name>. Those only line up when
        # remote_path is the FTP root. Confirm, or add --cut-dirs.
        cmd = ['wget', '-nH', '-r', '-P', local_tmp_path]
        if self._username:
            # NOTE: command-line credentials are visible in the process list.
            cmd.extend([
                '--ftp-user={}'.format(self._username),
                '--ftp-password={}'.format(self._password)
            ])
        cmd.append('ftp://{}:{}/{}'.format(self._ftp_ip, self._ftp_port,
                                           filepath))
        # Run wget synchronously and discard its output without a shell.
        # A `&> /dev/null` redirect is bash-only: under a POSIX /bin/sh such
        # as dash it backgrounds the command, hiding wget's exit status.
        try:
            with open(os.devnull, 'w') as devnull:
                returncode = subprocess.call(
                    cmd, stdout=devnull, stderr=devnull)
        except OSError:
            returncode = -1
        if returncode != 0:
            raise Exception('pull remote dir failed.')


def parse_args(argv=None):
    '''
    Parse the monitor's command line options.

    Args:
        argv: list of argument strings. When None, argparse reads
            ``sys.argv[1:]``.

    Returns:
        argparse.Namespace holding the parsed options.

    Exits (via ``parser.error``) on an unknown ``--type``. It also exits when
    ``--type ftp`` is given without ``--ftp_ip`` and ``--ftp_port``.
    '''
    parser = argparse.ArgumentParser(description="Monitor")
    parser.add_argument(
        "--type",
        type=str,
        required=True,
        choices=['ftp', 'hdfs'],
        help="Type of remote server")
    parser.add_argument(
        "--remote_path", type=str, required=True, help="Remote path")
    parser.add_argument(
        "--remote_model_name",
        type=str,
        required=True,
        help="Remote model name")
    parser.add_argument(
        "--remote_donefile_name",
        type=str,
        required=True,
        help="Remote donefile name")
    parser.add_argument(
        "--local_path", type=str, required=True, help="Local path")
    parser.add_argument(
        "--local_model_name", type=str, required=True, help="Local model name")
    parser.add_argument(
        "--local_donefile_name",
        type=str,
        required=True,
        help="Local donefile name(fluid_time_file in model file)")
    parser.add_argument(
        "--local_tmp_path", type=str, default='tmp', help="Local tmp path")
    parser.add_argument(
        "--interval", type=int, default=10, help="Time interval")
    parser.add_argument("--ftp_ip", type=str, help="Ip of the ftp")
    parser.add_argument("--ftp_port", type=int, help="Port of the ftp")
    parser.add_argument(
        "--hdfs_bin", type=str, default='hdfs', help="Hdfs binary file path")
    parsed = parser.parse_args(argv)
    # FTPMonitor needs both values to connect; report it here instead of
    # failing later inside ftplib.
    if parsed.type == 'ftp' and (parsed.ftp_ip is None or
                                 parsed.ftp_port is None):
        parser.error('--ftp_ip and --ftp_port are required when --type is ftp')
    return parsed


def get_monitor(mtype, args=None):
    '''
    Build the monitor for remote server type ``mtype``.

    Args:
        mtype: 'ftp' or 'hdfs'.
        args: parsed command line options (as returned by parse_args). When
            omitted, the module-level ``args`` assigned by the ``__main__``
            entry point is used, so single-argument calls keep working.

    Returns:
        An FTPMonitor or HDFSMonitor configured from ``args``.

    Raises:
        Exception: if ``mtype`` is unsupported or no options are available.
    '''
    if mtype not in ('ftp', 'hdfs'):
        raise Exception('unsupport type.')
    if args is None:
        # Fall back to the global only when it exists, instead of raising a
        # NameError when this module is imported rather than run.
        args = globals().get('args')
        if args is None:
            raise Exception('args not set.')
    if mtype == 'ftp':
        return FTPMonitor(args.ftp_ip, args.ftp_port, interval=args.interval)
    return HDFSMonitor(args.hdfs_bin, interval=args.interval)


def start_monitor(monitor, args):
    '''
    Copy the parsed command line options onto ``monitor``, then call its
    ``run()``.

    Each option ``X`` below is passed to ``monitor.set_X``, in this order.
    For the Monitor classes in this file, ``run()`` polls in an endless loop.
    '''
    option_names = (
        'remote_path',
        'remote_model_name',
        'remote_donefile_name',
        'local_path',
        'local_model_name',
        'local_donefile_name',
        'local_tmp_path',
    )
    for option_name in option_names:
        setter = getattr(monitor, 'set_' + option_name)
        setter(getattr(args, option_name))
    monitor.run()


if __name__ == "__main__":
    # Parse the CLI, build the requested monitor and start polling.
    # `args` is deliberately module-global: get_monitor() reads it.
    args = parse_args()
    start_monitor(get_monitor(args.type), args)