monitor.py 17.2 KB
Newer Older
B
barrierye 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
B
barrierye 已提交
14 15 16 17 18 19 20 21 22
"""
Usage:
    Start monitor with one line command
    Example:
        python -m paddle_serving_server.monitor
"""
import os
import time
import argparse
B
barrierye 已提交
23 24
import commands
import datetime
B
barrierye 已提交
25
import shutil
B
barrierye 已提交
26
import tarfile
B
barrierye 已提交
27 28 29


class Monitor(object):
    '''
    Monitor base class. It is used to monitor the remote model, pull and update the local model.

    Subclasses must implement ``_exist_remote_file`` and ``_pull_remote_dir``
    for a concrete kind of remote storage. ``run`` polls the remote donefile
    and, whenever its timestamp changes:
      1. pulls the remote model into ``local_tmp_path``;
      2. unpacks it if it is a tar package;
      3. copies it into ``local_path/local_model_name``;
      4. touches the local timestamp file.
    '''

    def __init__(self, interval):
        '''
        Args:
            interval: seconds to sleep between two polls of the remote donefile.
        '''
        self._remote_path = None
        self._remote_model_name = None
        self._remote_donefile_name = None
        self._local_path = None
        self._local_model_name = None
        self._local_timestamp_file = None
        self._interval = interval
        # Donefile timestamp seen at the last update; a different value means
        # the remote model has changed and must be pulled again.
        self._remote_donefile_timestamp = None
        self._local_tmp_path = None
        # Name of the file/directory produced by unpacking the remote tar
        # package; None means the remote model is not packaged.
        self._unpacked_filename = None

    def set_remote_path(self, remote_path):
        self._remote_path = remote_path

    def set_remote_model_name(self, model_name):
        self._remote_model_name = model_name

    def set_remote_donefile_name(self, donefile_name):
        self._remote_donefile_name = donefile_name

    def set_local_path(self, local_path):
        self._local_path = local_path

    def set_local_model_name(self, model_name):
        self._local_model_name = model_name

    def set_local_timestamp_file(self, timestamp_file):
        self._local_timestamp_file = timestamp_file

    def set_local_tmp_path(self, tmp_path):
        self._local_tmp_path = tmp_path

    def set_unpacked_filename(self, unpacked_filename):
        self._unpacked_filename = unpacked_filename

    def _check_params(self):
        '''Raise an Exception naming the first required parameter that is unset.'''
        if self._remote_path is None:
            raise Exception('remote_path not set.')
        if self._remote_model_name is None:
            raise Exception('remote_model_name not set.')
        if self._remote_donefile_name is None:
            raise Exception('remote_donefile_name not set.')
        if self._local_model_name is None:
            raise Exception('local_model_name not set.')
        if self._local_path is None:
            raise Exception('local_path not set.')
        if self._local_timestamp_file is None:
            raise Exception('local_timestamp_file not set.')
        if self._local_tmp_path is None:
            raise Exception('local_tmp_path not set.')

    @staticmethod
    def _log_info(message):
        '''Print ``message`` prefixed with the current local time and an INFO tag.'''
        print('{} [INFO] {}'.format(
            datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), message))

    def _decompress_model_file(self, local_tmp_path, model_name,
                               unpacked_filename):
        '''
        Unpack the pulled model package if the remote model is packaged.

        Args:
            local_tmp_path: directory the remote model was pulled into.
            model_name: name of the pulled file inside ``local_tmp_path``.
            unpacked_filename: name of the file/directory the tar package
                unpacks to, or None when the model is not packaged.

        Returns:
            Name (relative to ``local_tmp_path``) of the unpacked model.

        Raises:
            Exception: if the pulled file is not a tar package, if unpacking
                fails, or if ``unpacked_filename`` does not exist after
                unpacking.
        '''
        if unpacked_filename is None:
            return model_name
        tar_model_path = os.path.join(local_tmp_path, model_name)
        if not tarfile.is_tarfile(tar_model_path):
            raise Exception(
                'the model({}) of remote production is not a tar packaged file type.'.
                format(tar_model_path))
        try:
            # NOTE(review): extractall trusts member paths inside the archive;
            # only use this with a trusted remote storage.
            tar = tarfile.open(tar_model_path)
            try:
                tar.extractall(local_tmp_path)
            finally:
                tar.close()
        except (tarfile.TarError, IOError, OSError):
            raise Exception(
                'Decompressing failed, please check your permission of {} or disk space left.'.
                format(local_tmp_path))
        finally:
            # The package is no longer needed, whether or not unpacking worked.
            # No `return` here: it would swallow the exception raised above.
            os.remove(tar_model_path)
        # The archive was extracted into local_tmp_path, so check there (not
        # in the current working directory).
        if not os.path.exists(os.path.join(local_tmp_path, unpacked_filename)):
            raise Exception(
                '{} not exists. Please check the unpacked_filename parameter.'.
                format(unpacked_filename))
        return unpacked_filename

    def run(self):
        '''
        Monitor the remote model by polling and update the local model.

        Loops forever. Any exception raised while pulling or updating the
        model propagates out and stops the loop.
        '''
        self._check_params()
        if not os.path.exists(self._local_tmp_path):
            os.makedirs(self._local_tmp_path)
        while True:
            [flag, timestamp] = self._exist_remote_file(
                self._remote_path, self._remote_donefile_name,
                self._local_tmp_path)
            if flag:
                if self._remote_donefile_timestamp is None or \
                        timestamp != self._remote_donefile_timestamp:
                    self._remote_donefile_timestamp = timestamp
                    self._pull_remote_dir(self._remote_path,
                                          self._remote_model_name,
                                          self._local_tmp_path)
                    self._log_info('pull remote model')
                    unpacked_filename = self._decompress_model_file(
                        self._local_tmp_path, self._remote_model_name,
                        self._unpacked_filename)
                    self._update_local_model(
                        self._local_tmp_path, unpacked_filename,
                        self._local_path, self._local_model_name)
                    self._log_info('update model')
                    self._update_local_donefile(self._local_path,
                                                self._local_model_name,
                                                self._local_timestamp_file)
                    self._log_info('update local donefile')
            else:
                self._log_info('no donefile.')
            self._log_info('sleep {}s'.format(self._interval))
            time.sleep(self._interval)

    def _exist_remote_file(self, path, filename, local_tmp_path):
        '''
        Check whether ``path/filename`` exists on the remote storage.

        Must be overridden. ``run`` expects [True, timestamp] when the file
        exists and [False, None] otherwise.
        '''
        raise NotImplementedError('This function must be inherited.')

    def _pull_remote_dir(self, remote_path, dirname, local_tmp_path):
        '''
        Download ``remote_path/dirname`` into ``local_tmp_path``.

        Must be overridden.
        '''
        raise NotImplementedError('This function must be inherited.')

    def _update_local_model(self, local_tmp_path, remote_model_name, local_path,
                            local_model_name):
        '''Copy the pulled model contents into ``local_path/local_model_name``.'''
        tmp_model_path = os.path.join(local_tmp_path, remote_model_name)
        local_model_path = os.path.join(local_path, local_model_name)
        cmd = 'cp -r {}/* {}'.format(tmp_model_path, local_model_path)
        if os.system(cmd) != 0:
            raise Exception('update local model failed.')

    def _update_local_donefile(self, local_path, local_model_name,
                               local_timestamp_file):
        '''Touch the timestamp file inside the local model directory.'''
        donefile_path = os.path.join(local_path, local_model_name,
                                     local_timestamp_file)
        cmd = 'touch {}'.format(donefile_path)
        if os.system(cmd) != 0:
            raise Exception('update local donefile failed.')
