monitor.py 19.9 KB
Newer Older
B
barrierye 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
B
barrierye 已提交
14 15 16 17 18 19 20 21 22
"""
Usage:
    Start monitor with one line command
    Example:
        python -m paddle_serving_server.monitor
"""
import argparse
import datetime
import logging
import os
import shutil
import subprocess
import tarfile
import time

try:
    import commands  # Python 2 only; the module was removed in Python 3
except ImportError:
    commands = None

_LOGGER = logging.getLogger(__name__)
B
barrierye 已提交
30 31 32


class Monitor(object):
    '''
    Monitor base class. It is used to monitor the remote model, pull and update the local model.

    Subclasses provide `_exist_remote_file` and `_pull_remote_dir`; every
    other step of the polling loop in `run` is implemented here.
    '''

    def __init__(self, interval):
        '''
        Args:
            interval: number of seconds `run` sleeps between two polls.
        '''
        self._remote_path = None
        self._remote_model_name = None
        self._remote_donefile_name = None
        self._local_path = None
        self._local_model_name = None
        self._local_timestamp_file = None
        self._interval = interval
        # Timestamp of the donefile seen by the last successful poll.
        self._remote_donefile_timestamp = None
        self._local_tmp_path = None
        # Stays None when the remote model is not a packed (tar) file.
        self._unpacked_filename = None

    def set_remote_path(self, remote_path):
        ''' Set the remote base path that holds the model and the donefile. '''
        self._remote_path = remote_path

    def set_remote_model_name(self, model_name):
        ''' Set the name of the remote model (folder or packed file). '''
        self._remote_model_name = model_name

    def set_remote_donefile_name(self, donefile_name):
        ''' Set the name of the remote donefile that is polled. '''
        self._remote_donefile_name = donefile_name

    def set_local_path(self, local_path):
        ''' Set the local work path. '''
        self._local_path = local_path

    def set_local_model_name(self, model_name):
        ''' Set the local model folder name (relative to the local path). '''
        self._local_model_name = model_name

    def set_local_timestamp_file(self, timestamp_file):
        ''' Set the name of the timestamp file touched after each update. '''
        self._local_timestamp_file = timestamp_file

    def set_local_tmp_path(self, tmp_path):
        ''' Set the temporary folder name (relative to the local path). '''
        self._local_tmp_path = tmp_path

    def set_unpacked_filename(self, unpacked_filename):
        ''' Set the name the packed remote file unpacks to (None if not packed). '''
        self._unpacked_filename = unpacked_filename

    def _check_param_help(self, param_name, param_value):
        ''' Build the hint appended to error messages about a bad parameter. '''
        return "Please check the {}({}) parameter.".format(param_name,
                                                           param_value)

    def _check_params(self, params):
        '''
        Raise an Exception for the first attribute in `params` that is
        missing or still None.
        '''
        for param in params:
            if getattr(self, param, None) is None:
                raise Exception('{} not set.'.format(param))

    def _print_params(self, params_name):
        ''' Validate the given attributes, then log each one at INFO level. '''
        self._check_params(params_name)
        for name in params_name:
            _LOGGER.info('{}: {}'.format(name, getattr(self, name)))

    def _decompress_model_file(self, local_tmp_path, model_name,
                               unpacked_filename):
        '''
        Unpack the pulled tar file inside `local_tmp_path`.

        Args:
            local_tmp_path: folder that contains the pulled file.
            model_name: name of the pulled file or folder.
            unpacked_filename: name the archive unpacks to, or None when the
                pulled model is not packed.

        Returns:
            `model_name` when `unpacked_filename` is None, otherwise
            `unpacked_filename`.

        Raises:
            Exception: the file is not a tar file, extraction failed, or the
                expected unpacked file does not exist afterwards.
        '''
        if unpacked_filename is None:
            _LOGGER.debug('remote file({}) is already unpacked.'.format(
                model_name))
            return model_name
        tar_model_path = os.path.join(local_tmp_path, model_name)
        _LOGGER.info("try to unpack remote file({})".format(tar_model_path))
        if not tarfile.is_tarfile(tar_model_path):
            raise Exception('not a tar packaged file type. {}'.format(
                self._check_param_help('remote_model_name', model_name)))
        try:
            _LOGGER.info('unpack remote file({}).'.format(model_name))
            # NOTE(review): the archive comes from the remote side and is
            # extracted without filtering member paths; confirm the remote
            # is trusted.
            with tarfile.open(tar_model_path) as tar:
                tar.extractall(local_tmp_path)
        except Exception:
            raise Exception(
                'Decompressing failed, maybe no disk space left. {}'.format(
                    self._check_param_help('local_tmp_path', local_tmp_path)))
        finally:
            # The packed file is removed whether or not extraction worked.
            # There is deliberately no `return` here: returning from
            # `finally` would discard the exception raised above.
            os.remove(tar_model_path)
            _LOGGER.debug('remove packed file({}).'.format(tar_model_path))
        _LOGGER.info('using unpacked filename: {}.'.format(unpacked_filename))
        if not os.path.exists(os.path.join(local_tmp_path, unpacked_filename)):
            raise Exception('file not exist. {}'.format(
                self._check_param_help('unpacked_filename',
                                       unpacked_filename)))
        return unpacked_filename

    def run(self):
        '''
        Monitor the remote model by polling and update the local model.

        Loops forever; any exception raised by a step propagates to the
        caller and ends the loop.
        '''
        params = [
            '_remote_path', '_remote_model_name', '_remote_donefile_name',
            '_local_model_name', '_local_path', '_local_timestamp_file',
            '_local_tmp_path', '_interval'
        ]
        self._print_params(params)
        local_tmp_path = os.path.join(self._local_path, self._local_tmp_path)
        _LOGGER.info('local_tmp_path: {}'.format(local_tmp_path))
        if not os.path.exists(local_tmp_path):
            _LOGGER.info('mkdir: {}'.format(local_tmp_path))
            os.makedirs(local_tmp_path)
        while True:
            [flag, timestamp] = self._exist_remote_file(
                self._remote_path, self._remote_donefile_name, local_tmp_path)
            if not flag:
                _LOGGER.info('remote({}) has no donefile.'.format(
                    self._remote_path))
            elif self._remote_donefile_timestamp is None or \
                    timestamp != self._remote_donefile_timestamp:
                # A new or changed donefile marks a new remote model.
                _LOGGER.info('donefile({}) changed.'.format(
                    self._remote_donefile_name))
                self._remote_donefile_timestamp = timestamp
                self._pull_remote_dir(self._remote_path,
                                      self._remote_model_name, local_tmp_path)
                _LOGGER.info('pull remote model({}).'.format(
                    self._remote_model_name))
                unpacked_filename = self._decompress_model_file(
                    local_tmp_path, self._remote_model_name,
                    self._unpacked_filename)
                self._update_local_model(local_tmp_path, unpacked_filename,
                                         self._local_path,
                                         self._local_model_name)
                _LOGGER.info('update local model({}).'.format(
                    self._local_model_name))
                self._update_local_donefile(self._local_path,
                                            self._local_model_name,
                                            self._local_timestamp_file)
                _LOGGER.info('update model timestamp({}).'.format(
                    self._local_timestamp_file))
            _LOGGER.info('sleep {}s.'.format(self._interval))
            time.sleep(self._interval)

    def _exist_remote_file(self, path, filename, local_tmp_path):
        '''
        Check the remote file. Subclasses return `[True, timestamp]` when it
        exists and `[False, None]` otherwise.
        '''
        raise NotImplementedError('This function must be inherited.')

    def _pull_remote_dir(self, remote_path, dirname, local_tmp_path):
        ''' Pull the remote model into `local_tmp_path`; subclass hook. '''
        raise NotImplementedError('This function must be inherited.')

    def _update_local_model(self, local_tmp_path, remote_model_name, local_path,
                            local_model_name):
        '''
        Copy the content of the pulled model folder into the local model
        folder with `cp -r`.

        Raises:
            Exception: the copy command returned a non-zero status.
        '''
        tmp_model_path = os.path.join(local_tmp_path, remote_model_name)
        local_model_path = os.path.join(local_path, local_model_name)
        # NOTE: both paths are interpolated into a shell command unquoted.
        cmd = 'cp -r {}/* {}'.format(tmp_model_path, local_model_path)
        _LOGGER.debug('update model cmd: {}'.format(cmd))
        if os.system(cmd) != 0:
            raise Exception('update local model failed.')

    def _update_local_donefile(self, local_path, local_model_name,
                               local_timestamp_file):
        '''
        `touch` the timestamp file inside the local model folder.

        Raises:
            Exception: the touch command returned a non-zero status.
        '''
        donefile_path = os.path.join(local_path, local_model_name,
                                     local_timestamp_file)
        cmd = 'touch {}'.format(donefile_path)
        _LOGGER.debug('update timestamp cmd: {}'.format(cmd))
        if os.system(cmd) != 0:
            raise Exception('update local donefile failed.')
