monitor.py 19.9 KB
Newer Older
B
barrierye 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
B
barrierye 已提交
14 15 16 17 18 19 20 21 22
"""
Usage:
    Start monitor with one line command
    Example:
        python -m paddle_serving_server.monitor
"""
import os
import time
import argparse
B
barrierye 已提交
23 24
import commands
import datetime
B
barrierye 已提交
25
import shutil
B
barrierye 已提交
26
import tarfile
B
barrierye 已提交
27 28 29
import logging

_LOGGER = logging.getLogger(__name__)
B
barrierye 已提交
30 31 32


class Monitor(object):
B
barrierye 已提交
33 34 35 36
    '''
    Monitor base class. It is used to monitor the remote model, pull and update the local model.
    '''

B
barrierye 已提交
37 38 39 40 41 42
    def __init__(self, interval):
        self._remote_path = None
        self._remote_model_name = None
        self._remote_donefile_name = None
        self._local_path = None
        self._local_model_name = None
B
barrierye 已提交
43
        self._local_timestamp_file = None
B
barrierye 已提交
44 45
        self._interval = interval
        self._remote_donefile_timestamp = None
B
barrierye 已提交
46
        self._local_tmp_path = None
B
barrierye 已提交
47
        self._unpacked_filename = None
B
barrierye 已提交
48

B
barrierye 已提交
49 50
    def set_remote_path(self, remote_path):
        self._remote_path = remote_path
B
barrierye 已提交
51

B
barrierye 已提交
52 53
    def set_remote_model_name(self, model_name):
        self._remote_model_name = model_name
B
barrierye 已提交
54

B
barrierye 已提交
55 56
    def set_remote_donefile_name(self, donefile_name):
        self._remote_donefile_name = donefile_name
B
barrierye 已提交
57

B
barrierye 已提交
58 59
    def set_local_path(self, local_path):
        self._local_path = local_path
B
barrierye 已提交
60

B
barrierye 已提交
61 62
    def set_local_model_name(self, model_name):
        self._local_model_name = model_name
B
barrierye 已提交
63

B
barrierye 已提交
64 65
    def set_local_timestamp_file(self, timestamp_file):
        self._local_timestamp_file = timestamp_file
B
barrierye 已提交
66

B
barrierye 已提交
67 68
    def set_local_tmp_path(self, tmp_path):
        self._local_tmp_path = tmp_path
B
barrierye 已提交
69

B
barrierye 已提交
70 71 72
    def set_unpacked_filename(self, unpacked_filename):
        self._unpacked_filename = unpacked_filename

B
barrierye 已提交
73 74 75 76
    def _check_param_help(self, param_name, param_value):
        return "Please check the {}({}) parameter.".format(param_name,
                                                           param_value)

B
barrierye 已提交
77 78 79 80 81 82 83 84 85
    def _check_params(self, params):
        for param in params:
            if getattr(self, param, None) is None:
                raise Exception('{} not set.'.format(param))

    def _print_params(self, params_name):
        self._check_params(params_name)
        for name in params_name:
            _LOGGER.info('{}: {}'.format(name, getattr(self, name)))
B
barrierye 已提交
86

B
barrierye 已提交
87 88
    def _decompress_model_file(self, local_tmp_path, model_name,
                               unpacked_filename):
B
barrierye 已提交
89
        if unpacked_filename is None:
B
barrierye 已提交
90 91
            _LOGGER.debug('remote file({}) is already unpacked.'.format(
                model_name))
B
barrierye 已提交
92 93 94
            return model_name
        tar_model_path = os.path.join(local_tmp_path, model_name)
        if not tarfile.is_tarfile(tar_model_path):
B
barrierye 已提交
95 96
            raise Exception('not a tar packaged file type. {}'.format(
                self._check_param_help('remote_model_name', model_name)))
B
barrierye 已提交
97
        try:
B
barrierye 已提交
98
            _LOGGER.info('unpack remote file({}).'.format(model_name))
