提交 da2c133f 编写于 作者: J Jiawei Wang 提交者: wangjiawei04

Merge pull request #900 from wangjiawei04/imagenet_pipeline

Imagenet pipeline
上级 6026f6c7
...@@ -38,7 +38,8 @@ start = time.time() ...@@ -38,7 +38,8 @@ start = time.time()
image_file = "https://paddle-serving.bj.bcebos.com/imagenet-example/daisy.jpg" image_file = "https://paddle-serving.bj.bcebos.com/imagenet-example/daisy.jpg"
for i in range(10): for i in range(10):
img = seq(image_file) img = seq(image_file)
fetch_map = client.predict(feed={"image": img}, fetch=["score"]) fetch_map = client.predict(
feed={"image": img}, fetch=["score"], batch=False)
prob = max(fetch_map["score"][0]) prob = max(fetch_map["score"][0])
label = label_dict[fetch_map["score"][0].tolist().index(prob)].strip( label = label_dict[fetch_map["score"][0].tolist().index(prob)].strip(
).replace(",", "") ).replace(",", "")
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
# limitations under the License. # limitations under the License.
import sys import sys
from paddle_serving_client import Client from paddle_serving_client import Client
import numpy as np
from paddle_serving_app.reader import Sequential, URL2Image, Resize, CenterCrop, RGB2BGR, Transpose, Div, Normalize, Base64ToImage from paddle_serving_app.reader import Sequential, URL2Image, Resize, CenterCrop, RGB2BGR, Transpose, Div, Normalize, Base64ToImage
if len(sys.argv) != 4: if len(sys.argv) != 4:
...@@ -44,12 +44,13 @@ class ImageService(WebService): ...@@ -44,12 +44,13 @@ class ImageService(WebService):
def preprocess(self, feed=[], fetch=[]): def preprocess(self, feed=[], fetch=[]):
feed_batch = [] feed_batch = []
is_batch = True
for ins in feed: for ins in feed:
if "image" not in ins: if "image" not in ins:
raise ("feed data error!") raise ("feed data error!")
img = self.seq(ins["image"]) img = self.seq(ins["image"])
feed_batch.append({"image": img[np.newaxis, :]}) feed_batch.append({"image": img[np.newaxis, :]})
return feed_batch, fetch return feed_batch, fetch, is_batch
def postprocess(self, feed=[], fetch=[], fetch_map={}): def postprocess(self, feed=[], fetch=[], fetch_map={}):
score_list = fetch_map["score"] score_list = fetch_map["score"]
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
from paddle_serving_server.web_service import WebService from paddle_serving_server.web_service import WebService
import sys import sys
from paddle_serving_app.reader import LACReader from paddle_serving_app.reader import LACReader
import numpy as np
class LACService(WebService): class LACService(WebService):
...@@ -23,13 +24,21 @@ class LACService(WebService): ...@@ -23,13 +24,21 @@ class LACService(WebService):
def preprocess(self, feed={}, fetch=[]): def preprocess(self, feed={}, fetch=[]):
feed_batch = [] feed_batch = []
fetch = ["crf_decode"]
lod_info = [0]
is_batch = True
for ins in feed: for ins in feed:
if "words" not in ins: if "words" not in ins:
raise ("feed data error!") raise ("feed data error!")
feed_data = self.reader.process(ins["words"]) feed_data = self.reader.process(ins["words"])
feed_batch.append({"words": feed_data}) feed_batch.append(np.array(feed_data).reshape(len(feed_data), 1))
fetch = ["crf_decode"] lod_info.append(lod_info[-1] + len(feed_data))
return feed_batch, fetch feed_dict = {
"words": np.concatenate(
feed_batch, axis=0),
"words.lod": lod_info
}
return feed_dict, fetch, is_batch
def postprocess(self, feed={}, fetch=[], fetch_map={}): def postprocess(self, feed={}, fetch=[], fetch_map={}):
batch_ret = [] batch_ret = []
......
# Imagenet Pipeline WebService
This document will takes Imagenet service as an example to introduce how to use Pipeline WebService.
## Get model
```
sh get_model.sh
```
## Start server
```
python resnet50_web_service.py &>log.txt &
```
## RPC test
```
python pipeline_rpc_client.py
```
# Imagenet Pipeline WebService
这里以 Uci 服务为例来介绍 Pipeline WebService 的使用。
## 获取模型
```
sh get_data.sh
```
## 启动服务
```
python web_service.py &>log.txt &
```
## 测试
```
curl -X POST -k http://localhost:18082/uci/prediction -d '{"key": ["x"], "value": ["0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283, 0.4919, 0.1856, 0.0795, -0.0332"]}'
```
#worker_num, 最大并发数。当build_dag_each_worker=True时, 框架会创建worker_num个进程,每个进程内构建grpcSever和DAG
##当build_dag_each_worker=False时,框架会设置主线程grpc线程池的max_workers=worker_num
worker_num: 1
#http端口, rpc_port和http_port不允许同时为空。当rpc_port可用且http_port为空时,不自动生成http_port
http_port: 18082
rpc_port: 9999
dag:
#op资源类型, True, 为线程模型;False,为进程模型
is_thread_op: False
op:
imagenet:
#当op配置没有server_endpoints时,从local_service_conf读取本地服务配置
local_service_conf:
#并发数,is_thread_op=True时,为线程并发;否则为进程并发
concurrency: 2
#uci模型路径
model_config: ResNet50_vd_model
#计算硬件ID,当devices为""或不写时为CPU预测;当devices为"0", "0,1,2"时为GPU预测,表示使用的GPU卡
devices: "0" # "0,1"
#client类型,包括brpc, grpc和local_predictor.local_predictor不启动Serving服务,进程内预测
client_type: local_predictor
#Fetch结果列表,以client_config中fetch_var的alias_name为准
fetch_list: ["score"]
wget --no-check-certificate https://paddle-serving.bj.bcebos.com/imagenet-example/ResNet50_vd.tar.gz
tar -xzvf ResNet50_vd.tar.gz
wget --no-check-certificate https://paddle-serving.bj.bcebos.com/imagenet-example/image_data.tar.gz
tar -xzvf image_data.tar.gz
此差异已折叠。
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from paddle_serving_server_gpu.pipeline import PipelineClient
import numpy as np
import requests
import json
import cv2
import base64
import os
client = PipelineClient()
client.connect(['127.0.0.1:9999'])
def cv2_to_base64(image):
return base64.b64encode(image).decode('utf8')
with open("daisy.jpg", 'rb') as file:
image_data = file.read()
image = cv2_to_base64(image_data)
for i in range(1):
ret = client.predict(feed_dict={"image": image}, fetch=["label", "prob"])
print(ret)
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
from paddle_serving_app.reader import Sequential, URL2Image, Resize, CenterCrop, RGB2BGR, Transpose, Div, Normalize, Base64ToImage
try:
from paddle_serving_server_gpu.web_service import WebService, Op
except ImportError:
from paddle_serving_server.web_service import WebService, Op
import logging
import numpy as np
import base64, cv2
class ImagenetOp(Op):
def init_op(self):
self.seq = Sequential([
Resize(256), CenterCrop(224), RGB2BGR(), Transpose((2, 0, 1)),
Div(255), Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225],
True)
])
self.label_dict = {}
label_idx = 0
with open("imagenet.label") as fin:
for line in fin:
self.label_dict[label_idx] = line.strip()
label_idx += 1
def preprocess(self, input_dicts, data_id, log_id):
(_, input_dict), = input_dicts.items()
data = base64.b64decode(input_dict["image"].encode('utf8'))
data = np.fromstring(data, np.uint8)
# Note: class variables(self.var) can only be used in process op mode
im = cv2.imdecode(data, cv2.IMREAD_COLOR)
img = self.seq(im)
return {"image": img[np.newaxis, :].copy()}, False, None, ""
def postprocess(self, input_dicts, fetch_dict, log_id):
print(fetch_dict)
score_list = fetch_dict["score"]
result = {"label": [], "prob": []}
for score in score_list:
score = score.tolist()
max_score = max(score)
result["label"].append(self.label_dict[score.index(max_score)]
.strip().replace(",", ""))
result["prob"].append(max_score)
result["label"] = str(result["label"])
result["prob"] = str(result["prob"])
return result, None, ""
class ImageService(WebService):
def get_pipeline_response(self, read_op):
image_op = ImagenetOp(name="imagenet", input_ops=[read_op])
return image_op
uci_service = ImageService(name="imagenet")
uci_service.prepare_pipeline_config("config.yml")
uci_service.run_service()
...@@ -37,6 +37,7 @@ class SentaService(WebService): ...@@ -37,6 +37,7 @@ class SentaService(WebService):
#定义senta模型预测服务的预处理,调用顺序:lac reader->lac模型预测->预测结果后处理->senta reader #定义senta模型预测服务的预处理,调用顺序:lac reader->lac模型预测->预测结果后处理->senta reader
def preprocess(self, feed=[], fetch=[]): def preprocess(self, feed=[], fetch=[]):
feed_batch = [] feed_batch = []
is_batch = True
words_lod = [0] words_lod = [0]
for ins in feed: for ins in feed:
if "words" not in ins: if "words" not in ins:
...@@ -64,14 +65,13 @@ class SentaService(WebService): ...@@ -64,14 +65,13 @@ class SentaService(WebService):
return { return {
"words": np.concatenate(feed_batch), "words": np.concatenate(feed_batch),
"words.lod": words_lod "words.lod": words_lod
}, fetch }, fetch, is_batch
senta_service = SentaService(name="senta") senta_service = SentaService(name="senta")
senta_service.load_model_config("senta_bilstm_model") senta_service.load_model_config("senta_bilstm_model")
senta_service.prepare_server(workdir="workdir") senta_service.prepare_server(workdir="workdir")
senta_service.init_lac_client( senta_service.init_lac_client(
lac_port=9300, lac_port=9300, lac_client_config="lac_model/serving_server_conf.prototxt")
lac_client_config="lac/lac_model/serving_server_conf.prototxt")
senta_service.run_rpc_service() senta_service.run_rpc_service()
senta_service.run_web_service() senta_service.run_web_service()
...@@ -1343,7 +1343,7 @@ class ResponseOp(Op): ...@@ -1343,7 +1343,7 @@ class ResponseOp(Op):
type(var))) type(var)))
_LOGGER.error("(logid={}) Failed to pack RPC " _LOGGER.error("(logid={}) Failed to pack RPC "
"response package: {}".format( "response package: {}".format(
channeldata.id, resp.error_info)) channeldata.id, resp.err_msg))
break break
resp.value.append(var) resp.value.append(var)
resp.key.append(name) resp.key.append(name)
......
...@@ -23,7 +23,7 @@ import socket ...@@ -23,7 +23,7 @@ import socket
from .channel import ChannelDataErrcode from .channel import ChannelDataErrcode
from .proto import pipeline_service_pb2 from .proto import pipeline_service_pb2
from .proto import pipeline_service_pb2_grpc from .proto import pipeline_service_pb2_grpc
import six
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
...@@ -53,7 +53,10 @@ class PipelineClient(object): ...@@ -53,7 +53,10 @@ class PipelineClient(object):
if logid is None: if logid is None:
req.logid = 0 req.logid = 0
else: else:
if six.PY2:
req.logid = long(logid) req.logid = long(logid)
elif six.PY3:
req.logid = int(log_id)
feed_dict.pop("logid") feed_dict.pop("logid")
clientip = feed_dict.get("clientip") clientip = feed_dict.get("clientip")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册