# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Usage:
    Start monitor with one line command
    Example:
        python -m paddle_serving_server.monitor
"""
import argparse
import datetime
import os
import shutil
import tarfile
import time

try:
    import commands  # Python 2 only
except ImportError:
    # Python 3 removed `commands`; subprocess exposes the same
    # getstatusoutput() call, so alias it under the old name.
    import subprocess as commands


class Monitor(object):
    '''
    Monitor base class. It is used to monitor the remote model, pull and update the local model.

    Subclasses provide `_exist_remote_file` and `_pull_remote_dir` for a
    concrete remote storage; `run` drives the polling loop on top of them.
    '''

    def __init__(self, interval):
        '''
        Args:
            interval: value passed to time.sleep() between two polls.
        '''
        self._remote_path = None
        self._remote_model_name = None
        self._remote_donefile_name = None
        self._local_path = None
        self._local_model_name = None
        self._local_timestamp_file = None
        self._interval = interval
        # Timestamp of the donefile seen at the last pull; None until the
        # first successful poll.
        self._remote_donefile_timestamp = None
        self._local_tmp_path = None
        # Optional: only needed when the remote model is a tar archive.
        self._unpacked_filename = None

    def set_remote_path(self, remote_path):
        self._remote_path = remote_path

    def set_remote_model_name(self, model_name):
        self._remote_model_name = model_name

    def set_remote_donefile_name(self, donefile_name):
        self._remote_donefile_name = donefile_name

    def set_local_path(self, local_path):
        self._local_path = local_path

    def set_local_model_name(self, model_name):
        self._local_model_name = model_name

    def set_local_timestamp_file(self, timestamp_file):
        self._local_timestamp_file = timestamp_file

    def set_local_tmp_path(self, tmp_path):
        self._local_tmp_path = tmp_path

    def set_unpacked_filename(self, unpacked_filename):
        self._unpacked_filename = unpacked_filename

    def _check_param_help(self, param_name, param_value):
        ''' Build the "please check parameter" hint appended to error messages. '''
        return "Please check the {}({}) parameter.".format(param_name,
                                                           param_value)

    def _check_params(self):
        '''
        Raise an Exception naming the first mandatory setting that is still
        None. `unpacked_filename` is optional and therefore not checked.
        '''
        if self._remote_path is None:
            raise Exception('remote_path not set.')
        if self._remote_model_name is None:
            raise Exception('remote_model_name not set.')
        if self._remote_donefile_name is None:
            raise Exception('remote_donefile_name not set.')
        if self._local_model_name is None:
            raise Exception('local_model_name not set.')
        if self._local_path is None:
            raise Exception('local_path not set.')
        if self._local_timestamp_file is None:
            raise Exception('local_timestamp_file not set.')
        if self._local_tmp_path is None:
            raise Exception('local_tmp_path not set.')

    def _decompress_model_file(self, local_tmp_path, model_name,
                               unpacked_filename):
        '''
        Unpack the pulled tar archive inside `local_tmp_path`.

        Args:
            local_tmp_path: directory holding the pulled file; the archive is
                extracted into this directory.
            model_name: name of the pulled file inside `local_tmp_path`.
            unpacked_filename: name expected inside `local_tmp_path` after
                extraction, or None when the pulled model is not packaged.

        Returns:
            `model_name` when `unpacked_filename` is None, otherwise
            `unpacked_filename`.

        Raises:
            Exception: the file is not a tar archive, extraction failed, or
                the expected unpacked file is missing after extraction.
        '''
        if unpacked_filename is None:
            return model_name
        tar_model_path = os.path.join(local_tmp_path, model_name)
        if not tarfile.is_tarfile(tar_model_path):
            raise Exception('not a tar packaged file type. {}'.format(
                self._check_param_help('remote_model_name', model_name)))
        try:
            tar = tarfile.open(tar_model_path)
            try:
                tar.extractall(local_tmp_path)
            finally:
                tar.close()
        except Exception:
            raise Exception(
                'Decompressing failed, maybe no disk space left. {}'.format(
                    self._check_param_help('local_tmp_path', local_tmp_path)))
        finally:
            # The archive is removed whether or not extraction succeeded.
            # No `return` here: returning from `finally` would discard the
            # exception raised above.
            os.remove(tar_model_path)
        # The archive was extracted into local_tmp_path, and
        # _update_local_model later reads local_tmp_path/<unpacked_filename>,
        # so the existence check must look there, not in the cwd.
        if not os.path.exists(os.path.join(local_tmp_path, unpacked_filename)):
            raise Exception('file not exist. {}'.format(
                self._check_param_help('unpacked_filename',
                                       unpacked_filename)))
        return unpacked_filename

    def _print_info(self, message):
        ''' Print `message` prefixed with the current time and an [INFO] tag. '''
        print('{} [INFO] {}'.format(datetime.datetime.now().strftime(
            '%Y-%m-%d %H:%M:%S'), message))

    def run(self):
        '''
        Monitor the remote model by polling and update the local model.

        Loops forever: whenever the remote donefile exists and its timestamp
        differs from the one recorded at the previous pull, the remote model
        is pulled, optionally unpacked, copied over the local model and the
        local timestamp file is touched. Any exception raised by a step
        propagates and ends the loop.
        '''
        self._check_params()
        if not os.path.exists(self._local_tmp_path):
            os.makedirs(self._local_tmp_path)
        while True:
            [flag, timestamp] = self._exist_remote_file(
                self._remote_path, self._remote_donefile_name,
                self._local_tmp_path)
            if flag:
                if self._remote_donefile_timestamp is None or \
                        timestamp != self._remote_donefile_timestamp:
                    self._remote_donefile_timestamp = timestamp
                    self._pull_remote_dir(self._remote_path,
                                          self._remote_model_name,
                                          self._local_tmp_path)
                    self._print_info('pull remote model')
                    unpacked_filename = self._decompress_model_file(
                        self._local_tmp_path, self._remote_model_name,
                        self._unpacked_filename)
                    self._update_local_model(
                        self._local_tmp_path, unpacked_filename,
                        self._local_path, self._local_model_name)
                    self._print_info('update model')
                    self._update_local_donefile(self._local_path,
                                                self._local_model_name,
                                                self._local_timestamp_file)
                    self._print_info('update local donefile')
            else:
                self._print_info('no donefile.')
            self._print_info('sleep {}s'.format(self._interval))
            time.sleep(self._interval)

    def _exist_remote_file(self, path, filename, local_tmp_path):
        '''
        Subclass hook. `run` unpacks the result as [flag, timestamp]: whether
        the remote file exists and a value compared with `!=` between polls.
        '''
        raise NotImplementedError('This function must be inherited.')

    def _pull_remote_dir(self, remote_path, dirname, local_tmp_path):
        '''
        Subclass hook: fetch `dirname` under `remote_path` into
        `local_tmp_path`.
        '''
        raise NotImplementedError('This function must be inherited.')

    def _update_local_model(self, local_tmp_path, remote_model_name, local_path,
                            local_model_name):
        '''
        Copy everything under local_tmp_path/remote_model_name into
        local_path/local_model_name with `cp -r`.

        Raises:
            Exception: the copy command returned a non-zero status.
        '''
        tmp_model_path = os.path.join(local_tmp_path, remote_model_name)
        local_model_path = os.path.join(local_path, local_model_name)
        cmd = 'cp -r {}/* {}'.format(tmp_model_path, local_model_path)
        if os.system(cmd) != 0:
            raise Exception('update local model failed.')

    def _update_local_donefile(self, local_path, local_model_name,
                               local_timestamp_file):
        '''
        `touch` the timestamp file inside the local model directory.

        Raises:
            Exception: the touch command returned a non-zero status.
        '''
        donefile_path = os.path.join(local_path, local_model_name,
                                     local_timestamp_file)
        cmd = 'touch {}'.format(donefile_path)
        if os.system(cmd) != 0:
            raise Exception('update local donefile failed.')


class AFSMonitor(Monitor):
    ''' AFS Monitor(by hadoop-client). '''

    def __init__(self,
                 hadoop_bin,
                 hadoop_host=None,
                 hadoop_ugi=None,
                 interval=10):
        '''
        Args:
            hadoop_bin: hadoop client executable used to build every command.
            hadoop_host: value for `-D fs.default.name=`; optional.
            hadoop_ugi: value for `-D hadoop.job.ugi=`; optional.
            interval: polling interval forwarded to Monitor.
        '''
        super(AFSMonitor, self).__init__(interval)
        self._hadoop_bin = hadoop_bin
        self._hadoop_host = hadoop_host
        self._hadoop_ugi = hadoop_ugi
        self._cmd_prefix = '{} fs '.format(self._hadoop_bin)
        # Only pass the -D overrides when both values were actually given;
        # otherwise the literal string "None" would be sent to hadoop.
        if self._hadoop_host and self._hadoop_ugi:
            self._cmd_prefix += '-D fs.default.name={} -D hadoop.job.ugi={} '.format(
                self._hadoop_host, self._hadoop_ugi)

    def _exist_remote_file(self, path, filename, local_tmp_path):
        '''
        Check the remote file with `fs -ls`.

        Returns:
            [True, timestr] where timestr is the date and time columns of the
            last output line concatenated, or [False, None] when the command
            fails or the line cannot be parsed.
        '''
        remote_filepath = os.path.join(path, filename)
        cmd = '{} -ls {}'.format(self._cmd_prefix, remote_filepath)
        [status, output] = commands.getstatusoutput(cmd)
        if status != 0:
            return [False, None]
        # Expected column layout (assumed from the field positions used
        # here - confirm against the hadoop client in use):
        #   perms repl owner group size date time path
        fields = output.split('\n')[-1].split()
        if len(fields) < 7:
            return [False, None]
        mdate, mtime = fields[5], fields[6]
        return [True, mdate + mtime]

    def _pull_remote_dir(self, remote_path, dirname, local_tmp_path):
        '''
        Fetch remote_path/dirname into local_tmp_path/dirname with `fs -get`.

        Raises:
            Exception: the get command returned a non-zero status.
        '''
        # remove old file before pull remote dir
        local_dirpath = os.path.join(local_tmp_path, dirname)
        if os.path.exists(local_dirpath):
            shutil.rmtree(local_dirpath)
        remote_dirpath = os.path.join(remote_path, dirname)
        cmd = '{} -get {} {}'.format(self._cmd_prefix, remote_dirpath,
                                     local_dirpath)
        if os.system(cmd) != 0:
            raise Exception('pull remote dir failed. {}'.format(
                self._check_param_help('remote_model_name', dirname)))


class HDFSMonitor(Monitor):
    ''' HDFS Monitor. '''

    def __init__(self, bin_path, interval=10):
        '''
        Args:
            bin_path: hdfs executable used to build every command.
            interval: polling interval forwarded to Monitor.
        '''
        super(HDFSMonitor, self).__init__(interval)
        self._hdfs_bin_path = bin_path
        self._prefix_cmd = '{} dfs '.format(bin_path)

    def _exist_remote_file(self, path, filename, local_tmp_path):
        '''
        Query the remote file with `dfs -stat "%Y"`.

        Returns:
            [True, <command output>] when the command exits with 0,
            [False, None] otherwise.
        '''
        target = os.path.join(path, filename)
        status, stat_output = commands.getstatusoutput(
            '{} -stat "%Y" {}'.format(self._prefix_cmd, target))
        if status != 0:
            return [False, None]
        return [True, stat_output]

    def _pull_remote_dir(self, remote_path, dirname, local_tmp_path):
        '''
        Fetch remote_path/dirname into local_tmp_path with `dfs -get -f`.

        Raises:
            Exception: the get command returned a non-zero status.
        '''
        source_dir = os.path.join(remote_path, dirname)
        exit_code = os.system('{} -get -f {} {}'.format(
            self._prefix_cmd, source_dir, local_tmp_path))
        if exit_code == 0:
            return
        raise Exception('pull remote dir failed. {}'.format(
            self._check_param_help('remote_model_name', dirname)))


class FTPMonitor(Monitor):
    ''' FTP Monitor. '''

    def __init__(self, host, port, username="", password="", interval=10):
        '''
        Connect and log in to the FTP server immediately.

        Args:
            host: FTP host passed to ftplib.FTP.connect.
            port: FTP port passed to ftplib.FTP.connect.
            username: login user name.
            password: login password.
            interval: polling interval forwarded to Monitor.
        '''
        super(FTPMonitor, self).__init__(interval)
        import ftplib
        self._ftp = ftplib.FTP()
        self._ftp.connect(host, port)
        self._ftp.login(username, password)
        self._ftp_url = 'ftp://{}:{}/'.format(host, port)

    def _exist_remote_file(self, path, filename, local_tmp_path):
        '''
        Ask the server for the file's modification time with MDTM.

        Returns:
            [True, timestamp] with the reply text after its first four
            characters (presumably the reply code and a space), or
            [False, None] when the server answers with a permanent error.
        '''
        import ftplib
        try:
            self._ftp.cwd(path)
            timestamp = self._ftp.voidcmd('MDTM {}'.format(filename))[4:].strip(
            )
            return [True, timestamp]
        except ftplib.error_perm:
            return [False, None]

    def _download_remote_file(self,
                              remote_path,
                              remote_filename,
                              local_tmp_path,
                              overwrite=True):
        '''
        Download remote_path/remote_filename to local_tmp_path/remote_filename.

        When `overwrite` is False and the local file already exists, nothing
        is downloaded.
        '''
        local_fullpath = os.path.join(local_tmp_path, remote_filename)
        if not overwrite and os.path.isfile(local_fullpath):
            return
        # Change directory before opening the local file so a failed cwd
        # does not leave an empty file behind.
        self._ftp.cwd(remote_path)
        with open(local_fullpath, 'wb') as f:
            self._ftp.retrbinary('RETR {}'.format(remote_filename), f.write)

    def _download_remote_files(self,
                               remote_path,
                               remote_dirname,
                               local_tmp_path,
                               overwrite=True):
        '''
        Download remote_path/remote_dirname into local_tmp_path, recursing
        into sub-directories. If the remote entry is not a directory it is
        downloaded as a single file.
        '''
        import ftplib
        remote_dirpath = os.path.join(remote_path, remote_dirname)
        # Check whether remote_dirpath is a file or a folder. Only the cwd is
        # guarded, so permission errors raised while downloading the
        # directory's content are not mistaken for "this is a file".
        try:
            self._ftp.cwd(remote_dirpath)
        except ftplib.error_perm:
            self._download_remote_file(remote_path, remote_dirname,
                                       local_tmp_path, overwrite)
            return

        local_dirpath = os.path.join(local_tmp_path, remote_dirname)
        if not os.path.exists(local_dirpath):
            os.mkdir(local_dirpath)

        output = []
        self._ftp.dir(output.append)
        for line in output:
            # The name is the 9th column; limiting the split keeps names
            # containing spaces intact.
            fields = line.split(None, 8)
            if len(fields) < 9:
                # Not a regular entry line (e.g. a summary line); skip it.
                continue
            attr, name = fields[0], fields[8]
            if name in ('.', '..'):
                continue
            if attr[0] == 'd':
                self._download_remote_files(remote_dirpath, name,
                                            local_dirpath, overwrite)
            else:
                # Files live in the directory just listed and belong in the
                # matching local directory.
                self._download_remote_file(remote_dirpath, name,
                                           local_dirpath, overwrite)

    def _pull_remote_dir(self, remote_path, dirname, local_tmp_path):
        ''' Download remote_path/dirname into local_tmp_path, overwriting. '''
        self._download_remote_files(
            remote_path, dirname, local_tmp_path, overwrite=True)


class GeneralMonitor(Monitor):
    ''' General Monitor. '''

    def __init__(self, host, interval=10):
        '''
        Args:
            host: prefix used to build download URLs as "<host>/<path>".
            interval: polling interval forwarded to Monitor.
        '''
        super(GeneralMonitor, self).__init__(interval)
        self._host = host

    def _get_local_file_timestamp(self, filename):
        ''' Return the modification time of the local file. '''
        return os.path.getmtime(filename)

    def _exist_remote_file(self, path, filename, local_tmp_path):
        '''
        Download the remote file with `wget -N` into `local_tmp_path` and use
        the downloaded copy's mtime as the timestamp.

        Returns:
            [True, mtime] when wget exits with 0, [False, None] otherwise.
        '''
        remote_filepath = os.path.join(path, filename)
        url = '{}/{}'.format(self._host, remote_filepath)
        # Use the POSIX redirection form: os.system runs /bin/sh, and under
        # a non-bash sh the bash-only `&>` backgrounds the command and
        # always yields status 0.
        cmd = 'wget -N -P {} {} >/dev/null 2>&1'.format(local_tmp_path, url)
        if os.system(cmd) != 0:
            return [False, None]
        else:
            timestamp = self._get_local_file_timestamp(
                os.path.join(local_tmp_path, filename))
            return [True, timestamp]

    def _pull_remote_dir(self, remote_path, dirname, local_tmp_path):
        '''
        Recursively download "<host>/<remote_path>/<dirname>" into
        `local_tmp_path` with `wget -nH -r`.

        Raises:
            Exception: wget returned a non-zero status.
        '''
        remote_dirpath = os.path.join(remote_path, dirname)
        url = '{}/{}'.format(self._host, remote_dirpath)
        # POSIX redirection for the same reason as in _exist_remote_file.
        cmd = 'wget -nH -r -P {} {} >/dev/null 2>&1'.format(local_tmp_path,
                                                           url)
        if os.system(cmd) != 0:
            raise Exception('pull remote dir failed. {}'.format(
                self._check_param_help('remote_model_name', dirname)))


def parse_args():
    ''' Build the monitor command-line parser and parse sys.argv. '''
    # (flag, add_argument keyword options), registered in this order.
    option_specs = [
        ("--type",
         dict(type=str, default='general', help="Type of remote server")),
        ("--remote_path", dict(type=str, required=True, help="Remote path")),
        ("--remote_model_name",
         dict(type=str, required=True, help="Remote model name")),
        ("--remote_donefile_name",
         dict(type=str, required=True, help="Remote donefile name")),
        ("--local_path", dict(type=str, required=True, help="Local path")),
        ("--local_model_name",
         dict(type=str, required=True, help="Local model name")),
        ("--local_timestamp_file",
         dict(type=str,
              default='fluid_time_file',
              help="Local timestamp file name(fluid_time_file in model file)")),
        ("--local_tmp_path",
         dict(type=str, default='_serving_monitor_tmp',
              help="Local tmp path")),
        ("--unpacked_filename",
         dict(type=str,
              default=None,
              help="If the model of the remote production is a packaged file, the unpacked file name should be set. Currently, only tar packaging format is supported."
              )),
        ("--interval", dict(type=int, default=10, help="Time interval")),
        # general monitor
        ("--general_host",
         dict(type=str, help="Host of general remote server")),
        # hdfs monitor
        ("--hdfs_bin", dict(type=str, help="Hdfs binary file path")),
        # ftp monitor
        ("--ftp_host", dict(type=str, help="Host of ftp")),
        ("--ftp_port", dict(type=int, help="Port of ftp")),
        ("--ftp_username", dict(type=str, default='', help="Username of ftp")),
        ("--ftp_password", dict(type=str, default='', help="Password of ftp")),
        # afs monitor
        ("--hadoop_bin", dict(type=str, help="Hadoop_bin_path for afs")),
        ("--hadoop_host",
         dict(type=str, default=None, help="Hadoop_host for afs")),
        ("--hadoop_ugi",
         dict(type=str, default=None, help="Hadoop_ugi for afs")),
    ]
    parser = argparse.ArgumentParser(description="Monitor")
    for flag, options in option_specs:
        parser.add_argument(flag, **options)
    return parser.parse_args()

def get_monitor(mtype, args=None):
    """ generator monitor instance.

    Args:
        mtype: type of monitor, one of 'ftp', 'hdfs', 'general', 'afs'.
        args: parsed command-line namespace holding the monitor options.
            When omitted, the module-level `args` (set when the file runs as
            a script) is used, which keeps one-argument callers working.

    Returns:
        monitor instance.

    Raises:
        Exception: `mtype` is not supported, or no args are available.
    """
    if mtype not in ('ftp', 'hdfs', 'general', 'afs'):
        raise Exception('unsupport type.')
    if args is None:
        # Backward compatibility: fall back to the namespace the script
        # entry point stores at module level.
        args = globals().get('args')
    if args is None:
        raise Exception('args not set. Pass the parsed arguments to '
                        'get_monitor.')
    if mtype == 'ftp':
        return FTPMonitor(
            args.ftp_host,
            args.ftp_port,
            username=args.ftp_username,
            password=args.ftp_password,
            interval=args.interval)
    elif mtype == 'hdfs':
        return HDFSMonitor(args.hdfs_bin, interval=args.interval)
    elif mtype == 'general':
        return GeneralMonitor(args.general_host, interval=args.interval)
    else:
        return AFSMonitor(
            args.hadoop_bin,
            args.hadoop_host,
            args.hadoop_ugi,
            interval=args.interval)


def start_monitor(monitor, args):
    """ Copy the parsed options onto the monitor and start polling.

    Args:
        monitor: object exposing the `set_<option>` setters and `run`.
        args: namespace providing one attribute per option listed below.
    """
    # Each option <name> is forwarded as monitor.set_<name>(args.<name>),
    # in this order.
    option_names = (
        'remote_path',
        'remote_model_name',
        'remote_donefile_name',
        'local_path',
        'local_model_name',
        'local_timestamp_file',
        'local_tmp_path',
        'unpacked_filename', )
    for option in option_names:
        getattr(monitor, 'set_' + option)(getattr(args, option))
    monitor.run()


if __name__ == "__main__":
    # Script entry point. `args` is bound at module level on purpose:
    # get_monitor is called with the type only and reads the parsed
    # options from this global.
    args = parse_args()
    monitor = get_monitor(args.type)
    # Applies the options to the monitor and runs its polling loop.
    start_monitor(monitor, args)