B
barrierye 已提交
99
            tar = tarfile.open(tar_model_path)
B
barrierye 已提交
100
            tar.extractall(local_tmp_path)
B
barrierye 已提交
101 102 103
            tar.close()
        except:
            raise Exception(
B
barrierye 已提交
104 105
                'Decompressing failed, maybe no disk space left. {}'.foemat(
                    self._check_param_help('local_tmp_path', local_tmp_path)))
B
barrierye 已提交
106 107
        finally:
            os.remove(tar_model_path)
B
barrierye 已提交
108 109 110
            _LOGGER.debug('remove packed file({}).'.format(model_name))
            _LOGGER.info('using unpacked filename: {}.'.format(
                unpacked_filename))
B
barrierye 已提交
111
            if not os.path.exists(unpacked_filename):
B
barrierye 已提交
112 113 114
                raise Exception('file not exist. {}'.format(
                    self._check_param_help('unpacked_filename',
                                           unpacked_filename)))
B
barrierye 已提交
115 116
            return unpacked_filename

B
barrierye 已提交
117
    def run(self):
B
barrierye 已提交
118
        '''
B
barrierye 已提交
119
        Monitor the remote model by polling and update the local model.
B
barrierye 已提交
120
        '''
B
barrierye 已提交
121 122 123 124 125 126
        params = [
            '_remote_path', '_remote_model_name', '_remote_donefile_name',
            '_local_model_name', '_local_path', '_local_timestamp_file',
            '_local_tmp_path', '_interval'
        ]
        self._print_params(params)
B
barrierye 已提交
127
        if not os.path.exists(self._local_tmp_path):
B
barrierye 已提交
128
            _LOGGER.info('mkdir: {}'.format(self._local_tmp_path))
B
barrierye 已提交
129
            os.makedirs(self._local_tmp_path)
B
barrierye 已提交
130
        while True:
B
barrierye 已提交
131
            [flag, timestamp] = self._exist_remote_file(
B
barrierye 已提交
132 133
                self._remote_path, self._remote_donefile_name,
                self._local_tmp_path)
B
barrierye 已提交
134 135 136
            if flag:
                if self._remote_donefile_timestamp is None or \
                        timestamp != self._remote_donefile_timestamp:
B
barrierye 已提交
137 138
                    _LOGGER.info('doneilfe({}) changed.'.format(
                        self._remote_donefile_name))
B
barrierye 已提交
139
                    self._remote_donefile_timestamp = timestamp
B
barrierye 已提交
140 141 142
                    self._pull_remote_dir(self._remote_path,
                                          self._remote_model_name,
                                          self._local_tmp_path)
B
barrierye 已提交
143 144
                    _LOGGER.info('pull remote model({}).'.format(
                        self._remote_model_name))
B
barrierye 已提交
145
                    unpacked_filename = self._decompress_model_file(
B
barrierye 已提交
146
                        self._local_tmp_path, self._remote_model_name,
B
barrierye 已提交
147 148 149
                        self._unpacked_filename)
                    self._update_local_model(
                        self._local_tmp_path, unpacked_filename,
B
barrierye 已提交
150
                        self._local_path, self._local_model_name)
B
barrierye 已提交
151 152
                    _LOGGER.info('update local model({}).'.format(
                        self._local_model_name))
B
barrierye 已提交
153 154
                    self._update_local_donefile(self._local_path,
                                                self._local_model_name,
B
barrierye 已提交
155
                                                self._local_timestamp_file)
B
barrierye 已提交
156 157
                    _LOGGER.info('update model timestamp({}).'.format(
                        self._local_timestamp_file))
B
barrierye 已提交
158
            else:
B
barrierye 已提交
159 160 161
                _LOGGER.info('remote({}) has no donefile.'.format(
                    self._remote_path))
            _LOGGER.info('sleep {}s.'.format(self._interval))
B
barrierye 已提交
162
            time.sleep(self._interval)
B
barrierye 已提交
163