B
barrierye 已提交
190

B
barrierye 已提交
191

B
barrierye 已提交
192 193 194 195 196
class HadoopMonitor(Monitor):
    ''' Monitor HDFS or AFS by Hadoop-client. '''

    def __init__(self, hadoop_bin, fs_name='', fs_ugi='', interval=10):
        '''
        Args:
            hadoop_bin: path of the Hadoop client binary.
            fs_name: value for `-D fs.default.name=`; omitted when empty.
            fs_ugi: value for `-D hadoop.job.ugi=`; omitted when empty.
            interval: polling interval handed to the base class.
        '''
        super(HadoopMonitor, self).__init__(interval)
        self._hadoop_bin = hadoop_bin
        self._fs_name = fs_name
        self._fs_ugi = fs_ugi
        self._print_params(['_hadoop_bin', '_fs_name', '_fs_ugi'])
        # Every hadoop command issued by this class starts with this prefix.
        self._cmd_prefix = '{} fs '.format(self._hadoop_bin)
        if self._fs_name:
            self._cmd_prefix += '-D fs.default.name={} '.format(self._fs_name)
        if self._fs_ugi:
            self._cmd_prefix += '-D hadoop.job.ugi={} '.format(self._fs_ugi)
        _LOGGER.info('Hadoop prefix cmd: {}'.format(self._cmd_prefix))

    @staticmethod
    def _getstatusoutput(cmd):
        '''
        Run `cmd` through the shell and return `(status, output)`.

        Stand-in for `commands.getstatusoutput`, which only exists on
        Python 2; output has stderr merged in and the trailing newline
        stripped.
        '''
        proc = subprocess.Popen(
            cmd,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            universal_newlines=True)
        output = proc.communicate()[0]
        return proc.returncode, output.rstrip('\n')

    def _exist_remote_file(self, path, filename, local_tmp_path):
        '''
        Check the remote file with `hadoop fs -ls`.

        Returns:
            `[True, timestr]` where `timestr` is the date and time columns of
            the last listing line joined together, or `[False, None]` when
            the command fails.

        Raises:
            ValueError: the command succeeded but its last output line has
                fewer than the expected 8 columns.
        '''
        remote_filepath = os.path.join(path, filename)
        cmd = '{} -ls {} 2>/dev/null'.format(self._cmd_prefix, remote_filepath)
        _LOGGER.debug('check cmd: {}'.format(cmd))
        status, output = self._getstatusoutput(cmd)
        _LOGGER.debug('resp: {}'.format(output))
        if status != 0:
            return [False, None]
        # Index the columns instead of unpacking exactly eight of them, so a
        # path containing spaces does not break the parse.
        fields = output.split('\n')[-1].split()
        if len(fields) < 8:
            raise ValueError(
                'unexpected `-ls` output for {}: {!r}'.format(remote_filepath,
                                                              output))
        mdate, mtime = fields[5], fields[6]
        return [True, mdate + mtime]

    def _pull_remote_dir(self, remote_path, dirname, local_tmp_path):
        '''
        Fetch `remote_path/dirname` into `local_tmp_path` with
        `hadoop fs -get`.

        Raises:
            Exception: the get command returned a non-zero status.
        '''
        # remove old file before pull remote dir
        local_dirpath = os.path.join(local_tmp_path, dirname)
        if os.path.exists(local_dirpath):
            _LOGGER.info('remove old temporary model file({}).'.format(dirname))
            shutil.rmtree(local_dirpath)
        remote_dirpath = os.path.join(remote_path, dirname)
        cmd = '{} -get {} {} 2>/dev/null'.format(self._cmd_prefix,
                                                 remote_dirpath, local_dirpath)
        _LOGGER.debug('pull cmd: {}'.format(cmd))
        if os.system(cmd) != 0:
            raise Exception('pull remote dir failed. {}'.format(
                self._check_param_help('remote_model_name', dirname)))
