提交 a4eec7d5 编写于 作者: B barriery

add cpu version DefaultPipelineWebService

上级 edaa1bc8
......@@ -21,6 +21,151 @@ from paddle_serving_client import Client
from contextlib import closing
import socket
from paddle_serving_server import pipeline
from paddle_serving_server.pipeline.util import AvailablePortGenerator
class DefaultPipelineServer(object):
def __init__(self, available_port_generator):
self.server = pipeline.PipelineServer()
self.available_port_generator = available_port_generator
def create_internel_op_class(self, f_preprocess, f_postprocess):
class InternelOp(pipeline.Op):
# f_preprocess and f_postprocess use variables
# in closures, so init_op function is not necessary.
def preprocess(self, input_dicts):
(_, input_dict), = input_dicts.items()
preped_data = f_preprocess(input_dict)
return preped_data
def postprocess(self, input_dicts, fetch_dict):
(_, input_dict), = input_dicts.items()
postped_data = f_postprocess(input_dict, fetch_dict)
return postped_data
self.internel_op_class = InternelOp
def create_local_rpc_service_handler(self, model_config, workdir,
thread_num, devices, mem_optim,
ir_optim):
self.local_rpc_service_handler = pipeline.LocalRpcServiceHandler(
model_config=model_config,
workdir=workdir,
thread_num=thread_num,
devices=devices,
mem_optim=mem_optim,
ir_optim=ir_optim,
available_port_generator=self.available_port_generator)
def init_pipeline_server(self,
internel_op_name,
internel_op_fetch_list=[],
internel_op_concurrency=4,
internel_op_timeout=-1,
internel_op_retry=1,
internel_op_batch_size=1,
internel_op_auto_batching_timeout=None):
read_op = pipeline.RequestOp()
internel_op = self.internel_op_class(
name=internel_op_name,
input_ops=[read_op],
fetch_list=internel_op_fetch_list,
local_rpc_service_handler=self.local_rpc_service_handler,
concurrency=internel_op_concurrency,
timeout=internel_op_timeout,
retry=internel_op_retry,
batch_size=internel_op_batch_size,
auto_batching_timeout=internel_op_auto_batching_timeout)
response_op = pipeline.ResponseOp(input_ops=[internel_op])
self.server.set_response_op(response_op)
def prepare_pipeline_server(self,
rpc_port,
http_port,
worker_num,
build_dag_each_worker=False,
is_thread_op=False,
client_type="brpc",
retry=1,
use_profile=False,
tracer_interval_s=-1):
default_server_conf = {
"port": rpc_port,
"worker_num": worker_num,
"build_dag_each_worker": build_dag_each_worker,
"grpc_gateway_port": http_port,
"dag": {
"is_thread_op": is_thread_op,
"client_type": client_type,
"retry": retry,
"use_profile": use_profile,
"tracer": {
"interval_s": tracer_interval_s,
}
}
}
self.server.prepare_server(yml_dict=default_server_conf)
def start_pipeline_server(self):
self.server.start_local_rpc_service()
self.server.run_server()
class DefaultPipelineWebService(object):
def __init__(self, name="default"):
self.name = name
self.port = None
self.model_config = None
self.gpus = "" # use cpu
self.available_port_generator = AvailablePortGenerator(12000)
self.default_pipeline_server = DefaultPipelineServer(
self.available_port_generator)
def load_model_config(self, model_config):
self.model_config = model_config
def prepare_server(self,
workdir="workdir",
port=9393,
thread_num=2,
grpc_worker_num=4,
mem_optim=True,
ir_optim=False):
if not self.available_port_generator.port_is_available(port):
raise SystemExit("Failed to prepare server: prot({}) is not"
" available".format(port))
self.port = port
self.default_pipeline_server.create_internel_op_class(self.preprocess,
self.postprocess)
self.default_pipeline_server.create_local_rpc_service_handler(
model_config=self.model_config,
workdir=workdir,
thread_num=thread_num,
devices=self.gpus,
mem_optim=mem_optim,
ir_optim=ir_optim)
self.default_pipeline_server.init_pipeline_server(
internel_op_name=self.name)
self.default_pipeline_server.prepare_pipeline_server(
rpc_port=self.available_port_generator.next(),
http_port=self.port,
worker_num=grpc_worker_num)
def run_service(self):
import socket
localIP = socket.gethostbyname(socket.gethostname())
print("web service address: http://{}:{}/prediction"
.format(localIP, self.port))
self.default_pipeline_server.start_pipeline_server()
def preprocess(self, feed_dict):
return feed_dict
def postprocess(self, feed_dict, fetch_dict):
return fetch_dict
class WebService(object):
def __init__(self, name="default_service"):
......
......@@ -125,7 +125,7 @@ class PipelineServer(object):
def start_local_rpc_service(self):
# only brpc now
if self._conf["dag"]["client_type"] != "brpc":
raise ValueError("Local Servoce Version must be brpc type now.")
raise ValueError("Local service version must be brpc type now.")
used_op, _ = DAG.get_use_ops(self._response_op)
for op in used_op:
if not isinstance(op, RequestOp):
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册