PaddlePaddle / Serving

Commit ff2aff64
Authored on Jan 12, 2021 by zhangjun
Commit message: update
Parent commit: 6c77c5f9
2 changed files with 1 addition and 849 deletions:

  python/paddle_serving_server_gpu/__init__.py             +1  -1
  server-arm/python/paddle_serving_server_gpu/__init__.py  +0  -848
python/paddle_serving_server_gpu/__init__.py  (view file @ ff2aff64)

@@ -427,7 +427,7 @@ class Server(object):
                     device_version = "serving-gpu-" + cuda_version + "-"
                 elif cuda_version == "arm":
                     device_version = "serving-" + cuda_version + "-"
-                elif:
+                else:
                     device_version = "serving-gpu-cuda" + cuda_version + "-"
 
         folder_name = device_version + serving_server_version
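The hunk above is the commit's only code change: in Server.download_bin, the fallback branch of the CUDA-version check was written as a bare "elif:" (a syntax error, since elif requires a condition) and is corrected to "else:". Below is a minimal standalone sketch of the selection logic after the fix, pulled out into a hypothetical helper purely for illustration; it is not part of the repository.

def pick_device_version(cuda_version):
    # Mirrors the branch in Server.download_bin after the fix.
    if cuda_version == "trt":
        return "serving-gpu-" + cuda_version + "-"
    elif cuda_version == "arm":
        return "serving-" + cuda_version + "-"
    else:
        # The fixed branch: previously written as the invalid "elif:".
        return "serving-gpu-cuda" + cuda_version + "-"

assert pick_device_version("trt") == "serving-gpu-trt-"
assert pick_device_version("arm") == "serving-arm-"
assert pick_device_version("10.1") == "serving-gpu-cuda10.1-"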
server-arm/python/paddle_serving_server_gpu/__init__.py  (deleted, mode 100644 → 0; view file @ 6c77c5f9)
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=doc-string-missing

import os
from .proto import server_configure_pb2 as server_sdk
from .proto import general_model_config_pb2 as m_config
import google.protobuf.text_format
import tarfile
import socket
import paddle_serving_server_gpu as paddle_serving_server
import time
from .version import serving_server_version
from contextlib import closing
import argparse
import collections
import sys
if sys.platform.startswith('win') is False:
    import fcntl
import shutil
import numpy as np
import grpc
from .proto import multi_lang_general_model_service_pb2
import sys
sys.path.append(
    os.path.join(os.path.abspath(os.path.dirname(__file__)), 'proto'))
from .proto import multi_lang_general_model_service_pb2_grpc
from multiprocessing import Pool, Process
from concurrent import futures


def serve_args():
    parser = argparse.ArgumentParser("serve")
    parser.add_argument(
        "--thread", type=int, default=2, help="Concurrency of server")
    parser.add_argument(
        "--model", type=str, default="", help="Model for serving")
    parser.add_argument(
        "--port", type=int, default=9292, help="Port of the starting gpu")
    parser.add_argument(
        "--workdir",
        type=str,
        default="workdir",
        help="Working dir of current service")
    parser.add_argument(
        "--device", type=str, default="gpu", help="Type of device")
    parser.add_argument("--gpu_ids", type=str, default="", help="gpu ids")
    parser.add_argument(
        "--name", type=str, default="None", help="Default service name")
    parser.add_argument(
        "--mem_optim_off",
        default=False,
        action="store_true",
        help="Memory optimize")
    parser.add_argument(
        "--ir_optim", default=False, action="store_true", help="Graph optimize")
    parser.add_argument(
        "--max_body_size",
        type=int,
        default=512 * 1024 * 1024,
        help="Limit sizes of messages")
    parser.add_argument(
        "--use_multilang",
        default=False,
        action="store_true",
        help="Use Multi-language-service")
    parser.add_argument(
        "--use_trt", default=False, action="store_true", help="Use TensorRT")
    parser.add_argument(
        "--use_lite", default=False, action="store_true", help="Use PaddleLite")
    parser.add_argument(
        "--use_xpu", default=False, action="store_true", help="Use XPU")
    parser.add_argument(
        "--product_name",
        type=str,
        default=None,
        help="product_name for authentication")
    parser.add_argument(
        "--container_id",
        type=str,
        default=None,
        help="container_id for authentication")
    return parser.parse_args()


class OpMaker(object):
    def __init__(self):
        self.op_dict = {
            "general_infer": "GeneralInferOp",
            "general_reader": "GeneralReaderOp",
            "general_response": "GeneralResponseOp",
            "general_text_reader": "GeneralTextReaderOp",
            "general_text_response": "GeneralTextResponseOp",
            "general_single_kv": "GeneralSingleKVOp",
            "general_dist_kv_infer": "GeneralDistKVInferOp",
            "general_dist_kv": "GeneralDistKVOp"
        }
        self.node_name_suffix_ = collections.defaultdict(int)

    def create(self, node_type, engine_name=None, inputs=[], outputs=[]):
        if node_type not in self.op_dict:
            raise Exception("Op type {} is not supported right now".format(
                node_type))
        node = server_sdk.DAGNode()
        # node.name will be used as the infer engine name
        if engine_name:
            node.name = engine_name
        else:
            node.name = '{}_{}'.format(node_type,
                                       self.node_name_suffix_[node_type])
            self.node_name_suffix_[node_type] += 1
        node.type = self.op_dict[node_type]
        if inputs:
            for dep_node_str in inputs:
                dep_node = server_sdk.DAGNode()
                google.protobuf.text_format.Parse(dep_node_str, dep_node)
                dep = server_sdk.DAGNodeDependency()
                dep.name = dep_node.name
                dep.mode = "RO"
                node.dependencies.extend([dep])
        # Because the return value will be used as the key value of the
        # dict, and the proto object is variable which cannot be hashed,
        # so it is processed into a string. This has little effect on
        # overall efficiency.
        return google.protobuf.text_format.MessageToString(node)


class OpSeqMaker(object):
    def __init__(self):
        self.workflow = server_sdk.Workflow()
        self.workflow.name = "workflow1"
        self.workflow.workflow_type = "Sequence"

    def add_op(self, node_str):
        node = server_sdk.DAGNode()
        google.protobuf.text_format.Parse(node_str, node)
        if len(node.dependencies) > 1:
            raise Exception(
                'Set more than one predecessor for op in OpSeqMaker is not allowed.')
        if len(self.workflow.nodes) >= 1:
            if len(node.dependencies) == 0:
                dep = server_sdk.DAGNodeDependency()
                dep.name = self.workflow.nodes[-1].name
                dep.mode = "RO"
                node.dependencies.extend([dep])
            elif len(node.dependencies) == 1:
                if node.dependencies[0].name != self.workflow.nodes[-1].name:
                    raise Exception(
                        'You must add op in order in OpSeqMaker. The previous op is {}, but the current op is followed by {}.'
                        .format(node.dependencies[0].name,
                                self.workflow.nodes[-1].name))
        self.workflow.nodes.extend([node])

    def get_op_sequence(self):
        workflow_conf = server_sdk.WorkflowConf()
        workflow_conf.workflows.extend([self.workflow])
        return workflow_conf


class OpGraphMaker(object):
    def __init__(self):
        self.workflow = server_sdk.Workflow()
        self.workflow.name = "workflow1"
        # Currently, SDK only supports "Sequence"
        self.workflow.workflow_type = "Sequence"

    def add_op(self, node_str):
        node = server_sdk.DAGNode()
        google.protobuf.text_format.Parse(node_str, node)
        self.workflow.nodes.extend([node])

    def get_op_graph(self):
        workflow_conf = server_sdk.WorkflowConf()
        workflow_conf.workflows.extend([self.workflow])
        return workflow_conf


class Server(object):
    def __init__(self):
        self.server_handle_ = None
        self.infer_service_conf = None
        self.model_toolkit_conf = None
        self.resource_conf = None
        self.memory_optimization = False
        self.ir_optimization = False
        self.model_conf = None
        self.workflow_fn = "workflow.prototxt"
        self.resource_fn = "resource.prototxt"
        self.infer_service_fn = "infer_service.prototxt"
        self.model_toolkit_fn = "model_toolkit.prototxt"
        self.general_model_config_fn = "general_model.prototxt"
        self.cube_config_fn = "cube.conf"
        self.workdir = ""
        self.max_concurrency = 0
        self.num_threads = 2
        self.port = 8080
        self.reload_interval_s = 10
        self.max_body_size = 64 * 1024 * 1024
        self.module_path = os.path.dirname(paddle_serving_server.__file__)
        self.cur_path = os.getcwd()
        self.use_local_bin = False
        self.gpuid = 0
        self.use_trt = False
        self.use_lite = False
        self.use_xpu = False
        self.model_config_paths = None  # for multi-model in a workflow
        self.product_name = None
        self.container_id = None

    def get_fetch_list(self):
        fetch_names = [var.alias_name for var in self.model_conf.fetch_var]
        return fetch_names

    def set_max_concurrency(self, concurrency):
        self.max_concurrency = concurrency

    def set_num_threads(self, threads):
        self.num_threads = threads

    def set_max_body_size(self, body_size):
        if body_size >= self.max_body_size:
            self.max_body_size = body_size
        else:
            print(
                "max_body_size is less than default value, will use default value in service."
            )

    def set_port(self, port):
        self.port = port

    def set_reload_interval(self, interval):
        self.reload_interval_s = interval

    def set_op_sequence(self, op_seq):
        self.workflow_conf = op_seq

    def set_op_graph(self, op_graph):
        self.workflow_conf = op_graph

    def set_memory_optimize(self, flag=False):
        self.memory_optimization = flag

    def set_ir_optimize(self, flag=False):
        self.ir_optimization = flag

    def set_product_name(self, product_name=None):
        if product_name == None:
            raise ValueError("product_name can't be None.")
        self.product_name = product_name

    def set_container_id(self, container_id):
        if container_id == None:
            raise ValueError("container_id can't be None.")
        self.container_id = container_id

    def check_local_bin(self):
        if "SERVING_BIN" in os.environ:
            self.use_local_bin = True
            self.bin_path = os.environ["SERVING_BIN"]

    def check_cuda(self):
        if os.system("ls /dev/ | grep nvidia > /dev/null") == 0:
            pass
        else:
            raise SystemExit(
                "GPU not found, please check your environment or use cpu version by \"pip install paddle_serving_server\""
            )

    def set_gpuid(self, gpuid=0):
        self.gpuid = gpuid

    def set_trt(self):
        self.use_trt = True

    def set_lite(self):
        self.use_lite = True

    def set_xpu(self):
        self.use_xpu = True

    def _prepare_engine(self, model_config_paths, device):
        if self.model_toolkit_conf == None:
            self.model_toolkit_conf = server_sdk.ModelToolkitConf()

        for engine_name, model_config_path in model_config_paths.items():
            engine = server_sdk.EngineDesc()
            engine.name = engine_name
            # engine.reloadable_meta = model_config_path + "/fluid_time_file"
            engine.reloadable_meta = self.workdir + "/fluid_time_file"
            os.system("touch {}".format(engine.reloadable_meta))
            engine.reloadable_type = "timestamp_ne"
            engine.runtime_thread_num = 0
            engine.batch_infer_size = 0
            engine.enable_batch_align = 0
            engine.model_data_path = model_config_path
            engine.enable_memory_optimization = self.memory_optimization
            engine.enable_ir_optimization = self.ir_optimization
            engine.static_optimization = False
            engine.force_update_static_cache = False
            engine.use_trt = self.use_trt
            engine.use_lite = self.use_lite
            engine.use_xpu = self.use_xpu
            if device == "cpu":
                engine.type = "FLUID_CPU_ANALYSIS_DIR"
            elif device == "gpu":
                engine.type = "FLUID_GPU_ANALYSIS_DIR"
            elif device == "arm":
                engine.type = "FLUID_ARM_ANALYSIS_DIR"
            self.model_toolkit_conf.engines.extend([engine])

    def _prepare_infer_service(self, port):
        if self.infer_service_conf == None:
            self.infer_service_conf = server_sdk.InferServiceConf()
            self.infer_service_conf.port = port
            infer_service = server_sdk.InferService()
            infer_service.name = "GeneralModelService"
            infer_service.workflows.extend(["workflow1"])
            self.infer_service_conf.services.extend([infer_service])

    def _prepare_resource(self, workdir, cube_conf):
        self.workdir = workdir
        if self.resource_conf == None:
            with open("{}/{}".format(workdir, self.general_model_config_fn),
                      "w") as fout:
                fout.write(str(self.model_conf))
            self.resource_conf = server_sdk.ResourceConf()
            for workflow in self.workflow_conf.workflows:
                for node in workflow.nodes:
                    if "dist_kv" in node.name:
                        self.resource_conf.cube_config_path = workdir
                        self.resource_conf.cube_config_file = self.cube_config_fn
                        if cube_conf == None:
                            raise ValueError(
                                "Please set the path of cube.conf while use dist_kv op."
                            )
                        shutil.copy(cube_conf, workdir)
            self.resource_conf.model_toolkit_path = workdir
            self.resource_conf.model_toolkit_file = self.model_toolkit_fn
            self.resource_conf.general_model_path = workdir
            self.resource_conf.general_model_file = self.general_model_config_fn
            if self.product_name != None:
                self.resource_conf.auth_product_name = self.product_name
            if self.container_id != None:
                self.resource_conf.auth_container_id = self.container_id

    def _write_pb_str(self, filepath, pb_obj):
        with open(filepath, "w") as fout:
            fout.write(str(pb_obj))

    def load_model_config(self, model_config_paths):
        # At present, Serving needs to configure the model path in
        # the resource.prototxt file to determine the input and output
        # format of the workflow. To ensure that the input and output
        # of multiple models are the same.
        workflow_oi_config_path = None
        if isinstance(model_config_paths, str):
            # If there is only one model path, use the default infer_op.
            # Because there are several infer_op type, we need to find
            # it from workflow_conf.
            default_engine_names = [
                'general_infer_0', 'general_dist_kv_infer_0',
                'general_dist_kv_quant_infer_0'
            ]
            engine_name = None
            for node in self.workflow_conf.workflows[0].nodes:
                if node.name in default_engine_names:
                    engine_name = node.name
                    break
            if engine_name is None:
                raise Exception(
                    "You have set the engine_name of Op. Please use the form {op: model_path} to configure model path"
                )
            self.model_config_paths = {engine_name: model_config_paths}
            workflow_oi_config_path = self.model_config_paths[engine_name]
        elif isinstance(model_config_paths, dict):
            self.model_config_paths = {}
            for node_str, path in model_config_paths.items():
                node = server_sdk.DAGNode()
                google.protobuf.text_format.Parse(node_str, node)
                self.model_config_paths[node.name] = path
            print("You have specified multiple model paths, please ensure "
                  "that the input and output of multiple models are the same.")
            workflow_oi_config_path = list(
                self.model_config_paths.items())[0][1]
        else:
            raise Exception("The type of model_config_paths must be str or "
                            "dict({op: model_path}), not {}.".format(
                                type(model_config_paths)))

        self.model_conf = m_config.GeneralModelConfig()
        f = open(
            "{}/serving_server_conf.prototxt".format(workflow_oi_config_path),
            'r')
        self.model_conf = google.protobuf.text_format.Merge(
            str(f.read()), self.model_conf)
        # check config here
        # print config here

    def download_bin(self):
        os.chdir(self.module_path)
        need_download = False

        #acquire lock
        version_file = open("{}/version.py".format(self.module_path), "r")

        import re
        for line in version_file.readlines():
            if re.match("cuda_version", line):
                cuda_version = line.split("\"")[1]
                if cuda_version == "trt":
                    device_version = "serving-gpu-" + cuda_version + "-"
                elif cuda_version == "arm":
                    device_version = "serving-" + cuda_version + "-"
                else:
                    device_version = "serving-gpu-cuda" + cuda_version + "-"

        folder_name = device_version + serving_server_version
        tar_name = folder_name + ".tar.gz"
        bin_url = "https://paddle-serving.bj.bcebos.com/bin/" + tar_name
        self.server_path = os.path.join(self.module_path, folder_name)

        download_flag = "{}/{}.is_download".format(self.module_path,
                                                   folder_name)

        fcntl.flock(version_file, fcntl.LOCK_EX)

        if os.path.exists(download_flag):
            os.chdir(self.cur_path)
            self.bin_path = self.server_path + "/serving"
            return

        if not os.path.exists(self.server_path):
            os.system("touch {}/{}.is_download".format(self.module_path,
                                                       folder_name))
            print('Frist time run, downloading PaddleServing components ...')

            r = os.system('wget ' + bin_url + ' --no-check-certificate')
            if r != 0:
                if os.path.exists(tar_name):
                    os.remove(tar_name)
                raise SystemExit(
                    'Download failed, please check your network or permission of {}.'
                    .format(self.module_path))
            else:
                try:
                    print('Decompressing files ..')
                    tar = tarfile.open(tar_name)
                    tar.extractall()
                    tar.close()
                except:
                    if os.path.exists(exe_path):
                        os.remove(exe_path)
                    raise SystemExit(
                        'Decompressing failed, please check your permission of {} or disk space left.'
                        .format(self.module_path))
                finally:
                    os.remove(tar_name)
        #release lock
        version_file.close()
        os.chdir(self.cur_path)
        self.bin_path = self.server_path + "/serving"

    def prepare_server(self,
                       workdir=None,
                       port=9292,
                       device="cpu",
                       cube_conf=None):
        if workdir == None:
            workdir = "./tmp"
            os.system("mkdir {}".format(workdir))
        else:
            os.system("mkdir {}".format(workdir))
        os.system("touch {}/fluid_time_file".format(workdir))

        if not self.port_is_available(port):
            raise SystemExit("Port {} is already used".format(port))
        self.set_port(port)
        self._prepare_resource(workdir, cube_conf)
        self._prepare_engine(self.model_config_paths, device)
        self._prepare_infer_service(port)
        self.workdir = workdir

        infer_service_fn = "{}/{}".format(workdir, self.infer_service_fn)
        workflow_fn = "{}/{}".format(workdir, self.workflow_fn)
        resource_fn = "{}/{}".format(workdir, self.resource_fn)
        model_toolkit_fn = "{}/{}".format(workdir, self.model_toolkit_fn)

        self._write_pb_str(infer_service_fn, self.infer_service_conf)
        self._write_pb_str(workflow_fn, self.workflow_conf)
        self._write_pb_str(resource_fn, self.resource_conf)
        self._write_pb_str(model_toolkit_fn, self.model_toolkit_conf)

    def port_is_available(self, port):
        with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
            sock.settimeout(2)
            result = sock.connect_ex(('0.0.0.0', port))
        if result != 0:
            return True
        else:
            return False

    def run_server(self):
        # just run server with system command
        # currently we do not load cube
        self.check_local_bin()
        if not self.use_local_bin:
            self.download_bin()
            # wait for other process to download server bin
            while not os.path.exists(self.server_path):
                time.sleep(1)
        else:
            print("Use local bin : {}".format(self.bin_path))
        #self.check_cuda()
        if self.use_lite:
            command = "{} " \
                      "-enable_model_toolkit " \
                      "-inferservice_path {} " \
                      "-inferservice_file {} " \
                      "-max_concurrency {} " \
                      "-num_threads {} " \
                      "-port {} " \
                      "-reload_interval_s {} " \
                      "-resource_path {} " \
                      "-resource_file {} " \
                      "-workflow_path {} " \
                      "-workflow_file {} " \
                      "-bthread_concurrency {} " \
                      "-max_body_size {} ".format(
                          self.bin_path, self.workdir, self.infer_service_fn,
                          self.max_concurrency, self.num_threads, self.port,
                          self.reload_interval_s, self.workdir,
                          self.resource_fn, self.workdir, self.workflow_fn,
                          self.num_threads, self.max_body_size)
        else:
            command = "{} " \
                      "-enable_model_toolkit " \
                      "-inferservice_path {} " \
                      "-inferservice_file {} " \
                      "-max_concurrency {} " \
                      "-num_threads {} " \
                      "-port {} " \
                      "-reload_interval_s {} " \
                      "-resource_path {} " \
                      "-resource_file {} " \
                      "-workflow_path {} " \
                      "-workflow_file {} " \
                      "-bthread_concurrency {} " \
                      "-gpuid {} " \
                      "-max_body_size {} ".format(
                          self.bin_path, self.workdir, self.infer_service_fn,
                          self.max_concurrency, self.num_threads, self.port,
                          self.reload_interval_s, self.workdir,
                          self.resource_fn, self.workdir, self.workflow_fn,
                          self.num_threads, self.gpuid, self.max_body_size)
        print("Going to Run Comand")
        print(command)
        os.system(command)


class MultiLangServerServiceServicer(multi_lang_general_model_service_pb2_grpc.
                                     MultiLangGeneralModelServiceServicer):
    def __init__(self, model_config_path, is_multi_model, endpoints):
        self.is_multi_model_ = is_multi_model
        self.model_config_path_ = model_config_path
        self.endpoints_ = endpoints
        with open(self.model_config_path_) as f:
            self.model_config_str_ = str(f.read())
        self._parse_model_config(self.model_config_str_)
        self._init_bclient(self.model_config_path_, self.endpoints_)

    def _init_bclient(self, model_config_path, endpoints, timeout_ms=None):
        from paddle_serving_client import Client
        self.bclient_ = Client()
        if timeout_ms is not None:
            self.bclient_.set_rpc_timeout_ms(timeout_ms)
        self.bclient_.load_client_config(model_config_path)
        self.bclient_.connect(endpoints)

    def _parse_model_config(self, model_config_str):
        model_conf = m_config.GeneralModelConfig()
        model_conf = google.protobuf.text_format.Merge(model_config_str,
                                                       model_conf)
        self.feed_names_ = [var.alias_name for var in model_conf.feed_var]
        self.feed_types_ = {}
        self.feed_shapes_ = {}
        self.fetch_names_ = [var.alias_name for var in model_conf.fetch_var]
        self.fetch_types_ = {}
        self.lod_tensor_set_ = set()
        for i, var in enumerate(model_conf.feed_var):
            self.feed_types_[var.alias_name] = var.feed_type
            self.feed_shapes_[var.alias_name] = var.shape
            if var.is_lod_tensor:
                self.lod_tensor_set_.add(var.alias_name)
        for i, var in enumerate(model_conf.fetch_var):
            self.fetch_types_[var.alias_name] = var.fetch_type
            if var.is_lod_tensor:
                self.lod_tensor_set_.add(var.alias_name)

    def _flatten_list(self, nested_list):
        for item in nested_list:
            if isinstance(item, (list, tuple)):
                for sub_item in self._flatten_list(item):
                    yield sub_item
            else:
                yield item

    def _unpack_inference_request(self, request):
        feed_names = list(request.feed_var_names)
        fetch_names = list(request.fetch_var_names)
        is_python = request.is_python
        log_id = request.log_id
        feed_batch = []
        for feed_inst in request.insts:
            feed_dict = {}
            for idx, name in enumerate(feed_names):
                var = feed_inst.tensor_array[idx]
                v_type = self.feed_types_[name]
                data = None
                if is_python:
                    if v_type == 0:
                        data = np.frombuffer(var.data, dtype="int64")
                    elif v_type == 1:
                        data = np.frombuffer(var.data, dtype="float32")
                    elif v_type == 2:
                        data = np.frombuffer(var.data, dtype="int32")
                    else:
                        raise Exception("error type.")
                else:
                    if v_type == 0:  # int64
                        data = np.array(list(var.int64_data), dtype="int64")
                    elif v_type == 1:  # float32
                        data = np.array(list(var.float_data), dtype="float32")
                    elif v_type == 2:
                        data = np.array(list(var.int_data), dtype="int32")
                    else:
                        raise Exception("error type.")
                data.shape = list(feed_inst.tensor_array[idx].shape)
                feed_dict[name] = data
            feed_batch.append(feed_dict)
        return feed_batch, fetch_names, is_python, log_id

    def _pack_inference_response(self, ret, fetch_names, is_python):
        resp = multi_lang_general_model_service_pb2.InferenceResponse()
        if ret is None:
            resp.err_code = 1
            return resp
        results, tag = ret
        resp.tag = tag
        resp.err_code = 0

        if not self.is_multi_model_:
            results = {'general_infer_0': results}
        for model_name, model_result in results.items():
            model_output = multi_lang_general_model_service_pb2.ModelOutput()
            inst = multi_lang_general_model_service_pb2.FetchInst()
            for idx, name in enumerate(fetch_names):
                tensor = multi_lang_general_model_service_pb2.Tensor()
                v_type = self.fetch_types_[name]
                if is_python:
                    tensor.data = model_result[name].tobytes()
                else:
                    if v_type == 0:  # int64
                        tensor.int64_data.extend(model_result[name].reshape(-1)
                                                 .tolist())
                    elif v_type == 1:  # float32
                        tensor.float_data.extend(model_result[name].reshape(-1)
                                                 .tolist())
                    elif v_type == 2:  # int32
                        tensor.int_data.extend(model_result[name].reshape(-1)
                                               .tolist())
                    else:
                        raise Exception("error type.")
                tensor.shape.extend(list(model_result[name].shape))
                if "{}.lod".format(name) in model_result:
                    tensor.lod.extend(model_result["{}.lod".format(name)]
                                      .tolist())
                inst.tensor_array.append(tensor)
            model_output.insts.append(inst)
            model_output.engine_name = model_name
            resp.outputs.append(model_output)
        return resp

    def SetTimeout(self, request, context):
        # This porcess and Inference process cannot be operate at the same time.
        # For performance reasons, do not add thread lock temporarily.
        timeout_ms = request.timeout_ms
        self._init_bclient(self.model_config_path_, self.endpoints_, timeout_ms)
        resp = multi_lang_general_model_service_pb2.SimpleResponse()
        resp.err_code = 0
        return resp

    def Inference(self, request, context):
        feed_dict, fetch_names, is_python, log_id \
            = self._unpack_inference_request(request)
        ret = self.bclient_.predict(
            feed=feed_dict,
            fetch=fetch_names,
            need_variant_tag=True,
            log_id=log_id)
        return self._pack_inference_response(ret, fetch_names, is_python)

    def GetClientConfig(self, request, context):
        resp = multi_lang_general_model_service_pb2.GetClientConfigResponse()
        resp.client_config_str = self.model_config_str_
        return resp


class MultiLangServer(object):
    def __init__(self):
        self.bserver_ = Server()
        self.worker_num_ = 4
        self.body_size_ = 64 * 1024 * 1024
        self.concurrency_ = 100000
        self.is_multi_model_ = False  # for model ensemble

    def set_max_concurrency(self, concurrency):
        self.concurrency_ = concurrency
        self.bserver_.set_max_concurrency(concurrency)

    def set_num_threads(self, threads):
        self.worker_num_ = threads
        self.bserver_.set_num_threads(threads)

    def set_max_body_size(self, body_size):
        self.bserver_.set_max_body_size(body_size)
        if body_size >= self.body_size_:
            self.body_size_ = body_size
        else:
            print(
                "max_body_size is less than default value, will use default value in service."
            )

    def set_port(self, port):
        self.gport_ = port

    def set_reload_interval(self, interval):
        self.bserver_.set_reload_interval(interval)

    def set_op_sequence(self, op_seq):
        self.bserver_.set_op_sequence(op_seq)

    def set_op_graph(self, op_graph):
        self.bserver_.set_op_graph(op_graph)

    def set_memory_optimize(self, flag=False):
        self.bserver_.set_memory_optimize(flag)

    def set_ir_optimize(self, flag=False):
        self.bserver_.set_ir_optimize(flag)

    def set_gpuid(self, gpuid=0):
        self.bserver_.set_gpuid(gpuid)

    def load_model_config(self, server_config_paths, client_config_path=None):
        self.bserver_.load_model_config(server_config_paths)
        if client_config_path is None:
            if isinstance(server_config_paths, dict):
                self.is_multi_model_ = True
                client_config_path = '{}/serving_server_conf.prototxt'.format(
                    list(server_config_paths.items())[0][1])
            else:
                client_config_path = '{}/serving_server_conf.prototxt'.format(
                    server_config_paths)
        self.bclient_config_path_ = client_config_path

    def prepare_server(self,
                       workdir=None,
                       port=9292,
                       device="cpu",
                       cube_conf=None):
        if not self._port_is_available(port):
            raise SystemExit("Prot {} is already used".format(port))
        default_port = 12000
        self.port_list_ = []
        for i in range(1000):
            if default_port + i != port and self._port_is_available(
                    default_port + i):
                self.port_list_.append(default_port + i)
                break
        self.bserver_.prepare_server(
            workdir=workdir,
            port=self.port_list_[0],
            device=device,
            cube_conf=cube_conf)
        self.set_port(port)

    def _launch_brpc_service(self, bserver):
        bserver.run_server()

    def _port_is_available(self, port):
        with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
            sock.settimeout(2)
            result = sock.connect_ex(('0.0.0.0', port))
        return result != 0

    def run_server(self):
        p_bserver = Process(
            target=self._launch_brpc_service, args=(self.bserver_, ))
        p_bserver.start()
        options = [('grpc.max_send_message_length', self.body_size_),
                   ('grpc.max_receive_message_length', self.body_size_)]
        server = grpc.server(
            futures.ThreadPoolExecutor(max_workers=self.worker_num_),
            options=options,
            maximum_concurrent_rpcs=self.concurrency_)
        multi_lang_general_model_service_pb2_grpc.add_MultiLangGeneralModelServiceServicer_to_server(
            MultiLangServerServiceServicer(
                self.bclient_config_path_, self.is_multi_model_,
                ["0.0.0.0:{}".format(self.port_list_[0])]), server)
        server.add_insecure_port('[::]:{}'.format(self.gport_))
        server.start()
        p_bserver.join()
        server.wait_for_termination()
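For context, here is a minimal usage sketch showing how the classes defined in this (now removed) module are typically wired together to bring up a single-model GPU service: OpMaker builds the reader, infer, and response ops, OpSeqMaker chains them into a sequential workflow, and Server consumes that sequence. The model directory and working directory names below are placeholders, and the snippet assumes the package is importable as paddle_serving_server_gpu; it is an illustration based on the methods defined above, not part of the commit.

# Minimal sketch; "uci_housing_model" and "workdir_9292" are placeholder paths.
from paddle_serving_server_gpu import OpMaker, OpSeqMaker, Server

op_maker = OpMaker()
read_op = op_maker.create('general_reader')        # GeneralReaderOp
infer_op = op_maker.create('general_infer')        # GeneralInferOp, named "general_infer_0"
response_op = op_maker.create('general_response')  # GeneralResponseOp

op_seq_maker = OpSeqMaker()
op_seq_maker.add_op(read_op)
op_seq_maker.add_op(infer_op)
op_seq_maker.add_op(response_op)

server = Server()
server.set_op_sequence(op_seq_maker.get_op_sequence())
server.set_num_threads(4)
server.set_gpuid(0)
server.load_model_config("uci_housing_model")  # placeholder model directory
server.prepare_server(workdir="workdir_9292", port=9292, device="gpu")
server.run_server()

MultiLangServer exposes the same load_model_config / prepare_server / run_server surface; it launches the brpc Server above in a child process on an automatically chosen internal port and fronts it with a gRPC service on the requested port.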