提交 ea5586a8 编写于 作者: B barrierye

fix bug in FTPMonitor

上级 9d24d14f
...@@ -142,12 +142,12 @@ class AFSMonitor(Monitor): ...@@ -142,12 +142,12 @@ class AFSMonitor(Monitor):
''' AFS Monitor(by hadoop-client). ''' ''' AFS Monitor(by hadoop-client). '''
def __init__(self, def __init__(self,
hadoop_bin_path, hadoop_bin,
hadoop_host=None, hadoop_host=None,
hadoop_ugi=None, hadoop_ugi=None,
interval=10): interval=10):
super(AFSMonitor, self).__init__(interval) super(AFSMonitor, self).__init__(interval)
self._hadoop_bin = hadoop_bin_path self._hadoop_bin = hadoop_bin
self._hadoop_host = hadoop_host self._hadoop_host = hadoop_host
self._hadoop_ugi = hadoop_ugi self._hadoop_ugi = hadoop_ugi
self._cmd_prefix = '{} fs '.format(self._hadoop_bin) self._cmd_prefix = '{} fs '.format(self._hadoop_bin)
...@@ -217,45 +217,43 @@ class FTPMonitor(Monitor): ...@@ -217,45 +217,43 @@ class FTPMonitor(Monitor):
def _exist_remote_file(self, path, filename, local_tmp_path): def _exist_remote_file(self, path, filename, local_tmp_path):
import ftplib import ftplib
try: try:
filepath = os.path.join(path, filename) self._ftp.cwd(path)
timestamp = self._ftp.voidcmd('MDTM {}'.format(filepath))[4:].strip( timestamp = self._ftp.voidcmd('MDTM {}'.format(filename))[4:].strip(
) )
return [True, timestamp] return [True, timestamp]
except ftplib.error_perm: except ftplib.error_perm:
return [False, None] return [False, None]
def _download_remote_files(remote_path, def _download_remote_files(self,
remote_path,
remote_dirname, remote_dirname,
local_tmp_path, local_tmp_path,
overwrite=True): overwrite=True):
try: local_dirpath = os.path.join(local_tmp_path, remote_dirname)
remote_dirpath = os.path.join(remote_path, remote_dirname) if not os.path.exists(local_dirpath):
self._ftp.cwd(remote_dirpath) os.mkdir(local_dirpath)
os.mkdir(os.path.join(local_tmp_path, remote_dirname))
except OSError: remote_dirpath = os.path.join(remote_path, remote_dirname)
# folder already exists at the local_tmp_path output = []
pass self._ftp.cwd(remote_dirpath)
except ftplib.error_perm: self._ftp.dir(output.append)
raise Exception('remote_path({}) not exist.'.format(remote_path)) for line in output:
[attr, _, _, _, _, _, _, _, name] = line.split()
filelist = [x for x in self_ftp.mlsd()] if attr[0] == 'd':
for file in filelist: self._download_remote_files(
if file[1]['type'] == 'file': os.path.join(remote_path, remote_dirname), name,
fullpath = os.path.join(local_tmp_path, remote_dirname, file[0]) os.path.join(local_tmp_path, remote_dirname), overwrite)
else:
fullpath = os.path.join(local_tmp_path, remote_dirname, name)
if not overwrite and os.path.isfile(fullpath): if not overwrite and os.path.isfile(fullpath):
continue continue
else: else:
with open(fullpath, 'wb') as f: with open(fullpath, 'wb') as f:
self._ftp.retrbinary('RETR ' + file[0], f.write) self._ftp.cwd(remote_dirpath)
elif file[1]['type'] == 'dir': self._ftp.retrbinary('RETR {}'.format(name), f.write)
self._download_remote_files(
os.path.join(remote_path, remote_dirname), file[0],
os.path.join(local_tmp_path, remote_dirname), overwrite)
else:
print('Unknown type: ' + file[1]['type'])
def _pull_remote_dir(self, remote_path, dirname, local_tmp_path): def _pull_remote_dir(self, remote_path, dirname, local_tmp_path):
self._exist_remote_file( self._download_remote_files(
remote_path, dirname, local_tmp_path, overwrite=True) remote_path, dirname, local_tmp_path, overwrite=True)
...@@ -335,7 +333,7 @@ def parse_args(): ...@@ -335,7 +333,7 @@ def parse_args():
"--ftp_password", type=str, default='', help="Password of ftp") "--ftp_password", type=str, default='', help="Password of ftp")
# afs monitor # afs monitor
parser.add_argument( parser.add_argument(
"--hadoop_bin_path", type=str, help="Hadoop_bin_path for afs") "--hadoop_bin", type=str, help="Hadoop_bin_path for afs")
parser.add_argument( parser.add_argument(
"--hadoop_host", type=str, default=None, help="Hadoop_host for afs") "--hadoop_host", type=str, default=None, help="Hadoop_host for afs")
parser.add_argument( parser.add_argument(
...@@ -365,7 +363,7 @@ def get_monitor(mtype): ...@@ -365,7 +363,7 @@ def get_monitor(mtype):
return GeneralMonitor(args.general_host, interval=args.interval) return GeneralMonitor(args.general_host, interval=args.interval)
elif mtype == 'afs': elif mtype == 'afs':
return AFSMonitor( return AFSMonitor(
args.hadoop_bin_path, args.hadoop_bin,
args.hadoop_host, args.hadoop_host,
args.hadoop_ugi, args.hadoop_ugi,
interval=args.interval) interval=args.interval)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册