PaddlePaddle / Serving — Commit d8c7c40c

Authored on Dec 01, 2020 by Jiawei Wang
Committed by TeslaZhao on Dec 01, 2020
Merge pull request #896 from TeslaZhao/develop
Supporting the local predictor in pipeline server
Parent: 0849586d
Showing 35 changed files with 1,570 additions and 852 deletions (+1570 −852).
doc/FAQ.md                                                             +4    −0
python/examples/bert/bert_client.py                                    +2    −2
python/examples/bert/bert_web_service.py                               +3    −2
python/examples/imdb/benchmark.py                                      +1    −1
python/examples/imdb/test_client.py                                    +1    −1
python/examples/imdb/text_classify_service.py                          +1    −1
python/examples/pipeline/imdb_model_ensemble/config.yml                +94   −16
python/examples/pipeline/imdb_model_ensemble/test_pipeline_client.py   +5    −4
python/examples/pipeline/imdb_model_ensemble/test_pipeline_server.py   +35   −41
python/examples/pipeline/ocr/README.md                                 +0    −22
python/examples/pipeline/ocr/README_CN.md                              +0    −20
python/examples/pipeline/ocr/config.yml                                +48   −7
python/examples/pipeline/ocr/hybrid_service_pipeline_server.py         +0    −135
python/examples/pipeline/ocr/local_service_pipeline_server.py          +0    −136
python/examples/pipeline/ocr/pipeline_rpc_client.py                    +2    −2
python/examples/pipeline/ocr/remote_service_pipeline_server.py         +0    −129
python/examples/pipeline/ocr/web_service.py                            +10   −10
python/examples/pipeline/simple_web_service/README.md                  +1    −1
python/examples/pipeline/simple_web_service/README_CN.md               +1    −1
python/examples/pipeline/simple_web_service/config.yml                 +24   −5
python/examples/pipeline/simple_web_service/web_service.py             +11   −6
python/paddle_serving_app/local_predict.py                             +67   −9
python/paddle_serving_app/reader/__init__.py                           +1    −1
python/paddle_serving_app/reader/pddet/image_tool.py                   +5    −6
python/paddle_serving_server/web_service.py                            +6    −4
python/paddle_serving_server_gpu/web_service.py                        +6    −4
python/pipeline/channel.py                                             +100  −68
python/pipeline/dag.py                                                 +293  −33
python/pipeline/gateway/proto/gateway.proto                            +9    −6
python/pipeline/gateway/proxy_server.go                                +2    −1
python/pipeline/local_service_handler.py                               +136  −41
python/pipeline/operator.py                                            +577  −105
python/pipeline/pipeline_client.py                                     +25   −24
python/pipeline/pipeline_server.py                                     +93   −4
python/pipeline/proto/pipeline_service.proto                           +7    −4
doc/FAQ.md
@@ -41,6 +41,10 @@
 **A:** Install the whl package you built via pip, and set the SERVING_BIN environment variable to the path of the compiled serving binary.
 
+#### Q: With the Java client, mvn compile fails with "No compiler is provided in this environment. Perhaps you are running on a JRE rather than a JDK?"
+
+**A:** The JDK is not installed, or JAVA_HOME is misconfigured (it must point to a JDK path; a common mistake is pointing it at a JRE — a correct value looks like JAVA_HOME="/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.262.b10-0.el7_8.x86_64/"). For JDK installation, see https://segmentfault.com/a/1190000015389941
+
 ## Deployment issues
python/examples/bert/bert_client.py
@@ -23,7 +23,7 @@ args = benchmark_args()
 reader = ChineseBertReader({"max_seq_len": 128})
 fetch = ["pooled_output"]
-endpoint_list = ['127.0.0.1:8861']
+endpoint_list = ['127.0.0.1:9292']
 client = Client()
 client.load_client_config(args.model)
 client.connect(endpoint_list)
@@ -33,5 +33,5 @@ for line in sys.stdin:
     for key in feed_dict.keys():
         feed_dict[key] = np.array(feed_dict[key]).reshape((128, 1))
     #print(feed_dict)
-    result = client.predict(feed=feed_dict, fetch=fetch)
+    result = client.predict(feed=feed_dict, fetch=fetch, batch=True)
     print(result)
python/examples/bert/bert_web_service.py
@@ -29,13 +29,14 @@ class BertService(WebService):
     def preprocess(self, feed=[], fetch=[]):
         feed_res = []
+        is_batch = True
         for ins in feed:
             feed_dict = self.reader.process(ins["words"].encode("utf-8"))
             for key in feed_dict.keys():
                 feed_dict[key] = np.array(feed_dict[key]).reshape(
-                    (1, len(feed_dict[key]), 1))
+                    (len(feed_dict[key]), 1))
             feed_res.append(feed_dict)
-        return feed_res, fetch
+        return feed_res, fetch, is_batch
 
 bert_service = BertService(name="bert")
python/examples/imdb/benchmark.py
@@ -18,7 +18,7 @@ import sys
 import time
 import requests
 import numpy as np
-from paddle_serving_app.reader import IMDBDataset
+from paddle_serving_app.reader.imdb_reader import IMDBDataset
 from paddle_serving_client import Client
 from paddle_serving_client.utils import MultiThreadRunner
 from paddle_serving_client.utils import MultiThreadRunner, benchmark_args, show_latency
python/examples/imdb/test_client.py
@@ -13,7 +13,7 @@
 # limitations under the License.
 # pylint: disable=doc-string-missing
 from paddle_serving_client import Client
-from paddle_serving_app.reader import IMDBDataset
+from paddle_serving_app.reader.imdb_reader import IMDBDataset
 import sys
 import numpy as np
python/examples/imdb/text_classify_service.py
@@ -14,7 +14,7 @@
 # pylint: disable=doc-string-missing
 from paddle_serving_server.web_service import WebService
-from paddle_serving_app.reader import IMDBDataset
+from paddle_serving_app.reader.imdb_reader import IMDBDataset
 import sys
 import numpy as np
python/examples/pipeline/imdb_model_ensemble/config.yml
-rpc_port: 18080
+#RPC port; rpc_port and http_port must not both be empty. When rpc_port is
+#empty and http_port is not, rpc_port is automatically set to http_port + 1.
+rpc_port: 18070
+#HTTP port; when rpc_port is available and http_port is empty, no http_port
+#is generated automatically.
+http_port: 18071
+#worker_num, the maximum concurrency. With build_dag_each_worker=True the
+#framework creates worker_num processes, each building its own gRPC server and
+#DAG; with build_dag_each_worker=False it sets max_workers=worker_num on the
+#main thread's gRPC thread pool.
 worker_num: 4
-build_dag_each_worker: false
+#build_dag_each_worker: False builds one DAG inside the process; True builds an
+#independent DAG in each process.
+build_dag_each_worker: False
 dag:
-    is_thread_op: true
+    #Op resource type: True for the thread model, False for the process model.
+    is_thread_op: True
+    #number of retries
     retry: 1
-    use_profile: false
+    #profiling: True generates Timeline performance data (with some overhead);
+    #False disables it.
+    use_profile: False
+    #maximum channel length, 0 by default
+    channel_size: 0
+    #tracer: tracks framework throughput and the state of every OP and channel;
+    #no data is generated without it.
+    tracer:
+        #trace interval, in seconds
+        interval_s: 10
 op:
     bow:
-        concurrency: 2
-        remote_service_conf:
-            client_type: brpc
-            model_config: imdb_bow_model
-            devices: ""
-            rpc_port : 9393
+        #concurrency: thread concurrency when is_thread_op=True, otherwise
+        #process concurrency
+        concurrency: 1
+        #client connection type: brpc
+        client_type: brpc
+        #number of retries when talking to Serving; no retry by default
+        retry: 1
+        #timeout when talking to Serving, in ms
+        timeout: 3000
+        #Serving IPs
+        server_endpoints: ["127.0.0.1:9393"]
+        #client-side config of the bow model
+        client_config: "imdb_bow_client_conf/serving_client_conf.prototxt"
+        #fetch list; the alias_name of fetch_var in client_config prevails
+        fetch_list: ["prediction"]
+        #batch size per Serving query, 1 by default; batch_size > 1 requires
+        #auto_batching_timeout, otherwise an incomplete batch blocks
+        batch_size: 1
+        #batching timeout, used together with batch_size
+        auto_batching_timeout: 2000
     cnn:
-        concurrency: 2
-        remote_service_conf:
-            client_type: brpc
-            model_config: imdb_cnn_model
-            devices: ""
-            rpc_port : 9292
+        #concurrency: thread concurrency when is_thread_op=True, otherwise
+        #process concurrency
+        concurrency: 1
+        #client connection type: brpc
+        client_type: brpc
+        #number of retries when talking to Serving; no retry by default
+        retry: 1
+        #timeout, in ms
+        timeout: 3000
+        #Serving IPs
+        server_endpoints: ["127.0.0.1:9292"]
+        #client-side config of the cnn model
+        client_config: "imdb_cnn_client_conf/serving_client_conf.prototxt"
+        #fetch list; the alias_name of fetch_var in client_config prevails
+        fetch_list: ["prediction"]
+        #batch size per Serving query, 1 by default; batch_size > 1 requires
+        #auto_batching_timeout, otherwise an incomplete batch blocks
+        batch_size: 1
+        #batching timeout, used together with batch_size
+        auto_batching_timeout: 2000
+    combine:
+        #concurrency: thread concurrency when is_thread_op=True, otherwise
+        #process concurrency
+        concurrency: 1
+        #number of retries when talking to Serving; no retry by default
+        retry: 1
+        #timeout, in ms
+        timeout: 3000
+        #batch size per Serving query, 1 by default; batch_size > 1 requires
+        #auto_batching_timeout, otherwise an incomplete batch blocks
+        batch_size: 1
+        #batching timeout, used together with batch_size
+        auto_batching_timeout: 2000
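For orientation, a config like this is consumed by `PipelineServer.prepare_server`. Below is a minimal, hypothetical sketch of the server side; the Op graph is collapsed to a bare RequestOp → ResponseOp pair, whereas a real graph inserts the bow/cnn/combine Ops shown in test_pipeline_server.py further down in this diff:

```python
# Minimal sketch, assuming the imports used elsewhere in this diff.
from paddle_serving_server.pipeline import PipelineServer, RequestOp, ResponseOp

read_op = RequestOp()
# A real graph chains model Ops (bow/cnn/combine) between these two.
response_op = ResponseOp(input_ops=[read_op])

server = PipelineServer()
server.set_response_op(response_op)
server.prepare_server('config.yml')  # reads the rpc_port/http_port/dag/op sections above
server.run_server()
```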
python/examples/pipeline/imdb_model_ensemble/test_pipeline_client.py
@@ -15,21 +15,22 @@ from paddle_serving_server.pipeline import PipelineClient
 import numpy as np
 
 client = PipelineClient()
-client.connect(['127.0.0.1:18080'])
+client.connect(['127.0.0.1:18070'])
 
 words = 'i am very sad | 0'
 
 futures = []
-for i in range(4):
+for i in range(100):
     futures.append(
         client.predict(
-            feed_dict={"words": words},
+            feed_dict={"words": words,
+                       "logid": 10000 + i},
             fetch=["prediction"],
             asyn=True,
             profile=False))
 
 for f in futures:
     res = f.result()
-    if res["ecode"] != 0:
+    if res.err_no != 0:
         print("predict failed: {}".format(res))
     print(res)
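A synchronous variant of the same client, for reference. This is a sketch inferred from the diff above; it assumes `asyn` defaults to False, which is not shown here:

```python
from paddle_serving_server.pipeline import PipelineClient

client = PipelineClient()
client.connect(['127.0.0.1:18070'])

# Synchronous call: no future, the response is returned directly.
res = client.predict(
    feed_dict={"words": "i am very sad | 0", "logid": 10000},
    fetch=["prediction"])
if res.err_no != 0:  # err_no/err_msg replace the old ecode/error_info pair
    print("predict failed: {}".format(res))
```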
python/examples/pipeline/imdb_model_ensemble/test_pipeline_server.py
@@ -15,10 +15,14 @@
 from paddle_serving_server.pipeline import Op, RequestOp, ResponseOp
 from paddle_serving_server.pipeline import PipelineServer
 from paddle_serving_server.pipeline.proto import pipeline_service_pb2
-from paddle_serving_server.pipeline.channel import ChannelDataEcode
+from paddle_serving_server.pipeline.channel import ChannelDataErrcode
 import numpy as np
-from paddle_serving_app.reader import IMDBDataset
+from paddle_serving_app.reader.imdb_reader import IMDBDataset
 import logging
+try:
+    from paddle_serving_server.web_service import WebService
+except ImportError:
+    from paddle_serving_server_gpu.web_service import WebService
 
 _LOGGER = logging.getLogger()
 user_handler = logging.StreamHandler()
@@ -43,76 +47,66 @@ class ImdbRequestOp(RequestOp):
             word_ids, _ = self.imdb_dataset.get_words_and_label(words)
             word_len = len(word_ids)
             dictdata[key] = np.array(word_ids).reshape(word_len, 1)
-            dictdata["{}.lod".format(key)] = [0, word_len]
-        return dictdata
+            dictdata["{}.lod".format(key)] = np.array([0, word_len])
+
+        log_id = None
+        if request.logid is not None:
+            log_id = request.logid
+        return dictdata, log_id, None, ""
 
 class CombineOp(Op):
-    def preprocess(self, input_data):
+    def preprocess(self, input_data, data_id, log_id):
+        #_LOGGER.info("Enter CombineOp::preprocess")
         combined_prediction = 0
         for op_name, data in input_data.items():
             _LOGGER.info("{}: {}".format(op_name, data["prediction"]))
             combined_prediction += data["prediction"]
         data = {"prediction": combined_prediction / 2}
-        return data
+        return data, False, None, ""
 
 class ImdbResponseOp(ResponseOp):
     # Here ImdbResponseOp is consistent with the default ResponseOp implementation
     def pack_response_package(self, channeldata):
         resp = pipeline_service_pb2.Response()
-        resp.ecode = channeldata.ecode
-        if resp.ecode == ChannelDataEcode.OK.value:
+        resp.err_no = channeldata.error_code
+        if resp.err_no == ChannelDataErrcode.OK.value:
             feed = channeldata.parse()
             # ndarray to string
             for name, var in feed.items():
                 resp.value.append(var.__repr__())
                 resp.key.append(name)
         else:
-            resp.error_info = channeldata.error_info
+            resp.err_msg = channeldata.error_info
         return resp
 
 read_op = ImdbRequestOp()
-bow_op = Op(name="bow",
-            input_ops=[read_op],
-            server_endpoints=["127.0.0.1:9393"],
-            fetch_list=["prediction"],
-            client_config="imdb_bow_client_conf/serving_client_conf.prototxt",
-            client_type='brpc',
-            concurrency=1,
-            timeout=-1,
-            retry=1,
-            batch_size=1,
-            auto_batching_timeout=None)
-cnn_op = Op(name="cnn",
-            input_ops=[read_op],
-            server_endpoints=["127.0.0.1:9292"],
-            fetch_list=["prediction"],
-            client_config="imdb_cnn_client_conf/serving_client_conf.prototxt",
-            client_type='brpc',
-            concurrency=1,
-            timeout=-1,
-            retry=1,
-            batch_size=1,
-            auto_batching_timeout=None)
-combine_op = CombineOp(
-    name="combine",
-    input_ops=[bow_op, cnn_op],
-    concurrency=1,
-    timeout=-1,
-    retry=1,
-    batch_size=2,
-    auto_batching_timeout=None)
+class BowOp(Op):
+    def init_op(self):
+        pass
+
+class CnnOp(Op):
+    def init_op(self):
+        pass
+
+bow_op = BowOp("bow", input_ops=[read_op])
+cnn_op = CnnOp("cnn", input_ops=[read_op])
+combine_op = CombineOp("combine", input_ops=[bow_op, cnn_op])
 
 # fetch output of bow_op
-# response_op = ImdbResponseOp(input_ops=[bow_op])
+#response_op = ImdbResponseOp(input_ops=[bow_op])
 # fetch output of combine_op
 response_op = ImdbResponseOp(input_ops=[combine_op])
 
 # use default ResponseOp implementation
-# response_op = ResponseOp(input_ops=[combine_op])
+#response_op = ResponseOp(input_ops=[combine_op])
 
 server = PipelineServer()
 server.set_response_op(response_op)
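The rewrite above illustrates the new Op contract introduced by this PR: `preprocess` now receives `(input_data, data_id, log_id)` and returns a 4-tuple. A toy Op following that pattern; the meaning of the extra return slots (a skip flag, a product error code, an error message) is inferred from the `return data, False, None, ""` lines above and should be treated as an assumption:

```python
from paddle_serving_server.pipeline import Op

class DoubleOp(Op):
    """Toy Op using the new preprocess signature from this PR."""

    def preprocess(self, input_dicts, data_id, log_id):
        # input_dicts maps each upstream op name to its output dict.
        (_, input_dict), = input_dicts.items()
        data = {"prediction": input_dict["prediction"] * 2}
        # (data, skip_flag, product_error_code, error_info),
        # mirroring CombineOp.preprocess above.
        return data, False, None, ""
```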
python/examples/pipeline/ocr/README.md
@@ -28,31 +28,9 @@ python web_service.py &>log.txt &
 python pipeline_http_client.py
 ```
 
-<!--
-## More (PipelineServing)
-You can choose one of the following versions to start Service.
-
-### Remote Service Version
-```
-python -m paddle_serving_server_gpu.serve --model ocr_det_model --port 12000 --gpu_id 0 &> det.log &
-python -m paddle_serving_server_gpu.serve --model ocr_rec_model --port 12001 --gpu_id 0 &> rec.log &
-python remote_service_pipeline_server.py &>pipeline.log &
-```
-
-### Local Service Version
-```
-python local_service_pipeline_server.py &>pipeline.log &
-```
-
-### Hybrid Service Version
-```
-python -m paddle_serving_server_gpu.serve --model ocr_rec_model --port 12001 --gpu_id 0 &> rec.log &
-python hybrid_service_pipeline_server.py &>pipeline.log &
-```
-
 ## Client Prediction
 ### RPC
python/examples/pipeline/ocr/README_CN.md
@@ -31,26 +31,6 @@ python pipeline_http_client.py
 <!--
-## More (PipelineServing)
-You can start the service with any of the versions below.
-
-### Remote Service Version
-```
-python -m paddle_serving_server.serve --model ocr_det_model --port 12000 --gpu_id 0 &> det.log &
-python -m paddle_serving_server.serve --model ocr_rec_model --port 12001 --gpu_id 0 &> rec.log &
-python remote_service_pipeline_server.py &>pipeline.log &
-```
-
-### Local Service Version
-```
-python local_service_pipeline_server.py &>pipeline.log &
-```
-
-### Hybrid Service Version
-```
-python -m paddle_serving_server_gpu.serve --model ocr_rec_model --port 12001 --gpu_id 0 &> rec.log &
-python hybrid_service_pipeline_server.py &>pipeline.log &
-```
-
 ## Start the Client
 ### RPC
python/examples/pipeline/ocr/config.yml
-rpc_port: 18080
-worker_num: 4
-build_dag_each_worker: false
+#RPC port; rpc_port and http_port must not both be empty. When rpc_port is
+#empty and http_port is not, rpc_port is automatically set to http_port + 1.
+rpc_port: 18090
+#HTTP port; when rpc_port is available and http_port is empty, no http_port
+#is generated automatically.
+http_port: 9999
+#worker_num, the maximum concurrency. With build_dag_each_worker=True the
+#framework creates worker_num processes, each with its own gRPC server and DAG;
+#with build_dag_each_worker=False it sets max_workers=worker_num on the main
+#thread's gRPC thread pool.
+worker_num: 1
+#build_dag_each_worker: False builds one DAG inside the process; True builds an
+#independent DAG in each process.
+build_dag_each_worker: false
 dag:
-    is_thread_op: false
+    #Op resource type: True for the thread model, False for the process model.
+    is_thread_op: False
+    #number of retries
+    retry: 1
+    #profiling: True generates Timeline data (with some overhead); False disables it.
+    use_profile: false
 op:
     det:
+        #concurrency: thread concurrency when is_thread_op=True, otherwise
+        #process concurrency
         concurrency: 2
+        #when the op config has no server_endpoints, the local service config
+        #is read from local_service_conf
         local_service_conf:
+            #client type: brpc, grpc or local_predictor. local_predictor does
+            #not start a Serving service; prediction runs in-process.
+            client_type: local_predictor
+            #det model path
             model_config: ocr_det_model
-            devices: ""
+            #fetch list; the alias_name of fetch_var in client_config prevails
+            fetch_list: ["concat_1.tmp_0"]
+            #device IDs: "" or unset means CPU prediction; "0" or "0,1,2" means
+            #GPU prediction on those cards
+            devices: "0"
     rec:
-        concurrency: 1
+        #concurrency: thread concurrency when is_thread_op=True, otherwise
+        #process concurrency
+        concurrency: 2
+        #timeout, in ms
+        timeout: -1
+        #number of retries when talking to Serving; no retry by default
+        retry: 1
+        #when the op config has no server_endpoints, the local service config
+        #is read from local_service_conf
        local_service_conf:
+            #client type: brpc, grpc or local_predictor. local_predictor does
+            #not start a Serving service; prediction runs in-process.
+            client_type: local_predictor
+            #rec model path
             model_config: ocr_rec_model
-            devices: ""
+            #fetch list; the alias_name of fetch_var in client_config prevails
+            fetch_list: ["ctc_greedy_decoder_0.tmp_0", "softmax_0.tmp_0"]
+            #device IDs: "" or unset means CPU prediction; "0" or "0,1,2" means
+            #GPU prediction on those cards
+            devices: "0"
python/examples/pipeline/ocr/hybrid_service_pipeline_server.py (deleted; file mode 100644 → 0)

# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=doc-string-missing
from paddle_serving_server_gpu.pipeline import Op, RequestOp, ResponseOp
from paddle_serving_server_gpu.pipeline import PipelineServer
from paddle_serving_server_gpu.pipeline.proto import pipeline_service_pb2
from paddle_serving_server_gpu.pipeline.channel import ChannelDataEcode
from paddle_serving_server_gpu.pipeline import LocalRpcServiceHandler
import numpy as np
import cv2
import time
import base64
import json
from paddle_serving_app.reader import OCRReader
from paddle_serving_app.reader import Sequential, ResizeByFactor
from paddle_serving_app.reader import Div, Normalize, Transpose
from paddle_serving_app.reader import DBPostProcess, FilterBoxes, GetRotateCropImage, SortedBoxes
import time
import re
import base64
import logging

_LOGGER = logging.getLogger()


class DetOp(Op):
    def init_op(self):
        self.det_preprocess = Sequential([
            ResizeByFactor(32, 960), Div(255),
            Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]), Transpose(
                (2, 0, 1))
        ])
        self.filter_func = FilterBoxes(10, 10)
        self.post_func = DBPostProcess({
            "thresh": 0.3,
            "box_thresh": 0.5,
            "max_candidates": 1000,
            "unclip_ratio": 1.5,
            "min_size": 3
        })

    def preprocess(self, input_dicts):
        (_, input_dict), = input_dicts.items()
        data = base64.b64decode(input_dict["image"].encode('utf8'))
        data = np.fromstring(data, np.uint8)
        # Note: class variables(self.var) can only be used in process op mode
        self.im = cv2.imdecode(data, cv2.IMREAD_COLOR)
        self.ori_h, self.ori_w, _ = self.im.shape
        det_img = self.det_preprocess(self.im)
        _, self.new_h, self.new_w = det_img.shape
        return {"image": det_img}

    def postprocess(self, input_dicts, fetch_dict):
        det_out = fetch_dict["concat_1.tmp_0"]
        ratio_list = [
            float(self.new_h) / self.ori_h, float(self.new_w) / self.ori_w
        ]
        dt_boxes_list = self.post_func(det_out, [ratio_list])
        dt_boxes = self.filter_func(dt_boxes_list[0], [self.ori_h, self.ori_w])
        out_dict = {"dt_boxes": dt_boxes, "image": self.im}
        return out_dict


class RecOp(Op):
    def init_op(self):
        self.ocr_reader = OCRReader()
        self.get_rotate_crop_image = GetRotateCropImage()
        self.sorted_boxes = SortedBoxes()

    def preprocess(self, input_dicts):
        (_, input_dict), = input_dicts.items()
        im = input_dict["image"]
        dt_boxes = input_dict["dt_boxes"]
        dt_boxes = self.sorted_boxes(dt_boxes)
        feed_list = []
        img_list = []
        max_wh_ratio = 0
        for i, dtbox in enumerate(dt_boxes):
            boximg = self.get_rotate_crop_image(im, dt_boxes[i])
            img_list.append(boximg)
            h, w = boximg.shape[0:2]
            wh_ratio = w * 1.0 / h
            max_wh_ratio = max(max_wh_ratio, wh_ratio)
        for img in img_list:
            norm_img = self.ocr_reader.resize_norm_img(img, max_wh_ratio)
            feed = {"image": norm_img}
            feed_list.append(feed)
        return feed_list

    def postprocess(self, input_dicts, fetch_dict):
        rec_res = self.ocr_reader.postprocess(fetch_dict, with_score=True)
        res_lst = []
        for res in rec_res:
            res_lst.append(res[0])
        res = {"res": str(res_lst)}
        return res


read_op = RequestOp()
det_op = DetOp(
    name="det",
    input_ops=[read_op],
    local_rpc_service_handler=LocalRpcServiceHandler(
        model_config="ocr_det_model",
        workdir="det_workdir",  # defalut: "workdir"
        thread_num=2,  # defalut: 2
        devices="0",  # gpu0. defalut: "" (cpu)
        mem_optim=True,  # defalut: True
        ir_optim=False,  # defalut: False
        available_port_generator=None),  # defalut: None
    concurrency=1)
rec_op = RecOp(
    name="rec",
    input_ops=[det_op],
    server_endpoints=["127.0.0.1:12001"],
    fetch_list=["ctc_greedy_decoder_0.tmp_0", "softmax_0.tmp_0"],
    client_config="ocr_rec_client/serving_client_conf.prototxt",
    concurrency=1)
response_op = ResponseOp(input_ops=[rec_op])

server = PipelineServer("ocr")
server.set_response_op(response_op)
server.prepare_server('config.yml')
server.run_server()
python/examples/pipeline/ocr/local_service_pipeline_server.py (deleted; file mode 100644 → 0)

# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=doc-string-missing
from paddle_serving_server.pipeline import Op, RequestOp, ResponseOp
from paddle_serving_server.pipeline import PipelineServer
from paddle_serving_server.pipeline.proto import pipeline_service_pb2
from paddle_serving_server.pipeline.channel import ChannelDataEcode
from paddle_serving_server.pipeline import LocalServiceHandler
import numpy as np
import cv2
import time
import base64
import json
from paddle_serving_app.reader import OCRReader
from paddle_serving_app.reader import Sequential, ResizeByFactor
from paddle_serving_app.reader import Div, Normalize, Transpose
from paddle_serving_app.reader import DBPostProcess, FilterBoxes, GetRotateCropImage, SortedBoxes
import time
import re
import base64
import logging

_LOGGER = logging.getLogger()


class DetOp(Op):
    def init_op(self):
        self.det_preprocess = Sequential([
            ResizeByFactor(32, 960), Div(255),
            Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]), Transpose(
                (2, 0, 1))
        ])
        self.filter_func = FilterBoxes(10, 10)
        self.post_func = DBPostProcess({
            "thresh": 0.3,
            "box_thresh": 0.5,
            "max_candidates": 1000,
            "unclip_ratio": 1.5,
            "min_size": 3
        })

    def preprocess(self, input_dicts):
        (_, input_dict), = input_dicts.items()
        data = base64.b64decode(input_dict["image"].encode('utf8'))
        data = np.fromstring(data, np.uint8)
        # Note: class variables(self.var) can only be used in process op mode
        self.im = cv2.imdecode(data, cv2.IMREAD_COLOR)
        print(self.im)
        self.ori_h, self.ori_w, _ = self.im.shape
        det_img = self.det_preprocess(self.im)
        _, self.new_h, self.new_w = det_img.shape
        print("image", det_img)
        return {"image": det_img}

    def postprocess(self, input_dicts, fetch_dict):
        det_out = fetch_dict["concat_1.tmp_0"]
        ratio_list = [
            float(self.new_h) / self.ori_h, float(self.new_w) / self.ori_w
        ]
        dt_boxes_list = self.post_func(det_out, [ratio_list])
        dt_boxes = self.filter_func(dt_boxes_list[0], [self.ori_h, self.ori_w])
        out_dict = {"dt_boxes": dt_boxes, "image": self.im}
        return out_dict


class RecOp(Op):
    def init_op(self):
        self.ocr_reader = OCRReader()
        self.get_rotate_crop_image = GetRotateCropImage()
        self.sorted_boxes = SortedBoxes()

    def preprocess(self, input_dicts):
        (_, input_dict), = input_dicts.items()
        im = input_dict["image"]
        dt_boxes = input_dict["dt_boxes"]
        dt_boxes = self.sorted_boxes(dt_boxes)
        feed_list = []
        img_list = []
        max_wh_ratio = 0
        for i, dtbox in enumerate(dt_boxes):
            boximg = self.get_rotate_crop_image(im, dt_boxes[i])
            img_list.append(boximg)
            h, w = boximg.shape[0:2]
            wh_ratio = w * 1.0 / h
            max_wh_ratio = max(max_wh_ratio, wh_ratio)
        for img in img_list:
            norm_img = self.ocr_reader.resize_norm_img(img, max_wh_ratio)
            feed = {"image": norm_img}
            feed_list.append(feed)
        return feed_list

    def postprocess(self, input_dicts, fetch_dict):
        rec_res = self.ocr_reader.postprocess(fetch_dict, with_score=True)
        res_lst = []
        for res in rec_res:
            res_lst.append(res[0])
        res = {"res": str(res_lst)}
        return res


read_op = RequestOp()
det_op = DetOp(
    name="det",
    input_ops=[read_op],
    client_type="local_predictor",
    local_service_handler=LocalServiceHandler(
        model_config="ocr_det_model",
        workdir="det_workdir",  # defalut: "workdir"
        thread_num=2,  # defalut: 2
        mem_optim=True,  # defalut: True
        ir_optim=False,  # defalut: False
        available_port_generator=None),  # defalut: None
    concurrency=1)
rec_op = RecOp(
    name="rec",
    input_ops=[det_op],
    client_type="local_predictor",
    local_service_handler=LocalServiceHandler(model_config="ocr_rec_model"),
    concurrency=1)
response_op = ResponseOp(input_ops=[rec_op])

server = PipelineServer("ocr")
server.set_response_op(response_op)
server.prepare_server('config.yml')
server.run_server()
python/examples/pipeline/ocr/pipeline_rpc_client.py
@@ -11,7 +11,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from paddle_serving_server.pipeline import PipelineClient
+from paddle_serving_server_gpu.pipeline import PipelineClient
 import numpy as np
 import requests
 import json
@@ -20,7 +20,7 @@ import base64
 import os
 
 client = PipelineClient()
-client.connect(['127.0.0.1:18080'])
+client.connect(['127.0.0.1:18090'])
 
 def cv2_to_base64(image):
python/examples/pipeline/ocr/remote_service_pipeline_server.py (deleted; file mode 100644 → 0)

# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=doc-string-missing
from paddle_serving_server_gpu.pipeline import Op, RequestOp, ResponseOp
from paddle_serving_server_gpu.pipeline import PipelineServer
from paddle_serving_server_gpu.pipeline.proto import pipeline_service_pb2
from paddle_serving_server_gpu.pipeline.channel import ChannelDataEcode
import numpy as np
import cv2
import time
import base64
import json
from paddle_serving_app.reader import OCRReader
from paddle_serving_app.reader import Sequential, ResizeByFactor
from paddle_serving_app.reader import Div, Normalize, Transpose
from paddle_serving_app.reader import DBPostProcess, FilterBoxes, GetRotateCropImage, SortedBoxes
import time
import re
import base64
import logging

_LOGGER = logging.getLogger()


class DetOp(Op):
    def init_op(self):
        self.det_preprocess = Sequential([
            ResizeByFactor(32, 960), Div(255),
            Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]), Transpose(
                (2, 0, 1))
        ])
        self.filter_func = FilterBoxes(10, 10)
        self.post_func = DBPostProcess({
            "thresh": 0.3,
            "box_thresh": 0.5,
            "max_candidates": 1000,
            "unclip_ratio": 1.5,
            "min_size": 3
        })

    def preprocess(self, input_dicts):
        (_, input_dict), = input_dicts.items()
        data = base64.b64decode(input_dict["image"].encode('utf8'))
        data = np.fromstring(data, np.uint8)
        # Note: class variables(self.var) can only be used in process op mode
        self.im = cv2.imdecode(data, cv2.IMREAD_COLOR)
        self.ori_h, self.ori_w, _ = self.im.shape
        det_img = self.det_preprocess(self.im)
        _, self.new_h, self.new_w = det_img.shape
        return {"image": det_img}

    def postprocess(self, input_dicts, fetch_dict):
        det_out = fetch_dict["concat_1.tmp_0"]
        ratio_list = [
            float(self.new_h) / self.ori_h, float(self.new_w) / self.ori_w
        ]
        dt_boxes_list = self.post_func(det_out, [ratio_list])
        dt_boxes = self.filter_func(dt_boxes_list[0], [self.ori_h, self.ori_w])
        out_dict = {"dt_boxes": dt_boxes, "image": self.im}
        return out_dict


class RecOp(Op):
    def init_op(self):
        self.ocr_reader = OCRReader()
        self.get_rotate_crop_image = GetRotateCropImage()
        self.sorted_boxes = SortedBoxes()

    def preprocess(self, input_dicts):
        (_, input_dict), = input_dicts.items()
        im = input_dict["image"]
        dt_boxes = input_dict["dt_boxes"]
        dt_boxes = self.sorted_boxes(dt_boxes)
        feed_list = []
        img_list = []
        max_wh_ratio = 0
        for i, dtbox in enumerate(dt_boxes):
            boximg = self.get_rotate_crop_image(im, dt_boxes[i])
            img_list.append(boximg)
            h, w = boximg.shape[0:2]
            wh_ratio = w * 1.0 / h
            max_wh_ratio = max(max_wh_ratio, wh_ratio)
        for img in img_list:
            norm_img = self.ocr_reader.resize_norm_img(img, max_wh_ratio)
            feed = {"image": norm_img}
            feed_list.append(feed)
        return feed_list

    def postprocess(self, input_dicts, fetch_dict):
        rec_res = self.ocr_reader.postprocess(fetch_dict, with_score=True)
        res_lst = []
        for res in rec_res:
            res_lst.append(res[0])
        res = {"res": str(res_lst)}
        return res


read_op = RequestOp()
det_op = DetOp(
    name="det",
    input_ops=[read_op],
    server_endpoints=["127.0.0.1:12000"],
    fetch_list=["concat_1.tmp_0"],
    client_config="ocr_det_client/serving_client_conf.prototxt",
    concurrency=1)
rec_op = RecOp(
    name="rec",
    input_ops=[det_op],
    server_endpoints=["127.0.0.1:12001"],
    fetch_list=["ctc_greedy_decoder_0.tmp_0", "softmax_0.tmp_0"],
    client_config="ocr_rec_client/serving_client_conf.prototxt",
    concurrency=1)
response_op = ResponseOp(input_ops=[rec_op])

server = PipelineServer("ocr")
server.set_response_op(response_op)
server.prepare_server('config.yml')
server.run_server()
python/examples/pipeline/ocr/web_service.py
@@ -14,7 +14,7 @@
 try:
     from paddle_serving_server.web_service import WebService, Op
 except ImportError:
-    from paddle_serving_server.web_service import WebService, Op
+    from paddle_serving_server_gpu.web_service import WebService, Op
 import logging
 import numpy as np
 import cv2
@@ -43,7 +43,7 @@ class DetOp(Op):
             "min_size": 3
         })
 
-    def preprocess(self, input_dicts):
+    def preprocess(self, input_dicts, data_id, log_id):
         (_, input_dict), = input_dicts.items()
         data = base64.b64decode(input_dict["image"].encode('utf8'))
         data = np.fromstring(data, np.uint8)
@@ -52,9 +52,9 @@ class DetOp(Op):
         self.ori_h, self.ori_w, _ = self.im.shape
         det_img = self.det_preprocess(self.im)
         _, self.new_h, self.new_w = det_img.shape
-        return {"image": det_img[np.newaxis, :]}
+        return {"image": det_img[np.newaxis, :].copy()}, False, None, ""
 
-    def postprocess(self, input_dicts, fetch_dict):
+    def postprocess(self, input_dicts, fetch_dict, log_id):
         det_out = fetch_dict["concat_1.tmp_0"]
         ratio_list = [
             float(self.new_h) / self.ori_h, float(self.new_w) / self.ori_w
@@ -63,7 +63,7 @@ class DetOp(Op):
         dt_boxes = self.filter_func(dt_boxes_list[0], [self.ori_h, self.ori_w])
         out_dict = {"dt_boxes": dt_boxes, "image": self.im}
         print("out dict", out_dict)
-        return out_dict
+        return out_dict, None, ""
 
 class RecOp(Op):
@@ -72,7 +72,7 @@ class RecOp(Op):
         self.get_rotate_crop_image = GetRotateCropImage()
         self.sorted_boxes = SortedBoxes()
 
-    def preprocess(self, input_dicts):
+    def preprocess(self, input_dicts, data_id, log_id):
         (_, input_dict), = input_dicts.items()
         im = input_dict["image"]
         dt_boxes = input_dict["dt_boxes"]
@@ -93,15 +93,15 @@ class RecOp(Op):
             norm_img = self.ocr_reader.resize_norm_img(img, max_wh_ratio)
             imgs[id] = norm_img
         feed = {"image": imgs.copy()}
-        return feed
+        return feed, False, None, ""
 
-    def postprocess(self, input_dicts, fetch_dict):
+    def postprocess(self, input_dicts, fetch_dict, log_id):
         rec_res = self.ocr_reader.postprocess(fetch_dict, with_score=True)
         res_lst = []
         for res in rec_res:
             res_lst.append(res[0])
         res = {"res": str(res_lst)}
-        return res
+        return res, None, ""
 
 class OcrService(WebService):
@@ -112,5 +112,5 @@ class OcrService(WebService):
 uci_service = OcrService(name="ocr")
-uci_service.prepare_pipeline_config("brpc_config.yml")
+uci_service.prepare_pipeline_config("config.yml")
 uci_service.run_service()
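With `http_port: 9999` from the OCR config above and the service name "ocr", the web service can be exercised roughly as follows. This is a sketch only: the key/value request format is taken from the simple_web_service README later in this diff, the `/ocr/prediction` route follows the gateway's `/{name}/{method}` pattern, and `test.jpg` is a placeholder path:

```python
# Hypothetical HTTP client for the OCR pipeline web service above.
import base64
import json
import requests

with open("test.jpg", "rb") as f:  # placeholder image path
    image = base64.b64encode(f.read()).decode("utf8")

resp = requests.post(
    "http://127.0.0.1:9999/ocr/prediction",
    data=json.dumps({"key": ["image"], "value": [image]}))
print(resp.json())
```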
python/examples/pipeline/simple_web_service/README.md
@@ -15,5 +15,5 @@ python web_service.py &>log.txt &
 ## Http test
 ```
-curl -X POST -k http://localhost:18080/uci/prediction -d '{"key": ["x"], "value": ["0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283, 0.4919, 0.1856, 0.0795, -0.0332"]}'
+curl -X POST -k http://localhost:18082/uci/prediction -d '{"key": ["x"], "value": ["0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283, 0.4919, 0.1856, 0.0795, -0.0332"]}'
 ```
python/examples/pipeline/simple_web_service/README_CN.md
@@ -15,5 +15,5 @@ python web_service.py &>log.txt &
 ## Test
 ```
-curl -X POST -k http://localhost:18080/uci/prediction -d '{"key": ["x"], "value": ["0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283, 0.4919, 0.1856, 0.0795, -0.0332"]}'
+curl -X POST -k http://localhost:18082/uci/prediction -d '{"key": ["x"], "value": ["0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283, 0.4919, 0.1856, 0.0795, -0.0332"]}'
 ```
python/examples/pipeline/simple_web_service/config.yml
-worker_num: 4
-http_port: 18080
+#worker_num, the maximum concurrency. With build_dag_each_worker=True the
+#framework creates worker_num processes, each with its own gRPC server and DAG;
+#with build_dag_each_worker=False it sets max_workers=worker_num on the main
+#thread's gRPC thread pool.
+worker_num: 1
+#HTTP port; rpc_port and http_port must not both be empty. When rpc_port is
+#available and http_port is empty, no http_port is generated automatically.
+http_port: 18082
 dag:
-    is_thread_op: false
+    #Op resource type: True for the thread model, False for the process model.
+    is_thread_op: False
 op:
     uci:
+        #when the op config has no server_endpoints, the local service config
+        #is read from local_service_conf
        local_service_conf:
+            #concurrency: thread concurrency when is_thread_op=True, otherwise
+            #process concurrency
+            concurrency: 2
+            #uci model path
             model_config: uci_housing_model
-            devices: "" # "0,1"
-            client_type: brpc
+            #device IDs: "" or unset means CPU prediction; "0" or "0,1,2" means
+            #GPU prediction on those cards
+            devices: "0" # "0,1"
+            #client type: brpc, grpc or local_predictor. local_predictor does
+            #not start a Serving service; prediction runs in-process.
+            client_type: local_predictor
+            #fetch list; the alias_name of fetch_var in client_config prevails
+            fetch_list: ["price"]
python/examples/pipeline/simple_web_service/web_service.py
@@ -25,20 +25,25 @@ class UciOp(Op):
     def init_op(self):
         self.separator = ","
 
-    def preprocess(self, input_dicts):
+    def preprocess(self, input_dicts, data_id, log_id):
         (_, input_dict), = input_dicts.items()
-        _LOGGER.info(input_dict)
+        _LOGGER.error("UciOp::preprocess >>> log_id:{}, input:{}".format(
+            log_id, input_dict))
         x_value = input_dict["x"]
+        proc_dict = {}
         if isinstance(x_value, (str, unicode)):
             input_dict["x"] = np.array(
                 [float(x.strip())
                  for x in x_value.split(self.separator)]).reshape(1, 13)
-            return input_dict
+            _LOGGER.error("input_dict:{}".format(input_dict))
 
-    def postprocess(self, input_dicts, fetch_dict):
-        # _LOGGER.info(fetch_dict)
+        return input_dict, False, None, ""
+
+    def postprocess(self, input_dicts, fetch_dict, log_id):
+        _LOGGER.info("UciOp::postprocess >>> log_id:{}, fetch_dict:{}".format(
+            log_id, fetch_dict))
         fetch_dict["price"] = str(fetch_dict["price"][0][0])
-        return fetch_dict
+        return fetch_dict, None, ""
 
 class UciService(WebService):
python/paddle_serving_app/local_predict.py
@@ -32,6 +32,12 @@ logger.setLevel(logging.INFO)
 class LocalPredictor(object):
+    """
+    Prediction in the current process of the local environment, in process
+    call, Compared with RPC/HTTP, LocalPredictor has better performance,
+    because of no network and packaging load.
+    """
+
     def __init__(self):
         self.feed_names_ = []
         self.fetch_names_ = []
@@ -42,13 +48,41 @@ class LocalPredictor(object):
         self.fetch_names_to_idx_ = {}
         self.fetch_names_to_type_ = {}
 
-    def load_model_config(self, model_path, gpu=False, profile=True, cpu_num=1):
+    def load_model_config(self,
+                          model_path,
+                          use_gpu=False,
+                          gpu_id=0,
+                          use_profile=False,
+                          thread_num=1,
+                          mem_optim=True,
+                          ir_optim=False,
+                          use_trt=False,
+                          use_feed_fetch_ops=False):
+        """
+        Load model config and set the engine config for the paddle predictor
+        Args:
+            model_path: model config path.
+            use_gpu: calculating with gpu, False default.
+            gpu_id: gpu id, 0 default.
+            use_profile: use predictor profiles, False default.
+            thread_num: thread nums, default 1.
+            mem_optim: memory optimization, True default.
+            ir_optim: open calculation chart optimization, False default.
+            use_trt: use nvidia TensorRT optimization, False default
+            use_feed_fetch_ops: use feed/fetch ops, False default.
+        """
         client_config = "{}/serving_server_conf.prototxt".format(model_path)
         model_conf = m_config.GeneralModelConfig()
         f = open(client_config, 'r')
         model_conf = google.protobuf.text_format.Merge(
             str(f.read()), model_conf)
         config = AnalysisConfig(model_path)
+        logger.info("load_model_config params: model_path:{}, use_gpu:{},\
+            gpu_id:{}, use_profile:{}, thread_num:{}, mem_optim:{}, ir_optim:{},\
+            use_trt:{}, use_feed_fetch_ops:{}".format(
+            model_path, use_gpu, gpu_id, use_profile, thread_num, mem_optim,
+            ir_optim, use_trt, use_feed_fetch_ops))
 
         self.feed_names_ = [var.alias_name for var in model_conf.feed_var]
         self.fetch_names_ = [var.alias_name for var in model_conf.fetch_var]
@@ -64,19 +98,43 @@ class LocalPredictor(object):
             self.fetch_names_to_idx_[var.alias_name] = i
             self.fetch_names_to_type_[var.alias_name] = var.fetch_type
 
-        if not gpu:
-            config.disable_gpu()
-        else:
-            config.enable_use_gpu(100, 0)
-        if profile:
+        if use_profile:
             config.enable_profile()
+        if mem_optim:
+            config.enable_memory_optim()
+        config.switch_ir_optim(ir_optim)
+        config.set_cpu_math_library_num_threads(thread_num)
+        config.switch_use_feed_fetch_ops(use_feed_fetch_ops)
         config.delete_pass("conv_transpose_eltwiseadd_bn_fuse_pass")
-        config.set_cpu_math_library_num_threads(cpu_num)
-        config.switch_ir_optim(False)
-        config.switch_use_feed_fetch_ops(False)
+        if not use_gpu:
+            config.disable_gpu()
+        else:
+            config.enable_use_gpu(100, gpu_id)
+            if use_trt:
+                config.enable_tensorrt_engine(
+                    workspace_size=1 << 20,
+                    max_batch_size=32,
+                    min_subgraph_size=3,
+                    use_static=False,
+                    use_calib_mode=False)
         self.predictor = create_paddle_predictor(config)
 
-    def predict(self, feed=None, fetch=None):
+    def predict(self, feed=None, fetch=None, batch=False, log_id=0):
+        """
+        Predict locally
+        Args:
+            feed: feed var
+            fetch: fetch var
+            batch: batch data or not, False default. If batch is False, a new
+                   dimension is added to header of the shape[np.newaxis].
+            log_id: for logging
+        Returns:
+            fetch_map: dict
+        """
         if feed is None or fetch is None:
             raise ValueError("You should specify feed and fetch for prediction")
         fetch_list = []
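Taken together, the extended `load_model_config`/`predict` signatures can be used roughly as below. A sketch only, with the model directory and feed/fetch names borrowed from the uci_housing example elsewhere in this diff:

```python
import numpy as np
from paddle_serving_app.local_predict import LocalPredictor

predictor = LocalPredictor()
predictor.load_model_config(
    "uci_housing_model",  # directory holding serving_server_conf.prototxt
    use_gpu=False,        # CPU prediction; set use_gpu=True / gpu_id=N for GPU
    thread_num=1,
    mem_optim=True,
    ir_optim=False)

fetch_map = predictor.predict(
    feed={"x": np.random.rand(1, 13).astype("float32")},  # illustrative input
    fetch=["price"],
    batch=True)  # the feed already carries its batch dimension
print(fetch_map)
```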
python/paddle_serving_app/reader/__init__.py
@@ -18,5 +18,5 @@ from .image_reader import RCNNPostprocess, SegPostprocess, PadStride, BlazeFaceP
 from .image_reader import DBPostProcess, FilterBoxes, GetRotateCropImage, SortedBoxes
 from .lac_reader import LACReader
 from .senta_reader import SentaReader
-from .imdb_reader import IMDBDataset
+# from .imdb_reader import IMDBDataset
 from .ocr_reader import OCRReader
python/paddle_serving_app/reader/pddet/image_tool.py
@@ -22,18 +22,17 @@ import yaml
 import copy
 import argparse
 import logging
-import paddle.fluid as fluid
 import json
 
 FORMAT = '%(asctime)s-%(levelname)s: %(message)s'
 logging.basicConfig(level=logging.INFO, format=FORMAT)
 logger = logging.getLogger(__name__)
 
-precision_map = {
-    'trt_int8': fluid.core.AnalysisConfig.Precision.Int8,
-    'trt_fp32': fluid.core.AnalysisConfig.Precision.Float32,
-    'trt_fp16': fluid.core.AnalysisConfig.Precision.Half
-}
+# precision_map = {
+#     'trt_int8': fluid.core.AnalysisConfig.Precision.Int8,
+#     'trt_fp32': fluid.core.AnalysisConfig.Precision.Float32,
+#     'trt_fp16': fluid.core.AnalysisConfig.Precision.Half
+# }
 
 class Resize(object):
python/paddle_serving_server/web_service.py
@@ -112,13 +112,14 @@ class WebService(object):
         if "fetch" not in request.json:
             abort(400)
         try:
-            feed, fetch = self.preprocess(request.json["feed"],
-                                          request.json["fetch"])
+            feed, fetch, is_batch = self.preprocess(request.json["feed"],
+                                                    request.json["fetch"])
             if isinstance(feed, dict) and "fetch" in feed:
                 del feed["fetch"]
             if len(feed) == 0:
                 raise ValueError("empty input")
-            fetch_map = self.client.predict(feed=feed, fetch=fetch, batch=True)
+            fetch_map = self.client.predict(
+                feed=feed, fetch=fetch, batch=is_batch)
             result = self.postprocess(
                 feed=request.json["feed"], fetch=fetch, fetch_map=fetch_map)
             result = {"result": result}
@@ -188,7 +189,8 @@ class WebService(object):
     def preprocess(self, feed=[], fetch=[]):
         print("This API will be deprecated later. Please do not use it")
-        return feed, fetch
+        is_batch = True
+        return feed, fetch, is_batch
 
     def postprocess(self, feed=[], fetch=[], fetch_map=None):
         print("This API will be deprecated later. Please do not use it")
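Subclasses that override `preprocess` must now return the third `is_batch` element as well. A minimal sketch of an adapted service; the class name and feed handling are illustrative, assuming ndarray-valued samples:

```python
import numpy as np
from paddle_serving_server.web_service import WebService

class MyService(WebService):
    def preprocess(self, feed=[], fetch=[]):
        # Stack the per-sample arrays into one batch, then tell predict()
        # that the data is already batched.
        feed_batch = {
            key: np.concatenate([ins[key][np.newaxis, :] for ins in feed])
            for key in feed[0]
        }
        is_batch = True
        return feed_batch, fetch, is_batch
```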
python/paddle_serving_server_gpu/web_service.py
@@ -167,13 +167,14 @@ class WebService(object):
         if "fetch" not in request.json:
             abort(400)
         try:
-            feed, fetch = self.preprocess(request.json["feed"],
-                                          request.json["fetch"])
+            feed, fetch, is_batch = self.preprocess(request.json["feed"],
+                                                    request.json["fetch"])
             if isinstance(feed, dict) and "fetch" in feed:
                 del feed["fetch"]
             if len(feed) == 0:
                 raise ValueError("empty input")
-            fetch_map = self.client.predict(feed=feed, fetch=fetch)
+            fetch_map = self.client.predict(
+                feed=feed, fetch=fetch, batch=is_batch)
             result = self.postprocess(
                 feed=request.json["feed"], fetch=fetch, fetch_map=fetch_map)
             result = {"result": result}
@@ -249,7 +250,8 @@ class WebService(object):
     def preprocess(self, feed=[], fetch=[]):
         print("This API will be deprecated later. Please do not use it")
-        return feed, fetch
+        is_batch = True
+        return feed, fetch, is_batch
 
     def postprocess(self, feed=[], fetch=[], fetch_map=None):
         print("This API will be deprecated later. Please do not use it")
python/pipeline/channel.py
浏览文件 @
d8c7c40c
...
...
@@ -32,7 +32,10 @@ import copy
_LOGGER
=
logging
.
getLogger
(
__name__
)
class
ChannelDataEcode
(
enum
.
Enum
):
class
ChannelDataErrcode
(
enum
.
Enum
):
"""
ChannelData error code
"""
OK
=
0
TIMEOUT
=
1
NOT_IMPLEMENTED
=
2
...
...
@@ -42,9 +45,21 @@ class ChannelDataEcode(enum.Enum):
CLOSED_ERROR
=
6
NO_SERVICE
=
7
UNKNOW
=
8
PRODUCT_ERROR
=
9
class
ProductErrCode
(
enum
.
Enum
):
"""
ProductErrCode is a base class for recording business error code.
product developers inherit this class and extend more error codes.
"""
pass
class
ChannelDataType
(
enum
.
Enum
):
"""
Channel data type
"""
DICT
=
0
CHANNEL_NPDATA
=
1
ERROR
=
2
...
...
@@ -56,20 +71,23 @@ class ChannelData(object):
npdata
=
None
,
dictdata
=
None
,
data_id
=
None
,
ecode
=
None
,
log_id
=
None
,
error_code
=
None
,
error_info
=
None
,
prod_error_code
=
None
,
prod_error_info
=
None
,
client_need_profile
=
False
):
'''
There are several ways to use it:
1. ChannelData(ChannelDataType.CHANNEL_NPDATA.value, npdata, data_id)
2. ChannelData(ChannelDataType.DICT.value, dictdata, data_id)
3. ChannelData(e
code, error_info, data
_id)
1. ChannelData(ChannelDataType.CHANNEL_NPDATA.value, npdata, data_id
, log_id
)
2. ChannelData(ChannelDataType.DICT.value, dictdata, data_id
, log_id
)
3. ChannelData(e
rror_code, error_info, prod_error_code, prod_error_info, data_id, log
_id)
Protobufs are not pickle-able:
https://stackoverflow.com/questions/55344376/how-to-import-protobuf-module
'''
if
ecode
is
not
None
:
if
e
rror_code
is
not
None
or
prod_error_
code
is
not
None
:
if
data_id
is
None
or
error_info
is
None
:
_LOGGER
.
critical
(
"Failed to generate ChannelData: data_id"
" and error_info cannot be None"
)
...
...
@@ -77,25 +95,30 @@ class ChannelData(object):
datatype
=
ChannelDataType
.
ERROR
.
value
else
:
if
datatype
==
ChannelDataType
.
CHANNEL_NPDATA
.
value
:
ecode
,
error_info
=
ChannelData
.
check_npdata
(
npdata
)
if
e
code
!=
ChannelDataE
code
.
OK
.
value
:
e
rror_
code
,
error_info
=
ChannelData
.
check_npdata
(
npdata
)
if
e
rror_code
!=
ChannelDataErr
code
.
OK
.
value
:
datatype
=
ChannelDataType
.
ERROR
.
value
_LOGGER
.
error
(
"(logid={}) {}"
.
format
(
data_id
,
error_info
))
_LOGGER
.
error
(
"(data_id={} log_id={}) {}"
.
format
(
data_id
,
log_id
,
error_info
))
elif
datatype
==
ChannelDataType
.
DICT
.
value
:
ecode
,
error_info
=
ChannelData
.
check_dictdata
(
dictdata
)
if
e
code
!=
ChannelDataE
code
.
OK
.
value
:
e
rror_
code
,
error_info
=
ChannelData
.
check_dictdata
(
dictdata
)
if
e
rror_code
!=
ChannelDataErr
code
.
OK
.
value
:
datatype
=
ChannelDataType
.
ERROR
.
value
_LOGGER
.
error
(
"(logid={}) {}"
.
format
(
data_id
,
error_info
))
_LOGGER
.
error
(
"(data_id={} log_id={}) {}"
.
format
(
data_id
,
log_id
,
error_info
))
else
:
_LOGGER
.
critical
(
"(
logid={}) datatype not match"
.
format
(
data
_id
))
_LOGGER
.
critical
(
"(
data_id={} log_id={}) datatype not match"
.
format
(
data_id
,
log
_id
))
os
.
_exit
(
-
1
)
self
.
datatype
=
datatype
self
.
npdata
=
npdata
self
.
dictdata
=
dictdata
self
.
id
=
data_id
self
.
ecode
=
ecode
self
.
log_id
=
log_id
self
.
error_code
=
error_code
self
.
error_info
=
error_info
self
.
prod_error_code
=
prod_error_code
self
.
prod_error_info
=
prod_error_info
self
.
client_need_profile
=
client_need_profile
self
.
profile_data_set
=
set
()
...
...
@@ -106,67 +129,67 @@ class ChannelData(object):
@
staticmethod
def
check_dictdata
(
dictdata
):
e
code
=
ChannelDataE
code
.
OK
.
value
e
rror_code
=
ChannelDataErr
code
.
OK
.
value
error_info
=
None
if
isinstance
(
dictdata
,
list
):
# batch data
for
sample
in
dictdata
:
if
not
isinstance
(
sample
,
dict
):
e
code
=
ChannelDataE
code
.
TYPE_ERROR
.
value
e
rror_code
=
ChannelDataErr
code
.
TYPE_ERROR
.
value
error_info
=
"Failed to check data: the type of "
\
"data must be dict, but get {}."
.
format
(
type
(
sample
))
break
elif
not
isinstance
(
dictdata
,
dict
):
# batch size = 1
e
code
=
ChannelDataE
code
.
TYPE_ERROR
.
value
e
rror_code
=
ChannelDataErr
code
.
TYPE_ERROR
.
value
error_info
=
"Failed to check data: the type of data must "
\
"be dict, but get {}."
.
format
(
type
(
dictdata
))
return
ecode
,
error_info
return
e
rror_
code
,
error_info
@
staticmethod
def
check_batch_npdata
(
batch
):
e
code
=
ChannelDataE
code
.
OK
.
value
e
rror_code
=
ChannelDataErr
code
.
OK
.
value
error_info
=
None
for
npdata
in
batch
:
ecode
,
error_info
=
ChannelData
.
check_npdata
(
npdata
)
if
e
code
!=
ChannelDataE
code
.
OK
.
value
:
e
rror_
code
,
error_info
=
ChannelData
.
check_npdata
(
npdata
)
if
e
rror_code
!=
ChannelDataErr
code
.
OK
.
value
:
break
return
ecode
,
error_info
return
e
rror_
code
,
error_info
@
staticmethod
def
check_npdata
(
npdata
):
e
code
=
ChannelDataE
code
.
OK
.
value
e
rror_code
=
ChannelDataErr
code
.
OK
.
value
error_info
=
None
if
isinstance
(
npdata
,
list
):
# batch data
for
sample
in
npdata
:
if
not
isinstance
(
sample
,
dict
):
e
code
=
ChannelDataE
code
.
TYPE_ERROR
.
value
e
rror_code
=
ChannelDataErr
code
.
TYPE_ERROR
.
value
error_info
=
"Failed to check data: the "
\
"value of data must be dict, but get {}."
.
format
(
type
(
sample
))
break
for
_
,
value
in
sample
.
items
():
if
not
isinstance
(
value
,
np
.
ndarray
):
e
code
=
ChannelDataE
code
.
TYPE_ERROR
.
value
e
rror_code
=
ChannelDataErr
code
.
TYPE_ERROR
.
value
error_info
=
"Failed to check data: the"
\
" value of data must be np.ndarray, but get {}."
.
format
(
type
(
value
))
return
ecode
,
error_info
return
e
rror_
code
,
error_info
elif
isinstance
(
npdata
,
dict
):
# batch_size = 1
for
_
,
value
in
npdata
.
items
():
if
not
isinstance
(
value
,
np
.
ndarray
):
e
code
=
ChannelDataE
code
.
TYPE_ERROR
.
value
e
rror_code
=
ChannelDataErr
code
.
TYPE_ERROR
.
value
error_info
=
"Failed to check data: the value "
\
"of data must be np.ndarray, but get {}."
.
format
(
type
(
value
))
break
else
:
e
code
=
ChannelDataE
code
.
TYPE_ERROR
.
value
e
rror_code
=
ChannelDataErr
code
.
TYPE_ERROR
.
value
error_info
=
"Failed to check data: the value of data "
\
"must be dict, but get {}."
.
format
(
type
(
npdata
))
return
ecode
,
error_info
return
e
rror_
code
,
error_info
def
parse
(
self
):
feed
=
None
...
...
@@ -191,8 +214,9 @@ class ChannelData(object):
return
1
def
__str__
(
self
):
return
"type[{}], ecode[{}], id[{}]"
.
format
(
ChannelDataType
(
self
.
datatype
).
name
,
self
.
ecode
,
self
.
id
)
return
"type[{}], error_code[{}], data_id[{}], log_id[{}], dict_data[{}]"
.
format
(
ChannelDataType
(
self
.
datatype
).
name
,
self
.
error_code
,
self
.
id
,
self
.
log_id
,
str
(
self
.
dictdata
))
class
ProcessChannel
(
object
):
...
...
@@ -289,14 +313,14 @@ class ProcessChannel(object):
def
push
(
self
,
channeldata
,
op_name
=
None
):
_LOGGER
.
debug
(
self
.
_log
(
"(
logid={}) Op({}) Pushing data"
.
format
(
channeldata
.
id
,
op_name
)))
self
.
_log
(
"(
data_id={} log_id={}) Op({}) Enter channel::push"
.
format
(
channeldata
.
id
,
channeldata
.
log_id
,
op_name
)))
if
len
(
self
.
_producers
)
==
0
:
_LOGGER
.
critical
(
self
.
_log
(
"(
log
id={}) Op({}) Failed to push data: expected number"
"(
data_id={} log_
id={}) Op({}) Failed to push data: expected number"
" of producers to be greater than 0, but the it is 0."
.
format
(
channeldata
.
id
,
op_name
)))
format
(
channeldata
.
id
,
channeldata
.
log_id
,
op_name
)))
os
.
_exit
(
-
1
)
elif
len
(
self
.
_producers
)
==
1
:
with
self
.
_cv
:
...
...
@@ -310,19 +334,21 @@ class ProcessChannel(object):
raise
ChannelStopError
()
self
.
_cv
.
notify_all
()
_LOGGER
.
debug
(
self
.
_log
(
"(logid={}) Op({}) Pushed data into internal queue."
.
format
(
channeldata
.
id
,
op_name
)))
self
.
_log
(
"(data_id={} log_id={}) Op({}) Pushed data into internal queue."
.
format
(
channeldata
.
id
,
channeldata
.
log_id
,
op_name
)))
return
True
elif
op_name
is
None
:
_LOGGER
.
critical
(
self
.
_log
(
"(
log
id={}) Op({}) Failed to push data: there are multiple "
"(
data_id={} log_
id={}) Op({}) Failed to push data: there are multiple "
"producers, so op_name cannot be None."
.
format
(
channeldata
.
id
,
op_name
)))
channeldata
.
id
,
channeldata
.
log_id
,
op_name
)))
os
.
_exit
(
-
1
)
producer_num
=
len
(
self
.
_producers
)
data_id
=
channeldata
.
id
log_id
=
channeldata
.
log_id
put_data
=
None
with
```diff
         with self._cv:
             if data_id not in self._input_buf:
@@ -347,8 +373,8 @@ class ProcessChannel(object):
             if put_data is None:
                 _LOGGER.debug(
                     self._log(
-                        "(logid={}) Op({}) Pushed data into input_buffer."
-                        .format(data_id, op_name)))
+                        "(data_id={} log_id={}) Op({}) Pushed data into input_buffer."
+                        .format(data_id, log_id, op_name)))
             else:
                 while self._stop.value == 0:
                     try:
@@ -361,8 +387,8 @@ class ProcessChannel(object):
                 _LOGGER.debug(
                     self._log(
-                        "(logid={}) Op({}) Pushed data into internal_queue."
-                        .format(data_id, op_name)))
+                        "(data_id={} log_id={}) Op({}) Pushed data into internal_queue."
+                        .format(data_id, log_id, op_name)))
             self._cv.notify_all()
         return True
@@ -404,8 +430,8 @@ class ProcessChannel(object):
                 if self._stop.value == 1:
                     raise ChannelStopError()
                 _LOGGER.debug(
-                    self._log("(logid={}) Op({}) Got data".format(
-                        resp.values()[0].id, op_name)))
+                    self._log("(data_id={} log_id={}) Op({}) Got data".format(
+                        resp.values()[0].id, resp.values()[0].log_id, op_name)))
                 return resp
         elif op_name is None:
             _LOGGER.critical(
@@ -434,8 +460,9 @@ class ProcessChannel(object):
                     self._output_buf.append(channeldata)
                     _LOGGER.debug(
                         self._log(
-                            "(logid={}) Op({}) Pop ready item into output_buffer"
-                            .format(channeldata.values()[0].id, op_name)))
+                            "(data_id={} log_id={}) Op({}) Pop ready item into output_buffer"
+                            .format(channeldata.values()[0].id,
+                                    channeldata.values()[0].log_id, op_name)))
                     break
                 except Queue.Empty:
                     if timeout is not None:
@@ -487,8 +514,9 @@ class ProcessChannel(object):
             self._cv.notify_all()
         _LOGGER.debug(
-            self._log("(logid={}) Op({}) Got data from output_buffer".format(
-                resp.values()[0].id, op_name)))
+            self._log(
+                "(data_id={} log_id={}) Op({}) Got data from output_buffer"
+                .format(resp.values()[0].id, resp.values()[0].log_id, op_name)))
         return resp

     def stop(self):
@@ -586,14 +614,14 @@ class ThreadChannel(Queue.PriorityQueue):
     def push(self, channeldata, op_name=None):
         _LOGGER.debug(
-            self._log("(logid={}) Op({}) Pushing data".format(channeldata.id,
-                                                              op_name)))
+            self._log("(data_id={} log_id={}) Op({}) Pushing data".format(
+                channeldata.id, channeldata.log_id, op_name)))
         if len(self._producers) == 0:
             _LOGGER.critical(
                 self._log(
-                    "(logid={}) Op({}) Failed to push data: expected number of "
+                    "(data_id={} log_id={}) Op({}) Failed to push data: expected number of "
                     "producers to be greater than 0, but it is 0."
-                    .format(channeldata.id, op_name)))
+                    .format(channeldata.id, channeldata.log_id, op_name)))
             os._exit(-1)
         elif len(self._producers) == 1:
             with self._cv:
@@ -607,19 +635,21 @@ class ThreadChannel(Queue.PriorityQueue):
                     raise ChannelStopError()
                 self._cv.notify_all()
             _LOGGER.debug(
-                self._log("(logid={}) Op({}) Pushed data into internal_queue."
-                          .format(channeldata.id, op_name)))
+                self._log(
+                    "(data_id={} log_id={}) Op({}) Pushed data into internal_queue."
+                    .format(channeldata.id, channeldata.log_id, op_name)))
             return True
         elif op_name is None:
             _LOGGER.critical(
                 self._log(
-                    "(logid={}) Op({}) Failed to push data: there are multiple"
+                    "(data_id={} log_id={}) Op({}) Failed to push data: there are multiple"
                     " producers, so op_name cannot be None."
-                    .format(channeldata.id, op_name)))
+                    .format(channeldata.id, channeldata.log_id, op_name)))
             os._exit(-1)

         producer_num = len(self._producers)
         data_id = channeldata.id
+        log_id = channeldata.log_id
         put_data = None
         with self._cv:
             if data_id not in self._input_buf:
@@ -639,8 +669,8 @@ class ThreadChannel(Queue.PriorityQueue):
             if put_data is None:
                 _LOGGER.debug(
                     self._log(
-                        "(logid={}) Op({}) Pushed data into input_buffer."
-                        .format(data_id, op_name)))
+                        "(data_id={} log_id={}) Op({}) Pushed data into input_buffer."
+                        .format(data_id, log_id, op_name)))
             else:
                 while self._stop is False:
                     try:
@@ -653,8 +683,8 @@ class ThreadChannel(Queue.PriorityQueue):
                 _LOGGER.debug(
                     self._log(
-                        "(logid={}) Op({}) Pushed data into internal_queue."
-                        .format(data_id, op_name)))
+                        "(data_id={} log_id={}) Op({}) Pushed data into internal_queue."
+                        .format(data_id, log_id, op_name)))
             self._cv.notify_all()
         return True
@@ -697,8 +727,8 @@ class ThreadChannel(Queue.PriorityQueue):
                 if self._stop:
                     raise ChannelStopError()
                 _LOGGER.debug(
-                    self._log("(logid={}) Op({}) Got data".format(
-                        resp.values()[0].id, op_name)))
+                    self._log("(data_id={} log_id={}) Op({}) Got data".format(
+                        resp.values()[0].id, resp.values()[0].log_id, op_name)))
                 return resp
         elif op_name is None:
             _LOGGER.critical(
@@ -727,8 +757,9 @@ class ThreadChannel(Queue.PriorityQueue):
                     self._output_buf.append(channeldata)
                     _LOGGER.debug(
                         self._log(
-                            "(logid={}) Op({}) Pop ready item into output_buffer"
-                            .format(channeldata.values()[0].id, op_name)))
+                            "(data_id={} log_id={}) Op({}) Pop ready item into output_buffer"
+                            .format(channeldata.values()[0].id,
+                                    channeldata.values()[0].log_id, op_name)))
                     break
                 except Queue.Empty:
                     if timeout is not None:
@@ -780,8 +811,9 @@ class ThreadChannel(Queue.PriorityQueue):
             self._cv.notify_all()
         _LOGGER.debug(
-            self._log("(logid={}) Op({}) Got data from output_buffer".format(
-                resp.values()[0].id, op_name)))
+            self._log(
+                "(data_id={} log_id={}) Op({}) Got data from output_buffer"
+                .format(resp.values()[0].id, resp.values()[0].log_id, op_name)))
         return resp

     def stop(self):
```
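Taken together, these channel changes make every queue operation traceable by two identifiers at once: `data_id`, generated per request by the DAG executor, and `log_id`, supplied by the caller and now threaded through `ChannelData`. A minimal sketch of the resulting log convention (the `_log` helper and all names below are illustrative, not the actual Channel classes):

```python
# Minimal sketch of the new logging convention; the real code lives in
# ProcessChannel/ThreadChannel above.
import logging

logging.basicConfig(level=logging.DEBUG)
_LOGGER = logging.getLogger("pipeline.channel")

def _log(channel_name, info):
    # mirrors Channel._log, which prefixes messages with the channel name
    return "[{}] {}".format(channel_name, info)

data_id = 7       # id generated per request inside the DAG executor
log_id = 10000    # id supplied by the client (0 when unset)
op_name = "uci"   # hypothetical Op name

_LOGGER.debug(
    _log("chl0", "(data_id={} log_id={}) Op({}) Pushing data".format(
        data_id, log_id, op_name)))
# DEBUG:pipeline.channel:[chl0] (data_id=7 log_id=10000) Op(uci) Pushing data
```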
python/pipeline/dag.py
This diff is collapsed.
python/pipeline/gateway/proto/gateway.proto
```diff
@@ -19,22 +19,25 @@ option go_package = ".;pipeline_serving";
 import "google/api/annotations.proto";

 message Response {
-  repeated string key = 1;
-  repeated string value = 2;
-  int32 ecode = 3;
-  string error_info = 4;
+  int32 err_no = 1;
+  string err_msg = 2;
+  repeated string key = 3;
+  repeated string value = 4;
 };

 message Request {
   repeated string key = 1;
   repeated string value = 2;
   string name = 3;
-}
+  string method = 4;
+  int64 logid = 5;
+  string clientip = 6;
+};

 service PipelineService {
   rpc inference(Request) returns (Response) {
     option (google.api.http) = {
-      post : "/{name=*}/prediction"
+      post : "/{name=*}/{method=*}"
       body : "*"
     };
   }
```
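Because the HTTP route template changes from `/{name=*}/prediction` to `/{name=*}/{method=*}`, the gateway now maps the second path segment into `Request.method` instead of hard-coding `prediction`, and the new `logid`/`clientip` fields can ride along in the JSON body. A hedged sketch of a call through the gateway (the `ocr` service name and port `9999` follow this commit's examples and are assumptions):

```python
# Hedged sketch of an HTTP call through the grpc-gateway after this change.
# Assumes a pipeline service named "ocr" with http_port 9999; "prediction"
# is now just one possible value of the {method=*} path segment.
import base64
import json

import requests

url = "http://127.0.0.1:9999/ocr/prediction"
with open("test.jpg", "rb") as f:
    image = base64.b64encode(f.read()).decode("utf8")

payload = {
    "key": ["image"],
    "value": [image],
    "logid": 10000,  # new optional Request field, echoed in server logs
}
resp = requests.post(url, data=json.dumps(payload))
print(resp.json())
# e.g. {"err_no": 0, "err_msg": "", "key": [...], "value": [...]}
```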
python/pipeline/gateway/proxy_server.go
```diff
@@ -38,7 +38,8 @@ func run_proxy_server(grpc_port int, http_port int) error {
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()

-	mux := runtime.NewServeMux()
+	//EmitDefaults=true, does not filter out the default inputs
+	mux := runtime.NewServeMux(runtime.WithMarshalerOption(runtime.MIMEWildcard, &runtime.JSONPb{OrigName: true, EmitDefaults: true}))
 	opts := []grpc.DialOption{grpc.WithInsecure()}
 	err := gw.RegisterPipelineServiceHandlerFromEndpoint(ctx, mux, *pipelineEndpoint, opts)
 	if err != nil {
```
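The marshaler option matters because grpc-gateway's default JSON marshaler omits zero-valued fields, so a successful response would drop exactly the `err_no = 0` / `err_msg = ""` pair that clients check first; `EmitDefaults: true` keeps them, and `OrigName: true` preserves `err_no` over the camel-cased `errNo`. The same effect can be reproduced with protobuf's Python serializer (the import path is an assumption about how the generated protos are packaged):

```python
# Reproducing the EmitDefaults/OrigName behavior with protobuf's Python
# JSON serializer. The import path assumes the generated pipeline protos
# are packaged under paddle_serving_server.pipeline.proto.
from google.protobuf import json_format
from paddle_serving_server.pipeline.proto import pipeline_service_pb2

resp = pipeline_service_pb2.Response()  # err_no/err_msg left at defaults

# Default marshaling filters default-valued fields out, like NewServeMux():
print(json_format.MessageToJson(resp))
# {}

# Keeping defaults and original field names, like the new JSONPb options:
print(json_format.MessageToJson(
    resp,
    including_default_value_fields=True,  # ~ EmitDefaults: true
    preserving_proto_field_name=True))    # ~ OrigName: true
# {"err_no": 0, "err_msg": "", "key": [], "value": []}
```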
python/pipeline/local_service_handler.py
```diff
@@ -15,111 +15,203 @@
 import os
 import logging
 import multiprocessing
-try:
-    from paddle_serving_server_gpu import OpMaker, OpSeqMaker, Server
-    PACKAGE_VERSION = "GPU"
-except ImportError:
-    from paddle_serving_server import OpMaker, OpSeqMaker, Server
-    PACKAGE_VERSION = "CPU"
+#from paddle_serving_server_gpu import OpMaker, OpSeqMaker
+#from paddle_serving_server_gpu import Server as GpuServer
+#from paddle_serving_server import Server as CpuServer
 from . import util
-from paddle_serving_app.local_predict import LocalPredictor
+#from paddle_serving_app.local_predict import LocalPredictor

 _LOGGER = logging.getLogger(__name__)
 _workdir_name_gen = util.NameGenerator("workdir_")


 class LocalServiceHandler(object):
+    """
+    LocalServiceHandler is the processor of the local service. It supports
+    three client types: brpc, grpc and local_predictor. If you use brpc or
+    grpc, serving startup ability is provided. If you use local_predictor,
+    local predict ability is provided by paddle_serving_app.
+    """
+
     def __init__(self,
                  model_config,
                  client_type='local_predictor',
                  workdir="",
                  thread_num=2,
                  devices="",
                  fetch_names=None,
                  mem_optim=True,
                  ir_optim=False,
-                 available_port_generator=None):
+                 available_port_generator=None,
+                 use_trt=False,
+                 use_profile=False):
+        """
+        Initialization of LocalServiceHandler.
+
+        Args:
+            model_config: model config path
+            client_type: brpc, grpc and local_predictor[default]
+            workdir: work directory
+            thread_num: number of threads, concurrent quantity
+            devices: gpu id list[gpu], "" default[cpu]
+            fetch_names: get fetch names out of LocalServiceHandler in
+                local_predictor mode. fetch_names_ is compatible for Client().
+            mem_optim: use memory/graphics memory optimization, True default
+            ir_optim: use calculation chart optimization, False default
+            available_port_generator: generate available ports
+            use_trt: use nvidia TensorRT engine, False default
+            use_profile: use profiling, False default
+
+        Returns:
+            None
+        """
         if available_port_generator is None:
             available_port_generator = util.GetAvailablePortGenerator()

         self._model_config = model_config
         self._port_list = []
+        self._device_type = "cpu"
         if devices == "":
             # cpu
             devices = [-1]
+            self._device_type = "cpu"
             self._port_list.append(available_port_generator.next())
             _LOGGER.info("Model({}) will be launched in cpu device. Port({})"
                          .format(model_config, self._port_list))
         else:
             # gpu
-            if PACKAGE_VERSION == "CPU":
-                raise ValueError(
-                    "You are using the CPU version package("
-                    "paddle-serving-server), unable to set devices")
+            self._device_type = "gpu"
             devices = [int(x) for x in devices.split(",")]
             for _ in devices:
                 self._port_list.append(available_port_generator.next())
             _LOGGER.info("Model({}) will be launched in gpu device: {}. Port({})"
                          .format(model_config, devices, self._port_list))
-        self.client_type = client_type
+        self._client_type = client_type
         self._workdir = workdir
         self._devices = devices
         self._thread_num = thread_num
         self._mem_optim = mem_optim
         self._ir_optim = ir_optim
-        self.local_predictor_client = None
+        self._local_predictor_client = None
         self._rpc_service_list = []
         self._server_pros = []
-        self._fetch_vars = None
+        self._use_trt = use_trt
+        self._use_profile = use_profile
+        self.fetch_names_ = fetch_names
```
```diff
     def get_fetch_list(self):
-        return self._fetch_vars
+        return self.fetch_names_

     def get_port_list(self):
         return self._port_list

-    def get_client(self):
-        # for local_predictor_only
-        if self.local_predictor_client is None:
-            self.local_predictor_client = LocalPredictor()
-            self.local_predictor_client.load_model_config(
-                "{}".format(self._model_config), gpu=False, profile=False)
-        return self.local_predictor_client
+    def get_client(self):
+        """
+        Function get_client is only used for the local predictor case. It
+        creates one LocalPredictor object and initializes the paddle
+        predictor by function load_model_config.
+
+        Args:
+            None
+
+        Returns:
+            _local_predictor_client
+        """
+        from paddle_serving_app.local_predict import LocalPredictor
+        if self._local_predictor_client is None:
+            self._local_predictor_client = LocalPredictor()
+            use_gpu = False
+            if self._device_type == "gpu":
+                use_gpu = True
+            self._local_predictor_client.load_model_config(
+                model_path=self._model_config,
+                use_gpu=use_gpu,
+                gpu_id=self._devices[0],
+                use_profile=self._use_profile,
+                thread_num=self._thread_num,
+                mem_optim=self._mem_optim,
+                ir_optim=self._ir_optim,
+                use_trt=self._use_trt)
+        return self._local_predictor_client
```
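With the reworked `get_client`, a pipeline Op can run inference in-process through `LocalPredictor` instead of calling a brpc/grpc endpoint. A hedged sketch of that path, driving the predictor directly (the model directory and the `x`/`price` variable names are illustrative assumptions):

```python
# Hedged sketch: in-process inference with LocalPredictor, the client that
# LocalServiceHandler.get_client() now configures.
import numpy as np
from paddle_serving_app.local_predict import LocalPredictor

predictor = LocalPredictor()
predictor.load_model_config(
    model_path="./uci_housing_model",  # exported serving model directory
    use_gpu=False)                     # mirrors _device_type == "cpu"

feed = {"x": np.random.rand(1, 13).astype("float32")}
fetch_map = predictor.predict(feed=feed, fetch=["price"])
print(fetch_map)
```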
```diff
     def get_client_config(self):
         return os.path.join(self._model_config, "serving_server_conf.prototxt")

     def _prepare_one_server(self, workdir, port, gpuid, thread_num, mem_optim,
                             ir_optim):
-        device = "gpu"
-        if gpuid == -1:
-            device = "cpu"
-        op_maker = OpMaker()
-        read_op = op_maker.create('general_reader')
-        general_infer_op = op_maker.create('general_infer')
-        general_response_op = op_maker.create('general_response')
-
-        op_seq_maker = OpSeqMaker()
-        op_seq_maker.add_op(read_op)
-        op_seq_maker.add_op(general_infer_op)
-        op_seq_maker.add_op(general_response_op)
-
-        server = Server()
+        """
+        According to _device_type, generate one CpuServer or GpuServer, and
+        set the model config and startup params.
+
+        Args:
+            workdir: work directory
+            port: network port
+            gpuid: gpu id
+            thread_num: thread num
+            mem_optim: use memory/graphics memory optimization
+            ir_optim: use calculation chart optimization
+
+        Returns:
+            server: CpuServer/GpuServer
+        """
+        if self._device_type == "cpu":
+            from paddle_serving_server import OpMaker, OpSeqMaker, Server
+            op_maker = OpMaker()
+            read_op = op_maker.create('general_reader')
+            general_infer_op = op_maker.create('general_infer')
+            general_response_op = op_maker.create('general_response')
+            op_seq_maker = OpSeqMaker()
+            op_seq_maker.add_op(read_op)
+            op_seq_maker.add_op(general_infer_op)
+            op_seq_maker.add_op(general_response_op)
+            server = Server()
+        else:
+            #gpu
+            from paddle_serving_server_gpu import OpMaker, OpSeqMaker, Server
+            op_maker = OpMaker()
+            read_op = op_maker.create('general_reader')
+            general_infer_op = op_maker.create('general_infer')
+            general_response_op = op_maker.create('general_response')
+            op_seq_maker = OpSeqMaker()
+            op_seq_maker.add_op(read_op)
+            op_seq_maker.add_op(general_infer_op)
+            op_seq_maker.add_op(general_response_op)
+            server = Server()
+            if gpuid >= 0:
+                server.set_gpuid(gpuid)

         server.set_op_sequence(op_seq_maker.get_op_sequence())
         server.set_num_threads(thread_num)
         server.set_memory_optimize(mem_optim)
         server.set_ir_optimize(ir_optim)

         server.load_model_config(self._model_config)
-        if gpuid >= 0:
-            server.set_gpuid(gpuid)
-        server.prepare_server(workdir=workdir, port=port, device=device)
-        if self._fetch_vars is None:
-            self._fetch_vars = server.get_fetch_list()
+        server.prepare_server(
+            workdir=workdir, port=port, device=self._device_type)
+        if self.fetch_names_ is None:
+            self.fetch_names_ = server.get_fetch_list()
         return server
```
```diff
     def _start_one_server(self, service_idx):
+        """
+        Start one server.
+
+        Args:
+            service_idx: server index
+
+        Returns:
+            None
+        """
         self._rpc_service_list[service_idx].run_server()

     def prepare_server(self):
+        """
+        Prepare all servers to be started, and append them into a list.
+        """
         for i, device_id in enumerate(self._devices):
             if self._workdir != "":
                 workdir = "{}_{}".format(self._workdir, i)
@@ -135,6 +227,9 @@ class LocalServiceHandler(object):
                     ir_optim=self._ir_optim))

     def start_server(self):
+        """
+        Start multiple processes, and start one server in each process.
+        """
         for i, service in enumerate(self._rpc_service_list):
             p = multiprocessing.Process(
                 target=self._start_one_server, args=(i, ))
```
python/pipeline/operator.py
This diff is collapsed.
python/pipeline/pipeline_client.py
```diff
@@ -18,7 +18,9 @@ import numpy as np
 from numpy import *
 import logging
 import functools
-from .channel import ChannelDataEcode
 import json
+import socket
+from .channel import ChannelDataErrcode
 from .proto import pipeline_service_pb2
 from .proto import pipeline_service_pb2_grpc
@@ -26,6 +28,10 @@ _LOGGER = logging.getLogger(__name__)

 class PipelineClient(object):
+    """
+    PipelineClient provides the basic capabilities of the pipeline SDK.
+    """
+
     def __init__(self):
         self._channel = None
         self._profile_key = "pipeline.profile"
@@ -42,6 +48,23 @@ class PipelineClient(object):
     def _pack_request_package(self, feed_dict, profile):
         req = pipeline_service_pb2.Request()
+        logid = feed_dict.get("logid")
+        if logid is None:
+            req.logid = 0
+        else:
+            req.logid = long(logid)
+            feed_dict.pop("logid")
+
+        clientip = feed_dict.get("clientip")
+        if clientip is None:
+            hostname = socket.gethostname()
+            ip = socket.gethostbyname(hostname)
+            req.clientip = ip
+        else:
+            req.clientip = clientip
+            feed_dict.pop("clientip")
+
         np.set_printoptions(threshold=sys.maxsize)
         for key, value in feed_dict.items():
             req.key.append(key)
@@ -60,29 +83,7 @@ class PipelineClient(object):
         return req

     def _unpack_response_package(self, resp, fetch):
-        if resp.ecode != 0:
-            return {
-                "ecode": resp.ecode,
-                "ecode_desc": ChannelDataEcode(resp.ecode),
-                "error_info": resp.error_info,
-            }
-        fetch_map = {"ecode": resp.ecode}
-        for idx, key in enumerate(resp.key):
-            if key == self._profile_key:
-                if resp.value[idx] != "":
-                    sys.stderr.write(resp.value[idx])
-                continue
-            if fetch is not None and key not in fetch:
-                continue
-            data = resp.value[idx]
-            try:
-                evaled_data = eval(data)
-                if isinstance(evaled_data, np.ndarray):
-                    data = evaled_data
-            except Exception as e:
-                pass
-            fetch_map[key] = data
-        return fetch_map
+        return resp

     def predict(self, feed_dict, fetch=None, asyn=False, profile=False):
         if not isinstance(feed_dict, dict):
```
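`_pack_request_package` now treats `logid` and `clientip` as reserved keys lifted out of `feed_dict`, and `_unpack_response_package` returns the raw `Response` proto rather than a fetch map. A hedged usage sketch (the endpoint and the `words`/`prediction` names follow the imdb pipeline example in this commit and are assumptions):

```python
# Hedged usage sketch for the new tracing fields of PipelineClient.
from paddle_serving_server.pipeline import PipelineClient

client = PipelineClient()
client.connect(['127.0.0.1:18070'])

ret = client.predict(
    feed_dict={
        "words": "i am very sad | 0",
        "logid": 10000,          # reserved key -> Request.logid
        "clientip": "1.2.3.4",   # reserved key -> Request.clientip
    },
    fetch=["prediction"])
# _unpack_response_package now hands back the raw Response proto:
print(ret.err_no, ret.err_msg)
```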
python/pipeline/pipeline_server.py
```diff
@@ -32,6 +32,10 @@ _LOGGER = logging.getLogger(__name__)

 class PipelineServicer(pipeline_service_pb2_grpc.PipelineServiceServicer):
+    """
+    Pipeline Servicer entrance.
+    """
+
     def __init__(self, name, response_op, dag_conf, worker_idx=-1):
         super(PipelineServicer, self).__init__()
         self._name = name
@@ -42,10 +46,16 @@ class PipelineServicer(pipeline_service_pb2_grpc.PipelineServiceServicer):
         _LOGGER.info("[PipelineServicer] succ init")

     def inference(self, request, context):
+        _LOGGER.info("(log_id={}) inference request name:{} self.name:{}"
+                     .format(request.logid, request.name, self._name))
         if request.name != "" and request.name != self._name:
+            _LOGGER.error("(log_id={}) name mismatch error. request.name:{},"
+                          "server.name={}".format(request.logid,
+                                                  request.name, self._name))
             resp = pipeline_service_pb2.Response()
-            resp.ecode = channel.ChannelDataEcode.NO_SERVICE.value
-            resp.error_info = "Failed to inference: Service name error."
+            resp.err_no = channel.ChannelDataErrcode.NO_SERVICE.value
+            resp.err_msg = "Failed to inference: Service name error."
+            resp.result = ""
             return resp
         resp = self._dag_executor.call(request)
         return resp
```
```diff
@@ -53,7 +63,9 @@ class PipelineServicer(pipeline_service_pb2_grpc.PipelineServiceServicer):

 @contextlib.contextmanager
 def _reserve_port(port):
-    """Find and reserve a port for all subprocesses to use."""
+    """
+    Find and reserve a port for all subprocesses to use.
+    """
     sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
     sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
     if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 0:
```
```diff
@@ -66,6 +78,10 @@ def _reserve_port(port):

 class PipelineServer(object):
+    """
+    Pipeline Server: grpc gateway + grpc server.
+    """
+
     def __init__(self, name=None):
         self._name = name  # for grpc-gateway path
         self._rpc_port = None
```
```diff
@@ -74,6 +90,16 @@ class PipelineServer(object):
         self._proxy_server = None

     def _grpc_gateway(self, grpc_port, http_port):
+        """
+        Run a gateway server, linking libproxy_server.so.
+
+        Args:
+            grpc_port: GRPC port
+            http_port: HTTP port
+
+        Returns:
+            None
+        """
         import os
         from ctypes import cdll
         from . import gateway
@@ -83,6 +109,17 @@ class PipelineServer(object):
         proxy_server.run_proxy_server(grpc_port, http_port)

     def _run_grpc_gateway(self, grpc_port, http_port):
+        """
+        Start the GRPC gateway in a new process, exposing one available
+        HTTP port outside and forwarding the data to the RPC port.
+
+        Args:
+            grpc_port: GRPC port
+            http_port: HTTP port
+
+        Returns:
+            None
+        """
         if http_port <= 0:
             _LOGGER.info("Ignore grpc_gateway configuration.")
             return
```
```diff
@@ -99,6 +136,15 @@ class PipelineServer(object):
         self._proxy_server.start()

     def set_response_op(self, response_op):
+        """
+        Set the response OP.
+
+        Args:
+            response_op: ResponseOp or its subclass object
+
+        Returns:
+            None
+        """
         if not isinstance(response_op, operator.ResponseOp):
             raise Exception("Failed to set response_op: response_op "
                             "must be ResponseOp type.")
@@ -109,6 +155,17 @@ class PipelineServer(object):
         self._used_op, _ = dag.DAG.get_use_ops(self._response_op)

     def prepare_server(self, yml_file=None, yml_dict=None):
+        """
+        Read the configuration from the yml file (config.yml) and launch
+        local services.
+
+        Args:
+            yml_file: read the configuration from a yaml file
+            yml_dict: read the configuration from a yaml dict
+
+        Returns:
+            None
+        """
         conf = ServerYamlConfChecker.load_server_yaml_conf(
             yml_file=yml_file, yml_dict=yml_dict)
```
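Since `prepare_server` accepts `yml_dict` as well as `yml_file`, a pipeline can be launched from a fully in-memory configuration. A hedged sketch of a trivial echo pipeline (the op wiring, ports, and config keys mirror this repo's examples and are assumptions):

```python
# Hedged sketch: launching a PipelineServer from an in-memory dict.
from paddle_serving_server.pipeline import (PipelineServer, RequestOp,
                                            ResponseOp)

read_op = RequestOp()                          # DAG entry
response_op = ResponseOp(input_ops=[read_op])  # trivial DAG: echo the input

server = PipelineServer()
server.set_response_op(response_op)
server.prepare_server(yml_dict={
    "rpc_port": 9993,     # GRPC port
    "http_port": 18080,   # gateway port serving the /{name}/{method} route
    "worker_num": 1,
    "build_dag_each_worker": False,
    "dag": {"is_thread_op": False},
})
server.run_server()
```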
```diff
@@ -158,6 +215,15 @@ class PipelineServer(object):
         self._start_local_rpc_service()

     def _init_ops(self, op_conf):
+        """
+        Initialize all OPs from the yaml dict configuration.
+
+        Args:
+            op_conf: the op configuration in a yaml dict
+
+        Returns:
+            None
+        """
         default_conf = {
             "concurrency": 1,
             "timeout": -1,
```
```diff
@@ -187,12 +253,22 @@ class PipelineServer(object):
             op.launch_local_rpc_service()

     def run_server(self):
+        """
+        If _build_dag_each_worker is True, start _worker_num processes and
+        run one GRPC server in each process. Otherwise, start one GRPC
+        server.
+
+        Args:
+            None
+
+        Returns:
+            None
+        """
         if self._build_dag_each_worker:
             with _reserve_port(self._rpc_port) as port:
                 bind_address = 'localhost:{}'.format(port)
                 workers = []
                 for i in range(self._worker_num):
                     show_info = (i == 0)
                     worker = multiprocessing.Process(
                         target=self._run_server_func,
                         args=(bind_address, self._response_op, self._conf, i))
```
```diff
@@ -220,6 +296,15 @@ class PipelineServer(object):
             server.wait_for_termination()

     def _run_server_func(self, bind_address, response_op, dag_conf,
                          worker_idx):
+        """
+        Run one GRPC server with PipelineServicer.
+
+        Args:
+            bind_address: binding IP/Port
+            response_op: ResponseOp or its subclass object
+            dag_conf: DAG config
+            worker_idx: process index
+        """
         options = [('grpc.so_reuseport', 1),
                    ('grpc.max_send_message_length', 256 * 1024 * 1024),
                    ('grpc.max_receive_message_length', 256 * 1024 * 1024)]
```
```diff
@@ -235,6 +320,10 @@ class PipelineServer(object):


 class ServerYamlConfChecker(object):
+    """
+    Check the validity of server yaml files.
+    """
+
     def __init__(self):
         pass
```
python/pipeline/proto/pipeline_service.proto
```diff
@@ -19,13 +19,16 @@ message Request {
   repeated string key = 1;
   repeated string value = 2;
   optional string name = 3;
+  optional string method = 4;
+  optional int64 logid = 5;
+  optional string clientip = 6;
 };

 message Response {
-  repeated string key = 1;
-  repeated string value = 2;
-  required int32 ecode = 3;
-  optional string error_info = 4;
+  optional int32 err_no = 1;
+  optional string err_msg = 2;
+  repeated string key = 3;
+  repeated string value = 4;
 };

 service PipelineService {
```
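Note that `key`/`value` move from field numbers 1/2 to 3/4 while `err_no`/`err_msg` take their places, so this is a wire-breaking change: clients and servers must be regenerated from the same proto. A sketch of client code migrating to the new field names (the import path is an assumption about how the regenerated `pipeline_service_pb2` module is packaged):

```python
# Client-side migration sketch for the renumbered Response message.
from paddle_serving_server.pipeline.proto import pipeline_service_pb2

resp = pipeline_service_pb2.Response()
resp.err_no = 0
resp.err_msg = ""
resp.key.append("prediction")
resp.value.append("[0.1, 0.9]")

if resp.err_no != 0:                           # was: resp.ecode
    print("failed: {}".format(resp.err_msg))   # was: resp.error_info
else:
    print(dict(zip(resp.key, resp.value)))
```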