Unverified commit 11116a32, authored by S ShiningZhang, committed by GitHub

Merge branch 'develop' into develop

......@@ -123,3 +123,13 @@ python3 pipeline_rpc_client.py
```
{'err_no': 0, 'err_msg': '', 'key': ['res'], 'value': ["['土地整治与土壤修复研究中心', '华南农业大学1素图']"]}
```
<h3 align="center">关闭Serving/Pipeline服务</h3>
**方式一** :Ctrl+C关停服务
**方式二** :在启动Serving/Pipeline服务路径或者环境变量SERVING_HOME路径下(该路径下存在文件ProcessInfo.json)
```
python3 -m paddle_serving_server.serve stop
```
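If the service was started from a different directory, SERVING_HOME can point the stop command at the right ProcessInfo.json. A minimal sketch (the path below is a placeholder):
```
export SERVING_HOME=/path/to/dir/containing/ProcessInfo.json
python3 -m paddle_serving_server.serve stop
```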
......@@ -94,3 +94,13 @@ output
```
{'err_no': 0, 'err_msg': '', 'key': ['res'], 'value': ["['土地整治与土壤修复研究中心', '华南农业大学1素图']"]}
```
<h3 align="center">Stop Serving/Pipeline service</h3>
**Method one** :Ctrl+C to quit
**Method Two** :In the path where starting the Serving/Pipeline service or the path which environment variable SERVING_HOME set (the file named ProcessInfo.json exists in this path)
```
python3 -m paddle_serving_server.serve stop
```
......@@ -59,7 +59,7 @@ fetch_var {
## C++ Serving
### 1. Quick start
### 1. Quick start and shutdown
You can quickly start the service by specifying the model and port number. The start command is as follows:
......@@ -108,6 +108,11 @@ python3 -m paddle_serving_server.serve --model serving_model --thread 10 --port
```BASH
python3 -m paddle_serving_server.serve --model serving_model_1 serving_model_2 --thread 10 --port 9292
```
#### To stop the Serving service, execute the following command in the directory where Serving was started or in the path set by the environment variable SERVING_HOME.
```BASH
python3 -m paddle_serving_server.serve stop
```
The stop argument sends SIGINT to C++ Serving; using kill instead sends SIGKILL to C++ Serving.
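The forceful variant mentioned above simply replaces stop with kill:
```BASH
python3 -m paddle_serving_server.serve kill
```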
### 2. Start with a custom configuration
......@@ -315,7 +320,20 @@ fetch_var {
```
## Python Pipeline
### Quick start and shutdown
The startup command for Python Pipeline is as follows:
```BASH
python3 web_service.py
```
To stop the service, execute the following command in the directory where the Pipeline was started or in the path set by the environment variable SERVING_HOME:
```BASH
python3 -m paddle_serving_server.serve stop
```
The stop argument sends SIGINT to Pipeline Serving; using kill instead sends SIGKILL to Pipeline Serving.
### Configuration file
Python Pipeline provides a user-friendly programming framework for composing multiple models into one service, suited to multi-model composition scenarios.
Its configuration file is in YAML format, named config.yaml by default. An example is shown below:
```YAML
......
......@@ -108,7 +108,7 @@ python3 -m paddle_serving_server.serve --model serving_model --thread 10 --port
```BASH
python3 -m paddle_serving_server.serve --model serving_model_1 serving_model_2 --thread 10 --port 9292
```
#### Stop Serving.
#### Stop Serving (execute the following command in the directory where Serving was started, or in the path set by the environment variable SERVING_HOME).
```BASH
python3 -m paddle_serving_server.serve stop
```
......@@ -328,9 +328,9 @@ fetch_var {
Example starting Pipeline Serving:
```BASH
python3 -m paddle_serving_server.serve --model serving_model --port 9393
python3 web_service.py
```
### Stop Serving.
### Stop Serving (execute the following command in the directory where the Pipeline service was started, or in the path set by the environment variable SERVING_HOME).
```BASH
python3 -m paddle_serving_server.serve stop
```
......
feed_var {
name: "x"
alias_name: "x"
is_lod_tensor: false
feed_type: 1
shape: 13
}
fetch_var {
name: "fc_0.tmp_1"
alias_name: "price"
is_lod_tensor: false
fetch_type: 1
shape: 1
}
feed_var {
name: "x"
alias_name: "x"
is_lod_tensor: false
feed_type: 1
shape: 13
}
fetch_var {
name: "fc_0.tmp_1"
alias_name: "price"
is_lod_tensor: false
fetch_type: 1
shape: 1
}
import pytest
# coding:utf-8
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
'''
This module is used to check whether the running environment for PaddleServing is configured correctly.
Two test cases are provided to verify that the environment works: a fit-a-line case for the C++ Serving environment and a UCI case for the Pipeline Serving environment.
Usage: export PYTHON_EXECUTABLE=/usr/local/bin/python3.6
python3.6 -m paddle_serving_server.serve check
'''
import sys
import os
cpp_test_cases = ["test_fit_a_line.py::TestFitALine::test_cpu", "test_fit_a_line.py::TestFitALine::test_gpu"]
pipeline_test_cases = ["test_uci_pipeline.py::TestUCIPipeline::test_cpu", "test_uci_pipeline.py::TestUCIPipeline::test_gpu"]
def run_test_cases(cases_list, case_type):
old_stdout, old_stderr = sys.stdout, sys.stderr
real_path = os.path.dirname(os.path.realpath(__file__))
for case in cases_list:
sys.stdout = open('/dev/null', 'w')
sys.stderr = open('/dev/null', 'w')
args_str = "--disable-warnings " + str(real_path) + "/" + case
args = args_str.split(" ")
res = pytest.main(args)
sys.stdout, sys.stderr = old_stdout, old_stderr
if res == 0:
print("{} {} environment running success".format(case_type, case[-3:]))
else:
print("{} {} environment running failure, if you need this environment, please refer to https://github.com/PaddlePaddle/Serving/blob/v0.7.0/doc/Install_CN.md to configure environment".format(case_type, case[-3:]))
def unset_proxy(key):
os.unsetenv(key)
def check_env():
if 'https_proxy' in os.environ or 'http_proxy' in os.environ:
unset_proxy("https_proxy")
unset_proxy("http_proxy")
run_test_cases(cpp_test_cases, "C++")
run_test_cases(pipeline_test_cases, "Pipeline")
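A minimal sketch of invoking this check directly; normally it is reached through the serve CLI (`python3 -m paddle_serving_server.serve check`), which calls check_env():
```python
# Hypothetical direct entry point for this module; equivalent to the
# "check" subcommand handled in paddle_serving_server.serve.
if __name__ == "__main__":
    check_env()
```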
#worker_num: maximum concurrency. When build_dag_each_worker=True, the framework creates worker_num processes, each building a gRPC server and a DAG
##When build_dag_each_worker=False, the framework sets max_workers=worker_num for the gRPC thread pool of the main thread
worker_num: 1
#HTTP port. rpc_port and http_port must not both be empty. When rpc_port is available and http_port is empty, http_port is not generated automatically
rpc_port: 9998
http_port: 18082
dag:
#Op resource type: True for the thread model, False for the process model
is_thread_op: False
#tracer
tracer:
interval_s: 10
op:
uci:
#Concurrency: thread-level when is_thread_op=True, otherwise process-level
concurrency: 1
#When the op config has no server_endpoints, the local service config is read from local_service_conf
local_service_conf:
#Path of the uci model
model_config: uci_housing_model
#Compute device type: when empty, determined by devices (CPU/GPU); 0=cpu, 1=gpu, 2=tensorRT, 3=arm cpu, 4=kunlun xpu
device_type: 0
#Compute device IDs. device_type decides the hardware type first. When devices is "" or unset, inference runs on CPU; when it is "0" or "0,1,2", inference runs on the listed GPU cards
devices: "" # "0,1"
#Client type: brpc, grpc or local_predictor. local_predictor does not start a Serving service; inference runs in-process
client_type: local_predictor
#Fetch list, based on the alias_name of fetch_var in client_config
fetch_list: ["price"]
#precision: inference precision; lowering the precision can speed up inference
#GPU supports: "fp32"(default), "fp16", "int8";
#CPU supports: "fp32"(default), "fp16", "bf16"(mkldnn); "int8" is not supported
precision: "fp32"
#ir_optim switch, default False
ir_optim: True
#use_mkldnn switch, default False; performance improves only when use_mkldnn and ir_optim are both enabled
use_mkldnn: True
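This is the CPU variant of the environment-check config. A launch sketch, assuming the file is saved as config_cpu.yml (the name test_uci_pipeline.py passes to web_service.py):
```BASH
python3 web_service.py config_cpu.yml
```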
#worker_num: maximum concurrency. When build_dag_each_worker=True, the framework creates worker_num processes, each building a gRPC server and a DAG
##When build_dag_each_worker=False, the framework sets max_workers=worker_num for the gRPC thread pool of the main thread
worker_num: 1
#HTTP port. rpc_port and http_port must not both be empty. When rpc_port is available and http_port is empty, http_port is not generated automatically
rpc_port: 9998
http_port: 18082
dag:
#Op resource type: True for the thread model, False for the process model
is_thread_op: False
#tracer
tracer:
interval_s: 10
op:
uci:
#Concurrency: thread-level when is_thread_op=True, otherwise process-level
concurrency: 1
#When the op config has no server_endpoints, the local service config is read from local_service_conf
local_service_conf:
#Path of the uci model
model_config: uci_housing_model
#Compute device type: when empty, determined by devices (CPU/GPU); 0=cpu, 1=gpu, 2=tensorRT, 3=arm cpu, 4=kunlun xpu
device_type: 1
#Compute device IDs. device_type decides the hardware type first. When devices is "" or unset, inference runs on CPU; when it is "0" or "0,1,2", inference runs on the listed GPU cards
devices: "0" # "0,1"
#Client type: brpc, grpc or local_predictor. local_predictor does not start a Serving service; inference runs in-process
client_type: local_predictor
#Fetch list, based on the alias_name of fetch_var in client_config
fetch_list: ["price"]
#precision: inference precision; lowering the precision can speed up inference
#GPU supports: "fp32"(default), "fp16", "int8";
#CPU supports: "fp32"(default), "fp16", "bf16"(mkldnn); "int8" is not supported
precision: "fp32"
#ir_optim switch, default False
ir_optim: True
#use_mkldnn switch, default False; performance improves only when use_mkldnn and ir_optim are both enabled
use_mkldnn: True
feed_var {
name: "x"
alias_name: "x"
is_lod_tensor: false
feed_type: 1
shape: 13
}
fetch_var {
name: "fc_0.tmp_1"
alias_name: "price"
is_lod_tensor: false
fetch_type: 1
shape: 1
}
feed_var {
name: "x"
alias_name: "x"
is_lod_tensor: false
feed_type: 1
shape: 13
}
fetch_var {
name: "fc_0.tmp_1"
alias_name: "price"
is_lod_tensor: false
fetch_type: 1
shape: 1
}
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from paddle_serving_server.web_service import WebService, Op
import logging
import numpy as np
import sys
import argparse
_LOGGER = logging.getLogger()
def serve_args():
parser = argparse.ArgumentParser("serve")
parser.add_argument(
"config",
type=str,
default="config.yml",
nargs="?",
help="cpu or gpu config.yml")
return parser.parse_args()
class UciOp(Op):
def init_op(self):
self.separator = ","
self.batch_separator = ";"
def preprocess(self, input_dicts, data_id, log_id):
(_, input_dict), = input_dicts.items()
_LOGGER.error("UciOp::preprocess >>> log_id:{}, input:{}".format(
log_id, input_dict))
x_value = input_dict["x"].split(self.batch_separator)
x_lst = []
for x_val in x_value:
x_lst.append(
np.array([
float(x.strip()) for x in x_val.split(self.separator)
]).reshape(1, 13))
input_dict["x"] = np.concatenate(x_lst, axis=0)
proc_dict = {}
return input_dict, False, None, ""
def postprocess(self, input_dicts, fetch_dict, data_id, log_id):
_LOGGER.info(
"UciOp::postprocess >>> data_id:{}, log_id:{}, fetch_dict:{}".
format(data_id, log_id, fetch_dict))
fetch_dict["price"] = str(fetch_dict["price"])
return fetch_dict, None, ""
class UciService(WebService):
def get_pipeline_response(self, read_op):
uci_op = UciOp(name="uci", input_ops=[read_op])
return uci_op
args = serve_args()
uci_service = UciService(name="uci")
uci_service.prepare_pipeline_config(args.config)
uci_service.run_service()
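A minimal HTTP client sketch for the UciService above, mirroring the request that test_uci_pipeline.py sends (port 18082 and the /uci/prediction route come from the accompanying config and the Op name "uci"):
```python
import json
import requests

# One sample with the 13 UCI housing features, comma separated as UciOp expects
data = ("0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, "
        "0.6283, 0.4919, 0.1856, 0.0795, -0.0332")
feed = {"key": ["x"], "value": [data]}
resp = requests.post("http://127.0.0.1:18082/uci/prediction", data=json.dumps(feed))
print(resp.json())  # expected fields: err_no, err_msg, key, value
```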
import os
import subprocess
import numpy as np
import copy
import cv2
import re
import sys
from paddle_serving_client import Client
from paddle_serving_client.httpclient import HttpClient
from paddle_serving_client.io import inference_model_to_serving
from paddle_serving_app.reader import SegPostprocess
from paddle_serving_app.reader import *
import paddle.inference as paddle_infer
from util import *
class TestFitALine(object):
def setup_class(self):
serving_util = ServingTest(data_path="fit_a_line", example_path="fit_a_line", model_dir="uci_housing_model",
client_dir="uci_housing_client")
serving_util.check_model_data_exist()
self.get_truth_val_by_inference(self)
self.serving_util = serving_util
def teardown_method(self):
print_log(["stderr.log", "stdout.log",
"log/serving.ERROR", "PipelineServingLogs/pipeline.log"], iden="after predict")
kill_process(9494)
self.serving_util.release()
def get_truth_val_by_inference(self):
data = np.array(
[0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283, 0.4919, 0.1856, 0.0795,
-0.0332]).astype("float32")[np.newaxis, :]
input_dict = {"x": data}
pd_config = paddle_infer.Config("uci_housing_model/")
pd_config.disable_gpu()
pd_config.switch_ir_optim(False)
predictor = paddle_infer.create_predictor(pd_config)
input_names = predictor.get_input_names()
for i, input_name in enumerate(input_names):
input_handle = predictor.get_input_handle(input_name)
input_handle.copy_from_cpu(input_dict[input_name])
predictor.run()
output_data_dict = {}
output_names = predictor.get_output_names()
for _, output_data_name in enumerate(output_names):
output_handle = predictor.get_output_handle(output_data_name)
output_data = output_handle.copy_to_cpu()
output_data_dict[output_data_name] = output_data
# Align keys with the Serving output names
print(output_data_dict)
output_data_dict["price"] = output_data_dict["fc_0.tmp_1"]
del output_data_dict["fc_0.tmp_1"]
self.truth_val = output_data_dict
print(self.truth_val, self.truth_val["price"].shape)
def predict_brpc(self, batch_size=1):
data = np.array(
[0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283, 0.4919, 0.1856, 0.0795,
-0.0332]).astype("float32")[np.newaxis, :]
client = Client()
client.load_client_config("uci_housing_client/serving_client_conf.prototxt")
client.connect(["127.0.0.1:9494"])
fetch_list = client.get_fetch_names()
fetch_map = client.predict(
feed={"x": data}, fetch=fetch_list, batch=True)
print(fetch_map)
return fetch_map
def predict_http(self, batch_size=1):
data = np.array(
[0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283, 0.4919, 0.1856, 0.0795,
-0.0332]).astype("float32")[np.newaxis, :]
client = HttpClient()
client.load_client_config("uci_housing_client/serving_client_conf.prototxt")
client.connect(["127.0.0.1:9494"])
fetch_list = client.get_fetch_names()
fetch_map = client.predict(
feed={"x": data}, fetch=fetch_list, batch=True)
print(fetch_map)
return fetch_map
def test_cpu(self):
# 1.start server
self.serving_util.start_server_by_shell(
cmd=f"{self.serving_util.py_version} -m paddle_serving_server.serve --model uci_housing_model --thread 10 --port 9494",
sleep=5,
)
# 2.resource check
assert count_process_num_on_port(9494) == 1
# assert check_gpu_memory(0) is False
# 3.keywords check
# 4.predict by brpc
# batch_size 1
result_data = self.predict_brpc()
self.serving_util.check_result(result_data=result_data, truth_data=self.truth_val, batch_size=1)
result_data = self.predict_http()
self.serving_util.check_result(result_data=result_data, truth_data=self.truth_val, batch_size=1)
# 5.release
kill_process(9494)
def test_gpu(self):
# 1.start server
self.serving_util.start_server_by_shell(
cmd=f"{self.serving_util.py_version} -m paddle_serving_server.serve --model uci_housing_model --thread 10 --port 9494 --gpu_ids 0",
sleep=5,
)
# 2.resource check
assert count_process_num_on_port(9494) == 1
# assert check_gpu_memory(0) is False
# 3.keywords check
# 4.predict by brpc
# batch_size 1
result_data = self.predict_brpc()
self.serving_util.check_result(result_data=result_data, truth_data=self.truth_val, batch_size=1)
result_data = self.predict_http()
self.serving_util.check_result(result_data=result_data, truth_data=self.truth_val, batch_size=1)
# 5.release
kill_process(9494)
if __name__ == '__main__':
sss = TestFitALine()
sss.get_truth_val_by_inference()
import os
import subprocess
import numpy as np
import copy
import cv2
import requests
import json
import sys
from paddle_serving_server.pipeline import PipelineClient
from paddle_serving_app.reader import CenterCrop, RGB2BGR, Transpose, Div, Normalize, RCNNPostprocess
from paddle_serving_app.reader import Sequential, File2Image, Resize, Transpose, BGR2RGB, SegPostprocess
import paddle.inference as paddle_infer
from util import *
class TestUCIPipeline(object):
def setup_class(self):
serving_util = ServingTest(data_path="fit_a_line", example_path="simple_web_service", model_dir="uci_housing_model",
client_dir="uci_housing_client")
serving_util.check_model_data_exist()
self.get_truth_val_by_inference(self)
self.serving_util = serving_util
def teardown_method(self):
print_log(["stderr.log", "stdout.log",
"log/serving.ERROR", "PipelineServingLogs/pipeline.log"], iden="after predict")
kill_process(9998)
self.serving_util.release()
def get_truth_val_by_inference(self):
data = np.array(
[0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283, 0.4919, 0.1856, 0.0795,
-0.0332]).astype("float32")[np.newaxis, :]
input_dict = {"x": data}
pd_config = paddle_infer.Config("uci_housing_model/")
pd_config.disable_gpu()
pd_config.switch_ir_optim(False)
predictor = paddle_infer.create_predictor(pd_config)
input_names = predictor.get_input_names()
for i, input_name in enumerate(input_names):
input_handle = predictor.get_input_handle(input_name)
input_handle.copy_from_cpu(input_dict[input_name])
predictor.run()
output_data_dict = {}
output_names = predictor.get_output_names()
for _, output_data_name in enumerate(output_names):
output_handle = predictor.get_output_handle(output_data_name)
output_data = output_handle.copy_to_cpu()
output_data_dict[output_data_name] = output_data
# Align keys with the Serving output names
output_data_dict["prob"] = output_data_dict["fc_0.tmp_1"]
del output_data_dict["fc_0.tmp_1"]
self.truth_val = output_data_dict
print(self.truth_val, self.truth_val["prob"].shape)
def predict_pipeline_rpc(self, batch_size=1):
# 1.prepare feed_data
feed_dict = {'x': '0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283, 0.4919, 0.1856, 0.0795, -0.0332'}
# TODO: the original example does not support batch
# 2.init client
# fetch = ["label", "prob"]
client = PipelineClient()
client.connect(['127.0.0.1:9998'])
# 3.predict for fetch_map
ret = client.predict(feed_dict=feed_dict)
print(ret)
# Convert to a dict
result = {"prob": np.array(eval(ret.value[0]))}
print(result)
return result
def predict_pipeline_http(self, batch_size=1):
# 1.prepare feed_data
data = '0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283, 0.4919, 0.1856, 0.0795, ' \
'-0.0332'
feed_dict = {"key": [], "value": []}
# TODO: the original example does not support batch
feed_dict["key"].append("x")
feed_dict["value"].append(data)
# 2.predict for fetch_map
url = "http://127.0.0.1:18082/uci/prediction"
r = requests.post(url=url, data=json.dumps(feed_dict))
print(r.json())
# Convert to a dict of numpy arrays
result = {"prob": np.array(eval(r.json()["value"][0]))}
return result
def test_cpu(self):
# 1.start server
self.serving_util.start_server_by_shell(
cmd=f"{self.serving_util.py_version} web_service.py config_cpu.yml",
sleep=5,
)
# 2.resource check
assert count_process_num_on_port(9998) == 1 # gRPC Server
assert count_process_num_on_port(18082) == 1 # gRPC gateway (proxy / forwarding)
#assert check_gpu_memory(0) is False
# 3.keywords check
check_keywords_in_server_log("MKLDNN is enabled", filename="stderr.log")
# 4.predict by rpc
# batch_size=1
result = self.predict_pipeline_rpc(batch_size=1)
self.serving_util.check_result(result_data=result, truth_data=self.truth_val, batch_size=1)
# # predict by http
result = self.predict_pipeline_http(batch_size=1) # batch_size=1
self.serving_util.check_result(result_data=result, truth_data=self.truth_val, batch_size=1)
# 5.release
kill_process(9998)
kill_process(18082)
def test_gpu(self):
# 1.start server
self.serving_util.start_server_by_shell(
cmd=f"{self.serving_util.py_version} web_service.py config_gpu.yml",
sleep=5,
)
# 2.resource check
assert count_process_num_on_port(9998) == 1 # gRPC Server
assert count_process_num_on_port(18082) == 1 # gRPC gateway (proxy / forwarding)
#assert check_gpu_memory(0) is False
# 4.predict by rpc
# batch_size=1
result = self.predict_pipeline_rpc(batch_size=1)
self.serving_util.check_result(result_data=result, truth_data=self.truth_val, batch_size=1)
# # predict by http
result = self.predict_pipeline_http(batch_size=1) # batch_size=1
self.serving_util.check_result(result_data=result, truth_data=self.truth_val, batch_size=1)
# 5.release
kill_process(9998)
kill_process(18082)
import os
import pynvml
import argparse
import base64
import subprocess
import numpy as np
class ServingTest(object):
def __init__(self, data_path: str, example_path: str, model_dir: str, client_dir: str):
"""
The following environment variables must be set:
CODE_PATH: the directory one level above the repo
DATA_PATH: the root directory of the dataset
py_version: the Python version, python3.6~3.8
"""
code_path = os.path.dirname(os.path.realpath(__file__))
self.data_path = f"{code_path}/{data_path}/"
self.example_path = f"{code_path}/{example_path}/"
self.py_version = os.environ.get("PYTHON_EXECUTABLE")
self.model_dir = model_dir
self.client_config = f"{client_dir}/serving_client_conf.prototxt"
os.chdir(self.example_path)
print("======================cur path======================")
print(os.getcwd())
self.check_model_data_exist()
def check_model_data_exist(self):
if not os.path.exists(f"./{self.model_dir}"):
# Symlink the model and data files
dir_path, dir_names, file_names = next(os.walk(self.data_path))
for dir_ in dir_names:
abs_path = os.path.join(dir_path, dir_)
os.system(f"ln -s {abs_path} {dir_}")
for file in file_names:
abs_path = os.path.join(dir_path, file)
os.system(f"ln -s {abs_path} {file}")
def start_server_by_shell(self, cmd: str, sleep: int = 5, err="stderr.log", out="stdout.log", wait=False):
self.err = open(err, "w")
self.out = open(out, "w")
p = subprocess.Popen(cmd, shell=True, stdout=self.out, stderr=self.err)
os.system(f"sleep {sleep}")
if wait:
p.wait()
print_log([err, out])
@staticmethod
def check_result(result_data: dict, truth_data: dict, batch_size=1, delta=1e-3):
# flatten
predict_result = {}
truth_result = {}
for key, value in result_data.items():
predict_result[key] = value.flatten()
for key, value in truth_data.items():
truth_result[key] = np.repeat(value, repeats=batch_size, axis=0).flatten()
# print("预测值:", predict_result)
# print("真实值:", truth_result)
# compare
for key in predict_result.keys():
diff_array = diff_compare(predict_result[key], truth_result[key])
diff_count = np.sum(diff_array > delta)
assert diff_count == 0, f"total: {np.size(diff_array)} diff count:{diff_count} max:{np.max(diff_array)}"
# for key in predict_result.keys():
# for i, data in enumerate(predict_result[key]):
# diff = sig_fig_compare(data, truth_result[key][i])
# assert diff < delta, f"data:{data} truth:{truth_result[key][i]} diff is {diff} > {delta}, index:{i}"
@staticmethod
def parse_http_result(output):
# Convert the proto-format data returned by the http client into a dict of numpy arrays
# TODO: only float_data is supported
result_dict = {}
if isinstance(output, dict):
for tensor in output["outputs"][0]["tensor"]:
result_dict[tensor["alias_name"]] = np.array(tensor["float_data"]).reshape(tensor["shape"])
else:
for tensor in output.outputs[0].tensor:
result_dict[tensor.alias_name] = np.array(tensor.float_data).reshape(tensor.shape)
return result_dict
@staticmethod
def release(keywords="web_service.py"):
#os.system("kill -9 $(ps -ef | grep serving | awk '{print $2}') > /dev/null 2>&1")
os.system("kill -9 $(ps -ef | grep " + keywords + " | awk '{print $2}') > /dev/null 2>&1")
def kill_process(port, sleep_time=0):
command = "kill -9 $(netstat -nlp | grep :" + str(port) + " | awk '{print $7}' | awk -F'/' '{{ print $1 }}') > /dev/null 2>&1"
os.system(command)
# wait for the port to be released
os.system(f"sleep {sleep_time}")
def check_gpu_memory(gpu_id):
pynvml.nvmlInit()
handle = pynvml.nvmlDeviceGetHandleByIndex(gpu_id)
mem_info = pynvml.nvmlDeviceGetMemoryInfo(handle)
mem_used = mem_info.used / 1024 ** 2
print(f"GPU-{gpu_id} memory used:", mem_used)
return mem_used > 100
def count_process_num_on_port(port):
command = "netstat -nlp | grep :" + str(port) + " | wc -l"
count = eval(os.popen(command).read())
print(f"port-{port} processes num:", count)
return count
def check_keywords_in_server_log(words: str, filename="stderr.log"):
p = subprocess.Popen(f"grep '{words}' {filename} > grep.log && head grep.log", shell=True)
p.wait()
assert p.returncode == 0, "keywords not found"
def cv2_to_base64(image):
return base64.b64encode(image).decode('utf8')
def sig_fig_compare(num0, num1, delta=5):
difference = num0 - num1
num0_int_length = len(str(int(num0)))
num1_int_length = len(str(int(num1)))
num0_int = int(num0)
num1_int = int(num1)
if num0 < 1 and num1 < 1 and difference < 1:
return difference
elif num0_int_length == num1_int_length:
if num0_int_length >= 5:
return abs(num0_int - num1_int)
else:
scale = 5 - num1_int_length
num0_padding = num0 * scale
num1_padding = num1 * scale
return abs(num0_padding - num1_padding) / (10 * scale)
elif num0_int_length != num1_int_length:
return difference
def diff_compare(array1, array2):
diff = np.abs(array1 - array2)
return diff
def print_log(file_list, iden=""):
for file in file_list:
print(f"======================{file} {iden}=====================")
if os.path.exists(file):
with open(file, "r") as f:
print(f.read())
if file.startswith("log") or file.startswith("PipelineServingLogs"):
os.remove(file)
else:
print(f"{file} not exist")
print("======================================================")
def parse_prototxt(file):
with open(file, "r") as f:
lines = [i.strip().split(":") for i in f.readlines()]
engines = {}
for i in lines:
if len(i) > 1:
if i[0] in engines:
engines[i[0]].append(i[1].strip())
else:
engines[i[0]] = [i[1].strip()]
return engines
def default_args():
parser = argparse.ArgumentParser()
args = parser.parse_args([])
args.thread = 2
args.port = 9292
args.device = "cpu"
args.gpu_ids = [""]
args.op_num = 0
args.op_max_batch = 32
args.model = [""]
args.workdir = "workdir"
args.use_mkl = False
args.precision = "fp32"
args.use_calib = False
args.mem_optim_off = False
args.ir_optim = False
args.max_body_size = 512 * 1024 * 1024
args.use_encryption_model = False
args.use_multilang = False
args.use_trt = False
args.use_lite = False
args.use_xpu = False
args.product_name = None
args.container_id = None
args.gpu_multi_stream = False
return args
......@@ -34,6 +34,7 @@ import socket
from paddle_serving_server.env import CONF_HOME
import signal
from paddle_serving_server.util import *
from paddle_serving_server.env_check.run import *
# web_service.py is still used by Pipeline.
......@@ -114,7 +115,7 @@ def serve_args():
type=str,
default="start",
nargs="?",
help="stop or start PaddleServing")
help="stop or start PaddleServing, check running environemnt")
parser.add_argument(
"--thread",
type=int,
......@@ -450,7 +451,9 @@ if __name__ == "__main__":
os._exit(0)
else:
os._exit(-1)
elif args.server == "check":
check_env()
os._exit(0)
for single_model_config in args.model:
if os.path.isdir(single_model_config):
pass
......
......@@ -261,7 +261,7 @@ class WebService(object):
self.gpu_multi_stream = gpu_multi_stream
self.runtime_thread_num = runtime_thread_num
self.batch_infer_size = batch_infer_size
# record port and pid info for stopping process
dump_pid_file([self.port], "web_service")
# if gpuid != None, we will use gpuid first.
......
......@@ -29,36 +29,11 @@ import enum
import os
import copy
import time
from .error_catch import ErrorCatch, CustomException, ProductErrCode
from .error_catch import CustomExceptionCode as ChannelDataErrcode
_LOGGER = logging.getLogger(__name__)
class ChannelDataErrcode(enum.Enum):
"""
ChannelData error code
"""
OK = 0
TIMEOUT = 1
NOT_IMPLEMENTED = 2
TYPE_ERROR = 3
RPC_PACKAGE_ERROR = 4
CLIENT_ERROR = 5
CLOSED_ERROR = 6
NO_SERVICE = 7
UNKNOW = 8
INPUT_PARAMS_ERROR = 9
PRODUCT_ERROR = 100
class ProductErrCode(enum.Enum):
"""
ProductErrCode is a base class for recording business error code.
product developers inherit this class and extend more error codes.
"""
pass
class ChannelDataType(enum.Enum):
"""
Channel data type
......
......@@ -26,11 +26,12 @@ import os
import logging
import collections
import json
from .error_catch import ErrorCatch, CustomException, CustomExceptionCode, ParamChecker, ParamVerify
from .operator import Op, RequestOp, ResponseOp, VirtualOp
from .channel import (ThreadChannel, ProcessChannel, ChannelData,
ChannelDataErrcode, ChannelDataType, ChannelStopError,
ProductErrCode)
ChannelDataType, ChannelStopError)
from .error_catch import ProductErrCode
from .error_catch import CustomExceptionCode as ChannelDataErrcode
from .profiler import TimeProfiler, PerformanceTracer
from .util import NameGenerator, ThreadIdGenerator, PipelineProcSyncManager
from .proto import pipeline_service_pb2
......@@ -42,7 +43,6 @@ class DAGExecutor(object):
"""
DAG Executor, the service entrance of DAG.
"""
def __init__(self, response_op, server_conf, worker_idx):
"""
Initialize DAGExecutor.
......@@ -114,6 +114,7 @@ class DAGExecutor(object):
self._client_profile_key = "pipeline.profile"
self._client_profile_value = "1"
@ErrorCatch
def start(self):
"""
Starting one thread for receiving data from the last channel background.
......@@ -479,20 +480,36 @@ class DAG(object):
"""
Directed Acyclic Graph(DAG) engine, builds one DAG topology.
"""
def __init__(self, request_name, response_op, use_profile, is_thread_op,
channel_size, build_dag_each_worker, tracer,
channel_recv_frist_arrive):
self._request_name = request_name
self._response_op = response_op
self._use_profile = use_profile
self._is_thread_op = is_thread_op
self._channel_size = channel_size
self._build_dag_each_worker = build_dag_each_worker
self._tracer = tracer
self._channel_recv_frist_arrive = channel_recv_frist_arrive
if not self._is_thread_op:
self._manager = PipelineProcSyncManager()
_LOGGER.info("{}, {}, {}, {}, {} ,{} ,{} ,{}".format(request_name, response_op, use_profile, is_thread_op,
channel_size, build_dag_each_worker, tracer,
channel_recv_frist_arrive))
@ErrorCatch
@ParamChecker
def init_helper(self, request_name: str,
response_op,
use_profile: [bool, None],
is_thread_op: bool,
channel_size,
build_dag_each_worker: [bool, None],
tracer,
channel_recv_frist_arrive):
self._request_name = request_name
self._response_op = response_op
self._use_profile = use_profile
self._is_thread_op = is_thread_op
self._channel_size = channel_size
self._build_dag_each_worker = build_dag_each_worker
self._tracer = tracer
self._channel_recv_frist_arrive = channel_recv_frist_arrive
if not self._is_thread_op:
self._manager = PipelineProcSyncManager()
init_helper(self, request_name, response_op, use_profile, is_thread_op,
channel_size, build_dag_each_worker, tracer,
channel_recv_frist_arrive)
print("[DAG] Succ init")
_LOGGER.info("[DAG] Succ init")
@staticmethod
......
import sys
import enum
import os
import logging
import traceback
#from paddle_serving_server.pipeline import ResponseOp
import threading
import inspect
import traceback
import functools
import re
from .proto import pipeline_service_pb2_grpc, pipeline_service_pb2
from .util import ThreadIdGenerator
from paddle_serving_server.util import kill_stop_process_by_pid
_LOGGER = logging.getLogger(__name__)
class CustomExceptionCode(enum.Enum):
"""
Add new Exception
0 Success
50 ~ 99 Product error
3000 ~ 3999 Internal service error
4000 ~ 4999 Conf error
5000 ~ 5999 User input error
6000 ~ 6999 Timeout error
7000 ~ 7999 Type Check error
8000 ~ 8999 Internal communication error
9000 ~ 9999 Inference error
10000 Other error
"""
OK = 0
PRODUCT_ERROR = 50
NOT_IMPLEMENTED = 3000
CLOSED_ERROR = 3001
NO_SERVICE = 3002
INIT_ERROR = 3003
CONF_ERROR = 4000
INPUT_PARAMS_ERROR = 5000
TIMEOUT = 6000
TYPE_ERROR = 7000
RPC_PACKAGE_ERROR = 8000
CLIENT_ERROR = 9000
UNKNOW = 10000
class ProductErrCode(enum.Enum):
"""
ProductErrCode is to record business error codes.
the ProductErrCode number ranges from 51 to 99
product developers can directly add error code into this class.
"""
pass
class CustomException(Exception):
"""
A self-defined exception class
Usage : raise CustomException(CustomExceptionCode.exceptionCode, errorMsg, isSendToUser=False)
Args :
exceptionCode : CustomExceptionCode or ProductErrCode
errorMsg : a string message describing the error
isSendToUser : whether to send the message to the user or just record it in the error log
Return : a string of error_info
"""
def __init__(self, exceptionCode, errorMsg, isSendToUser=False):
super().__init__(self)
self.error_info = "\n\texception_code: {}\n"\
"\texception_type: {}\n"\
"\terror_msg: {}\n"\
"\tis_send_to_user: {}".format(exceptionCode.value,
CustomExceptionCode(exceptionCode).name, errorMsg, isSendToUser)
def __str__(self):
return self.error_info
class ErrorCatch():
"""
A decorator class to catch errors for a method or function.
Usage : @ErrorCatch
Args : None
Returns: tuple(res, response)
res is the original function return value
response includes err_no and err_msg
"""
def __call__(self, func):
if inspect.isfunction(func) or inspect.ismethod(func):
@functools.wraps(func)
def wrapper(*args, **kw):
try:
res = func(*args, **kw)
except CustomException as e:
if "log_id" in kw.keys():
log_id = kw["log_id"]
elif "logid_dict" in kw.keys() and "data_id" in kw.keys():
log_id = kw["logid_dict"].get(kw["data_id"])
else:
log_id = 0
resp = pipeline_service_pb2.Response()
_LOGGER.error("\nLog_id: {}\n{}Classname: {}\nFunctionName: {}\nArgs: {}".format(log_id, traceback.format_exc(), func.__qualname__, func.__name__, args))
split_list = re.split("\n|\t|:", str(e))
resp.err_no = int(split_list[3])
resp.err_msg = "Log_id: {} Raise_msg: {} ClassName: {} FunctionName: {}".format(log_id, split_list[9], func.__qualname__ ,func.__name__ )
is_send_to_user = split_list[-1].replace(" ", "")
if is_send_to_user == "True":
return (None, resp)
else:
print("Erro_Num: {} {}".format(resp.err_no, resp.err_msg))
print("Init error occurs. For detailed information. Please look up pipeline.log.wf in PipelineServingLogs by log_id.")
kill_stop_process_by_pid("kill", os.getpgid(os.getpid()))
except Exception as e:
if "log_id" in kw.keys():
log_id = kw["log_id"]
elif "logid_dict" in kw.keys() and "data_id" in kw.keys():
log_id = kw["logid_dict"].get(kw["data_id"])
else:
log_id = 0
resp = pipeline_service_pb2.Response()
_LOGGER.error("\nLog_id: {}\n{}Classname: {}\nFunctionName: {}".format(log_id, traceback.format_exc(), func.__qualname__, func.__name__))
resp.err_no = CustomExceptionCode.UNKNOW.value
resp.err_msg = "Log_id: {} Raise_msg: {} ClassName: {} FunctionName: {}".format(log_id, str(e).replace("\'", ""), func.__qualname__ ,func.__name__ )
return (None, resp)
else:
resp = pipeline_service_pb2.Response()
resp.err_no = CustomExceptionCode.OK.value
resp.err_msg = ""
return (res, resp)
return wrapper
def ParamChecker(function):
@functools.wraps(function)
def wrapper(*args, **kwargs):
# fetch the argument name list.
parameters = inspect.signature(function).parameters
argument_list = list(parameters.keys())
# fetch the argument checker list.
checker_list = [parameters[argument].annotation for argument in argument_list]
# fetch the value list.
value_list = [inspect.getcallargs(function, *args, **kwargs)[argument] for argument in inspect.getfullargspec(function).args]
# initialize the result dictionary, where key is argument, value is the checker result.
result_dictionary = dict()
for argument, value, checker in zip(argument_list, value_list, checker_list):
result_dictionary[argument] = check(argument, value, checker, function)
# fetch the invalid argument list.
invalid_argument_list = [key for key in argument_list if not result_dictionary[key]]
# if there are invalid arguments, raise the error.
if len(invalid_argument_list) > 0:
raise CustomException(CustomExceptionCode.INPUT_PARAMS_ERROR, "invalid arg list: {}".format(invalid_argument_list), True)
# check the result.
result = function(*args, **kwargs)
checker = inspect.signature(function).return_annotation
if not check('return', result, checker, function):
raise CustomException(CustomExceptionCode.INPUT_PARAMS_ERROR, "invalid return type", True)
# return the result.
return result
return wrapper
def check(name, value, checker, function):
if isinstance(checker, (tuple, list, set)):
return True in [check(name, value, sub_checker, function) for sub_checker in checker]
elif checker is inspect._empty:
return True
elif checker is None:
return value is None
elif isinstance(checker, type):
return isinstance(value, checker)
elif callable(checker):
result = checker(value)
return result
class ParamVerify(object):
@staticmethod
def int_check(c, lower_bound=None, upper_bound=None):
if not isinstance(c, int):
return False
if isinstance(lower_bound, int) and isinstance(upper_bound, int):
return c >= lower_bound and c <= upper_bound
return True
@staticmethod
def file_check(f):
if not isinstance(f, str):
return False
if os.path.exists(f):
return True
else:
return False
@staticmethod
def check_feed_dict(feed_dict, right_feed_list):
if not isinstance(feed_dict, dict):
return False
filter_feed_list = list(filter(lambda x: "lod" not in x, feed_dict.keys()))
# right_feed_list is read from the model config; the keys of feed_dict should be the same as it.
if len(filter_feed_list) != len(right_feed_list):
return False
for key in right_feed_list:
if key not in filter_feed_list:
return False
return True
@staticmethod
def check_fetch_list(fetch_list, right_fetch_list):
if len(fetch_list) == 0:
return True
if not isinstance(fetch_list, list):
return False
# right_fetch_list is read from the model config; fetch_list should be a subset of it.
for key in fetch_list:
if key not in right_fetch_list:
return False
return True
ErrorCatch = ErrorCatch()
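A minimal usage sketch of the two decorators defined above, following the pattern dag.py uses; the helper function and its arguments here are illustrative, not part of this module:
```python
# Illustrative only: ErrorCatch wraps the call and returns (result, response);
# ParamChecker validates the annotated arguments before the call runs.
@ErrorCatch
@ParamChecker
def build_helper(name: str, size: int):
    return "{}:{}".format(name, size)

res, resp = build_helper("demo", 4)
if resp.err_no != CustomExceptionCode.OK.value:
    print(resp.err_msg)  # validation or runtime failure
else:
    print(res)           # "demo:4"
```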
......@@ -15,6 +15,7 @@
import os
import logging
import multiprocessing
from .error_catch import ErrorCatch, CustomException, CustomExceptionCode
#from paddle_serving_server import OpMaker, OpSeqMaker
#from paddle_serving_server import Server as GpuServer
#from paddle_serving_server import Server as CpuServer
......
......@@ -34,10 +34,14 @@ elif sys.version_info.major == 3:
else:
raise Exception("Error Python version")
from .error_catch import ErrorCatch, CustomException, CustomExceptionCode, ParamChecker, ParamVerify
check_feed_dict=ParamVerify.check_feed_dict
check_fetch_list=ParamVerify.check_fetch_list
from .proto import pipeline_service_pb2
from .channel import (ThreadChannel, ProcessChannel, ChannelDataErrcode,
ChannelData, ChannelDataType, ChannelStopError,
ChannelTimeoutError, ProductErrCode)
from .channel import (ThreadChannel, ProcessChannel,ChannelData,
ChannelDataType, ChannelStopError, ChannelTimeoutError)
from .error_catch import ProductErrCode
from .error_catch import CustomExceptionCode as ChannelDataErrcode
from .util import NameGenerator
from .profiler import UnsafeTimeProfiler as TimeProfiler
from . import local_service_handler
......@@ -113,6 +117,19 @@ class Op(object):
self._succ_init_op = False
self._succ_close_op = False
# for feed/fetch dict check
@staticmethod
def get_feed_fetch_list(client):
from paddle_serving_app.local_predict import LocalPredictor
if isinstance(client, Client):
feed_names = client.get_feed_names()
fetch_names = client.get_fetch_names()
if isinstance(client, LocalPredictor):
feed_names = client.feed_names_
fetch_names = client.fetch_names_
return feed_names, fetch_names
def init_from_dict(self, conf):
"""
Initializing one Op from config.yaml. If server_endpoints exist,
......@@ -133,7 +150,6 @@ class Op(object):
self._fetch_names = conf.get("fetch_list")
if self._client_config is None:
self._client_config = conf.get("client_config")
if self._timeout is None:
self._timeout = conf["timeout"]
if self._timeout > 0:
......@@ -353,12 +369,14 @@ class Op(object):
if self.client_type == 'brpc':
client = Client()
client.load_client_config(client_config)
self.right_feed_names, self.right_fetch_names = self.get_feed_fetch_list(client)
elif self.client_type == 'pipeline_grpc':
client = PPClient()
elif self.client_type == 'local_predictor':
if self.local_predictor is None:
raise ValueError("local predictor not yet created")
client = self.local_predictor
self.right_feed_names, self.right_fetch_names = self.get_feed_fetch_list(client)
else:
raise ValueError("Failed to init client: unknow client "
"type {}".format(self.client_type))
......@@ -367,6 +385,7 @@ class Op(object):
_LOGGER.info("Op({}) has no fetch name set. So fetch all vars")
if self.client_type != "local_predictor":
client.connect(server_endpoints)
_LOGGER.info("init_client, feed_list:{}, fetch_list: {}".format(self.right_feed_names, self.right_fetch_names))
return client
def get_input_ops(self):
......@@ -543,7 +562,7 @@ class Op(object):
(_, input_dict), = input_dicts.items()
return input_dict, False, None, ""
def process(self, feed_batch, typical_logid=0):
"""
In process stage, send requests to the inference server or predict locally.
......@@ -560,7 +579,19 @@ class Op(object):
call_result = None
err_code = ChannelDataErrcode.OK.value
err_info = ""
@ErrorCatch
@ParamChecker
def feed_fetch_list_check_helper(feed_batch : lambda feed_batch: check_feed_dict(feed_batch[0], self.right_feed_names),
fetch_list : lambda fetch_list: check_fetch_list(fetch_list, self.right_fetch_names),
log_id):
return None
_, resp = feed_fetch_list_check_helper(feed_batch, self._fetch_names, log_id=typical_logid)
if resp.err_no != CustomExceptionCode.OK.value:
err_code = resp.err_no
err_info = resp.err_msg
call_result = None
return call_result, err_code, err_info
if self.client_type == "local_predictor":
err, err_info = ChannelData.check_batch_npdata(feed_batch)
if err != 0:
......@@ -801,46 +832,40 @@ class Op(object):
preped_data_dict = collections.OrderedDict()
err_channeldata_dict = collections.OrderedDict()
skip_process_dict = {}
@ErrorCatch
def preprocess_help(self, parsed_data, data_id, logid_dict):
preped_data, is_skip_process, prod_errcode, prod_errinfo = self.preprocess(
parsed_data, data_id, logid_dict.get(data_id))
return preped_data, is_skip_process, prod_errcode, prod_errinfo
for data_id, parsed_data in parsed_data_dict.items():
preped_data, error_channeldata = None, None
is_skip_process = False
prod_errcode, prod_errinfo = None, None
log_id = logid_dict.get(data_id)
try:
preped_data, is_skip_process, prod_errcode, prod_errinfo = self.preprocess(
parsed_data, data_id, logid_dict.get(data_id))
# Set skip_process_dict
process_res, resp = preprocess_help(self, parsed_data, data_id = data_id,
logid_dict = logid_dict)
if resp.err_no == CustomExceptionCode.OK.value:
preped_data, is_skip_process, prod_errcode, prod_errinfo = process_res
if is_skip_process is True:
skip_process_dict[data_id] = True
except TypeError as e:
# Error type in channeldata.datatype
error_info = "(data_id={} log_id={}) {} Failed to preprocess: {}".format(
data_id, log_id, op_info_prefix, e)
_LOGGER.error(error_info, exc_info=True)
error_channeldata = ChannelData(
error_code=ChannelDataErrcode.TYPE_ERROR.value,
error_info=error_info,
data_id=data_id,
log_id=log_id)
except Exception as e:
error_info = "(data_id={} log_id={}) {} Failed to preprocess: {}".format(
data_id, log_id, op_info_prefix, e)
_LOGGER.error(error_info, exc_info=True)
error_channeldata = ChannelData(
error_code=ChannelDataErrcode.UNKNOW.value,
error_info=error_info,
data_id=data_id,
log_id=log_id)
if prod_errcode is not None:
# product errors occured
if prod_errcode is not None:
_LOGGER.error("data_id: {} return product error. Product ErrNo:{}, Product ErrMsg: {}".format(data_id, prod_errcode, prod_errinfo))
error_channeldata = ChannelData(
error_code=ChannelDataErrcode.PRODUCT_ERROR.value,
error_info="",
prod_error_code=prod_errcode,
prod_error_info=prod_errinfo,
data_id=data_id,
log_id=log_id)
else:
error_channeldata = ChannelData(
error_code=ChannelDataErrcode.PRODUCT_ERROR.value,
error_info="",
prod_error_code=prod_errcode,
prod_error_info=prod_errinfo,
data_id=data_id,
log_id=log_id)
error_code=resp.err_no,
error_info=resp.err_msg,
data_id=data_id,
log_id=log_id)
skip_process_dict[data_id] = True
if error_channeldata is not None:
err_channeldata_dict[data_id] = error_channeldata
......@@ -1019,12 +1044,13 @@ class Op(object):
# 2 kinds of errors
if error_code != ChannelDataErrcode.OK.value or midped_batch is None:
error_info = "(log_id={}) {} failed to predict.".format(
typical_logid, self.name)
error_info = "[{}] failed to predict. {}. Please check the input dict and checkout PipelineServingLogs/pipeline.log for more details.".format(
self.name, error_info)
_LOGGER.error(error_info)
for data_id in data_ids:
err_channeldata_dict[data_id] = ChannelData(
error_code=ChannelDataErrcode.CLIENT_ERROR.value,
error_code=error_code,
error_info=error_info,
data_id=data_id,
log_id=logid_dict.get(data_id))
......@@ -1095,68 +1121,58 @@ class Op(object):
_LOGGER.debug("{} Running postprocess".format(op_info_prefix))
postped_data_dict = collections.OrderedDict()
err_channeldata_dict = collections.OrderedDict()
@ErrorCatch
def postprocess_help(self, parsed_data_dict, midped_data, data_id, logid_dict):
postped_data, prod_errcode, prod_errinfo = self.postprocess(parsed_data_dict[data_id],
midped_data, data_id, logid_dict.get(data_id))
if not isinstance(postped_data, dict):
raise CustomException(CustomExceptionCode.TYPE_ERROR, "postprocess should return dict", True)
return postped_data, prod_errcode, prod_errinfo
for data_id, midped_data in midped_data_dict.items():
log_id = logid_dict.get(data_id)
postped_data, err_channeldata = None, None
prod_errcode, prod_errinfo = None, None
try:
postped_data, prod_errcode, prod_errinfo = self.postprocess(
parsed_data_dict[data_id], midped_data, data_id,
logid_dict.get(data_id))
except Exception as e:
error_info = "(data_id={} log_id={}) {} Failed to postprocess: {}".format(
data_id, log_id, op_info_prefix, e)
_LOGGER.error(error_info, exc_info=True)
err_channeldata = ChannelData(
error_code=ChannelDataErrcode.UNKNOW.value,
error_info=error_info,
data_id=data_id,
log_id=log_id)
if prod_errcode is not None:
# product errors occured
post_res, resp = postprocess_help(self, parsed_data_dict, midped_data, data_id
= data_id, logid_dict = logid_dict)
if resp.err_no == CustomExceptionCode.OK.value:
postped_data, prod_errcode, prod_errinfo = post_res
if prod_errcode is not None:
# product errors occured
err_channeldata = ChannelData(
error_code=ChannelDataErrcode.PRODUCT_ERROR.value,
error_info="",
prod_error_code=prod_errcode,
prod_error_info=prod_errinfo,
data_id=data_id,
log_id=log_id)
else:
err_channeldata = ChannelData(
error_code=ChannelDataErrcode.PRODUCT_ERROR.value,
error_info="",
prod_error_code=prod_errcode,
prod_error_info=prod_errinfo,
error_code=resp.err_no,
error_info=resp.err_msg,
data_id=data_id,
log_id=log_id)
if err_channeldata is not None:
err_channeldata_dict[data_id] = err_channeldata
continue
else:
if not isinstance(postped_data, dict):
error_info = "(log_id={} log_id={}) {} Failed to postprocess: " \
"output of postprocess funticon must be " \
"dict type, but get {}".format(
data_id, log_id, op_info_prefix,
type(postped_data))
_LOGGER.error(error_info)
err_channeldata = ChannelData(
error_code=ChannelDataErrcode.UNKNOW.value,
error_info=error_info,
data_id=data_id,
log_id=log_id)
err_channeldata_dict[data_id] = err_channeldata
continue
output_data = None
err, _ = ChannelData.check_npdata(postped_data)
if err == 0:
output_data = ChannelData(
ChannelDataType.CHANNEL_NPDATA.value,
npdata=postped_data,
data_id=data_id,
log_id=log_id)
else:
output_data = ChannelData(
ChannelDataType.DICT.value,
dictdata=postped_data,
data_id=data_id,
log_id=log_id)
postped_data_dict[data_id] = output_data
output_data = None
err, _ = ChannelData.check_npdata(postped_data)
if err == 0:
output_data = ChannelData(
ChannelDataType.CHANNEL_NPDATA.value,
npdata=postped_data,
data_id=data_id,
log_id=log_id)
else:
output_data = ChannelData(
ChannelDataType.DICT.value,
dictdata=postped_data,
data_id=data_id,
log_id=log_id)
postped_data_dict[data_id] = output_data
_LOGGER.debug("{} Succ postprocess".format(op_info_prefix))
return postped_data_dict, err_channeldata_dict
......@@ -1353,7 +1369,6 @@ class Op(object):
_LOGGER.debug("op:{} parse_end:{}".format(op_info_prefix,
time.time()))
# print
front_cost = int(round(_time() * 1000000)) - start
for data_id, parsed_data in parsed_data_dict.items():
_LOGGER.debug(
......@@ -1507,26 +1522,30 @@ class Op(object):
Returns:
TimeProfiler
"""
if is_thread_op:
with self._for_init_op_lock:
if not self._succ_init_op:
# for the threaded version of Op, each thread cannot get its concurrency_idx
self.concurrency_idx = None
# init client
self.client = self.init_client(self._client_config,
@ErrorCatch
def init_helper(self, is_thread_op, concurrency_idx):
if is_thread_op:
with self._for_init_op_lock:
if not self._succ_init_op:
# for the threaded version of Op, each thread cannot get its concurrency_idx
self.concurrency_idx = None
# init client
self.client = self.init_client(self._client_config,
self._server_endpoints)
# user defined
self.init_op()
self._succ_init_op = True
self._succ_close_op = False
else:
self.concurrency_idx = concurrency_idx
# init client
self.client = self.init_client(self._client_config,
# user defined
self.init_op()
self._succ_init_op = True
self._succ_close_op = False
else:
self.concurrency_idx = concurrency_idx
# init client
self.client = self.init_client(self._client_config,
self._server_endpoints)
# user defined
self.init_op()
# user defined
self.init_op()
init_helper(self, is_thread_op, concurrency_idx)
print("[OP Object] init success")
# use a separate TimeProfiler per thread or process
profiler = TimeProfiler()
profiler.enable(True)
......
......@@ -24,7 +24,7 @@ import yaml
import io
import time
import os
from .error_catch import ErrorCatch, CustomException, CustomExceptionCode, ParamChecker, ParamVerify
from .proto import pipeline_service_pb2_grpc, pipeline_service_pb2
from . import operator
from . import dag
......@@ -40,14 +40,22 @@ class PipelineServicer(pipeline_service_pb2_grpc.PipelineServiceServicer):
"""
Pipeline Servicer entrance.
"""
def __init__(self, name, response_op, dag_conf, worker_idx=-1):
super(PipelineServicer, self).__init__()
self._name = name
# init dag executor
self._dag_executor = dag.DAGExecutor(response_op, dag_conf, worker_idx)
self._dag_executor.start()
@ErrorCatch
@ParamChecker
def init_helper(self, name, response_op,
dag_conf: dict,
worker_idx=-1):
self._name = name
self._dag_executor = dag.DAGExecutor(response_op, dag_conf, worker_idx)
self._dag_executor.start()
super(PipelineServicer, self).__init__()
init_res = init_helper(self, name, response_op, dag_conf, worker_idx)
if init_res[1].err_no != CustomExceptionCode.OK.value :
raise CustomException(CustomExceptionCode.INIT_ERROR, "pipeline server init error")
print("[PipelineServicer] succ init")
_LOGGER.info("[PipelineServicer] succ init")
def inference(self, request, context):
......
......@@ -20,3 +20,5 @@ sentencepiece==0.1.92; platform_machine != "aarch64"
sentencepiece; platform_machine == "aarch64"
opencv-python==4.2.0.32; platform_machine != "aarch64"
opencv-python; platform_machine == "aarch64"
pytest
pynvml
......@@ -38,6 +38,9 @@ REQUIRED_PACKAGES = [
]
packages=['paddle_serving_server',
'paddle_serving_server.env_check',
'paddle_serving_server.env_check.fit_a_line',
'paddle_serving_server.env_check.simple_web_service',
'paddle_serving_server.proto',
'paddle_serving_server.pipeline',
'paddle_serving_server.pipeline.proto',
......@@ -46,6 +49,12 @@ packages=['paddle_serving_server',
package_dir={'paddle_serving_server':
'${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server',
'paddle_serving_server.env_check':
'${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server/env_check',
'paddle_serving_server.env_check.fit_a_line':
'${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server/env_check/fit_a_line',
'paddle_serving_server.env_check.simple_web_service':
'${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server/env_check/simple_web_service',
'paddle_serving_server.proto':
'${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server/proto',
'paddle_serving_server.pipeline':
......@@ -57,8 +66,8 @@ package_dir={'paddle_serving_server':
'paddle_serving_server.pipeline.gateway.proto':
'${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server/pipeline/gateway/proto'}
package_data={'paddle_serving_server': ['pipeline/gateway/libproxy_server.so'],}
package_data={'paddle_serving_server': ['pipeline/gateway/libproxy_server.so', 'env_check/fit_a_line/*', 'env_check/simple_web_service/*', 'env_check/fit_a_line/uci_housing_model/*', 'env_check/fit_a_line/uci_housing_client/*', 'env_check/simple_web_service/uci_housing_model/*', 'env_check/simple_web_service/uci_housing_client/*'],}
include_package_data=True
setup(
name='${SERVER_PACKAGE_NAME}',
version= package_version,
......