B
barrierye 已提交
164
    def _exist_remote_file(self, path, filename, local_tmp_path):
B
barrierye 已提交
165
        raise Exception('This function must be inherited.')
B
barrierye 已提交
166

B
barrierye 已提交
167
    def _pull_remote_dir(self, remote_path, dirname, local_tmp_path):
B
barrierye 已提交
168
        raise Exception('This function must be inherited.')
B
barrierye 已提交
169

B
barrierye 已提交
170 171 172 173 174
    def _update_local_model(self, local_tmp_path, remote_model_name, local_path,
                            local_model_name):
        tmp_model_path = os.path.join(local_tmp_path, remote_model_name)
        local_model_path = os.path.join(local_path, local_model_name)
        cmd = 'cp -r {}/* {}'.format(tmp_model_path, local_model_path)
B
barrierye 已提交
175
        _LOGGER.debug('update model cmd: {}'.format(cmd))
B
barrierye 已提交
176 177 178 179
        if os.system(cmd) != 0:
            raise Exception('update local model failed.')

    def _update_local_donefile(self, local_path, local_model_name,
B
barrierye 已提交
180
                               local_timestamp_file):
B
barrierye 已提交
181
        donefile_path = os.path.join(local_path, local_model_name,
B
barrierye 已提交
182
                                     local_timestamp_file)
B
barrierye 已提交
183
        cmd = 'touch {}'.format(donefile_path)
B
barrierye 已提交
184
        _LOGGER.debug('update timestamp cmd: {}'.format(cmd))
B
barrierye 已提交
185 186
        if os.system(cmd) != 0:
            raise Exception('update local donefile failed.')
B
barrierye 已提交
187

B
barrierye 已提交
188

B
barrierye 已提交
189 190 191 192
class AFSMonitor(Monitor):
    ''' AFS Monitor(by hadoop-client). '''

    def __init__(self,
B
barrierye 已提交
193
                 hadoop_bin,
B
barrierye 已提交
194 195 196 197
                 hadoop_host=None,
                 hadoop_ugi=None,
                 interval=10):
        super(AFSMonitor, self).__init__(interval)
B
barrierye 已提交
198
        self._hadoop_bin = hadoop_bin
B
barrierye 已提交
199 200
        self._hadoop_host = hadoop_host
        self._hadoop_ugi = hadoop_ugi
B
barrierye 已提交
201
        self._print_params(['_hadoop_bin', '_hadoop_host', '_hadoop_ugi'])
B
barrierye 已提交
202
        self._cmd_prefix = '{} fs '.format(self._hadoop_bin)
B
barrierye 已提交
203
        if self._hadoop_host and self._hadoop_ugi:
B
barrierye 已提交
204 205
            self._cmd_prefix += '-D fs.default.name={} -D hadoop.job.ugi={} '.format(
                self._hadoop_host, self._hadoop_ugi)
B
barrierye 已提交
206
        _LOGGER.info('AFS prefix cmd: {}'.format(self._cmd_prefix))
B
barrierye 已提交
207 208 209

    def _exist_remote_file(self, path, filename, local_tmp_path):
        remote_filepath = os.path.join(path, filename)
B
barrierye 已提交
210 211
        cmd = '{} -ls {} 2>/dev/null'.format(self._cmd_prefix, remote_filepath)
        _LOGGER.debug('check cmd: {}'.format(cmd))
B
barrierye 已提交
212
        [status, output] = commands.getstatusoutput(cmd)
B
barrierye 已提交
213
        _LOGGER.debug('resp: {}'.format(output))
B
barrierye 已提交
214
        if status == 0:
B
barrierye 已提交
215
            [_, _, _, _, _, mdate, mtime, _] = output.split('\n')[-1].split()
B
barrierye 已提交
216 217 218 219 220 221 222 223 224
            timestr = mdate + mtime
            return [True, timestr]
        else:
            return [False, None]

    def _pull_remote_dir(self, remote_path, dirname, local_tmp_path):
        # remove old file before pull remote dir
        local_dirpath = os.path.join(local_tmp_path, dirname)
        if os.path.exists(local_dirpath):
