提交 00432a6d 编写于 作者: B barrierye

add afsMonitor

上级 50e84d55
...@@ -22,6 +22,7 @@ import time ...@@ -22,6 +22,7 @@ import time
import argparse import argparse
import commands import commands
import datetime import datetime
import shutil
class Monitor(object): class Monitor(object):
...@@ -137,17 +138,57 @@ class Monitor(object): ...@@ -137,17 +138,57 @@ class Monitor(object):
raise Exception('update local donefile failed.') raise Exception('update local donefile failed.')
class AFSMonitor(Monitor):
    """AFS monitor driven through the hadoop-client command line tool.

    Every remote operation is performed by running ``<hadoop_bin> fs ...``
    in a shell; nothing talks to the remote file system directly.
    """

    def __init__(self,
                 hadoop_bin_path,
                 hadoop_host=None,
                 hadoop_ugi=None,
                 interval=10):
        """Build the shared ``hadoop fs`` command prefix.

        Args:
            hadoop_bin_path: path of the hadoop client executable.
            hadoop_host: value passed as ``-D fs.default.name=...``.
            hadoop_ugi: value passed as ``-D hadoop.job.ugi=...``.
            interval: forwarded unchanged to ``Monitor.__init__``.
        """
        super(AFSMonitor, self).__init__(interval)
        self._hadoop_bin = hadoop_bin_path
        self._hadoop_host = hadoop_host
        self._hadoop_ugi = hadoop_ugi
        self._cmd_prefix = '{} fs '.format(self._hadoop_bin)
        # The -D overrides are added only when BOTH values were supplied.
        # The previous check was inverted: it emitted the options exactly
        # when both were missing (formatting "None" into the command) and
        # silently dropped them when the caller did provide them.
        if self._hadoop_host and self._hadoop_ugi:
            self._cmd_prefix += '-D fs.default.name={} -D hadoop.job.ugi={} '.format(
                self._hadoop_host, self._hadoop_ugi)

    def _exist_remote_file(self, path, filename, local_tmp_path):
        """Check a remote file with ``-ls`` and report its modification stamp.

        Args:
            path: remote directory.
            filename: file name inside ``path``.
            local_tmp_path: not used by this implementation.

        Returns:
            ``[True, timestr]`` when the ``-ls`` command exits with status 0,
            where ``timestr`` is the date and time columns of the last output
            line concatenated without a separator; ``[False, None]`` otherwise.

        Raises:
            ValueError: if the last output line has fewer than 8 columns.
        """
        remote_filepath = os.path.join(path, filename)
        cmd = '{} -ls {}'.format(self._cmd_prefix, remote_filepath)
        [status, output] = commands.getstatusoutput(cmd)
        if status != 0:
            return [False, None]
        # The listing line must be split into whitespace-separated columns
        # before picking fields; unpacking the raw string iterates over its
        # characters and fails for any realistic line. maxsplit=7 keeps a
        # path that contains spaces together in the last column.
        fields = output.split('\n')[-1].split(None, 7)
        if len(fields) < 8:
            raise ValueError(
                'unexpected output of hadoop fs -ls: {}'.format(output))
        mdate, mtime = fields[5], fields[6]
        timestr = mdate + mtime
        return [True, timestr]

    def _pull_remote_dir(self, remote_path, dirname, local_tmp_path):
        """Download ``remote_path/dirname`` into ``local_tmp_path/dirname``.

        Raises:
            Exception: if the ``-get`` command exits with a non-zero status.
        """
        # remove old file before pull remote dir
        local_dirpath = os.path.join(local_tmp_path, dirname)
        if os.path.exists(local_dirpath):
            shutil.rmtree(local_dirpath)
        remote_dirpath = os.path.join(remote_path, dirname)
        cmd = '{} -get {} {}'.format(self._cmd_prefix, remote_dirpath,
                                     local_dirpath)
        if os.system(cmd) != 0:
            raise Exception('pull remote dir failed.')
