提交 cd2becb3 编写于 作者: B barrierye

add afsMonitor

上级 d806b60a
......@@ -22,6 +22,7 @@ import time
import argparse
import commands
import datetime
import shutil
class Monitor(object):
......@@ -137,17 +138,57 @@ class Monitor(object):
raise Exception('update local donefile failed.')
class AFSMonitor(Monitor):
''' AFS Monitor(by hadoop-client). '''
def __init__(self,
hadoop_bin_path,
hadoop_host=None,
hadoop_ugi=None,
interval=10):
super(AFSMonitor, self).__init__(interval)
self._hadoop_bin = hadoop_bin_path
self._hadoop_host = hadoop_host
self._hadoop_ugi = hadoop_ugi
self._cmd_prefix = '{} fs '.format(self._hadoop_bin)
if not self._hadoop_host and not self._hadoop_ugi:
self._cmd_prefix += '-D fs.default.name={} -D hadoop.job.ugi={} '.format(
self._hadoop_host, self._hadoop_ugi)
def _exist_remote_file(self, path, filename, local_tmp_path):
remote_filepath = os.path.join(path, filename)
cmd = '{} -ls {}'.format(self._cmd_prefix, remote_filepath)
[status, output] = commands.getstatusoutput(cmd)
if status == 0:
[_, _, _, _, _, mdate, mtime, _] = output.split('\n')[-1]
timestr = mdate + mtime
return [True, timestr]
else:
return [False, None]
def _pull_remote_dir(self, remote_path, dirname, local_tmp_path):
# remove old file before pull remote dir
local_dirpath = os.path.join(local_tmp_path, dirname)
if os.path.exists(local_dirpath):
shutil.rmtree(local_dirpath)
remote_dirpath = os.path.join(remote_path, dirname)
cmd = '{} -get {} {}'.format(self._cmd_prefix, remote_dirpath,
local_dirpath)
if os.system(cmd) != 0:
raise Exception('pull remote dir failed.')
class HDFSMonitor(Monitor):
''' HDFS Monitor. '''
def __init__(self, bin_path, interval=10):
super(HDFSMonitor, self).__init__(interval)
self._hdfs_bin_path = bin_path
self._prefix_cmd = '{} dfs '.format(self._hdfs_bin_path)
def _exist_remote_file(self, path, filename, local_tmp_path):
remote_filepath = os.path.join(path, filename)
cmd = '{} dfs -stat "%Y" {}'.format(self._hdfs_bin_path,
remote_filepath)
cmd = '{} -stat "%Y" {}'.format(self._prefix_cmd, remote_filepath)
[status, timestamp] = commands.getstatusoutput(cmd)
if status == 0:
return [True, timestamp]
......@@ -156,8 +197,8 @@ class HDFSMonitor(Monitor):
def _pull_remote_dir(self, remote_path, dirname, local_tmp_path):
remote_dirpath = os.path.join(remote_path, dirname)
cmd = '{} dfs -get -f {} {}'.format(self._hdfs_bin_path, remote_dirpath,
local_tmp_path)
cmd = '{} -get -f {} {}'.format(self._prefix_cmd, remote_dirpath,
local_tmp_path)
if os.system(cmd) != 0:
raise Exception('pull remote dir failed.')
......@@ -280,20 +321,37 @@ def parse_args():
help="Local tmp path")
parser.add_argument(
"--interval", type=int, default=10, help="Time interval")
# general monitor
parser.add_argument(
"--general_host", type=str, help="Host of general remote server")
# hdfs monitor
parser.add_argument("--hdfs_bin", type=str, help="Hdfs binary file path")
# ftp monitor
parser.add_argument("--ftp_host", type=str, help="Host of ftp")
parser.add_argument("--ftp_port", type=int, help="Port of ftp")
parser.add_argument(
"--ftp_username", type=str, default='', help="Username of ftp")
parser.add_argument(
"--ftp_password", type=str, default='', help="Password of ftp")
# afs monitor
parser.add_argument(
"--hadoop_bin_path", type=str, help="Hadoop_bin_path for afs")
parser.add_argument(
"--hadoop_host", type=str, default=None, help="Hadoop_host for afs")
parser.add_argument(
"--hadoop_ugi", type=str, default=None, help="Hadoop_ugi for afs")
return parser.parse_args()
def get_monitor(mtype):
''' get monitor. '''
""" generator monitor instance.
Args:
mtype: type of monitor
Returns:
monitor instance.
"""
if mtype == 'ftp':
return FTPMonitor(
args.ftp_host,
......@@ -305,6 +363,12 @@ def get_monitor(mtype):
return HDFSMonitor(args.hdfs_bin, interval=args.interval)
elif mtype == 'general':
return GeneralMonitor(args.general_host, interval=args.interval)
elif mtype == 'afs':
return AFSMonitor(
args.hadoop_bin_path,
args.hadoop_host,
args.hadoop_ugi,
interval=args.interval)
else:
raise Exception('unsupport type.')
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册