B
barrierye 已提交
225
            _LOGGER.info('remove old temporary model file({}).'.format(dirname))
B
barrierye 已提交
226 227
            shutil.rmtree(local_dirpath)
        remote_dirpath = os.path.join(remote_path, dirname)
B
barrierye 已提交
228 229 230
        cmd = '{} -get {} {} 2>/dev/null'.format(self._cmd_prefix,
                                                 remote_dirpath, local_dirpath)
        _LOGGER.debug('pull cmd: {}'.format(cmd))
B
barrierye 已提交
231
        if os.system(cmd) != 0:
B
barrierye 已提交
232 233
            raise Exception('pull remote dir failed. {}'.format(
                self._check_param_help('remote_model_name', dirname)))
B
barrierye 已提交
234 235


B
barrierye 已提交
236 237 238
class HDFSMonitor(Monitor):
    ''' HDFS Monitor. '''

B
barrierye 已提交
239
    def __init__(self, hdfs_bin, interval=10):
B
barrierye 已提交
240
        super(HDFSMonitor, self).__init__(interval)
B
barrierye 已提交
241 242
        self._hdfs_bin = hdfs_bin
        self._print_params(['_hdfs_bin'])
B
barrierye 已提交
243
        self._prefix_cmd = '{} dfs '.format(self._hdfs_bin_path)
B
barrierye 已提交
244
        _LOGGER.info('HDFS prefix cmd: {}'.format(self._cmd_prefix))
B
barrierye 已提交
245

B
barrierye 已提交
246
    def _exist_remote_file(self, path, filename, local_tmp_path):
B
barrierye 已提交
247
        remote_filepath = os.path.join(path, filename)
B
barrierye 已提交
248
        cmd = '{} -stat "%Y" {}'.format(self._prefix_cmd, remote_filepath)
B
barrierye 已提交
249
        _LOGGER.debug('check cmd: {}'.format(cmd))
B
barrierye 已提交
250
        [status, timestamp] = commands.getstatusoutput(cmd)
B
barrierye 已提交
251
        _LOGGER.debug('resp: {}'.format(output))
B
barrierye 已提交
252 253 254 255 256 257 258
        if status == 0:
            return [True, timestamp]
        else:
            return [False, None]

    def _pull_remote_dir(self, remote_path, dirname, local_tmp_path):
        remote_dirpath = os.path.join(remote_path, dirname)
B
barrierye 已提交
259 260
        cmd = '{} -get -f {} {}'.format(self._prefix_cmd, remote_dirpath,
                                        local_tmp_path)
B
barrierye 已提交
261
        _LOGGER.debug('pull cmd: {}'.format(cmd))
B
barrierye 已提交
262
        if os.system(cmd) != 0:
B
barrierye 已提交
263 264
            raise Exception('pull remote dir failed. {}'.format(
                self._check_param_help('remote_model_name', dirname)))
B
barrierye 已提交
265

B
barrierye 已提交
266 267

class FTPMonitor(Monitor):
B
barrierye 已提交
268 269
    ''' FTP Monitor. '''

B
barrierye 已提交
270
    def __init__(self, host, port, username="", password="", interval=10):
B
barrierye 已提交
271
        super(FTPMonitor, self).__init__(interval)
B
barrierye 已提交
272
        import ftplib
B
barrierye 已提交
273
        self._ftp = ftplib.FTP()
B
barrierye 已提交
274 275 276 277 278 279 280 281
        self._ftp_host = host
        self._ftp_port = port
        self._ftp_username = username
        self._ftp_password = password
        self._ftp.connect(self._ftp_host, self._ftp_port)
        self._ftp.login(self._ftp_username, self._ftp_password)
        self._print_params(
            ['_ftp_host', '_ftp_port', '_ftp_username', '_ftp_password'])
B
barrierye 已提交
282