B
barrierye 已提交
234 235


B
barrierye 已提交
236
class FTPMonitor(Monitor):
    ''' FTP Monitor. '''

    def __init__(self, host, port, username="", password="", interval=10):
        '''
        Validate the parameters, then connect and log in to the FTP server.

        Args:
            host: FTP host.
            port: FTP port.
            username: login name; empty by default.
            password: login password; empty by default.
            interval: polling interval handed to the base class.
        '''
        super(FTPMonitor, self).__init__(interval)
        import ftplib
        self._ftp = ftplib.FTP()
        self._ftp_host = host
        self._ftp_port = port
        self._ftp_username = username
        self._ftp_password = password
        # Validate before touching the network so a missing host/port is
        # reported as such; the password is checked but never logged.
        self._print_params(['_ftp_host', '_ftp_port', '_ftp_username'])
        self._check_params(['_ftp_password'])
        self._ftp.connect(self._ftp_host, self._ftp_port)
        self._ftp.login(self._ftp_username, self._ftp_password)

    def _exist_remote_file(self, path, filename, local_tmp_path):
        '''
        Query the remote file's modification time with the MDTM command.

        Returns:
            `[True, timestamp]` with the reply text after its first four
            characters, or `[False, None]` when the server answers with a
            permanent error.
        '''
        import ftplib
        try:
            _LOGGER.debug('cwd: {}'.format(path))
            self._ftp.cwd(path)
            reply = self._ftp.voidcmd('MDTM {}'.format(filename))
            timestamp = reply[4:].strip()
            return [True, timestamp]
        except ftplib.error_perm:
            _LOGGER.debug('remote file({}) not exist.'.format(filename))
            return [False, None]

    def _download_remote_file(self,
                              remote_path,
                              remote_filename,
                              local_tmp_path,
                              overwrite=True):
        '''
        Download one remote file into `local_tmp_path`.

        Nothing is downloaded when `overwrite` is False and the local file
        already exists.
        '''
        local_fullpath = os.path.join(local_tmp_path, remote_filename)
        if not overwrite and os.path.isfile(local_fullpath):
            return
        _LOGGER.debug('cwd: {}'.format(remote_path))
        # Change directory before opening the local file, so a failing cwd
        # does not truncate an existing local copy.
        self._ftp.cwd(remote_path)
        with open(local_fullpath, 'wb') as f:
            _LOGGER.debug('download remote file({})'.format(remote_filename))
            self._ftp.retrbinary('RETR {}'.format(remote_filename), f.write)

    def _download_remote_files(self,
                               remote_path,
                               remote_dirname,
                               local_tmp_path,
                               overwrite=True):
        '''
        Download `remote_path/remote_dirname` into `local_tmp_path`,
        recursing into sub-folders. If it cannot be entered with `cwd` it is
        downloaded as a single file.
        '''
        import ftplib
        remote_dirpath = os.path.join(remote_path, remote_dirname)
        # Check whether remote_dirpath is a file or a folder. Only the probe
        # is guarded, so permission errors raised while downloading the
        # content are not mistaken for "this is a file".
        try:
            _LOGGER.debug('cwd: {}'.format(remote_dirpath))
            self._ftp.cwd(remote_dirpath)
        except ftplib.error_perm:
            _LOGGER.debug('{} is file.'.format(remote_dirname))
            self._download_remote_file(remote_path, remote_dirname,
                                       local_tmp_path, overwrite)
            return
        _LOGGER.debug('{} is folder.'.format(remote_dirname))

        local_dirpath = os.path.join(local_tmp_path, remote_dirname)
        if not os.path.exists(local_dirpath):
            _LOGGER.info('mkdir: {}'.format(local_dirpath))
            os.mkdir(local_dirpath)

        output = []
        self._ftp.dir(output.append)
        for line in output:
            # Nine columns are expected; the last one is the name and may
            # itself contain spaces.
            fields = line.split(None, 8)
            if len(fields) < 9:
                continue
            attr, name = fields[0], fields[8]
            if name in ('.', '..'):
                continue
            if attr[0] == 'd':
                self._download_remote_files(remote_dirpath, name,
                                            local_dirpath, overwrite)
            else:
                # Files of this folder live in remote_dirpath and belong in
                # the matching local sub-folder.
                self._download_remote_file(remote_dirpath, name,
                                           local_dirpath, overwrite)

    def _pull_remote_dir(self, remote_path, dirname, local_tmp_path):
        ''' Download the remote model, overwriting existing local files. '''
        self._download_remote_files(
            remote_path, dirname, local_tmp_path, overwrite=True)