B
barrierye 已提交
179

B
barrierye 已提交
180

B
barrierye 已提交
181 182 183 184
class AFSMonitor(Monitor):
    ''' AFS Monitor(by hadoop-client). '''

    def __init__(self,
                 hadoop_bin,
                 hadoop_host=None,
                 hadoop_ugi=None,
                 interval=10):
        '''
        Args:
            hadoop_bin: path of the hadoop client binary.
            hadoop_host: value for ``fs.default.name`` (optional).
            hadoop_ugi: value for ``hadoop.job.ugi`` (optional).
            interval: seconds between two polls.
        '''
        super(AFSMonitor, self).__init__(interval)
        self._hadoop_bin = hadoop_bin
        self._hadoop_host = hadoop_host
        self._hadoop_ugi = hadoop_ugi
        self._cmd_prefix = '{} fs '.format(self._hadoop_bin)
        # Pass the cluster address and ugi explicitly only when both are
        # given; otherwise rely on the hadoop client's own configuration.
        if self._hadoop_host and self._hadoop_ugi:
            self._cmd_prefix += '-D fs.default.name={} -D hadoop.job.ugi={} '.format(
                self._hadoop_host, self._hadoop_ugi)

    def _exist_remote_file(self, path, filename, local_tmp_path):
        '''
        Return [True, "<date><time>"] if ``path/filename`` exists remotely,
        otherwise [False, None].
        '''
        remote_filepath = os.path.join(path, filename)
        cmd = '{} -ls {}'.format(self._cmd_prefix, remote_filepath)
        [status, output] = commands.getstatusoutput(cmd)
        if status != 0:
            return [False, None]
        # The last line of `-ls` output is expected to hold 8 whitespace-
        # separated fields, with the modification date and time at indices
        # 5 and 6. maxsplit=7 keeps a path containing spaces in one field.
        [_, _, _, _, _, mdate, mtime, _] = output.split('\n')[-1].split(None, 7)
        timestr = mdate + mtime
        return [True, timestr]

    def _pull_remote_dir(self, remote_path, dirname, local_tmp_path):
        '''Download ``remote_path/dirname`` to ``local_tmp_path/dirname``.'''
        # Remove the old copy before pulling. It may be a directory or, for a
        # packaged model, a leftover file (rmtree cannot delete a file).
        local_dirpath = os.path.join(local_tmp_path, dirname)
        if os.path.isdir(local_dirpath):
            shutil.rmtree(local_dirpath)
        elif os.path.exists(local_dirpath):
            os.remove(local_dirpath)
        remote_dirpath = os.path.join(remote_path, dirname)
        cmd = '{} -get {} {}'.format(self._cmd_prefix, remote_dirpath,
                                     local_dirpath)
        if os.system(cmd) != 0:
            raise Exception('pull remote dir failed.')