B
barrierye 已提交
283
    def _exist_remote_file(self, path, filename, local_tmp_path):
B
barrierye 已提交
284
        import ftplib
B
barrierye 已提交
285
        try:
B
barrierye 已提交
286
            _LOGGER.debug('cwd: {}'.format(path))
B
barrierye 已提交
287 288
            self._ftp.cwd(path)
            timestamp = self._ftp.voidcmd('MDTM {}'.format(filename))[4:].strip(
B
barrierye 已提交
289
            )
B
barrierye 已提交
290 291
            return [True, timestamp]
        except ftplib.error_perm:
B
barrierye 已提交
292
            _LOGGER.debug('remote file({}) not exist.'.format(filename))
B
barrierye 已提交
293
            return [False, None]
B
barrierye 已提交
294

B
barrierye 已提交
295 296 297 298 299 300 301 302 303 304
    def _download_remote_file(self,
                              remote_path,
                              remote_filename,
                              local_tmp_path,
                              overwrite=True):
        local_fullpath = os.path.join(local_tmp_path, remote_filename)
        if not overwrite and os.path.isfile(fullpath):
            return
        else:
            with open(local_fullpath, 'wb') as f:
B
barrierye 已提交
305
                _LOGGER.debug('cwd: {}'.format(path))
B
barrierye 已提交
306
                self._ftp.cwd(remote_path)
B
barrierye 已提交
307
                _LOGGER.debug('download remote file({})'.format(remote_path))
B
barrierye 已提交
308 309
                self._ftp.retrbinary('RETR {}'.format(remote_filename), f.write)

B
barrierye 已提交
310 311
    def _download_remote_files(self,
                               remote_path,
B
barrierye 已提交
312 313 314
                               remote_dirname,
                               local_tmp_path,
                               overwrite=True):
B
barrierye 已提交
315
        import ftplib
B
barrierye 已提交
316
        remote_dirpath = os.path.join(remote_path, remote_dirname)
B
barrierye 已提交
317 318
        # Check whether remote_dirpath is a file or a folder
        try:
B
barrierye 已提交
319
            _LOGGER.debug('cwd: {}'.format(remote_dirpath))
B
barrierye 已提交
320
            self._ftp.cwd(remote_dirpath)
B
barrierye 已提交
321
            _LOGGER.debug('{} is folder.'.format(remote_dirname))
B
barrierye 已提交
322 323 324

            local_dirpath = os.path.join(local_tmp_path, remote_dirname)
            if not os.path.exists(local_dirpath):
B
barrierye 已提交
325
                _LOGGER.info('mkdir: {}'.format(local_dirpath))
B
barrierye 已提交
326 327 328 329 330 331 332 333 334 335
                os.mkdir(local_dirpath)

            output = []
            self._ftp.dir(output.append)
            for line in output:
                [attr, _, _, _, _, _, _, _, name] = line.split()
                if attr[0] == 'd':
                    self._download_remote_files(
                        os.path.join(remote_path, remote_dirname), name,
                        os.path.join(local_tmp_path, remote_dirname), overwrite)
B
barrierye 已提交
336
                else:
B
barrierye 已提交
337 338 339
                    self._download_remote_file(remote_dirname, name,
                                               local_tmp_path, overwrite)
        except ftplib.error_perm:
B
barrierye 已提交
340
            _LOGGER.debug('{} is file.'.format(remote_dirname))
B
barrierye 已提交
341 342 343
            self._download_remote_file(remote_path, remote_dirname,
                                       local_tmp_path, overwrite)
            return
B
barrierye 已提交
344 345

    def _pull_remote_dir(self, remote_path, dirname, local_tmp_path):
B
barrierye 已提交
346
        self._download_remote_files(
B
barrierye 已提交
347 348 349 350 351 352 353 354
            remote_path, dirname, local_tmp_path, overwrite=True)


class GeneralMonitor(Monitor):
    ''' General Monitor. '''

    def __init__(self, host, interval=10):
        super(GeneralMonitor, self).__init__(interval)