class GeneralMonitor(Monitor):
    ''' General Monitor. '''

    def __init__(self, host, interval=10):
        '''
        Args:
            host: prefix of every URL handed to wget.
            interval: polling interval handed to the base class.
        '''
        super(GeneralMonitor, self).__init__(interval)
        self._general_host = host
        self._print_params(['_general_host'])

    def _get_local_file_timestamp(self, filename):
        ''' Return the modification time of a local file. '''
        return os.path.getmtime(filename)

    def _exist_remote_file(self, remote_path, filename, local_tmp_path):
        '''
        Download the donefile with wget and use the local copy's mtime as
        its timestamp.

        Returns:
            `[True, timestamp]` when wget succeeds, `[False, None]` otherwise.
        '''
        remote_filepath = os.path.join(remote_path, filename)
        url = '{}/{}'.format(self._general_host, remote_filepath)
        _LOGGER.debug('remote file url: {}'.format(url))
        # only for check donefile, which is not a folder.
        # `>/dev/null 2>&1` is the portable spelling: os.system runs the
        # command with sh, where `&>` may background the command instead of
        # redirecting its output.
        cmd = 'wget -nd -N -P {} {} >/dev/null 2>&1'.format(local_tmp_path,
                                                            url)
        _LOGGER.debug('wget cmd: {}'.format(cmd))
        if os.system(cmd) != 0:
            _LOGGER.debug('remote file({}) not exist.'.format(remote_filepath))
            return [False, None]
        timestamp = self._get_local_file_timestamp(
            os.path.join(local_tmp_path, filename))
        return [True, timestamp]

    def _pull_remote_dir(self, remote_path, dirname, local_tmp_path):
        '''
        Download the remote model with wget: recursively when it is a
        folder, as a single file when it is a package.

        Raises:
            Exception: the wget command returned a non-zero status.
        '''
        remote_dirpath = os.path.join(remote_path, dirname)
        url = '{}/{}'.format(self._general_host, remote_dirpath)
        _LOGGER.debug('remote file url: {}'.format(url))
        if self._unpacked_filename is None:
            # the remote file is model folder
            cmd = 'wget -nH -r -P {} {} >/dev/null 2>&1'.format(
                os.path.join(local_tmp_path, dirname), url)
        else:
            # the remote file is a package file
            cmd = 'wget -nd -N -P {} {} >/dev/null 2>&1'.format(
                local_tmp_path, url)
        _LOGGER.debug('wget cmd: {}'.format(cmd))
        # The command is run exactly once.
        if os.system(cmd) != 0:
            raise Exception('pull remote dir failed. {}'.format(
                self._check_param_help('remote_model_name', dirname)))