B
barrierye 已提交
221 222 223 224 225 226
class HDFSMonitor(Monitor):
    ''' HDFS Monitor. '''

    def __init__(self, bin_path, interval=10):
        '''
        Args:
            bin_path: path of the hdfs client binary.
            interval: seconds between two polls.
        '''
        super(HDFSMonitor, self).__init__(interval)
        self._hdfs_bin_path = bin_path
        # Every command issued below starts with "<bin_path> dfs ".
        self._prefix_cmd = '{} dfs '.format(self._hdfs_bin_path)

    def _exist_remote_file(self, path, filename, local_tmp_path):
        '''
        Stat ``path/filename`` remotely.

        Returns [True, <output of -stat "%Y">] on success, else [False, None].
        '''
        stat_cmd = '{} -stat "%Y" {}'.format(self._prefix_cmd,
                                            os.path.join(path, filename))
        status, timestamp = commands.getstatusoutput(stat_cmd)
        return [True, timestamp] if status == 0 else [False, None]

    def _pull_remote_dir(self, remote_path, dirname, local_tmp_path):
        '''Force-download ``remote_path/dirname`` into ``local_tmp_path``.'''
        get_cmd = '{} -get -f {} {}'.format(
            self._prefix_cmd, os.path.join(remote_path, dirname),
            local_tmp_path)
        exit_code = os.system(get_cmd)
        if exit_code != 0:
            raise Exception('pull remote dir failed.')

B
barrierye 已提交
245 246

