From 2f65fba21aa4f9631a7676b5c2027e0057ef03bb Mon Sep 17 00:00:00 2001 From: barrierye Date: Thu, 2 Apr 2020 08:06:02 +0000 Subject: [PATCH] fix AFSMonitor && add friendly info --- python/paddle_serving_server/monitor.py | 125 ++++++++++++++++-------- 1 file changed, 86 insertions(+), 39 deletions(-) diff --git a/python/paddle_serving_server/monitor.py b/python/paddle_serving_server/monitor.py index 11910cd5..5cd08039 100644 --- a/python/paddle_serving_server/monitor.py +++ b/python/paddle_serving_server/monitor.py @@ -24,6 +24,9 @@ import commands import datetime import shutil import tarfile +import logging + +_LOGGER = logging.getLogger(__name__) class Monitor(object): @@ -71,31 +74,28 @@ class Monitor(object): return "Please check the {}({}) parameter.".format(param_name, param_value) - def _check_params(self): - if self._remote_path is None: - raise Exception('remote_path not set.') - if self._remote_model_name is None: - raise Exception('remote_model_name not set.') - if self._remote_donefile_name is None: - raise Exception('remote_donefile_name not set.') - if self._local_model_name is None: - raise Exception('local_model_name not set.') - if self._local_path is None: - raise Exception('local_path not set.') - if self._local_timestamp_file is None: - raise Exception('local_timestamp_file not set.') - if self._local_tmp_path is None: - raise Exception('local_tmp_path not set.') + def _check_params(self, params): + for param in params: + if getattr(self, param, None) is None: + raise Exception('{} not set.'.format(param)) + + def _print_params(self, params_name): + self._check_params(params_name) + for name in params_name: + _LOGGER.info('{}: {}'.format(name, getattr(self, name))) def _decompress_model_file(self, local_tmp_path, model_name, unpacked_filename): if unpacked_filename is None: + _LOGGER.debug('remote file({}) is already unpacked.'.format( + model_name)) return model_name tar_model_path = os.path.join(local_tmp_path, model_name) if not tarfile.is_tarfile(tar_model_path): raise Exception('not a tar packaged file type. {}'.format( self._check_param_help('remote_model_name', model_name))) try: + _LOGGER.info('unpack remote file({}).'.format(model_name)) tar = tarfile.open(tar_model_path) tar.extractall(local_tmp_path) tar.close() @@ -105,6 +105,9 @@ class Monitor(object): self._check_param_help('local_tmp_path', local_tmp_path))) finally: os.remove(tar_model_path) + _LOGGER.debug('remove packed file({}).'.format(model_name)) + _LOGGER.info('using unpacked filename: {}.'.format( + unpacked_filename)) if not os.path.exists(unpacked_filename): raise Exception('file not exist. {}'.format( self._check_param_help('unpacked_filename', @@ -115,8 +118,14 @@ class Monitor(object): ''' Monitor the remote model by polling and update the local model. ''' - self._check_params() + params = [ + '_remote_path', '_remote_model_name', '_remote_donefile_name', + '_local_model_name', '_local_path', '_local_timestamp_file', + '_local_tmp_path', '_interval' + ] + self._print_params(params) if not os.path.exists(self._local_tmp_path): + _LOGGER.info('mkdir: {}'.format(self._local_tmp_path)) os.makedirs(self._local_tmp_path) while True: [flag, timestamp] = self._exist_remote_file( @@ -125,30 +134,31 @@ class Monitor(object): if flag: if self._remote_donefile_timestamp is None or \ timestamp != self._remote_donefile_timestamp: + _LOGGER.info('doneilfe({}) changed.'.format( + self._remote_donefile_name)) self._remote_donefile_timestamp = timestamp self._pull_remote_dir(self._remote_path, self._remote_model_name, self._local_tmp_path) - print('{} [INFO] pull remote model'.format( - datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + _LOGGER.info('pull remote model({}).'.format( + self._remote_model_name)) unpacked_filename = self._decompress_model_file( self._local_tmp_path, self._remote_model_name, self._unpacked_filename) self._update_local_model( self._local_tmp_path, unpacked_filename, self._local_path, self._local_model_name) - print('{} [INFO] update model'.format(datetime.datetime.now( - ).strftime('%Y-%m-%d %H:%M:%S'))) + _LOGGER.info('update local model({}).'.format( + self._local_model_name)) self._update_local_donefile(self._local_path, self._local_model_name, self._local_timestamp_file) - print('{} [INFO] update local donefile'.format( - datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + _LOGGER.info('update model timestamp({}).'.format( + self._local_timestamp_file)) else: - print('{} [INFO] no donefile.'.format(datetime.datetime.now( - ).strftime('%Y-%m-%d %H:%M:%S'))) - print('{} [INFO] sleep {}s'.format(datetime.datetime.now().strftime( - '%Y-%m-%d %H:%M:%S'), self._interval)) + _LOGGER.info('remote({}) has no donefile.'.format( + self._remote_path)) + _LOGGER.info('sleep {}s.'.format(self._interval)) time.sleep(self._interval) def _exist_remote_file(self, path, filename, local_tmp_path): @@ -162,6 +172,7 @@ class Monitor(object): tmp_model_path = os.path.join(local_tmp_path, remote_model_name) local_model_path = os.path.join(local_path, local_model_name) cmd = 'cp -r {}/* {}'.format(tmp_model_path, local_model_path) + _LOGGER.debug('update model cmd: {}'.format(cmd)) if os.system(cmd) != 0: raise Exception('update local model failed.') @@ -170,6 +181,7 @@ class Monitor(object): donefile_path = os.path.join(local_path, local_model_name, local_timestamp_file) cmd = 'touch {}'.format(donefile_path) + _LOGGER.debug('update timestamp cmd: {}'.format(cmd)) if os.system(cmd) != 0: raise Exception('update local donefile failed.') @@ -186,17 +198,21 @@ class AFSMonitor(Monitor): self._hadoop_bin = hadoop_bin self._hadoop_host = hadoop_host self._hadoop_ugi = hadoop_ugi + self._print_params(['_hadoop_bin', '_hadoop_host', '_hadoop_ugi']) self._cmd_prefix = '{} fs '.format(self._hadoop_bin) - if not self._hadoop_host and not self._hadoop_ugi: + if self._hadoop_host and self._hadoop_ugi: self._cmd_prefix += '-D fs.default.name={} -D hadoop.job.ugi={} '.format( self._hadoop_host, self._hadoop_ugi) + _LOGGER.info('AFS prefix cmd: {}'.format(self._cmd_prefix)) def _exist_remote_file(self, path, filename, local_tmp_path): remote_filepath = os.path.join(path, filename) - cmd = '{} -ls {}'.format(self._cmd_prefix, remote_filepath) + cmd = '{} -ls {} 2>/dev/null'.format(self._cmd_prefix, remote_filepath) + _LOGGER.debug('check cmd: {}'.format(cmd)) [status, output] = commands.getstatusoutput(cmd) + _LOGGER.debug('resp: {}'.format(output)) if status == 0: - [_, _, _, _, _, mdate, mtime, _] = output.split('\n')[-1] + [_, _, _, _, _, mdate, mtime, _] = output.split('\n')[-1].split() timestr = mdate + mtime return [True, timestr] else: @@ -206,10 +222,12 @@ class AFSMonitor(Monitor): # remove old file before pull remote dir local_dirpath = os.path.join(local_tmp_path, dirname) if os.path.exists(local_dirpath): + _LOGGER.info('remove old temporary model file({}).'.format(dirname)) shutil.rmtree(local_dirpath) remote_dirpath = os.path.join(remote_path, dirname) - cmd = '{} -get {} {}'.format(self._cmd_prefix, remote_dirpath, - local_dirpath) + cmd = '{} -get {} {} 2>/dev/null'.format(self._cmd_prefix, + remote_dirpath, local_dirpath) + _LOGGER.debug('pull cmd: {}'.format(cmd)) if os.system(cmd) != 0: raise Exception('pull remote dir failed. {}'.format( self._check_param_help('remote_model_name', dirname))) @@ -218,15 +236,19 @@ class AFSMonitor(Monitor): class HDFSMonitor(Monitor): ''' HDFS Monitor. ''' - def __init__(self, bin_path, interval=10): + def __init__(self, hdfs_bin, interval=10): super(HDFSMonitor, self).__init__(interval) - self._hdfs_bin_path = bin_path + self._hdfs_bin = hdfs_bin + self._print_params(['_hdfs_bin']) self._prefix_cmd = '{} dfs '.format(self._hdfs_bin_path) + _LOGGER.info('HDFS prefix cmd: {}'.format(self._cmd_prefix)) def _exist_remote_file(self, path, filename, local_tmp_path): remote_filepath = os.path.join(path, filename) cmd = '{} -stat "%Y" {}'.format(self._prefix_cmd, remote_filepath) + _LOGGER.debug('check cmd: {}'.format(cmd)) [status, timestamp] = commands.getstatusoutput(cmd) + _LOGGER.debug('resp: {}'.format(output)) if status == 0: return [True, timestamp] else: @@ -236,6 +258,7 @@ class HDFSMonitor(Monitor): remote_dirpath = os.path.join(remote_path, dirname) cmd = '{} -get -f {} {}'.format(self._prefix_cmd, remote_dirpath, local_tmp_path) + _LOGGER.debug('pull cmd: {}'.format(cmd)) if os.system(cmd) != 0: raise Exception('pull remote dir failed. {}'.format( self._check_param_help('remote_model_name', dirname))) @@ -248,18 +271,25 @@ class FTPMonitor(Monitor): super(FTPMonitor, self).__init__(interval) import ftplib self._ftp = ftplib.FTP() - self._ftp.connect(host, port) - self._ftp.login(username, password) - self._ftp_url = 'ftp://{}:{}/'.format(host, port) + self._ftp_host = host + self._ftp_port = port + self._ftp_username = username + self._ftp_password = password + self._ftp.connect(self._ftp_host, self._ftp_port) + self._ftp.login(self._ftp_username, self._ftp_password) + self._print_params( + ['_ftp_host', '_ftp_port', '_ftp_username', '_ftp_password']) def _exist_remote_file(self, path, filename, local_tmp_path): import ftplib try: + _LOGGER.debug('cwd: {}'.format(path)) self._ftp.cwd(path) timestamp = self._ftp.voidcmd('MDTM {}'.format(filename))[4:].strip( ) return [True, timestamp] except ftplib.error_perm: + _LOGGER.debug('remote file({}) not exist.'.format(filename)) return [False, None] def _download_remote_file(self, @@ -272,7 +302,9 @@ class FTPMonitor(Monitor): return else: with open(local_fullpath, 'wb') as f: + _LOGGER.debug('cwd: {}'.format(path)) self._ftp.cwd(remote_path) + _LOGGER.debug('download remote file({})'.format(remote_path)) self._ftp.retrbinary('RETR {}'.format(remote_filename), f.write) def _download_remote_files(self, @@ -284,10 +316,13 @@ class FTPMonitor(Monitor): remote_dirpath = os.path.join(remote_path, remote_dirname) # Check whether remote_dirpath is a file or a folder try: + _LOGGER.debug('cwd: {}'.format(remote_dirpath)) self._ftp.cwd(remote_dirpath) + _LOGGER.debug('{} is folder.'.format(remote_dirname)) local_dirpath = os.path.join(local_tmp_path, remote_dirname) if not os.path.exists(local_dirpath): + _LOGGER.info('mkdir: {}'.format(local_dirpath)) os.mkdir(local_dirpath) output = [] @@ -302,6 +337,7 @@ class FTPMonitor(Monitor): self._download_remote_file(remote_dirname, name, local_tmp_path, overwrite) except ftplib.error_perm: + _LOGGER.debug('{} is file.'.format(remote_dirname)) self._download_remote_file(remote_path, remote_dirname, local_tmp_path, overwrite) return @@ -316,26 +352,33 @@ class GeneralMonitor(Monitor): def __init__(self, host, interval=10): super(GeneralMonitor, self).__init__(interval) - self._host = host + self._general_host = host + self._print_params(['_general_host']) def _get_local_file_timestamp(self, filename): return os.path.getmtime(filename) def _exist_remote_file(self, path, filename, local_tmp_path): remote_filepath = os.path.join(path, filename) - url = '{}/{}'.format(self._host, remote_filepath) + url = '{}/{}'.format(self._general_host, remote_filepath) + _LOGGER.debug('remote file url: {}'.format(url)) cmd = 'wget -N -P {} {} &>/dev/null'.format(local_tmp_path, url) + _LOGGER.debug('wget cmd: {}'.format(cmd)) if os.system(cmd) != 0: + _LOGGER.debug('remote file({}) not exist.'.format(filename)) return [False, None] else: + _LOGGER.debug('download remote file({}).'.format(filename)) timestamp = self._get_local_file_timestamp( os.path.join(local_tmp_path, filename)) return [True, timestamp] def _pull_remote_dir(self, remote_path, dirname, local_tmp_path): remote_dirpath = os.path.join(remote_path, dirname) - url = '{}/{}'.format(self._host, remote_dirpath) + url = '{}/{}'.format(self._general_host, remote_dirpath) + _LOGGER.debug('remote file url: {}'.format(url)) cmd = 'wget -nH -r -P {} {} &>/dev/null'.format(local_tmp_path, url) + _LOGGER.debug('wget cmd: {}'.format(cmd)) if os.system(cmd) != 0: raise Exception('pull remote dir failed. {}'.format( self._check_param_help('remote_model_name', dirname))) @@ -445,6 +488,10 @@ def start_monitor(monitor, args): if __name__ == "__main__": + logging.basicConfig( + format='%(asctime)s %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s', + datefmt='%Y-%m-%d %H:%M', + level=logging.INFO) args = parse_args() monitor = get_monitor(args.type) start_monitor(monitor, args) -- GitLab