B
barrierye 已提交
363

B
barrierye 已提交
364

B
barrierye 已提交
365
def parse_args(argv=None):
    """ parse args.

    Args:
        argv: list of argument strings to parse. When None (the default),
            argparse reads the process command line.

    Returns:
        parser.parse_args().
    """
    parser = argparse.ArgumentParser(description="Monitor")
    parser.add_argument(
        "--type", type=str, default='general', help="Type of remote server")
    parser.add_argument(
        "--remote_path",
        type=str,
        required=True,
        help="The base path for the remote")
    parser.add_argument(
        "--remote_model_name",
        type=str,
        required=True,
        help="The model name to be pulled from the remote")
    parser.add_argument(
        "--remote_donefile_name",
        type=str,
        required=True,
        help="The donefile name that marks the completion of the remote model update"
    )
    parser.add_argument(
        "--local_path", type=str, required=True, help="Local work path")
    parser.add_argument(
        "--local_model_name", type=str, required=True, help="Local model name")
    parser.add_argument(
        "--local_timestamp_file",
        type=str,
        default='fluid_time_file',
        help="The timestamp file used locally for hot loading, The file is considered to be placed in the `local_path/local_model_name` folder."
    )
    parser.add_argument(
        "--local_tmp_path",
        type=str,
        default='_serving_monitor_tmp',
        help="The path of the folder where temporary files are stored locally. If it does not exist, it will be created automatically"
    )
    parser.add_argument(
        "--unpacked_filename",
        type=str,
        default=None,
        help="If the model of the remote production is a packaged file, the unpacked file name should be set. Currently, only tar packaging format is supported."
    )
    parser.add_argument(
        "--interval",
        type=int,
        default=10,
        help="The polling interval in seconds")
    parser.add_argument(
        "--debug", action='store_true', help="If set, output more details")
    parser.set_defaults(debug=False)
    # general monitor
    parser.add_argument("--general_host", type=str, help="General remote host")
    # ftp monitor
    parser.add_argument("--ftp_host", type=str, help="FTP remote host")
    parser.add_argument("--ftp_port", type=int, help="FTP remote port")
    parser.add_argument(
        "--ftp_username",
        type=str,
        default='',
        help="FTP username. Not used if anonymous access.")
    parser.add_argument(
        "--ftp_password",
        type=str,
        default='',
        help="FTP password. Not used if anonymous access")
    # afs/hdfs monitor
    parser.add_argument(
        "--hadoop_bin", type=str, help="Path of Hadoop binary file")
    parser.add_argument(
        "--fs_name",
        type=str,
        default='',
        help="AFS/HDFS fs_name. Not used if set in Hadoop-client.")
    parser.add_argument(
        "--fs_ugi",
        type=str,
        default='',
        help="AFS/HDFS fs_ugi, Not used if set in Hadoop-client")
    return parser.parse_args(argv)