B
barrierye 已提交
355 356
        self._general_host = host
        self._print_params(['_general_host'])
B
barrierye 已提交
357 358 359 360 361 362

    def _get_local_file_timestamp(self, filename):
        return os.path.getmtime(filename)

    def _exist_remote_file(self, path, filename, local_tmp_path):
        remote_filepath = os.path.join(path, filename)
B
barrierye 已提交
363 364
        url = '{}/{}'.format(self._general_host, remote_filepath)
        _LOGGER.debug('remote file url: {}'.format(url))
B
barrierye 已提交
365
        cmd = 'wget -N -P {} {} &>/dev/null'.format(local_tmp_path, url)
B
barrierye 已提交
366
        _LOGGER.debug('wget cmd: {}'.format(cmd))
B
barrierye 已提交
367
        if os.system(cmd) != 0:
B
barrierye 已提交
368
            _LOGGER.debug('remote file({}) not exist.'.format(filename))
B
barrierye 已提交
369 370
            return [False, None]
        else:
B
barrierye 已提交
371
            _LOGGER.debug('download remote file({}).'.format(filename))
B
barrierye 已提交
372 373 374 375
            timestamp = self._get_local_file_timestamp(
                os.path.join(local_tmp_path, filename))
            return [True, timestamp]

B
barrierye 已提交
376
    def _pull_remote_dir(self, remote_path, dirname, local_tmp_path):
B
barrierye 已提交
377
        remote_dirpath = os.path.join(remote_path, dirname)
B
barrierye 已提交
378 379
        url = '{}/{}'.format(self._general_host, remote_dirpath)
        _LOGGER.debug('remote file url: {}'.format(url))
B
barrierye 已提交
380
        cmd = 'wget -nH -r -P {} {} &>/dev/null'.format(local_tmp_path, url)
B
barrierye 已提交
381
        _LOGGER.debug('wget cmd: {}'.format(cmd))
B
barrierye 已提交
382
        if os.system(cmd) != 0:
B
barrierye 已提交
383 384
            raise Exception('pull remote dir failed. {}'.format(
                self._check_param_help('remote_model_name', dirname)))
B
barrierye 已提交
385

B
barrierye 已提交
386

B
barrierye 已提交
387
def parse_args():
B
barrierye 已提交
388
    ''' parse args. '''
B
barrierye 已提交
389 390
    parser = argparse.ArgumentParser(description="Monitor")
    parser.add_argument(
B
barrierye 已提交
391
        "--type", type=str, default='general', help="Type of remote server")
B
barrierye 已提交
392 393 394
    parser.add_argument(
        "--remote_path", type=str, required=True, help="Remote path")
    parser.add_argument(
B
barrierye 已提交
395 396 397
        "--remote_model_name",
        type=str,
        required=True,
B
barrierye 已提交
398 399
        help="Remote model name")
    parser.add_argument(
B
barrierye 已提交
400 401 402
        "--remote_donefile_name",
        type=str,
        required=True,
B
barrierye 已提交
403 404 405 406 407 408
        help="Remote donefile name")
    parser.add_argument(
        "--local_path", type=str, required=True, help="Local path")
    parser.add_argument(
        "--local_model_name", type=str, required=True, help="Local model name")
    parser.add_argument(
B
barrierye 已提交
409
        "--local_timestamp_file",
B
barrierye 已提交
410
        type=str,
B
barrierye 已提交
411 412
        default='fluid_time_file',
        help="Local timestamp file name(fluid_time_file in model file)")
B
barrierye 已提交
413
    parser.add_argument(
B
barrierye 已提交
414 415 416 417
        "--local_tmp_path",
        type=str,
        default='_serving_monitor_tmp',
        help="Local tmp path")
B
barrierye 已提交
418 419 420 421 422 423
    parser.add_argument(
        "--unpacked_filename",
        type=str,
        default=None,
        help="If the model of the remote production is a packaged file, the unpacked file name should be set. Currently, only tar packaging format is supported."
    )