class FTPMonitor(Monitor):
    ''' FTP Monitor. '''

    def __init__(self, host, port, username="", password="", interval=10):
        '''
        Connect and log in to the FTP server; the session is reused for
        every poll and pull.
        '''
        super(FTPMonitor, self).__init__(interval)
        import ftplib
        self._ftp = ftplib.FTP()
        self._ftp.connect(host, port)
        self._ftp.login(username, password)
        self._ftp_url = 'ftp://{}:{}/'.format(host, port)

    def _exist_remote_file(self, path, filename, local_tmp_path):
        '''
        Return [True, timestamp] if ``path/filename`` exists on the server,
        otherwise [False, None]. ``timestamp`` is the MDTM reply text without
        its reply code.
        '''
        import ftplib
        try:
            self._ftp.cwd(path)
            # Drop the leading "213 " reply code of the MDTM response.
            timestamp = self._ftp.voidcmd('MDTM {}'.format(filename))[4:].strip()
            return [True, timestamp]
        except ftplib.error_perm:
            return [False, None]

    def _download_remote_file(self,
                              remote_path,
                              remote_filename,
                              local_tmp_path,
                              overwrite=True):
        '''Download ``remote_path/remote_filename`` to ``local_tmp_path``.'''
        local_fullpath = os.path.join(local_tmp_path, remote_filename)
        if not overwrite and os.path.isfile(local_fullpath):
            return
        # Change directory first so a bad remote path does not leave an empty
        # local file behind.
        self._ftp.cwd(remote_path)
        with open(local_fullpath, 'wb') as f:
            self._ftp.retrbinary('RETR {}'.format(remote_filename), f.write)

    def _download_remote_files(self,
                               remote_path,
                               remote_dirname,
                               local_tmp_path,
                               overwrite=True):
        '''
        Recursively download ``remote_path/remote_dirname`` (a file or a
        folder) into ``local_tmp_path/remote_dirname``.
        '''
        import ftplib
        remote_dirpath = os.path.join(remote_path, remote_dirname)
        # Check whether remote_dirpath is a file or a folder: cwd into a file
        # fails with error_perm. Only the cwd is guarded, so errors raised
        # while downloading folder entries are not misread as "it is a file".
        try:
            self._ftp.cwd(remote_dirpath)
        except ftplib.error_perm:
            self._download_remote_file(remote_path, remote_dirname,
                                       local_tmp_path, overwrite)
            return
        # Resolve to an absolute path: the calls below change the working
        # directory, so relative paths would no longer point here.
        remote_dirpath = self._ftp.pwd()

        local_dirpath = os.path.join(local_tmp_path, remote_dirname)
        if not os.path.exists(local_dirpath):
            os.mkdir(local_dirpath)

        output = []
        self._ftp.dir(output.append)
        for line in output:
            # LIST lines have 9 fields; maxsplit=8 keeps names with spaces.
            fields = line.split(None, 8)
            if len(fields) < 9:
                # e.g. a "total N" header line emitted by some servers.
                continue
            attr, name = fields[0], fields[8]
            if name in ('.', '..'):
                continue
            if attr[0] == 'd':
                self._download_remote_files(remote_dirpath, name,
                                            local_dirpath, overwrite)
            else:
                self._download_remote_file(remote_dirpath, name,
                                           local_dirpath, overwrite)

    def _pull_remote_dir(self, remote_path, dirname, local_tmp_path):
        '''Download ``remote_path/dirname`` into ``local_tmp_path``.'''
        self._download_remote_files(
            remote_path, dirname, local_tmp_path, overwrite=True)


class GeneralMonitor(Monitor):
    ''' General Monitor. '''

    def __init__(self, host, interval=10):
        '''
        Args:
            host: URL prefix of the remote server (e.g. http://host:port).
            interval: seconds between two polls.
        '''
        super(GeneralMonitor, self).__init__(interval)
        self._host = host

    def _get_local_file_timestamp(self, filename):
        '''Return the modification time of a local file.'''
        return os.path.getmtime(filename)

    def _exist_remote_file(self, path, filename, local_tmp_path):
        '''
        Fetch ``path/filename`` with wget. Return [True, local mtime] on
        success, otherwise [False, None].
        '''
        remote_filepath = os.path.join(path, filename)
        url = '{}/{}'.format(self._host, remote_filepath)
        # -N enables wget time-stamping, so the downloaded copy's mtime is
        # used as the donefile timestamp.
        cmd = 'wget -N -P {} {}'.format(local_tmp_path, url)
        if os.system(cmd) != 0:
            return [False, None]
        timestamp = self._get_local_file_timestamp(
            os.path.join(local_tmp_path, filename))
        return [True, timestamp]

    def _pull_remote_dir(self, remote_path, dirname, local_tmp_path):
        '''Recursively download ``remote_path/dirname`` into ``local_tmp_path``.'''
        remote_dirpath = os.path.join(remote_path, dirname)
        url = '{}/{}'.format(self._host, remote_dirpath)
        # os.system runs /bin/sh. The bash-only "&>" is parsed by POSIX shells
        # (e.g. dash) as "run in background, then redirect", so wget would be
        # detached and the exit status would always be 0. Use the portable
        # form instead.
        cmd = 'wget -nH -r -P {} {} > /dev/null 2>&1'.format(local_tmp_path,
                                                             url)
        if os.system(cmd) != 0:
            raise Exception('pull remote dir failed.')
B
barrierye 已提交
343

B
barrierye 已提交
344