class HDFSMonitor(Monitor): class HDFSMonitor(Monitor):
''' HDFS Monitor. ''' ''' HDFS Monitor. '''
def __init__(self, bin_path, interval=10): def __init__(self, bin_path, interval=10):
super(HDFSMonitor, self).__init__(interval) super(HDFSMonitor, self).__init__(interval)
self._hdfs_bin_path = bin_path self._hdfs_bin_path = bin_path
self._prefix_cmd = '{} dfs '.format(self._hdfs_bin_path)
def _exist_remote_file(self, path, filename, local_tmp_path): def _exist_remote_file(self, path, filename, local_tmp_path):
remote_filepath = os.path.join(path, filename) remote_filepath = os.path.join(path, filename)
cmd = '{} dfs -stat "%Y" {}'.format(self._hdfs_bin_path, cmd = '{} -stat "%Y" {}'.format(self._prefix_cmd, remote_filepath)
remote_filepath)
[status, timestamp] = commands.getstatusoutput(cmd) [status, timestamp] = commands.getstatusoutput(cmd)
if status == 0: if status == 0:
return [True, timestamp] return [True, timestamp]
...@@ -156,7 +197,7 @@ class HDFSMonitor(Monitor): ...@@ -156,7 +197,7 @@ class HDFSMonitor(Monitor):
def _pull_remote_dir(self, remote_path, dirname, local_tmp_path): def _pull_remote_dir(self, remote_path, dirname, local_tmp_path):
remote_dirpath = os.path.join(remote_path, dirname) remote_dirpath = os.path.join(remote_path, dirname)
cmd = '{} dfs -get -f {} {}'.format(self._hdfs_bin_path, remote_dirpath, cmd = '{} -get -f {} {}'.format(self._prefix_cmd, remote_dirpath,
local_tmp_path) local_tmp_path)
if os.system(cmd) != 0: if os.system(cmd) != 0:
raise Exception('pull remote dir failed.') raise Exception('pull remote dir failed.')
...@@ -280,20 +321,37 @@ def parse_args(): ...@@ -280,20 +321,37 @@ def parse_args():
help="Local tmp path") help="Local tmp path")
parser.add_argument( parser.add_argument(
"--interval", type=int, default=10, help="Time interval") "--interval", type=int, default=10, help="Time interval")
# general monitor
parser.add_argument( parser.add_argument(
"--general_host", type=str, help="Host of general remote server") "--general_host", type=str, help="Host of general remote server")
# hdfs monitor
parser.add_argument("--hdfs_bin", type=str, help="Hdfs binary file path") parser.add_argument("--hdfs_bin", type=str, help="Hdfs binary file path")
# ftp monitor
parser.add_argument("--ftp_host", type=str, help="Host of ftp") parser.add_argument("--ftp_host", type=str, help="Host of ftp")
parser.add_argument("--ftp_port", type=int, help="Port of ftp") parser.add_argument("--ftp_port", type=int, help="Port of ftp")
parser.add_argument( parser.add_argument(
"--ftp_username", type=str, default='', help="Username of ftp") "--ftp_username", type=str, default='', help="Username of ftp")
parser.add_argument( parser.add_argument(
"--ftp_password", type=str, default='', help="Password of ftp") "--ftp_password", type=str, default='', help="Password of ftp")
# afs monitor
parser.add_argument(
"--hadoop_bin_path", type=str, help="Hadoop_bin_path for afs")
parser.add_argument(
"--hadoop_host", type=str, default=None, help="Hadoop_host for afs")
parser.add_argument(
"--hadoop_ugi", type=str, default=None, help="Hadoop_ugi for afs")
return parser.parse_args() return parser.parse_args()
def get_monitor(mtype): def get_monitor(mtype):
''' get monitor. ''' """ generator monitor instance.
Args:
mtype: type of monitor
Returns:
monitor instance.
"""
if mtype == 'ftp': if mtype == 'ftp':
return FTPMonitor( return FTPMonitor(
args.ftp_host, args.ftp_host,
...@@ -305,6 +363,12 @@ def get_monitor(mtype): ...@@ -305,6 +363,12 @@ def get_monitor(mtype):
return HDFSMonitor(args.hdfs_bin, interval=args.interval) return HDFSMonitor(args.hdfs_bin, interval=args.interval)
elif mtype == 'general': elif mtype == 'general':
return GeneralMonitor(args.general_host, interval=args.interval) return GeneralMonitor(args.general_host, interval=args.interval)
elif mtype == 'afs':
return AFSMonitor(
args.hadoop_bin_path,
args.hadoop_host,
args.hadoop_ugi,
interval=args.interval)
else: else:
raise Exception('unsupport type.') raise Exception('unsupport type.')
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册