Unverified commit 07c9de9c, authored by TeslaZhao, committed by GitHub

Merge branch 'develop' into Serving-mcl

......@@ -38,7 +38,8 @@ start = time.time()
image_file = "https://paddle-serving.bj.bcebos.com/imagenet-example/daisy.jpg"
for i in range(10):
    img = seq(image_file)
    fetch_map = client.predict(feed={"image": img}, fetch=["score"])
    fetch_map = client.predict(
        feed={"image": img}, fetch=["score"], batch=False)
    prob = max(fetch_map["score"][0])
    label = label_dict[fetch_map["score"][0].tolist().index(prob)].strip(
    ).replace(",", "")
......
......@@ -13,7 +13,7 @@
# limitations under the License.
import sys
from paddle_serving_client import Client
import numpy as np
from paddle_serving_app.reader import Sequential, URL2Image, Resize, CenterCrop, RGB2BGR, Transpose, Div, Normalize, Base64ToImage
if len(sys.argv) != 4:
......@@ -44,12 +44,13 @@ class ImageService(WebService):
    def preprocess(self, feed=[], fetch=[]):
        feed_batch = []
        is_batch = True
        for ins in feed:
            if "image" not in ins:
                raise ValueError("feed data error!")
            img = self.seq(ins["image"])
            feed_batch.append({"image": img[np.newaxis, :]})
        return feed_batch, fetch
        return feed_batch, fetch, is_batch

    def postprocess(self, feed=[], fetch=[], fetch_map={}):
        score_list = fetch_map["score"]
......
......@@ -15,6 +15,7 @@
from paddle_serving_server.web_service import WebService
import sys
from paddle_serving_app.reader import LACReader
import numpy as np
class LACService(WebService):
......@@ -23,13 +24,21 @@ class LACService(WebService):
    def preprocess(self, feed={}, fetch=[]):
        feed_batch = []
        fetch = ["crf_decode"]
        lod_info = [0]
        is_batch = True
        for ins in feed:
            if "words" not in ins:
                raise ValueError("feed data error!")
            feed_data = self.reader.process(ins["words"])
            feed_batch.append({"words": feed_data})
        fetch = ["crf_decode"]
        return feed_batch, fetch
            feed_batch.append(np.array(feed_data).reshape(len(feed_data), 1))
            lod_info.append(lod_info[-1] + len(feed_data))
        feed_dict = {
            "words": np.concatenate(
                feed_batch, axis=0),
            "words.lod": lod_info
        }
        return feed_dict, fetch, is_batch

    def postprocess(self, feed={}, fetch=[], fetch_map={}):
        batch_ret = []
......
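The hunk above batches variable-length LAC inputs into a single LoD tensor: each request's word ids are stacked along axis 0, and "words.lod" records the cumulative sequence offsets. A minimal standalone sketch of that construction, using two hypothetical requests of 3 and 5 tokens:

```
import numpy as np

# Hypothetical word ids produced by the LAC reader for two requests.
seqs = [[1, 2, 3], [4, 5, 6, 7, 8]]

feed_batch = []
lod_info = [0]
for feed_data in seqs:
    feed_batch.append(np.array(feed_data).reshape(len(feed_data), 1))
    lod_info.append(lod_info[-1] + len(feed_data))  # cumulative offsets

feed_dict = {"words": np.concatenate(feed_batch, axis=0), "words.lod": lod_info}
print(feed_dict["words"].shape, feed_dict["words.lod"])  # (8, 1) [0, 3, 8]
```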
# Imagenet Pipeline WebService
This document takes the Imagenet service as an example to introduce how to use the Pipeline WebService.
## Get model
```
sh get_model.sh
```
## Start server
```
python resnet50_web_service.py &>log.txt &
```
## RPC test
```
python pipeline_rpc_client.py
```
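## HTTP test
The server also listens on the http_port from config.yml (18082). A sketch of an HTTP request, assuming the endpoint path follows the op name "imagenet" and the key/value JSON pattern of the uci curl example below (the exact path is inferred, not confirmed by this commit):
```
import base64
import json
import requests

# Hypothetical HTTP client for the pipeline service; /imagenet/prediction is
# inferred from the op name in config.yml.
with open("daisy.jpg", "rb") as f:
    image = base64.b64encode(f.read()).decode("utf8")

data = {"key": ["image"], "value": [image]}
resp = requests.post(
    url="http://127.0.0.1:18082/imagenet/prediction", data=json.dumps(data))
print(resp.text)
```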
# Uci Pipeline WebService
This document takes the Uci service as an example to introduce how to use the Pipeline WebService.
## Get model
```
sh get_data.sh
```
## Start server
```
python web_service.py &>log.txt &
```
## Test
```
curl -X POST -k http://localhost:18082/uci/prediction -d '{"key": ["x"], "value": ["0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283, 0.4919, 0.1856, 0.0795, -0.0332"]}'
```
#worker_num: maximum concurrency. When build_dag_each_worker=True, the framework creates worker_num processes, each building its own gRPC server and DAG.
##When build_dag_each_worker=False, the framework sets max_workers=worker_num for the main thread's gRPC thread pool.
worker_num: 1
#HTTP port. rpc_port and http_port must not both be empty; when rpc_port is available and http_port is empty, no http_port is generated automatically.
http_port: 18082
rpc_port: 9999
dag:
    #Op resource type: True for the thread model, False for the process model.
    is_thread_op: False
op:
    imagenet:
        #When the op config has no server_endpoints, the local service configuration is read from local_service_conf.
        local_service_conf:
            #Concurrency: thread-level when is_thread_op=True, otherwise process-level.
            concurrency: 2
            #Model path
            model_config: ResNet50_vd_model
            #Compute device IDs. With devices "" (or omitted) inference runs on CPU; with "0" or "0,1,2" it runs on the listed GPU cards.
            devices: "0" # "0,1"
            #Client type: brpc, grpc, or local_predictor. local_predictor starts no Serving service; inference runs in-process.
            client_type: local_predictor
            #Fetch list, keyed by the alias_name of fetch_var in client_config.
            fetch_list: ["score"]
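To illustrate the worker_num comment above, a hypothetical header with per-worker DAGs enabled (a sketch only; this commit's config does not set build_dag_each_worker, so treat that key as an assumption):
```
#Hypothetical: 4 worker processes, each building its own gRPC server and DAG.
worker_num: 4
build_dag_each_worker: True
http_port: 18082
rpc_port: 9999
```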
wget --no-check-certificate https://paddle-serving.bj.bcebos.com/imagenet-example/ResNet50_vd.tar.gz
tar -xzvf ResNet50_vd.tar.gz
wget --no-check-certificate https://paddle-serving.bj.bcebos.com/imagenet-example/image_data.tar.gz
tar -xzvf image_data.tar.gz
This diff is collapsed.
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from paddle_serving_server_gpu.pipeline import PipelineClient
import numpy as np
import requests
import json
import cv2
import base64
import os
client = PipelineClient()
client.connect(['127.0.0.1:9999'])


def cv2_to_base64(image):
    return base64.b64encode(image).decode('utf8')


with open("daisy.jpg", 'rb') as file:
    image_data = file.read()
image = cv2_to_base64(image_data)

for i in range(1):
    ret = client.predict(feed_dict={"image": image}, fetch=["label", "prob"])
    print(ret)
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
from paddle_serving_app.reader import Sequential, URL2Image, Resize, CenterCrop, RGB2BGR, Transpose, Div, Normalize, Base64ToImage
try:
    from paddle_serving_server_gpu.web_service import WebService, Op
except ImportError:
    from paddle_serving_server.web_service import WebService, Op
import logging
import numpy as np
import base64, cv2


class ImagenetOp(Op):
    def init_op(self):
        self.seq = Sequential([
            Resize(256), CenterCrop(224), RGB2BGR(), Transpose((2, 0, 1)),
            Div(255), Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225],
                                True)
        ])
        self.label_dict = {}
        label_idx = 0
        with open("imagenet.label") as fin:
            for line in fin:
                self.label_dict[label_idx] = line.strip()
                label_idx += 1

    def preprocess(self, input_dicts, data_id, log_id):
        (_, input_dict), = input_dicts.items()
        data = base64.b64decode(input_dict["image"].encode('utf8'))
        # np.fromstring is deprecated for binary buffers; use np.frombuffer
        data = np.frombuffer(data, np.uint8)
        # Note: class variables(self.var) can only be used in process op mode
        im = cv2.imdecode(data, cv2.IMREAD_COLOR)
        img = self.seq(im)
        return {"image": img[np.newaxis, :].copy()}, False, None, ""

    def postprocess(self, input_dicts, fetch_dict, log_id):
        print(fetch_dict)
        score_list = fetch_dict["score"]
        result = {"label": [], "prob": []}
        for score in score_list:
            score = score.tolist()
            max_score = max(score)
            result["label"].append(self.label_dict[score.index(max_score)]
                                   .strip().replace(",", ""))
            result["prob"].append(max_score)
        result["label"] = str(result["label"])
        result["prob"] = str(result["prob"])
        return result, None, ""


class ImageService(WebService):
    def get_pipeline_response(self, read_op):
        image_op = ImagenetOp(name="imagenet", input_ops=[read_op])
        return image_op


uci_service = ImageService(name="imagenet")
uci_service.prepare_pipeline_config("config.yml")
uci_service.run_service()
......@@ -37,6 +37,7 @@ class SentaService(WebService):
    #Preprocessing for the senta model service. Call order: lac reader -> lac model inference -> postprocess of lac results -> senta reader
    def preprocess(self, feed=[], fetch=[]):
        feed_batch = []
        is_batch = True
        words_lod = [0]
        for ins in feed:
            if "words" not in ins:
......@@ -64,14 +65,13 @@ class SentaService(WebService):
        return {
            "words": np.concatenate(feed_batch),
            "words.lod": words_lod
        }, fetch
        }, fetch, is_batch


senta_service = SentaService(name="senta")
senta_service.load_model_config("senta_bilstm_model")
senta_service.prepare_server(workdir="workdir")
senta_service.init_lac_client(
    lac_port=9300,
    lac_client_config="lac/lac_model/serving_server_conf.prototxt")
    lac_port=9300, lac_client_config="lac_model/serving_server_conf.prototxt")
senta_service.run_rpc_service()
senta_service.run_web_service()
# UNET_BENCHMARK Usage
## Features
* Benchmark testing
## Notes
* Place the sample images (one or more) in the img_data directory; jpg and jpeg formats are supported.
* The number of images should be greater than or equal to the concurrency level.
## TODO
* http benchmark
#!/bin/bash
python unet_benchmark.py --thread 1 --batch_size 1 --model ../unet_client/serving_client_conf.prototxt
# thread/batch can be modified as you wish
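unet_benchmark.py (below) reads two environment flags via os.getenv: FLAGS_profile_client turns on preprocess timing lines on stderr, and FLAGS_serving_latency turns on per-request latency collection. A usage sketch:
```
# Sketch: enable both probes before running the benchmark.
export FLAGS_profile_client=1
export FLAGS_serving_latency=1
sh benchmark.sh
```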
# -*- coding: utf-8 -*-
#
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
unet bench mark script
20201130 first edition by cg82616424
"""
from __future__ import unicode_literals, absolute_import
import os
import time
import json
import requests
from paddle_serving_client import Client
from paddle_serving_client.utils import MultiThreadRunner
from paddle_serving_client.utils import benchmark_args, show_latency
from paddle_serving_app.reader import Sequential, File2Image, Resize, Transpose, BGR2RGB, SegPostprocess
args = benchmark_args()
def get_img_names(path):
"""
Brief:
get img files(jpg) under this path
if any exception happened return None
Args:
path (string): image file path
Returns:
list: images names under this folder
"""
if not os.path.exists(path):
return None
if not os.path.isdir(path):
return None
list_name = []
for f_handler in os.listdir(path):
file_path = os.path.join(path, f_handler)
if os.path.isdir(file_path):
continue
else:
if not file_path.endswith(".jpeg") and not file_path.endswith(
".jpg"):
continue
list_name.append(file_path)
return list_name
def preprocess_img(img_list):
"""
Brief:
prepare img data for benchmark
Args:
img_list(list): list for img file path
Returns:
image content binary list after preprocess
"""
preprocess = Sequential([File2Image(), Resize((512, 512))])
result_list = []
for img in img_list:
img_tmp = preprocess(img)
result_list.append(img_tmp)
return result_list
def benckmark_worker(idx, resource):
"""
Brief:
benchmark single worker for unet
Args:
idx(int): worker idx ,use idx to select backend unet service
resource(dict): unet serving endpoint dict
Returns:
latency
TODO:
http benckmarks
"""
profile_flags = False
latency_flags = False
postprocess = SegPostprocess(2)
if os.getenv("FLAGS_profile_client"):
profile_flags = True
if os.getenv("FLAGS_serving_latency"):
latency_flags = True
latency_list = []
client_handler = Client()
client_handler.load_client_config(args.model)
client_handler.connect(
[resource["endpoint"][idx % len(resource["endpoint"])]])
start = time.time()
turns = resource["turns"]
img_list = resource["img_list"]
for i in range(turns):
if args.batch_size >= 1:
l_start = time.time()
feed_batch = []
b_start = time.time()
for bi in range(args.batch_size):
feed_batch.append({"image": img_list[bi]})
b_end = time.time()
if profile_flags:
sys.stderr.write(
"PROFILE\tpid:{}\tunt_pre_0:{} unet_pre_1:{}\n".format(
os.getpid(),
int(round(b_start * 1000000)),
int(round(b_end * 1000000))))
result = client_handler.predict(
feed={"image": img_list[bi]}, fetch=["output"])
#result["filename"] = "./img_data/N0060.jpg" % (os.getpid(), idx, time.time())
#postprocess(result) # if you want to measure post process time, you have to uncomment this line
l_end = time.time()
if latency_flags:
latency_list.append(l_end * 1000 - l_start * 1000)
else:
print("unsupport batch size {}".format(args.batch_size))
end = time.time()
if latency_flags:
return [[end - start], latency_list]
else:
return [[end - start]]
if __name__ == '__main__':
"""
usage:
"""
img_file_list = get_img_names("./img_data")
img_content_list = preprocess_img(img_file_list)
multi_thread_runner = MultiThreadRunner()
endpoint_list = ["127.0.0.1:9494"]
turns = 1
start = time.time()
result = multi_thread_runner.run(benckmark_worker, args.thread, {
"endpoint": endpoint_list,
"turns": turns,
"img_list": img_content_list
})
end = time.time()
total_cost = end - start
avg_cost = 0
for i in range(args.thread):
avg_cost += result[0][i]
avg_cost = avg_cost / args.thread
print("total cost: {}s".format(total_cost))
print("each thread cost: {}s. ".format(avg_cost))
print("qps: {}samples/s".format(args.batch_size * args.thread * turns /
total_cost))
if os.getenv("FLAGS_serving_latency"):
show_latency(result[1])
......@@ -52,6 +52,20 @@ class WebService(object):
    def load_model_config(self, model_config):
        print("This API will be deprecated later. Please do not use it")
        self.model_config = model_config
        import os
        from .proto import general_model_config_pb2 as m_config
        import google.protobuf.text_format
        if os.path.isdir(model_config):
            client_config = "{}/serving_server_conf.prototxt".format(
                model_config)
        elif os.path.isfile(model_config):
            client_config = model_config
        model_conf = m_config.GeneralModelConfig()
        f = open(client_config, 'r')
        model_conf = google.protobuf.text_format.Merge(
            str(f.read()), model_conf)
        self.feed_names = [var.alias_name for var in model_conf.feed_var]
        self.fetch_names = [var.alias_name for var in model_conf.fetch_var]

    def _launch_rpc_service(self):
        op_maker = OpMaker()
......@@ -179,10 +193,7 @@ class WebService(object):
    def run_web_service(self):
        print("This API will be deprecated later. Please do not use it")
        self.app_instance.run(host="0.0.0.0",
                              port=self.port,
                              threaded=False,
                              processes=1)
        self.app_instance.run(host="0.0.0.0", port=self.port, threaded=True)

    def get_app_instance(self):
        return self.app_instance
......
......@@ -58,6 +58,20 @@ class WebService(object):
    def load_model_config(self, model_config):
        print("This API will be deprecated later. Please do not use it")
        self.model_config = model_config
        import os
        from .proto import general_model_config_pb2 as m_config
        import google.protobuf.text_format
        if os.path.isdir(model_config):
            client_config = "{}/serving_server_conf.prototxt".format(
                model_config)
        elif os.path.isfile(model_config):
            client_config = model_config
        model_conf = m_config.GeneralModelConfig()
        f = open(client_config, 'r')
        model_conf = google.protobuf.text_format.Merge(
            str(f.read()), model_conf)
        self.feed_names = [var.alias_name for var in model_conf.feed_var]
        self.fetch_names = [var.alias_name for var in model_conf.fetch_var]

    def set_gpus(self, gpus):
        print("This API will be deprecated later. Please do not use it")
......@@ -240,10 +254,7 @@ class WebService(object):
    def run_web_service(self):
        print("This API will be deprecated later. Please do not use it")
        self.app_instance.run(host="0.0.0.0",
                              port=self.port,
                              threaded=False,
                              processes=4)
        self.app_instance.run(host="0.0.0.0", port=self.port, threaded=True)

    def get_app_instance(self):
        return self.app_instance
......
......@@ -1343,7 +1343,7 @@ class ResponseOp(Op):
type(var)))
_LOGGER.error("(logid={}) Failed to pack RPC "
"response package: {}".format(
channeldata.id, resp.error_info))
channeldata.id, resp.err_msg))
break
resp.value.append(var)
resp.key.append(name)
......
......@@ -23,7 +23,7 @@ import socket
from .channel import ChannelDataErrcode
from .proto import pipeline_service_pb2
from .proto import pipeline_service_pb2_grpc
import six

_LOGGER = logging.getLogger(__name__)
......@@ -53,7 +53,10 @@ class PipelineClient(object):
        if logid is None:
            req.logid = 0
        else:
            req.logid = long(logid)
            if six.PY2:
                req.logid = long(logid)
            elif six.PY3:
                req.logid = int(logid)
        feed_dict.pop("logid")

        clientip = feed_dict.get("clientip")
......
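A standalone sketch of the version-safe logid cast this hunk introduces (to_logid is a hypothetical helper; the real code assigns req.logid inline):

```
import six


def to_logid(logid):
    # Mirrors the hunk above: py2 needs long, py3 uses int.
    if logid is None:
        return 0
    return long(logid) if six.PY2 else int(logid)


print(to_logid("42"))  # 42
```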