From 65fdaa1f72e3d88fceaa340b5fa33aebc3ede30d Mon Sep 17 00:00:00 2001 From: barrierye Date: Mon, 30 Mar 2020 09:55:17 +0000 Subject: [PATCH] add hdfs-monitor --- python/paddle_serving_server/monitor.py | 177 +++++++++++++++--------- 1 file changed, 115 insertions(+), 62 deletions(-) diff --git a/python/paddle_serving_server/monitor.py b/python/paddle_serving_server/monitor.py index f2b749cc..fdeb0b98 100644 --- a/python/paddle_serving_server/monitor.py +++ b/python/paddle_serving_server/monitor.py @@ -20,6 +20,8 @@ Usage: import os import time import argparse +import commands +import datetime class Monitor(object): @@ -36,7 +38,7 @@ class Monitor(object): self._local_donefile_name = None self._interval = interval self._remote_donefile_timestamp = None - self._local_tmp_dir = None + self._local_tmp_path = None def set_remote_path(self, remote_path): self._remote_path = remote_path @@ -56,8 +58,8 @@ class Monitor(object): def set_local_donefile_name(self, donefile_name): self._local_donefile_name = donefile_name - def set_local_tmp_dir(self, tmp_dir): - self._local_tmp_dir = tmp_dir + def set_local_tmp_path(self, tmp_path): + self._local_tmp_path = tmp_path def _check_params(self): if self._remote_path is None: @@ -72,50 +74,107 @@ class Monitor(object): raise Exception('local_path not set.') if self._local_donefile_name is None: raise Exception('local_donefile_name not set.') - if self._local_tmp_dir is None: - raise Exception('local_tmp_dir not set.') + if self._local_tmp_path is None: + raise Exception('local_tmp_path not set.') def run(self): ''' Monitor the remote model by polling and update the local model. ''' self._check_params() - if not os.path.exists(self._local_tmp_dir): - os.makedirs(self._local_tmp_dir) + if not os.path.exists(self._local_tmp_path): + os.makedirs(self._local_tmp_path) while True: - [flag, timestamp] = self._exist_remote_donefile() + [flag, timestamp] = self._exist_remote_file( + self._remote_path, self._remote_donefile_name) if flag: if self._remote_donefile_timestamp is None or \ timestamp != self._remote_donefile_timestamp: self._remote_donefile_timestamp = timestamp - self._pull_remote_model() - print('[INFO] pull remote model') - self._update_local_model() - print('[INFO] update model') - self._update_local_donefile() - print('[INFO] update local donefile') + self._pull_remote_dir(self._remote_path, + self._remote_model_name, + self._local_tmp_path) + print('{} [INFO] pull remote model'.format( + datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + self._update_local_model( + self._local_tmp_path, self._remote_model_name, + self._local_path, self._local_model_name) + print('{} [INFO] update model'.format(datetime.datetime.now( + ).strftime('%Y-%m-%d %H:%M:%S'))) + self._update_local_donefile(self._local_path, + self._local_model_name, + self._local_donefile_name) + print('{} [INFO] update local donefile'.format( + datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) else: - print('[INFO] no donefile.') - print('[INFO] sleep {}s'.format(self._interval)) + print('{} [INFO] no donefile.'.format(datetime.datetime.now( + ).strftime('%Y-%m-%d %H:%M:%S'))) + print('{} [INFO] sleep {}s'.format(datetime.datetime.now().strftime( + '%Y-%m-%d %H:%M:%S'), self._interval)) time.sleep(self._interval) - def _exist_remote_donefile(self): + def _exist_remote_file(self, path, filename): raise Exception('This function must be inherited.') - def _pull_remote_model(self): + def _pull_remote_dir(self, remote_path, dirname, local_tmp_path): raise Exception('This function must be inherited.') - def _update_local_model(self): - raise Exception('This function must be inherited.') + def _update_local_model(self, local_tmp_path, remote_model_name, local_path, + local_model_name): + tmp_model_path = os.path.join(local_tmp_path, remote_model_name) + local_model_path = os.path.join(local_path, local_model_name) + cmd = 'cp -r {}/* {}'.format(tmp_model_path, local_model_path) + if os.system(cmd) != 0: + raise Exception('update local model failed.') + + def _update_local_donefile(self, local_path, local_model_name, + local_donefile_name): + donefile_path = os.path.join(local_path, local_model_name, + local_donefile_name) + cmd = 'touch {}'.format(donefile_path) + if os.system(cmd) != 0: + raise Exception('update local donefile failed.') - def _update_local_donefile(self): - raise Exception('This function must be inherited.') + +class HDFSMonitor(Monitor): + ''' HDFS Monitor. ''' + + def __init__(self, bin_path, interval=10): + super(HDFSMonitor, self).__init__(interval) + self._hdfs_bin_path = bin_path + + def _exist_remote_file(self, path, filename): + remote_filepath = os.path.join(path, filename) + cmd = '{} dfs -stat "%Y" {}'.format(self._hdfs_bin_path, + remote_filepath) + [status, timestamp] = commands.getstatusoutput(cmd) + if status == 0: + return [True, timestamp] + else: + return [False, None] + + def _pull_remote_dir(self, remote_path, dirname, local_tmp_path): + remote_dirpath = os.path.join(remote_path, dirname) + cmd = '{} dfs -get -f {} {}'.format(self._hdfs_bin_path, remote_dirpath, + local_tmp_path) + if os.system(cmd) != 0: + raise Exception('pull remote dir failed.') + + def _update_local_model(self, local_tmp_path, remote_model_name, local_path, + local_model_name): + tmp_model_path = os.path.join(local_tmp_path, remote_model_name) + local_model_path = os.path.join(local_path, local_model_name) + cmd = 'cp -r {}/* {}'.format(tmp_model_path, local_model_path) + if os.system(cmd) != 0: + raise Exception('pull remote dir failed.') class FTPMonitor(Monitor): + ''' FTP Monitor. ''' + def __init__(self, ftp_ip, ftp_port, username="", password="", interval=10): - import ftplib super(FTPMonitor, self).__init__(interval) + import ftplib self._ftp_ip = ftp_ip self._ftp_port = ftp_port self._ftp = ftplib.FTP() @@ -125,39 +184,26 @@ class FTPMonitor(Monitor): self._ftp.connect(ftp_ip, ftp_port) self._ftp.login(username, password) - def _exist_remote_donefile(self): + def _exist_remote_file(self, path, filename): import ftplib try: - donefile_path = '{}/{}'.format(self._remote_path, - self._remote_donefile_name) - timestamp = self._ftp.voidcmd('MDTM {}'.format(donefile_path))[ - 4:].strip() + filepath = os.path.join(path, filename) + timestamp = self._ftp.voidcmd('MDTM {}'.format(filepath))[4:].strip( + ) return [True, timestamp] except ftplib.error_perm: return [False, None] - def _pull_remote_model(self): - cmd = 'wget -nH -r -P {} ftp://{}:{}/{}/{} &> /dev/null'.format( - self._local_tmp_dir, self._ftp_ip, self._ftp_port, - self._remote_path, self._remote_model_name) - if os.system(cmd) != 0: - raise Exception('pull remote model failed.') - - def _update_local_model(self): - cmd = 'cp -r {}/{}/* {}/{}'.format( - self._local_tmp_dir, self._remote_model_name, self._local_path, - self._local_model_name) + def _pull_remote_dir(self, remote_path, dirname, local_tmp_path): + filepath = os.path.join(remote_path, dirname) + cmd = 'wget -nH -r -P {} ftp://{}:{}/{} &> /dev/null'.format( + local_tmp_path, self._ftp_ip, self._ftp_port, filepath) if os.system(cmd) != 0: - raise Exception('update local model failed.') - - def _update_local_donefile(self): - cmd = 'touch {}/{}/{}'.format(self._local_path, self._local_model_name, - self._local_donefile_name) - if os.system(cmd) != 0: - raise Exception('update local donefile failed.') + raise Exception('pull remote dir failed.') def parse_args(): + ''' parse args. ''' parser = argparse.ArgumentParser(description="Monitor") parser.add_argument( "--type", type=str, required=True, help="Type of remote server") @@ -183,30 +229,37 @@ def parse_args(): required=True, help="Local donfile name(fluid_time_file in model file)") parser.add_argument( - "--local_tmp_dir", type=str, default='tmp', help="Local tmp dir") + "--local_tmp_path", type=str, default='tmp', help="Local tmp path") parser.add_argument( "--interval", type=int, default=10, help="Time interval") parser.add_argument("--ftp_ip", type=str, help="Ip the ftp") parser.add_argument("--ftp_port", type=int, help="Port the ftp") + parser.add_argument( + "--hdfs_bin", type=str, default='hdfs', help="Hdfs binary file") return parser.parse_args() -def start_ftp_monitor(): - args = parse_args() - obj = FTPMonitor(args.ftp_ip, args.ftp_port, interval=args.interval) - obj.set_remote_path(args.remote_path) - obj.set_remote_model_name(args.remote_model_name) - obj.set_remote_donefile_name(args.remote_donefile_name) - obj.set_local_path(args.local_path) - obj.set_local_model_name(args.local_model_name) - obj.set_local_donefile_name(args.local_donefile_name) - obj.set_local_tmp_dir(args.local_tmp_dir) - obj.run() +def get_monitor(mtype): + if mtype == 'ftp': + return FTPMonitor(args.ftp_ip, args.ftp_port, interval=args.interval) + elif mtype == 'hdfs': + return HDFSMonitor(args.hdfs_bin, interval=args.interval) + else: + raise Exception('unsupport type.') + + +def start_monitor(monitor, args): + monitor.set_remote_path(args.remote_path) + monitor.set_remote_model_name(args.remote_model_name) + monitor.set_remote_donefile_name(args.remote_donefile_name) + monitor.set_local_path(args.local_path) + monitor.set_local_model_name(args.local_model_name) + monitor.set_local_donefile_name(args.local_donefile_name) + monitor.set_local_tmp_path(args.local_tmp_path) + monitor.run() if __name__ == "__main__": args = parse_args() - if args.type == 'ftp': - start_ftp_monitor() - else: - raise Exception('unsupport type.') + monitor = get_monitor(args.type) + start_monitor(monitor, args) -- GitLab