B
barrierye 已提交
345
def parse_args():
    ''' parse args. '''
    # Each entry is (flag, keyword arguments for add_argument), registered in
    # order: common options first, then per-monitor-type options.
    option_table = [
        ("--type", dict(type=str, default='general',
                        help="Type of remote server")),
        ("--remote_path", dict(type=str, required=True, help="Remote path")),
        ("--remote_model_name", dict(type=str, required=True,
                                     help="Remote model name")),
        ("--remote_donefile_name", dict(type=str, required=True,
                                        help="Remote donefile name")),
        ("--local_path", dict(type=str, required=True, help="Local path")),
        ("--local_model_name", dict(type=str, required=True,
                                    help="Local model name")),
        ("--local_timestamp_file", dict(
            type=str,
            default='fluid_time_file',
            help="Local timestamp file name(fluid_time_file in model file)")),
        ("--local_tmp_path", dict(type=str, default='_serving_monitor_tmp',
                                  help="Local tmp path")),
        ("--unpacked_filename", dict(
            type=str,
            default=None,
            help="If the model of the remote production is a packaged file, the unpacked file name should be set. Currently, only tar packaging format is supported."
        )),
        ("--interval", dict(type=int, default=10, help="Time interval")),
        # general monitor
        ("--general_host", dict(type=str,
                                help="Host of general remote server")),
        # hdfs monitor
        ("--hdfs_bin", dict(type=str, help="Hdfs binary file path")),
        # ftp monitor
        ("--ftp_host", dict(type=str, help="Host of ftp")),
        ("--ftp_port", dict(type=int, help="Port of ftp")),
        ("--ftp_username", dict(type=str, default='', help="Username of ftp")),
        ("--ftp_password", dict(type=str, default='', help="Password of ftp")),
        # afs monitor
        ("--hadoop_bin", dict(type=str, help="Hadoop_bin_path for afs")),
        ("--hadoop_host", dict(type=str, default=None,
                               help="Hadoop_host for afs")),
        ("--hadoop_ugi", dict(type=str, default=None,
                              help="Hadoop_ugi for afs")),
    ]
    parser = argparse.ArgumentParser(description="Monitor")
    for flag, options in option_table:
        parser.add_argument(flag, **options)
    return parser.parse_args()

B
barrierye 已提交
405

B
barrierye 已提交
406
def get_monitor(mtype, args=None):
    """ generator monitor instance.

    Args:
        mtype: type of monitor ('ftp', 'hdfs', 'general' or 'afs').
        args: parsed command-line arguments (the namespace returned by
            ``parse_args``). Optional for backward compatibility: when it is
            None, the module-level ``args`` assigned by the ``__main__``
            entry point is used.

    Returns:
        monitor instance.

    Raises:
        Exception: if ``mtype`` is not a supported monitor type.
        ValueError: if no ``args`` is given and no module-level ``args`` exists.
    """
    if args is None:
        args = globals().get('args')
    # Factories are lazy: only the requested monitor is constructed (e.g.
    # FTPMonitor connects to its server in __init__).
    factories = {
        'ftp': lambda: FTPMonitor(
            args.ftp_host,
            args.ftp_port,
            username=args.ftp_username,
            password=args.ftp_password,
            interval=args.interval),
        'hdfs': lambda: HDFSMonitor(args.hdfs_bin, interval=args.interval),
        'general': lambda: GeneralMonitor(
            args.general_host, interval=args.interval),
        'afs': lambda: AFSMonitor(
            args.hadoop_bin,
            args.hadoop_host,
            args.hadoop_ugi,
            interval=args.interval),
    }
    if mtype not in factories:
        raise Exception('unsupported type: {}.'.format(mtype))
    if args is None:
        raise ValueError(
            'get_monitor() needs the parsed args when no module-level args is set.'
        )
    return factories[mtype]()


def start_monitor(monitor, args):
    """Configure ``monitor`` from the parsed ``args`` and start it.

    Each setting is copied from ``args`` through the matching ``set_*``
    method, then ``monitor.run()`` is called. For Monitor subclasses ``run``
    polls forever, so this call normally does not return.
    """
    # (setter on the monitor, attribute on args), applied in this order.
    setter_to_arg = (
        ('set_remote_path', 'remote_path'),
        ('set_remote_model_name', 'remote_model_name'),
        ('set_remote_donefile_name', 'remote_donefile_name'),
        ('set_local_path', 'local_path'),
        ('set_local_model_name', 'local_model_name'),
        ('set_local_timestamp_file', 'local_timestamp_file'),
        ('set_local_tmp_path', 'local_tmp_path'),
        ('set_unpacked_filename', 'unpacked_filename'),
    )
    for setter_name, arg_name in setter_to_arg:
        setter = getattr(monitor, setter_name)
        setter(getattr(args, arg_name))
    monitor.run()
B
barrierye 已提交
446

B
barrierye 已提交
447

B
barrierye 已提交
448 449
if __name__ == "__main__":
    args = parse_args()
B
barrierye 已提交
450 451
    monitor = get_monitor(args.type)
    start_monitor(monitor, args)