From 580e84787938051c22c5eea5ad858c4ba9b572c3 Mon Sep 17 00:00:00 2001 From: barrierye Date: Tue, 9 Jun 2020 22:05:34 +0800 Subject: [PATCH] use future in channel && remove type def --- core/configure/CMakeLists.txt | 12 +- .../proto}/general_python_service.proto | 7 +- .../configure/proto/pyserving_channel.proto | 8 +- python/examples/fit_a_line/test_py_client.py | 3 +- python/examples/fit_a_line/test_py_server.py | 43 +-- .../general_python_service.proto | 29 -- python/paddle_serving_client/pyclient.py | 21 +- python/paddle_serving_server/pyserver.py | 309 ++++++++++++------ python/paddle_serving_server/serve.py | 26 +- 9 files changed, 269 insertions(+), 189 deletions(-) rename {python/paddle_serving_server => core/configure/proto}/general_python_service.proto (84%) rename python/paddle_serving_server/python_service_channel.proto => core/configure/proto/pyserving_channel.proto (83%) delete mode 100644 python/paddle_serving_client/general_python_service.proto diff --git a/core/configure/CMakeLists.txt b/core/configure/CMakeLists.txt index 02c8ecb7..3c4bb29e 100644 --- a/core/configure/CMakeLists.txt +++ b/core/configure/CMakeLists.txt @@ -39,6 +39,9 @@ py_grpc_proto_compile(multi_lang_general_model_service_py_proto SRCS proto/multi add_custom_target(multi_lang_general_model_service_py_proto_init ALL COMMAND ${CMAKE_COMMAND} -E touch __init__.py) add_dependencies(multi_lang_general_model_service_py_proto multi_lang_general_model_service_py_proto_init) +py_grpc_proto_compile(general_python_service_py_proto SRCS proto/general_python_service.proto) +add_custom_target(general_python_service_py_proto_init ALL COMMAND ${CMAKE_COMMAND} -E touch __init__.py) +add_dependencies(general_python_service_py_proto general_python_service_py_proto_init) if (CLIENT) py_proto_compile(sdk_configure_py_proto SRCS proto/sdk_configure.proto) add_custom_target(sdk_configure_py_proto_init ALL COMMAND ${CMAKE_COMMAND} -E touch __init__.py) @@ -60,6 +63,12 @@ add_custom_command(TARGET multi_lang_general_model_service_py_proto POST_BUILD COMMAND cp *.py ${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_client/proto COMMENT "Copy generated multi_lang_general_model_service proto file into directory paddle_serving_client/proto." WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}) + +add_custom_command(TARGET general_python_service_py_proto POST_BUILD + COMMAND ${CMAKE_COMMAND} -E make_directory ${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_client/proto + COMMAND cp *.py ${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_client/proto + COMMENT "Copy generated general_python_service proto file into directory paddle_serving_client/proto." 
+ WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}) endif() if (APP) @@ -79,9 +88,6 @@ py_proto_compile(pyserving_channel_py_proto SRCS proto/pyserving_channel.proto) add_custom_target(pyserving_channel_py_proto_init ALL COMMAND ${CMAKE_COMMAND} -E touch __init__.py) add_dependencies(pyserving_channel_py_proto pyserving_channel_py_proto_init) -py_grpc_proto_compile(general_python_service_py_proto SRCS proto/general_python_service.proto) -add_custom_target(general_python_service_py_proto_init ALL COMMAND ${CMAKE_COMMAND} -E touch __init__.py) -add_dependencies(general_python_service_py_proto general_python_service_py_proto_init) if (NOT WITH_GPU) add_custom_command(TARGET server_config_py_proto POST_BUILD COMMAND ${CMAKE_COMMAND} -E make_directory ${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server/proto diff --git a/python/paddle_serving_server/general_python_service.proto b/core/configure/proto/general_python_service.proto similarity index 84% rename from python/paddle_serving_server/general_python_service.proto rename to core/configure/proto/general_python_service.proto index 7f3af66d..4ced29e6 100644 --- a/python/paddle_serving_server/general_python_service.proto +++ b/core/configure/proto/general_python_service.proto @@ -13,6 +13,7 @@ // limitations under the License. syntax = "proto2"; +package baidu.paddle_serving.pyserving; service GeneralPythonService { rpc inference(Request) returns (Response) {} @@ -21,11 +22,15 @@ service GeneralPythonService { message Request { repeated bytes feed_insts = 1; repeated string feed_var_names = 2; + repeated bytes shape = 3; + repeated string type = 4; } message Response { repeated bytes fetch_insts = 1; repeated string fetch_var_names = 2; - required int32 is_error = 3; + required int32 ecode = 3; optional string error_info = 4; + repeated bytes shape = 5; + repeated string type = 6; } diff --git a/python/paddle_serving_server/python_service_channel.proto b/core/configure/proto/pyserving_channel.proto similarity index 83% rename from python/paddle_serving_server/python_service_channel.proto rename to core/configure/proto/pyserving_channel.proto index 76a0d99c..060f4d72 100644 --- a/python/paddle_serving_server/python_service_channel.proto +++ b/core/configure/proto/pyserving_channel.proto @@ -13,17 +13,19 @@ // limitations under the License. 
syntax = "proto2"; +package baidu.paddle_serving.pyserving; message ChannelData { repeated Inst insts = 1; required int32 id = 2; - optional string type = 3 - [ default = "CD" ]; // CD(channel data), CF(channel futures) - required int32 is_error = 4; + required int32 type = 3 [ default = 0 ]; + required int32 ecode = 4; optional string error_info = 5; } message Inst { required bytes data = 1; required string name = 2; + required bytes shape = 3; + required string type = 4; } diff --git a/python/examples/fit_a_line/test_py_client.py b/python/examples/fit_a_line/test_py_client.py index 76fee280..5dd46616 100644 --- a/python/examples/fit_a_line/test_py_client.py +++ b/python/examples/fit_a_line/test_py_client.py @@ -30,8 +30,7 @@ lp = LineProfiler() lp_wrapper = lp(client.predict) for i in range(1): - fetch_map = lp_wrapper( - feed={"x": x}, fetch_with_type={"combine_op_output": "float"}) + fetch_map = lp_wrapper(feed={"x": x}, fetch=["combine_op_output"]) # fetch_map = client.predict( # feed={"x": x}, fetch_with_type={"combine_op_output": "float"}) print(fetch_map) diff --git a/python/examples/fit_a_line/test_py_server.py b/python/examples/fit_a_line/test_py_server.py index ff454256..55e14d3b 100644 --- a/python/examples/fit_a_line/test_py_server.py +++ b/python/examples/fit_a_line/test_py_server.py @@ -16,29 +16,26 @@ from paddle_serving_server.pyserver import Op from paddle_serving_server.pyserver import Channel from paddle_serving_server.pyserver import PyServer -from paddle_serving_server import python_service_channel_pb2 import numpy as np import logging logging.basicConfig( format='%(asctime)s %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s', datefmt='%Y-%m-%d %H:%M', + #level=logging.DEBUG) level=logging.INFO) -# channel data: {name(str): data(bytes)} +# channel data: {name(str): data(narray)} class CombineOp(Op): def preprocess(self, input_data): cnt = 0 - for op_name, data in input_data.items(): + for op_name, channeldata in input_data.items(): logging.debug("CombineOp preprocess: {}".format(op_name)) - cnt += np.frombuffer(data.insts[0].data, dtype='float') - data = python_service_channel_pb2.ChannelData() - inst = python_service_channel_pb2.Inst() - inst.data = np.ndarray.tobytes(cnt) - inst.name = "combine_op_output" - data.insts.append(inst) + data = channeldata.parse() + cnt += data["prediction"] + data = {"combine_op_output": cnt} return data def postprocess(self, output_data): @@ -47,12 +44,8 @@ class CombineOp(Op): class UciOp(Op): def postprocess(self, output_data): - data = python_service_channel_pb2.ChannelData() - inst = python_service_channel_pb2.Inst() - pred = np.array(output_data["price"][0][0], dtype='float') - inst.data = np.ndarray.tobytes(pred) - inst.name = "prediction" - data.insts.append(inst) + pred = np.array(output_data["price"][0][0], dtype='float32') + data = {"prediction": pred} return data @@ -60,12 +53,10 @@ read_channel = Channel(name="read_channel") combine_channel = Channel(name="combine_channel") out_channel = Channel(name="out_channel") -cnn_op = UciOp( - name="cnn", +uci1_op = UciOp( + name="uci1", input=read_channel, - in_dtype='float', outputs=[combine_channel], - out_dtype='float', server_model="./uci_housing_model", server_port="9393", device="cpu", @@ -73,15 +64,13 @@ cnn_op = UciOp( server_name="127.0.0.1:9393", fetch_names=["price"], concurrency=1, - timeout=0.01, + timeout=0.1, retry=2) -bow_op = UciOp( - name="bow", +uci2_op = UciOp( + name="uci2", input=read_channel, - in_dtype='float', outputs=[combine_channel], - 
diff --git a/python/paddle_serving_client/general_python_service.proto b/python/paddle_serving_client/general_python_service.proto
deleted file mode 100644
index 56135586..00000000
--- a/python/paddle_serving_client/general_python_service.proto
+++ /dev/null
@@ -1,29 +0,0 @@
-// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-syntax = "proto2";
-
-service GeneralPythonService {
-  rpc inference(Request) returns (Response) {}
-}
-
-message Request {
-  repeated bytes feed_insts = 1;
-  repeated string feed_var_names = 2;
-}
-
-message Response {
-  repeated bytes fetch_insts = 1;
-  repeated string fetch_var_names = 2;
-}
diff --git a/python/paddle_serving_client/pyclient.py b/python/paddle_serving_client/pyclient.py
index 29df85f0..c31449f4 100644
--- a/python/paddle_serving_client/pyclient.py
+++ b/python/paddle_serving_client/pyclient.py
@@ -13,8 +13,8 @@
 # limitations under the License.
 # pylint: disable=doc-string-missing
 import grpc
-import general_python_service_pb2
-import general_python_service_pb2_grpc
+from .proto import general_python_service_pb2
+from .proto import general_python_service_pb2_grpc
 import numpy as np
 
 
@@ -33,24 +33,27 @@ class PyClient(object):
         if not isinstance(data, np.ndarray):
             raise TypeError(
                 "only numpy array type is supported temporarily.")
-        data2bytes = np.ndarray.tobytes(data)
         req.feed_var_names.append(name)
-        req.feed_insts.append(data2bytes)
+        req.feed_insts.append(data.tobytes())
+        req.shape.append(np.array(data.shape, dtype="int32").tobytes())
+        req.type.append(str(data.dtype))
         return req
 
-    def predict(self, feed, fetch_with_type):
+    def predict(self, feed, fetch):
         if not isinstance(feed, dict):
             raise TypeError(
                 "feed must be dict type with format: {name: value}.")
-        if not isinstance(fetch_with_type, dict):
+        if not isinstance(fetch, list):
             raise TypeError(
-                "fetch_with_type must be dict type with format: {name : type}.")
+                "fetch must be list type with format: [name].")
         req = self._pack_data_for_infer(feed)
         resp = self._stub.inference(req)
         fetch_map = {}
         for idx, name in enumerate(resp.fetch_var_names):
-            if name not in fetch_with_type:
+            if name not in fetch:
                 continue
             fetch_map[name] = np.frombuffer(
-                resp.fetch_insts[idx], dtype=fetch_with_type[name])
+                resp.fetch_insts[idx], dtype=resp.type[idx])
+            fetch_map[name].shape = np.frombuffer(
+                resp.shape[idx], dtype="int32")
         return fetch_map
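
With the pyclient changes above, a round trip now looks like the following
sketch. The construction/connection step is an assumption (the gRPC stub setup
is outside this diff); the server is the fit_a_line example listening on 8080:

    import numpy as np
    from paddle_serving_client.pyclient import PyClient

    client = PyClient()
    client.connect("127.0.0.1:8080")  # hypothetical setup call, not in this diff

    x = np.random.rand(13).astype("float32")  # uci_housing takes 13 features
    # fetch is now a plain list of names; dtype and shape come back in the
    # new Response.type / Response.shape fields, so no fetch_with_type dict.
    fetch_map = client.predict(feed={"x": x}, fetch=["combine_op_output"])
    print(fetch_map["combine_op_output"].dtype)  # restored on the client side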
diff --git a/python/paddle_serving_server/pyserver.py b/python/paddle_serving_server/pyserver.py
index 420df4ef..73d9caa2 100644
--- a/python/paddle_serving_server/pyserver.py
+++ b/python/paddle_serving_server/pyserver.py
@@ -18,17 +18,19 @@ import Queue
 import os
 import sys
 import paddle_serving_server
-from paddle_serving_client import Client
+from paddle_serving_client import MultiLangClient as Client
 from concurrent import futures
 import numpy as np
 import grpc
-import general_python_service_pb2
-import general_python_service_pb2_grpc
-import python_service_channel_pb2
+from .proto import general_model_config_pb2 as m_config
+from .proto import general_python_service_pb2 as pyservice_pb2
+from .proto import pyserving_channel_pb2 as channel_pb2
+from .proto import general_python_service_pb2_grpc
 import logging
 import random
 import time
 import func_timeout
+import enum
 
 
 class _TimeProfiler(object):
@@ -71,6 +73,51 @@ class _TimeProfiler(object):
 _profiler = _TimeProfiler()
 
 
+class ChannelDataEcode(enum.Enum):
+    OK = 0
+    TIMEOUT = 1
+
+
+class ChannelDataType(enum.Enum):
+    CHANNEL_PBDATA = 0
+    CHANNEL_FUTURE = 1
+
+
+class ChannelData(object):
+    def __init__(self,
+                 future=None,
+                 pbdata=None,
+                 data_id=None,
+                 callback_func=None):
+        self.future = future
+        if pbdata is None:
+            if data_id is None:
+                raise ValueError("data_id cannot be None")
+            pbdata = channel_pb2.ChannelData()
+            pbdata.type = ChannelDataType.CHANNEL_FUTURE.value
+            pbdata.ecode = ChannelDataEcode.OK.value
+            pbdata.id = data_id
+        self.pbdata = pbdata
+        self.callback_func = callback_func
+
+    def parse(self):
+        # returns {name(str): ndarray}
+        feed = {}
+        if self.pbdata.type == ChannelDataType.CHANNEL_PBDATA.value:
+            for inst in self.pbdata.insts:
+                feed[inst.name] = np.frombuffer(inst.data, dtype=inst.type)
+                feed[inst.name].shape = np.frombuffer(inst.shape, dtype="int32")
+        elif self.pbdata.type == ChannelDataType.CHANNEL_FUTURE.value:
+            feed = self.future.result()
+            if self.callback_func is not None:
+                feed = self.callback_func(feed)
+        else:
+            raise TypeError(
+                "Error type({}) in pbdata.type.".format(
+                    self.pbdata.type))
+        return feed
+
+
 class Channel(Queue.Queue):
     """
     The channel used for communication between Ops.
@@ -94,6 +141,7 @@ class Channel(Queue.Queue):
         self._maxsize = maxsize
         self._timeout = timeout
         self._name = name
+        self._stop = False
 
         self._cv = threading.Condition()
 
@@ -101,7 +149,6 @@ class Channel(Queue.Queue):
         self._producer_res_count = {}  # {data_id: count}
         self._push_res = {}  # {data_id: {op_name: data}}
 
-        self._front_wait_interval = 0.1  # second
         self._consumers = {}  # {op_name: idx}
         self._idx_consumer_num = {}  # {idx: num}
         self._consumer_base_idx = 0
@@ -138,9 +185,10 @@
         self._idx_consumer_num[0] = 0
         self._idx_consumer_num[0] += 1
 
-    def push(self, data, op_name=None):
+    def push(self, channeldata, op_name=None):
         logging.debug(
-            self._log("{} try to push data: {}".format(op_name, data)))
+            self._log("{} try to push data: {}".format(op_name,
+                                                       channeldata.pbdata)))
         if len(self._producers) == 0:
             raise Exception(
                 self._log(
@@ -148,9 +196,9 @@
                 ))
         elif len(self._producers) == 1:
             with self._cv:
-                while True:
+                while self._stop is False:
                     try:
-                        self.put(data, timeout=0)
+                        self.put(channeldata, timeout=0)
                         break
                     except Queue.Empty:
                         self._cv.wait()
@@ -163,17 +211,17 @@
                     "There are multiple producers, so op_name cannot be None."))
 
         producer_num = len(self._producers)
-        data_id = data.id
+        data_id = channeldata.pbdata.id
         put_data = None
         with self._cv:
-            logging.debug(self._log("{} get lock ~".format(op_name)))
+            logging.debug(self._log("{} get lock".format(op_name)))
             if data_id not in self._push_res:
                 self._push_res[data_id] = {
                     name: None
                     for name in self._producers
                 }
                 self._producer_res_count[data_id] = 0
-            self._push_res[data_id][op_name] = data
+            self._push_res[data_id][op_name] = channeldata
             if self._producer_res_count[data_id] + 1 == producer_num:
                 put_data = self._push_res[data_id]
                 self._push_res.pop(data_id)
@@ -183,10 +231,10 @@
 
             if put_data is None:
                 logging.debug(
-                    self._log("{} push data succ, not not push to queue.".
+                    self._log("{} push data succ, but not push to queue.".
                               format(op_name)))
             else:
-                while True:
+                while self._stop is False:
                     try:
                         self.put(put_data, timeout=0)
                         break
                     except Queue.Empty:
                         self._cv.wait()
@@ -208,7 +256,7 @@
         elif len(self._consumers) == 1:
             resp = None
             with self._cv:
-                while resp is None:
+                while self._stop is False and resp is None:
                     try:
                         resp = self.get(timeout=0)
                         break
                     except Queue.Empty:
                         self._cv.wait()
@@ -223,11 +271,11 @@
 
         with self._cv:
             # data_idx = consumer_idx - base_idx
-            while self._consumers[op_name] - self._consumer_base_idx >= len(
-                    self._front_res):
+            while self._stop is False and self._consumers[
+                    op_name] - self._consumer_base_idx >= len(self._front_res):
                 try:
-                    data = self.get(timeout=0)
-                    self._front_res.append(data)
+                    channeldata = self.get(timeout=0)
+                    self._front_res.append(channeldata)
                     break
                 except Queue.Empty:
                     self._cv.wait()
@@ -256,14 +304,17 @@
         logging.debug(self._log("multi | {} get data succ!".format(op_name)))
         return resp  # reference, read only
 
+    def stop(self):
+        self._stop = True  # Queue.Queue has no close(); just flag the stop
+        with self._cv:
+            self._cv.notify_all()  # wake up blocked producers/consumers
+
 
 class Op(object):
     def __init__(self,
                  name,
                  input,
-                 in_dtype,
                  outputs,
-                 out_dtype,
                  server_model=None,
                  server_port=None,
                  device=None,
@@ -278,9 +329,7 @@
         self._name = name  # to identify the type of OP, it must be globally unique
         self._concurrency = concurrency  # amount of concurrency
         self.set_input(input)
-        self._in_dtype = in_dtype
         self.set_outputs(outputs)
-        self._out_dtype = out_dtype
         self._client = None
         if client_config is not None and \
                 server_name is not None and \
@@ -324,15 +373,13 @@
             channel.add_producer(self._name)
         self._outputs = channels
 
-    def preprocess(self, data):
-        if isinstance(data, dict):
+    def preprocess(self, channeldata):
+        if isinstance(channeldata, dict):
             raise Exception(
                 self._log(
                     'this Op has multiple previous inputs. Please override this method'
                 ))
-        feed = {}
-        for inst in data.insts:
-            feed[inst.name] = np.frombuffer(inst.data, dtype=self._in_dtype)
+        feed = channeldata.parse()
        return feed

    def midprocess(self, data):
@@ -343,47 +390,58 @@
                    format(type(data))))
        logging.debug(self._log('data: {}'.format(data)))
        logging.debug(self._log('fetch: {}'.format(self._fetch_names)))
-        fetch_map = self._client.predict(feed=data, fetch=self._fetch_names)
-        logging.debug(self._log("finish predict"))
-        return fetch_map
+        call_future = self._client.predict(
+            feed=data, fetch=self._fetch_names, asyn=True)
+        logging.debug(self._log("get call_future"))
+        return call_future

    def postprocess(self, output_data):
        raise Exception(
            self._log(
-                'Please override this method to convert data to the format in channel.'
+                'Please override this method to convert data to the format in channel.' \
+                ' The return value format should be in {name(str): var(ndarray)}'
             ))
 
-    def errorprocess(self, error_info):
-        data = python_service_channel_pb2.ChannelData()
-        data.is_error = 1
+    def errorprocess(self, error_info, data_id):
+        data = channel_pb2.ChannelData()
+        data.ecode = 1
+        data.id = data_id
         data.error_info = error_info
         return data
 
     def stop(self):
+        self._input.stop()
+        for channel in self._outputs:
+            channel.stop()
         self._run = False
 
+    def _parse_channeldata(self, channeldata):
+        data_id, error_data = None, None
+        if isinstance(channeldata, dict):
+            key = channeldata.keys()[0]
+            data_id = channeldata[key].pbdata.id
+            for _, data in channeldata.items():
+                if data.pbdata.ecode != 0:
+                    error_data = data.pbdata
+                    break
+        else:
+            data_id = channeldata.pbdata.id
+            if channeldata.pbdata.ecode != 0:
+                error_data = channeldata.pbdata
+        return data_id, error_data
+
     def start(self, concurrency_idx):
         self._run = True
         while self._run:
             _profiler.record("{}{}-get_0".format(self._name, concurrency_idx))
             input_data = self._input.front(self._name)
             _profiler.record("{}{}-get_1".format(self._name, concurrency_idx))
-            data_id = None
-            output_data = None
-            error_data = None
             logging.debug(self._log("input_data: {}".format(input_data)))
-            if isinstance(input_data, dict):
-                key = input_data.keys()[0]
-                data_id = input_data[key].id
-                for _, data in input_data.items():
-                    if data.is_error != 0:
-                        error_data = data
-                        break
-            else:
-                data_id = input_data.id
-                if input_data.is_error != 0:
-                    error_data = input_data
+            data_id, error_data = self._parse_channeldata(input_data)
+
+            output_data = None
             if error_data is None:
                 _profiler.record("{}{}-prep_0".format(self._name,
                                                       concurrency_idx))
@@ -391,6 +449,7 @@
                 _profiler.record("{}{}-prep_1".format(self._name,
                                                       concurrency_idx))
 
+                call_future = None
                 error_info = None
                 if self.with_serving():
                     for i in range(self._retry):
@@ -398,7 +457,7 @@
                                                               concurrency_idx))
                         if self._timeout > 0:
                             try:
-                                middata = func_timeout.func_timeout(
+                                call_future = func_timeout.func_timeout(
                                     self._timeout,
                                     self.midprocess,
                                     args=(data, ))
@@ -411,38 +470,48 @@
                                 error_info = "{}({}): {}".format(
                                     self._name, concurrency_idx, e)
                         else:
-                            middata = self.midprocess(data)
+                            call_future = self.midprocess(data)
                         _profiler.record("{}{}-midp_1".format(self._name,
                                                               concurrency_idx))
-                        if error_info is None:
-                            data = middata
-                            break
+                        if error_info is None:
+                            break
                         if i + 1 < self._retry:
                             error_info = None
                             logging.warn(
                                 self._log("warn: timeout, retry({})".format(i + 1)))
-
                 _profiler.record("{}{}-postp_0".format(self._name, concurrency_idx))
                 if error_info is not None:
-                    output_data = self.errorprocess(error_info)
+                    error_data = self.errorprocess(error_info, data_id)
+                    output_data = ChannelData(pbdata=error_data)
                 else:
-                    output_data = self.postprocess(data)
-
-                    if not isinstance(output_data,
-                                      python_service_channel_pb2.ChannelData):
-                        raise TypeError(
-                            self._log(
-                                'output_data must be ChannelData type, but get {}'.
-                                format(type(output_data))))
-                    output_data.is_error = 0
+                    if self.with_serving():  # pass the call future downstream
+                        output_data = ChannelData(
+                            future=call_future,
+                            data_id=data_id,
+                            callback_func=self.postprocess)
+                    else:
+                        post_data = self.postprocess(data)
+                        if not isinstance(post_data, dict):
+                            raise TypeError(
+                                self._log(
+                                    'output_data must be dict type, but get {}'.
+                                    format(type(post_data))))
+                        pbdata = channel_pb2.ChannelData()
+                        for name, value in post_data.items():
+                            inst = channel_pb2.Inst()
+                            inst.data = value.tobytes()
+                            inst.name = name
+                            inst.shape = np.array(
+                                value.shape, dtype="int32").tobytes()
+                            inst.type = str(value.dtype)
+                            pbdata.insts.append(inst)
+                        pbdata.ecode = 0
+                        pbdata.id = data_id
+                        output_data = ChannelData(pbdata=pbdata)
                 _profiler.record("{}{}-postp_1".format(self._name,
                                                        concurrency_idx))
-
-                output_data.id = data_id
             else:
                 output_data = ChannelData(pbdata=error_data)
 
             _profiler.record("{}{}-push_0".format(self._name, concurrency_idx))
             for channel in self._outputs:
@@ -498,13 +567,14 @@
     def _recive_out_channel_func(self):
         while True:
-            data = self._out_channel.front(self._name)
-            if not isinstance(data, python_service_channel_pb2.ChannelData):
+            channeldata = self._out_channel.front(self._name)
+            if not isinstance(channeldata, ChannelData):
                 raise TypeError(
                     self._log('data must be ChannelData type, but get {}'.
-                              format(type(data))))
+                              format(type(channeldata))))
             with self._cv:
-                self._globel_resp_dict[data.id] = data
+                data_id = channeldata.pbdata.id
+                self._globel_resp_dict[data_id] = channeldata
             self._cv.notify_all()
 
     def _get_next_id(self):
@@ -523,34 +593,58 @@
     def _pack_data_for_infer(self, request):
         logging.debug(self._log('start inferce'))
-        data = python_service_channel_pb2.ChannelData()
+        pbdata = channel_pb2.ChannelData()
         data_id = self._get_next_id()
-        data.id = data_id
-        data.is_error = 0
+        pbdata.id = data_id
         for idx, name in enumerate(request.feed_var_names):
             logging.debug(
                 self._log('name: {}'.format(request.feed_var_names[idx])))
             logging.debug(self._log('data: {}'.format(request.feed_insts[idx])))
-            inst = python_service_channel_pb2.Inst()
+            inst = channel_pb2.Inst()
             inst.data = request.feed_insts[idx]
+            inst.shape = request.shape[idx]
             inst.name = name
-            data.insts.append(inst)
-        return data, data_id
+            inst.type = request.type[idx]
+            pbdata.insts.append(inst)
+        pbdata.ecode = 0  # TODO: parse request error
+        return ChannelData(pbdata=pbdata), data_id
 
-    def _pack_data_for_resp(self, data):
-        logging.debug(self._log('get data'))
-        resp = general_python_service_pb2.Response()
+    def _pack_data_for_resp(self, channeldata):
+        logging.debug(self._log('get channeldata'))
         logging.debug(self._log('gen resp'))
-        logging.debug(data)
-        resp.is_error = data.is_error
-        if resp.is_error == 0:
-            for inst in data.insts:
-                logging.debug(self._log('append data'))
-                resp.fetch_insts.append(inst.data)
-                logging.debug(self._log('append name'))
-                resp.fetch_var_names.append(inst.name)
+        resp = pyservice_pb2.Response()
+        resp.ecode = channeldata.pbdata.ecode
+        if resp.ecode == 0:
+            if channeldata.pbdata.type == ChannelDataType.CHANNEL_PBDATA.value:
+                for inst in channeldata.pbdata.insts:
+                    logging.debug(self._log('append data'))
+                    resp.fetch_insts.append(inst.data)
+                    logging.debug(self._log('append name'))
+                    resp.fetch_var_names.append(inst.name)
+                    logging.debug(self._log('append shape'))
+                    resp.shape.append(inst.shape)
+                    logging.debug(self._log('append type'))
+                    resp.type.append(inst.type)
+            elif channeldata.pbdata.type == ChannelDataType.CHANNEL_FUTURE.value:
+                feed = channeldata.future.result()
+                if channeldata.callback_func is not None:
+                    feed = channeldata.callback_func(feed)
+                for name, var in feed.items():
+                    logging.debug(self._log('append data'))
+                    resp.fetch_insts.append(var.tobytes())
+                    logging.debug(self._log('append name'))
+                    resp.fetch_var_names.append(name)
+                    logging.debug(self._log('append shape'))
+                    resp.shape.append(
+                        np.array(
+                            var.shape, dtype="int32").tobytes())
+                    resp.type.append(str(var.dtype))
+            else:
+                raise TypeError(
+                    self._log("Error type({}) in pbdata.type.".format(
+                        channeldata.pbdata.type)))
         else:
-            resp.error_info = data.error_info
+            resp.error_info = channeldata.pbdata.error_info
         return resp
 
     def inference(self, request, context):
@@ -558,6 +652,7 @@
         data, data_id = self._pack_data_for_infer(request)
         _profiler.record("{}-prepack_1".format(self._name))
 
+        resp_channeldata = None
         for i in range(self._retry):
             logging.debug(self._log('push data'))
             _profiler.record("{}-push_0".format(self._name))
@@ -565,17 +660,17 @@
             _profiler.record("{}-push_1".format(self._name))
 
             logging.debug(self._log('wait for infer'))
-            resp_data = None
             _profiler.record("{}-fetch_0".format(self._name))
-            resp_data = self._get_data_in_globel_resp_dict(data_id)
+            resp_channeldata = self._get_data_in_globel_resp_dict(data_id)
             _profiler.record("{}-fetch_1".format(self._name))
 
-            if resp_data.is_error == 0:
+            if resp_channeldata.pbdata.ecode == 0:
                 break
-            logging.warn("retry({}): {}".format(i + 1, resp_data.error_info))
+            logging.warn("retry({}): {}".format(
+                i + 1, resp_channeldata.pbdata.error_info))
 
         _profiler.record("{}-postpack_0".format(self._name))
-        resp = self._pack_data_for_resp(resp_data)
+        resp = self._pack_data_for_resp(resp_channeldata)
         _profiler.record("{}-postpack_1".format(self._name))
         _profiler.print_profile()
         return resp
@@ -600,7 +695,7 @@
         self._ops.append(op)
 
     def gen_desc(self):
-        logging.info('here will generate desc for paas')
+        logging.info('here will generate desc for PaaS')
         pass
 
     def prepare_server(self, port, worker_num):
@@ -638,6 +733,10 @@
             th.start()
             self._op_threads.append(th)
 
+    def _stop_ops(self):
+        for op in self._ops:
+            op.stop()
+
     def run_server(self):
         self._run_ops()
         server = grpc.server(
@@ -647,12 +746,10 @@
                 self._retry), server)
         server.add_insecure_port('[::]:{}'.format(self._port))
         server.start()
-        try:
-            for th in self._op_threads:
-                th.join()
-            server.join()
-        except KeyboardInterrupt:
-            server.stop(0)
+        server.wait_for_termination()
+        self._stop_ops()  # TODO
+        for th in self._op_threads:
+            th.join()
 
     def prepare_serving(self, op):
         model_path = op._server_model
@@ -660,11 +757,11 @@
         device = op._device
 
         if device == "cpu":
-            cmd = "python -m paddle_serving_server.serve --model {} --thread 4 --port {} &>/dev/null &".format(
-                model_path, port)
+            cmd = "(Use MultiLangServer) python -m paddle_serving_server.serve" \
+                  " --model {} --thread 4 --port {} --use_multilang &>/dev/null &".format(model_path, port)
         else:
-            cmd = "python -m paddle_serving_server_gpu.serve --model {} --thread 4 --port {} &>/dev/null &".format(
-                model_path, port)
+            cmd = "(Use MultiLangServer) python -m paddle_serving_server_gpu.serve" \
+                  " --model {} --thread 4 --port {} --use_multilang &>/dev/null &".format(model_path, port)
         # run a server (not in PyServing)
         logging.info("run a server (not in PyServing): {}".format(cmd))
         return
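
To illustrate the future-in-channel flow that the pyserver changes above
implement: an Op with a serving stage no longer blocks on predict(). It wraps
the MultiLangClient call future in a CHANNEL_FUTURE ChannelData, and the fetch
only blocks when a downstream consumer calls parse(), which resolves the
future and applies the producing Op's postprocess as callback_func. A
standalone sketch, with a ThreadPoolExecutor standing in for the async client:

    from concurrent import futures
    import numpy as np
    from paddle_serving_server.pyserver import ChannelData

    executor = futures.ThreadPoolExecutor(max_workers=1)
    fake_call = executor.submit(
        lambda: {"price": np.array([1.5], dtype="float32")})

    data = ChannelData(
        future=fake_call,
        data_id=0,
        callback_func=lambda d: {"prediction": d["price"]})
    print(data.parse())  # blocks here, not at push time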
messages") + parser.add_argument( + "--use_multilang", + action='store_true', + help="Use Multi-language-service") return parser.parse_args() @@ -63,6 +67,7 @@ def start_standard_model(): # pylint: disable=doc-string-missing ir_optim = args.ir_optim max_body_size = args.max_body_size use_mkl = args.use_mkl + use_multilang = args.use_multilang if model == "": print("You must specify your serving model") @@ -79,14 +84,19 @@ def start_standard_model(): # pylint: disable=doc-string-missing op_seq_maker.add_op(general_infer_op) op_seq_maker.add_op(general_response_op) - server = serving.Server() - server.set_op_sequence(op_seq_maker.get_op_sequence()) - server.set_num_threads(thread_num) - server.set_memory_optimize(mem_optim) - server.set_ir_optimize(ir_optim) - server.use_mkl(use_mkl) - server.set_max_body_size(max_body_size) - server.set_port(port) + server = None + if use_multilang: + server = serving.MultiLangServer() + server.set_op_sequence(op_seq_maker.get_op_sequence()) + else: + server = serving.Server() + server.set_op_sequence(op_seq_maker.get_op_sequence()) + server.set_num_threads(thread_num) + server.set_memory_optimize(mem_optim) + server.set_ir_optimize(ir_optim) + server.use_mkl(use_mkl) + server.set_max_body_size(max_body_size) + server.set_port(port) server.load_model_config(model) server.prepare_server(workdir=workdir, port=port, device=device) -- GitLab