From 65f9d08240714b1466a1e863b1c54b6c588c98c8 Mon Sep 17 00:00:00 2001 From: barrierye Date: Tue, 31 Mar 2020 08:22:26 +0000 Subject: [PATCH] update code & add general monitor --- python/paddle_serving_server/monitor.py | 133 ++++++++++++++++++------ 1 file changed, 101 insertions(+), 32 deletions(-) diff --git a/python/paddle_serving_server/monitor.py b/python/paddle_serving_server/monitor.py index 2df9b9f4..4a2b278a 100644 --- a/python/paddle_serving_server/monitor.py +++ b/python/paddle_serving_server/monitor.py @@ -35,7 +35,7 @@ class Monitor(object): self._remote_donefile_name = None self._local_path = None self._local_model_name = None - self._local_donefile_name = None + self._local_timestamp_file = None self._interval = interval self._remote_donefile_timestamp = None self._local_tmp_path = None @@ -55,8 +55,8 @@ class Monitor(object): def set_local_model_name(self, model_name): self._local_model_name = model_name - def set_local_donefile_name(self, donefile_name): - self._local_donefile_name = donefile_name + def set_local_timestamp_file(self, timestamp_file): + self._local_timestamp_file = timestamp_file def set_local_tmp_path(self, tmp_path): self._local_tmp_path = tmp_path @@ -72,8 +72,8 @@ class Monitor(object): raise Exception('local_model_name not set.') if self._local_path is None: raise Exception('local_path not set.') - if self._local_donefile_name is None: - raise Exception('local_donefile_name not set.') + if self._local_timestamp_file is None: + raise Exception('local_timestamp_file not set.') if self._local_tmp_path is None: raise Exception('local_tmp_path not set.') @@ -86,7 +86,8 @@ class Monitor(object): os.makedirs(self._local_tmp_path) while True: [flag, timestamp] = self._exist_remote_file( - self._remote_path, self._remote_donefile_name) + self._remote_path, self._remote_donefile_name, + self._local_tmp_path) if flag: if self._remote_donefile_timestamp is None or \ timestamp != self._remote_donefile_timestamp: @@ -103,7 +104,7 @@ class Monitor(object): ).strftime('%Y-%m-%d %H:%M:%S'))) self._update_local_donefile(self._local_path, self._local_model_name, - self._local_donefile_name) + self._local_timestamp_file) print('{} [INFO] update local donefile'.format( datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) else: @@ -113,7 +114,7 @@ class Monitor(object): '%Y-%m-%d %H:%M:%S'), self._interval)) time.sleep(self._interval) - def _exist_remote_file(self, path, filename): + def _exist_remote_file(self, path, filename, local_tmp_path): raise Exception('This function must be inherited.') def _pull_remote_dir(self, remote_path, dirname, local_tmp_path): @@ -128,9 +129,9 @@ class Monitor(object): raise Exception('update local model failed.') def _update_local_donefile(self, local_path, local_model_name, - local_donefile_name): + local_timestamp_file): donefile_path = os.path.join(local_path, local_model_name, - local_donefile_name) + local_timestamp_file) cmd = 'touch {}'.format(donefile_path) if os.system(cmd) != 0: raise Exception('update local donefile failed.') @@ -143,7 +144,7 @@ class HDFSMonitor(Monitor): super(HDFSMonitor, self).__init__(interval) self._hdfs_bin_path = bin_path - def _exist_remote_file(self, path, filename): + def _exist_remote_file(self, path, filename, local_tmp_path): remote_filepath = os.path.join(path, filename) cmd = '{} dfs -stat "%Y" {}'.format(self._hdfs_bin_path, remote_filepath) @@ -164,19 +165,15 @@ class HDFSMonitor(Monitor): class FTPMonitor(Monitor): ''' FTP Monitor. ''' - def __init__(self, ftp_ip, ftp_port, username="", password="", interval=10): + def __init__(self, host, port, username="", password="", interval=10): super(FTPMonitor, self).__init__(interval) import ftplib - self._ftp_ip = ftp_ip - self._ftp_port = ftp_port self._ftp = ftplib.FTP() - self._connect(ftp_ip, ftp_port, username, password) - - def _connect(self, ftp_ip, ftp_port, username, password): - self._ftp.connect(ftp_ip, ftp_port) + self._ftp.connect(host, port) self._ftp.login(username, password) + self._ftp_url = 'ftp://{}:{}/'.format(host, port) - def _exist_remote_file(self, path, filename): + def _exist_remote_file(self, path, filename, local_tmp_path): import ftplib try: filepath = os.path.join(path, filename) @@ -186,10 +183,66 @@ class FTPMonitor(Monitor): except ftplib.error_perm: return [False, None] + def _download_remote_files(remote_path, + remote_dirname, + local_tmp_path, + overwrite=True): + try: + remote_dirpath = os.path.join(remote_path, remote_dirname) + self._ftp.cwd(remote_dirpath) + os.mkdir(os.path.join(local_tmp_path, remote_dirname)) + except OSError: + # folder already exists at the local_tmp_path + pass + except ftplib.error_perm: + raise Exception('remote_path({}) not exist.'.format(remote_path)) + + filelist = [x for x in self_ftp.mlsd()] + for file in filelist: + if file[1]['type'] == 'file': + fullpath = os.path.join(local_tmp_path, remote_dirname, file[0]) + if not overwrite and os.path.isfile(fullpath): + continue + else: + with open(fullpath, 'wb') as f: + self._ftp.retrbinary('RETR ' + file[0], f.write) + elif file[1]['type'] == 'dir': + self._download_remote_files( + os.path.join(remote_path, remote_dirname), file[0], + os.path.join(local_tmp_path, remote_dirname), overwrite) + else: + print('Unknown type: ' + file[1]['type']) + + def _pull_remote_dir(self, remote_path, dirname, local_tmp_path): + self._exist_remote_file( + remote_path, dirname, local_tmp_path, overwrite=True) + + +class GeneralMonitor(Monitor): + ''' General Monitor. ''' + + def __init__(self, host, interval=10): + super(GeneralMonitor, self).__init__(interval) + self._host = host + + def _get_local_file_timestamp(self, filename): + return os.path.getmtime(filename) + + def _exist_remote_file(self, path, filename, local_tmp_path): + remote_filepath = os.path.join(path, filename) + url = '{}/{}'.format(self._host, remote_filepath) + cmd = 'wget -N -P {} {}'.format(local_tmp_path, url) + if os.system(cmd) != 0: + return [False, None] + else: + timestamp = self._get_local_file_timestamp( + os.path.join(local_tmp_path, filename)) + return [True, timestamp] + def _pull_remote_dir(self, remote_path, dirname, local_tmp_path): - filepath = os.path.join(remote_path, dirname) - cmd = 'wget -nH -r -P {} ftp://{}:{}/{} &> /dev/null'.format( - local_tmp_path, self._ftp_ip, self._ftp_port, filepath) + remote_dirpath = os.path.join(remote_path, dirname) + url = '{}/{}'.format(self._host, remote_dirpath) + cmd = 'wget -nH -r -P {} {} &> /dev/null'.format(local_tmp_path, url) if os.system(cmd) != 0: raise Exception('pull remote dir failed.') @@ -198,7 +251,7 @@ def parse_args(): ''' parse args. ''' parser = argparse.ArgumentParser(description="Monitor") parser.add_argument( - "--type", type=str, required=True, help="Type of remote server") + "--type", type=str, default='general', help="Type of remote server") parser.add_argument( "--remote_path", type=str, required=True, help="Remote path") parser.add_argument( @@ -216,26 +269,42 @@ def parse_args(): parser.add_argument( "--local_model_name", type=str, required=True, help="Local model name") parser.add_argument( - "--local_donefile_name", + "--local_timestamp_file", type=str, - required=True, - help="Local donfile name(fluid_time_file in model file)") + default='fluid_time_file', + help="Local timestamp file name(fluid_time_file in model file)") parser.add_argument( - "--local_tmp_path", type=str, default='tmp', help="Local tmp path") + "--local_tmp_path", + type=str, + default='_serving_monitor_tmp', + help="Local tmp path") parser.add_argument( "--interval", type=int, default=10, help="Time interval") - parser.add_argument("--ftp_ip", type=str, help="Ip the ftp") - parser.add_argument("--ftp_port", type=int, help="Port the ftp") parser.add_argument( - "--hdfs_bin", type=str, default='hdfs', help="Hdfs binary file path") + "--general_host", type=str, help="Host of general remote server") + parser.add_argument("--hdfs_bin", type=str, help="Hdfs binary file path") + parser.add_argument("--ftp_host", type=str, help="Host of ftp") + parser.add_argument("--ftp_port", type=int, help="Port of ftp") + parser.add_argument( + "--ftp_username", type=str, default='', help="Username of ftp") + parser.add_argument( + "--ftp_password", type=str, default='', help="Password of ftp") return parser.parse_args() def get_monitor(mtype): + ''' get monitor. ''' if mtype == 'ftp': - return FTPMonitor(args.ftp_ip, args.ftp_port, interval=args.interval) + return FTPMonitor( + args.ftp_host, + args.ftp_port, + username=args.ftp_username, + password=args.ftp_password, + interval=args.interval) elif mtype == 'hdfs': return HDFSMonitor(args.hdfs_bin, interval=args.interval) + elif mtype == 'general': + return GeneralMonitor(args.general_host, interval=args.interval) else: raise Exception('unsupport type.') @@ -246,7 +315,7 @@ def start_monitor(monitor, args): monitor.set_remote_donefile_name(args.remote_donefile_name) monitor.set_local_path(args.local_path) monitor.set_local_model_name(args.local_model_name) - monitor.set_local_donefile_name(args.local_donefile_name) + monitor.set_local_timestamp_file(args.local_timestamp_file) monitor.set_local_tmp_path(args.local_tmp_path) monitor.run() -- GitLab