提交 65fdaa1f 编写于 作者: B barrierye

add hdfs-monitor

上级 889ac104
...@@ -20,6 +20,8 @@ Usage: ...@@ -20,6 +20,8 @@ Usage:
import os import os
import time import time
import argparse import argparse
import commands
import datetime
class Monitor(object): class Monitor(object):
...@@ -36,7 +38,7 @@ class Monitor(object): ...@@ -36,7 +38,7 @@ class Monitor(object):
self._local_donefile_name = None self._local_donefile_name = None
self._interval = interval self._interval = interval
self._remote_donefile_timestamp = None self._remote_donefile_timestamp = None
self._local_tmp_dir = None self._local_tmp_path = None
def set_remote_path(self, remote_path): def set_remote_path(self, remote_path):
self._remote_path = remote_path self._remote_path = remote_path
...@@ -56,8 +58,8 @@ class Monitor(object): ...@@ -56,8 +58,8 @@ class Monitor(object):
def set_local_donefile_name(self, donefile_name): def set_local_donefile_name(self, donefile_name):
self._local_donefile_name = donefile_name self._local_donefile_name = donefile_name
def set_local_tmp_dir(self, tmp_dir): def set_local_tmp_path(self, tmp_path):
self._local_tmp_dir = tmp_dir self._local_tmp_path = tmp_path
def _check_params(self): def _check_params(self):
if self._remote_path is None: if self._remote_path is None:
...@@ -72,50 +74,107 @@ class Monitor(object): ...@@ -72,50 +74,107 @@ class Monitor(object):
raise Exception('local_path not set.') raise Exception('local_path not set.')
if self._local_donefile_name is None: if self._local_donefile_name is None:
raise Exception('local_donefile_name not set.') raise Exception('local_donefile_name not set.')
if self._local_tmp_dir is None: if self._local_tmp_path is None:
raise Exception('local_tmp_dir not set.') raise Exception('local_tmp_path not set.')
def run(self): def run(self):
''' '''
Monitor the remote model by polling and update the local model. Monitor the remote model by polling and update the local model.
''' '''
self._check_params() self._check_params()
if not os.path.exists(self._local_tmp_dir): if not os.path.exists(self._local_tmp_path):
os.makedirs(self._local_tmp_dir) os.makedirs(self._local_tmp_path)
while True: while True:
[flag, timestamp] = self._exist_remote_donefile() [flag, timestamp] = self._exist_remote_file(
self._remote_path, self._remote_donefile_name)
if flag: if flag:
if self._remote_donefile_timestamp is None or \ if self._remote_donefile_timestamp is None or \
timestamp != self._remote_donefile_timestamp: timestamp != self._remote_donefile_timestamp:
self._remote_donefile_timestamp = timestamp self._remote_donefile_timestamp = timestamp
self._pull_remote_model() self._pull_remote_dir(self._remote_path,
print('[INFO] pull remote model') self._remote_model_name,
self._update_local_model() self._local_tmp_path)
print('[INFO] update model') print('{} [INFO] pull remote model'.format(
self._update_local_donefile() datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
print('[INFO] update local donefile') self._update_local_model(
self._local_tmp_path, self._remote_model_name,
self._local_path, self._local_model_name)
print('{} [INFO] update model'.format(datetime.datetime.now(
).strftime('%Y-%m-%d %H:%M:%S')))
self._update_local_donefile(self._local_path,
self._local_model_name,
self._local_donefile_name)
print('{} [INFO] update local donefile'.format(
datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
else: else:
print('[INFO] no donefile.') print('{} [INFO] no donefile.'.format(datetime.datetime.now(
print('[INFO] sleep {}s'.format(self._interval)) ).strftime('%Y-%m-%d %H:%M:%S')))
print('{} [INFO] sleep {}s'.format(datetime.datetime.now().strftime(
'%Y-%m-%d %H:%M:%S'), self._interval))
time.sleep(self._interval) time.sleep(self._interval)
def _exist_remote_donefile(self): def _exist_remote_file(self, path, filename):
raise Exception('This function must be inherited.') raise Exception('This function must be inherited.')
def _pull_remote_model(self): def _pull_remote_dir(self, remote_path, dirname, local_tmp_path):
raise Exception('This function must be inherited.') raise Exception('This function must be inherited.')
def _update_local_model(self): def _update_local_model(self, local_tmp_path, remote_model_name, local_path,
raise Exception('This function must be inherited.') local_model_name):
tmp_model_path = os.path.join(local_tmp_path, remote_model_name)
local_model_path = os.path.join(local_path, local_model_name)
cmd = 'cp -r {}/* {}'.format(tmp_model_path, local_model_path)
if os.system(cmd) != 0:
raise Exception('update local model failed.')
def _update_local_donefile(self, local_path, local_model_name,
                           local_donefile_name):
    '''
    Touch the donefile that lives inside the local model directory.

    The target is <local_path>/<local_model_name>/<local_donefile_name>.

    Raises:
        Exception: if the `touch` command exits with a non-zero status.
    '''
    target = os.path.join(local_path, local_model_name, local_donefile_name)
    exit_status = os.system('touch {}'.format(target))
    if exit_status == 0:
        return
    raise Exception('update local donefile failed.')
def _update_local_donefile(self):
raise Exception('This function must be inherited.') class HDFSMonitor(Monitor):
''' HDFS Monitor. '''
def __init__(self, bin_path, interval=10):
    '''
    Build a monitor that talks to HDFS through a command-line client.

    Args:
        bin_path: executable placed at the front of every `dfs` command
            this monitor builds.
        interval: passed through unchanged to the base-class constructor.
    '''
    super(HDFSMonitor, self).__init__(interval)
    # Kept on the instance so the command-building methods can reuse it.
    self._hdfs_bin_path = bin_path
def _exist_remote_file(self, path, filename):
    '''
    Probe a remote file with `<hdfs_bin> dfs -stat "%Y" <path>/<filename>`.

    Returns:
        [True, output] when the command exits with status 0, where output
        is whatever text the command produced (presumably the file's
        modification time -- confirm against the hdfs `stat` docs);
        [False, None] for any non-zero exit status.
    '''
    target = os.path.join(path, filename)
    stat_cmd = '{} dfs -stat "%Y" {}'.format(self._hdfs_bin_path, target)
    exit_status, output = commands.getstatusoutput(stat_cmd)
    if exit_status != 0:
        return [False, None]
    return [True, output]
def _pull_remote_dir(self, remote_path, dirname, local_tmp_path):
    '''
    Fetch <remote_path>/<dirname> into local_tmp_path.

    Runs `<hdfs_bin> dfs -get -f <remote dir> <local_tmp_path>` through the
    shell.

    Raises:
        Exception: if the command exits with a non-zero status.
    '''
    source = os.path.join(remote_path, dirname)
    get_cmd = '{} dfs -get -f {} {}'.format(self._hdfs_bin_path, source,
                                            local_tmp_path)
    if os.system(get_cmd) == 0:
        return
    raise Exception('pull remote dir failed.')
def _update_local_model(self, local_tmp_path, remote_model_name, local_path,
                        local_model_name):
    '''
    Copy the freshly pulled model files over the local model directory.

    Everything under <local_tmp_path>/<remote_model_name>/ is copied with
    `cp -r` into <local_path>/<local_model_name>.

    Raises:
        Exception: if the `cp` command exits with a non-zero status.
    '''
    tmp_model_path = os.path.join(local_tmp_path, remote_model_name)
    local_model_path = os.path.join(local_path, local_model_name)
    cmd = 'cp -r {}/* {}'.format(tmp_model_path, local_model_path)
    if os.system(cmd) != 0:
        # This step is the local copy, not the remote pull, so report it as
        # such instead of reusing the pull-failure message.
        raise Exception('update local model failed.')
class FTPMonitor(Monitor): class FTPMonitor(Monitor):
''' FTP Monitor. '''
def __init__(self, ftp_ip, ftp_port, username="", password="", interval=10): def __init__(self, ftp_ip, ftp_port, username="", password="", interval=10):
import ftplib
super(FTPMonitor, self).__init__(interval) super(FTPMonitor, self).__init__(interval)
import ftplib
self._ftp_ip = ftp_ip self._ftp_ip = ftp_ip
self._ftp_port = ftp_port self._ftp_port = ftp_port
self._ftp = ftplib.FTP() self._ftp = ftplib.FTP()
...@@ -125,39 +184,26 @@ class FTPMonitor(Monitor): ...@@ -125,39 +184,26 @@ class FTPMonitor(Monitor):
self._ftp.connect(ftp_ip, ftp_port) self._ftp.connect(ftp_ip, ftp_port)
self._ftp.login(username, password) self._ftp.login(username, password)
def _exist_remote_donefile(self): def _exist_remote_file(self, path, filename):
import ftplib import ftplib
try: try:
donefile_path = '{}/{}'.format(self._remote_path, filepath = os.path.join(path, filename)
self._remote_donefile_name) timestamp = self._ftp.voidcmd('MDTM {}'.format(filepath))[4:].strip(
timestamp = self._ftp.voidcmd('MDTM {}'.format(donefile_path))[ )
4:].strip()
return [True, timestamp] return [True, timestamp]
except ftplib.error_perm: except ftplib.error_perm:
return [False, None] return [False, None]
def _pull_remote_model(self): def _pull_remote_dir(self, remote_path, dirname, local_tmp_path):
cmd = 'wget -nH -r -P {} ftp://{}:{}/{}/{} &> /dev/null'.format( filepath = os.path.join(remote_path, dirname)
self._local_tmp_dir, self._ftp_ip, self._ftp_port, cmd = 'wget -nH -r -P {} ftp://{}:{}/{} &> /dev/null'.format(
self._remote_path, self._remote_model_name) local_tmp_path, self._ftp_ip, self._ftp_port, filepath)
if os.system(cmd) != 0:
raise Exception('pull remote model failed.')
def _update_local_model(self):
cmd = 'cp -r {}/{}/* {}/{}'.format(
self._local_tmp_dir, self._remote_model_name, self._local_path,
self._local_model_name)
if os.system(cmd) != 0: if os.system(cmd) != 0:
raise Exception('update local model failed.') raise Exception('pull remote dir failed.')
def _update_local_donefile(self):
cmd = 'touch {}/{}/{}'.format(self._local_path, self._local_model_name,
self._local_donefile_name)
if os.system(cmd) != 0:
raise Exception('update local donefile failed.')
def parse_args(): def parse_args():
''' parse args. '''
parser = argparse.ArgumentParser(description="Monitor") parser = argparse.ArgumentParser(description="Monitor")
parser.add_argument( parser.add_argument(
"--type", type=str, required=True, help="Type of remote server") "--type", type=str, required=True, help="Type of remote server")
...@@ -183,30 +229,37 @@ def parse_args(): ...@@ -183,30 +229,37 @@ def parse_args():
required=True, required=True,
help="Local donfile name(fluid_time_file in model file)") help="Local donfile name(fluid_time_file in model file)")
parser.add_argument( parser.add_argument(
"--local_tmp_dir", type=str, default='tmp', help="Local tmp dir") "--local_tmp_path", type=str, default='tmp', help="Local tmp path")
parser.add_argument( parser.add_argument(
"--interval", type=int, default=10, help="Time interval") "--interval", type=int, default=10, help="Time interval")
parser.add_argument("--ftp_ip", type=str, help="Ip the ftp") parser.add_argument("--ftp_ip", type=str, help="Ip the ftp")
parser.add_argument("--ftp_port", type=int, help="Port the ftp") parser.add_argument("--ftp_port", type=int, help="Port the ftp")
parser.add_argument(
"--hdfs_bin", type=str, default='hdfs', help="Hdfs binary file")
return parser.parse_args() return parser.parse_args()
def start_ftp_monitor(): def get_monitor(mtype):
args = parse_args() if mtype == 'ftp':
obj = FTPMonitor(args.ftp_ip, args.ftp_port, interval=args.interval) return FTPMonitor(args.ftp_ip, args.ftp_port, interval=args.interval)
obj.set_remote_path(args.remote_path) elif mtype == 'hdfs':
obj.set_remote_model_name(args.remote_model_name) return HDFSMonitor(args.hdfs_bin, interval=args.interval)
obj.set_remote_donefile_name(args.remote_donefile_name) else:
obj.set_local_path(args.local_path) raise Exception('unsupport type.')
obj.set_local_model_name(args.local_model_name)
obj.set_local_donefile_name(args.local_donefile_name)
obj.set_local_tmp_dir(args.local_tmp_dir) def start_monitor(monitor, args):
obj.run() monitor.set_remote_path(args.remote_path)
monitor.set_remote_model_name(args.remote_model_name)
monitor.set_remote_donefile_name(args.remote_donefile_name)
monitor.set_local_path(args.local_path)
monitor.set_local_model_name(args.local_model_name)
monitor.set_local_donefile_name(args.local_donefile_name)
monitor.set_local_tmp_path(args.local_tmp_path)
monitor.run()
if __name__ == "__main__": if __name__ == "__main__":
args = parse_args() args = parse_args()
if args.type == 'ftp': monitor = get_monitor(args.type)
start_ftp_monitor() start_monitor(monitor, args)
else:
raise Exception('unsupport type.')
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册