Commit 8a7006bd authored by barriery

add DefaultPipelineWebService demo

Parent 59775b70
......@@ -18,13 +18,13 @@ tar xf test_imgs.tar
## Start Service
You can choose one of the following versions to launch start Service.
You can choose one of the following versions to start the service.
### Remote Service Version
```
python -m paddle_serving_server_gpu.serve --model ocr_det_model --port 12000 --gpu_id 0 &> det.log &
python -m paddle_serving_server_gpu.serve --model ocr_rec_model --port 12001 --gpu_id 0 &> rec.log &
python pipeline_server.py &>pipeline.log &
python remote_service_pipeline_server.py &>pipeline.log &
```
### Local Service Version
......
......@@ -23,7 +23,7 @@ tar xf test_imgs.tar
```
python -m paddle_serving_server.serve --model ocr_det_model --port 12000 --gpu_id 0 &> det.log &
python -m paddle_serving_server.serve --model ocr_rec_model --port 12001 --gpu_id 0 &> rec.log &
python pipeline_server.py &>pipeline.log &
python remote_service_pipeline_server.py &>pipeline.log &
```
### Local Service Version
......
# Pipeline Web Service
# Default Pipeline Web Service
Here the Uci service is used as an example to introduce the usage of DefaultPipelineWebService.
## Get Model
```
......@@ -15,3 +17,6 @@ python web_service.py &>log.txt &
```
curl -X POST -k http://localhost:18080/prediction -d '{"key": ["x"], "value": ["0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283, 0.4919, 0.1856, 0.0795, -0.0332"]}'
```
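Equivalently, the request can be sent from Python. A minimal sketch using the `requests` library (an assumption here; any HTTP client works), posting the same payload as the curl call above:

```
import json
import requests

url = "http://localhost:18080/prediction"
data = {
    "key": ["x"],
    "value": ["0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, "
              "-0.0584, 0.6283, 0.4919, 0.1856, 0.0795, -0.0332"]
}
resp = requests.post(url, data=json.dumps(data))
print(resp.json())
```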
## More
`web_service.py` and `local_pipeline_server.py` are essentially the same: the former wraps, via `DefaultPipelineWebService`, the pipeline wiring that the latter spells out explicitly.
port: 18081                   # gRPC port of the pipeline server
worker_num: 4                 # number of gRPC server workers
build_dag_each_worker: false  # true: every worker process builds its own DAG
grpc_gateway_port: 18080      # HTTP gateway port (the curl example above targets this)
dag:
    is_thread_op: false       # false: Ops run as separate processes
    client_type: brpc         # the Local Service Version requires brpc (see check below)
    retry: 1                  # DAG-level retry count per request
    use_profile: false        # enable per-request profile logging
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=doc-string-missing
try:
from paddle_serving_server import pipeline
except ImportError:
from paddle_serving_server_gpu import pipeline
import numpy as np
import logging
_LOGGER = logging.getLogger()
class UciOp(pipeline.Op):
def init_op(self):
self.separator = ","
def preprocess(self, input_dicts):
(_, input_dict), = input_dicts.items()
_LOGGER.info(input_dict)
x_value = input_dict["x"]
        # "unicode" exists only on Python 2; plain str covers Python 3
        if isinstance(x_value, str):
            input_dict["x"] = np.array(
                [float(x.strip()) for x in x_value.split(self.separator)])
return input_dict
def postprocess(self, input_dicts, fetch_dict):
_LOGGER.info(fetch_dict)
fetch_dict["price"] = str(fetch_dict["price"][0][0])
return fetch_dict
# DAG wiring: RequestOp -> UciOp -> ResponseOp
read_op = pipeline.RequestOp()
uci_op = UciOp(
    name="uci",
    input_ops=[read_op],
    local_rpc_service_handler=pipeline.LocalRpcServiceHandler(
        model_config="uci_housing_model",
        workdir="workdir",
        thread_num=2,
        devices="0,1",  # if devices="", use cpu
        mem_optim=True,
        ir_optim=False),
    concurrency=1)
response_op = pipeline.ResponseOp(input_ops=[uci_op])

server = pipeline.PipelineServer()
server.set_response_op(response_op)
server.prepare_server('config.yml')
server.start_local_rpc_service()  # must be called after prepare_server
server.run_server()
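For reference, this server can also be queried over gRPC. A minimal client sketch, assuming the `PipelineClient` class shipped in this version's pipeline package (18081 is the `port` value from config.yml above):

```
try:
    from paddle_serving_server.pipeline import PipelineClient
except ImportError:
    from paddle_serving_server_gpu.pipeline import PipelineClient

client = PipelineClient()
client.connect(["127.0.0.1:18081"])  # rpc port from config.yml
# keys and values are strings; UciOp.preprocess parses the comma-separated floats
ret = client.predict(
    feed_dict={"x": "0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, "
                    "-0.1583, -0.0584, 0.6283, 0.4919, 0.1856, 0.0795, -0.0332"},
    fetch=["price"])
print(ret)
```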
......@@ -12,32 +12,27 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from paddle_serving_server_gpu.web_service import PipelineWebService
from paddle_serving_server_gpu.web_service import DefaultPipelineWebService
import logging
import numpy as np
_LOGGER = logging.getLogger()
user_handler = logging.StreamHandler()
user_handler.setLevel(logging.INFO)
user_handler.setFormatter(
logging.Formatter(
"%(levelname)s %(asctime)s [%(filename)s:%(lineno)d] %(message)s"))
_LOGGER.addHandler(user_handler)
class UciService(PipelineWebService):
class UciService(DefaultPipelineWebService):
def init_separator(self):
self.separator = ","
def preprocess(self, input_dict):
_LOGGER.info(input_dict)
x_str = input_dict["x"]
input_dict["x"] = np.array(
[float(x.strip()) for x in x_str.split(self.separator)])
# _LOGGER.info(input_dict)
x_value = input_dict["x"]
        # "unicode" exists only on Python 2; plain str covers Python 3
        if isinstance(x_value, str):
            input_dict["x"] = np.array(
                [float(x.strip()) for x in x_value.split(self.separator)])
return input_dict
def postprocess(self, input_dict, fetch_dict):
_LOGGER.info(fetch_dict)
# _LOGGER.info(fetch_dict)
fetch_dict["price"] = str(fetch_dict["price"][0][0])
return fetch_dict
......@@ -45,6 +40,6 @@ class UciService(PipelineWebService):
uci_service = UciService(name="uci")
uci_service.init_separator()
uci_service.load_model_config("./uci_housing_model")
uci_service.set_gpus("0")
uci_service.prepare_server(workdir="workdir", port=18080, device="gpu")
uci_service.set_gpus("0,1")
uci_service.prepare_server(workdir="workdir", port=18080)
uci_service.run_service()
......@@ -28,108 +28,6 @@ from paddle_serving_server_gpu import pipeline
from paddle_serving_server_gpu.pipeline.util import AvailablePortGenerator
class DefaultRpcServer(object):
def __init__(self, available_port_generator):
self.available_port_generator = available_port_generator
self.gpus = None
self.rpc_service_list = []
self.server_pros = []
self.port_list = []
self.model_config = None
self.workdir = None
self.device = None
self.fetch_vars = None
def get_fetch_list(self):
return self.fetch_vars
def get_port_list(self):
return self.port_list
def load_model_config(self, model_config):
self.model_config = model_config
def set_gpus(self, gpus):
self.gpus = [int(x) for x in gpus.split(",")]
def _prepare_one_server(self,
workdir="conf",
port=9292,
gpuid=0,
thread_num=2,
mem_optim=True,
ir_optim=False):
device = "gpu"
if gpuid == -1:
device = "cpu"
op_maker = serving.OpMaker()
read_op = op_maker.create('general_reader')
general_infer_op = op_maker.create('general_infer')
general_response_op = op_maker.create('general_response')
op_seq_maker = OpSeqMaker()
op_seq_maker.add_op(read_op)
op_seq_maker.add_op(general_infer_op)
op_seq_maker.add_op(general_response_op)
server = Server()
server.set_op_sequence(op_seq_maker.get_op_sequence())
server.set_num_threads(thread_num)
server.set_memory_optimize(mem_optim)
server.set_ir_optimize(ir_optim)
server.load_model_config(self.model_config)
if gpuid >= 0:
server.set_gpuid(gpuid)
server.prepare_server(workdir=workdir, port=port, device=device)
if self.fetch_vars is None:
self.fetch_vars = server.get_fetch_list()
return server
def _start_one_server(self, service_idx):
self.rpc_service_list[service_idx].run_server()
def prepare_server(self,
workdir="",
device="gpu",
mem_optim=True,
ir_optim=False):
self.workdir = workdir
self.device = device
default_port = 12000
while len(self.port_list) < len(self.gpus):
self.port_list.append(self.available_port_generator.next())
if len(self.gpus) == 0:
# init cpu service
self.rpc_service_list.append(
self.default_rpc_service(
self.workdir,
self.port_list[0],
-1,
thread_num=2,
mem_optim=mem_optim,
ir_optim=ir_optim))
else:
for i, gpuid in enumerate(self.gpus):
self.rpc_service_list.append(
self._prepare_one_server(
"{}_{}".format(self.workdir, i),
self.port_list[i],
gpuid,
thread_num=2,
mem_optim=mem_optim,
ir_optim=ir_optim))
def start_server(self):
import socket
for i, service in enumerate(self.rpc_service_list):
p = Process(target=self._start_one_server, args=(i, ))
self.server_pros.append(p)
for p in self.server_pros:
p.start()
class DefaultPipelineServer(object):
def __init__(self, available_port_generator):
self.server = pipeline.PipelineServer()
......@@ -149,26 +47,34 @@ class DefaultPipelineServer(object):
postped_data = f_postprocess(input_dict, fetch_dict)
return postped_data
return InternelOp
def init_server(self,
internel_op_class,
internel_op_name,
internel_op_endpoints,
internel_op_fetch_list,
internel_op_client_config,
internel_op_concurrency,
internel_op_timeout=-1,
internel_op_retry=1,
internel_op_batch_size=1,
internel_op_auto_batching_timeout=None):
self.internel_op_class = InternelOp
def create_local_rpc_service_handler(self, model_config, workdir,
thread_num, devices, mem_optim,
ir_optim):
self.local_rpc_service_handler = pipeline.LocalRpcServiceHandler(
model_config=model_config,
workdir=workdir,
thread_num=thread_num,
devices=devices,
mem_optim=mem_optim,
ir_optim=ir_optim,
available_port_generator=self.available_port_generator)
def init_pipeline_server(self,
internel_op_name,
internel_op_fetch_list=[],
internel_op_concurrency=4,
internel_op_timeout=-1,
internel_op_retry=1,
internel_op_batch_size=1,
internel_op_auto_batching_timeout=None):
read_op = pipeline.RequestOp()
internel_op = internel_op_class(
internel_op = self.internel_op_class(
name=internel_op_name,
input_ops=[read_op],
server_endpoints=internel_op_endpoints,
fetch_list=internel_op_fetch_list,
client_config=internel_op_client_config,
local_rpc_service_handler=self.local_rpc_service_handler,
concurrency=internel_op_concurrency,
timeout=internel_op_timeout,
retry=internel_op_retry,
......@@ -177,16 +83,16 @@ class DefaultPipelineServer(object):
response_op = pipeline.ResponseOp(input_ops=[internel_op])
self.server.set_response_op(response_op)
def prepare_server(self,
rpc_port,
http_port,
worker_num,
build_dag_each_worker=False,
is_thread_op=False,
client_type="brpc",
retry=1,
use_profile=False,
tracer_interval_s=-1):
def prepare_pipeline_server(self,
rpc_port,
http_port,
worker_num,
build_dag_each_worker=False,
is_thread_op=False,
client_type="brpc",
retry=1,
use_profile=False,
tracer_interval_s=-1):
default_server_conf = {
"port": rpc_port,
"worker_num": worker_num,
......@@ -204,76 +110,61 @@ class DefaultPipelineServer(object):
}
self.server.prepare_server(yml_dict=default_server_conf)
def start_server(self):
def start_pipeline_server(self):
self.server.start_local_rpc_service()
self.server.run_server()
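For orientation, the call sequence that `DefaultPipelineWebService` (below) drives on this class, condensed into one sketch; the method names and arguments mirror the calls in `prepare_server()` further down, and the identity lambdas stand in for user preprocess/postprocess:

```
from paddle_serving_server_gpu.web_service import DefaultPipelineServer
from paddle_serving_server_gpu.pipeline.util import AvailablePortGenerator

server = DefaultPipelineServer(AvailablePortGenerator(12000))
server.create_internel_op_class(lambda d: d, lambda d, f: f)
server.create_local_rpc_service_handler(
    model_config="uci_housing_model", workdir="workdir",
    thread_num=2, devices="0", mem_optim=True, ir_optim=False)
server.init_pipeline_server(internel_op_name="uci")
server.prepare_pipeline_server(rpc_port=18081, http_port=18080, worker_num=4)
server.start_pipeline_server()
```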
class PipelineWebService(object):
class DefaultPipelineWebService(object):
def __init__(self, name="default"):
self.name = name
self.port = None
self.model_config = None
self.gpus = ""
self.available_port_generator = AvailablePortGenerator(12000)
self.default_rpc_server = DefaultRpcServer(
self.available_port_generator)
self.default_pipeline_server = DefaultPipelineServer(
self.available_port_generator)
def load_model_config(self, model_config):
self.model_config = model_config
self.default_rpc_server.load_model_config(model_config)
def set_gpus(self, gpus):
self.default_rpc_server.set_gpus(gpus)
self.gpus = gpus
def prepare_server(self,
workdir="",
workdir="workdir",
port=9393,
device="gpu",
worker_num=4,
thread_num=2,
grpc_worker_num=4,
mem_optim=True,
ir_optim=False):
if not self.available_port_generator.port_is_available(port):
            raise SystemExit(
                "Failed to prepare server: port({}) is not available".format(
                    port))
            raise SystemExit("Failed to prepare server: port({}) is not"
                             " available".format(port))
self.port = port
# rpc server
self.default_rpc_server.prepare_server(
self.default_pipeline_server.create_internel_op_class(self.preprocess,
self.postprocess)
self.default_pipeline_server.create_local_rpc_service_handler(
model_config=self.model_config,
workdir=workdir,
device=device,
thread_num=thread_num,
devices=self.gpus,
mem_optim=mem_optim,
ir_optim=ir_optim)
rpc_endpoints = self.default_rpc_server.get_port_list()
fetch_list = self.default_rpc_server.get_fetch_list()
# pipeline server
internel_op_class = self.default_pipeline_server.create_internel_op_class(
self.preprocess, self.postprocess)
internel_op_endpoints = [
"127.0.0.1:{}".format(port) for port in rpc_endpoints
]
self.default_pipeline_server.init_server(
internel_op_class=internel_op_class,
internel_op_name=self.name,
internel_op_endpoints=internel_op_endpoints,
internel_op_fetch_list=fetch_list,
internel_op_client_config="{}/serving_server_conf.prototxt".format(
self.model_config),
internel_op_concurrency=worker_num)
self.default_pipeline_server.prepare_server(
self.default_pipeline_server.init_pipeline_server(
internel_op_name=self.name)
self.default_pipeline_server.prepare_pipeline_server(
rpc_port=self.available_port_generator.next(),
http_port=self.port,
worker_num=worker_num)
worker_num=grpc_worker_num)
def run_service(self):
import socket
localIP = socket.gethostbyname(socket.gethostname())
print("web service address: http://{}:{}/prediction"
.format(localIP, self.port))
self.default_rpc_server.start_server()
self.default_pipeline_server.start_server()
self.default_pipeline_server.start_pipeline_server()
def preprocess(self, feed_dict):
return feed_dict
......
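Taken together, the intended usage pattern is: subclass `DefaultPipelineWebService`, override `preprocess`/`postprocess`, load a model config, and serve. A condensed sketch mirroring the web_service.py demo above (paths and ports as in that demo):

```
import numpy as np
from paddle_serving_server_gpu.web_service import DefaultPipelineWebService

class MyService(DefaultPipelineWebService):
    def preprocess(self, input_dict):
        # turn the HTTP string payload into the ndarray the model expects
        input_dict["x"] = np.array(
            [float(v.strip()) for v in input_dict["x"].split(",")])
        return input_dict

    def postprocess(self, input_dict, fetch_dict):
        fetch_dict["price"] = str(fetch_dict["price"][0][0])
        return fetch_dict

service = MyService(name="uci")
service.load_model_config("./uci_housing_model")
service.set_gpus("0")  # omit this call to run on CPU
service.prepare_server(workdir="workdir", port=18080)
service.run_service()
```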
......@@ -124,6 +124,8 @@ class PipelineServer(object):
def start_local_rpc_service(self):
# only brpc now
if self._conf["dag"]["client_type"] != "brpc":
            raise ValueError("Local Service Version must be brpc type now.")
used_op, _ = DAG.get_use_ops(self._response_op)
for op in used_op:
if not isinstance(op, RequestOp):
......