diff --git a/python/paddle_serving_server/monitor.py b/python/paddle_serving_server/monitor.py
index 4a2b278a96dab3fed9b9b03f2ae609a831205428..00625c5686587bb4a0caa9626ea123703939f583 100644
--- a/python/paddle_serving_server/monitor.py
+++ b/python/paddle_serving_server/monitor.py
@@ -22,6 +22,7 @@ import time
 import argparse
 import commands
 import datetime
+import shutil
 
 
 class Monitor(object):
@@ -137,17 +138,57 @@ class Monitor(object):
             raise Exception('update local donefile failed.')
 
 
+class AFSMonitor(Monitor):
+    ''' AFS Monitor (by hadoop-client). '''
+
+    def __init__(self,
+                 hadoop_bin_path,
+                 hadoop_host=None,
+                 hadoop_ugi=None,
+                 interval=10):
+        super(AFSMonitor, self).__init__(interval)
+        self._hadoop_bin = hadoop_bin_path
+        self._hadoop_host = hadoop_host
+        self._hadoop_ugi = hadoop_ugi
+        self._cmd_prefix = '{} fs '.format(self._hadoop_bin)
+        if self._hadoop_host and self._hadoop_ugi:
+            self._cmd_prefix += '-D fs.default.name={} -D hadoop.job.ugi={} '.format(
+                self._hadoop_host, self._hadoop_ugi)
+
+    def _exist_remote_file(self, path, filename, local_tmp_path):
+        remote_filepath = os.path.join(path, filename)
+        cmd = '{} -ls {}'.format(self._cmd_prefix, remote_filepath)
+        [status, output] = commands.getstatusoutput(cmd)
+        if status == 0:
+            [_, _, _, _, _, mdate, mtime, _] = output.split('\n')[-1].split()
+            timestr = mdate + mtime
+            return [True, timestr]
+        else:
+            return [False, None]
+
+    def _pull_remote_dir(self, remote_path, dirname, local_tmp_path):
+        # remove old local dir before pulling the remote dir
+        local_dirpath = os.path.join(local_tmp_path, dirname)
+        if os.path.exists(local_dirpath):
+            shutil.rmtree(local_dirpath)
+        remote_dirpath = os.path.join(remote_path, dirname)
+        cmd = '{} -get {} {}'.format(self._cmd_prefix, remote_dirpath,
+                                     local_dirpath)
+        if os.system(cmd) != 0:
+            raise Exception('pull remote dir failed.')
+
+
 class HDFSMonitor(Monitor):
     ''' HDFS Monitor. '''
 
     def __init__(self, bin_path, interval=10):
         super(HDFSMonitor, self).__init__(interval)
         self._hdfs_bin_path = bin_path
+        self._prefix_cmd = '{} dfs '.format(self._hdfs_bin_path)
 
     def _exist_remote_file(self, path, filename, local_tmp_path):
         remote_filepath = os.path.join(path, filename)
-        cmd = '{} dfs -stat "%Y" {}'.format(self._hdfs_bin_path,
-                                            remote_filepath)
+        cmd = '{} -stat "%Y" {}'.format(self._prefix_cmd, remote_filepath)
         [status, timestamp] = commands.getstatusoutput(cmd)
         if status == 0:
             return [True, timestamp]
@@ -156,8 +197,8 @@ class HDFSMonitor(Monitor):
 
     def _pull_remote_dir(self, remote_path, dirname, local_tmp_path):
         remote_dirpath = os.path.join(remote_path, dirname)
-        cmd = '{} dfs -get -f {} {}'.format(self._hdfs_bin_path, remote_dirpath,
-                                            local_tmp_path)
+        cmd = '{} -get -f {} {}'.format(self._prefix_cmd, remote_dirpath,
+                                        local_tmp_path)
         if os.system(cmd) != 0:
             raise Exception('pull remote dir failed.')
 
@@ -280,20 +321,37 @@ def parse_args():
         help="Local tmp path")
     parser.add_argument(
         "--interval", type=int, default=10, help="Time interval")
+    # general monitor
     parser.add_argument(
         "--general_host", type=str, help="Host of general remote server")
+    # hdfs monitor
     parser.add_argument("--hdfs_bin", type=str, help="Hdfs binary file path")
+    # ftp monitor
     parser.add_argument("--ftp_host", type=str, help="Host of ftp")
     parser.add_argument("--ftp_port", type=int, help="Port of ftp")
     parser.add_argument(
         "--ftp_username", type=str, default='', help="Username of ftp")
     parser.add_argument(
         "--ftp_password", type=str, default='', help="Password of ftp")
+    # afs monitor
+    parser.add_argument(
+        "--hadoop_bin_path", type=str, help="Hadoop bin path for afs")
+    parser.add_argument(
+        "--hadoop_host", type=str, default=None, help="Hadoop host for afs")
+    parser.add_argument(
+        "--hadoop_ugi", type=str, default=None, help="Hadoop ugi for afs")
     return parser.parse_args()
 
 
 def get_monitor(mtype):
-    ''' get monitor. '''
+    """ Generate a monitor instance.
+
+    Args:
+        mtype: type of monitor.
+
+    Returns:
+        monitor instance.
+    """
     if mtype == 'ftp':
         return FTPMonitor(
             args.ftp_host,
@@ -305,6 +363,12 @@
         return HDFSMonitor(args.hdfs_bin, interval=args.interval)
     elif mtype == 'general':
         return GeneralMonitor(args.general_host, interval=args.interval)
+    elif mtype == 'afs':
+        return AFSMonitor(
+            args.hadoop_bin_path,
+            args.hadoop_host,
+            args.hadoop_ugi,
+            interval=args.interval)
     else:
         raise Exception('unsupport type.')
 
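Usage note (not part of the patch): a minimal sketch of constructing the new `AFSMonitor` directly, mirroring what `get_monitor('afs')` does above. All paths, host, and ugi values below are placeholders rather than values from this patch; the polling loop itself is driven by the unchanged main block of `monitor.py`.

```python
# Minimal sketch; every concrete value here is a placeholder.
from paddle_serving_server.monitor import AFSMonitor

monitor = AFSMonitor(
    '/opt/hadoop/bin/hadoop',  # hadoop_bin_path (placeholder)
    hadoop_host='afs://example-cluster:9902',  # placeholder fs.default.name
    hadoop_ugi='user,passwd',  # placeholder hadoop.job.ugi
    interval=10)

# With both host and ugi set, __init__ builds the command prefix:
#   /opt/hadoop/bin/hadoop fs -D fs.default.name=afs://example-cluster:9902 \
#       -D hadoop.job.ugi=user,passwd
# which _exist_remote_file and _pull_remote_dir extend with -ls / -get.
```

Note that `monitor.py` still imports the Python 2-only `commands` module, so this code path runs under Python 2.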