Commit 30f07d1a authored by: H HexToString

Merge branch 'v0.6.0' of https://github.com/PaddlePaddle/Serving into v0.6.0

Starting from version 0.6.0, Paddle Serving supports deployment on Kubernetes clusters.
### Cluster preparation
If you do not have a Kubernetes cluster yet, we recommend [purchasing and using a Baidu Cloud CCE cluster](https://cloud.baidu.com/doc/CCE/index.html). If your cluster is provided by another cloud vendor, or you install Kubernetes yourself, please follow the corresponding tutorial.
You also need an image registry for the Kubernetes deployment. It is usually tied to the cloud provider; if you use a Baidu Cloud CCE cluster, refer to the [Baidu Cloud CCR registry documentation](https://cloud.baidu.com/doc/CCR/index.html). Docker Hub can also be used as a registry, but image pulls may be slow during deployment.
### Environment preparation
# Using a Secure Gateway with Paddle Serving
## Introduction
The previous deployment examples were written from a developer's point of view. In a real production environment, however, simply exposing a remote prediction endpoint is far from enough, and the following shortcomings still need to be addressed.
- The service is not exposed through a gateway, so access paths are hard to manage.
- The service interface is not secure and needs authentication.
- The service interface cannot throttle traffic, so resources cannot be used efficiently.
This document takes the Uci house price prediction service as an example to show how to harden the prediction API. An API gateway acts as the traffic entry point, manages the interfaces in a unified way, and provides security features such as traffic encryption and authentication.
## Docker deployment
The secure gateway can be deployed with docker-compose. The steps of this example are: [deploy the local Serving container] - [deploy the local secure gateway] - [access Serving through the secure gateway].
**Note:** docker-compose is not the same as docker. It depends on docker and can start several containers at once, somewhat like a local version of Kubernetes. For installation instructions see [Install docker-compose](https://docs.docker.com/compose/install/).
```shell
docker-compose -f tools/auth/auth-serving-docker.yaml up -d
```
You can check the started containers with `docker ps`.
```shell
3035cf445029 pantsel/konga:next "/app/start.sh" About an hour ago Up About an hour 0.0.0.0:8005->1337/tcp anquan_konga_1
7ce3abee550c registry.baidubce.com/serving_gateway/kong:paddle "/docker-entrypoint.…" About an hour ago Up About an hour (healthy) 0.0.0.0:8000->8000/tcp, 127.0.0.1:8001->8001/tcp, 0.0.0.0:8443->8443/tcp, 127.0.0.1:8444->8444/tcp anquan_kong_1
25810fd79a27 postgres:9.6 "docker-entrypoint.s…" About an hour ago Up About an hour (healthy) 5432/tcp anquan_db_1
ee59a3dd4806 registry.baidubce.com/serving_dev/serving-runtime:cpu-py36 "bash -c ' wget --no…" About an hour ago Up About an hour 0.0.0.0:9393->9393/tcp anquan_serving_1
665fd8a34e15 redis:latest "docker-entrypoint.s…" About an hour ago Up About an hour 0.0.0.0:6379->6379/tcp anquan_redis_1
```
The Serving container we started earlier is exposed on port 9393 and the KONG gateway listens on port 8443; the web console (Konga) is published on port 8005, while the KONG admin API is bound to 127.0.0.1:8001, as shown in the `docker ps` output above. Next, open `http://$IP_ADDR:8005` in a browser, where IP_ADDR is the IP of the host machine.
<img src="kong-dashboard.png">
After registering and logging in, you will see the DASHBOARD. Look at SERVICES first: you can find `serving_service`, which means the Serving service on port 9393 has been registered in KONG.
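If you prefer the command line, the same registration can be checked through the KONG admin API on the host (assuming it is published on 127.0.0.1:8001 as in the `docker ps` output above):
```shell
curl http://127.0.0.1:8001/services
```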
<img src="kong-services.png">
<img src="kong-routes.png">
Then, under ROUTES, you can see that serving is routed to `/serving-uci`.
Finally, click CONSUMERS - default_user - Credentials - API KEYS, and you will see a list of keys under `Api Keys`.
<img src="kong-api_keys.png">
Next, the service can be accessed with curl:
```shell
curl -H "Content-Type:application/json" -H "X-INSTANCE-ID:kong_ins" -H "apikey:hP6v25BQVS5CcS1nqKpxdrFkUxze9JWD" -X POST -d '{"feed":[{"x": [0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283, 0.4919, 0.1856, 0.0795, -0.0332]}], "fetch":["price"]}' https://127.0.0.1:8443/serving-uci/uci/prediction -k
```
Compared with the plain Serving HTTP service, there are the following differences.
- Access is encrypted over https instead of http.
- The path `serving-uci` is mapped to the service through the gateway.
- The `X-INSTANCE-ID` and `apikey` headers are added.
## K8S deployment
We also provide a way to deploy the Serving secure gateway on a K8S cluster.
### Step 1: Start the Serving service
We again take the [Uci house price prediction](../python/examples/fit_a_line) service as the example. Building the image is omitted here; for details see [Deploy Paddle Serving on a Kubernetes cluster](./PADDLE_SERVING_ON_KUBERNETES.md).
Here we simply run
```
kubectl apply -f tools/auth/serving-demo-k8s.yaml
```
You can then see that the corresponding resources have been created.
### Step 2: Install KONG (only needs to be done once per cluster)
Next, install the KONG ingress controller:
```
kubectl apply -f tools/auth/kong-install.yaml
```
The output is
```
namespace/kong created
customresourcedefinition.apiextensions.k8s.io/kongclusterplugins.configuration.konghq.com created
customresourcedefinition.apiextensions.k8s.io/kongconsumers.configuration.konghq.com created
customresourcedefinition.apiextensions.k8s.io/kongingresses.configuration.konghq.com created
customresourcedefinition.apiextensions.k8s.io/kongplugins.configuration.konghq.com created
customresourcedefinition.apiextensions.k8s.io/tcpingresses.configuration.konghq.com created
serviceaccount/kong-serviceaccount created
clusterrole.rbac.authorization.k8s.io/kong-ingress-clusterrole created
clusterrolebinding.rbac.authorization.k8s.io/kong-ingress-clusterrole-nisa-binding created
service/kong-proxy created
service/kong-validation-webhook created
deployment.apps/ingress-kong created
```
We can then run
```
kubectl get service --all-namespaces
```
which prints
```
NAMESPACE NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
default uci ClusterIP 172.16.87.89 <none> 9393/TCP 7d7h
kong kong-proxy NodePort 172.16.23.91 <none> 80:8175/TCP,443:8521/TCP 102m
kong kong-validation-webhook ClusterIP 172.16.114.93 <none> 443/TCP 102m
```
### Step 3: Create the Ingress resource
Next, connect the Serving service to KONG:
```
kubectl apply -f tools/auth/kong-ingress-k8s.yaml
```
The content of the yaml file is given below:
```
apiVersion: extensions/v1beta1
kind: Ingress
metadata:
  name: demo
  annotations:
    konghq.com/strip-path: "true"
    kubernetes.io/ingress.class: kong
spec:
  rules:
  - http:
      paths:
      - path: /foo
        backend:
          serviceName: {{SERVING_SERVICE_NAME}}
          servicePort: {{SERVICE_PORT}}
```
Here serviceName is uci and servicePort is 9393; for a different service you need to change these two fields. Requests are finally mapped under `/foo`, as in the filled-in example below.
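For the Uci example above, the filled-in manifest would look like this (assuming the Service created in Step 1 is named `uci` and exposes port 9393, as shown by `kubectl get service`):
```
apiVersion: extensions/v1beta1
kind: Ingress
metadata:
  name: demo
  annotations:
    konghq.com/strip-path: "true"
    kubernetes.io/ingress.class: kong
spec:
  rules:
  - http:
      paths:
      - path: /foo
        backend:
          serviceName: uci
          servicePort: 9393
```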
After this step, we can run
```
curl -H "Content-Type:application/json" -X POST -d '{"feed":[{"x": [0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283, 0.4919, 0.1856, 0.0795, -0.0332]}], "fetch":["price"]}' http://$IP:$PORT/foo/uci/prediction
```
### Step 4: Add secure-gateway restrictions
The endpoint above has no authentication and cannot verify the caller's identity, so we now add a key-auth plugin.
Run
```
kubectl apply -f key-auth-k8s.yaml
```
The content of the yaml file is:
```
apiVersion: configuration.konghq.com/v1
kind: KongPlugin
metadata:
  name: key-auth
plugin: key-auth
```
Now create a secret. Its key value is chosen by the user and must be carried in the `apikey` header of every request.
Run
```
kubectl create secret generic default-apikey \
--from-literal=kongCredType=key-auth \
--from-literal=key=ZGVmYXVsdC1hcGlrZXkK
```
Here the key is an arbitrarily chosen string, `ZGVmYXVsdC1hcGlrZXkK`; in practice you can specify any key you like, as shown below.
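This particular string happens to be the base64 encoding of `default-apikey`, so you can generate a key of your own the same way, for example:
```shell
echo "default-apikey" | base64
# ZGVmYXVsdC1hcGlrZXkK
```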
Create a consumer to identify the caller and bind the apikey to this user.
Run
```
kubectl apply -f kong-consumer-k8s.yaml
```
The content of the yaml file is:
```
apiVersion: configuration.konghq.com/v1
kind: KongConsumer
metadata:
  name: default
  annotations:
    kubernetes.io/ingress.class: kong
username: default
credentials:
- default-apikey
```
If we now repeat the curl request from the previous step, we will find that it no longer works: the gateway is now secured and the corresponding key is required.
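For example, a request without the key is rejected; with KONG's key-auth plugin the response typically looks like this (HTTP 401):
```
{"message":"No API key found in request"}
```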
### Step 5: Access the service with an API key
Run
```
curl -H "Content-Type:application/json" -H "apikey:ZGVmYXVsdC1hcGlrZXkK" -X POST -d '{"feed":[{"x": [0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283, 0.4919, 0.1856, 0.0795, -0.0332]}], "fetch":["price"]}' https://$IP:$PORT/foo/uci/prediction -k
```
You can see that the apikey has been added to the header of the curl request.
# Imagenet Pipeline WebService
This document takes the Imagenet service as an example to introduce how to use Pipeline WebService.
## Get model
```
sh get_model.sh
```
## Start server
```
python resnet50_web_service.py &>log.txt &
```
## RPC test
```
python pipeline_rpc_client.py
```
# Imagenet Pipeline WebService
This document takes the Imagenet service as an example to introduce how to use Pipeline WebService.
## Get the model
```
sh get_model.sh
```
## Start the server
```
python resnet50_web_service.py &>log.txt &
```
## Test
```
python pipeline_rpc_client.py
```
import sys
import os
import base64
import yaml
import requests
import time
import json
try:
from paddle_serving_server_gpu.pipeline import PipelineClient
except ImportError:
from paddle_serving_server.pipeline import PipelineClient
import numpy as np
from paddle_serving_client.utils import MultiThreadRunner
from paddle_serving_client.utils import benchmark_args, show_latency
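# Strip the per-op "call" timing entries from the DAG tracer output and dump the remaining summary to fileout.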
def parse_benchmark(filein, fileout):
with open(filein, "r") as fin:
res = yaml.load(fin)
del_list = []
for key in res["DAG"].keys():
if "call" in key:
del_list.append(key)
for key in del_list:
del res["DAG"][key]
with open(fileout, "w") as fout:
yaml.dump(res, fout, default_flow_style=False)
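# Generate config2.yml from config.yml: enable the DAG tracer and select CPU or GPU for the imagenet op.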
def gen_yml(device, gpu_id):
fin = open("config.yml", "r")
config = yaml.load(fin)
fin.close()
config["dag"]["tracer"] = {"interval_s": 10}
if device == "gpu":
config["op"]["imagenet"]["local_service_conf"]["device_type"] = 1
config["op"]["imagenet"]["local_service_conf"]["devices"] = gpu_id
else:
config["op"]["imagenet"]["local_service_conf"]["device_type"] = 0
with open("config2.yml", "w") as fout:
yaml.dump(config, fout, default_flow_style=False)
def cv2_to_base64(image):
return base64.b64encode(image).decode('utf8')
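# Single-thread HTTP pressure loop: repeatedly POST a batch of base64-encoded images for ~20 seconds and record per-request latency.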
def run_http(idx, batch_size):
print("start thread ({})".format(idx))
url = "http://127.0.0.1:18080/imagenet/prediction"
start = time.time()
with open(os.path.join(".", "daisy.jpg"), 'rb') as file:
image_data1 = file.read()
image = cv2_to_base64(image_data1)
keys, values = [], []
for i in range(batch_size):
keys.append("image_{}".format(i))
values.append(image)
data = {"key": keys, "value": values}
latency_list = []
start_time = time.time()
total_num = 0
while True:
l_start = time.time()
r = requests.post(url=url, data=json.dumps(data))
print(r.json())
l_end = time.time()
latency_list.append(l_end * 1000 - l_start * 1000)
total_num += 1
if time.time() - start_time > 20:
break
end = time.time()
return [[end - start], latency_list, [total_num]]
def multithread_http(thread, batch_size):
multi_thread_runner = MultiThreadRunner()
start = time.time()
result = multi_thread_runner.run(run_http, thread, batch_size)
end = time.time()
total_cost = end - start
avg_cost = 0
total_number = 0
for i in range(thread):
avg_cost += result[0][i]
total_number += result[2][i]
avg_cost = avg_cost / thread
print("Total cost: {}s".format(total_cost))
print("Each thread cost: {}s. ".format(avg_cost))
print("Total count: {}. ".format(total_number))
print("AVG QPS: {} samples/s".format(batch_size * total_number /
total_cost))
show_latency(result[1])
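# Single-thread RPC loop: send each image in imgs/ via PipelineClient repeatedly for ~10 seconds.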
def run_rpc(thread, batch_size):
client = PipelineClient()
client.connect(['127.0.0.1:18080'])
start = time.time()
test_img_dir = "imgs/"
for img_file in os.listdir(test_img_dir):
with open(os.path.join(test_img_dir, img_file), 'rb') as file:
image_data = file.read()
image = cv2_to_base64(image_data)
start_time = time.time()
while True:
ret = client.predict(feed_dict={"image": image}, fetch=["res"])
if time.time() - start_time > 10:
break
end = time.time()
return [[end - start]]
def multithread_rpc(thread, batch_size):
    multi_thread_runner = MultiThreadRunner()
    result = multi_thread_runner.run(run_rpc, thread, batch_size)
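# Usage:
#   python3 benchmark.py yaml <brpc|local_predictor> <thread> <cpu|gpu> [gpu_id]  # generate config2.yml
#   python3 benchmark.py run <http|rpc> <thread> <batch_size>                     # run the pressure test
#   python3 benchmark.py dump <filein> <fileout>                                  # parse the tracer log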
if __name__ == "__main__":
if sys.argv[1] == "yaml":
mode = sys.argv[2] # brpc/ local predictor
thread = int(sys.argv[3])
device = sys.argv[4]
if device == "gpu":
gpu_id = sys.argv[5]
else:
gpu_id = None
gen_yml(device, gpu_id)
elif sys.argv[1] == "run":
mode = sys.argv[2] # http/ rpc
thread = int(sys.argv[3])
batch_size = int(sys.argv[4])
if mode == "http":
multithread_http(thread, batch_size)
elif mode == "rpc":
multithread_rpc(thread, batch_size)
elif sys.argv[1] == "dump":
filein = sys.argv[2]
fileout = sys.argv[3]
parse_benchmark(filein, fileout)
export FLAGS_profile_pipeline=1
alias python3="python3.6"
modelname="clas-DarkNet53"
# HTTP
#ps -ef | grep web_service | awk '{print $2}' | xargs kill -9
sleep 3
# Create the yaml config. If you already have the config file, you can skip this step.
#python3 benchmark.py yaml local_predictor 1 gpu
rm -rf profile_log_$modelname
echo "Starting HTTP Clients..."
# Start a client in each thread to test the multi-threaded case.
for thread_num in 1 2 4 8 12 16
do
for batch_size in 1
do
echo "----${modelname} thread num: ${thread_num} batch size: ${batch_size} mode:http ----" >>profile_log_$modelname
# Start one web service. If you have already started the service yourself, you can skip this.
#python3 web_service.py >web.log 2>&1 &
#sleep 3
# --id is the serial number of the GPU card. It must be the same as the GPU id used by the server.
nvidia-smi --id=3 --query-gpu=memory.used --format=csv -lms 1000 > gpu_use.log 2>&1 &
nvidia-smi --id=3 --query-gpu=utilization.gpu --format=csv -lms 1000 > gpu_utilization.log 2>&1 &
echo "import psutil\ncpu_utilization=psutil.cpu_percent(1,False)\nprint('CPU_UTILIZATION:', cpu_utilization)\n" > cpu_utilization.py
# Start http client
python3 benchmark.py run http $thread_num $batch_size > profile 2>&1
# Collect CPU metrics, filter out samples that are momentarily zero, and record the maximum GPU memory usage and the average GPU utilization.
python3 cpu_utilization.py >> profile_log_$modelname
grep -av '^0 %' gpu_utilization.log > gpu_utilization.log.tmp
awk 'BEGIN {max = 0} {if(NR>1){if ($modelname > max) max=$modelname}} END {print "MAX_GPU_MEMORY:", max}' gpu_use.log >> profile_log_$modelname
awk -F' ' '{sum+=$1} END {print "GPU_UTILIZATION:", sum/NR, sum, NR }' gpu_utilization.log.tmp >> profile_log_$modelname
# Show profiles
python3 ../../../util/show_profile.py profile $thread_num >> profile_log_$modelname
tail -n 8 profile >> profile_log_$modelname
echo '' >> profile_log_$modelname
done
done
# Kill all nvidia-smi background tasks.
pkill nvidia-smi
cuda_version: "10.1"
cudnn_version: "7.6"
trt_version: "6.0"
python_version: "3.7"
gcc_version: "8.2"
paddle_version: "2.0.1"
cpu: "Intel(R) Xeon(R) Gold 5117 CPU @ 2.00GHz X12"
gpu: "T4"
xpu: "None"
api: ""
owner: "cuicheng01"
model_name: "DarkNet53"
model_type: "static"
model_source: "PaddleClas"
model_url: "https://paddle-imagenet-models-name.bj.bcebos.com/DarkNet53_pretrained.tar"
batch_size: 1
num_of_samples: 1000
input_shape: "3,224,224"
runtime_device: "gpu"
ir_optim: true
enable_memory_optim: true
enable_tensorrt: false
precision: "fp32"
enable_mkldnn: false
cpu_math_library_num_threads: ""
cuda_version: "10.1"
cudnn_version: "7.6"
trt_version: "6.0"
python_version: "3.7"
gcc_version: "8.2"
paddle_version: "2.0.1"
cpu: "Intel(R) Xeon(R) Gold 5117 CPU @ 2.00GHz X12"
gpu: "T4"
xpu: "None"
api: ""
owner: "cuicheng01"
model_name: "imagenet"
model_type: "static"
model_source: "PaddleClas"
model_url: "model_url_path"
batch_size: 1
num_of_samples: 1000
input_shape: "3,224,224"
runtime_device: "cpu"
ir_optim: true
enable_memory_optim: true
enable_tensorrt: false
precision: "fp32"
enable_mkldnn: false
cpu_math_library_num_threads: ""
export FLAGS_profile_pipeline=1
alias python3="python3.7"
modelname="imagenet"
use_gpu=1
gpu_id="0"
benchmark_config_filename="benchmark_config.yaml"
# HTTP
ps -ef | grep web_service | awk '{print $2}' | xargs kill -9
sleep 3
if [ $use_gpu -eq 1 ]; then
python3 benchmark.py yaml local_predictor 1 gpu $gpu_id
else
python3 benchmark.py yaml local_predictor 1 cpu
fi
rm -rf profile_log_$modelname
for thread_num in 1
do
for batch_size in 1
do
echo "#----imagenet thread num: $thread_num batch size: $batch_size mode:http use_gpu:$use_gpu----" >>profile_log_$modelname
rm -rf PipelineServingLogs
rm -rf cpu_utilization.py
python3 resnet50_web_service.py >web.log 2>&1 &
sleep 3
nvidia-smi --id=${gpu_id} --query-compute-apps=used_memory --format=csv -lms 100 > gpu_use.log 2>&1 &
nvidia-smi --id=${gpu_id} --query-gpu=utilization.gpu --format=csv -lms 100 > gpu_utilization.log 2>&1 &
echo "import psutil\ncpu_utilization=psutil.cpu_percent(1,False)\nprint('CPU_UTILIZATION:', cpu_utilization)\n" > cpu_utilization.py
python3 benchmark.py run http $thread_num $batch_size
python3 cpu_utilization.py >>profile_log_$modelname
python3 -m paddle_serving_server_gpu.profiler >>profile_log_$modelname
ps -ef | grep web_service | awk '{print $2}' | xargs kill -9
ps -ef | grep nvidia-smi | awk '{print $2}' | xargs kill -9
python3 benchmark.py dump benchmark.log benchmark.tmp
mv benchmark.tmp benchmark.log
awk 'BEGIN {max = 0} {if(NR>1){if ($modelname > max) max=$modelname}} END {print "GPU_MEM:", max}' gpu_use.log >> profile_log_$modelname
awk 'BEGIN {max = 0} {if(NR>1){if ($modelname > max) max=$modelname}} END {print "GPU_UTIL:", max}' gpu_utilization.log >> profile_log_$modelname
cat benchmark.log >> profile_log_$modelname
python3 -m paddle_serving_server_gpu.parse_profile --benchmark_cfg $benchmark_config_filename --benchmark_log profile_log_$modelname
#rm -rf gpu_use.log gpu_utilization.log
done
done
#worker_num: maximum concurrency. When build_dag_each_worker=True, the framework creates worker_num processes, each of which builds its own gRPC server and DAG.
##When build_dag_each_worker=False, the framework sets max_workers=worker_num for the gRPC thread pool of the main thread.
worker_num: 1
#HTTP port. rpc_port and http_port must not both be empty. When rpc_port is valid and http_port is empty, http_port is not generated automatically.
http_port: 18080
rpc_port: 9993
dag:
    #Op resource type: True for the thread model, False for the process model.
    is_thread_op: False
op:
    imagenet:
        #Concurrency: thread concurrency when is_thread_op=True, otherwise process concurrency.
        concurrency: 1
        #When the op has no server_endpoints configured, the local service configuration is read from local_service_conf.
        local_service_conf:
            #Model directory path.
            model_config: DarkNet53/ppcls_model/
            #Device type. If left empty it is decided by devices (CPU/GPU); 0=cpu, 1=gpu, 2=tensorRT, 3=arm cpu, 4=kunlun xpu.
            device_type: 1
            #Device IDs. When devices is "" or omitted, prediction runs on CPU; when devices is "0" or "0,1,2", prediction runs on the listed GPU cards.
            devices: "0" # "0,1"
            #Client type: brpc, grpc or local_predictor. local_predictor does not start a Serving service and predicts in-process.
            client_type: local_predictor
            #List of fetched outputs, keyed by the alias_name of fetch_var in client_config.
            fetch_list: ["prediction"]
import psutil
cpu_utilization=psutil.cpu_percent(1,False)
print('CPU_UTILIZATION:', cpu_utilization)
wget --no-check-certificate https://paddle-serving.bj.bcebos.com/model/DarkNet53.tar
tar -xf DarkNet53.tar
wget --no-check-certificate https://paddle-serving.bj.bcebos.com/imagenet-example/image_data.tar.gz
tar -xzvf image_data.tar.gz
import numpy as np
import requests
import json
import cv2
import base64
import os
def cv2_to_base64(image):
return base64.b64encode(image).decode('utf8')
if __name__ == "__main__":
url = "http://127.0.0.1:18080/imagenet/prediction"
with open(os.path.join(".", "daisy.jpg"), 'rb') as file:
image_data1 = file.read()
image = cv2_to_base64(image_data1)
data = {"key": ["image"], "value": [image]}
for i in range(100):
r = requests.post(url=url, data=json.dumps(data))
print(r.json())
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
try:
from paddle_serving_server_gpu.pipeline import PipelineClient
except ImportError:
from paddle_serving_server.pipeline import PipelineClient
import numpy as np
import requests
import json
import cv2
import base64
import os
client = PipelineClient()
client.connect(['127.0.0.1:9993'])
def cv2_to_base64(image):
return base64.b64encode(image).decode('utf8')
with open("daisy.jpg", 'rb') as file:
image_data = file.read()
image = cv2_to_base64(image_data)
for i in range(1):
ret = client.predict(feed_dict={"image": image}, fetch=["label", "prob"])
print(ret)
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
from paddle_serving_app.reader import Sequential, URL2Image, Resize, CenterCrop, RGB2BGR, Transpose, Div, Normalize, Base64ToImage
from paddle_serving_server.web_service import WebService, Op
import logging
import numpy as np
import base64, cv2
class ImagenetOp(Op):
def init_op(self):
self.seq = Sequential([
Resize(256), CenterCrop(224), RGB2BGR(), Transpose((2, 0, 1)),
Div(255), Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225],
True)
])
self.label_dict = {}
label_idx = 0
with open("imagenet.label") as fin:
for line in fin:
self.label_dict[label_idx] = line.strip()
label_idx += 1
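    # Decode the base64-encoded images in the request into one NCHW float batch for the predictor.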
def preprocess(self, input_dicts, data_id, log_id):
(_, input_dict), = input_dicts.items()
batch_size = len(input_dict.keys())
imgs = []
for key in input_dict.keys():
data = base64.b64decode(input_dict[key].encode('utf8'))
            data = np.frombuffer(data, np.uint8)
im = cv2.imdecode(data, cv2.IMREAD_COLOR)
img = self.seq(im)
imgs.append(img[np.newaxis, :].copy())
input_imgs = np.concatenate(imgs, axis=0)
return {"image": input_imgs}, False, None, ""
def postprocess(self, input_dicts, fetch_dict, log_id):
score_list = fetch_dict["prediction"]
result = {"label": [], "prob": []}
for score in score_list:
score = score.tolist()
max_score = max(score)
result["label"].append(self.label_dict[score.index(max_score)]
.strip().replace(",", ""))
result["prob"].append(max_score)
result["label"] = str(result["label"])
result["prob"] = str(result["prob"])
return result, None, ""
class ImageService(WebService):
def get_pipeline_response(self, read_op):
image_op = ImagenetOp(name="imagenet", input_ops=[read_op])
return image_op
uci_service = ImageService(name="imagenet")
uci_service.prepare_pipeline_config("config.yml")
uci_service.run_service()
# Imagenet Pipeline WebService
This document takes the Imagenet service as an example to introduce how to use Pipeline WebService.
## Get model
```
sh get_model.sh
```
## Start server
```
python resnet50_web_service.py &>log.txt &
```
## RPC test
```
python pipeline_rpc_client.py
```
# Imagenet Pipeline WebService
This document takes the Imagenet service as an example to introduce how to use Pipeline WebService.
## Get the model
```
sh get_model.sh
```
## Start the server
```
python resnet50_web_service.py &>log.txt &
```
## Test
```
python pipeline_rpc_client.py
```
import sys
import os
import base64
import yaml
import requests
import time
import json
try:
from paddle_serving_server_gpu.pipeline import PipelineClient
except ImportError:
from paddle_serving_server.pipeline import PipelineClient
import numpy as np
from paddle_serving_client.utils import MultiThreadRunner
from paddle_serving_client.utils import benchmark_args, show_latency
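# Strip the per-op "call" timing entries from the DAG tracer output and dump the remaining summary to fileout.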
def parse_benchmark(filein, fileout):
with open(filein, "r") as fin:
res = yaml.load(fin)
del_list = []
for key in res["DAG"].keys():
if "call" in key:
del_list.append(key)
for key in del_list:
del res["DAG"][key]
with open(fileout, "w") as fout:
yaml.dump(res, fout, default_flow_style=False)
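# Generate config2.yml from config.yml: enable the DAG tracer and select CPU or GPU for the imagenet op.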
def gen_yml(device, gpu_id):
fin = open("config.yml", "r")
config = yaml.load(fin)
fin.close()
config["dag"]["tracer"] = {"interval_s": 10}
if device == "gpu":
config["op"]["imagenet"]["local_service_conf"]["device_type"] = 1
config["op"]["imagenet"]["local_service_conf"]["devices"] = gpu_id
else:
config["op"]["imagenet"]["local_service_conf"]["device_type"] = 0
with open("config2.yml", "w") as fout:
yaml.dump(config, fout, default_flow_style=False)
def cv2_to_base64(image):
return base64.b64encode(image).decode('utf8')
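# Single-thread HTTP pressure loop: repeatedly POST a batch of base64-encoded images for ~20 seconds and record per-request latency.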
def run_http(idx, batch_size):
print("start thread ({})".format(idx))
url = "http://127.0.0.1:18080/imagenet/prediction"
start = time.time()
with open(os.path.join(".", "daisy.jpg"), 'rb') as file:
image_data1 = file.read()
image = cv2_to_base64(image_data1)
keys, values = [], []
for i in range(batch_size):
keys.append("image_{}".format(i))
values.append(image)
data = {"key": keys, "value": values}
latency_list = []
start_time = time.time()
total_num = 0
while True:
l_start = time.time()
r = requests.post(url=url, data=json.dumps(data))
print(r.json())
l_end = time.time()
latency_list.append(l_end * 1000 - l_start * 1000)
total_num += 1
if time.time() - start_time > 20:
break
end = time.time()
return [[end - start], latency_list, [total_num]]
def multithread_http(thread, batch_size):
multi_thread_runner = MultiThreadRunner()
start = time.time()
result = multi_thread_runner.run(run_http, thread, batch_size)
end = time.time()
total_cost = end - start
avg_cost = 0
total_number = 0
for i in range(thread):
avg_cost += result[0][i]
total_number += result[2][i]
avg_cost = avg_cost / thread
print("Total cost: {}s".format(total_cost))
print("Each thread cost: {}s. ".format(avg_cost))
print("Total count: {}. ".format(total_number))
print("AVG QPS: {} samples/s".format(batch_size * total_number /
total_cost))
show_latency(result[1])
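# Single-thread RPC loop: send each image in imgs/ via PipelineClient repeatedly for ~10 seconds.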
def run_rpc(thread, batch_size):
client = PipelineClient()
client.connect(['127.0.0.1:18080'])
start = time.time()
test_img_dir = "imgs/"
for img_file in os.listdir(test_img_dir):
with open(os.path.join(test_img_dir, img_file), 'rb') as file:
image_data = file.read()
image = cv2_to_base64(image_data)
start_time = time.time()
while True:
ret = client.predict(feed_dict={"image": image}, fetch=["res"])
if time.time() - start_time > 10:
break
end = time.time()
return [[end - start]]
def multithread_rpc(thread, batch_size):
    multi_thread_runner = MultiThreadRunner()
    result = multi_thread_runner.run(run_rpc, thread, batch_size)
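# Usage:
#   python3 benchmark.py yaml <brpc|local_predictor> <thread> <cpu|gpu> [gpu_id]  # generate config2.yml
#   python3 benchmark.py run <http|rpc> <thread> <batch_size>                     # run the pressure test
#   python3 benchmark.py dump <filein> <fileout>                                  # parse the tracer log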
if __name__ == "__main__":
if sys.argv[1] == "yaml":
mode = sys.argv[2] # brpc/ local predictor
thread = int(sys.argv[3])
device = sys.argv[4]
if device == "gpu":
gpu_id = sys.argv[5]
else:
gpu_id = None
gen_yml(device, gpu_id)
elif sys.argv[1] == "run":
mode = sys.argv[2] # http/ rpc
thread = int(sys.argv[3])
batch_size = int(sys.argv[4])
if mode == "http":
multithread_http(thread, batch_size)
elif mode == "rpc":
multithread_rpc(thread, batch_size)
elif sys.argv[1] == "dump":
filein = sys.argv[2]
fileout = sys.argv[3]
parse_benchmark(filein, fileout)
export FLAGS_profile_pipeline=1
alias python3="python3.6"
modelname="clas-HRNet_W18_C"
# HTTP
#ps -ef | grep web_service | awk '{print $2}' | xargs kill -9
sleep 3
# Create the yaml config. If you already have the config file, you can skip this step.
#python3 benchmark.py yaml local_predictor 1 gpu
rm -rf profile_log_$modelname
echo "Starting HTTP Clients..."
# Start a client in each thread to test the multi-threaded case.
for thread_num in 1 2 4 8 12 16
do
for batch_size in 1
do
echo "----${modelname} thread num: ${thread_num} batch size: ${batch_size} mode:http ----" >>profile_log_$modelname
# Start one web service. If you have already started the service yourself, you can skip this.
#python3 web_service.py >web.log 2>&1 &
#sleep 3
# --id is the serial number of the GPU card. It must be the same as the GPU id used by the server.
nvidia-smi --id=3 --query-gpu=memory.used --format=csv -lms 1000 > gpu_use.log 2>&1 &
nvidia-smi --id=3 --query-gpu=utilization.gpu --format=csv -lms 1000 > gpu_utilization.log 2>&1 &
echo "import psutil\ncpu_utilization=psutil.cpu_percent(1,False)\nprint('CPU_UTILIZATION:', cpu_utilization)\n" > cpu_utilization.py
# Start http client
python3 benchmark.py run http $thread_num $batch_size > profile 2>&1
# Collect CPU metrics, filter out samples that are momentarily zero, and record the maximum GPU memory usage and the average GPU utilization.
python3 cpu_utilization.py >> profile_log_$modelname
grep -av '^0 %' gpu_utilization.log > gpu_utilization.log.tmp
awk 'BEGIN {max = 0} {if(NR>1){if ($modelname > max) max=$modelname}} END {print "MAX_GPU_MEMORY:", max}' gpu_use.log >> profile_log_$modelname
awk -F' ' '{sum+=$1} END {print "GPU_UTILIZATION:", sum/NR, sum, NR }' gpu_utilization.log.tmp >> profile_log_$modelname
# Show profiles
python3 ../../../util/show_profile.py profile $thread_num >> profile_log_$modelname
tail -n 8 profile >> profile_log_$modelname
echo '' >> profile_log_$modelname
done
done
# Kill all nvidia-smi background tasks.
pkill nvidia-smi
cuda_version: "10.1"
cudnn_version: "7.6"
trt_version: "6.0"
python_version: "3.7"
gcc_version: "8.2"
paddle_version: "2.0.1"
cpu: "Intel(R) Xeon(R) Gold 5117 CPU @ 2.00GHz X12"
gpu: "T4"
xpu: "None"
api: ""
owner: "cuicheng01"
model_name: "HRNet_W18_C"
model_type: "static"
model_source: "PaddleClas"
model_url: "https://paddle-imagenet-models-name.bj.bcebos.com/HRNet_W18_C_pretrained.tar"
batch_size: 1
num_of_samples: 1000
input_shape: "3,224,224"
runtime_device: "gpu"
ir_optim: true
enable_memory_optim: true
enable_tensorrt: false
precision: "fp32"
enable_mkldnn: false
cpu_math_library_num_threads: ""
cuda_version: "10.1"
cudnn_version: "7.6"
trt_version: "6.0"
python_version: "3.7"
gcc_version: "8.2"
paddle_version: "2.0.1"
cpu: "Intel(R) Xeon(R) Gold 5117 CPU @ 2.00GHz X12"
gpu: "T4"
xpu: "None"
api: ""
owner: "cuicheng01"
model_name: "imagenet"
model_type: "static"
model_source: "PaddleClas"
model_url: "model_url_path"
batch_size: 1
num_of_samples: 1000
input_shape: "3,224,224"
runtime_device: "cpu"
ir_optim: true
enable_memory_optim: true
enable_tensorrt: false
precision: "fp32"
enable_mkldnn: false
cpu_math_library_num_threads: ""
export FLAGS_profile_pipeline=1
alias python3="python3.7"
modelname="imagenet"
use_gpu=1
gpu_id="0"
benchmark_config_filename="benchmark_config.yaml"
# HTTP
ps -ef | grep web_service | awk '{print $2}' | xargs kill -9
sleep 3
if [ $use_gpu -eq 1 ]; then
python3 benchmark.py yaml local_predictor 1 gpu $gpu_id
else
python3 benchmark.py yaml local_predictor 1 cpu
fi
rm -rf profile_log_$modelname
for thread_num in 1
do
for batch_size in 1
do
echo "#----imagenet thread num: $thread_num batch size: $batch_size mode:http use_gpu:$use_gpu----" >>profile_log_$modelname
rm -rf PipelineServingLogs
rm -rf cpu_utilization.py
python3 resnet50_web_service.py >web.log 2>&1 &
sleep 3
nvidia-smi --id=${gpu_id} --query-compute-apps=used_memory --format=csv -lms 100 > gpu_use.log 2>&1 &
nvidia-smi --id=${gpu_id} --query-gpu=utilization.gpu --format=csv -lms 100 > gpu_utilization.log 2>&1 &
echo "import psutil\ncpu_utilization=psutil.cpu_percent(1,False)\nprint('CPU_UTILIZATION:', cpu_utilization)\n" > cpu_utilization.py
python3 benchmark.py run http $thread_num $batch_size
python3 cpu_utilization.py >>profile_log_$modelname
python3 -m paddle_serving_server_gpu.profiler >>profile_log_$modelname
ps -ef | grep web_service | awk '{print $2}' | xargs kill -9
ps -ef | grep nvidia-smi | awk '{print $2}' | xargs kill -9
python3 benchmark.py dump benchmark.log benchmark.tmp
mv benchmark.tmp benchmark.log
awk 'BEGIN {max = 0} {if(NR>1){if ($modelname > max) max=$modelname}} END {print "GPU_MEM:", max}' gpu_use.log >> profile_log_$modelname
awk 'BEGIN {max = 0} {if(NR>1){if ($modelname > max) max=$modelname}} END {print "GPU_UTIL:", max}' gpu_utilization.log >> profile_log_$modelname
cat benchmark.log >> profile_log_$modelname
python3 -m paddle_serving_server_gpu.parse_profile --benchmark_cfg $benchmark_config_filename --benchmark_log profile_log_$modelname
#rm -rf gpu_use.log gpu_utilization.log
done
done
#worker_num: maximum concurrency. When build_dag_each_worker=True, the framework creates worker_num processes, each of which builds its own gRPC server and DAG.
##When build_dag_each_worker=False, the framework sets max_workers=worker_num for the gRPC thread pool of the main thread.
worker_num: 1
#HTTP port. rpc_port and http_port must not both be empty. When rpc_port is valid and http_port is empty, http_port is not generated automatically.
http_port: 18080
rpc_port: 9993
dag:
    #Op resource type: True for the thread model, False for the process model.
    is_thread_op: False
op:
    imagenet:
        #Concurrency: thread concurrency when is_thread_op=True, otherwise process concurrency.
        concurrency: 1
        #When the op has no server_endpoints configured, the local service configuration is read from local_service_conf.
        local_service_conf:
            #Model directory path.
            model_config: HRNet_W18_C/ppcls_model/
            #Device type. If left empty it is decided by devices (CPU/GPU); 0=cpu, 1=gpu, 2=tensorRT, 3=arm cpu, 4=kunlun xpu.
            device_type: 1
            #Device IDs. When devices is "" or omitted, prediction runs on CPU; when devices is "0" or "0,1,2", prediction runs on the listed GPU cards.
            devices: "0" # "0,1"
            #Client type: brpc, grpc or local_predictor. local_predictor does not start a Serving service and predicts in-process.
            client_type: local_predictor
            #List of fetched outputs, keyed by the alias_name of fetch_var in client_config.
            fetch_list: ["prediction"]
import psutil
cpu_utilization=psutil.cpu_percent(1,False)
print('CPU_UTILIZATION:', cpu_utilization)
wget --no-check-certificate https://paddle-serving.bj.bcebos.com/model/HRNet_W18_C.tar
tar -xf HRNet_W18_C.tar
wget --no-check-certificate https://paddle-serving.bj.bcebos.com/imagenet-example/image_data.tar.gz
tar -xzvf image_data.tar.gz
import numpy as np
import requests
import json
import cv2
import base64
import os
def cv2_to_base64(image):
return base64.b64encode(image).decode('utf8')
if __name__ == "__main__":
url = "http://127.0.0.1:18080/imagenet/prediction"
with open(os.path.join(".", "daisy.jpg"), 'rb') as file:
image_data1 = file.read()
image = cv2_to_base64(image_data1)
data = {"key": ["image"], "value": [image]}
for i in range(100):
r = requests.post(url=url, data=json.dumps(data))
print(r.json())
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
try:
from paddle_serving_server_gpu.pipeline import PipelineClient
except ImportError:
from paddle_serving_server.pipeline import PipelineClient
import numpy as np
import requests
import json
import cv2
import base64
import os
client = PipelineClient()
client.connect(['127.0.0.1:9993'])
def cv2_to_base64(image):
return base64.b64encode(image).decode('utf8')
with open("daisy.jpg", 'rb') as file:
image_data = file.read()
image = cv2_to_base64(image_data)
for i in range(1):
ret = client.predict(feed_dict={"image": image}, fetch=["label", "prob"])
print(ret)
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
from paddle_serving_app.reader import Sequential, URL2Image, Resize, CenterCrop, RGB2BGR, Transpose, Div, Normalize, Base64ToImage
try:
from paddle_serving_server_gpu.web_service import WebService, Op
except ImportError:
from paddle_serving_server.web_service import WebService, Op
import logging
import numpy as np
import base64, cv2
class ImagenetOp(Op):
def init_op(self):
self.seq = Sequential([
Resize(256), CenterCrop(224), RGB2BGR(), Transpose((2, 0, 1)),
Div(255), Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225],
True)
])
self.label_dict = {}
label_idx = 0
with open("imagenet.label") as fin:
for line in fin:
self.label_dict[label_idx] = line.strip()
label_idx += 1
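    # Decode the base64-encoded images in the request into one NCHW float batch for the predictor.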
def preprocess(self, input_dicts, data_id, log_id):
(_, input_dict), = input_dicts.items()
batch_size = len(input_dict.keys())
imgs = []
for key in input_dict.keys():
data = base64.b64decode(input_dict[key].encode('utf8'))
            data = np.frombuffer(data, np.uint8)
im = cv2.imdecode(data, cv2.IMREAD_COLOR)
img = self.seq(im)
imgs.append(img[np.newaxis, :].copy())
input_imgs = np.concatenate(imgs, axis=0)
return {"image": input_imgs}, False, None, ""
def postprocess(self, input_dicts, fetch_dict, log_id):
score_list = fetch_dict["prediction"]
result = {"label": [], "prob": []}
for score in score_list:
score = score.tolist()
max_score = max(score)
result["label"].append(self.label_dict[score.index(max_score)]
.strip().replace(",", ""))
result["prob"].append(max_score)
result["label"] = str(result["label"])
result["prob"] = str(result["prob"])
return result, None, ""
class ImageService(WebService):
def get_pipeline_response(self, read_op):
image_op = ImagenetOp(name="imagenet", input_ops=[read_op])
return image_op
uci_service = ImageService(name="imagenet")
uci_service.prepare_pipeline_config("config.yml")
uci_service.run_service()
# Imagenet Pipeline WebService
This document takes the Imagenet service as an example to introduce how to use Pipeline WebService.
## Get model
```
sh get_model.sh
```
## Start server
```
python resnet50_web_service.py &>log.txt &
```
## RPC test
```
python pipeline_rpc_client.py
```
# Imagenet Pipeline WebService
This document takes the Imagenet service as an example to introduce how to use Pipeline WebService.
## Get the model
```
sh get_model.sh
```
## Start the server
```
python resnet50_web_service.py &>log.txt &
```
## Test
```
python pipeline_rpc_client.py
```
export FLAGS_profile_pipeline=1
alias python3="python3.6"
modelname="clas-MobileNetV1"
# HTTP
#ps -ef | grep web_service | awk '{print $2}' | xargs kill -9
sleep 3
# Create the yaml config. If you already have the config file, you can skip this step.
#python3 benchmark.py yaml local_predictor 1 gpu
rm -rf profile_log_$modelname
echo "Starting HTTP Clients..."
# Start a client in each thread to test the multi-threaded case.
for thread_num in 1 2 4 8 12 16
do
for batch_size in 1
do
echo "----${modelname} thread num: ${thread_num} batch size: ${batch_size} mode:http ----" >>profile_log_$modelname
# Start one web service. If you have already started the service yourself, you can skip this.
#python3 web_service.py >web.log 2>&1 &
#sleep 3
# --id is the serial number of the GPU card. It must be the same as the GPU id used by the server.
nvidia-smi --id=3 --query-gpu=memory.used --format=csv -lms 1000 > gpu_use.log 2>&1 &
nvidia-smi --id=3 --query-gpu=utilization.gpu --format=csv -lms 1000 > gpu_utilization.log 2>&1 &
echo "import psutil\ncpu_utilization=psutil.cpu_percent(1,False)\nprint('CPU_UTILIZATION:', cpu_utilization)\n" > cpu_utilization.py
# Start http client
python3 benchmark.py run http $thread_num $batch_size > profile 2>&1
# Collect CPU metrics, filter out samples that are momentarily zero, and record the maximum GPU memory usage and the average GPU utilization.
python3 cpu_utilization.py >> profile_log_$modelname
grep -av '^0 %' gpu_utilization.log > gpu_utilization.log.tmp
awk 'BEGIN {max = 0} {if(NR>1){if ($modelname > max) max=$modelname}} END {print "MAX_GPU_MEMORY:", max}' gpu_use.log >> profile_log_$modelname
awk -F' ' '{sum+=$1} END {print "GPU_UTILIZATION:", sum/NR, sum, NR }' gpu_utilization.log.tmp >> profile_log_$modelname
# Show profiles
python3 ../../../util/show_profile.py profile $thread_num >> profile_log_$modelname
tail -n 8 profile >> profile_log_$modelname
echo '' >> profile_log_$modelname
done
done
# Kill all nvidia-smi background tasks.
pkill nvidia-smi
import psutil
cpu_utilization=psutil.cpu_percent(1,False)
print('CPU_UTILIZATION:', cpu_utilization)