Unverified commit c0a91b45, authored by barriery and committed by GitHub

Merge pull request #7 from barrierye/add-hdfs-monitor

add packaged option
......@@ -13,17 +13,18 @@ Paddle Serving currently supports the following types of remote monitors (Monitor):
| FTP | The remote is an FTP server, accessed via username and password | `ftp_host` FTP host<br>`ftp_port` FTP port<br>`ftp_username` FTP username, empty by default<br>`ftp_password` FTP password, empty by default |
| AFS | The remote is AFS; the relevant commands are executed through the Hadoop client | `hadoop_bin` path to the Hadoop binary<br>`hadoop_host` AFS host, empty by default<br>`hadoop_ugi` AFS ugi, empty by default |
| Monitor common options | Description |
| :--------------------: | :----------------------------------------------------------: |
| `type` | Specify the Monitor type |
| `remote_path` | Specify the remote base path |
| `remote_model_name` | Specify the name of the model to pull from the remote |
| `remote_donefile_name` | Specify the name of the remote donefile that marks the model update as complete |
| `local_path` | Specify the local working path |
| `local_model_name` | Specify the local model name |
| `local_timestamp_file` | Specify the local timestamp file used for hot loading; it is assumed to be under `local_path/local_model_name`. Defaults to `fluid_time_file` |
| `local_tmp_path` | Specify the local directory for temporary files. Defaults to `_serving_monitor_tmp`; created automatically if it does not exist |
| `interval` | Specify the polling interval |
| Monitor common options | Description | Default |
| :--------------------: | :----------------------------------------------------------: | :------------------------------------------: |
| `type` | Specify the Monitor type | None |
| `remote_path` | Specify the remote base path | None |
| `remote_model_name` | Specify the name of the model to pull from the remote | None |
| `remote_donefile_name` | Specify the name of the remote donefile that marks the model update as complete | None |
| `local_path` | Specify the local working path | None |
| `local_model_name` | Specify the local model name | None |
| `local_timestamp_file` | Specify the local timestamp file used for hot loading; it is assumed to be under `local_path/local_model_name`. | `fluid_time_file` |
| `local_tmp_path` | Specify the local directory for temporary files. | `_serving_monitor_tmp` (created automatically if it does not exist) |
| `interval` | Specify the polling interval. | 10 (seconds) |
| `unpacked_filename` | Monitor supports remote models packaged with tarfile. If the remote model is a packaged file, set this option to tell Monitor the filename obtained after unpacking (see the sketch after this table). | None |
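To make the relationship between a packaged remote model and `unpacked_filename` concrete, here is a minimal sketch; the directory and archive names are hypothetical and not part of the example that follows. The name of the top-level entry inside the tar archive is the value `unpacked_filename` should be set to.

```python
import os
import tarfile

# Hypothetical names, only for illustration.
model_dir = 'my_model'            # local model directory to publish
archive_name = 'my_model.tar.gz'  # what the Monitor sees as remote_model_name

if not os.path.exists(model_dir):
    os.mkdir(model_dir)           # stand-in for a real saved model directory

# Package the directory; the top-level entry keeps the directory name,
# so the Monitor would be started with --unpacked_filename='my_model'.
tar = tarfile.open(archive_name, 'w:gz')
tar.add(model_dir)
tar.close()

# After the Monitor unpacks the archive into local_tmp_path, it expects to
# find exactly this entry:
tar = tarfile.open(archive_name)
print(tar.getnames()[0])          # -> 'my_model'
tar.close()
```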
The following example uses HDFSMonitor to demonstrate Paddle Serving's model hot-loading feature.
......@@ -43,7 +44,9 @@ Paddle Serving currently supports the following types of remote monitors (Monitor):
```python
import os
import sys
import time
import tarfile
import paddle
import paddle.fluid as fluid
import paddle_serving_client.io as serving_io
......@@ -73,22 +76,38 @@ exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())
def push_to_hdfs(local_file_path, remote_path):
hdfs_bin = 'hdfs'
hdfs_bin = '/hadoop-3.1.2/bin/hdfs'
os.system('{} dfs -put -f {} {}'.format(
hdfs_bin, local_file_path, remote_path))
name = "uci_housing"
for pass_id in range(30):
for data_train in train_reader():
avg_loss_value, = exe.run(fluid.default_main_program(),
feed=feeder.feed(data_train),
fetch_list=[avg_loss])
time.sleep(60) # Simulate the production model every other period of time
serving_io.save_model("uci_housing_model", "uci_housing_client",
# Simulate the production model every other period of time
time.sleep(60)
model_name = "{}_model".format(name)
client_name = "{}_client".format(name)
serving_io.save_model(model_name, client_name,
{"x": x}, {"price": y_predict},
fluid.default_main_program())
push_to_hdfs('uci_housing_model', '/')
os.system('touch donefile')
push_to_hdfs('donefile', '/')
# Package model
tar_name = "{}.tar.gz".format(name)
tar = tarfile.open(tar_name, 'w:gz')
tar.add(model_name)
tar.close()
# Push packaged model file to hdfs
push_to_hdfs(tar_name, '/')
# Generate donefile
donefile_name = 'donefile'
os.system('touch {}'.format(donefile_name))
# Push donefile to hdfs
push_to_hdfs(donefile_name, '/')
```
The files on HDFS are listed below:
......@@ -96,8 +115,8 @@ The files on HDFS are listed below:
```bash
# hdfs dfs -ls /
Found 2 items
-rw-r--r-- 1 root supergroup 0 2020-03-30 09:27 /donefile
drwxr-xr-x - root supergroup 0 2020-03-30 09:27 /uci_housing_model
-rw-r--r-- 1 root supergroup 0 2020-04-02 02:54 /donefile
-rw-r--r-- 1 root supergroup 2101 2020-04-02 02:54 /uci_housing.tar.gz
```
### Loading the model on the server side
......@@ -126,9 +145,10 @@ python -m paddle_serving_server.serve --model uci_housing_model --thread 10 --po
```shell
python -m paddle_serving_server.monitor \
--type='hdfs' --hdfs_bin='/hadoop-3.1.2/bin/hdfs' --remote_path='/' \
--remote_model_name='uci_housing_model' --remote_donefile_name='donefile' \
--remote_model_name='uci_housing.tar.gz' --remote_donefile_name='donefile' \
--local_path='.' --local_model_name='uci_housing_model' \
--local_timestamp_file='fluid_time_file' --local_tmp_path='_tmp'
--local_timestamp_file='fluid_time_file' --local_tmp_path='_tmp' \
--unpacked_filename='uci_housing_model'
```
The command above polls the timestamp file `/donefile` at the remote HDFS base path `/`. When its timestamp changes, the remote model is considered updated: the packaged remote model `/uci_housing.tar.gz` is pulled and unpacked into the local temporary path `./_tmp/uci_housing_model`, after which the local model `./uci_housing_model` and Paddle Serving's timestamp file `./uci_housing_model/fluid_time_file` are updated.
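Conceptually, this polling amounts to watching the donefile's timestamp and triggering a model update whenever it changes. Below is a minimal self-contained sketch of that idea; it watches a local file's modification time instead of shelling out to the HDFS client, and `poll_donefile`/`on_update` are illustrative names rather than the actual Monitor API (the real implementation follows in `monitor.py`).

```python
import os
import time

def poll_donefile(donefile_path, on_update, interval=10):
    # Watch the donefile's modification time; when it changes, the remote
    # model is assumed to have been updated and on_update() is invoked.
    last_timestamp = None
    while True:
        if os.path.exists(donefile_path):
            timestamp = os.stat(donefile_path).st_mtime
            if timestamp != last_timestamp:
                last_timestamp = timestamp
                on_update()  # e.g. pull the model, then touch fluid_time_file
        time.sleep(interval)  # corresponds to the --interval option (default 10s)

# Example usage (hypothetical callback):
# poll_donefile('./donefile', lambda: print('remote model updated'))
```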
......
......@@ -23,6 +23,7 @@ import argparse
import commands
import datetime
import shutil
import tarfile
class Monitor(object):
......@@ -40,6 +41,7 @@ class Monitor(object):
self._interval = interval
self._remote_donefile_timestamp = None
self._local_tmp_path = None
self._unpacked_filename = None
def set_remote_path(self, remote_path):
self._remote_path = remote_path
......@@ -62,6 +64,13 @@ class Monitor(object):
def set_local_tmp_path(self, tmp_path):
self._local_tmp_path = tmp_path
def set_unpacked_filename(self, unpacked_filename):
self._unpacked_filename = unpacked_filename
def _check_param_help(self, param_name, param_value):
return "Please check the {}({}) parameter.".format(param_name,
param_value)
def _check_params(self):
if self._remote_path is None:
raise Exception('remote_path not set.')
......@@ -78,6 +87,30 @@ class Monitor(object):
if self._local_tmp_path is None:
raise Exception('local_tmp_path not set.')
def _decompress_model_file(self, local_tmp_path, model_name,
unpacked_filename):
if unpacked_filename is None:
return model_name
tar_model_path = os.path.join(local_tmp_path, model_name)
if not tarfile.is_tarfile(tar_model_path):
raise Exception('not a tar packaged file type. {}'.format(
self._check_param_help('remote_model_name', model_name)))
try:
tar = tarfile.open(tar_model_path)
tar.extractall(local_tmp_path)
tar.close()
except:
raise Exception(
'Decompressing failed, maybe no disk space left. {}'.format(
self._check_param_help('local_tmp_path', local_tmp_path)))
finally:
os.remove(tar_model_path)
if not os.path.exists(os.path.join(local_tmp_path, unpacked_filename)):
raise Exception('file not exist. {}'.format(
self._check_param_help('unpacked_filename',
unpacked_filename)))
return unpacked_filename
def run(self):
'''
Monitor the remote model by polling and update the local model.
......@@ -98,8 +131,11 @@ class Monitor(object):
self._local_tmp_path)
print('{} [INFO] pull remote model'.format(
datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
self._update_local_model(
unpacked_filename = self._decompress_model_file(
self._local_tmp_path, self._remote_model_name,
self._unpacked_filename)
self._update_local_model(
self._local_tmp_path, unpacked_filename,
self._local_path, self._local_model_name)
print('{} [INFO] update model'.format(datetime.datetime.now(
).strftime('%Y-%m-%d %H:%M:%S')))
......@@ -175,7 +211,8 @@ class AFSMonitor(Monitor):
cmd = '{} -get {} {}'.format(self._cmd_prefix, remote_dirpath,
local_dirpath)
if os.system(cmd) != 0:
raise Exception('pull remote dir failed.')
raise Exception('pull remote dir failed. {}'.format(
self._check_param_help('remote_model_name', dirname)))
class HDFSMonitor(Monitor):
......@@ -200,7 +237,8 @@ class HDFSMonitor(Monitor):
cmd = '{} -get -f {} {}'.format(self._prefix_cmd, remote_dirpath,
local_tmp_path)
if os.system(cmd) != 0:
raise Exception('pull remote dir failed.')
raise Exception('pull remote dir failed. {}'.format(
self._check_param_help('remote_model_name', dirname)))
class FTPMonitor(Monitor):
......@@ -224,33 +262,49 @@ class FTPMonitor(Monitor):
except ftplib.error_perm:
return [False, None]
def _download_remote_file(self,
remote_path,
remote_filename,
local_tmp_path,
overwrite=True):
local_fullpath = os.path.join(local_tmp_path, remote_filename)
if not overwrite and os.path.isfile(local_fullpath):
return
else:
with open(local_fullpath, 'wb') as f:
self._ftp.cwd(remote_path)
self._ftp.retrbinary('RETR {}'.format(remote_filename), f.write)
def _download_remote_files(self,
remote_path,
remote_dirname,
local_tmp_path,
overwrite=True):
local_dirpath = os.path.join(local_tmp_path, remote_dirname)
if not os.path.exists(local_dirpath):
os.mkdir(local_dirpath)
import ftplib
remote_dirpath = os.path.join(remote_path, remote_dirname)
output = []
self._ftp.cwd(remote_dirpath)
self._ftp.dir(output.append)
for line in output:
[attr, _, _, _, _, _, _, _, name] = line.split()
if attr[0] == 'd':
self._download_remote_files(
os.path.join(remote_path, remote_dirname), name,
os.path.join(local_tmp_path, remote_dirname), overwrite)
else:
fullpath = os.path.join(local_tmp_path, remote_dirname, name)
if not overwrite and os.path.isfile(fullpath):
continue
# Check whether remote_dirpath is a file or a folder
try:
self._ftp.cwd(remote_dirpath)
local_dirpath = os.path.join(local_tmp_path, remote_dirname)
if not os.path.exists(local_dirpath):
os.mkdir(local_dirpath)
output = []
self._ftp.dir(output.append)
for line in output:
[attr, _, _, _, _, _, _, _, name] = line.split()
if attr[0] == 'd':
self._download_remote_files(
os.path.join(remote_path, remote_dirname), name,
os.path.join(local_tmp_path, remote_dirname), overwrite)
else:
with open(fullpath, 'wb') as f:
self._ftp.cwd(remote_dirpath)
self._ftp.retrbinary('RETR {}'.format(name), f.write)
self._download_remote_file(remote_dirname, name,
local_tmp_path, overwrite)
except ftplib.error_perm:
self._download_remote_file(remote_path, remote_dirname,
local_tmp_path, overwrite)
return
def _pull_remote_dir(self, remote_path, dirname, local_tmp_path):
self._download_remote_files(
......@@ -270,7 +324,7 @@ class GeneralMonitor(Monitor):
def _exist_remote_file(self, path, filename, local_tmp_path):
remote_filepath = os.path.join(path, filename)
url = '{}/{}'.format(self._host, remote_filepath)
cmd = 'wget -N -P {} {}'.format(local_tmp_path, url)
cmd = 'wget -N -P {} {} &>/dev/null'.format(local_tmp_path, url)
if os.system(cmd) != 0:
return [False, None]
else:
......@@ -281,9 +335,10 @@ class GeneralMonitor(Monitor):
def _pull_remote_dir(self, remote_path, dirname, local_tmp_path):
remote_dirpath = os.path.join(remote_path, dirname)
url = '{}/{}'.format(self._host, remote_dirpath)
cmd = 'wget -nH -r -P {} {} &> /dev/null'.format(local_tmp_path, url)
cmd = 'wget -nH -r -P {} {} &>/dev/null'.format(local_tmp_path, url)
if os.system(cmd) != 0:
raise Exception('pull remote dir failed.')
raise Exception('pull remote dir failed. {}'.format(
self._check_param_help('remote_model_name', dirname)))
def parse_args():
......@@ -317,6 +372,12 @@ def parse_args():
type=str,
default='_serving_monitor_tmp',
help="Local tmp path")
parser.add_argument(
"--unpacked_filename",
type=str,
default=None,
help="If the model of the remote production is a packaged file, the unpacked file name should be set. Currently, only tar packaging format is supported."
)
parser.add_argument(
"--interval", type=int, default=10, help="Time interval")
# general monitor
......@@ -379,6 +440,7 @@ def start_monitor(monitor, args):
monitor.set_local_model_name(args.local_model_name)
monitor.set_local_timestamp_file(args.local_timestamp_file)
monitor.set_local_tmp_path(args.local_tmp_path)
monitor.set_unpacked_filename(args.unpacked_filename)
monitor.run()
......