提交 55b17479 编写于 作者: M MRXLT

Merge remote-tracking branch 'upstream/develop' into web-service

sync
...@@ -27,6 +27,10 @@ namespace predictor { ...@@ -27,6 +27,10 @@ namespace predictor {
} }
#endif #endif
#ifdef WITH_GPU
#define USE_PTHREAD
#endif
#ifdef USE_PTHREAD #ifdef USE_PTHREAD
#define THREAD_T pthread_t #define THREAD_T pthread_t
......
...@@ -91,6 +91,7 @@ class Monitor(object): ...@@ -91,6 +91,7 @@ class Monitor(object):
model_name)) model_name))
return model_name return model_name
tar_model_path = os.path.join(local_tmp_path, model_name) tar_model_path = os.path.join(local_tmp_path, model_name)
_LOGGER.info("try to unpack remote file({})".format(tar_model_path))
if not tarfile.is_tarfile(tar_model_path): if not tarfile.is_tarfile(tar_model_path):
raise Exception('not a tar packaged file type. {}'.format( raise Exception('not a tar packaged file type. {}'.format(
self._check_param_help('remote_model_name', model_name))) self._check_param_help('remote_model_name', model_name)))
...@@ -105,10 +106,11 @@ class Monitor(object): ...@@ -105,10 +106,11 @@ class Monitor(object):
self._check_param_help('local_tmp_path', local_tmp_path))) self._check_param_help('local_tmp_path', local_tmp_path)))
finally: finally:
os.remove(tar_model_path) os.remove(tar_model_path)
_LOGGER.debug('remove packed file({}).'.format(model_name)) _LOGGER.debug('remove packed file({}).'.format(tar_model_path))
_LOGGER.info('using unpacked filename: {}.'.format( _LOGGER.info('using unpacked filename: {}.'.format(
unpacked_filename)) unpacked_filename))
if not os.path.exists(unpacked_filename): if not os.path.exists(
os.path.join(local_tmp_path, unpacked_filename)):
raise Exception('file not exist. {}'.format( raise Exception('file not exist. {}'.format(
self._check_param_help('unpacked_filename', self._check_param_help('unpacked_filename',
unpacked_filename))) unpacked_filename)))
...@@ -124,13 +126,14 @@ class Monitor(object): ...@@ -124,13 +126,14 @@ class Monitor(object):
'_local_tmp_path', '_interval' '_local_tmp_path', '_interval'
] ]
self._print_params(params) self._print_params(params)
if not os.path.exists(self._local_tmp_path): local_tmp_path = os.path.join(self._local_path, self._local_tmp_path)
_LOGGER.info('mkdir: {}'.format(self._local_tmp_path)) _LOGGER.info('local_tmp_path: {}'.format(local_tmp_path))
os.makedirs(self._local_tmp_path) if not os.path.exists(local_tmp_path):
_LOGGER.info('mkdir: {}'.format(local_tmp_path))
os.makedirs(local_tmp_path)
while True: while True:
[flag, timestamp] = self._exist_remote_file( [flag, timestamp] = self._exist_remote_file(
self._remote_path, self._remote_donefile_name, self._remote_path, self._remote_donefile_name, local_tmp_path)
self._local_tmp_path)
if flag: if flag:
if self._remote_donefile_timestamp is None or \ if self._remote_donefile_timestamp is None or \
timestamp != self._remote_donefile_timestamp: timestamp != self._remote_donefile_timestamp:
...@@ -139,15 +142,15 @@ class Monitor(object): ...@@ -139,15 +142,15 @@ class Monitor(object):
self._remote_donefile_timestamp = timestamp self._remote_donefile_timestamp = timestamp
self._pull_remote_dir(self._remote_path, self._pull_remote_dir(self._remote_path,
self._remote_model_name, self._remote_model_name,
self._local_tmp_path) local_tmp_path)
_LOGGER.info('pull remote model({}).'.format( _LOGGER.info('pull remote model({}).'.format(
self._remote_model_name)) self._remote_model_name))
unpacked_filename = self._decompress_model_file( unpacked_filename = self._decompress_model_file(
self._local_tmp_path, self._remote_model_name, local_tmp_path, self._remote_model_name,
self._unpacked_filename) self._unpacked_filename)
self._update_local_model( self._update_local_model(local_tmp_path, unpacked_filename,
self._local_tmp_path, unpacked_filename, self._local_path,
self._local_path, self._local_model_name) self._local_model_name)
_LOGGER.info('update local model({}).'.format( _LOGGER.info('update local model({}).'.format(
self._local_model_name)) self._local_model_name))
self._update_local_donefile(self._local_path, self._update_local_donefile(self._local_path,
...@@ -220,7 +223,12 @@ class HadoopMonitor(Monitor): ...@@ -220,7 +223,12 @@ class HadoopMonitor(Monitor):
local_dirpath = os.path.join(local_tmp_path, dirname) local_dirpath = os.path.join(local_tmp_path, dirname)
if os.path.exists(local_dirpath): if os.path.exists(local_dirpath):
_LOGGER.info('remove old temporary model file({}).'.format(dirname)) _LOGGER.info('remove old temporary model file({}).'.format(dirname))
if self._unpacked_filename is None:
# the remote file is model folder.
shutil.rmtree(local_dirpath) shutil.rmtree(local_dirpath)
else:
# the remote file is a packed model file
os.remove(local_dirpath)
remote_dirpath = os.path.join(remote_path, dirname) remote_dirpath = os.path.join(remote_path, dirname)
cmd = '{} -get {} {} 2>/dev/null'.format(self._cmd_prefix, cmd = '{} -get {} {} 2>/dev/null'.format(self._cmd_prefix,
remote_dirpath, local_dirpath) remote_dirpath, local_dirpath)
...@@ -301,8 +309,8 @@ class FTPMonitor(Monitor): ...@@ -301,8 +309,8 @@ class FTPMonitor(Monitor):
os.path.join(remote_path, remote_dirname), name, os.path.join(remote_path, remote_dirname), name,
os.path.join(local_tmp_path, remote_dirname), overwrite) os.path.join(local_tmp_path, remote_dirname), overwrite)
else: else:
self._download_remote_file(remote_dirname, name, self._download_remote_file(remote_dirpath, name,
local_tmp_path, overwrite) local_dirpath, overwrite)
except ftplib.error_perm: except ftplib.error_perm:
_LOGGER.debug('{} is file.'.format(remote_dirname)) _LOGGER.debug('{} is file.'.format(remote_dirname))
self._download_remote_file(remote_path, remote_dirname, self._download_remote_file(remote_path, remote_dirname,
...@@ -325,17 +333,17 @@ class GeneralMonitor(Monitor): ...@@ -325,17 +333,17 @@ class GeneralMonitor(Monitor):
def _get_local_file_timestamp(self, filename): def _get_local_file_timestamp(self, filename):
return os.path.getmtime(filename) return os.path.getmtime(filename)
def _exist_remote_file(self, path, filename, local_tmp_path): def _exist_remote_file(self, remote_path, filename, local_tmp_path):
remote_filepath = os.path.join(path, filename) remote_filepath = os.path.join(remote_path, filename)
url = '{}/{}'.format(self._general_host, remote_filepath) url = '{}/{}'.format(self._general_host, remote_filepath)
_LOGGER.debug('remote file url: {}'.format(url)) _LOGGER.debug('remote file url: {}'.format(url))
cmd = 'wget -N -P {} {} &>/dev/null'.format(local_tmp_path, url) # only for check donefile, which is not a folder.
cmd = 'wget -nd -N -P {} {} &>/dev/null'.format(local_tmp_path, url)
_LOGGER.debug('wget cmd: {}'.format(cmd)) _LOGGER.debug('wget cmd: {}'.format(cmd))
if os.system(cmd) != 0: if os.system(cmd) != 0:
_LOGGER.debug('remote file({}) not exist.'.format(filename)) _LOGGER.debug('remote file({}) not exist.'.format(remote_filepath))
return [False, None] return [False, None]
else: else:
_LOGGER.debug('download remote file({}).'.format(filename))
timestamp = self._get_local_file_timestamp( timestamp = self._get_local_file_timestamp(
os.path.join(local_tmp_path, filename)) os.path.join(local_tmp_path, filename))
return [True, timestamp] return [True, timestamp]
...@@ -344,7 +352,13 @@ class GeneralMonitor(Monitor): ...@@ -344,7 +352,13 @@ class GeneralMonitor(Monitor):
remote_dirpath = os.path.join(remote_path, dirname) remote_dirpath = os.path.join(remote_path, dirname)
url = '{}/{}'.format(self._general_host, remote_dirpath) url = '{}/{}'.format(self._general_host, remote_dirpath)
_LOGGER.debug('remote file url: {}'.format(url)) _LOGGER.debug('remote file url: {}'.format(url))
cmd = 'wget -nH -r -P {} {} &>/dev/null'.format(local_tmp_path, url) if self._unpacked_filename is None:
# the remote file is model folder.
cmd = 'wget -nH -r -P {} {} &>/dev/null'.format(
os.path.join(local_tmp_path, dirname), url)
else:
# the remote file is a packed model file
cmd = 'wget -nd -N -P {} {} &>/dev/null'.format(local_tmp_path, url)
_LOGGER.debug('wget cmd: {}'.format(cmd)) _LOGGER.debug('wget cmd: {}'.format(cmd))
if os.system(cmd) != 0: if os.system(cmd) != 0:
raise Exception('pull remote dir failed. {}'.format( raise Exception('pull remote dir failed. {}'.format(
...@@ -352,7 +366,11 @@ class GeneralMonitor(Monitor): ...@@ -352,7 +366,11 @@ class GeneralMonitor(Monitor):
def parse_args(): def parse_args():
''' parse args. ''' """ parse args.
Returns:
parser.parse_args().
"""
parser = argparse.ArgumentParser(description="Monitor") parser = argparse.ArgumentParser(description="Monitor")
parser.add_argument( parser.add_argument(
"--type", type=str, default='general', help="Type of remote server") "--type", type=str, default='general', help="Type of remote server")
......
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册