提交 3b800c90 编写于 作者: B barriery

add DefaultPipelineWebService demo

上级 3b7fc942
...@@ -18,13 +18,13 @@ tar xf test_imgs.tar ...@@ -18,13 +18,13 @@ tar xf test_imgs.tar
## Start Service ## Start Service
You can choose one of the following versions to launch start Service. You can choose one of the following versions to start Service.
### Remote Service Version ### Remote Service Version
``` ```
python -m paddle_serving_server_gpu.serve --model ocr_det_model --port 12000 --gpu_id 0 &> det.log & python -m paddle_serving_server_gpu.serve --model ocr_det_model --port 12000 --gpu_id 0 &> det.log &
python -m paddle_serving_server_gpu.serve --model ocr_rec_model --port 12001 --gpu_id 0 &> rec.log & python -m paddle_serving_server_gpu.serve --model ocr_rec_model --port 12001 --gpu_id 0 &> rec.log &
python pipeline_server.py &>pipeline.log & python remote_service_pipeline_server.py &>pipeline.log &
``` ```
### Local Service Version ### Local Service Version
......
...@@ -23,7 +23,7 @@ tar xf test_imgs.tar ...@@ -23,7 +23,7 @@ tar xf test_imgs.tar
``` ```
python -m paddle_serving_server.serve --model ocr_det_model --port 12000 --gpu_id 0 &> det.log & python -m paddle_serving_server.serve --model ocr_det_model --port 12000 --gpu_id 0 &> det.log &
python -m paddle_serving_server.serve --model ocr_rec_model --port 12001 --gpu_id 0 &> rec.log & python -m paddle_serving_server.serve --model ocr_rec_model --port 12001 --gpu_id 0 &> rec.log &
python pipeline_server.py &>pipeline.log & python remote_service_pipeline_server.py &>pipeline.log &
``` ```
### 本地服务版本 ### 本地服务版本
......
# Pipeline Web Service # Default Pipeline Web Service
这里以 Uci 服务为例来介绍 DefaultPipelineWebService 的使用。
## 获取模型 ## 获取模型
``` ```
...@@ -15,3 +17,6 @@ python web_service.py &>log.txt & ...@@ -15,3 +17,6 @@ python web_service.py &>log.txt &
``` ```
curl -X POST -k http://localhost:18080/prediction -d '{"key": ["x"], "value": ["0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283, 0.4919, 0.1856, 0.0795, -0.0332"]}' curl -X POST -k http://localhost:18080/prediction -d '{"key": ["x"], "value": ["0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283, 0.4919, 0.1856, 0.0795, -0.0332"]}'
``` ```
## 更多
`web_service.py` 与 `local_pipeline_server.py` 本质是相同的。
port: 18081
worker_num: 4
build_dag_each_worker: false
grpc_gateway_port: 18080
dag:
is_thread_op: false
client_type: brpc
retry: 1
use_profile: false
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=doc-string-missing
try:
from paddle_serving_server import pipeline
except ImportError:
from paddle_serving_server_gpu import pipeline
import numpy as np
import logging
_LOGGER = logging.getLogger()
class UciOp(pipeline.Op):
def init_op(self):
self.separator = ","
def preprocess(self, input_dicts):
(_, input_dict), = input_dicts.items()
_LOGGER.info(input_dict)
x_value = input_dict["x"]
if isinstance(x_value, (str, unicode)):
input_dict["x"] = np.array(
[float(x.strip()) for x in x_value.split(self.separator)])
return input_dict
def postprocess(self, input_dicts, fetch_dict):
_LOGGER.info(fetch_dict)
fetch_dict["price"] = str(fetch_dict["price"][0][0])
return fetch_dict
read_op = pipeline.RequestOp()
uci_op = UciOp(
name="uci",
input_ops=[read_op],
local_rpc_service_handler=pipeline.LocalRpcServiceHandler(
model_config="uci_housing_model",
workdir="workdir",
thread_num=2,
devices="0,1", # if devices="", use cpu
mem_optim=True,
ir_optim=False),
concurrency=1)
response_op = pipeline.ResponseOp(input_ops=[uci_op])
server = pipeline.PipelineServer()
server.set_response_op(response_op)
server.prepare_server('config.yml')
server.start_local_rpc_service() # after prepare_server
server.run_server()
...@@ -12,32 +12,27 @@ ...@@ -12,32 +12,27 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from paddle_serving_server_gpu.web_service import PipelineWebService from paddle_serving_server_gpu.web_service import DefaultPipelineWebService
import logging import logging
import numpy as np import numpy as np
_LOGGER = logging.getLogger() _LOGGER = logging.getLogger()
user_handler = logging.StreamHandler()
user_handler.setLevel(logging.INFO)
user_handler.setFormatter(
logging.Formatter(
"%(levelname)s %(asctime)s [%(filename)s:%(lineno)d] %(message)s"))
_LOGGER.addHandler(user_handler)
class UciService(PipelineWebService): class UciService(DefaultPipelineWebService):
def init_separator(self): def init_separator(self):
self.separator = "," self.separator = ","
def preprocess(self, input_dict): def preprocess(self, input_dict):
_LOGGER.info(input_dict) # _LOGGER.info(input_dict)
x_str = input_dict["x"] x_value = input_dict["x"]
input_dict["x"] = np.array( if isinstance(x_value, (str, unicode)):
[float(x.strip()) for x in x_str.split(self.separator)]) input_dict["x"] = np.array(
[float(x.strip()) for x in x_value.split(self.separator)])
return input_dict return input_dict
def postprocess(self, input_dict, fetch_dict): def postprocess(self, input_dict, fetch_dict):
_LOGGER.info(fetch_dict) # _LOGGER.info(fetch_dict)
fetch_dict["price"] = str(fetch_dict["price"][0][0]) fetch_dict["price"] = str(fetch_dict["price"][0][0])
return fetch_dict return fetch_dict
...@@ -45,6 +40,6 @@ class UciService(PipelineWebService): ...@@ -45,6 +40,6 @@ class UciService(PipelineWebService):
uci_service = UciService(name="uci") uci_service = UciService(name="uci")
uci_service.init_separator() uci_service.init_separator()
uci_service.load_model_config("./uci_housing_model") uci_service.load_model_config("./uci_housing_model")
uci_service.set_gpus("0") uci_service.set_gpus("0,1")
uci_service.prepare_server(workdir="workdir", port=18080, device="gpu") uci_service.prepare_server(workdir="workdir", port=18080)
uci_service.run_service() uci_service.run_service()
...@@ -28,108 +28,6 @@ from paddle_serving_server_gpu import pipeline ...@@ -28,108 +28,6 @@ from paddle_serving_server_gpu import pipeline
from paddle_serving_server_gpu.pipeline.util import AvailablePortGenerator from paddle_serving_server_gpu.pipeline.util import AvailablePortGenerator
class DefaultRpcServer(object):
def __init__(self, available_port_generator):
self.available_port_generator = available_port_generator
self.gpus = None
self.rpc_service_list = []
self.server_pros = []
self.port_list = []
self.model_config = None
self.workdir = None
self.device = None
self.fetch_vars = None
def get_fetch_list(self):
return self.fetch_vars
def get_port_list(self):
return self.port_list
def load_model_config(self, model_config):
self.model_config = model_config
def set_gpus(self, gpus):
self.gpus = [int(x) for x in gpus.split(",")]
def _prepare_one_server(self,
workdir="conf",
port=9292,
gpuid=0,
thread_num=2,
mem_optim=True,
ir_optim=False):
device = "gpu"
if gpuid == -1:
device = "cpu"
op_maker = serving.OpMaker()
read_op = op_maker.create('general_reader')
general_infer_op = op_maker.create('general_infer')
general_response_op = op_maker.create('general_response')
op_seq_maker = OpSeqMaker()
op_seq_maker.add_op(read_op)
op_seq_maker.add_op(general_infer_op)
op_seq_maker.add_op(general_response_op)
server = Server()
server.set_op_sequence(op_seq_maker.get_op_sequence())
server.set_num_threads(thread_num)
server.set_memory_optimize(mem_optim)
server.set_ir_optimize(ir_optim)
server.load_model_config(self.model_config)
if gpuid >= 0:
server.set_gpuid(gpuid)
server.prepare_server(workdir=workdir, port=port, device=device)
if self.fetch_vars is None:
self.fetch_vars = server.get_fetch_list()
return server
def _start_one_server(self, service_idx):
self.rpc_service_list[service_idx].run_server()
def prepare_server(self,
workdir="",
device="gpu",
mem_optim=True,
ir_optim=False):
self.workdir = workdir
self.device = device
default_port = 12000
while len(self.port_list) < len(self.gpus):
self.port_list.append(self.available_port_generator.next())
if len(self.gpus) == 0:
# init cpu service
self.rpc_service_list.append(
self.default_rpc_service(
self.workdir,
self.port_list[0],
-1,
thread_num=2,
mem_optim=mem_optim,
ir_optim=ir_optim))
else:
for i, gpuid in enumerate(self.gpus):
self.rpc_service_list.append(
self._prepare_one_server(
"{}_{}".format(self.workdir, i),
self.port_list[i],
gpuid,
thread_num=2,
mem_optim=mem_optim,
ir_optim=ir_optim))
def start_server(self):
import socket
for i, service in enumerate(self.rpc_service_list):
p = Process(target=self._start_one_server, args=(i, ))
self.server_pros.append(p)
for p in self.server_pros:
p.start()
class DefaultPipelineServer(object): class DefaultPipelineServer(object):
def __init__(self, available_port_generator): def __init__(self, available_port_generator):
self.server = pipeline.PipelineServer() self.server = pipeline.PipelineServer()
...@@ -149,26 +47,34 @@ class DefaultPipelineServer(object): ...@@ -149,26 +47,34 @@ class DefaultPipelineServer(object):
postped_data = f_postprocess(input_dict, fetch_dict) postped_data = f_postprocess(input_dict, fetch_dict)
return postped_data return postped_data
return InternelOp self.internel_op_class = InternelOp
def init_server(self, def create_local_rpc_service_handler(self, model_config, workdir,
internel_op_class, thread_num, devices, mem_optim,
internel_op_name, ir_optim):
internel_op_endpoints, self.local_rpc_service_handler = pipeline.LocalRpcServiceHandler(
internel_op_fetch_list, model_config=model_config,
internel_op_client_config, workdir=workdir,
internel_op_concurrency, thread_num=thread_num,
internel_op_timeout=-1, devices=devices,
internel_op_retry=1, mem_optim=mem_optim,
internel_op_batch_size=1, ir_optim=ir_optim,
internel_op_auto_batching_timeout=None): available_port_generator=self.available_port_generator)
def init_pipeline_server(self,
internel_op_name,
internel_op_fetch_list=[],
internel_op_concurrency=4,
internel_op_timeout=-1,
internel_op_retry=1,
internel_op_batch_size=1,
internel_op_auto_batching_timeout=None):
read_op = pipeline.RequestOp() read_op = pipeline.RequestOp()
internel_op = internel_op_class( internel_op = self.internel_op_class(
name=internel_op_name, name=internel_op_name,
input_ops=[read_op], input_ops=[read_op],
server_endpoints=internel_op_endpoints,
fetch_list=internel_op_fetch_list, fetch_list=internel_op_fetch_list,
client_config=internel_op_client_config, local_rpc_service_handler=self.local_rpc_service_handler,
concurrency=internel_op_concurrency, concurrency=internel_op_concurrency,
timeout=internel_op_timeout, timeout=internel_op_timeout,
retry=internel_op_retry, retry=internel_op_retry,
...@@ -177,16 +83,16 @@ class DefaultPipelineServer(object): ...@@ -177,16 +83,16 @@ class DefaultPipelineServer(object):
response_op = pipeline.ResponseOp(input_ops=[internel_op]) response_op = pipeline.ResponseOp(input_ops=[internel_op])
self.server.set_response_op(response_op) self.server.set_response_op(response_op)
def prepare_server(self, def prepare_pipeline_server(self,
rpc_port, rpc_port,
http_port, http_port,
worker_num, worker_num,
build_dag_each_worker=False, build_dag_each_worker=False,
is_thread_op=False, is_thread_op=False,
client_type="brpc", client_type="brpc",
retry=1, retry=1,
use_profile=False, use_profile=False,
tracer_interval_s=-1): tracer_interval_s=-1):
default_server_conf = { default_server_conf = {
"port": rpc_port, "port": rpc_port,
"worker_num": worker_num, "worker_num": worker_num,
...@@ -204,76 +110,61 @@ class DefaultPipelineServer(object): ...@@ -204,76 +110,61 @@ class DefaultPipelineServer(object):
} }
self.server.prepare_server(yml_dict=default_server_conf) self.server.prepare_server(yml_dict=default_server_conf)
def start_server(self): def start_pipeline_server(self):
self.server.start_local_rpc_service()
self.server.run_server() self.server.run_server()
class PipelineWebService(object): class DefaultPipelineWebService(object):
def __init__(self, name="default"): def __init__(self, name="default"):
self.name = name self.name = name
self.port = None self.port = None
self.model_config = None self.model_config = None
self.gpus = ""
self.available_port_generator = AvailablePortGenerator(12000) self.available_port_generator = AvailablePortGenerator(12000)
self.default_rpc_server = DefaultRpcServer(
self.available_port_generator)
self.default_pipeline_server = DefaultPipelineServer( self.default_pipeline_server = DefaultPipelineServer(
self.available_port_generator) self.available_port_generator)
def load_model_config(self, model_config): def load_model_config(self, model_config):
self.model_config = model_config self.model_config = model_config
self.default_rpc_server.load_model_config(model_config)
def set_gpus(self, gpus): def set_gpus(self, gpus):
self.default_rpc_server.set_gpus(gpus) self.gpus = gpus
def prepare_server(self, def prepare_server(self,
workdir="", workdir="workdir",
port=9393, port=9393,
device="gpu", thread_num=2,
worker_num=4, grpc_worker_num=4,
mem_optim=True, mem_optim=True,
ir_optim=False): ir_optim=False):
if not self.available_port_generator.port_is_available(port): if not self.available_port_generator.port_is_available(port):
raise SystemExit( raise SystemExit("Failed to prepare server: prot({}) is not"
"Failed to prepare server: prot({}) is not available".format( " available".format(port))
port))
self.port = port self.port = port
# rpc server self.default_pipeline_server.create_internel_op_class(self.preprocess,
self.default_rpc_server.prepare_server( self.postprocess)
self.default_pipeline_server.create_local_rpc_service_handler(
model_config=self.model_config,
workdir=workdir, workdir=workdir,
device=device, thread_num=thread_num,
devices=self.gpus,
mem_optim=mem_optim, mem_optim=mem_optim,
ir_optim=ir_optim) ir_optim=ir_optim)
rpc_endpoints = self.default_rpc_server.get_port_list() self.default_pipeline_server.init_pipeline_server(
fetch_list = self.default_rpc_server.get_fetch_list() internel_op_name=self.name)
self.default_pipeline_server.prepare_pipeline_server(
# pipeline server
internel_op_class = self.default_pipeline_server.create_internel_op_class(
self.preprocess, self.postprocess)
internel_op_endpoints = [
"127.0.0.1:{}".format(port) for port in rpc_endpoints
]
self.default_pipeline_server.init_server(
internel_op_class=internel_op_class,
internel_op_name=self.name,
internel_op_endpoints=internel_op_endpoints,
internel_op_fetch_list=fetch_list,
internel_op_client_config="{}/serving_server_conf.prototxt".format(
self.model_config),
internel_op_concurrency=worker_num)
self.default_pipeline_server.prepare_server(
rpc_port=self.available_port_generator.next(), rpc_port=self.available_port_generator.next(),
http_port=self.port, http_port=self.port,
worker_num=worker_num) worker_num=grpc_worker_num)
def run_service(self): def run_service(self):
import socket import socket
localIP = socket.gethostbyname(socket.gethostname()) localIP = socket.gethostbyname(socket.gethostname())
print("web service address: http://{}:{}/prediction" print("web service address: http://{}:{}/prediction"
.format(localIP, self.port)) .format(localIP, self.port))
self.default_rpc_server.start_server() self.default_pipeline_server.start_pipeline_server()
self.default_pipeline_server.start_server()
def preprocess(self, feed_dict): def preprocess(self, feed_dict):
return feed_dict return feed_dict
......
...@@ -124,6 +124,8 @@ class PipelineServer(object): ...@@ -124,6 +124,8 @@ class PipelineServer(object):
def start_local_rpc_service(self): def start_local_rpc_service(self):
# only brpc now # only brpc now
if self._conf["dag"]["client_type"] != "brpc":
raise ValueError("Local Servoce Version must be brpc type now.")
used_op, _ = DAG.get_use_ops(self._response_op) used_op, _ = DAG.get_use_ops(self._response_op)
for op in used_op: for op in used_op:
if not isinstance(op, RequestOp): if not isinstance(op, RequestOp):
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册