Commit 2ab63eef authored by: barriery, committed by: GitHub

Merge pull request #654 from barrierye/grpc-client

Add a gRPC implementation (analogous to the web service)
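This change introduces a gRPC front end alongside the existing brpc path: MultiLangClient on the client side and MultiLangServer on the server side, talking over the new multi_lang_general_model_service.proto. A minimal usage sketch, assuming a trained uci_housing model whose client config lives at uci_housing_client/serving_client_conf.prototxt and a MultiLangServer already listening on 127.0.0.1:9393 (the path and port are illustrative, not part of this PR):

from paddle_serving_client import MultiLangClient

client = MultiLangClient()
client.load_client_config("uci_housing_client/serving_client_conf.prototxt")  # illustrative path
client.connect(["127.0.0.1:9393"])

# synchronous call: returns a dict of fetch name -> numpy array
fetch_map = client.predict(feed={"x": [0.0] * 13}, fetch=["price"])

# asynchronous call: asyn=True returns a future-like object, result() blocks
future = client.predict(feed={"x": [0.0] * 13}, fetch=["price"], asyn=True)
fetch_map = future.result()
print(fetch_map["price"])

The server side is assembled exactly like the brpc Server, only with MultiLangServer (see the new server example further down in this diff).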
......@@ -86,6 +86,70 @@ function(protobuf_generate_python SRCS)
set(${SRCS} ${${SRCS}} PARENT_SCOPE)
endfunction()
function(grpc_protobuf_generate_python SRCS)
# shameless copy from https://github.com/Kitware/CMake/blob/master/Modules/FindProtobuf.cmake
if(NOT ARGN)
message(SEND_ERROR "Error: GRPC_PROTOBUF_GENERATE_PYTHON() called without any proto files")
return()
endif()
if(PROTOBUF_GENERATE_CPP_APPEND_PATH)
# Create an include path for each file specified
foreach(FIL ${ARGN})
get_filename_component(ABS_FIL ${FIL} ABSOLUTE)
get_filename_component(ABS_PATH ${ABS_FIL} PATH)
list(FIND _protobuf_include_path ${ABS_PATH} _contains_already)
if(${_contains_already} EQUAL -1)
list(APPEND _protobuf_include_path -I ${ABS_PATH})
endif()
endforeach()
else()
set(_protobuf_include_path -I ${CMAKE_CURRENT_SOURCE_DIR})
endif()
if(DEFINED PROTOBUF_IMPORT_DIRS AND NOT DEFINED Protobuf_IMPORT_DIRS)
set(Protobuf_IMPORT_DIRS "${PROTOBUF_IMPORT_DIRS}")
endif()
if(DEFINED Protobuf_IMPORT_DIRS)
foreach(DIR ${Protobuf_IMPORT_DIRS})
get_filename_component(ABS_PATH ${DIR} ABSOLUTE)
list(FIND _protobuf_include_path ${ABS_PATH} _contains_already)
if(${_contains_already} EQUAL -1)
list(APPEND _protobuf_include_path -I ${ABS_PATH})
endif()
endforeach()
endif()
set(${SRCS})
foreach(FIL ${ARGN})
get_filename_component(ABS_FIL ${FIL} ABSOLUTE)
get_filename_component(FIL_WE ${FIL} NAME_WE)
if(NOT PROTOBUF_GENERATE_CPP_APPEND_PATH)
get_filename_component(FIL_DIR ${FIL} DIRECTORY)
if(FIL_DIR)
set(FIL_WE "${FIL_DIR}/${FIL_WE}")
endif()
endif()
list(APPEND ${SRCS} "${CMAKE_CURRENT_BINARY_DIR}/${FIL_WE}_pb2.py")
add_custom_command(
OUTPUT "${CMAKE_CURRENT_BINARY_DIR}/${FIL_WE}_pb2.py"
COMMAND ${PROTOBUF_PROTOC_EXECUTABLE} --python_out ${CMAKE_CURRENT_BINARY_DIR} ${_protobuf_include_path} ${ABS_FIL}
DEPENDS ${ABS_FIL} ${PROTOBUF_PROTOC_EXECUTABLE}
COMMENT "Running Python protocol buffer compiler on ${FIL}"
VERBATIM )
list(APPEND ${SRCS} "${CMAKE_CURRENT_BINARY_DIR}/${FIL_WE}_pb2_grpc.py")
add_custom_command(
OUTPUT "${CMAKE_CURRENT_BINARY_DIR}/${FIL_WE}_pb2_grpc.py"
COMMAND ${PYTHON_EXECUTABLE} -m grpc_tools.protoc --python_out ${CMAKE_CURRENT_BINARY_DIR} --grpc_python_out ${CMAKE_CURRENT_BINARY_DIR} ${_protobuf_include_path} ${ABS_FIL}
DEPENDS ${ABS_FIL}
COMMENT "Running Python grpc protocol buffer compiler on ${FIL}"
VERBATIM )
endforeach()
set(${SRCS} ${${SRCS}} PARENT_SCOPE)
endfunction()
# Print and set the protobuf library information,
# finish this cmake process and exit from this file.
macro(PROMPT_PROTOBUF_LIB)
......
......@@ -704,6 +704,15 @@ function(py_proto_compile TARGET_NAME)
add_custom_target(${TARGET_NAME} ALL DEPENDS ${py_srcs})
endfunction()
function(py_grpc_proto_compile TARGET_NAME)
set(oneValueArgs "")
set(multiValueArgs SRCS)
cmake_parse_arguments(py_grpc_proto_compile "${options}" "${oneValueArgs}" "${multiValueArgs}" ${ARGN})
set(py_srcs)
grpc_protobuf_generate_python(py_srcs ${py_grpc_proto_compile_SRCS})
add_custom_target(${TARGET_NAME} ALL DEPENDS ${py_srcs})
endfunction()
function(py_test TARGET_NAME)
if(WITH_TESTING)
set(options "")
......
......@@ -35,6 +35,10 @@ py_proto_compile(general_model_config_py_proto SRCS proto/general_model_config.p
add_custom_target(general_model_config_py_proto_init ALL COMMAND ${CMAKE_COMMAND} -E touch __init__.py)
add_dependencies(general_model_config_py_proto general_model_config_py_proto_init)
py_grpc_proto_compile(multi_lang_general_model_service_py_proto SRCS proto/multi_lang_general_model_service.proto)
add_custom_target(multi_lang_general_model_service_py_proto_init ALL COMMAND ${CMAKE_COMMAND} -E touch __init__.py)
add_dependencies(multi_lang_general_model_service_py_proto multi_lang_general_model_service_py_proto_init)
if (CLIENT)
py_proto_compile(sdk_configure_py_proto SRCS proto/sdk_configure.proto)
add_custom_target(sdk_configure_py_proto_init ALL COMMAND ${CMAKE_COMMAND} -E touch __init__.py)
......@@ -51,6 +55,11 @@ add_custom_command(TARGET general_model_config_py_proto POST_BUILD
COMMENT "Copy generated general_model_config proto file into directory paddle_serving_client/proto."
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
add_custom_command(TARGET multi_lang_general_model_service_py_proto POST_BUILD
COMMAND ${CMAKE_COMMAND} -E make_directory ${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_client/proto
COMMAND cp *.py ${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_client/proto
COMMENT "Copy generated multi_lang_general_model_service proto file into directory paddle_serving_client/proto."
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
endif()
if (APP)
......@@ -77,6 +86,11 @@ add_custom_command(TARGET general_model_config_py_proto POST_BUILD
COMMAND cp *.py ${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server/proto
COMMENT "Copy generated general_model_config proto file into directory paddle_serving_server/proto."
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
add_custom_command(TARGET multi_lang_general_model_service_py_proto POST_BUILD
COMMAND ${CMAKE_COMMAND} -E make_directory ${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server/proto
COMMAND cp *.py ${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server/proto
COMMENT "Copy generated multi_lang_general_model_service proto file into directory paddle_serving_server/proto."
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
else()
add_custom_command(TARGET server_config_py_proto POST_BUILD
COMMAND ${CMAKE_COMMAND} -E make_directory
......@@ -95,5 +109,11 @@ add_custom_command(TARGET general_model_config_py_proto POST_BUILD
COMMENT "Copy generated general_model_config proto file into directory
paddle_serving_server_gpu/proto."
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
add_custom_command(TARGET multi_lang_general_model_service_py_proto POST_BUILD
COMMAND ${CMAKE_COMMAND} -E make_directory ${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server_gpu/proto
COMMAND cp *.py ${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server_gpu/proto
COMMENT "Copy generated multi_lang_general_model_service proto file into directory paddle_serving_server_gpu/proto."
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
endif()
endif()
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto2";
message Tensor {
optional bytes data = 1;
repeated int32 int_data = 2;
repeated int64 int64_data = 3;
repeated float float_data = 4;
optional int32 elem_type = 5;
repeated int32 shape = 6;
repeated int32 lod = 7; // only for fetch tensor currently
};
message FeedInst { repeated Tensor tensor_array = 1; };
message FetchInst { repeated Tensor tensor_array = 1; };
message Request {
repeated FeedInst insts = 1;
repeated string feed_var_names = 2;
repeated string fetch_var_names = 3;
required bool is_python = 4 [ default = false ];
};
message Response {
repeated ModelOutput outputs = 1;
optional string tag = 2;
};
message ModelOutput {
repeated FetchInst insts = 1;
optional string engine_name = 2;
}
service MultiLangGeneralModelService {
rpc inference(Request) returns (Response) {}
};
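The service exposes a single unary RPC, inference(Request) -> Response. For reference, a minimal raw-stub sketch (not part of the PR) that packs one float feed the same way MultiLangClient does when is_python=True; it assumes the generated multi_lang_general_model_service_pb2 / _pb2_grpc modules are importable from paddle_serving_client.proto (where the CMake rules above copy them), a server on 127.0.0.1:9393, and illustrative feed/fetch alias names:

import grpc
import numpy as np
from paddle_serving_client.proto import multi_lang_general_model_service_pb2 as pb2
from paddle_serving_client.proto import multi_lang_general_model_service_pb2_grpc as pb2_grpc

channel = grpc.insecure_channel("127.0.0.1:9393")
stub = pb2_grpc.MultiLangGeneralModelServiceStub(channel)

req = pb2.Request()
req.feed_var_names.append("x")       # feed alias name from the model config (illustrative)
req.fetch_var_names.append("price")  # fetch alias name (illustrative)
req.is_python = True                 # numpy buffers travel through Tensor.data

tensor = pb2.Tensor()
tensor.data = np.zeros((13, ), dtype="float32").tobytes()
tensor.shape.extend([13])
inst = pb2.FeedInst()
inst.tensor_array.append(tensor)
req.insts.append(inst)

resp = stub.inference(req)
out = resp.outputs[0].insts[0].tensor_array[0]
price = np.frombuffer(out.data, dtype="float32")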
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=doc-string-missing
from paddle_serving_client import MultiLangClient
import sys
client = MultiLangClient()
client.load_client_config(sys.argv[1])
client.connect(["127.0.0.1:9393"])
import paddle
test_reader = paddle.batch(
paddle.reader.shuffle(
paddle.dataset.uci_housing.test(), buf_size=500),
batch_size=1)
for data in test_reader():
future = client.predict(feed={"x": data[0][0]}, fetch=["price"], asyn=True)
fetch_map = future.result()
print("{} {}".format(fetch_map["price"][0], data[0][1][0]))
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=doc-string-missing
import os
import sys
from paddle_serving_server import OpMaker
from paddle_serving_server import OpSeqMaker
from paddle_serving_server import MultiLangServer
op_maker = OpMaker()
read_op = op_maker.create('general_reader')
general_infer_op = op_maker.create('general_infer')
response_op = op_maker.create('general_response')
op_seq_maker = OpSeqMaker()
op_seq_maker.add_op(read_op)
op_seq_maker.add_op(general_infer_op)
op_seq_maker.add_op(response_op)
server = MultiLangServer()
server.set_op_sequence(op_seq_maker.get_op_sequence())
server.load_model_config(sys.argv[1])
server.prepare_server(workdir="work_dir1", port=9393, device="cpu")
server.run_server()
......@@ -21,7 +21,10 @@ import google.protobuf.text_format
import numpy as np
import time
import sys
from .serving_client import PredictorRes
import grpc
from .proto import multi_lang_general_model_service_pb2
from .proto import multi_lang_general_model_service_pb2_grpc
int_type = 0
float_type = 1
......@@ -125,6 +128,8 @@ class Client(object):
self.all_numpy_input = True
self.has_numpy_input = False
self.rpc_timeout_ms = 20000
from .serving_client import PredictorRes
self.predictorres_constructor = PredictorRes
def load_client_config(self, path):
from .serving_client import PredictorClient
......@@ -304,7 +309,7 @@ class Client(object):
self.profile_.record('py_prepro_1')
self.profile_.record('py_client_infer_0')
result_batch_handle = PredictorRes()
result_batch_handle = self.predictorres_constructor()
if self.all_numpy_input:
res = self.client_handle_.numpy_predict(
float_slot_batch, float_feed_names, float_shape, int_slot_batch,
......@@ -372,3 +377,172 @@ class Client(object):
def release(self):
self.client_handle_.destroy_predictor()
self.client_handle_ = None
class MultiLangClient(object):
def __init__(self):
self.channel_ = None
def load_client_config(self, path):
if not isinstance(path, str):
raise Exception("GClient only supports multi-model temporarily")
self._parse_model_config(path)
def connect(self, endpoint):
self.channel_ = grpc.insecure_channel(endpoint[0])  # TODO: support multiple endpoints; only the first one is used for now
self.stub_ = multi_lang_general_model_service_pb2_grpc.MultiLangGeneralModelServiceStub(
self.channel_)
def _flatten_list(self, nested_list):
for item in nested_list:
if isinstance(item, (list, tuple)):
for sub_item in self._flatten_list(item):
yield sub_item
else:
yield item
def _parse_model_config(self, model_config_path):
model_conf = m_config.GeneralModelConfig()
f = open(model_config_path, 'r')
model_conf = google.protobuf.text_format.Merge(
str(f.read()), model_conf)
self.feed_names_ = [var.alias_name for var in model_conf.feed_var]
self.feed_types_ = {}
self.feed_shapes_ = {}
self.fetch_names_ = [var.alias_name for var in model_conf.fetch_var]
self.fetch_types_ = {}
self.lod_tensor_set_ = set()
for i, var in enumerate(model_conf.feed_var):
self.feed_types_[var.alias_name] = var.feed_type
self.feed_shapes_[var.alias_name] = var.shape
if var.is_lod_tensor:
self.lod_tensor_set_.add(var.alias_name)
else:
counter = 1
for dim in self.feed_shapes_[var.alias_name]:
counter *= dim
for i, var in enumerate(model_conf.fetch_var):
self.fetch_types_[var.alias_name] = var.fetch_type
if var.is_lod_tensor:
self.lod_tensor_set_.add(var.alias_name)
def _pack_feed_data(self, feed, fetch, is_python):
req = multi_lang_general_model_service_pb2.Request()
req.fetch_var_names.extend(fetch)
req.is_python = is_python
if isinstance(feed, dict):
feed_batch = [feed]
elif isinstance(feed, list):
feed_batch = feed
else:
raise Exception("unsupported feed type: {}".format(type(feed)))
# collect feed names after normalizing feed into a batch (list of dicts)
req.feed_var_names.extend(feed_batch[0].keys())
init_feed_names = False
for feed_data in feed_batch:
inst = multi_lang_general_model_service_pb2.FeedInst()
for name in req.feed_var_names:
tensor = multi_lang_general_model_service_pb2.Tensor()
var = feed_data[name]
v_type = self.feed_types_[name]
if is_python:
data = None
if isinstance(var, list):
if v_type == 0: # int64
data = np.array(var, dtype="int64")
elif v_type == 1: # float32
data = np.array(var, dtype="float32")
else:
raise Exception("error type.")
else:
data = var
if var.dtype == "float64":
data = data.astype("float32")
tensor.data = data.tobytes()
else:
if v_type == 0: # int64
if isinstance(var, np.ndarray):
tensor.int64_data.extend(var.reshape(-1).tolist())
else:
tensor.int64_data.extend(self._flatten_list(var))
elif v_type == 1: # float32
if isinstance(var, np.ndarray):
tensor.float_data.extend(var.reshape(-1).tolist())
else:
tensor.float_data.extend(self._flatten_list(var))
else:
raise Exception("error type.")
if isinstance(var, np.ndarray):
tensor.shape.extend(list(var.shape))
else:
tensor.shape.extend(self.feed_shapes_[name])
inst.tensor_array.append(tensor)
req.insts.append(inst)
return req
def _unpack_resp(self, resp, fetch, is_python, need_variant_tag):
result_map = {}
inst = resp.outputs[0].insts[0]
tag = resp.tag
for i, name in enumerate(fetch):
var = inst.tensor_array[i]
v_type = self.fetch_types_[name]
if is_python:
if v_type == 0: # int64
result_map[name] = np.frombuffer(var.data, dtype="int64")
elif v_type == 1: # float32
result_map[name] = np.frombuffer(var.data, dtype="float32")
else:
raise Exception("error type.")
else:
if v_type == 0: # int64
result_map[name] = np.array(
list(var.int64_data), dtype="int64")
elif v_type == 1: # float32
result_map[name] = np.array(
list(var.float_data), dtype="float32")
else:
raise Exception("error type.")
result_map[name].shape = list(var.shape)
if name in self.lod_tensor_set_:
result_map["{}.lod".format(name)] = np.array(list(var.lod))
return result_map if not need_variant_tag else [result_map, tag]
def _done_callback_func(self, fetch, is_python, need_variant_tag):
def unpack_resp(resp):
return self._unpack_resp(resp, fetch, is_python, need_variant_tag)
return unpack_resp
def predict(self,
feed,
fetch,
need_variant_tag=False,
asyn=False,
is_python=True):
req = self._pack_feed_data(feed, fetch, is_python=is_python)
if not asyn:
resp = self.stub_.inference(req)
return self._unpack_resp(
resp,
fetch,
is_python=is_python,
need_variant_tag=need_variant_tag)
else:
call_future = self.stub_.inference.future(req)
return MultiLangPredictFuture(
call_future,
self._done_callback_func(
fetch,
is_python=is_python,
need_variant_tag=need_variant_tag))
class MultiLangPredictFuture(object):
def __init__(self, call_future, callback_func):
self.call_future_ = call_future
self.callback_func_ = callback_func
def result(self):
resp = self.call_future_.result()
return self.callback_func_(resp)
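MultiLangPredictFuture is a thin wrapper around the grpc call future: result() blocks on the underlying RPC and then runs the unpack callback, so several requests can be in flight at once. A small sketch under assumed names (client and inputs are hypothetical):

# fire several requests without waiting, then collect the results
futures = [
    client.predict(feed={"x": x}, fetch=["price"], asyn=True)
    for x in inputs  # hypothetical iterable of feed values
]
results = [f.result() for f in futures]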
......@@ -25,6 +25,13 @@ from contextlib import closing
import collections
import fcntl
import numpy as np
import grpc
from .proto import multi_lang_general_model_service_pb2
from .proto import multi_lang_general_model_service_pb2_grpc
from multiprocessing import Pool, Process
from concurrent import futures
class OpMaker(object):
def __init__(self):
......@@ -428,3 +435,158 @@ class Server(object):
print("Going to Run Command")
print(command)
os.system(command)
class MultiLangServerService(
multi_lang_general_model_service_pb2_grpc.MultiLangGeneralModelServiceServicer):
def __init__(self, model_config_path, endpoints):
from paddle_serving_client import Client
self._parse_model_config(model_config_path)
self.bclient_ = Client()
self.bclient_.load_client_config(
"{}/serving_server_conf.prototxt".format(model_config_path))
self.bclient_.connect(endpoints)
def _parse_model_config(self, model_config_path):
model_conf = m_config.GeneralModelConfig()
f = open("{}/serving_server_conf.prototxt".format(model_config_path),
'r')
model_conf = google.protobuf.text_format.Merge(
str(f.read()), model_conf)
self.feed_names_ = [var.alias_name for var in model_conf.feed_var]
self.feed_types_ = {}
self.feed_shapes_ = {}
self.fetch_names_ = [var.alias_name for var in model_conf.fetch_var]
self.fetch_types_ = {}
self.lod_tensor_set_ = set()
for i, var in enumerate(model_conf.feed_var):
self.feed_types_[var.alias_name] = var.feed_type
self.feed_shapes_[var.alias_name] = var.shape
if var.is_lod_tensor:
self.lod_tensor_set_.add(var.alias_name)
for i, var in enumerate(model_conf.fetch_var):
self.fetch_types_[var.alias_name] = var.fetch_type
if var.is_lod_tensor:
self.lod_tensor_set_.add(var.alias_name)
def _flatten_list(self, nested_list):
for item in nested_list:
if isinstance(item, (list, tuple)):
for sub_item in self._flatten_list(item):
yield sub_item
else:
yield item
def _unpack_request(self, request):
feed_names = list(request.feed_var_names)
fetch_names = list(request.fetch_var_names)
is_python = request.is_python
feed_batch = []
for feed_inst in request.insts:
feed_dict = {}
for idx, name in enumerate(feed_names):
var = feed_inst.tensor_array[idx]
v_type = self.feed_types_[name]
data = None
if is_python:
if v_type == 0:
data = np.frombuffer(var.data, dtype="int64")
elif v_type == 1:
data = np.frombuffer(var.data, dtype="float32")
else:
raise Exception("error type.")
else:
if v_type == 0: # int64
data = np.array(list(var.int64_data), dtype="int64")
elif v_type == 1: # float32
data = np.array(list(var.float_data), dtype="float32")
else:
raise Exception("error type.")
data.shape = list(feed_inst.tensor_array[idx].shape)
feed_dict[name] = data
feed_batch.append(feed_dict)
return feed_batch, fetch_names, is_python
def _pack_resp_package(self, result, fetch_names, is_python, tag):
resp = multi_lang_general_model_service_pb2.Response()
# Only one model is supported temporarily
model_output = multi_lang_general_model_service_pb2.ModelOutput()
inst = multi_lang_general_model_service_pb2.FetchInst()
for idx, name in enumerate(fetch_names):
tensor = multi_lang_general_model_service_pb2.Tensor()
v_type = self.fetch_types_[name]
if is_python:
tensor.data = result[name].tobytes()
else:
if v_type == 0: # int64
tensor.int64_data.extend(result[name].reshape(-1).tolist())
elif v_type == 1: # float32
tensor.float_data.extend(result[name].reshape(-1).tolist())
else:
raise Exception("error type.")
tensor.shape.extend(list(result[name].shape))
if name in self.lod_tensor_set_:
tensor.lod.extend(result["{}.lod".format(name)].tolist())
inst.tensor_array.append(tensor)
model_output.insts.append(inst)
resp.outputs.append(model_output)
resp.tag = tag
return resp
def inference(self, request, context):
feed_dict, fetch_names, is_python = self._unpack_request(request)
data, tag = self.bclient_.predict(
feed=feed_dict, fetch=fetch_names, need_variant_tag=True)
return self._pack_resp_package(data, fetch_names, is_python, tag)
class MultiLangServer(object):
def __init__(self, worker_num=2):
self.bserver_ = Server()
self.worker_num_ = worker_num
def set_op_sequence(self, op_seq):
self.bserver_.set_op_sequence(op_seq)
def load_model_config(self, model_config_path):
if not isinstance(model_config_path, str):
raise Exception(
"MultiLangServer only supports multi-model temporarily")
self.bserver_.load_model_config(model_config_path)
self.model_config_path_ = model_config_path
def prepare_server(self, workdir=None, port=9292, device="cpu"):
default_port = 12000
self.port_list_ = []
for i in range(1000):
if default_port + i != port and self._port_is_available(default_port
+ i):
self.port_list_.append(default_port + i)
break
self.bserver_.prepare_server(
workdir=workdir, port=self.port_list_[0], device=device)
self.gport_ = port
def _launch_brpc_service(self, bserver):
bserver.run_server()
def _port_is_available(self, port):
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
sock.settimeout(2)
result = sock.connect_ex(('0.0.0.0', port))
return result != 0
def run_server(self):
p_bserver = Process(
target=self._launch_brpc_service, args=(self.bserver_, ))
p_bserver.start()
server = grpc.server(
futures.ThreadPoolExecutor(max_workers=self.worker_num_))
multi_lang_general_model_service_pb2_grpc.add_MultiLangGeneralModelServiceServicer_to_server(
MultiLangServerService(self.model_config_path_,
["0.0.0.0:{}".format(self.port_list_[0])]),
server)
server.add_insecure_port('[::]:{}'.format(self.gport_))
server.start()
p_bserver.join()
server.wait_for_termination()
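run_server() keeps the existing brpc engine as the backend: prepare_server() probes ports upward from 12000 for a free internal port, launches the brpc Server there in a child process, and exposes only the gRPC endpoint on the user-facing port, with MultiLangServerService proxying each inference call to the local brpc service. A standalone sketch of the availability check used above (probing 127.0.0.1 here; the helper name is illustrative):

import socket
from contextlib import closing

def port_is_available(port):
    # connect_ex returns 0 only if something is already listening on the port
    with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
        sock.settimeout(2)
        return sock.connect_ex(("127.0.0.1", port)) != 0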
......@@ -27,6 +27,13 @@ import argparse
import collections
import fcntl
import numpy as np
import grpc
from .proto import multi_lang_general_model_service_pb2
from .proto import multi_lang_general_model_service_pb2_grpc
from multiprocessing import Pool, Process
from concurrent import futures
def serve_args():
parser = argparse.ArgumentParser("serve")
......@@ -469,3 +476,158 @@ class Server(object):
print(command)
os.system(command)
class MultiLangServerService(
multi_lang_general_model_service_pb2_grpc.MultiLangGeneralModelServiceServicer):
def __init__(self, model_config_path, endpoints):
from paddle_serving_client import Client
self._parse_model_config(model_config_path)
self.bclient_ = Client()
self.bclient_.load_client_config(
"{}/serving_server_conf.prototxt".format(model_config_path))
self.bclient_.connect(endpoints)
def _parse_model_config(self, model_config_path):
model_conf = m_config.GeneralModelConfig()
f = open("{}/serving_server_conf.prototxt".format(model_config_path),
'r')
model_conf = google.protobuf.text_format.Merge(
str(f.read()), model_conf)
self.feed_names_ = [var.alias_name for var in model_conf.feed_var]
self.feed_types_ = {}
self.feed_shapes_ = {}
self.fetch_names_ = [var.alias_name for var in model_conf.fetch_var]
self.fetch_types_ = {}
self.lod_tensor_set_ = set()
for i, var in enumerate(model_conf.feed_var):
self.feed_types_[var.alias_name] = var.feed_type
self.feed_shapes_[var.alias_name] = var.shape
if var.is_lod_tensor:
self.lod_tensor_set_.add(var.alias_name)
for i, var in enumerate(model_conf.fetch_var):
self.fetch_types_[var.alias_name] = var.fetch_type
if var.is_lod_tensor:
self.lod_tensor_set_.add(var.alias_name)
def _flatten_list(self, nested_list):
for item in nested_list:
if isinstance(item, (list, tuple)):
for sub_item in self._flatten_list(item):
yield sub_item
else:
yield item
def _unpack_request(self, request):
feed_names = list(request.feed_var_names)
fetch_names = list(request.fetch_var_names)
is_python = request.is_python
feed_batch = []
for feed_inst in request.insts:
feed_dict = {}
for idx, name in enumerate(feed_names):
var = feed_inst.tensor_array[idx]
v_type = self.feed_types_[name]
data = None
if is_python:
if v_type == 0:
data = np.frombuffer(var.data, dtype="int64")
elif v_type == 1:
data = np.frombuffer(var.data, dtype="float32")
else:
raise Exception("error type.")
else:
if v_type == 0: # int64
data = np.array(list(var.int64_data), dtype="int64")
elif v_type == 1: # float32
data = np.array(list(var.float_data), dtype="float32")
else:
raise Exception("error type.")
data.shape = list(feed_inst.tensor_array[idx].shape)
feed_dict[name] = data
feed_batch.append(feed_dict)
return feed_batch, fetch_names, is_python
def _pack_resp_package(self, result, fetch_names, is_python, tag):
resp = multi_lang_general_model_service_pb2.Response()
# Only one model is supported temporarily
model_output = multi_lang_general_model_service_pb2.ModelOutput()
inst = multi_lang_general_model_service_pb2.FetchInst()
for idx, name in enumerate(fetch_names):
tensor = multi_lang_general_model_service_pb2.Tensor()
v_type = self.fetch_types_[name]
if is_python:
tensor.data = result[name].tobytes()
else:
if v_type == 0: # int64
tensor.int64_data.extend(result[name].reshape(-1).tolist())
elif v_type == 1: # float32
tensor.float_data.extend(result[name].reshape(-1).tolist())
else:
raise Exception("error type.")
tensor.shape.extend(list(result[name].shape))
if name in self.lod_tensor_set_:
tensor.lod.extend(result["{}.lod".format(name)].tolist())
inst.tensor_array.append(tensor)
model_output.insts.append(inst)
resp.outputs.append(model_output)
resp.tag = tag
return resp
def inference(self, request, context):
feed_dict, fetch_names, is_python = self._unpack_request(request)
data, tag = self.bclient_.predict(
feed=feed_dict, fetch=fetch_names, need_variant_tag=True)
return self._pack_resp_package(data, fetch_names, is_python, tag)
class MultiLangServer(object):
def __init__(self, worker_num=2):
self.bserver_ = Server()
self.worker_num_ = worker_num
def set_op_sequence(self, op_seq):
self.bserver_.set_op_sequence(op_seq)
def load_model_config(self, model_config_path):
if not isinstance(model_config_path, str):
raise Exception(
"MultiLangServer only supports multi-model temporarily")
self.bserver_.load_model_config(model_config_path)
self.model_config_path_ = model_config_path
def prepare_server(self, workdir=None, port=9292, device="cpu"):
default_port = 12000
self.port_list_ = []
for i in range(1000):
if default_port + i != port and self._port_is_available(default_port
+ i):
self.port_list_.append(default_port + i)
break
self.bserver_.prepare_server(
workdir=workdir, port=self.port_list_[0], device=device)
self.gport_ = port
def _launch_brpc_service(self, bserver):
bserver.run_server()
def _port_is_available(self, port):
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
sock.settimeout(2)
result = sock.connect_ex(('0.0.0.0', port))
return result != 0
def run_server(self):
p_bserver = Process(
target=self._launch_brpc_service, args=(self.bserver_, ))
p_bserver.start()
server = grpc.server(
futures.ThreadPoolExecutor(max_workers=self.worker_num_))
multi_lang_general_model_service_pb2_grpc.add_MultiLangGeneralModelServiceServicer_to_server(
MultiLangServerService(self.model_config_path_,
["0.0.0.0:{}".format(self.port_list_[0])]),
server)
server.add_insecure_port('[::]:{}'.format(self.gport_))
server.start()
p_bserver.join()
server.wait_for_termination()
numpy>=1.12, <=1.16.4 ; python_version<"3.5"
grpcio-tools>=1.28.1
grpcio>=1.28.1
......@@ -58,7 +58,8 @@ if '${PACK}' == 'ON':
REQUIRED_PACKAGES = [
'six >= 1.10.0', 'protobuf >= 3.1.0', 'numpy >= 1.12'
'six >= 1.10.0', 'protobuf >= 3.1.0', 'numpy >= 1.12', 'grpcio >= 1.28.1',
'grpcio-tools >= 1.28.1'
]
if not find_package("paddlepaddle") and not find_package("paddlepaddle-gpu"):
......
......@@ -37,7 +37,7 @@ def python_version():
max_version, mid_version, min_version = python_version()
REQUIRED_PACKAGES = [
'six >= 1.10.0', 'protobuf >= 3.1.0',
'six >= 1.10.0', 'protobuf >= 3.1.0', 'grpcio >= 1.28.1', 'grpcio-tools >= 1.28.1',
'paddle_serving_client', 'flask >= 1.1.1', 'paddle_serving_app'
]
......
......@@ -37,7 +37,7 @@ def python_version():
max_version, mid_version, min_version = python_version()
REQUIRED_PACKAGES = [
'six >= 1.10.0', 'protobuf >= 3.1.0',
'six >= 1.10.0', 'protobuf >= 3.1.0', 'grpcio >= 1.28.1', 'grpcio-tools >= 1.28.1',
'paddle_serving_client', 'flask >= 1.1.1', 'paddle_serving_app'
]
......