Commit 4493dc04 authored by TeslaZhao

Support local_predictor of pipeline in thread/process mode, fix pipeline examples, and add lots of comments

Parent 62f73236
#rpc port; rpc_port and http_port must not both be empty. When rpc_port is
#empty and http_port is not, rpc_port is automatically set to http_port + 1.
rpc_port: 18070

#http port; rpc_port and http_port must not both be empty. When rpc_port is
#available and http_port is empty, no http_port is generated automatically.
http_port: 18071

#worker_num, the maximum concurrency.
#When build_dag_each_worker=True, the framework creates worker_num processes,
#each building a grpcSever and a DAG.
#When build_dag_each_worker=False, the framework sets max_workers=worker_num
#for the grpc thread pool of the main thread.
worker_num: 4

#build_dag_each_worker: False, the framework creates one DAG in the process;
#True, the framework creates an independent DAG in each of worker_num processes.
build_dag_each_worker: False

dag:
    #op resource type: True for the thread model, False for the process model
    is_thread_op: True

    #retry times
    retry: 1

    #profiling: True generates Timeline data (with some performance cost),
    #False disables profiling
    use_profile: False

    #maximum channel size, 0 by default
    channel_size: 0

    #tracer: traces framework throughput and the status of every OP and
    #channel; no data is generated without a tracer
    tracer:
        #interval between traces, in seconds
        interval_s: 10

op:
    bow:
        #concurrency: thread concurrency when is_thread_op=True, otherwise
        #process concurrency
        concurrency: 1

        #client connection type, brpc
        client_type: brpc

        #retry times for Serving interaction, no retry by default
        retry: 1

        #timeout for Serving interaction, in ms
        timeout: 3000

        #Serving IPs
        server_endpoints: ["127.0.0.1:9393"]

        #client-side config of the bow model
        client_config: "imdb_bow_client_conf/serving_client_conf.prototxt"

        #fetch list, keyed by the alias_name of fetch_var in client_config
        fetch_list: ["prediction"]

        #number of requests batched per Serving query, 1 by default.
        #batch_size > 1 requires auto_batching_timeout, otherwise requests
        #block until batch_size is reached
        batch_size: 1

        #batching timeout, used together with batch_size
        auto_batching_timeout: 2000
    cnn:
        #concurrency: thread concurrency when is_thread_op=True, otherwise
        #process concurrency
        concurrency: 1

        #client connection type, brpc
        client_type: brpc

        #retry times for Serving interaction, no retry by default
        retry: 1

        #timeout for Serving interaction, in ms
        timeout: 3000

        #Serving IPs
        server_endpoints: ["127.0.0.1:9292"]

        #client-side config of the cnn model
        client_config: "imdb_cnn_client_conf/serving_client_conf.prototxt"

        #fetch list, keyed by the alias_name of fetch_var in client_config
        fetch_list: ["prediction"]

        #number of requests batched per Serving query, 1 by default.
        #batch_size > 1 requires auto_batching_timeout, otherwise requests
        #block until batch_size is reached
        batch_size: 1

        #batching timeout, used together with batch_size
        auto_batching_timeout: 2000
    combine:
        #concurrency: thread concurrency when is_thread_op=True, otherwise
        #process concurrency
        concurrency: 1

        #retry times for Serving interaction, no retry by default
        retry: 1

        #timeout for Serving interaction, in ms
        timeout: 3000

        #number of requests batched per Serving query, 1 by default.
        #batch_size > 1 requires auto_batching_timeout, otherwise requests
        #block until batch_size is reached
        batch_size: 1

        #batching timeout, used together with batch_size
        auto_batching_timeout: 2000
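For a quick sanity check of the YAML above, the following sketch loads it and inspects a few fields; it assumes the file is saved as config.yml next to the server script.

```
import yaml

# A sketch, assuming the config above is saved as "config.yml".
with open("config.yml") as f:
    conf = yaml.safe_load(f)
assert conf["rpc_port"] == 18070 and conf["http_port"] == 18071
assert set(conf["op"].keys()) == {"bow", "cnn", "combine"}
print(conf["dag"])  # {'is_thread_op': True, 'retry': 1, ...}
```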
@@ -15,21 +15,22 @@ from paddle_serving_server.pipeline import PipelineClient
import numpy as np

client = PipelineClient()
client.connect(['127.0.0.1:18070'])

words = 'i am very sad | 0'

futures = []
for i in range(100):
    futures.append(
        client.predict(
            feed_dict={"words": words,
                       "logid": 10000 + i},
            fetch=["prediction"],
            asyn=True,
            profile=False))

for f in futures:
    res = f.result()
    if res.err_no != 0:
        print("predict failed: {}".format(res))
    print(res)
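The loop above issues 100 asynchronous requests (asyn=True) and collects the futures afterwards. For reference, a synchronous call returns the Response directly; a minimal sketch against the same server, assuming asyn defaults to False:

```
# Synchronous variant of the call above (a sketch; assumes asyn=False by default).
res = client.predict(feed_dict={"words": words}, fetch=["prediction"])
if res.err_no != 0:
    print("predict failed: {}".format(res))
print(res)
```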
@@ -15,10 +15,14 @@
from paddle_serving_server.pipeline import Op, RequestOp, ResponseOp
from paddle_serving_server.pipeline import PipelineServer
from paddle_serving_server.pipeline.proto import pipeline_service_pb2
from paddle_serving_server.pipeline.channel import ChannelDataErrcode
import numpy as np
from paddle_serving_app.reader.imdb_reader import IMDBDataset
import logging
try:
    from paddle_serving_server.web_service import WebService
except ImportError:
    from paddle_serving_server_gpu.web_service import WebService

_LOGGER = logging.getLogger()
user_handler = logging.StreamHandler()
@@ -43,76 +47,66 @@ class ImdbRequestOp(RequestOp):
            word_ids, _ = self.imdb_dataset.get_words_and_label(words)
            word_len = len(word_ids)
            dictdata[key] = np.array(word_ids).reshape(word_len, 1)
            dictdata["{}.lod".format(key)] = np.array([0, word_len])

        log_id = None
        if request.logid is not None:
            log_id = request.logid
        return dictdata, log_id, None, ""


class CombineOp(Op):
    def preprocess(self, input_data, data_id, log_id):
        #_LOGGER.info("Enter CombineOp::preprocess")
        combined_prediction = 0
        for op_name, data in input_data.items():
            _LOGGER.info("{}: {}".format(op_name, data["prediction"]))
            combined_prediction += data["prediction"]
        data = {"prediction": combined_prediction / 2}
        return data, False, None, ""


class ImdbResponseOp(ResponseOp):
    # Here ImdbResponseOp is consistent with the default ResponseOp implementation
    def pack_response_package(self, channeldata):
        resp = pipeline_service_pb2.Response()
        resp.err_no = channeldata.error_code
        if resp.err_no == ChannelDataErrcode.OK.value:
            feed = channeldata.parse()
            # ndarray to string
            for name, var in feed.items():
                resp.value.append(var.__repr__())
                resp.key.append(name)
        else:
            resp.err_msg = channeldata.error_info
        return resp


read_op = ImdbRequestOp()


class BowOp(Op):
    def init_op(self):
        pass


class CnnOp(Op):
    def init_op(self):
        pass


bow_op = BowOp("bow", input_ops=[read_op])
cnn_op = CnnOp("cnn", input_ops=[read_op])
combine_op = CombineOp("combine", input_ops=[bow_op, cnn_op])

# fetch output of bow_op
#response_op = ImdbResponseOp(input_ops=[bow_op])
# fetch output of combine_op
response_op = ImdbResponseOp(input_ops=[combine_op])
# use default ResponseOp implementation
#response_op = ResponseOp(input_ops=[combine_op])

server = PipelineServer()
server.set_response_op(response_op)
......
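The bootstrap tail is truncated above; based on the other server scripts in this commit, it typically ends with:

```
# Sketch of the truncated tail; the config file name follows the other
# examples in this commit.
server.prepare_server('config.yml')
server.run_server()
```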
@@ -28,31 +28,9 @@ python web_service.py &>log.txt &
python pipeline_http_client.py
```

<!--
## More (PipelineServing)

## Client Prediction

### RPC
......
@@ -31,26 +31,6 @@ python pipeline_http_client.py

<!--
## More (PipelineServing)

## Start the Client

### RPC
......
#rpc port; rpc_port and http_port must not both be empty. When rpc_port is
#empty and http_port is not, rpc_port is automatically set to http_port + 1.
rpc_port: 18090

#http port; rpc_port and http_port must not both be empty. When rpc_port is
#available and http_port is empty, no http_port is generated automatically.
http_port: 9999

#worker_num, the maximum concurrency.
#When build_dag_each_worker=True, the framework creates worker_num processes,
#each building a grpcSever and a DAG.
#When build_dag_each_worker=False, the framework sets max_workers=worker_num
#for the grpc thread pool of the main thread.
worker_num: 1

#build_dag_each_worker: False, the framework creates one DAG in the process;
#True, the framework creates an independent DAG in each of worker_num processes.
build_dag_each_worker: false

dag:
    #op resource type: True for the thread model, False for the process model
    is_thread_op: False

    #retry times
    retry: 1

    #profiling: True generates Timeline data (with some performance cost),
    #False disables profiling
    use_profile: false

op:
    det:
        #concurrency: thread concurrency when is_thread_op=True, otherwise
        #process concurrency
        concurrency: 2

        #when the op config has no server_endpoints, the local service
        #config is read from local_service_conf
        local_service_conf:
            #client type: brpc, grpc or local_predictor. local_predictor does
            #not start a Serving service; prediction runs in-process
            client_type: local_predictor

            #det model path
            model_config: ocr_det_model

            #fetch list, keyed by the alias_name of fetch_var in client_config
            fetch_list: ["concat_1.tmp_0"]

            #device IDs: "" or unset means CPU prediction; "0" or "0,1,2"
            #means GPU prediction on the given cards
            devices: "0"
    rec:
        #concurrency: thread concurrency when is_thread_op=True, otherwise
        #process concurrency
        concurrency: 2

        #timeout, in ms
        timeout: -1

        #retry times for Serving interaction, no retry by default
        retry: 1

        #when the op config has no server_endpoints, the local service
        #config is read from local_service_conf
        local_service_conf:
            #client type: brpc, grpc or local_predictor. local_predictor does
            #not start a Serving service; prediction runs in-process
            client_type: local_predictor

            #rec model path
            model_config: ocr_rec_model

            #fetch list, keyed by the alias_name of fetch_var in client_config
            fetch_list: ["ctc_greedy_decoder_0.tmp_0", "softmax_0.tmp_0"]

            #device IDs: "" or unset means CPU prediction; "0" or "0,1,2"
            #means GPU prediction on the given cards
            devices: "0"
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=doc-string-missing
from paddle_serving_server_gpu.pipeline import Op, RequestOp, ResponseOp
from paddle_serving_server_gpu.pipeline import PipelineServer
from paddle_serving_server_gpu.pipeline.proto import pipeline_service_pb2
from paddle_serving_server_gpu.pipeline.channel import ChannelDataEcode
from paddle_serving_server_gpu.pipeline import LocalRpcServiceHandler
import numpy as np
import cv2
import time
import base64
import json
import re
import logging
from paddle_serving_app.reader import OCRReader
from paddle_serving_app.reader import Sequential, ResizeByFactor
from paddle_serving_app.reader import Div, Normalize, Transpose
from paddle_serving_app.reader import DBPostProcess, FilterBoxes, GetRotateCropImage, SortedBoxes

_LOGGER = logging.getLogger()


class DetOp(Op):
    def init_op(self):
        self.det_preprocess = Sequential([
            ResizeByFactor(32, 960), Div(255),
            Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]), Transpose(
                (2, 0, 1))
        ])
        self.filter_func = FilterBoxes(10, 10)
        self.post_func = DBPostProcess({
            "thresh": 0.3,
            "box_thresh": 0.5,
            "max_candidates": 1000,
            "unclip_ratio": 1.5,
            "min_size": 3
        })

    def preprocess(self, input_dicts):
        (_, input_dict), = input_dicts.items()
        data = base64.b64decode(input_dict["image"].encode('utf8'))
        data = np.fromstring(data, np.uint8)
        # Note: class variables(self.var) can only be used in process op mode
        self.im = cv2.imdecode(data, cv2.IMREAD_COLOR)
        self.ori_h, self.ori_w, _ = self.im.shape
        det_img = self.det_preprocess(self.im)
        _, self.new_h, self.new_w = det_img.shape
        return {"image": det_img}

    def postprocess(self, input_dicts, fetch_dict):
        det_out = fetch_dict["concat_1.tmp_0"]
        ratio_list = [
            float(self.new_h) / self.ori_h, float(self.new_w) / self.ori_w
        ]
        dt_boxes_list = self.post_func(det_out, [ratio_list])
        dt_boxes = self.filter_func(dt_boxes_list[0], [self.ori_h, self.ori_w])
        out_dict = {"dt_boxes": dt_boxes, "image": self.im}
        return out_dict


class RecOp(Op):
    def init_op(self):
        self.ocr_reader = OCRReader()
        self.get_rotate_crop_image = GetRotateCropImage()
        self.sorted_boxes = SortedBoxes()

    def preprocess(self, input_dicts):
        (_, input_dict), = input_dicts.items()
        im = input_dict["image"]
        dt_boxes = input_dict["dt_boxes"]
        dt_boxes = self.sorted_boxes(dt_boxes)
        feed_list = []
        img_list = []
        max_wh_ratio = 0
        for i, dtbox in enumerate(dt_boxes):
            boximg = self.get_rotate_crop_image(im, dt_boxes[i])
            img_list.append(boximg)
            h, w = boximg.shape[0:2]
            wh_ratio = w * 1.0 / h
            max_wh_ratio = max(max_wh_ratio, wh_ratio)
        for img in img_list:
            norm_img = self.ocr_reader.resize_norm_img(img, max_wh_ratio)
            feed = {"image": norm_img}
            feed_list.append(feed)
        return feed_list

    def postprocess(self, input_dicts, fetch_dict):
        rec_res = self.ocr_reader.postprocess(fetch_dict, with_score=True)
        res_lst = []
        for res in rec_res:
            res_lst.append(res[0])
        res = {"res": str(res_lst)}
        return res


read_op = RequestOp()
det_op = DetOp(
    name="det",
    input_ops=[read_op],
    local_rpc_service_handler=LocalRpcServiceHandler(
        model_config="ocr_det_model",
        workdir="det_workdir",  # default: "workdir"
        thread_num=2,  # default: 2
        devices="0",  # gpu0. default: "" (cpu)
        mem_optim=True,  # default: True
        ir_optim=False,  # default: False
        available_port_generator=None),  # default: None
    concurrency=1)
rec_op = RecOp(
    name="rec",
    input_ops=[det_op],
    server_endpoints=["127.0.0.1:12001"],
    fetch_list=["ctc_greedy_decoder_0.tmp_0", "softmax_0.tmp_0"],
    client_config="ocr_rec_client/serving_client_conf.prototxt",
    concurrency=1)
response_op = ResponseOp(input_ops=[rec_op])

server = PipelineServer("ocr")
server.set_response_op(response_op)
server.prepare_server('config.yml')
server.run_server()
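A matching RPC client for this server connects to the rpc_port from config.yml (18090 in the config above); a sketch with a placeholder image path:

```
# Client-side sketch; rpc_port 18090 comes from the config above, and the
# "image"/"res" keys follow DetOp and RecOp.
import base64
from paddle_serving_server_gpu.pipeline import PipelineClient

client = PipelineClient()
client.connect(['127.0.0.1:18090'])
with open("imgs/1.jpg", 'rb') as f:
    image = base64.b64encode(f.read()).decode('utf8')
ret = client.predict(feed_dict={"image": image}, fetch=["res"])
print(ret)
```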
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=doc-string-missing
from paddle_serving_server.pipeline import Op, RequestOp, ResponseOp
from paddle_serving_server.pipeline import PipelineServer
from paddle_serving_server.pipeline.proto import pipeline_service_pb2
from paddle_serving_server.pipeline.channel import ChannelDataEcode
from paddle_serving_server.pipeline import LocalServiceHandler
import numpy as np
import cv2
import time
import base64
import json
import re
import logging
from paddle_serving_app.reader import OCRReader
from paddle_serving_app.reader import Sequential, ResizeByFactor
from paddle_serving_app.reader import Div, Normalize, Transpose
from paddle_serving_app.reader import DBPostProcess, FilterBoxes, GetRotateCropImage, SortedBoxes

_LOGGER = logging.getLogger()


class DetOp(Op):
    def init_op(self):
        self.det_preprocess = Sequential([
            ResizeByFactor(32, 960), Div(255),
            Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]), Transpose(
                (2, 0, 1))
        ])
        self.filter_func = FilterBoxes(10, 10)
        self.post_func = DBPostProcess({
            "thresh": 0.3,
            "box_thresh": 0.5,
            "max_candidates": 1000,
            "unclip_ratio": 1.5,
            "min_size": 3
        })

    def preprocess(self, input_dicts):
        (_, input_dict), = input_dicts.items()
        data = base64.b64decode(input_dict["image"].encode('utf8'))
        data = np.fromstring(data, np.uint8)
        # Note: class variables(self.var) can only be used in process op mode
        self.im = cv2.imdecode(data, cv2.IMREAD_COLOR)
        print(self.im)
        self.ori_h, self.ori_w, _ = self.im.shape
        det_img = self.det_preprocess(self.im)
        _, self.new_h, self.new_w = det_img.shape
        print("image", det_img)
        return {"image": det_img}

    def postprocess(self, input_dicts, fetch_dict):
        det_out = fetch_dict["concat_1.tmp_0"]
        ratio_list = [
            float(self.new_h) / self.ori_h, float(self.new_w) / self.ori_w
        ]
        dt_boxes_list = self.post_func(det_out, [ratio_list])
        dt_boxes = self.filter_func(dt_boxes_list[0], [self.ori_h, self.ori_w])
        out_dict = {"dt_boxes": dt_boxes, "image": self.im}
        return out_dict


class RecOp(Op):
    def init_op(self):
        self.ocr_reader = OCRReader()
        self.get_rotate_crop_image = GetRotateCropImage()
        self.sorted_boxes = SortedBoxes()

    def preprocess(self, input_dicts):
        (_, input_dict), = input_dicts.items()
        im = input_dict["image"]
        dt_boxes = input_dict["dt_boxes"]
        dt_boxes = self.sorted_boxes(dt_boxes)
        feed_list = []
        img_list = []
        max_wh_ratio = 0
        for i, dtbox in enumerate(dt_boxes):
            boximg = self.get_rotate_crop_image(im, dt_boxes[i])
            img_list.append(boximg)
            h, w = boximg.shape[0:2]
            wh_ratio = w * 1.0 / h
            max_wh_ratio = max(max_wh_ratio, wh_ratio)
        for img in img_list:
            norm_img = self.ocr_reader.resize_norm_img(img, max_wh_ratio)
            feed = {"image": norm_img}
            feed_list.append(feed)
        return feed_list

    def postprocess(self, input_dicts, fetch_dict):
        rec_res = self.ocr_reader.postprocess(fetch_dict, with_score=True)
        res_lst = []
        for res in rec_res:
            res_lst.append(res[0])
        res = {"res": str(res_lst)}
        return res


read_op = RequestOp()
det_op = DetOp(
    name="det",
    input_ops=[read_op],
    client_type="local_predictor",
    local_service_handler=LocalServiceHandler(
        model_config="ocr_det_model",
        workdir="det_workdir",  # default: "workdir"
        thread_num=2,  # default: 2
        mem_optim=True,  # default: True
        ir_optim=False,  # default: False
        available_port_generator=None),  # default: None
    concurrency=1)
rec_op = RecOp(
    name="rec",
    input_ops=[det_op],
    client_type="local_predictor",
    local_service_handler=LocalServiceHandler(model_config="ocr_rec_model"),
    concurrency=1)
response_op = ResponseOp(input_ops=[rec_op])

server = PipelineServer("ocr")
server.set_response_op(response_op)
server.prepare_server('config.yml')
server.run_server()
@@ -11,7 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from paddle_serving_server_gpu.pipeline import PipelineClient
import numpy as np
import requests
import json
@@ -20,7 +20,7 @@ import base64
import os

client = PipelineClient()
client.connect(['127.0.0.1:18090'])


def cv2_to_base64(image):
......
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=doc-string-missing
from paddle_serving_server_gpu.pipeline import Op, RequestOp, ResponseOp
from paddle_serving_server_gpu.pipeline import PipelineServer
from paddle_serving_server_gpu.pipeline.proto import pipeline_service_pb2
from paddle_serving_server_gpu.pipeline.channel import ChannelDataEcode
import numpy as np
import cv2
import time
import base64
import json
import re
import logging
from paddle_serving_app.reader import OCRReader
from paddle_serving_app.reader import Sequential, ResizeByFactor
from paddle_serving_app.reader import Div, Normalize, Transpose
from paddle_serving_app.reader import DBPostProcess, FilterBoxes, GetRotateCropImage, SortedBoxes

_LOGGER = logging.getLogger()


class DetOp(Op):
    def init_op(self):
        self.det_preprocess = Sequential([
            ResizeByFactor(32, 960), Div(255),
            Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]), Transpose(
                (2, 0, 1))
        ])
        self.filter_func = FilterBoxes(10, 10)
        self.post_func = DBPostProcess({
            "thresh": 0.3,
            "box_thresh": 0.5,
            "max_candidates": 1000,
            "unclip_ratio": 1.5,
            "min_size": 3
        })

    def preprocess(self, input_dicts):
        (_, input_dict), = input_dicts.items()
        data = base64.b64decode(input_dict["image"].encode('utf8'))
        data = np.fromstring(data, np.uint8)
        # Note: class variables(self.var) can only be used in process op mode
        self.im = cv2.imdecode(data, cv2.IMREAD_COLOR)
        self.ori_h, self.ori_w, _ = self.im.shape
        det_img = self.det_preprocess(self.im)
        _, self.new_h, self.new_w = det_img.shape
        return {"image": det_img}

    def postprocess(self, input_dicts, fetch_dict):
        det_out = fetch_dict["concat_1.tmp_0"]
        ratio_list = [
            float(self.new_h) / self.ori_h, float(self.new_w) / self.ori_w
        ]
        dt_boxes_list = self.post_func(det_out, [ratio_list])
        dt_boxes = self.filter_func(dt_boxes_list[0], [self.ori_h, self.ori_w])
        out_dict = {"dt_boxes": dt_boxes, "image": self.im}
        return out_dict


class RecOp(Op):
    def init_op(self):
        self.ocr_reader = OCRReader()
        self.get_rotate_crop_image = GetRotateCropImage()
        self.sorted_boxes = SortedBoxes()

    def preprocess(self, input_dicts):
        (_, input_dict), = input_dicts.items()
        im = input_dict["image"]
        dt_boxes = input_dict["dt_boxes"]
        dt_boxes = self.sorted_boxes(dt_boxes)
        feed_list = []
        img_list = []
        max_wh_ratio = 0
        for i, dtbox in enumerate(dt_boxes):
            boximg = self.get_rotate_crop_image(im, dt_boxes[i])
            img_list.append(boximg)
            h, w = boximg.shape[0:2]
            wh_ratio = w * 1.0 / h
            max_wh_ratio = max(max_wh_ratio, wh_ratio)
        for img in img_list:
            norm_img = self.ocr_reader.resize_norm_img(img, max_wh_ratio)
            feed = {"image": norm_img}
            feed_list.append(feed)
        return feed_list

    def postprocess(self, input_dicts, fetch_dict):
        rec_res = self.ocr_reader.postprocess(fetch_dict, with_score=True)
        res_lst = []
        for res in rec_res:
            res_lst.append(res[0])
        res = {"res": str(res_lst)}
        return res


read_op = RequestOp()
det_op = DetOp(
    name="det",
    input_ops=[read_op],
    server_endpoints=["127.0.0.1:12000"],
    fetch_list=["concat_1.tmp_0"],
    client_config="ocr_det_client/serving_client_conf.prototxt",
    concurrency=1)
rec_op = RecOp(
    name="rec",
    input_ops=[det_op],
    server_endpoints=["127.0.0.1:12001"],
    fetch_list=["ctc_greedy_decoder_0.tmp_0", "softmax_0.tmp_0"],
    client_config="ocr_rec_client/serving_client_conf.prototxt",
    concurrency=1)
response_op = ResponseOp(input_ops=[rec_op])

server = PipelineServer("ocr")
server.set_response_op(response_op)
server.prepare_server('config.yml')
server.run_server()
@@ -14,7 +14,7 @@
try:
    from paddle_serving_server.web_service import WebService, Op
except ImportError:
    from paddle_serving_server_gpu.web_service import WebService, Op
import logging
import numpy as np
import cv2
@@ -43,7 +43,7 @@ class DetOp(Op):
            "min_size": 3
        })

    def preprocess(self, input_dicts, data_id, log_id):
        (_, input_dict), = input_dicts.items()
        data = base64.b64decode(input_dict["image"].encode('utf8'))
        data = np.fromstring(data, np.uint8)
@@ -52,9 +52,9 @@ class DetOp(Op):
        self.ori_h, self.ori_w, _ = self.im.shape
        det_img = self.det_preprocess(self.im)
        _, self.new_h, self.new_w = det_img.shape
        return {"image": det_img[np.newaxis, :].copy()}, False, None, ""

    def postprocess(self, input_dicts, fetch_dict, log_id):
        det_out = fetch_dict["concat_1.tmp_0"]
        ratio_list = [
            float(self.new_h) / self.ori_h, float(self.new_w) / self.ori_w
@@ -63,7 +63,7 @@ class DetOp(Op):
        dt_boxes = self.filter_func(dt_boxes_list[0], [self.ori_h, self.ori_w])
        out_dict = {"dt_boxes": dt_boxes, "image": self.im}
        print("out dict", out_dict)
        return out_dict, None, ""


class RecOp(Op):
@@ -72,7 +72,7 @@ class RecOp(Op):
        self.get_rotate_crop_image = GetRotateCropImage()
        self.sorted_boxes = SortedBoxes()

    def preprocess(self, input_dicts, data_id, log_id):
        (_, input_dict), = input_dicts.items()
        im = input_dict["image"]
        dt_boxes = input_dict["dt_boxes"]
@@ -93,15 +93,15 @@ class RecOp(Op):
            norm_img = self.ocr_reader.resize_norm_img(img, max_wh_ratio)
            imgs[id] = norm_img
        feed = {"image": imgs.copy()}
        return feed, False, None, ""

    def postprocess(self, input_dicts, fetch_dict, log_id):
        rec_res = self.ocr_reader.postprocess(fetch_dict, with_score=True)
        res_lst = []
        for res in rec_res:
            res_lst.append(res[0])
        res = {"res": str(res_lst)}
        return res, None, ""


class OcrService(WebService):
@@ -112,5 +112,5 @@ class OcrService(WebService):
uci_service = OcrService(name="ocr")
uci_service.prepare_pipeline_config("config.yml")
uci_service.run_service()
@@ -15,5 +15,5 @@ python web_service.py &>log.txt &
## Http test
```
curl -X POST -k http://localhost:18082/uci/prediction -d '{"key": ["x"], "value": ["0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283, 0.4919, 0.1856, 0.0795, -0.0332"]}'
```
@@ -15,5 +15,5 @@ python web_service.py &>log.txt &
## Test
```
curl -X POST -k http://localhost:18082/uci/prediction -d '{"key": ["x"], "value": ["0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283, 0.4919, 0.1856, 0.0795, -0.0332"]}'
```
#worker_num, the maximum concurrency.
#When build_dag_each_worker=True, the framework creates worker_num processes,
#each building a grpcSever and a DAG.
#When build_dag_each_worker=False, the framework sets max_workers=worker_num
#for the grpc thread pool of the main thread.
worker_num: 1

#http port; rpc_port and http_port must not both be empty. When rpc_port is
#available and http_port is empty, no http_port is generated automatically.
http_port: 18082

dag:
    #op resource type: True for the thread model, False for the process model
    is_thread_op: False

op:
    uci:
        #when the op config has no server_endpoints, the local service
        #config is read from local_service_conf
        local_service_conf:
            #concurrency: thread concurrency when is_thread_op=True,
            #otherwise process concurrency
            concurrency: 2

            #uci model path
            model_config: uci_housing_model

            #device IDs: "" or unset means CPU prediction; "0" or "0,1,2"
            #means GPU prediction on the given cards
            devices: "0"  # "0,1"

            #client type: brpc, grpc or local_predictor. local_predictor
            #does not start a Serving service; prediction runs in-process
            client_type: local_predictor

            #fetch list, keyed by the alias_name of fetch_var in client_config
            fetch_list: ["price"]
@@ -25,20 +25,25 @@ class UciOp(Op):
    def init_op(self):
        self.separator = ","

    def preprocess(self, input_dicts, data_id, log_id):
        (_, input_dict), = input_dicts.items()
        _LOGGER.error("UciOp::preprocess >>> log_id:{}, input:{}".format(
            log_id, input_dict))
        x_value = input_dict["x"]
        proc_dict = {}
        if isinstance(x_value, (str, unicode)):
            input_dict["x"] = np.array(
                [float(x.strip())
                 for x in x_value.split(self.separator)]).reshape(1, 13)
            _LOGGER.error("input_dict:{}".format(input_dict))

        return input_dict, False, None, ""

    def postprocess(self, input_dicts, fetch_dict, log_id):
        _LOGGER.info("UciOp::postprocess >>> log_id:{}, fetch_dict:{}".format(
            log_id, fetch_dict))
        fetch_dict["price"] = str(fetch_dict["price"][0][0])
        return fetch_dict, None, ""


class UciService(WebService):
......
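The curl test shown earlier can also be driven from Python; a sketch of the equivalent request:

```
# Equivalent of the curl test above, using requests.
import json
import requests

url = "http://127.0.0.1:18082/uci/prediction"
data = {"key": ["x"],
        "value": ["0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, "
                  "-0.1583, -0.0584, 0.6283, 0.4919, 0.1856, 0.0795, -0.0332"]}
r = requests.post(url=url, data=json.dumps(data))
print(r.json())
```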
@@ -32,6 +32,12 @@ logger.setLevel(logging.INFO)


class LocalPredictor(object):
    """
    Prediction runs in the current process (an in-process call). Compared
    with RPC/HTTP, LocalPredictor performs better because there is no
    network or serialization overhead.
    """

    def __init__(self):
        self.feed_names_ = []
        self.fetch_names_ = []
@@ -42,13 +48,41 @@ class LocalPredictor(object):
        self.fetch_names_to_idx_ = {}
        self.fetch_names_to_type_ = {}

    def load_model_config(self,
                          model_path,
                          use_gpu=False,
                          gpu_id=0,
                          use_profile=False,
                          thread_num=1,
                          mem_optim=True,
                          ir_optim=False,
                          use_trt=False,
                          use_feed_fetch_ops=False):
        """
        Load the model config and set the engine config for the paddle predictor.

        Args:
            model_path: model config path.
            use_gpu: predict on GPU, False by default.
            gpu_id: GPU id, 0 by default.
            use_profile: enable predictor profiling, False by default.
            thread_num: number of threads, 1 by default.
            mem_optim: memory optimization, True by default.
            ir_optim: enable computation-graph optimization, False by default.
            use_trt: use NVIDIA TensorRT optimization, False by default.
            use_feed_fetch_ops: use feed/fetch ops, False by default.
        """
        client_config = "{}/serving_server_conf.prototxt".format(model_path)
        model_conf = m_config.GeneralModelConfig()
        f = open(client_config, 'r')
        model_conf = google.protobuf.text_format.Merge(
            str(f.read()), model_conf)
        config = AnalysisConfig(model_path)
        logger.info("load_model_config params: model_path:{}, use_gpu:{},\
            gpu_id:{}, use_profile:{}, thread_num:{}, mem_optim:{}, ir_optim:{},\
            use_trt:{}, use_feed_fetch_ops:{}".format(
            model_path, use_gpu, gpu_id, use_profile, thread_num, mem_optim,
            ir_optim, use_trt, use_feed_fetch_ops))

        self.feed_names_ = [var.alias_name for var in model_conf.feed_var]
        self.fetch_names_ = [var.alias_name for var in model_conf.fetch_var]
@@ -64,19 +98,43 @@ class LocalPredictor(object):
            self.fetch_names_to_idx_[var.alias_name] = i
            self.fetch_names_to_type_[var.alias_name] = var.fetch_type

        if use_profile:
            config.enable_profile()
        if mem_optim:
            config.enable_memory_optim()
        config.switch_ir_optim(ir_optim)
        config.set_cpu_math_library_num_threads(thread_num)
        config.switch_use_feed_fetch_ops(use_feed_fetch_ops)
        config.delete_pass("conv_transpose_eltwiseadd_bn_fuse_pass")

        if not use_gpu:
            config.disable_gpu()
        else:
            config.enable_use_gpu(100, gpu_id)
            if use_trt:
                config.enable_tensorrt_engine(
                    workspace_size=1 << 20,
                    max_batch_size=32,
                    min_subgraph_size=3,
                    use_static=False,
                    use_calib_mode=False)
        self.predictor = create_paddle_predictor(config)

    def predict(self, feed=None, fetch=None, batch=False, log_id=0):
        """
        Predict locally.

        Args:
            feed: feed var
            fetch: fetch var
            batch: batch data or not, False by default. If batch is False, a
                new dimension is added to the head of the shape [np.newaxis].
            log_id: for logging
        Returns:
            fetch_map: dict
        """
        if feed is None or fetch is None:
            raise ValueError("You should specify feed and fetch for prediction")
        fetch_list = []
......
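Putting load_model_config and predict together, a minimal in-process usage sketch; it assumes a local uci_housing_model directory and the feed/fetch names from the UCI example above.

```
# Minimal LocalPredictor usage sketch, assuming the uci_housing_model
# directory and the "x"/"price" names from the UCI example.
import numpy as np
from paddle_serving_app.local_predict import LocalPredictor

predictor = LocalPredictor()
predictor.load_model_config("uci_housing_model", use_gpu=False, thread_num=1)
x = np.random.rand(1, 13).astype("float32")
fetch_map = predictor.predict(feed={"x": x}, fetch=["price"], batch=True)
print(fetch_map)
```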
@@ -18,5 +18,5 @@ from .image_reader import RCNNPostprocess, SegPostprocess, PadStride, BlazeFaceP
from .image_reader import DBPostProcess, FilterBoxes, GetRotateCropImage, SortedBoxes
from .lac_reader import LACReader
from .senta_reader import SentaReader
#from .imdb_reader import IMDBDataset
from .ocr_reader import OCRReader
@@ -22,18 +22,17 @@ import yaml
import copy
import argparse
import logging
import json

FORMAT = '%(asctime)s-%(levelname)s: %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)
logger = logging.getLogger(__name__)

#precision_map = {
#    'trt_int8': fluid.core.AnalysisConfig.Precision.Int8,
#    'trt_fp32': fluid.core.AnalysisConfig.Precision.Float32,
#    'trt_fp16': fluid.core.AnalysisConfig.Precision.Half
#}


class Resize(object):
......
@@ -57,6 +57,9 @@ class ProductErrCode(enum.Enum):


class ChannelDataType(enum.Enum):
    """
    Channel data type
    """
    DICT = 0
    CHANNEL_NPDATA = 1
    ERROR = 2
@@ -211,9 +214,9 @@ class ChannelData(object):
            return 1

    def __str__(self):
        return "type[{}], error_code[{}], data_id[{}], log_id[{}], dict_data[{}]".format(
            ChannelDataType(self.datatype).name, self.error_code, self.id,
            self.log_id, str(self.dictdata))


class ProcessChannel(object):
@@ -310,8 +313,8 @@ class ProcessChannel(object):

    def push(self, channeldata, op_name=None):
        _LOGGER.debug(
            self._log("(data_id={} log_id={}) Op({}) Enter channel::push".
                      format(channeldata.id, channeldata.log_id, op_name)))
        if len(self._producers) == 0:
            _LOGGER.critical(
                self._log(
......
...@@ -39,7 +39,23 @@ _LOGGER = logging.getLogger(__name__) ...@@ -39,7 +39,23 @@ _LOGGER = logging.getLogger(__name__)
class DAGExecutor(object): class DAGExecutor(object):
"""
DAG Executor, the service entrance of DAG.
"""
def __init__(self, response_op, server_conf, worker_idx): def __init__(self, response_op, server_conf, worker_idx):
"""
Initialize DAGExecutor.
Args:
response_op: Response OP
server_conf: server conf. config.yaml
worker_idx: DAGExecutor index, PipelineServer creates many
DAGExecutors when _build_dag_each_worker is true.
Returns:
None.
"""
build_dag_each_worker = server_conf["build_dag_each_worker"] build_dag_each_worker = server_conf["build_dag_each_worker"]
server_worker_num = server_conf["worker_num"] server_worker_num = server_conf["worker_num"]
dag_conf = server_conf["dag"] dag_conf = server_conf["dag"]
...@@ -76,7 +92,9 @@ class DAGExecutor(object): ...@@ -76,7 +92,9 @@ class DAGExecutor(object):
if self._tracer is not None: if self._tracer is not None:
self._tracer.start() self._tracer.start()
# generate id: data_id == request_id == log_id # generate id
# data_id: Server Unique ID, automatically generated by the framework
# log_id: Trace one product request, can be empty, not unique.
base_counter = 0 base_counter = 0
gen_id_step = 1 gen_id_step = 1
if build_dag_each_worker: if build_dag_each_worker:
...@@ -96,6 +114,15 @@ class DAGExecutor(object): ...@@ -96,6 +114,15 @@ class DAGExecutor(object):
self._client_profile_value = "1" self._client_profile_value = "1"
def start(self): def start(self):
"""
Starting one thread for receiving data from the last channel background.
Args:
None
Returns:
None
"""
self._recive_func = threading.Thread( self._recive_func = threading.Thread(
target=DAGExecutor._recive_out_channel_func, args=(self, )) target=DAGExecutor._recive_out_channel_func, args=(self, ))
self._recive_func.daemon = True self._recive_func.daemon = True
...@@ -103,11 +130,30 @@ class DAGExecutor(object): ...@@ -103,11 +130,30 @@ class DAGExecutor(object):
_LOGGER.debug("[DAG Executor] Start recive thread") _LOGGER.debug("[DAG Executor] Start recive thread")
def stop(self): def stop(self):
"""
Stopping DAG
Args:
None
Returns:
None
"""
self._dag.stop() self._dag.stop()
self._dag.join() self._dag.join()
_LOGGER.info("[DAG Executor] Stop") _LOGGER.info("[DAG Executor] Stop")
def _get_next_data_id(self): def _get_next_data_id(self):
"""
Generate data_id incrementally and Uniquely
Args:
None
Returns:
data_id: uniq id
cond_v: condition variable
"""
data_id = self._id_generator.next() data_id = self._id_generator.next()
cond_v = threading.Condition() cond_v = threading.Condition()
with self._cv_for_cv_pool: with self._cv_for_cv_pool:
...@@ -116,6 +162,15 @@ class DAGExecutor(object): ...@@ -116,6 +162,15 @@ class DAGExecutor(object):
return data_id, cond_v return data_id, cond_v
def _set_in_channel(self, in_channel): def _set_in_channel(self, in_channel):
"""
Set in_channel of DAG
Args:
in_channel: input channel of DAG
Returns:
None
"""
if not isinstance(in_channel, (ThreadChannel, ProcessChannel)): if not isinstance(in_channel, (ThreadChannel, ProcessChannel)):
_LOGGER.critical("[DAG Executor] Failed to set in_channel: " _LOGGER.critical("[DAG Executor] Failed to set in_channel: "
"in_channel must be Channel type, but get {}". "in_channel must be Channel type, but get {}".
...@@ -123,8 +178,18 @@ class DAGExecutor(object): ...@@ -123,8 +178,18 @@ class DAGExecutor(object):
os._exit(-1) os._exit(-1)
in_channel.add_producer(self.name) in_channel.add_producer(self.name)
self._in_channel = in_channel self._in_channel = in_channel
_LOGGER.info("[DAG] set in channel succ, name [{}]".format(self.name))
def _set_out_channel(self, out_channel): def _set_out_channel(self, out_channel):
"""
Set out_channel of DAG
Args:
out_channel: output channel of DAG
Returns:
None
"""
if not isinstance(out_channel, (ThreadChannel, ProcessChannel)): if not isinstance(out_channel, (ThreadChannel, ProcessChannel)):
_LOGGER.critical("[DAG Executor] Failed to set out_channel: " _LOGGER.critical("[DAG Executor] Failed to set out_channel: "
"must be Channel type, but get {}".format( "must be Channel type, but get {}".format(
...@@ -134,6 +199,17 @@ class DAGExecutor(object): ...@@ -134,6 +199,17 @@ class DAGExecutor(object):
self._out_channel = out_channel self._out_channel = out_channel
def _recive_out_channel_func(self): def _recive_out_channel_func(self):
"""
Receiving data from the output channel, and pushing data into
_fetch_buffer. Function _get_channeldata_from_fetch_buffer gets
data by retry time.
Args:
None
Returns:
None
"""
cv = None cv = None
while True: while True:
try: try:
...@@ -150,7 +226,6 @@ class DAGExecutor(object): ...@@ -150,7 +226,6 @@ class DAGExecutor(object):
self._fetch_buffer[data_id] = closed_errror_data self._fetch_buffer[data_id] = closed_errror_data
cv.notify_all() cv.notify_all()
break break
if len(channeldata_dict) != 1: if len(channeldata_dict) != 1:
_LOGGER.critical( _LOGGER.critical(
"[DAG Executor] Failed to fetch result: out_channel " "[DAG Executor] Failed to fetch result: out_channel "
...@@ -174,6 +249,16 @@ class DAGExecutor(object): ...@@ -174,6 +249,16 @@ class DAGExecutor(object):
cond_v.notify_all() cond_v.notify_all()
def _get_channeldata_from_fetch_buffer(self, data_id, cond_v): def _get_channeldata_from_fetch_buffer(self, data_id, cond_v):
"""
Getting the channel data from _fetch_buffer.
Args:
data_id: search key
cond_v: conditional variable
Returns:
ready_data: one channel data processed
"""
ready_data = None ready_data = None
with cond_v: with cond_v:
...@@ -190,10 +275,20 @@ class DAGExecutor(object): ...@@ -190,10 +275,20 @@ class DAGExecutor(object):
ready_data = self._fetch_buffer[data_id] ready_data = self._fetch_buffer[data_id]
self._cv_pool.pop(data_id) self._cv_pool.pop(data_id)
self._fetch_buffer.pop(data_id) self._fetch_buffer.pop(data_id)
_LOGGER.debug("(logid={}) [resp thread] Got data".format(data_id)) _LOGGER.debug("(data_id={}) [resp thread] Got data".format(data_id))
return ready_data return ready_data
def _pack_channeldata(self, rpc_request, data_id): def _pack_channeldata(self, rpc_request, data_id):
"""
Unpacking data from RPC request. and creating one channelData.
Args:
rpc_request: one RPC request
data_id: data id, unique
Returns:
ChannelData: one channel data to be processed
"""
dictdata = None dictdata = None
log_id = None log_id = None
try: try:
...@@ -215,6 +310,8 @@ class DAGExecutor(object): ...@@ -215,6 +310,8 @@ class DAGExecutor(object):
# in rpc_request # in rpc_request
if prod_errcode is not None: if prod_errcode is not None:
# product errors occured # product errors occured
_LOGGER.error("unpack_rpc_func prod_errcode:{}".format(
prod_errcode))
return ChannelData( return ChannelData(
error_code=ChannelDataErrcode.PRODUCT_ERROR.value, error_code=ChannelDataErrcode.PRODUCT_ERROR.value,
error_info="", error_info="",
...@@ -226,8 +323,6 @@ class DAGExecutor(object): ...@@ -226,8 +323,6 @@ class DAGExecutor(object):
profile_value = None profile_value = None
profile_value = dictdata.get(self._client_profile_key) profile_value = dictdata.get(self._client_profile_key)
client_need_profile = (profile_value == self._client_profile_value) client_need_profile = (profile_value == self._client_profile_value)
_LOGGER.debug("(logid={}) Need profile in client: {}".format(
data_id, client_need_profile))
return ChannelData( return ChannelData(
datatype=ChannelDataType.DICT.value, datatype=ChannelDataType.DICT.value,
dictdata=dictdata, dictdata=dictdata,
...@@ -236,11 +331,26 @@ class DAGExecutor(object): ...@@ -236,11 +331,26 @@ class DAGExecutor(object):
client_need_profile=client_need_profile) client_need_profile=client_need_profile)
def call(self, rpc_request): def call(self, rpc_request):
"""
DAGExcutor enterance function. There are 5 steps:
1._get_next_data_id: Generate an incremental ID
2._pack_channeldata: pack the channel data from request.
3.retry loop:
a. push channel_data into _in_channel
b. get_channeldata_from_fetch_buffer: get results.
4._pack_for_rpc_resp: pack RPC responses
5.profile: generte profile string and pack into response.
Args:
rpc_request: one RPC request
Returns:
rpc_resp: one RPC response
"""
if self._tracer is not None: if self._tracer is not None:
trace_buffer = self._tracer.data_buffer() trace_buffer = self._tracer.data_buffer()
data_id, cond_v = self._get_next_data_id() data_id, cond_v = self._get_next_data_id()
_LOGGER.info("(logid={}) Succ generate id".format(data_id))
start_call, end_call = None, None start_call, end_call = None, None
if not self._is_thread_op: if not self._is_thread_op:
...@@ -249,19 +359,33 @@ class DAGExecutor(object): ...@@ -249,19 +359,33 @@ class DAGExecutor(object):
else: else:
start_call = self._profiler.record("call_{}#DAG_0".format(data_id)) start_call = self._profiler.record("call_{}#DAG_0".format(data_id))
_LOGGER.debug("(logid={}) Parsing RPC request package".format(data_id))
self._profiler.record("prepack_{}#{}_0".format(data_id, self.name)) self._profiler.record("prepack_{}#{}_0".format(data_id, self.name))
req_channeldata = self._pack_channeldata(rpc_request, data_id) req_channeldata = self._pack_channeldata(rpc_request, data_id)
self._profiler.record("prepack_{}#{}_1".format(data_id, self.name)) self._profiler.record("prepack_{}#{}_1".format(data_id, self.name))
log_id = req_channeldata.log_id
_LOGGER.info("(data_id={} log_id={}) Succ Generate ID ".format(data_id,
log_id))
resp_channeldata = None resp_channeldata = None
for i in range(self._retry): for i in range(self._retry):
_LOGGER.debug("(logid={}) Pushing data into Graph engine".format( _LOGGER.debug("(data_id={}) Pushing data into Graph engine".format(
data_id)) data_id))
try: try:
if req_channeldata is None:
_LOGGER.critical(
"(data_id={} log_id={}) req_channeldata is None"
.format(data_id, log_id))
if not isinstance(self._in_channel,
(ThreadChannel, ProcessChannel)):
_LOGGER.critical(
"(data_id={} log_id={})[DAG Executor] Failed to "
"set in_channel: in_channel must be Channel type, but get {}".
format(data_id, log_id, type(self._in_channel)))
self._in_channel.push(req_channeldata, self.name) self._in_channel.push(req_channeldata, self.name)
except ChannelStopError: except ChannelStopError:
_LOGGER.debug("[DAG Executor] Stop") _LOGGER.error("(data_id:{} log_id={})[DAG Executor] Stop".
format(data_id, log_id))
with self._cv_for_cv_pool: with self._cv_for_cv_pool:
self._cv_pool.pop(data_id) self._cv_pool.pop(data_id)
return self._pack_for_rpc_resp( return self._pack_for_rpc_resp(
...@@ -270,24 +394,29 @@ class DAGExecutor(object): ...@@ -270,24 +394,29 @@ class DAGExecutor(object):
error_info="dag closed.", error_info="dag closed.",
data_id=data_id)) data_id=data_id))
_LOGGER.debug("(logid={}) Wait for Graph engine...".format(data_id)) _LOGGER.debug("(data_id={} log_id={}) Wait for Graph engine...".
format(data_id, log_id))
resp_channeldata = self._get_channeldata_from_fetch_buffer(data_id, resp_channeldata = self._get_channeldata_from_fetch_buffer(data_id,
cond_v) cond_v)
if resp_channeldata.error_code == ChannelDataErrcode.OK.value: if resp_channeldata.error_code == ChannelDataErrcode.OK.value:
_LOGGER.info("(logid={}) Succ predict".format(data_id)) _LOGGER.info("(data_id={} log_id={}) Succ predict".format(
data_id, log_id))
break break
else: else:
_LOGGER.error("(logid={}) Failed to predict: {}" _LOGGER.error("(data_id={} log_id={}) Failed to predict: {}"
.format(data_id, resp_channeldata.error_info)) .format(data_id, log_id,
resp_channeldata.error_info))
if resp_channeldata.error_code != ChannelDataErrcode.TIMEOUT.value: if resp_channeldata.error_code != ChannelDataErrcode.TIMEOUT.value:
break break
if i + 1 < self._retry: if i + 1 < self._retry:
_LOGGER.warning("(logid={}) DAGExecutor retry({}/{})".format( _LOGGER.warning(
data_id, i + 1, self._retry)) "(data_id={} log_id={}) DAGExecutor retry({}/{})"
.format(data_id, log_id, i + 1, self._retry))
_LOGGER.debug("(logid={}) Packing RPC response package".format(data_id)) _LOGGER.debug("(data_id={} log_id={}) Packing RPC response package"
.format(data_id, log_id))
self._profiler.record("postpack_{}#{}_0".format(data_id, self.name)) self._profiler.record("postpack_{}#{}_0".format(data_id, self.name))
rpc_resp = self._pack_for_rpc_resp(resp_channeldata) rpc_resp = self._pack_for_rpc_resp(resp_channeldata)
self._profiler.record("postpack_{}#{}_1".format(data_id, self.name)) self._profiler.record("postpack_{}#{}_1".format(data_id, self.name))
...@@ -323,6 +452,15 @@ class DAGExecutor(object): ...@@ -323,6 +452,15 @@ class DAGExecutor(object):
return rpc_resp return rpc_resp
def _pack_for_rpc_resp(self, channeldata): def _pack_for_rpc_resp(self, channeldata):
"""
Packing one RPC response
Args:
channeldata: one channel data to be packed
Returns:
resp: one RPC response
"""
try: try:
return self._pack_rpc_func(channeldata) return self._pack_rpc_func(channeldata)
except Exception as e: except Exception as e:
...@@ -333,11 +471,14 @@ class DAGExecutor(object): ...@@ -333,11 +471,14 @@ class DAGExecutor(object):
resp = pipeline_service_pb2.Response() resp = pipeline_service_pb2.Response()
resp.err_no = ChannelDataErrcode.RPC_PACKAGE_ERROR.value resp.err_no = ChannelDataErrcode.RPC_PACKAGE_ERROR.value
resp.err_msg = "rpc package error: {}".format(e) resp.err_msg = "rpc package error: {}".format(e)
resp.result = ""
return resp return resp
class DAG(object): class DAG(object):
"""
Directed Acyclic Graph(DAG) engine, builds one DAG topology.
"""
def __init__(self, request_name, response_op, use_profile, is_thread_op, def __init__(self, request_name, response_op, use_profile, is_thread_op,
channel_size, build_dag_each_worker, tracer): channel_size, build_dag_each_worker, tracer):
self._request_name = request_name self._request_name = request_name
...@@ -353,6 +494,18 @@ class DAG(object): ...@@ -353,6 +494,18 @@ class DAG(object):
@staticmethod @staticmethod
def get_use_ops(response_op): def get_use_ops(response_op):
"""
Starting from ResponseOp, recursively traverse the front OPs. Getting
all used ops and the post op list of each op (excluding ResponseOp)
Args:
response_op: ResponseOp
Returns:
used_ops: used ops, set
succ_ops_of_use_op: op and the next op list, dict.
"""
unique_names = set() unique_names = set()
used_ops = set() used_ops = set()
succ_ops_of_use_op = {} # {op_name: succ_ops} succ_ops_of_use_op = {} # {op_name: succ_ops}
...@@ -378,6 +531,15 @@ class DAG(object): ...@@ -378,6 +531,15 @@ class DAG(object):
return used_ops, succ_ops_of_use_op return used_ops, succ_ops_of_use_op
def _gen_channel(self, name_gen): def _gen_channel(self, name_gen):
"""
Generate one ThreadChannel or ProcessChannel.
Args:
name_gen: channel name
Returns:
channel: one channel generated
"""
channel = None channel = None
if self._is_thread_op: if self._is_thread_op:
channel = ThreadChannel( channel = ThreadChannel(
...@@ -389,11 +551,37 @@ class DAG(object): ...@@ -389,11 +551,37 @@ class DAG(object):
return channel return channel
def _gen_virtual_op(self, name_gen): def _gen_virtual_op(self, name_gen):
"""
Generate one virtual Op
Args:
name_gen: Op name
Returns:
vir_op: one virtual Op object.
"""
vir_op = VirtualOp(name=name_gen.next()) vir_op = VirtualOp(name=name_gen.next())
_LOGGER.debug("[DAG] Generate virtual_op: {}".format(vir_op.name)) _LOGGER.debug("[DAG] Generate virtual_op: {}".format(vir_op.name))
return vir_op return vir_op
def _topo_sort(self, used_ops, response_op, out_degree_ops): def _topo_sort(self, used_ops, response_op, out_degree_ops):
"""
Topological sort of DAG, creates inverted multi-layers views.
Args:
used_ops: op used in DAG
response_op: response op
out_degree_ops: Next op list for each op, dict. the output of
get_use_ops()
Returns:
dag_views: the inverted hierarchical topology list. examples:
DAG :[A -> B -> C -> E]
\-> D /
dag_views: [[E], [C, D], [B], [A]]
last_op:the last op front of ResponseOp
"""
out_degree_num = { out_degree_num = {
name: len(ops) name: len(ops)
for name, ops in out_degree_ops.items() for name, ops in out_degree_ops.items()
...@@ -437,6 +625,23 @@ class DAG(object): ...@@ -437,6 +625,23 @@ class DAG(object):
return dag_views, last_op return dag_views, last_op
    def _build_dag(self, response_op):
        """
        Build the DAG, the most important function in class DAG. Core steps:
        1. get_use_ops: get the used OPs and the out-degree OP list of each OP.
        2. _topo_sort: the topological sort creates inverted multi-layer views.
        3. Create channels and virtual OPs.

        Args:
            response_op: ResponseOp
        Returns:
            actual_ops: all OPs used in the DAG, including virtual OPs
            channels: all channels used in the DAG
            input_channel: the channel of the first OP
            output_channel: the channel of the last OP
            pack_func: pack_response_package function of response_op
            unpack_func: unpack_request_package function of request_op
        """
        if response_op is None:
            _LOGGER.critical("Failed to build DAG: ResponseOp"
                             " has not been set.")

@@ -562,6 +767,18 @@ class DAG(object):

        return self._channels

    def build(self):
        """
        Interface for building one DAG from outside (a usage sketch of the
        whole lifecycle follows this class).

        Args:
            None
        Returns:
            _input_channel: the channel of the first OP
            _output_channel: the channel of the last OP
            _pack_func: pack_response_package function of response_op
            _unpack_func: unpack_request_package function of request_op
        """
        (actual_ops, channels, input_channel, output_channel, pack_func,
         unpack_func) = self._build_dag(self._response_op)
        _LOGGER.info("[DAG] Succ build DAG")

@@ -579,6 +796,15 @@ class DAG(object):

        return self._input_channel, self._output_channel, self._pack_func, self._unpack_func

    def start(self):
        """
        Each OP starts one thread or process according to _is_thread_op.

        Args:
            None
        Returns:
            _threads_or_proces: list of threads or processes
        """
        self._threads_or_proces = []
        for op in self._actual_ops:
            op.use_profiler(self._use_profile)

@@ -593,11 +819,29 @@ class DAG(object):

        return self._threads_or_proces

    def join(self):
        """
        Join all threads or processes.

        Args:
            None
        Returns:
            None
        """
        for x in self._threads_or_proces:
            if x is not None:
                x.join()

    def stop(self):
        """
        Stop and clean up all channels.

        Args:
            None
        Returns:
            None
        """
        for chl in self._channels:
            chl.stop()
        for op in self._actual_ops:
...
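
# A hedged usage sketch of the DAG lifecycle defined above (build/start/
# join/stop); response_op stands in for a ResponseOp built elsewhere, and
# the argument values are illustrative only.
dag = DAG(request_name="@DAGExecutor",
          response_op=response_op,
          use_profile=False,
          is_thread_op=True,           # True: ThreadChannel; False: ProcessChannel
          channel_size=0,
          build_dag_each_worker=False,
          tracer=None)
in_channel, out_channel, pack_func, unpack_func = dag.build()
workers = dag.start()                  # one thread/process per OP concurrency slot
# ... push requests into in_channel, read results from out_channel ...
dag.stop()                             # stop all channels
dag.join()                             # wait for every worker to exit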
@@ -21,19 +21,17 @@ import "google/api/annotations.proto";

message Response {
  int32 err_no = 1;
  string err_msg = 2;
  repeated string key = 3;
  repeated string value = 4;
};

message Request {
  repeated string key = 1;
  repeated string value = 2;
  string name = 3;
  string method = 4;
  int64 logid = 5;
  string clientip = 6;
};

service PipelineService {
...
@@ -15,111 +15,203 @@

import os
import logging
import multiprocessing
#from paddle_serving_server_gpu import OpMaker, OpSeqMaker
#from paddle_serving_server_gpu import Server as GpuServer
#from paddle_serving_server import Server as CpuServer
from . import util
#from paddle_serving_app.local_predict import LocalPredictor

_LOGGER = logging.getLogger(__name__)
_workdir_name_gen = util.NameGenerator("workdir_")


class LocalServiceHandler(object):
    """
    LocalServiceHandler is the processor of the local service. It supports
    three client types: brpc, grpc and local_predictor. For brpc or grpc,
    the ability to start a serving process is provided; for local_predictor,
    local prediction is provided by paddle_serving_app.
    """

    def __init__(self,
                 model_config,
                 client_type='local_predictor',
                 workdir="",
                 thread_num=2,
                 devices="",
                 fetch_names=None,
                 mem_optim=True,
                 ir_optim=False,
                 available_port_generator=None,
                 use_trt=False,
                 use_profile=False):
        """
        Initialization of LocalServiceHandler.

        Args:
            model_config: model config path
            client_type: brpc, grpc or local_predictor[default]
            workdir: work directory
            thread_num: number of threads, concurrent quantity
            devices: gpu id list for gpu; "" (default) for cpu
            fetch_names: get fetch names out of LocalServiceHandler in
                local_predictor mode. fetch_names_ is compatible with
                Client().
            mem_optim: use memory/graphics memory optimization, True default
            ir_optim: use computation graph (IR) optimization, False default
            available_port_generator: generate available ports
            use_trt: use NVIDIA TensorRT engine, False default
            use_profile: use profiling, False default
        Returns:
            None
        """
        if available_port_generator is None:
            available_port_generator = util.GetAvailablePortGenerator()

        self._model_config = model_config
        self._port_list = []
        self._device_type = "cpu"
        if devices == "":
            # cpu
            devices = [-1]
            self._device_type = "cpu"
            self._port_list.append(available_port_generator.next())
            _LOGGER.info("Model({}) will be launched in cpu device. Port({})"
                         .format(model_config, self._port_list))
        else:
            # gpu
            self._device_type = "gpu"
            devices = [int(x) for x in devices.split(",")]
            for _ in devices:
                self._port_list.append(available_port_generator.next())
            _LOGGER.info("Model({}) will be launched in gpu device: {}. Port({})"
                         .format(model_config, devices, self._port_list))
        self._client_type = client_type
        self._workdir = workdir
        self._devices = devices
        self._thread_num = thread_num
        self._mem_optim = mem_optim
        self._ir_optim = ir_optim
        self._local_predictor_client = None
        self._rpc_service_list = []
        self._server_pros = []
        self._use_trt = use_trt
        self._use_profile = use_profile
        self.fetch_names_ = fetch_names

    def get_fetch_list(self):
        return self.fetch_names_

    def get_port_list(self):
        return self._port_list

    def get_client(self):
        """
        Function get_client is only used in the local_predictor case: it
        creates one LocalPredictor object and initializes the paddle
        predictor by load_model_config (a usage sketch follows).

        Args:
            None
        Returns:
            _local_predictor_client
        """
        from paddle_serving_app.local_predict import LocalPredictor
        if self._local_predictor_client is None:
            self._local_predictor_client = LocalPredictor()
            use_gpu = False
            if self._device_type == "gpu":
                use_gpu = True
            self._local_predictor_client.load_model_config(
                model_path=self._model_config,
                use_gpu=use_gpu,
                gpu_id=self._devices[0],
                use_profile=self._use_profile,
                thread_num=self._thread_num,
                mem_optim=self._mem_optim,
                ir_optim=self._ir_optim,
                use_trt=self._use_trt)
        return self._local_predictor_client
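
# A hedged sketch of driving LocalServiceHandler in local_predictor mode;
# the model directory and feed/fetch names below are placeholders, not
# part of this commit.
handler = LocalServiceHandler(
    model_config="uci_housing_model",   # hypothetical model directory
    client_type="local_predictor",
    devices="0",                        # "" would select cpu
    fetch_names=["price"])
predictor = handler.get_client()        # loads the paddle predictor once
# Prediction then goes through LocalPredictor, roughly:
# result = predictor.predict(feed={"x": x_ndarray}, fetch=["price"])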
    def get_client_config(self):
        return os.path.join(self._model_config, "serving_server_conf.prototxt")

    def _prepare_one_server(self, workdir, port, gpuid, thread_num, mem_optim,
                            ir_optim):
        """
        According to _device_type, generating one CpuServer or GpuServer, and
        setting the model config and startup params.

        Args:
            workdir: work directory
            port: network port
            gpuid: gpu id
            thread_num: thread num
            mem_optim: use memory/graphics memory optimization
            ir_optim: use computation graph (IR) optimization
        Returns:
            server: CpuServer/GpuServer
        """
        if self._device_type == "cpu":
            from paddle_serving_server import OpMaker, OpSeqMaker, Server
            op_maker = OpMaker()
            read_op = op_maker.create('general_reader')
            general_infer_op = op_maker.create('general_infer')
            general_response_op = op_maker.create('general_response')
            op_seq_maker = OpSeqMaker()
            op_seq_maker.add_op(read_op)
            op_seq_maker.add_op(general_infer_op)
            op_seq_maker.add_op(general_response_op)
            server = Server()
        else:
            #gpu
            from paddle_serving_server_gpu import OpMaker, OpSeqMaker, Server
            op_maker = OpMaker()
            read_op = op_maker.create('general_reader')
            general_infer_op = op_maker.create('general_infer')
            general_response_op = op_maker.create('general_response')
            op_seq_maker = OpSeqMaker()
            op_seq_maker.add_op(read_op)
            op_seq_maker.add_op(general_infer_op)
            op_seq_maker.add_op(general_response_op)
            server = Server()
            if gpuid >= 0:
                server.set_gpuid(gpuid)

        server.set_op_sequence(op_seq_maker.get_op_sequence())
        server.set_num_threads(thread_num)
        server.set_memory_optimize(mem_optim)
        server.set_ir_optimize(ir_optim)
        server.load_model_config(self._model_config)
        server.prepare_server(
            workdir=workdir, port=port, device=self._device_type)
        if self.fetch_names_ is None:
            self.fetch_names_ = server.get_fetch_list()
        return server
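
# The cpu and gpu branches above assemble the same reader/infer/response
# sequence; a hedged refactoring sketch (not part of this commit) that
# keeps one assembly path once the device-specific OpMaker/OpSeqMaker
# classes have been imported:
def _make_op_sequence(OpMaker, OpSeqMaker):
    op_maker = OpMaker()
    op_seq_maker = OpSeqMaker()
    for op_name in ('general_reader', 'general_infer', 'general_response'):
        op_seq_maker.add_op(op_maker.create(op_name))
    return op_seq_maker.get_op_sequence()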
    def _start_one_server(self, service_idx):
        """
        Start one server.

        Args:
            service_idx: server index
        Returns:
            None
        """
        self._rpc_service_list[service_idx].run_server()

    def prepare_server(self):
        """
        Prepare all servers to be started, and append them into a list.
        """
        for i, device_id in enumerate(self._devices):
            if self._workdir != "":
                workdir = "{}_{}".format(self._workdir, i)

@@ -135,6 +227,9 @@ class LocalServiceHandler(object):

                    ir_optim=self._ir_optim))

    def start_server(self):
        """
        Start multiple processes, one server in each process.
        """
        for i, service in enumerate(self._rpc_service_list):
            p = multiprocessing.Process(
                target=self._start_one_server, args=(i, ))
...
@@ -89,6 +89,18 @@ class Op(object):

        self._succ_close_op = False

    def init_from_dict(self, conf):
        """
        Initialize one OP from config.yaml. If server_endpoints exists, it
        is the remote RPC mode; otherwise it is the local RPC mode. There
        are three types of predictors in local RPC mode: brpc, grpc and
        local_predictor.

        Args:
            conf: config.yaml
        Returns:
            None
        """
        # init op
        if self.concurrency is None:
            self.concurrency = conf["concurrency"]

@@ -119,34 +131,46 @@ class Op(object):

        else:
            self._auto_batching_timeout = self._auto_batching_timeout / 1000.0

        self.model_config = None
        self.workdir = None
        self.thread_num = self.concurrency
        self.devices = ""
        self.mem_optim = False
        self.ir_optim = False
        if self._server_endpoints is None:
            server_endpoints = conf.get("server_endpoints", [])
            if len(server_endpoints) != 0:
                # remote service
                self.with_serving = True
                self._server_endpoints = server_endpoints
                self.client_type = conf["client_type"]
            else:
                if self._local_service_handler is None:
                    local_service_conf = conf.get("local_service_conf")
                    _LOGGER.info("local_service_conf: {}".format(
                        local_service_conf))
                    self.model_config = local_service_conf.get("model_config")
                    self.client_type = local_service_conf.get("client_type")
                    self.workdir = local_service_conf.get("workdir")
                    self.thread_num = local_service_conf.get("thread_num")
                    self.devices = local_service_conf.get("devices")
                    self.mem_optim = local_service_conf.get("mem_optim")
                    self.ir_optim = local_service_conf.get("ir_optim")
                    self._fetch_names = local_service_conf.get("fetch_list")
                    if self.model_config is None:
                        self.with_serving = False
                    else:
                        # local rpc service
                        self.with_serving = True
                        if self.client_type == "brpc" or self.client_type == "grpc":
                            service_handler = local_service_handler.LocalServiceHandler(
                                model_config=self.model_config,
                                client_type=self.client_type,
                                workdir=self.workdir,
                                thread_num=self.thread_num,
                                devices=self.devices,
                                mem_optim=self.mem_optim,
                                ir_optim=self.ir_optim)
                            service_handler.prepare_server()  # get fetch_list
                            serivce_ports = service_handler.get_port_list()
                            self._server_endpoints = [

@@ -160,19 +184,15 @@ class Op(object):

                            )
                        elif self.client_type == "local_predictor":
                            service_handler = local_service_handler.LocalServiceHandler(
                                model_config=self.model_config,
                                client_type=self.client_type,
                                workdir=self.workdir,
                                thread_num=self.thread_num,
                                devices=self.devices,
                                fetch_names=self._fetch_names)
                            if self._client_config is None:
                                self._client_config = service_handler.get_client_config(
                                )
                            self._local_service_handler = service_handler
                else:
                    self.with_serving = True

@@ -209,6 +229,15 @@ class Op(object):

                self._batch_size, self._auto_batching_timeout)))
    def launch_local_rpc_service(self):
        """
        Launch multiple local rpc servers.

        Args:
            None
        Returns:
            None
        """
        if self._local_service_handler is None:
            _LOGGER.warning(
                self._log("Failed to launch local rpc"

@@ -223,6 +252,15 @@ class Op(object):

                          .format(self.name, port))

    def use_default_auto_batching_config(self):
        """
        Reset the auto batching config to the default.

        Args:
            None
        Returns:
            None
        """
        if self._batch_size != 1:
            _LOGGER.warning("Op({}) reset batch_size=1 (original: {})"
                            .format(self.name, self._batch_size))

@@ -240,6 +278,18 @@ class Op(object):

        self._tracer = tracer

    def init_client(self, client_config, server_endpoints):
        """
        Initialize the client object. There are three types of clients: brpc,
        grpc and local_predictor. In grpc or brpc mode, the client connects to
        the endpoints.

        Args:
            client_config: client config info
            server_endpoints: server IP/Port list
        Returns:
            client: client object
        """
        if self.with_serving == False:
            _LOGGER.info("Op({}) has no client (and it also do not "
                         "run the process function)".format(self.name))

@@ -267,6 +317,16 @@ class Op(object):
        return self._input_ops

    def set_input_ops(self, ops):
        """
        Set the input OPs. Each OP may have many input OPs, but only one
        input channel.

        Args:
            ops: op list
        Returns:
            None
        """
        if not isinstance(ops, list):
            ops = [] if ops is None else [ops]
        self._input_ops = []

@@ -279,6 +339,10 @@ class Op(object):

            self._input_ops.append(op)

    def add_input_channel(self, channel):
        """
        Add one input channel to the OP. Each OP may have many front OPs,
        but only one input channel.
        """
        if not isinstance(channel, (ThreadChannel, ProcessChannel)):
            _LOGGER.critical(
                self._log("Failed to set input_channel: input "

@@ -295,6 +359,16 @@ class Op(object):

        return self._input

    def add_output_channel(self, channel):
        """
        Add one output channel to the OP. Each OP may have many output
        channels, but only one input channel.

        Args:
            channel: an output channel object
        Returns:
            None
        """
        if not isinstance(channel, (ThreadChannel, ProcessChannel)):
            _LOGGER.critical(
                self._log("Failed to add output_channel: output channel "

@@ -309,15 +383,15 @@ class Op(object):

    def _get_output_channels(self):
        return self._outputs

    def preprocess(self, input_dicts, data_id=0, log_id=0):
        """
        In preprocess stage, assembling data for process stage. users can
        override this function for model feed features.

        Args:
            input_dicts: input data to be preprocessed
            data_id: inner unique id, 0 default
            log_id: global unique id for RTT, 0 default
        Return:
            input_dict: data for process stage

@@ -337,13 +411,14 @@ class Op(object):

        (_, input_dict), = input_dicts.items()
        return input_dict, False, None, ""

    def process(self, feed_batch, typical_logid=0):
        """
        In process stage, send requests to the inference server or predict
        locally. Users do not need to inherit this function.

        Args:
            feed_batch: data to be fed to inference server
            typical_logid: mark batch predicts, usually the first logid in
                the batch, 0 default
        Returns:
            call_result: predict result

@@ -372,13 +447,13 @@ class Op(object):

            call_result.pop("serving_status_code")
        return call_result

    def postprocess(self, input_dict, fetch_dict, log_id=0):
        """
        In postprocess stage, assemble data for the next op or the output.

        Args:
            input_dict: data returned in preprocess stage.
            fetch_dict: data returned in process stage.
            log_id: logid, 0 default
        Returns:
            fetch_dict: return fetch_dict default

@@ -455,6 +530,16 @@ class Op(object):
            channel.push(data, name)

    def start_with_process(self):
        """
        Each OP creates a process to run its main loop; the CUDA environment
        is initialized in each individual process.

        Args:
            None
        Returns:
            process array
        """
        trace_buffer = None
        if self._tracer is not None:
            trace_buffer = self._tracer.data_buffer()

@@ -463,22 +548,42 @@ class Op(object):

            p = multiprocessing.Process(
                target=self._run,
                args=(concurrency_idx, self._get_input_channel(),
                      self._get_output_channels(), False, trace_buffer,
                      self.model_config, self.workdir, self.thread_num,
                      self.devices, self.mem_optim, self.ir_optim))
            p.daemon = True
            p.start()
            process.append(p)
        return process

    def start_with_thread(self):
        """
        Each OP creates a thread to run its main loop; the CUDA environment
        is initialized in the main thread.

        Args:
            None
        Returns:
            thread array
        """
        trace_buffer = None
        if self._tracer is not None:
            trace_buffer = self._tracer.data_buffer()

        #Init cuda env in main thread
        if self.client_type == "local_predictor":
            _LOGGER.info("Init cuda env in main thread")
            self.local_predictor = self._local_service_handler.get_client()

        threads = []
        for concurrency_idx in range(self.concurrency):
            t = threading.Thread(
                target=self._run,
                args=(concurrency_idx, self._get_input_channel(),
                      self._get_output_channels(), True, trace_buffer,
                      self.model_config, self.workdir, self.thread_num,
                      self.devices, self.mem_optim, self.ir_optim))
            # When a process exits, it attempts to terminate
            # all of its daemonic child processes.
            t.daemon = True

@@ -803,6 +908,22 @@ class Op(object):
    def _auto_batching_generator(self, input_channel, op_name, batch_size,
                                 timeout, op_info_prefix):
        """
        Merge batch_size requests into one prediction: take one piece of data
        from the input channel at a time until the batch holds batch_size
        items or the waiting time exceeds auto_batching_timeout (a standalone
        sketch of this policy follows the function).

        Args:
            input_channel: the input channel of the OP
            op_name: op name
            batch_size: batch size, less than worker_num
            timeout: batch timeout in seconds; if timeout is None and fewer
                items than batch_size are available, blocking occurs.
            op_info_prefix: op link info
        Returns:
            None
        """
        while True:
            batch = []
            while len(batch) == 0:

@@ -823,6 +944,9 @@ class Op(object):

                    else:
                        channeldata_dict = input_channel.front(op_name)
                    batch.append(channeldata_dict)
                    _LOGGER.debug(
                        "_auto_batching_generator get {} channeldata from op:{} into batch, batch_size:{}".
                        format(idx, op_name, batch_size))
                except ChannelTimeoutError:
                    _LOGGER.debug("{} Failed to generate batch: "
                                  "timeout".format(op_info_prefix))
@@ -866,14 +990,54 @@ class Op(object):

        return parsed_data_dict, need_profile_dict, profile_dict, logid_dict

    def _run(self, concurrency_idx, input_channel, output_channels,
             is_thread_op, trace_buffer, model_config, workdir, thread_num,
             devices, mem_optim, ir_optim):
        """
        _run() is the entry function of the OP process/thread model. When the
        client type is local_predictor in process mode, the CUDA environment
        needs to be initialized by LocalServiceHandler in the child process,
        otherwise a CUDA error(3) initialization error occurs. Preprocess,
        process and postprocess are executed in the main loop. The preprocess
        and postprocess functions are usually rewritten by users. Trace data
        is recorded by trace_que.

        Args:
            concurrency_idx: thread/process index
            input_channel: input channel, takes the data to be processed
            output_channels: output channels, store the processed data
            is_thread_op: False, process op; True, thread op
            trace_buffer: stores trace information
            model_config: model config path
            workdir: work directory
            thread_num: number of threads, concurrent quantity
            devices: gpu id list for gpu; "" (default) for cpu
            mem_optim: use memory/graphics memory optimization, True default
            ir_optim: use computation graph (IR) optimization, False default
        Returns:
            None
        """
        op_info_prefix = "[{}|{}]".format(self.name, concurrency_idx)
        tid = threading.current_thread().ident

        # init ops
        profiler = None
        try:
            if is_thread_op == False and self.client_type == "local_predictor":
                self.service_handler = local_service_handler.LocalServiceHandler(
                    model_config=model_config,
                    client_type="local_predictor",
                    workdir=workdir,
                    thread_num=thread_num,
                    devices=devices,
                    mem_optim=mem_optim,
                    ir_optim=ir_optim)
                _LOGGER.info("Init cuda env in process {}".format(
                    concurrency_idx))
                self.local_predictor = self.service_handler.get_client()
            # check all ops initialized successfully.
            profiler = self._initialize(is_thread_op, concurrency_idx)
        except Exception as e:
            _LOGGER.critical(
                "{} failed to init op: {}".format(op_info_prefix, e),

@@ -1014,6 +1178,19 @@ class Op(object):

                break

    def _initialize(self, is_thread_op, concurrency_idx):
        """
        Initialize one OP object in the target function of a thread or
        process. Initialize the client object with _client_config and
        _server_endpoints. Create a TimeProfiler per thread or process for
        recording profiler info.

        Args:
            is_thread_op: True, one op runs in one thread; False, one op runs
                in one process.
            concurrency_idx: process id; thread mode does not use this param.
        Returns:
            TimeProfiler
        """
        if is_thread_op:
            with self._for_init_op_lock:
                if not self._succ_init_op:

@@ -1053,9 +1230,17 @@ class Op(object):
class RequestOp(Op):
    """
    RequestOp is a special OP for unpacking one request package. If the
    request needs a special unpacking method, inherit class RequestOp and
    rewrite the function unpack_request_package (a sketch of such a subclass
    follows). Notice!!! Class RequestOp does not run preprocess, process or
    postprocess.
    """

    def __init__(self):
        """
        Initialize the RequestOp.
        """
        # PipelineService.name = "@DAGExecutor"
        super(RequestOp, self).__init__(name="@DAGExecutor", input_ops=[])
        # init op

@@ -1084,25 +1269,36 @@ class RequestOp(Op):

        if request is None:
            _LOGGER.critical("request is None")
            raise ValueError("request is None")
        _LOGGER.info("unpack_request_package request:{}".format(request))

        for idx, key in enumerate(request.key):
            data = request.value[idx]
            try:
                evaled_data = eval(data)
                if isinstance(evaled_data, np.ndarray):
                    data = evaled_data
            except Exception as e:
                pass
            dict_data[key] = data
        log_id = request.logid
        _LOGGER.info("RequestOp unpack one request. log_id:{}, clientip:{} \
            name:{}, method:{}".format(log_id, request.clientip, request.name,
                                       request.method))

        return dict_data, log_id, None, ""
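
# A hedged sketch of the customization the docstring above points at:
# inherit RequestOp and rewrite unpack_request_package. The "image" key
# and its base64 decoding are assumptions for illustration only.
import base64

class ImageRequestOp(RequestOp):
    def unpack_request_package(self, request):
        dict_data = {}
        for idx, key in enumerate(request.key):
            value = request.value[idx]
            if key == "image":
                value = base64.b64decode(value)  # custom per-key decoding
            dict_data[key] = value
        return dict_data, request.logid, None, ""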
class ResponseOp(Op):
    """
    ResponseOp is a special OP for packing one response package. If the
    channeldata needs a special packaging method, inherit class ResponseOp
    and rewrite the pack_response_package function (a sketch of such a
    subclass follows). Notice!!! Class ResponseOp does not run preprocess,
    process or postprocess.
    """

    def __init__(self, input_ops):
        """
        Initialize the ResponseOp.
        """
        super(ResponseOp, self).__init__(
            name="@DAGExecutor", input_ops=input_ops)
        # init op

@@ -1115,23 +1311,28 @@ class ResponseOp(Op):

    def pack_response_package(self, channeldata):
        """
        Getting channeldata from the last channel, packing the response
        package serialized by protobuf.

        Args:
            channeldata: Type ChannelData
        Returns:
            resp: pipeline_service_pb2.Response()
        """
        resp = pipeline_service_pb2.Response()
        error_code = channeldata.error_code
        error_info = ""
        if error_code == ChannelDataErrcode.OK.value:
            # Framework level errors
            if channeldata.datatype == ChannelDataType.CHANNEL_NPDATA.value:
                feed = channeldata.parse()
                # ndarray to string:
                # https://stackoverflow.com/questions/30167538/convert-a-numpy-ndarray-to-stringor-bytes-and-convert-it-back-to-numpy-ndarray
                np.set_printoptions(threshold=sys.maxsize)
                for name, var in feed.items():
                    resp.value.append(var.__repr__())
                    resp.key.append(name)
            elif channeldata.datatype == ChannelDataType.DICT.value:
                feed = channeldata.parse()
                for name, var in feed.items():

@@ -1144,8 +1345,8 @@ class ResponseOp(Op):

                            "response package: {}".format(
                                channeldata.id, resp.error_info))
                        break
                    resp.value.append(var)
                    resp.key.append(name)
            else:
                error_code = ChannelDataErrcode.TYPE_ERROR.value
                error_info = self._log("error type({}) in datatype.".format(

@@ -1153,6 +1354,7 @@ class ResponseOp(Op):

                _LOGGER.error("(logid={}) Failed to pack RPC response"
                              " package: {}".format(channeldata.id, error_info))
        else:
            # Product level errors
            error_info = channeldata.error_info
            if error_code == ChannelDataErrcode.PRODUCT_ERROR.value:
                #rewrite error_code when product errors occurred

@@ -1160,21 +1362,32 @@ class ResponseOp(Op):

                error_info = channeldata.prod_error_info

        # pack results
        if error_code is None:
            error_code = 0
        resp.err_no = error_code
        resp.err_msg = error_info

        return resp
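
# A hedged sketch of the customization the docstring above points at:
# inherit ResponseOp and rewrite pack_response_package, here emitting a
# single JSON value instead of parallel key/value lists (an illustrative
# choice, not the library's own behavior).
import json

class JsonResponseOp(ResponseOp):
    def pack_response_package(self, channeldata):
        resp = pipeline_service_pb2.Response()
        resp.err_no = channeldata.error_code or 0
        resp.err_msg = channeldata.error_info or ""
        if resp.err_no == ChannelDataErrcode.OK.value:
            feed = channeldata.parse()
            resp.key.append("result")
            resp.value.append(json.dumps({k: str(v) for k, v in feed.items()}))
        return resp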
class VirtualOp(Op):
    """
    To connect two OPs across levels in the DAG view, we create virtual OPs
    between the non-virtual OPs and only transfer data through them. For
    example, the pred OPs of F are D & E. In the process of building the
    DAG, we create channels layer by layer according to the DAG views. OP F
    is not in the next layer view of [B, E], so we create a virtual OP 'V1'
    whose pred OP is E. And so on, we create two virtual OPs 'V2' and 'V3'
    until we reach the non-virtual OP F. We create 4 channels among E, V1,
    V2, V3 and F; the producer of V1, V2, V3 and F is E.

        DAG: [A -> B -> C -> D -> F]
               \-> E ----------->/

        DAG view: [[A], [B, E], [C], [D], [F]]

        BUILD DAG: [A -> B -> C -> D ----------> F]
                     \-> E -> V1 -> V2 -> V3 ->/
    """

    def __init__(self, name, concurrency=1):
        super(VirtualOp, self).__init__(

@@ -1182,9 +1395,27 @@ class VirtualOp(Op):

        self._virtual_pred_ops = []

    def add_virtual_pred_op(self, op):
        """
        Add the front OP of the current virtual OP.

        Args:
            op: one OP object, may be a virtual OP or not.
        Returns:
            None
        """
        self._virtual_pred_ops.append(op)

    def _actual_pred_op_names(self, op):
        """
        Recursively find the front OPs which are non-virtual OPs.

        Args:
            op: one OP object
        Returns:
            names: the names of the non-virtual pred OPs.
        """
        # can use disjoint-set, but it's not necessary
        if not isinstance(op, VirtualOp):
            return [op.name]

@@ -1194,6 +1425,15 @@ class VirtualOp(Op):

        return names

    def add_output_channel(self, channel):
        """
        Add the output channel of the non-virtual pred OPs.

        Args:
            channel: one channel.
        Returns:
            None.
        """
        if not isinstance(channel, (ThreadChannel, ProcessChannel)):
            _LOGGER.critical(
                self._log("Failed to add output_channel: output_channel"

@@ -1207,6 +1447,20 @@ class VirtualOp(Op):

    def _run(self, concurrency_idx, input_channel, output_channels, client_type,
             is_thread_op):
        """
        The target function _run() only transfers data between OPs in one
        thread or process.

        Args:
            concurrency_idx: process id, not available in thread mode
            input_channel: input channel
            output_channels: output channels
            client_type: not used
            is_thread_op: True, thread mode; False, process mode
        Returns:
            None
        """
        op_info_prefix = "[{}|{}]".format(self.name, concurrency_idx)
        log = get_log_func(op_info_prefix)
        tid = threading.current_thread().ident
...
@@ -19,6 +19,7 @@ from numpy import *

import logging
import functools
import json
import socket
from .channel import ChannelDataErrcode
from .proto import pipeline_service_pb2
from .proto import pipeline_service_pb2_grpc

@@ -27,6 +28,10 @@ _LOGGER = logging.getLogger(__name__)


class PipelineClient(object):
    """
    PipelineClient provides the basic capabilities of the pipeline SDK.
    """

    def __init__(self):
        self._channel = None
        self._profile_key = "pipeline.profile"

@@ -43,29 +48,38 @@ class PipelineClient(object):

    def _pack_request_package(self, feed_dict, profile):
        req = pipeline_service_pb2.Request()

        logid = feed_dict.get("logid")
        if logid is None:
            req.logid = 0
        else:
            req.logid = long(logid)
            feed_dict.pop("logid")

        clientip = feed_dict.get("clientip")
        if clientip is None:
            hostname = socket.gethostname()
            ip = socket.gethostbyname(hostname)
            req.clientip = ip
        else:
            req.clientip = clientip
            feed_dict.pop("clientip")

        np.set_printoptions(threshold=sys.maxsize)
        for key, value in feed_dict.items():
            req.key.append(key)
            if isinstance(value, np.ndarray):
                req.value.append(value.__repr__())
            elif isinstance(value, (str, unicode)):
                req.value.append(value)
            elif isinstance(value, list):
                req.value.append(np.array(value).__repr__())
            else:
                raise TypeError("only str and np.ndarray type is supported: {}".
                                format(type(value)))
        if profile:
            req.key.append(self._profile_key)
            req.value.append(self._profile_value)
        return req

    def _unpack_response_package(self, resp, fetch):
...
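
# The body of _unpack_response_package is elided by this diff; a hedged
# sketch of the pairing it needs to perform, given the key/value Response
# messages defined in the protos of this commit:
def unpack_response(resp, fetch=None):
    if resp.err_no != 0:
        return {"err_no": resp.err_no, "err_msg": resp.err_msg}
    result = {}
    for key, value in zip(resp.key, resp.value):
        if fetch is not None and key not in fetch:
            continue
        result[key] = value  # ndarray values arrive as repr() strings
    return result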
@@ -32,6 +32,10 @@ _LOGGER = logging.getLogger(__name__)


class PipelineServicer(pipeline_service_pb2_grpc.PipelineServiceServicer):
    """
    Pipeline Servicer entrance.
    """

    def __init__(self, name, response_op, dag_conf, worker_idx=-1):
        super(PipelineServicer, self).__init__()
        self._name = name

@@ -42,9 +46,12 @@ class PipelineServicer(pipeline_service_pb2_grpc.PipelineServiceServicer):

        _LOGGER.info("[PipelineServicer] succ init")

    def inference(self, request, context):
        _LOGGER.info("(log_id={}) inference request name:{} self.name:{}".
                     format(request.logid, request.name, self._name))
        if request.name != "" and request.name != self._name:
            _LOGGER.error("(log_id={}) name mismatch error. request.name:{},"
                          "server.name={}".format(request.logid, request.name,
                                                  self._name))
            resp = pipeline_service_pb2.Response()
            resp.err_no = channel.ChannelDataErrcode.NO_SERVICE.value
            resp.err_msg = "Failed to inference: Service name error."

@@ -56,7 +63,9 @@ class PipelineServicer(pipeline_service_pb2_grpc.PipelineServiceServicer):

@contextlib.contextmanager
def _reserve_port(port):
    """
    Find and reserve a port for all subprocesses to use.
    """
    sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
    if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 0:

@@ -69,6 +78,10 @@ def _reserve_port(port):
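
# The tail of _reserve_port is elided by the hunk above; a complete minimal
# version assuming the usual bind/yield/close contextmanager shape:
import socket
import contextlib

@contextlib.contextmanager
def reserve_port_sketch(port):
    """Find and reserve a port for all subprocesses to use."""
    sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
    if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 0:
        raise RuntimeError("Failed to set SO_REUSEPORT.")
    sock.bind(('', port))
    try:
        yield sock.getsockname()[1]
    finally:
        sock.close()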
class PipelineServer(object):
    """
    Pipeline Server: grpc gateway + grpc server.
    """

    def __init__(self, name=None):
        self._name = name  # for grpc-gateway path
        self._rpc_port = None

@@ -77,6 +90,16 @@ class PipelineServer(object):

        self._proxy_server = None

    def _grpc_gateway(self, grpc_port, http_port):
        """
        Run a gateway server, linking libproxy_server.so.

        Args:
            grpc_port: GRPC port
            http_port: HTTP port
        Returns:
            None
        """
        import os
        from ctypes import cdll
        from . import gateway

@@ -86,6 +109,17 @@ class PipelineServer(object):

        proxy_server.run_proxy_server(grpc_port, http_port)

    def _run_grpc_gateway(self, grpc_port, http_port):
        """
        Start the GRPC gateway in a new process, exposing one available
        HTTP port outside and forwarding the data to the RPC port.

        Args:
            grpc_port: GRPC port
            http_port: HTTP port
        Returns:
            None
        """
        if http_port <= 0:
            _LOGGER.info("Ignore grpc_gateway configuration.")
            return

@@ -102,6 +136,15 @@ class PipelineServer(object):

        self._proxy_server.start()

    def set_response_op(self, response_op):
        """
        Set the response OP.

        Args:
            response_op: ResponseOp or its subclass object
        Returns:
            None
        """
        if not isinstance(response_op, operator.ResponseOp):
            raise Exception("Failed to set response_op: response_op "
                            "must be ResponseOp type.")

@@ -112,6 +155,17 @@ class PipelineServer(object):

        self._used_op, _ = dag.DAG.get_use_ops(self._response_op)

    def prepare_server(self, yml_file=None, yml_dict=None):
        """
        Read the configuration from the yml file (config.yaml) and launch
        local services.

        Args:
            yml_file: read the configuration from a yaml file
            yml_dict: read the configuration from a yaml dict
        Returns:
            None
        """
        conf = ServerYamlConfChecker.load_server_yaml_conf(
            yml_file=yml_file, yml_dict=yml_dict)

@@ -161,6 +215,15 @@ class PipelineServer(object):
        self._start_local_rpc_service()

    def _init_ops(self, op_conf):
        """
        Initialize all OPs from the OP config dict.

        Args:
            op_conf: the op configures in yaml dict.
        Returns:
            None.
        """
        default_conf = {
            "concurrency": 1,
            "timeout": -1,

@@ -190,6 +253,17 @@ class PipelineServer(object):

            op.launch_local_rpc_service()

    def run_server(self):
        """
        If _build_dag_each_worker is True, start _worker_num processes and
        run one GRPC server in each process. Otherwise, start one GRPC
        server (a wiring sketch follows this method).

        Args:
            None
        Returns:
            None
        """
        if self._build_dag_each_worker:
            with _reserve_port(self._rpc_port) as port:
                bind_address = 'localhost:{}'.format(port)

@@ -222,6 +296,15 @@ class PipelineServer(object):

        server.wait_for_termination()
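
# A hedged end-to-end wiring sketch for prepare_server/run_server, using
# the methods defined in this file; UserOp is a hypothetical Op subclass
# and config.yml an existing configuration file.
read_op = RequestOp()
user_op = UserOp(name="bow", input_ops=[read_op])
response_op = ResponseOp(input_ops=[user_op])

server = PipelineServer()
server.set_response_op(response_op)
server.prepare_server("config.yml")   # or prepare_server(yml_dict={...})
server.run_server()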
    def _run_server_func(self, bind_address, response_op, dag_conf, worker_idx):
        """
        Run one GRPC server with PipelineServicer.

        Args:
            bind_address: binding IP/Port
            response_op: ResponseOp or its subclass object
            dag_conf: DAG config
            worker_idx: process index
        """
        options = [('grpc.so_reuseport', 1),
                   ('grpc.max_send_message_length', 256 * 1024 * 1024),
                   ('grpc.max_send_message_length', 256 * 1024 * 1024)]

@@ -237,6 +320,10 @@ class PipelineServer(object):


class ServerYamlConfChecker(object):
    """
    Check the validity of the server yaml files.
    """

    def __init__(self):
        pass
...
@@ -16,21 +16,19 @@ syntax = "proto2";

package baidu.paddle_serving.pipeline_serving;

message Request {
  repeated string key = 1;
  repeated string value = 2;
  optional string name = 3;
  optional string method = 4;
  optional int64 logid = 5;
  optional string clientip = 6;
};

message Response {
  optional int32 err_no = 1;
  optional string err_msg = 2;
  repeated string key = 3;
  repeated string value = 4;
};

service PipelineService {
...