diff --git a/cmake/external/protobuf.cmake b/cmake/external/protobuf.cmake
index fd4b7c5898b1128c6a73f00e678e96f117f0d91e..a7ddf193c61f0640fe330e280b4260174772f254 100644
--- a/cmake/external/protobuf.cmake
+++ b/cmake/external/protobuf.cmake
@@ -86,6 +86,70 @@ function(protobuf_generate_python SRCS)
   set(${SRCS} ${${SRCS}} PARENT_SCOPE)
 endfunction()
 
+function(grpc_protobuf_generate_python SRCS)
+  # shameless copy from https://github.com/Kitware/CMake/blob/master/Modules/FindProtobuf.cmake
+  if(NOT ARGN)
+    message(SEND_ERROR "Error: GRPC_PROTOBUF_GENERATE_PYTHON() called without any proto files")
+    return()
+  endif()
+
+  if(PROTOBUF_GENERATE_CPP_APPEND_PATH)
+    # Create an include path for each file specified
+    foreach(FIL ${ARGN})
+      get_filename_component(ABS_FIL ${FIL} ABSOLUTE)
+      get_filename_component(ABS_PATH ${ABS_FIL} PATH)
+      list(FIND _protobuf_include_path ${ABS_PATH} _contains_already)
+      if(${_contains_already} EQUAL -1)
+        list(APPEND _protobuf_include_path -I ${ABS_PATH})
+      endif()
+    endforeach()
+  else()
+    set(_protobuf_include_path -I ${CMAKE_CURRENT_SOURCE_DIR})
+  endif()
+  if(DEFINED PROTOBUF_IMPORT_DIRS AND NOT DEFINED Protobuf_IMPORT_DIRS)
+    set(Protobuf_IMPORT_DIRS "${PROTOBUF_IMPORT_DIRS}")
+  endif()
+
+  if(DEFINED Protobuf_IMPORT_DIRS)
+    foreach(DIR ${Protobuf_IMPORT_DIRS})
+      get_filename_component(ABS_PATH ${DIR} ABSOLUTE)
+      list(FIND _protobuf_include_path ${ABS_PATH} _contains_already)
+      if(${_contains_already} EQUAL -1)
+        list(APPEND _protobuf_include_path -I ${ABS_PATH})
+      endif()
+    endforeach()
+  endif()
+
+  set(${SRCS})
+  foreach(FIL ${ARGN})
+    get_filename_component(ABS_FIL ${FIL} ABSOLUTE)
+    get_filename_component(FIL_WE ${FIL} NAME_WE)
+    if(NOT PROTOBUF_GENERATE_CPP_APPEND_PATH)
+      get_filename_component(FIL_DIR ${FIL} DIRECTORY)
+      if(FIL_DIR)
+        set(FIL_WE "${FIL_DIR}/${FIL_WE}")
+      endif()
+    endif()
+    list(APPEND ${SRCS} "${CMAKE_CURRENT_BINARY_DIR}/${FIL_WE}_pb2.py")
+    add_custom_command(
+      OUTPUT "${CMAKE_CURRENT_BINARY_DIR}/${FIL_WE}_pb2.py"
+      COMMAND ${PROTOBUF_PROTOC_EXECUTABLE} --python_out ${CMAKE_CURRENT_BINARY_DIR} ${_protobuf_include_path} ${ABS_FIL}
+      DEPENDS ${ABS_FIL} ${PROTOBUF_PROTOC_EXECUTABLE}
+      COMMENT "Running Python protocol buffer compiler on ${FIL}"
+      VERBATIM )
+    list(APPEND ${SRCS} "${CMAKE_CURRENT_BINARY_DIR}/${FIL_WE}_pb2_grpc.py")
+    add_custom_command(
+      OUTPUT "${CMAKE_CURRENT_BINARY_DIR}/${FIL_WE}_pb2_grpc.py"
+      COMMAND ${PYTHON_EXECUTABLE} -m grpc_tools.protoc --python_out ${CMAKE_CURRENT_BINARY_DIR} --grpc_python_out ${CMAKE_CURRENT_BINARY_DIR} ${_protobuf_include_path} ${ABS_FIL}
+      DEPENDS ${ABS_FIL}
+      COMMENT "Running Python grpc protocol buffer compiler on ${FIL}"
+      VERBATIM )
+  endforeach()
+
+  set(${SRCS} ${${SRCS}} PARENT_SCOPE)
+endfunction()
+
+
 # Print and set the protobuf library information,
 # finish this cmake process and exit from this file.
 macro(PROMPT_PROTOBUF_LIB)
