提交 2f65fba2 编写于 作者: B barrierye

fix AFSMonitor && add friendly info

上级 b0c2bcd7
...@@ -24,6 +24,9 @@ import commands ...@@ -24,6 +24,9 @@ import commands
import datetime import datetime
import shutil import shutil
import tarfile import tarfile
import logging
_LOGGER = logging.getLogger(__name__)
class Monitor(object): class Monitor(object):
...@@ -71,31 +74,28 @@ class Monitor(object): ...@@ -71,31 +74,28 @@ class Monitor(object):
return "Please check the {}({}) parameter.".format(param_name, return "Please check the {}({}) parameter.".format(param_name,
param_value) param_value)
def _check_params(self): def _check_params(self, params):
if self._remote_path is None: for param in params:
raise Exception('remote_path not set.') if getattr(self, param, None) is None:
if self._remote_model_name is None: raise Exception('{} not set.'.format(param))
raise Exception('remote_model_name not set.')
if self._remote_donefile_name is None: def _print_params(self, params_name):
raise Exception('remote_donefile_name not set.') self._check_params(params_name)
if self._local_model_name is None: for name in params_name:
raise Exception('local_model_name not set.') _LOGGER.info('{}: {}'.format(name, getattr(self, name)))
if self._local_path is None:
raise Exception('local_path not set.')
if self._local_timestamp_file is None:
raise Exception('local_timestamp_file not set.')
if self._local_tmp_path is None:
raise Exception('local_tmp_path not set.')
def _decompress_model_file(self, local_tmp_path, model_name, def _decompress_model_file(self, local_tmp_path, model_name,
unpacked_filename): unpacked_filename):
if unpacked_filename is None: if unpacked_filename is None:
_LOGGER.debug('remote file({}) is already unpacked.'.format(
model_name))
return model_name return model_name
tar_model_path = os.path.join(local_tmp_path, model_name) tar_model_path = os.path.join(local_tmp_path, model_name)
if not tarfile.is_tarfile(tar_model_path): if not tarfile.is_tarfile(tar_model_path):
raise Exception('not a tar packaged file type. {}'.format( raise Exception('not a tar packaged file type. {}'.format(
self._check_param_help('remote_model_name', model_name))) self._check_param_help('remote_model_name', model_name)))
try: try:
_LOGGER.info('unpack remote file({}).'.format(model_name))
tar = tarfile.open(tar_model_path) tar = tarfile.open(tar_model_path)
tar.extractall(local_tmp_path) tar.extractall(local_tmp_path)
tar.close() tar.close()
...@@ -105,6 +105,9 @@ class Monitor(object): ...@@ -105,6 +105,9 @@ class Monitor(object):
self._check_param_help('local_tmp_path', local_tmp_path))) self._check_param_help('local_tmp_path', local_tmp_path)))
finally: finally:
os.remove(tar_model_path) os.remove(tar_model_path)
_LOGGER.debug('remove packed file({}).'.format(model_name))
_LOGGER.info('using unpacked filename: {}.'.format(
unpacked_filename))
if not os.path.exists(unpacked_filename): if not os.path.exists(unpacked_filename):
raise Exception('file not exist. {}'.format( raise Exception('file not exist. {}'.format(
self._check_param_help('unpacked_filename', self._check_param_help('unpacked_filename',
...@@ -115,8 +118,14 @@ class Monitor(object): ...@@ -115,8 +118,14 @@ class Monitor(object):
''' '''
Monitor the remote model by polling and update the local model. Monitor the remote model by polling and update the local model.
''' '''
self._check_params() params = [
'_remote_path', '_remote_model_name', '_remote_donefile_name',
'_local_model_name', '_local_path', '_local_timestamp_file',
'_local_tmp_path', '_interval'
]
self._print_params(params)
if not os.path.exists(self._local_tmp_path): if not os.path.exists(self._local_tmp_path):
_LOGGER.info('mkdir: {}'.format(self._local_tmp_path))
os.makedirs(self._local_tmp_path) os.makedirs(self._local_tmp_path)
while True: while True:
[flag, timestamp] = self._exist_remote_file( [flag, timestamp] = self._exist_remote_file(
...@@ -125,30 +134,31 @@ class Monitor(object): ...@@ -125,30 +134,31 @@ class Monitor(object):
if flag: if flag:
if self._remote_donefile_timestamp is None or \ if self._remote_donefile_timestamp is None or \
timestamp != self._remote_donefile_timestamp: timestamp != self._remote_donefile_timestamp:
_LOGGER.info('doneilfe({}) changed.'.format(
self._remote_donefile_name))
self._remote_donefile_timestamp = timestamp self._remote_donefile_timestamp = timestamp
self._pull_remote_dir(self._remote_path, self._pull_remote_dir(self._remote_path,
self._remote_model_name, self._remote_model_name,
self._local_tmp_path) self._local_tmp_path)
print('{} [INFO] pull remote model'.format( _LOGGER.info('pull remote model({}).'.format(
datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) self._remote_model_name))
unpacked_filename = self._decompress_model_file( unpacked_filename = self._decompress_model_file(
self._local_tmp_path, self._remote_model_name, self._local_tmp_path, self._remote_model_name,
self._unpacked_filename) self._unpacked_filename)
self._update_local_model( self._update_local_model(
self._local_tmp_path, unpacked_filename, self._local_tmp_path, unpacked_filename,
self._local_path, self._local_model_name) self._local_path, self._local_model_name)
print('{} [INFO] update model'.format(datetime.datetime.now( _LOGGER.info('update local model({}).'.format(
).strftime('%Y-%m-%d %H:%M:%S'))) self._local_model_name))
self._update_local_donefile(self._local_path, self._update_local_donefile(self._local_path,
self._local_model_name, self._local_model_name,
self._local_timestamp_file) self._local_timestamp_file)
print('{} [INFO] update local donefile'.format( _LOGGER.info('update model timestamp({}).'.format(
datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) self._local_timestamp_file))
else: else:
print('{} [INFO] no donefile.'.format(datetime.datetime.now( _LOGGER.info('remote({}) has no donefile.'.format(
).strftime('%Y-%m-%d %H:%M:%S'))) self._remote_path))
print('{} [INFO] sleep {}s'.format(datetime.datetime.now().strftime( _LOGGER.info('sleep {}s.'.format(self._interval))
'%Y-%m-%d %H:%M:%S'), self._interval))
time.sleep(self._interval) time.sleep(self._interval)
def _exist_remote_file(self, path, filename, local_tmp_path): def _exist_remote_file(self, path, filename, local_tmp_path):
...@@ -162,6 +172,7 @@ class Monitor(object): ...@@ -162,6 +172,7 @@ class Monitor(object):
tmp_model_path = os.path.join(local_tmp_path, remote_model_name) tmp_model_path = os.path.join(local_tmp_path, remote_model_name)
local_model_path = os.path.join(local_path, local_model_name) local_model_path = os.path.join(local_path, local_model_name)
cmd = 'cp -r {}/* {}'.format(tmp_model_path, local_model_path) cmd = 'cp -r {}/* {}'.format(tmp_model_path, local_model_path)
_LOGGER.debug('update model cmd: {}'.format(cmd))
if os.system(cmd) != 0: if os.system(cmd) != 0:
raise Exception('update local model failed.') raise Exception('update local model failed.')
...@@ -170,6 +181,7 @@ class Monitor(object): ...@@ -170,6 +181,7 @@ class Monitor(object):
donefile_path = os.path.join(local_path, local_model_name, donefile_path = os.path.join(local_path, local_model_name,
local_timestamp_file) local_timestamp_file)
cmd = 'touch {}'.format(donefile_path) cmd = 'touch {}'.format(donefile_path)
_LOGGER.debug('update timestamp cmd: {}'.format(cmd))
if os.system(cmd) != 0: if os.system(cmd) != 0:
raise Exception('update local donefile failed.') raise Exception('update local donefile failed.')
...@@ -186,17 +198,21 @@ class AFSMonitor(Monitor): ...@@ -186,17 +198,21 @@ class AFSMonitor(Monitor):
self._hadoop_bin = hadoop_bin self._hadoop_bin = hadoop_bin
self._hadoop_host = hadoop_host self._hadoop_host = hadoop_host
self._hadoop_ugi = hadoop_ugi self._hadoop_ugi = hadoop_ugi
self._print_params(['_hadoop_bin', '_hadoop_host', '_hadoop_ugi'])
self._cmd_prefix = '{} fs '.format(self._hadoop_bin) self._cmd_prefix = '{} fs '.format(self._hadoop_bin)
if not self._hadoop_host and not self._hadoop_ugi: if self._hadoop_host and self._hadoop_ugi:
self._cmd_prefix += '-D fs.default.name={} -D hadoop.job.ugi={} '.format( self._cmd_prefix += '-D fs.default.name={} -D hadoop.job.ugi={} '.format(
self._hadoop_host, self._hadoop_ugi) self._hadoop_host, self._hadoop_ugi)
_LOGGER.info('AFS prefix cmd: {}'.format(self._cmd_prefix))
def _exist_remote_file(self, path, filename, local_tmp_path): def _exist_remote_file(self, path, filename, local_tmp_path):
remote_filepath = os.path.join(path, filename) remote_filepath = os.path.join(path, filename)
cmd = '{} -ls {}'.format(self._cmd_prefix, remote_filepath) cmd = '{} -ls {} 2>/dev/null'.format(self._cmd_prefix, remote_filepath)
_LOGGER.debug('check cmd: {}'.format(cmd))
[status, output] = commands.getstatusoutput(cmd) [status, output] = commands.getstatusoutput(cmd)
_LOGGER.debug('resp: {}'.format(output))
if status == 0: if status == 0:
[_, _, _, _, _, mdate, mtime, _] = output.split('\n')[-1] [_, _, _, _, _, mdate, mtime, _] = output.split('\n')[-1].split()
timestr = mdate + mtime timestr = mdate + mtime
return [True, timestr] return [True, timestr]
else: else:
...@@ -206,10 +222,12 @@ class AFSMonitor(Monitor): ...@@ -206,10 +222,12 @@ class AFSMonitor(Monitor):
# remove old file before pull remote dir # remove old file before pull remote dir
local_dirpath = os.path.join(local_tmp_path, dirname) local_dirpath = os.path.join(local_tmp_path, dirname)
if os.path.exists(local_dirpath): if os.path.exists(local_dirpath):
_LOGGER.info('remove old temporary model file({}).'.format(dirname))
shutil.rmtree(local_dirpath) shutil.rmtree(local_dirpath)
remote_dirpath = os.path.join(remote_path, dirname) remote_dirpath = os.path.join(remote_path, dirname)
cmd = '{} -get {} {}'.format(self._cmd_prefix, remote_dirpath, cmd = '{} -get {} {} 2>/dev/null'.format(self._cmd_prefix,
local_dirpath) remote_dirpath, local_dirpath)
_LOGGER.debug('pull cmd: {}'.format(cmd))
if os.system(cmd) != 0: if os.system(cmd) != 0:
raise Exception('pull remote dir failed. {}'.format( raise Exception('pull remote dir failed. {}'.format(
self._check_param_help('remote_model_name', dirname))) self._check_param_help('remote_model_name', dirname)))
...@@ -218,15 +236,19 @@ class AFSMonitor(Monitor): ...@@ -218,15 +236,19 @@ class AFSMonitor(Monitor):
class HDFSMonitor(Monitor): class HDFSMonitor(Monitor):
''' HDFS Monitor. ''' ''' HDFS Monitor. '''
def __init__(self, bin_path, interval=10): def __init__(self, hdfs_bin, interval=10):
super(HDFSMonitor, self).__init__(interval) super(HDFSMonitor, self).__init__(interval)
self._hdfs_bin_path = bin_path self._hdfs_bin = hdfs_bin
self._print_params(['_hdfs_bin'])
self._prefix_cmd = '{} dfs '.format(self._hdfs_bin_path) self._prefix_cmd = '{} dfs '.format(self._hdfs_bin_path)
_LOGGER.info('HDFS prefix cmd: {}'.format(self._cmd_prefix))
def _exist_remote_file(self, path, filename, local_tmp_path): def _exist_remote_file(self, path, filename, local_tmp_path):
remote_filepath = os.path.join(path, filename) remote_filepath = os.path.join(path, filename)
cmd = '{} -stat "%Y" {}'.format(self._prefix_cmd, remote_filepath) cmd = '{} -stat "%Y" {}'.format(self._prefix_cmd, remote_filepath)
_LOGGER.debug('check cmd: {}'.format(cmd))
[status, timestamp] = commands.getstatusoutput(cmd) [status, timestamp] = commands.getstatusoutput(cmd)
_LOGGER.debug('resp: {}'.format(output))
if status == 0: if status == 0:
return [True, timestamp] return [True, timestamp]
else: else:
...@@ -236,6 +258,7 @@ class HDFSMonitor(Monitor): ...@@ -236,6 +258,7 @@ class HDFSMonitor(Monitor):
remote_dirpath = os.path.join(remote_path, dirname) remote_dirpath = os.path.join(remote_path, dirname)
cmd = '{} -get -f {} {}'.format(self._prefix_cmd, remote_dirpath, cmd = '{} -get -f {} {}'.format(self._prefix_cmd, remote_dirpath,
local_tmp_path) local_tmp_path)
_LOGGER.debug('pull cmd: {}'.format(cmd))
if os.system(cmd) != 0: if os.system(cmd) != 0:
raise Exception('pull remote dir failed. {}'.format( raise Exception('pull remote dir failed. {}'.format(
self._check_param_help('remote_model_name', dirname))) self._check_param_help('remote_model_name', dirname)))
...@@ -248,18 +271,25 @@ class FTPMonitor(Monitor): ...@@ -248,18 +271,25 @@ class FTPMonitor(Monitor):
super(FTPMonitor, self).__init__(interval) super(FTPMonitor, self).__init__(interval)
import ftplib import ftplib
self._ftp = ftplib.FTP() self._ftp = ftplib.FTP()
self._ftp.connect(host, port) self._ftp_host = host
self._ftp.login(username, password) self._ftp_port = port
self._ftp_url = 'ftp://{}:{}/'.format(host, port) self._ftp_username = username
self._ftp_password = password
self._ftp.connect(self._ftp_host, self._ftp_port)
self._ftp.login(self._ftp_username, self._ftp_password)
self._print_params(
['_ftp_host', '_ftp_port', '_ftp_username', '_ftp_password'])
def _exist_remote_file(self, path, filename, local_tmp_path): def _exist_remote_file(self, path, filename, local_tmp_path):
import ftplib import ftplib
try: try:
_LOGGER.debug('cwd: {}'.format(path))
self._ftp.cwd(path) self._ftp.cwd(path)
timestamp = self._ftp.voidcmd('MDTM {}'.format(filename))[4:].strip( timestamp = self._ftp.voidcmd('MDTM {}'.format(filename))[4:].strip(
) )
return [True, timestamp] return [True, timestamp]
except ftplib.error_perm: except ftplib.error_perm:
_LOGGER.debug('remote file({}) not exist.'.format(filename))
return [False, None] return [False, None]
def _download_remote_file(self, def _download_remote_file(self,
...@@ -272,7 +302,9 @@ class FTPMonitor(Monitor): ...@@ -272,7 +302,9 @@ class FTPMonitor(Monitor):
return return
else: else:
with open(local_fullpath, 'wb') as f: with open(local_fullpath, 'wb') as f:
_LOGGER.debug('cwd: {}'.format(path))
self._ftp.cwd(remote_path) self._ftp.cwd(remote_path)
_LOGGER.debug('download remote file({})'.format(remote_path))
self._ftp.retrbinary('RETR {}'.format(remote_filename), f.write) self._ftp.retrbinary('RETR {}'.format(remote_filename), f.write)
def _download_remote_files(self, def _download_remote_files(self,
...@@ -284,10 +316,13 @@ class FTPMonitor(Monitor): ...@@ -284,10 +316,13 @@ class FTPMonitor(Monitor):
remote_dirpath = os.path.join(remote_path, remote_dirname) remote_dirpath = os.path.join(remote_path, remote_dirname)
# Check whether remote_dirpath is a file or a folder # Check whether remote_dirpath is a file or a folder
try: try:
_LOGGER.debug('cwd: {}'.format(remote_dirpath))
self._ftp.cwd(remote_dirpath) self._ftp.cwd(remote_dirpath)
_LOGGER.debug('{} is folder.'.format(remote_dirname))
local_dirpath = os.path.join(local_tmp_path, remote_dirname) local_dirpath = os.path.join(local_tmp_path, remote_dirname)
if not os.path.exists(local_dirpath): if not os.path.exists(local_dirpath):
_LOGGER.info('mkdir: {}'.format(local_dirpath))
os.mkdir(local_dirpath) os.mkdir(local_dirpath)
output = [] output = []
...@@ -302,6 +337,7 @@ class FTPMonitor(Monitor): ...@@ -302,6 +337,7 @@ class FTPMonitor(Monitor):
self._download_remote_file(remote_dirname, name, self._download_remote_file(remote_dirname, name,
local_tmp_path, overwrite) local_tmp_path, overwrite)
except ftplib.error_perm: except ftplib.error_perm:
_LOGGER.debug('{} is file.'.format(remote_dirname))
self._download_remote_file(remote_path, remote_dirname, self._download_remote_file(remote_path, remote_dirname,
local_tmp_path, overwrite) local_tmp_path, overwrite)
return return
...@@ -316,26 +352,33 @@ class GeneralMonitor(Monitor): ...@@ -316,26 +352,33 @@ class GeneralMonitor(Monitor):
def __init__(self, host, interval=10): def __init__(self, host, interval=10):
super(GeneralMonitor, self).__init__(interval) super(GeneralMonitor, self).__init__(interval)
self._host = host self._general_host = host
self._print_params(['_general_host'])
def _get_local_file_timestamp(self, filename): def _get_local_file_timestamp(self, filename):
return os.path.getmtime(filename) return os.path.getmtime(filename)
def _exist_remote_file(self, path, filename, local_tmp_path): def _exist_remote_file(self, path, filename, local_tmp_path):
remote_filepath = os.path.join(path, filename) remote_filepath = os.path.join(path, filename)
url = '{}/{}'.format(self._host, remote_filepath) url = '{}/{}'.format(self._general_host, remote_filepath)
_LOGGER.debug('remote file url: {}'.format(url))
cmd = 'wget -N -P {} {} &>/dev/null'.format(local_tmp_path, url) cmd = 'wget -N -P {} {} &>/dev/null'.format(local_tmp_path, url)
_LOGGER.debug('wget cmd: {}'.format(cmd))
if os.system(cmd) != 0: if os.system(cmd) != 0:
_LOGGER.debug('remote file({}) not exist.'.format(filename))
return [False, None] return [False, None]
else: else:
_LOGGER.debug('download remote file({}).'.format(filename))
timestamp = self._get_local_file_timestamp( timestamp = self._get_local_file_timestamp(
os.path.join(local_tmp_path, filename)) os.path.join(local_tmp_path, filename))
return [True, timestamp] return [True, timestamp]
def _pull_remote_dir(self, remote_path, dirname, local_tmp_path): def _pull_remote_dir(self, remote_path, dirname, local_tmp_path):
remote_dirpath = os.path.join(remote_path, dirname) remote_dirpath = os.path.join(remote_path, dirname)
url = '{}/{}'.format(self._host, remote_dirpath) url = '{}/{}'.format(self._general_host, remote_dirpath)
_LOGGER.debug('remote file url: {}'.format(url))
cmd = 'wget -nH -r -P {} {} &>/dev/null'.format(local_tmp_path, url) cmd = 'wget -nH -r -P {} {} &>/dev/null'.format(local_tmp_path, url)
_LOGGER.debug('wget cmd: {}'.format(cmd))
if os.system(cmd) != 0: if os.system(cmd) != 0:
raise Exception('pull remote dir failed. {}'.format( raise Exception('pull remote dir failed. {}'.format(
self._check_param_help('remote_model_name', dirname))) self._check_param_help('remote_model_name', dirname)))
...@@ -445,6 +488,10 @@ def start_monitor(monitor, args): ...@@ -445,6 +488,10 @@ def start_monitor(monitor, args):
if __name__ == "__main__": if __name__ == "__main__":
logging.basicConfig(
format='%(asctime)s %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s',
datefmt='%Y-%m-%d %H:%M',
level=logging.INFO)
args = parse_args() args = parse_args()
monitor = get_monitor(args.type) monitor = get_monitor(args.type)
start_monitor(monitor, args) start_monitor(monitor, args)
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册