B
barrierye 已提交
424 425
    parser.add_argument(
        "--interval", type=int, default=10, help="Time interval")
B
barrierye 已提交
426
    # general monitor
B
barrierye 已提交
427
    parser.add_argument(
B
barrierye 已提交
428
        "--general_host", type=str, help="Host of general remote server")
B
barrierye 已提交
429
    # hdfs monitor
B
barrierye 已提交
430
    parser.add_argument("--hdfs_bin", type=str, help="Hdfs binary file path")
B
barrierye 已提交
431
    # ftp monitor
B
barrierye 已提交
432 433 434 435 436 437
    parser.add_argument("--ftp_host", type=str, help="Host of ftp")
    parser.add_argument("--ftp_port", type=int, help="Port of ftp")
    parser.add_argument(
        "--ftp_username", type=str, default='', help="Username of ftp")
    parser.add_argument(
        "--ftp_password", type=str, default='', help="Password of ftp")
B
barrierye 已提交
438 439
    # afs monitor
    parser.add_argument(
B
barrierye 已提交
440
        "--hadoop_bin", type=str, help="Hadoop_bin_path for afs")
B
barrierye 已提交
441 442 443 444
    parser.add_argument(
        "--hadoop_host", type=str, default=None, help="Hadoop_host for afs")
    parser.add_argument(
        "--hadoop_ugi", type=str, default=None, help="Hadoop_ugi for afs")
B
barrierye 已提交
445 446
    return parser.parse_args()

B
barrierye 已提交
447

B
barrierye 已提交
448
def get_monitor(mtype):
B
barrierye 已提交
449 450 451 452 453 454 455 456
    """ generator monitor instance.

    Args:
        mtype: type of monitor

    Returns:
        monitor instance.
    """
B
barrierye 已提交
457
    if mtype == 'ftp':
B
barrierye 已提交
458 459 460 461 462 463
        return FTPMonitor(
            args.ftp_host,
            args.ftp_port,
            username=args.ftp_username,
            password=args.ftp_password,
            interval=args.interval)
B
barrierye 已提交
464 465
    elif mtype == 'hdfs':
        return HDFSMonitor(args.hdfs_bin, interval=args.interval)
B
barrierye 已提交
466 467
    elif mtype == 'general':
        return GeneralMonitor(args.general_host, interval=args.interval)
B
barrierye 已提交
468 469
    elif mtype == 'afs':
        return AFSMonitor(
B
barrierye 已提交
470
            args.hadoop_bin,
B
barrierye 已提交
471 472 473
            args.hadoop_host,
            args.hadoop_ugi,
            interval=args.interval)
B
barrierye 已提交
474 475 476 477 478 479 480 481 482 483
    else:
        raise Exception('unsupport type.')


def start_monitor(monitor, args):
    monitor.set_remote_path(args.remote_path)
    monitor.set_remote_model_name(args.remote_model_name)
    monitor.set_remote_donefile_name(args.remote_donefile_name)
    monitor.set_local_path(args.local_path)
    monitor.set_local_model_name(args.local_model_name)
B
barrierye 已提交
484
    monitor.set_local_timestamp_file(args.local_timestamp_file)
B
barrierye 已提交
485
    monitor.set_local_tmp_path(args.local_tmp_path)
B
barrierye 已提交
486
    monitor.set_unpacked_filename(args.unpacked_filename)
B
barrierye 已提交
487
    monitor.run()
B
barrierye 已提交
488

B
barrierye 已提交
489

B
barrierye 已提交
490
if __name__ == "__main__":
B
barrierye 已提交
491 492 493 494
    logging.basicConfig(
        format='%(asctime)s %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s',
        datefmt='%Y-%m-%d %H:%M',
        level=logging.INFO)
B
barrierye 已提交
495
    args = parse_args()
B
barrierye 已提交
496 497
    monitor = get_monitor(args.type)
    start_monitor(monitor, args)