Commit f10a4109 authored by: B barriery

add pipeline web service

Parent 82ee58c1
......@@ -24,12 +24,23 @@ import sys
import numpy as np
import paddle_serving_server_gpu as serving
from paddle_serving_server_gpu import pipeline
from paddle_serving_server_gpu.pipeline.util import AvailablePortGenerator
class WebService(object):
def __init__(self, name="default_service"):
self.name = name
self.gpus = []
class DefaultRpcServer(object):
def __init__(self, available_port_generator):
self.available_port_generator = available_port_generator
self.gpus = None
self.rpc_service_list = []
self.server_pros = []
self.port_list = []
self.model_config = None
self.workdir = None
self.device = None
def get_port_list(self):
return self.port_list
def load_model_config(self, model_config):
self.model_config = model_config
......@@ -37,7 +48,7 @@ class WebService(object):
def set_gpus(self, gpus):
self.gpus = [int(x) for x in gpus.split(",")]
def default_rpc_service(self,
def _prepare_one_server(self,
workdir="conf",
port=9292,
gpuid=0,
......@@ -69,36 +80,19 @@ class WebService(object):
server.prepare_server(workdir=workdir, port=port, device=device)
return server
def _launch_rpc_service(self, service_idx):
def _start_one_server(self, service_idx):
self.rpc_service_list[service_idx].run_server()
def port_is_available(self, port):
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
sock.settimeout(2)
result = sock.connect_ex(('0.0.0.0', port))
if result != 0:
return True
else:
return False
def prepare_server(self,
workdir="",
port=9393,
device="gpu",
gpuid=0,
mem_optim=True,
ir_optim=False):
self.workdir = workdir
self.port = port
self.device = device
self.gpuid = gpuid
self.port_list = []
default_port = 12000
for i in range(1000):
if self.port_is_available(default_port + i):
self.port_list.append(default_port + i)
if len(self.port_list) > len(self.gpus):
break
while len(self.port_list) < len(self.gpus):
self.port_list.append(self.available_port_generator.next())
if len(self.gpus) == 0:
# init cpu service
......@@ -113,7 +107,7 @@ class WebService(object):
else:
for i, gpuid in enumerate(self.gpus):
self.rpc_service_list.append(
self.default_rpc_service(
self._prepare_one_server(
"{}_{}".format(self.workdir, i),
self.port_list[i],
gpuid,
......@@ -121,106 +115,159 @@ class WebService(object):
mem_optim=mem_optim,
ir_optim=ir_optim))
def _launch_web_service(self):
gpu_num = len(self.gpus)
self.client = Client()
self.client.load_client_config("{}/serving_server_conf.prototxt".format(
self.model_config))
endpoints = ""
if gpu_num > 0:
for i in range(gpu_num):
endpoints += "127.0.0.1:{},".format(self.port_list[i])
else:
endpoints = "127.0.0.1:{}".format(self.port_list[0])
self.client.connect([endpoints])
def get_prediction(self, request):
if not request.json:
abort(400)
if "fetch" not in request.json:
abort(400)
try:
feed, fetch = self.preprocess(request.json["feed"],
request.json["fetch"])
if isinstance(feed, dict) and "fetch" in feed:
del feed["fetch"]
if len(feed) == 0:
raise ValueError("empty input")
fetch_map = self.client.predict(feed=feed, fetch=fetch)
result = self.postprocess(
feed=request.json["feed"], fetch=fetch, fetch_map=fetch_map)
result = {"result": result}
except ValueError as err:
result = {"result": err}
return result
def run_rpc_service(self):
def start_server(self):
import socket
localIP = socket.gethostbyname(socket.gethostname())
print("web service address:")
print("http://{}:{}/{}/prediction".format(localIP, self.port,
self.name))
server_pros = []
for i, service in enumerate(self.rpc_service_list):
p = Process(target=self._launch_rpc_service, args=(i, ))
server_pros.append(p)
for p in server_pros:
p = Process(target=self._start_one_server, args=(i, ))
self.server_pros.append(p)
for p in self.server_pros:
p.start()
app_instance = Flask(__name__)
@app_instance.before_first_request
def init():
self._launch_web_service()
class DefaultPipelineServer(object):
def __init__(self, available_port_generator):
self.server = pipeline.PipelineServer()
self.available_port_generator = available_port_generator
service_name = "/" + self.name + "/prediction"
def create_internel_op_class(self, f_preprocess, f_postprocess):
class InternelOp(pipeline.Op):
def init_op(self):
pass
@app_instance.route(service_name, methods=["POST"])
def run():
return self.get_prediction(request)
def preprocess(self, input_dicts):
(_, input_dict), = input_dicts.items()
preped_data = f_preprocess(input_dict)
return preped_data
self.app_instance = app_instance
def postprocess(self, input_dicts, fetch_dict):
(_, input_dict), = input_dicts.items()
postped_data = f_postprocess(input_dict, fetch_dict)
return postped_data
# TODO: maybe rename this API to run_local_predictor?
def run_debugger_service(self, gpu=False):
import socket
localIP = socket.gethostbyname(socket.gethostname())
print("web service address:")
print("http://{}:{}/{}/prediction".format(localIP, self.port,
self.name))
app_instance = Flask(__name__)
return InternelOp
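Reviewer note: create_internel_op_class builds the Op subclass from two plain functions captured in a closure. A minimal sketch of how the factory is meant to be driven (the identity hooks below are illustrative stand-ins, not part of this commit):

# Illustrative use of the class factory; my_pre/my_post are hypothetical.
def my_pre(input_dict):
    return input_dict

def my_post(input_dict, fetch_dict):
    return fetch_dict

server = DefaultPipelineServer(AvailablePortGenerator(12000))
OpClass = server.create_internel_op_class(my_pre, my_post)
# OpClass is instantiated later, inside init_server, with name/endpoints/etc.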
@app_instance.before_first_request
def init():
self._launch_local_predictor(gpu)
def init_server(self,
internel_op_class,
internel_op_name,
internel_op_endpoints,
internel_op_client_config,
internel_op_concurrency,
internel_op_timeout=-1,
internel_op_retry=1,
internel_op_batch_size=1,
internel_op_auto_batching_timeout=None):
read_op = pipeline.RequestOp()
internel_op = internel_op_class(
name=internel_op_name,
input_ops=[read_op],
server_endpoints=internel_op_endpoints,
client_config=internel_op_client_config,
concurrency=internel_op_concurrency,
timeout=internel_op_timeout,
retry=internel_op_retry,
batch_size=internel_op_batch_size,
auto_batching_timeout=internel_op_auto_batching_timeout)
response_op = pipeline.ResponseOp(input_ops=[internel_op])
self.server.set_response_op(response_op)
service_name = "/" + self.name + "/prediction"
def prepare_server(self,
rpc_port,
http_port,
worker_num,
build_dag_each_worker=False,
is_thread_op=False,
client_type="brpc",
retry=1,
use_profile=False,
tracer_interval_s=-1):
default_server_conf = {
"port": rpc_port,
"worker_num": worker_num,
"build_dag_each_worker": build_dag_each_worker,
"grpc_gateway_port": http_port,
"dag": {
"is_thread_op": is_thread_op,
"client_type": client_type,
"retry": retry,
"use_profile": use_profile,
"tracer": {
"interval_s": tracer_interval_s,
}
}
}
self.server.prepare_server(yml_dict=default_server_conf)
@app_instance.route(service_name, methods=["POST"])
def run():
return self.get_prediction(request)
def start_server(self):
self.server.run_server()
self.app_instance = app_instance
def _launch_local_predictor(self, gpu):
from paddle_serving_app.local_predict import Debugger
self.client = Debugger()
self.client.load_model_config(
"{}".format(self.model_config), gpu=gpu, profile=False)
class PipelineWebService(object):
def __init__(self, name="default"):
self.name = name
self.port = None
self.model_config = None
self.available_port_generator = AvailablePortGenerator(12000)
self.default_rpc_server = DefaultRpcServer(
self.available_port_generator)
self.default_pipeline_server = DefaultPipelineServer(
self.available_port_generator)
def run_web_service(self):
self.app_instance.run(host="0.0.0.0",
port=self.port,
threaded=False,
processes=4)
def load_model_config(self, model_config):
self.model_config = model_config
self.default_rpc_server.load_model_config(model_config)
def set_gpus(self, gpus):
self.default_rpc_server.set_gpus(gpus)
def get_app_instance(self):
return self.app_instance
def prepare_server(self,
workdir="",
port=9393,
device="gpu",
worker_num=4,
mem_optim=True,
ir_optim=False):
if not self.available_port_generator.port_is_available(port):
raise SystemExit(
"Failed to prepare server: prot({}) is not available".format(
port))
self.port = port
# rpc server
self.default_rpc_server.prepare_server(
workdir=workdir,
device=device,
mem_optim=mem_optim,
ir_optim=ir_optim)
rpc_endpoints = self.default_rpc_server.get_port_list()
# pipeline server
internel_op_class = self.default_pipeline_server.create_internel_op_class(
self.preprocess, self.postprocess)
internel_op_endpoints = [
"127.0.0.1:{}".format(port) for port in rpc_endpoints
]
self.default_pipeline_server.init_server(
internel_op_class=internel_op_class,
internel_op_name=self.name,
internel_op_endpoints=internel_op_endpoints,
internel_op_client_config="{}/serving_server_conf.prototxt".format(
self.model_config),
internel_op_concurrency=worker_num)
self.default_pipeline_server.prepare_server(
rpc_port=self.available_port_generator.next(),
http_port=self.port,
worker_num=worker_num)
def run_service(self):
import socket
localIP = socket.gethostbyname(socket.gethostname())
print("web service address: http://{}:{}/prediction"
.format(localIP, self.port))
self.default_rpc_server.start_server()
self.default_pipeline_server.start_server()
def preprocess(self, feed=[], fetch=[]):
return feed, fetch
def preprocess(self, feed_dict):
return feed_dict
def postprocess(self, feed=[], fetch=[], fetch_map=None):
for key in fetch_map.iterkeys():
fetch_map[key] = fetch_map[key].tolist()
return fetch_map
def postprocess(self, feed_dict, fetch_dict):
return fetch_dict
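For review context, a minimal end-to-end sketch of how the new PipelineWebService is meant to be subclassed and launched. The model directory, GPU ids, and service name are assumptions for illustration, not taken from this commit:

# Hypothetical usage of the new class; paths and ports are illustrative.
class UciService(PipelineWebService):
    def preprocess(self, feed_dict):
        return feed_dict  # feed var name -> numpy array

    def postprocess(self, feed_dict, fetch_dict):
        return fetch_dict

service = UciService(name="uci")
service.load_model_config("uci_housing_model")
service.set_gpus("0,1")
service.prepare_server(workdir="workdir", port=9393, device="gpu")
service.run_service()

prepare_server starts one brpc worker per GPU on internally generated ports and fronts them with a single pipeline gateway on the user-facing port.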
......@@ -14,7 +14,7 @@
syntax = "proto3";
package baidu.paddle_serving.pipeline_serving;
option go_package = ".;pipeline_gateway";
option go_package = ".;pipeline_serving";
import "google/api/annotations.proto";
......@@ -33,7 +33,7 @@ message Request {
service PipelineService {
rpc inference(Request) returns (Response) {
option (google.api.http) = {
post : "/pipeline/prediction"
post : "/prediction"
body : "*"
};
}
......
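With the route change above, the grpc-gateway now serves POST /prediction instead of /pipeline/prediction. A hedged client sketch; the host, port, and key/value payload shape are assumptions based on the pipeline Request message, which is not shown in full in this hunk:

# Sketch only; requests is a third-party package assumed to be installed.
import json
import requests

url = "http://127.0.0.1:9393/prediction"  # grpc_gateway_port from prepare_server
payload = {"key": ["x"], "value": [json.dumps([0.1, 0.2, 0.3])]}
resp = requests.post(url, data=json.dumps(payload))
print(resp.json())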
......@@ -21,7 +21,6 @@ import (
"log"
"strconv"
//"github.com/golang/glog"
"golang.org/x/net/context"
"github.com/grpc-ecosystem/grpc-gateway/runtime"
"google.golang.org/grpc"
......@@ -50,13 +49,4 @@ func run_proxy_server(port int) error {
return http.ListenAndServe(":" + strconv.Itoa(port), mux) // proxy port
}
func main() {
/*
flag.Parse()
defer glog.Flush()
if err := run(); err != nil {
glog.Fatal(err)
}
*/
}
func main() {}
......@@ -101,8 +101,9 @@ class PipelineServer(object):
result = sock.connect_ex(('0.0.0.0', port))
return result != 0
def prepare_server(self, yml_file):
conf = ServerYamlConfChecker.load_server_yaml_conf(yml_file)
def prepare_server(self, yml_file=None, yml_dict=None):
conf = ServerYamlConfChecker.load_server_yaml_conf(
yml_file=yml_file, yml_dict=yml_dict)
self._port = conf["port"]
if not self._port_is_available(self._port):
......@@ -173,9 +174,18 @@ class ServerYamlConfChecker(object):
pass
@staticmethod
def load_server_yaml_conf(yml_file):
with open(yml_file) as f:
conf = yaml.load(f.read())
def load_server_yaml_conf(yml_file=None, yml_dict=None):
if yml_file is not None and yml_dict is not None:
raise SystemExit("Failed to prepare_server: only one of yml_file"
" and yml_dict may be given.")
if yml_file is not None:
with open(yml_file) as f:
conf = yaml.load(f.read())
elif yml_dict is not None:
conf = yml_dict
else:
raise SystemExit("Failed to prepare_server: either yml_file or"
" yml_dict must be given.")
ServerYamlConfChecker.check_server_conf(conf)
ServerYamlConfChecker.check_dag_conf(conf["dag"])
ServerYamlConfChecker.check_tracer_conf(conf["dag"]["tracer"])
......
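After this change, PipelineServer.prepare_server and ServerYamlConfChecker.load_server_yaml_conf accept either a YAML file path or an in-memory dict, and exactly one must be supplied. A sketch of both call styles, with illustrative values mirroring the default_server_conf built by DefaultPipelineServer above:

# Sketch only; conf values are illustrative.
from paddle_serving_server_gpu.pipeline import PipelineServer

server = PipelineServer()
# Either a YAML file on disk (pre-existing behaviour) ...
# server.prepare_server(yml_file="config.yml")
# ... or an in-memory dict (new in this commit):
server.prepare_server(yml_dict={
    "port": 18080,
    "worker_num": 4,
    "build_dag_each_worker": False,
    "grpc_gateway_port": 9393,
    "dag": {
        "is_thread_op": False,
        "client_type": "brpc",
        "retry": 1,
        "use_profile": False,
        "tracer": {"interval_s": -1},
    },
})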
......@@ -17,6 +17,8 @@ import logging
import threading
import multiprocessing
import multiprocessing.managers
from contextlib import closing
import socket
if sys.version_info.major == 2:
import Queue
from Queue import PriorityQueue
......@@ -29,6 +31,26 @@ else:
_LOGGER = logging.getLogger(__name__)
class AvailablePortGenerator(object):
def __init__(self, start_port=12000):
self._curr_port = start_port
def port_is_available(self, port):
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
sock.settimeout(2)
result = sock.connect_ex(('0.0.0.0', port))
return result != 0
def next(self):
while not self.port_is_available(self._curr_port):
self._curr_port += 1
self._curr_port += 1
return self._curr_port - 1
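A short usage sketch of the new helper. Availability is probed by a connect attempt at call time, so a returned port can still be claimed by another process before it is bound; this utility accepts that race:

gen = AvailablePortGenerator(start_port=12000)
rpc_port = gen.next()   # lowest free port at or above 12000
http_port = gen.next()  # next free port after rpc_port
print(rpc_port, http_port)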
class NameGenerator(object):
# use unsafe-id-generator
def __init__(self, prefix):
......