diff --git a/cmake/generic.cmake b/cmake/generic.cmake
index 861889266b0132b8812d2d958dd6675dc631fd33..dd2fe4dc94e7213d6ad15d37f74ab1c6d41d660a 100644
--- a/cmake/generic.cmake
+++ b/cmake/generic.cmake
@@ -704,6 +704,15 @@ function(py_proto_compile TARGET_NAME)
   add_custom_target(${TARGET_NAME} ALL DEPENDS ${py_srcs})
 endfunction()
 
+function(py_grpc_proto_compile TARGET_NAME)
+  set(oneValueArgs "")
+  set(multiValueArgs SRCS)
+  cmake_parse_arguments(py_grpc_proto_compile "${options}" "${oneValueArgs}" "${multiValueArgs}" ${ARGN})
+  set(py_srcs)
+  grpc_protobuf_generate_python(py_srcs ${py_grpc_proto_compile_SRCS})
+  add_custom_target(${TARGET_NAME} ALL DEPENDS ${py_srcs})
+endfunction()
+
 function(py_test TARGET_NAME)
   if(WITH_TESTING)
     set(options "")
diff --git a/core/configure/CMakeLists.txt b/core/configure/CMakeLists.txt
index d3e5b75da96ad7a0789866a4a2c474fad988c21b..c3b0be5142896f87868cdd7c13686b87f03c573a 100644
--- a/core/configure/CMakeLists.txt
+++ b/core/configure/CMakeLists.txt
@@ -35,6 +35,10 @@ py_proto_compile(general_model_config_py_proto SRCS proto/general_model_config.p
 add_custom_target(general_model_config_py_proto_init ALL COMMAND ${CMAKE_COMMAND} -E touch __init__.py)
 add_dependencies(general_model_config_py_proto general_model_config_py_proto_init)
 
+py_grpc_proto_compile(multi_lang_general_model_service_py_proto SRCS proto/multi_lang_general_model_service.proto)
+add_custom_target(multi_lang_general_model_service_py_proto_init ALL COMMAND ${CMAKE_COMMAND} -E touch __init__.py)
+add_dependencies(multi_lang_general_model_service_py_proto multi_lang_general_model_service_py_proto_init)
+
 if (CLIENT)
 py_proto_compile(sdk_configure_py_proto SRCS proto/sdk_configure.proto)
 add_custom_target(sdk_configure_py_proto_init ALL COMMAND ${CMAKE_COMMAND} -E touch __init__.py)
@@ -51,6 +55,11 @@ add_custom_command(TARGET general_model_config_py_proto POST_BUILD
         COMMENT "Copy generated general_model_config proto file into directory paddle_serving_client/proto."
         WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
 
+add_custom_command(TARGET multi_lang_general_model_service_py_proto POST_BUILD
+        COMMAND ${CMAKE_COMMAND} -E make_directory ${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_client/proto
+        COMMAND cp *.py ${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_client/proto
+        COMMENT "Copy generated multi_lang_general_model_service proto file into directory paddle_serving_client/proto."
+        WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
 endif()
 
 if (APP)
@@ -77,6 +86,11 @@ add_custom_command(TARGET general_model_config_py_proto POST_BUILD
         COMMAND cp *.py ${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server/proto
         COMMENT "Copy generated general_model_config proto file into directory paddle_serving_server/proto."
         WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
+add_custom_command(TARGET multi_lang_general_model_service_py_proto POST_BUILD
+        COMMAND ${CMAKE_COMMAND} -E make_directory ${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server/proto
+        COMMAND cp *.py ${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server/proto
+        COMMENT "Copy generated multi_lang_general_model_service proto file into directory paddle_serving_server/proto."
+        WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
 else()
 add_custom_command(TARGET server_config_py_proto POST_BUILD
         COMMAND ${CMAKE_COMMAND} -E make_directory
@@ -95,5 +109,11 @@ add_custom_command(TARGET general_model_config_py_proto POST_BUILD
         COMMENT "Copy generated general_model_config proto file into directory paddle_serving_server_gpu/proto."
         WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
 
+
+add_custom_command(TARGET multi_lang_general_model_service_py_proto POST_BUILD
+        COMMAND ${CMAKE_COMMAND} -E make_directory ${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server_gpu/proto
+        COMMAND cp *.py ${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server_gpu/proto
+        COMMENT "Copy generated multi_lang_general_model_service proto file into directory paddle_serving_server_gpu/proto."
+        WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
 endif()
 endif()
diff --git a/core/configure/proto/multi_lang_general_model_service.proto b/core/configure/proto/multi_lang_general_model_service.proto
new file mode 100644
index 0000000000000000000000000000000000000000..6e1764b23b3e6f7d9eb9a33925bcd83cfb1810bb
--- /dev/null
+++ b/core/configure/proto/multi_lang_general_model_service.proto
@@ -0,0 +1,50 @@
+// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto2";
+
+message Tensor {
+  optional bytes data = 1;
+  repeated int32 int_data = 2;
+  repeated int64 int64_data = 3;
+  repeated float float_data = 4;
+  optional int32 elem_type = 5;
+  repeated int32 shape = 6;
+  repeated int32 lod = 7; // only for fetch tensor currently
+};
+
+message FeedInst { repeated Tensor tensor_array = 1; };
+
+message FetchInst { repeated Tensor tensor_array = 1; };
+
+message Request {
+  repeated FeedInst insts = 1;
+  repeated string feed_var_names = 2;
+  repeated string fetch_var_names = 3;
+  required bool is_python = 4 [ default = false ];
+};
+
+message Response {
+  repeated ModelOutput outputs = 1;
+  optional string tag = 2;
+};
+
+message ModelOutput {
+  repeated FetchInst insts = 1;
+  optional string engine_name = 2;
+}
+
+service MultiLangGeneralModelService {
+  rpc inference(Request) returns (Response) {}
+};
diff --git a/python/examples/fit_a_line/test_multilang_client.py b/python/examples/fit_a_line/test_multilang_client.py
new file mode 100644
index 0000000000000000000000000000000000000000..c2c58378e523afb9724bc54a25228598d529dd7a
--- /dev/null
+++ b/python/examples/fit_a_line/test_multilang_client.py
@@ -0,0 +1,32 @@
+# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# pylint: disable=doc-string-missing
+
+from paddle_serving_client import MultiLangClient
+import sys
+
+client = MultiLangClient()
+client.load_client_config(sys.argv[1])
+client.connect(["127.0.0.1:9393"])
+
+import paddle
+test_reader = paddle.batch(
+    paddle.reader.shuffle(
+        paddle.dataset.uci_housing.test(), buf_size=500),
+    batch_size=1)
+
+for data in test_reader():
+    future = client.predict(feed={"x": data[0][0]}, fetch=["price"], asyn=True)
+    fetch_map = future.result()
+    print("{} {}".format(fetch_map["price"][0], data[0][1][0]))
diff --git a/python/examples/fit_a_line/test_multilang_server.py b/python/examples/fit_a_line/test_multilang_server.py
new file mode 100644
index 0000000000000000000000000000000000000000..23eb938f0ee1bf6b195509816dea5221bbfa9218
--- /dev/null
+++ b/python/examples/fit_a_line/test_multilang_server.py
@@ -0,0 +1,36 @@
+# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# pylint: disable=doc-string-missing
+
+import os
+import sys
+from paddle_serving_server import OpMaker
+from paddle_serving_server import OpSeqMaker
+from paddle_serving_server import MultiLangServer
+
+op_maker = OpMaker()
+read_op = op_maker.create('general_reader')
+general_infer_op = op_maker.create('general_infer')
+response_op = op_maker.create('general_response')
+
+op_seq_maker = OpSeqMaker()
+op_seq_maker.add_op(read_op)
+op_seq_maker.add_op(general_infer_op)
+op_seq_maker.add_op(response_op)
+
+server = MultiLangServer()
+server.set_op_sequence(op_seq_maker.get_op_sequence())
+server.load_model_config(sys.argv[1])
+server.prepare_server(workdir="work_dir1", port=9393, device="cpu")
+server.run_server()
diff --git a/python/paddle_serving_client/__init__.py b/python/paddle_serving_client/__init__.py
index f201eefc449b3aea11db6ae209d79fb6acb05173..09a91809f52718eb7f52d234b3e8c5406883f419 100644
--- a/python/paddle_serving_client/__init__.py
+++ b/python/paddle_serving_client/__init__.py
@@ -21,7 +21,10 @@ import google.protobuf.text_format
 import numpy as np
 import time
 import sys
-from .serving_client import PredictorRes
+
+import grpc
+from .proto import multi_lang_general_model_service_pb2
+from .proto import multi_lang_general_model_service_pb2_grpc
 
 int_type = 0
 float_type = 1
@@ -125,6 +128,8 @@ class Client(object):
         self.all_numpy_input = True
         self.has_numpy_input = False
         self.rpc_timeout_ms = 20000
+        from .serving_client import PredictorRes
+        self.predictorres_constructor = PredictorRes
 
     def load_client_config(self, path):
         from .serving_client import PredictorClient
@@ -304,7 +309,7 @@ class Client(object):
         self.profile_.record('py_prepro_1')
         self.profile_.record('py_client_infer_0')
 
-        result_batch_handle = PredictorRes()
+        result_batch_handle = self.predictorres_constructor()
         if self.all_numpy_input:
             res = self.client_handle_.numpy_predict(
                 float_slot_batch, float_feed_names, float_shape, int_slot_batch,
@@ -372,3 +377,172 @@ class Client(object):
     def release(self):
         self.client_handle_.destroy_predictor()
         self.client_handle_ = None
+
+
+class MultiLangClient(object):
+    def __init__(self):
+        self.channel_ = None
+
+    def load_client_config(self, path):
+        if not isinstance(path, str):
+            raise Exception("MultiLangClient only supports multi-model temporarily")
+        self._parse_model_config(path)
+
+    def connect(self, endpoint):
+        self.channel_ = grpc.insecure_channel(endpoint[0])  #TODO
+        self.stub_ = multi_lang_general_model_service_pb2_grpc.MultiLangGeneralModelServiceStub(
+            self.channel_)
+
+    def _flatten_list(self, nested_list):
+        for item in nested_list:
+            if isinstance(item, (list, tuple)):
+                for sub_item in self._flatten_list(item):
+                    yield sub_item
+            else:
+                yield item
+
+    def _parse_model_config(self, model_config_path):
+        model_conf = m_config.GeneralModelConfig()
+        f = open(model_config_path, 'r')
+        model_conf = google.protobuf.text_format.Merge(
+            str(f.read()), model_conf)
+        self.feed_names_ = [var.alias_name for var in model_conf.feed_var]
+        self.feed_types_ = {}
+        self.feed_shapes_ = {}
+        self.fetch_names_ = [var.alias_name for var in model_conf.fetch_var]
+        self.fetch_types_ = {}
+        self.lod_tensor_set_ = set()
+        for i, var in enumerate(model_conf.feed_var):
+            self.feed_types_[var.alias_name] = var.feed_type
+            self.feed_shapes_[var.alias_name] = var.shape
+            if var.is_lod_tensor:
+                self.lod_tensor_set_.add(var.alias_name)
+            else:
+                counter = 1
+                for dim in self.feed_shapes_[var.alias_name]:
+                    counter *= dim
+        for i, var in enumerate(model_conf.fetch_var):
+            self.fetch_types_[var.alias_name] = var.fetch_type
+            if var.is_lod_tensor:
+                self.lod_tensor_set_.add(var.alias_name)
+
+    def _pack_feed_data(self, feed, fetch, is_python):
+        req = multi_lang_general_model_service_pb2.Request()
+        req.fetch_var_names.extend(fetch)
+        req.feed_var_names.extend(feed.keys())
+        req.is_python = is_python
+        feed_batch = None
+        if isinstance(feed, dict):
+            feed_batch = [feed]
+        elif isinstance(feed, list):
+            feed_batch = feed
+        else:
+            raise Exception("{} not support".format(type(feed)))
+        init_feed_names = False
+        for feed_data in feed_batch:
+            inst = multi_lang_general_model_service_pb2.FeedInst()
+            for name in req.feed_var_names:
+                tensor = multi_lang_general_model_service_pb2.Tensor()
+                var = feed_data[name]
+                v_type = self.feed_types_[name]
+                if is_python:
+                    data = None
+                    if isinstance(var, list):
+                        if v_type == 0:  # int64
+                            data = np.array(var, dtype="int64")
+                        elif v_type == 1:  # float32
+                            data = np.array(var, dtype="float32")
+                        else:
+                            raise Exception("error type.")
+                    else:
+                        data = var
+                        if var.dtype == "float64":
+                            data = data.astype("float32")
+                    tensor.data = data.tobytes()
+                else:
+                    if v_type == 0:  # int64
+                        if isinstance(var, np.ndarray):
+                            tensor.int64_data.extend(var.reshape(-1).tolist())
+                        else:
+                            tensor.int64_data.extend(self._flatten_list(var))
+                    elif v_type == 1:  # float32
+                        if isinstance(var, np.ndarray):
+                            tensor.float_data.extend(var.reshape(-1).tolist())
+                        else:
+                            tensor.float_data.extend(self._flatten_list(var))
+                    else:
+                        raise Exception("error type.")
+                if isinstance(var, np.ndarray):
+                    tensor.shape.extend(list(var.shape))
+                else:
+                    tensor.shape.extend(self.feed_shapes_[name])
+                inst.tensor_array.append(tensor)
+            req.insts.append(inst)
+        return req
+
+    def _unpack_resp(self, resp, fetch, is_python, need_variant_tag):
+        result_map = {}
+        inst = resp.outputs[0].insts[0]
+        tag = resp.tag
+        for i, name in enumerate(fetch):
+            var = inst.tensor_array[i]
+            v_type = self.fetch_types_[name]
+            if is_python:
+                if v_type == 0:  # int64
+                    result_map[name] = np.frombuffer(var.data, dtype="int64")
+                elif v_type == 1:  # float32
+                    result_map[name] = np.frombuffer(var.data, dtype="float32")
+                else:
+                    raise Exception("error type.")
+            else:
+                if v_type == 0:  # int64
+                    result_map[name] = np.array(
+                        list(var.int64_data), dtype="int64")
+                elif v_type == 1:  # float32
+                    result_map[name] = np.array(
+                        list(var.float_data), dtype="float32")
+                else:
+                    raise Exception("error type.")
+            result_map[name].shape = list(var.shape)
+            if name in self.lod_tensor_set_:
+                result_map["{}.lod".format(name)] = np.array(list(var.lod))
+        return result_map if not need_variant_tag else [result_map, tag]
+
+    def _done_callback_func(self, fetch, is_python, need_variant_tag):
+        def unpack_resp(resp):
+            return self._unpack_resp(resp, fetch, is_python, need_variant_tag)
+
+        return unpack_resp
+
+    def predict(self,
+                feed,
+                fetch,
+                need_variant_tag=False,
+                asyn=False,
+                is_python=True):
+        req = self._pack_feed_data(feed, fetch, is_python=is_python)
+        if not asyn:
+            resp = self.stub_.inference(req)
+            return self._unpack_resp(
+                resp,
+                fetch,
+                is_python=is_python,
+                need_variant_tag=need_variant_tag)
+        else:
+            call_future = self.stub_.inference.future(req)
+            return MultiLangPredictFuture(
+                call_future,
+                self._done_callback_func(
+                    fetch,
+                    is_python=is_python,
+                    need_variant_tag=need_variant_tag))
+
+
+class MultiLangPredictFuture(object):
+    def __init__(self, call_future, callback_func):
+        self.call_future_ = call_future
+        self.callback_func_ = callback_func
+
+    def result(self):
+        resp = self.call_future_.result()
+        return self.callback_func_(resp)
diff --git a/python/paddle_serving_server/__init__.py b/python/paddle_serving_server/__init__.py
index 7356de2c2feac126272cf9a771a03146a87ef541..3306c040f11dac8834745777d65178624d922bd5 100644
--- a/python/paddle_serving_server/__init__.py
+++ b/python/paddle_serving_server/__init__.py
@@ -25,6 +25,13 @@ from contextlib import closing
 import collections
 import fcntl
 
+import numpy as np
+import grpc
+from .proto import multi_lang_general_model_service_pb2
+from .proto import multi_lang_general_model_service_pb2_grpc
+from multiprocessing import Pool, Process
+from concurrent import futures
+
 
 class OpMaker(object):
     def __init__(self):
@@ -428,3 +435,158 @@ class Server(object):
         print("Going to Run Command")
         print(command)
         os.system(command)
+
+
+class MultiLangServerService(
+        multi_lang_general_model_service_pb2_grpc.MultiLangGeneralModelService):
+    def __init__(self, model_config_path, endpoints):
+        from paddle_serving_client import Client
+        self._parse_model_config(model_config_path)
+        self.bclient_ = Client()
+        self.bclient_.load_client_config(
+            "{}/serving_server_conf.prototxt".format(model_config_path))
+        self.bclient_.connect(endpoints)
+
+    def _parse_model_config(self, model_config_path):
+        model_conf = m_config.GeneralModelConfig()
+        f = open("{}/serving_server_conf.prototxt".format(model_config_path),
+                 'r')
+        model_conf = google.protobuf.text_format.Merge(
+            str(f.read()), model_conf)
+        self.feed_names_ = [var.alias_name for var in model_conf.feed_var]
+        self.feed_types_ = {}
+        self.feed_shapes_ = {}
+        self.fetch_names_ = [var.alias_name for var in model_conf.fetch_var]
+        self.fetch_types_ = {}
+        self.lod_tensor_set_ = set()
+        for i, var in enumerate(model_conf.feed_var):
+            self.feed_types_[var.alias_name] = var.feed_type
+            self.feed_shapes_[var.alias_name] = var.shape
+            if var.is_lod_tensor:
+                self.lod_tensor_set_.add(var.alias_name)
+        for i, var in enumerate(model_conf.fetch_var):
+            self.fetch_types_[var.alias_name] = var.fetch_type
+            if var.is_lod_tensor:
+                self.lod_tensor_set_.add(var.alias_name)
+
+    def _flatten_list(self, nested_list):
+        for item in nested_list:
+            if isinstance(item, (list, tuple)):
+                for sub_item in self._flatten_list(item):
+                    yield sub_item
+            else:
+                yield item
+
+    def _unpack_request(self, request):
+        feed_names = list(request.feed_var_names)
+        fetch_names = list(request.fetch_var_names)
+        is_python = request.is_python
+        feed_batch = []
+        for feed_inst in request.insts:
+            feed_dict = {}
+            for idx, name in enumerate(feed_names):
+                var = feed_inst.tensor_array[idx]
+                v_type = self.feed_types_[name]
+                data = None
+                if is_python:
+                    if v_type == 0:
+                        data = np.frombuffer(var.data, dtype="int64")
+                    elif v_type == 1:
+                        data = np.frombuffer(var.data, dtype="float32")
+                    else:
+                        raise Exception("error type.")
+                else:
+                    if v_type == 0:  # int64
+                        data = np.array(list(var.int64_data), dtype="int64")
+                    elif v_type == 1:  # float32
+                        data = np.array(list(var.float_data), dtype="float32")
+                    else:
+                        raise Exception("error type.")
+                data.shape = list(feed_inst.tensor_array[idx].shape)
+                feed_dict[name] = data
+            feed_batch.append(feed_dict)
+        return feed_batch, fetch_names, is_python
+
+    def _pack_resp_package(self, result, fetch_names, is_python, tag):
+        resp = multi_lang_general_model_service_pb2.Response()
+        # Only one model is supported temporarily
+        model_output = multi_lang_general_model_service_pb2.ModelOutput()
+        inst = multi_lang_general_model_service_pb2.FetchInst()
+        for idx, name in enumerate(fetch_names):
+            tensor = multi_lang_general_model_service_pb2.Tensor()
+            v_type = self.fetch_types_[name]
+            if is_python:
+                tensor.data = result[name].tobytes()
+            else:
+                if v_type == 0:  # int64
+                    tensor.int64_data.extend(result[name].reshape(-1).tolist())
+                elif v_type == 1:  # float32
+                    tensor.float_data.extend(result[name].reshape(-1).tolist())
+                else:
+                    raise Exception("error type.")
+            tensor.shape.extend(list(result[name].shape))
+            if name in self.lod_tensor_set_:
+                tensor.lod.extend(result["{}.lod".format(name)].tolist())
+            inst.tensor_array.append(tensor)
+        model_output.insts.append(inst)
+        resp.outputs.append(model_output)
+        resp.tag = tag
+        return resp
+
+    def inference(self, request, context):
+        feed_dict, fetch_names, is_python = self._unpack_request(request)
+        data, tag = self.bclient_.predict(
+            feed=feed_dict, fetch=fetch_names, need_variant_tag=True)
+        return self._pack_resp_package(data, fetch_names, is_python, tag)
+
+
+class MultiLangServer(object):
+    def __init__(self, worker_num=2):
+        self.bserver_ = Server()
+        self.worker_num_ = worker_num
+
+    def set_op_sequence(self, op_seq):
+        self.bserver_.set_op_sequence(op_seq)
+
+    def load_model_config(self, model_config_path):
+        if not isinstance(model_config_path, str):
+            raise Exception(
+                "MultiLangServer only supports multi-model temporarily")
+        self.bserver_.load_model_config(model_config_path)
+        self.model_config_path_ = model_config_path
+
+    def prepare_server(self, workdir=None, port=9292, device="cpu"):
+        default_port = 12000
+        self.port_list_ = []
+        for i in range(1000):
+            if default_port + i != port and self._port_is_available(default_port
+                                                                    + i):
+                self.port_list_.append(default_port + i)
+                break
+        self.bserver_.prepare_server(
+            workdir=workdir, port=self.port_list_[0], device=device)
+        self.gport_ = port
+
+    def _launch_brpc_service(self, bserver):
+        bserver.run_server()
+
+    def _port_is_available(self, port):
+        with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
+            sock.settimeout(2)
+            result = sock.connect_ex(('0.0.0.0', port))
+            return result != 0
+
+    def run_server(self):
+        p_bserver = Process(
+            target=self._launch_brpc_service, args=(self.bserver_, ))
+        p_bserver.start()
+        server = grpc.server(
+            futures.ThreadPoolExecutor(max_workers=self.worker_num_))
+        multi_lang_general_model_service_pb2_grpc.add_MultiLangGeneralModelServiceServicer_to_server(
+            MultiLangServerService(self.model_config_path_,
+                                   ["0.0.0.0:{}".format(self.port_list_[0])]),
+            server)
+        server.add_insecure_port('[::]:{}'.format(self.gport_))
+        server.start()
+        p_bserver.join()
+        server.wait_for_termination()
diff --git a/python/paddle_serving_server_gpu/__init__.py b/python/paddle_serving_server_gpu/__init__.py
index e40c0fa48763eaa66373e9f2149552c4f8693eb7..b26869a2380a3994b18c43c2295c6a40233bfb70 100644
--- a/python/paddle_serving_server_gpu/__init__.py
+++ b/python/paddle_serving_server_gpu/__init__.py
@@ -27,6 +27,13 @@ import argparse
 import collections
 import fcntl
 
+import numpy as np
+import grpc
+from .proto import multi_lang_general_model_service_pb2
+from .proto import multi_lang_general_model_service_pb2_grpc
+from multiprocessing import Pool, Process
+from concurrent import futures
+
 
 def serve_args():
     parser = argparse.ArgumentParser("serve")
@@ -469,3 +476,158 @@ class Server(object):
         print(command)
         os.system(command)
 
+
+
+class MultiLangServerService(
+        multi_lang_general_model_service_pb2_grpc.MultiLangGeneralModelService):
+    def __init__(self, model_config_path, endpoints):
+        from paddle_serving_client import Client
+        self._parse_model_config(model_config_path)
+        self.bclient_ = Client()
+        self.bclient_.load_client_config(
+            "{}/serving_server_conf.prototxt".format(model_config_path))
+        self.bclient_.connect(endpoints)
+
+    def _parse_model_config(self, model_config_path):
+        model_conf = m_config.GeneralModelConfig()
+        f = open("{}/serving_server_conf.prototxt".format(model_config_path),
+                 'r')
+        model_conf = google.protobuf.text_format.Merge(
+            str(f.read()), model_conf)
+        self.feed_names_ = [var.alias_name for var in model_conf.feed_var]
+        self.feed_types_ = {}
+        self.feed_shapes_ = {}
+        self.fetch_names_ = [var.alias_name for var in model_conf.fetch_var]
+        self.fetch_types_ = {}
+        self.lod_tensor_set_ = set()
+        for i, var in enumerate(model_conf.feed_var):
+            self.feed_types_[var.alias_name] = var.feed_type
+            self.feed_shapes_[var.alias_name] = var.shape
+            if var.is_lod_tensor:
+                self.lod_tensor_set_.add(var.alias_name)
+        for i, var in enumerate(model_conf.fetch_var):
+            self.fetch_types_[var.alias_name] = var.fetch_type
+            if var.is_lod_tensor:
+                self.lod_tensor_set_.add(var.alias_name)
+
+    def _flatten_list(self, nested_list):
+        for item in nested_list:
+            if isinstance(item, (list, tuple)):
+                for sub_item in self._flatten_list(item):
+                    yield sub_item
+            else:
+                yield item
+
+    def _unpack_request(self, request):
+        feed_names = list(request.feed_var_names)
+        fetch_names = list(request.fetch_var_names)
+        is_python = request.is_python
+        feed_batch = []
+        for feed_inst in request.insts:
+            feed_dict = {}
+            for idx, name in enumerate(feed_names):
+                var = feed_inst.tensor_array[idx]
+                v_type = self.feed_types_[name]
+                data = None
+                if is_python:
+                    if v_type == 0:
+                        data = np.frombuffer(var.data, dtype="int64")
+                    elif v_type == 1:
+                        data = np.frombuffer(var.data, dtype="float32")
+                    else:
+                        raise Exception("error type.")
+                else:
+                    if v_type == 0:  # int64
+                        data = np.array(list(var.int64_data), dtype="int64")
+                    elif v_type == 1:  # float32
+                        data = np.array(list(var.float_data), dtype="float32")
+                    else:
+                        raise Exception("error type.")
+                data.shape = list(feed_inst.tensor_array[idx].shape)
+                feed_dict[name] = data
+            feed_batch.append(feed_dict)
+        return feed_batch, fetch_names, is_python
+
+    def _pack_resp_package(self, result, fetch_names, is_python, tag):
+        resp = multi_lang_general_model_service_pb2.Response()
+        # Only one model is supported temporarily
+        model_output = multi_lang_general_model_service_pb2.ModelOutput()
+        inst = multi_lang_general_model_service_pb2.FetchInst()
+        for idx, name in enumerate(fetch_names):
+            tensor = multi_lang_general_model_service_pb2.Tensor()
+            v_type = self.fetch_types_[name]
+            if is_python:
+                tensor.data = result[name].tobytes()
+            else:
+                if v_type == 0:  # int64
+                    tensor.int64_data.extend(result[name].reshape(-1).tolist())
+                elif v_type == 1:  # float32
+                    tensor.float_data.extend(result[name].reshape(-1).tolist())
+                else:
+                    raise Exception("error type.")
+            tensor.shape.extend(list(result[name].shape))
+            if name in self.lod_tensor_set_:
+                tensor.lod.extend(result["{}.lod".format(name)].tolist())
+            inst.tensor_array.append(tensor)
+        model_output.insts.append(inst)
+        resp.outputs.append(model_output)
+        resp.tag = tag
+        return resp
+
+    def inference(self, request, context):
+        feed_dict, fetch_names, is_python = self._unpack_request(request)
+        data, tag = self.bclient_.predict(
+            feed=feed_dict, fetch=fetch_names, need_variant_tag=True)
+        return self._pack_resp_package(data, fetch_names, is_python, tag)
+
+
+class MultiLangServer(object):
+    def __init__(self, worker_num=2):
+        self.bserver_ = Server()
+        self.worker_num_ = worker_num
+
+    def set_op_sequence(self, op_seq):
+        self.bserver_.set_op_sequence(op_seq)
+
+    def load_model_config(self, model_config_path):
+        if not isinstance(model_config_path, str):
+            raise Exception(
+                "MultiLangServer only supports multi-model temporarily")
+        self.bserver_.load_model_config(model_config_path)
+        self.model_config_path_ = model_config_path
+
+    def prepare_server(self, workdir=None, port=9292, device="cpu"):
+        default_port = 12000
+        self.port_list_ = []
+        for i in range(1000):
+            if default_port + i != port and self._port_is_available(default_port
+                                                                    + i):
+                self.port_list_.append(default_port + i)
+                break
+        self.bserver_.prepare_server(
+            workdir=workdir, port=self.port_list_[0], device=device)
+        self.gport_ = port
+
+    def _launch_brpc_service(self, bserver):
+        bserver.run_server()
+
+    def _port_is_available(self, port):
+        with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
+            sock.settimeout(2)
+            result = sock.connect_ex(('0.0.0.0', port))
+            return result != 0
+
+    def run_server(self):
+        p_bserver = Process(
+            target=self._launch_brpc_service, args=(self.bserver_, ))
+        p_bserver.start()
+        server = grpc.server(
+            futures.ThreadPoolExecutor(max_workers=self.worker_num_))
+        multi_lang_general_model_service_pb2_grpc.add_MultiLangGeneralModelServiceServicer_to_server(
+            MultiLangServerService(self.model_config_path_,
+                                   ["0.0.0.0:{}".format(self.port_list_[0])]),
+            server)
+        server.add_insecure_port('[::]:{}'.format(self.gport_))
+        server.start()
+        p_bserver.join()
+        server.wait_for_termination()
diff --git a/python/requirements.txt b/python/requirements.txt
index d445216b3112ea3d5791045b43a6a3147865522f..4b61fa6a4f89d88338cd868134f510d179bc45b6 100644
--- a/python/requirements.txt
+++ b/python/requirements.txt
@@ -1 +1,3 @@
 numpy>=1.12, <=1.16.4 ; python_version<"3.5"
+grpcio-tools>=1.28.1
+grpcio>=1.28.1
diff --git a/python/setup.py.client.in b/python/setup.py.client.in
index c46a58733a2c6ac6785e0047ab19080e92dd5695..601cfc81f0971cf1fa480b1daaed70eb6c696494 100644
--- a/python/setup.py.client.in
+++ b/python/setup.py.client.in
@@ -58,7 +58,8 @@ if '${PACK}' == 'ON':
 
 
 REQUIRED_PACKAGES = [
-    'six >= 1.10.0', 'protobuf >= 3.1.0', 'numpy >= 1.12'
+    'six >= 1.10.0', 'protobuf >= 3.1.0', 'numpy >= 1.12', 'grpcio >= 1.28.1',
+    'grpcio-tools >= 1.28.1'
 ]
 
 if not find_package("paddlepaddle") and not find_package("paddlepaddle-gpu"):
diff --git a/python/setup.py.server.in b/python/setup.py.server.in
index a7190ecf36c194e7d486f96e1bf8e219a7600dba..efa9a50bb8a31fc81b97dec0243316cdc9cd8af6 100644
--- a/python/setup.py.server.in
+++ b/python/setup.py.server.in
@@ -37,7 +37,7 @@ def python_version():
 max_version, mid_version, min_version = python_version()
 
 REQUIRED_PACKAGES = [
-    'six >= 1.10.0', 'protobuf >= 3.1.0',
+    'six >= 1.10.0', 'protobuf >= 3.1.0', 'grpcio >= 1.28.1', 'grpcio-tools >= 1.28.1',
     'paddle_serving_client', 'flask >= 1.1.1', 'paddle_serving_app'
 ]
 
diff --git a/python/setup.py.server_gpu.in b/python/setup.py.server_gpu.in
index 90db7addbcd8b1929342a893c8213a48f3c8e9e3..06b51c1c404590ed1db141f273bdc35f26c13176 100644
--- a/python/setup.py.server_gpu.in
+++ b/python/setup.py.server_gpu.in
@@ -37,7 +37,7 @@ def python_version():
 max_version, mid_version, min_version = python_version()
 
 REQUIRED_PACKAGES = [
-    'six >= 1.10.0', 'protobuf >= 3.1.0',
+    'six >= 1.10.0', 'protobuf >= 3.1.0', 'grpcio >= 1.28.1', 'grpcio-tools >= 1.28.1',
     'paddle_serving_client', 'flask >= 1.1.1', 'paddle_serving_app'
 ]