Commit ec039781 authored by barriery

update code to run

Parent f10a4109
......@@ -10,15 +10,15 @@ sh get_data.sh
```
python -m paddle_serving_server_gpu.serve --model imdb_cnn_model --port 9292 &> cnn.log &
python -m paddle_serving_server_gpu.serve --model imdb_bow_model --port 9393 &> bow.log &
python pipeline_server.py &>pipeline.log &
python test_pipeline_server.py &>pipeline.log &
```
## Start Client
```
python pipeline_client.py
python test_pipeline_client.py
```
## HTTP Test
```
curl -X POST -k http://localhost:9999/pipeline/prediction -d '{"key": ["words"], "value": ["i am very sad | 0"]}'
curl -X POST -k http://localhost:9999/prediction -d '{"key": ["words"], "value": ["i am very sad | 0"]}'
```
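The same check can be scripted in Python. A minimal sketch using the third-party `requests` package, assuming the pipeline server above is running and its HTTP gateway listens on port 9999:
```
# Sketch: the curl test above, done from Python (requests is assumed
# to be installed; the server must already be running on port 9999).
import requests

url = "http://localhost:9999/prediction"
data = {"key": ["words"], "value": ["i am very sad | 0"]}
resp = requests.post(url, json=data)
print(resp.json())
```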
# Pipeline Web Service
## Get Model
```
sh get_data.sh
```
## Start Service
```
python web_service.py &>log.txt &
```
## Test
```
curl -X POST -k http://localhost:18080/prediction -d '{"key": ["x"], "value": ["0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283, 0.4919, 0.1856, 0.0795, -0.0332"]}'
```
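Note that the `value` field is a single comma-separated string; `UciService.preprocess` in `web_service.py` below splits it on `,` and converts each piece to a float. A sketch building the same request from a Python list (requests assumed installed):
```
# Sketch: build the comma-separated feature string from a list and
# POST it like the curl call above.
import requests

x = [0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583,
     -0.0584, 0.6283, 0.4919, 0.1856, 0.0795, -0.0332]
data = {"key": ["x"], "value": [", ".join(str(v) for v in x)]}
resp = requests.post("http://localhost:18080/prediction", json=data)
print(resp.json())
```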
wget --no-check-certificate https://paddle-serving.bj.bcebos.com/uci_housing.tar.gz
tar -xzf uci_housing.tar.gz
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from paddle_serving_server_gpu.web_service import PipelineWebService
import logging
import numpy as np

_LOGGER = logging.getLogger()
user_handler = logging.StreamHandler()
user_handler.setLevel(logging.INFO)
user_handler.setFormatter(
    logging.Formatter(
        "%(levelname)s %(asctime)s [%(filename)s:%(lineno)d] %(message)s"))
_LOGGER.addHandler(user_handler)


class UciService(PipelineWebService):
    def init_separator(self):
        self.separator = ","

    def preprocess(self, input_dict):
        _LOGGER.info(input_dict)
        x_str = input_dict["x"]
        input_dict["x"] = np.array(
            [float(x.strip()) for x in x_str.split(self.separator)])
        return input_dict

    def postprocess(self, input_dict, fetch_dict):
        _LOGGER.info(fetch_dict)
        fetch_dict["price"] = str(fetch_dict["price"][0][0])
        return fetch_dict


uci_service = UciService(name="uci")
uci_service.init_separator()
uci_service.load_model_config("./uci_housing_model")
uci_service.set_gpus("0")
uci_service.prepare_server(workdir="workdir", port=18080, device="gpu")
uci_service.run_service()
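The two hooks can be exercised offline, without starting the service, to see what they do to a single request. A sketch with a made-up fetch result (the real `fetch_dict` layout comes from the serving runtime; the `[[value]]` shape is assumed from the indexing in `postprocess`):
```
# Sketch: call preprocess/postprocess directly; the fake
# {"price": ...} fetch result below is hypothetical.
import numpy as np

svc = UciService(name="uci")
svc.init_separator()
feed = svc.preprocess({"x": "0.0137, -0.1136, 0.2553"})
print(feed["x"])  # array([ 0.0137, -0.1136,  0.2553])
print(svc.postprocess(feed, {"price": np.array([[18.2]])}))  # {'price': '18.2'}
```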
......@@ -197,6 +197,9 @@ class Server(object):
self.gpuid = 0
self.model_config_paths = None # for multi-model in a workflow
def get_fetch_list(self):
return [var.alias_name for var in self.model_conf.fetch_var]
def set_max_concurrency(self, concurrency):
self.max_concurrency = concurrency
......
......@@ -38,6 +38,10 @@ class DefaultRpcServer(object):
self.model_config = None
self.workdir = None
self.device = None
self.fetch_vars = None
def get_fetch_list(self):
return self.fetch_vars
def get_port_list(self):
return self.port_list
......@@ -78,6 +82,8 @@ class DefaultRpcServer(object):
if gpuid >= 0:
server.set_gpuid(gpuid)
server.prepare_server(workdir=workdir, port=port, device=device)
if self.fetch_vars is None:
self.fetch_vars = server.get_fetch_list()
return server
def _start_one_server(self, service_idx):
......@@ -131,9 +137,8 @@ class DefaultPipelineServer(object):
def create_internel_op_class(self, f_preprocess, f_postprocess):
class InternelOp(pipeline.Op):
def init_op(self):
pass
# f_preprocess and f_postprocess use variables in closures,
# so an init_op body is not necessary here.
def preprocess(self, input_dicts):
(_, input_dict), = input_dicts.items()
preped_data = f_preprocess(input_dict)
......@@ -150,6 +155,7 @@ class DefaultPipelineServer(object):
internel_op_class,
internel_op_name,
internel_op_endpoints,
internel_op_fetch_list,
internel_op_client_config,
internel_op_concurrency,
internel_op_timeout=-1,
......@@ -161,6 +167,7 @@ class DefaultPipelineServer(object):
name=internel_op_name,
input_ops=[read_op],
server_endpoints=internel_op_endpoints,
fetch_list=internel_op_fetch_list,
client_config=internel_op_client_config,
concurrency=internel_op_concurrency,
timeout=internel_op_timeout,
......@@ -239,6 +246,7 @@ class PipelineWebService(object):
mem_optim=mem_optim,
ir_optim=ir_optim)
rpc_endpoints = self.default_rpc_server.get_port_list()
fetch_list = self.default_rpc_server.get_fetch_list()
# pipeline server
internel_op_class = self.default_pipeline_server.create_internel_op_class(
......@@ -250,6 +258,7 @@ class PipelineWebService(object):
internel_op_class=internel_op_class,
internel_op_name=self.name,
internel_op_endpoints=internel_op_endpoints,
internel_op_fetch_list=fetch_list,
internel_op_client_config="{}/serving_server_conf.prototxt".format(
self.model_config),
internel_op_concurrency=worker_num)
......@@ -271,3 +280,204 @@ class PipelineWebService(object):
def postprocess(self, feed_dict, fetch_dict):
return fetch_dict
class WebService(object):
def __init__(self, name="default_service"):
self.name = name
self.gpus = []
self.rpc_service_list = []
def load_model_config(self, model_config):
self.model_config = model_config
def set_gpus(self, gpus):
self.gpus = [int(x) for x in gpus.split(",")]
def default_rpc_service(self,
workdir="conf",
port=9292,
gpuid=0,
thread_num=2,
mem_optim=True,
ir_optim=False):
device = "gpu"
if gpuid == -1:
device = "cpu"
op_maker = serving.OpMaker()
read_op = op_maker.create('general_reader')
general_infer_op = op_maker.create('general_infer')
general_response_op = op_maker.create('general_response')
op_seq_maker = OpSeqMaker()
op_seq_maker.add_op(read_op)
op_seq_maker.add_op(general_infer_op)
op_seq_maker.add_op(general_response_op)
server = Server()
server.set_op_sequence(op_seq_maker.get_op_sequence())
server.set_num_threads(thread_num)
server.set_memory_optimize(mem_optim)
server.set_ir_optimize(ir_optim)
server.load_model_config(self.model_config)
if gpuid >= 0:
server.set_gpuid(gpuid)
server.prepare_server(workdir=workdir, port=port, device=device)
return server
def _launch_rpc_service(self, service_idx):
self.rpc_service_list[service_idx].run_server()
def port_is_available(self, port):
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
sock.settimeout(2)
result = sock.connect_ex(('0.0.0.0', port))
if result != 0:
return True
else:
return False
def prepare_server(self,
workdir="",
port=9393,
device="gpu",
gpuid=0,
mem_optim=True,
ir_optim=False):
self.workdir = workdir
self.port = port
self.device = device
self.gpuid = gpuid
self.port_list = []
default_port = 12000
for i in range(1000):
if self.port_is_available(default_port + i):
self.port_list.append(default_port + i)
if len(self.port_list) > len(self.gpus):
break
if len(self.gpus) == 0:
# init cpu service
self.rpc_service_list.append(
self.default_rpc_service(
self.workdir,
self.port_list[0],
-1,
thread_num=2,
mem_optim=mem_optim,
ir_optim=ir_optim))
else:
for i, gpuid in enumerate(self.gpus):
self.rpc_service_list.append(
self.default_rpc_service(
"{}_{}".format(self.workdir, i),
self.port_list[i],
gpuid,
thread_num=2,
mem_optim=mem_optim,
ir_optim=ir_optim))
def _launch_web_service(self):
gpu_num = len(self.gpus)
self.client = Client()
self.client.load_client_config("{}/serving_server_conf.prototxt".format(
self.model_config))
endpoints = ""
if gpu_num > 0:
for i in range(gpu_num):
endpoints += "127.0.0.1:{},".format(self.port_list[i])
else:
endpoints = "127.0.0.1:{}".format(self.port_list[0])
self.client.connect([endpoints])
def get_prediction(self, request):
if not request.json:
abort(400)
if "fetch" not in request.json:
abort(400)
try:
feed, fetch = self.preprocess(request.json["feed"],
request.json["fetch"])
if isinstance(feed, dict) and "fetch" in feed:
del feed["fetch"]
if len(feed) == 0:
raise ValueError("empty input")
fetch_map = self.client.predict(feed=feed, fetch=fetch)
result = self.postprocess(
feed=request.json["feed"], fetch=fetch, fetch_map=fetch_map)
result = {"result": result}
except ValueError as err:
result = {"result": err}
return result
def run_rpc_service(self):
import socket
localIP = socket.gethostbyname(socket.gethostname())
print("web service address:")
print("http://{}:{}/{}/prediction".format(localIP, self.port,
self.name))
server_pros = []
for i, service in enumerate(self.rpc_service_list):
p = Process(target=self._launch_rpc_service, args=(i, ))
server_pros.append(p)
for p in server_pros:
p.start()
app_instance = Flask(__name__)
@app_instance.before_first_request
def init():
self._launch_web_service()
service_name = "/" + self.name + "/prediction"
@app_instance.route(service_name, methods=["POST"])
def run():
return self.get_prediction(request)
self.app_instance = app_instance
# TODO: maybe rename this API, e.g. to run_local_predictor?
def run_debugger_service(self, gpu=False):
import socket
localIP = socket.gethostbyname(socket.gethostname())
print("web service address:")
print("http://{}:{}/{}/prediction".format(localIP, self.port,
self.name))
app_instance = Flask(__name__)
@app_instance.before_first_request
def init():
self._launch_local_predictor(gpu)
service_name = "/" + self.name + "/prediction"
@app_instance.route(service_name, methods=["POST"])
def run():
return self.get_prediction(request)
self.app_instance = app_instance
def _launch_local_predictor(self, gpu):
from paddle_serving_app.local_predict import Debugger
self.client = Debugger()
self.client.load_model_config(
"{}".format(self.model_config), gpu=gpu, profile=False)
def run_web_service(self):
self.app_instance.run(host="0.0.0.0",
port=self.port,
threaded=False,
processes=4)
def get_app_instance(self):
return self.app_instance
def preprocess(self, feed=[], fetch=[]):
return feed, fetch
def postprocess(self, feed=[], fetch=[], fetch_map=None):
for key in fetch_map:
fetch_map[key] = fetch_map[key].tolist()
return fetch_map
......@@ -28,12 +28,12 @@ import (
gw "./proto"
)
var (
pipelineEndpoint = flag.String("pipeline_endpoint", "localhost:18080", "endpoint of PipelineService")
)
//export run_proxy_server
func run_proxy_server(port int) error {
func run_proxy_server(grpc_port int, http_port int) error {
var (
pipelineEndpoint = flag.String("pipeline_endpoint", "localhost:" + strconv.Itoa(grpc_port), "endpoint of PipelineService")
)
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
......@@ -46,7 +46,7 @@ func run_proxy_server(port int) error {
}
log.Println("start proxy service")
return http.ListenAndServe(":" + strconv.Itoa(port), mux) // proxy port
return http.ListenAndServe(":" + strconv.Itoa(http_port), mux) // proxy port
}
func main() {}
......@@ -775,7 +775,9 @@ class RequestOp(Op):
for idx, key in enumerate(request.key):
data = request.value[idx]
try:
data = eval(data)
evaled_data = eval(data)
if isinstance(evaled_data, np.ndarray):
data = evaled_data
except Exception as e:
pass
dictdata[key] = data
......
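The guard above matters because request values may be either plain strings or `repr`'d numpy arrays, and only the latter should be replaced by their `eval` result. A standalone sketch of that behavior, with made-up sample values:
```
# Sketch of the guarded eval above: an ndarray repr round-trips,
# while an ordinary string (which fails to eval, or evals to a
# non-ndarray) is left untouched. Sample values are made up.
import numpy as np
from numpy import array  # lets eval("array([...])") reconstruct arrays

for data in ["array([1., 2.])", "i am very sad | 0"]:
    try:
        evaled_data = eval(data)
        if isinstance(evaled_data, np.ndarray):
            data = evaled_data
    except Exception:
        pass
    print(type(data).__name__, data)
```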
......@@ -42,6 +42,7 @@ class PipelineClient(object):
def _pack_request_package(self, feed_dict, profile):
req = pipeline_service_pb2.Request()
np.set_printoptions(threshold=sys.maxsize)
for key, value in feed_dict.items():
req.key.append(key)
if isinstance(value, np.ndarray):
......@@ -75,7 +76,9 @@ class PipelineClient(object):
continue
data = resp.value[idx]
try:
data = eval(data)
evaled_data = eval(data)
if isinstance(evaled_data, np.ndarray):
data = evaled_data
except Exception as e:
pass
fetch_map[key] = data
......
......@@ -25,6 +25,7 @@ import yaml
from .proto import pipeline_service_pb2_grpc
from .operator import ResponseOp
from .dag import DAGExecutor
from .util import AvailablePortGenerator
_LOGGER = logging.getLogger(__name__)
......@@ -63,26 +64,28 @@ class PipelineServer(object):
self._response_op = None
self._proxy_server = None
def _grpc_gateway(self, port):
def _grpc_gateway(self, grpc_port, http_port):
import os
from ctypes import cdll
from . import gateway
lib_path = os.path.join(
os.path.dirname(gateway.__file__), "libproxy_server.so")
proxy_server = cdll.LoadLibrary(lib_path)
proxy_server.run_proxy_server(port)
proxy_server.run_proxy_server(grpc_port, http_port)
def _run_grpc_gateway(self, port):
if port <= 0:
def _run_grpc_gateway(self, grpc_port, http_port):
if http_port <= 0:
_LOGGER.info("Ignore grpc_gateway configuration.")
return
if not self._port_is_available(port):
if not AvailablePortGenerator.port_is_available(http_port):
raise SystemExit("Failed to run grpc-gateway: prot {} "
"is already used".format(port))
"is already used".format(http_port))
if self._proxy_server is not None:
raise RuntimeError("Proxy server has been started.")
self._proxy_server = multiprocessing.Process(
target=self._grpc_gateway, args=(port, ))
target=self._grpc_gateway, args=(
grpc_port,
http_port, ))
self._proxy_server.daemon = True
self._proxy_server.start()
......@@ -95,18 +98,12 @@ class PipelineServer(object):
"can only have one previous op.")
self._response_op = response_op
def _port_is_available(self, port):
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
sock.settimeout(2)
result = sock.connect_ex(('0.0.0.0', port))
return result != 0
def prepare_server(self, yml_file=None, yml_dict=None):
conf = ServerYamlConfChecker.load_server_yaml_conf(
yml_file=yml_file, yml_dict=yml_dict)
self._port = conf["port"]
if not self._port_is_available(self._port):
if not AvailablePortGenerator.port_is_available(self._port):
raise SystemExit("Failed to prepare_server: prot {} "
"is already used".format(self._port))
self._worker_num = conf["worker_num"]
......@@ -138,7 +135,8 @@ class PipelineServer(object):
worker.start()
workers.append(worker)
self._run_grpc_gateway(
self._grpc_gateway_port) # start grpc_gateway
grpc_port=self._port,
http_port=self._grpc_gateway_port) # start grpc_gateway
for worker in workers:
worker.join()
else:
......@@ -152,7 +150,8 @@ class PipelineServer(object):
server.add_insecure_port('[::]:{}'.format(self._port))
server.start()
self._run_grpc_gateway(
self._grpc_gateway_port) # start grpc_gateway
grpc_port=self._port,
http_port=self._grpc_gateway_port) # start grpc_gateway
server.wait_for_termination()
def _run_server_func(self, bind_address, response_op, dag_conf, worker_idx):
......
......@@ -35,7 +35,8 @@ class AvailablePortGenerator(object):
def __init__(self, start_port=12000):
self._curr_port = start_port
def port_is_available(self, port):
@staticmethod
def port_is_available(port):
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
sock.settimeout(2)
result = sock.connect_ex(('0.0.0.0', port))
......@@ -45,7 +46,7 @@ class AvailablePortGenerator(object):
return False
def next(self):
while not self.port_is_available(self._curr_port):
while not AvailablePortGenerator.port_is_available(self._curr_port):
self._curr_port += 1
self._curr_port += 1
return self._curr_port - 1
......
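With `port_is_available` now a staticmethod, it can be called without an instance, while `next()` still scans upward from the current port. A usage sketch (the import path is assumed from the package layout shown above):
```
# Sketch: assumed import path, based on the pipeline package layout.
from paddle_serving_server_gpu.pipeline.util import AvailablePortGenerator

print(AvailablePortGenerator.port_is_available(18080))  # no instance needed

gen = AvailablePortGenerator(start_port=12000)
grpc_port = gen.next()  # first free port >= 12000
http_port = gen.next()  # next free port after grpc_port
```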