B
barrierye 已提交
450

B
barrierye 已提交
451
def get_monitor(mtype, args=None):
    """ create monitor instance.

    Args:
        mtype: type of monitor ('ftp', 'general', 'afs' or 'hdfs').
        args: parsed command line arguments. When None (the default), the
            module-level `args` set by the script entry point is used.

    Returns:
        monitor instance.

    Raises:
        Exception: `mtype` is not supported, or no arguments are available.
    """
    if mtype not in ('ftp', 'general', 'afs', 'hdfs'):
        raise Exception('unsupport type.')
    if args is None:
        # Keeps `get_monitor(mtype)` working when the module runs as a
        # script, while failing clearly when it is imported and called
        # without arguments.
        args = globals().get('args')
        if args is None:
            raise Exception('args not set.')
    if mtype == 'ftp':
        return FTPMonitor(
            args.ftp_host,
            args.ftp_port,
            username=args.ftp_username,
            password=args.ftp_password,
            interval=args.interval)
    if mtype == 'general':
        return GeneralMonitor(args.general_host, interval=args.interval)
    # Remaining supported types: 'afs' and 'hdfs'.
    return HadoopMonitor(
        args.hadoop_bin, args.fs_name, args.fs_ugi, interval=args.interval)


def start_monitor(monitor, args):
    """ configure the monitor from the parsed arguments and run it.

    Args:
        monitor: monitor instance to configure.
        args: parsed command line arguments.
    """
    monitor.set_remote_path(args.remote_path)
    monitor.set_remote_model_name(args.remote_model_name)
    monitor.set_remote_donefile_name(args.remote_donefile_name)
    monitor.set_local_path(args.local_path)
    monitor.set_local_model_name(args.local_model_name)
    monitor.set_local_timestamp_file(args.local_timestamp_file)
    monitor.set_local_tmp_path(args.local_tmp_path)
    monitor.set_unpacked_filename(args.unpacked_filename)
    monitor.run()
B
barrierye 已提交
486

B
barrierye 已提交
487

B
barrierye 已提交
488 489
if __name__ == "__main__":
    # `args` stays a module-level name: get_monitor() reads it when it is
    # called without explicit arguments.
    args = parse_args()
    # --debug only changes the log level; format and date format are shared.
    logging.basicConfig(
        format='%(asctime)s %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s',
        datefmt='%Y-%m-%d %H:%M',
        level=logging.DEBUG if args.debug else logging.INFO)
    monitor = get_monitor(args.type)
    start_monitor(monitor, args)