提交 937e7719 编写于 作者: B barrierye

use future in channel && remove type def

上级 1ad33514
...@@ -39,6 +39,9 @@ py_grpc_proto_compile(multi_lang_general_model_service_py_proto SRCS proto/multi ...@@ -39,6 +39,9 @@ py_grpc_proto_compile(multi_lang_general_model_service_py_proto SRCS proto/multi
add_custom_target(multi_lang_general_model_service_py_proto_init ALL COMMAND ${CMAKE_COMMAND} -E touch __init__.py) add_custom_target(multi_lang_general_model_service_py_proto_init ALL COMMAND ${CMAKE_COMMAND} -E touch __init__.py)
add_dependencies(multi_lang_general_model_service_py_proto multi_lang_general_model_service_py_proto_init) add_dependencies(multi_lang_general_model_service_py_proto multi_lang_general_model_service_py_proto_init)
py_grpc_proto_compile(general_python_service_py_proto SRCS proto/general_python_service.proto)
add_custom_target(general_python_service_py_proto_init ALL COMMAND ${CMAKE_COMMAND} -E touch __init__.py)
add_dependencies(general_python_service_py_proto general_python_service_py_proto_init)
if (CLIENT) if (CLIENT)
py_proto_compile(sdk_configure_py_proto SRCS proto/sdk_configure.proto) py_proto_compile(sdk_configure_py_proto SRCS proto/sdk_configure.proto)
add_custom_target(sdk_configure_py_proto_init ALL COMMAND ${CMAKE_COMMAND} -E touch __init__.py) add_custom_target(sdk_configure_py_proto_init ALL COMMAND ${CMAKE_COMMAND} -E touch __init__.py)
...@@ -60,6 +63,12 @@ add_custom_command(TARGET multi_lang_general_model_service_py_proto POST_BUILD ...@@ -60,6 +63,12 @@ add_custom_command(TARGET multi_lang_general_model_service_py_proto POST_BUILD
COMMAND cp *.py ${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_client/proto COMMAND cp *.py ${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_client/proto
COMMENT "Copy generated multi_lang_general_model_service proto file into directory paddle_serving_client/proto." COMMENT "Copy generated multi_lang_general_model_service proto file into directory paddle_serving_client/proto."
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}) WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
add_custom_command(TARGET general_python_service_py_proto POST_BUILD
COMMAND ${CMAKE_COMMAND} -E make_directory ${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_client/proto
COMMAND cp *.py ${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_client/proto
COMMENT "Copy generated general_python_service proto file into directory paddle_serving_client/proto."
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
endif() endif()
if (APP) if (APP)
...@@ -79,9 +88,6 @@ py_proto_compile(pyserving_channel_py_proto SRCS proto/pyserving_channel.proto) ...@@ -79,9 +88,6 @@ py_proto_compile(pyserving_channel_py_proto SRCS proto/pyserving_channel.proto)
add_custom_target(pyserving_channel_py_proto_init ALL COMMAND ${CMAKE_COMMAND} -E touch __init__.py) add_custom_target(pyserving_channel_py_proto_init ALL COMMAND ${CMAKE_COMMAND} -E touch __init__.py)
add_dependencies(pyserving_channel_py_proto pyserving_channel_py_proto_init) add_dependencies(pyserving_channel_py_proto pyserving_channel_py_proto_init)
py_grpc_proto_compile(general_python_service_py_proto SRCS proto/general_python_service.proto)
add_custom_target(general_python_service_py_proto_init ALL COMMAND ${CMAKE_COMMAND} -E touch __init__.py)
add_dependencies(general_python_service_py_proto general_python_service_py_proto_init)
if (NOT WITH_GPU) if (NOT WITH_GPU)
add_custom_command(TARGET server_config_py_proto POST_BUILD add_custom_command(TARGET server_config_py_proto POST_BUILD
COMMAND ${CMAKE_COMMAND} -E make_directory ${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server/proto COMMAND ${CMAKE_COMMAND} -E make_directory ${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server/proto
......
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
// limitations under the License. // limitations under the License.
syntax = "proto2"; syntax = "proto2";
package baidu.paddle_serving.pyserving;
service GeneralPythonService { service GeneralPythonService {
rpc inference(Request) returns (Response) {} rpc inference(Request) returns (Response) {}
...@@ -21,11 +22,15 @@ service GeneralPythonService { ...@@ -21,11 +22,15 @@ service GeneralPythonService {
message Request { message Request {
repeated bytes feed_insts = 1; repeated bytes feed_insts = 1;
repeated string feed_var_names = 2; repeated string feed_var_names = 2;
repeated bytes shape = 3;
repeated string type = 4;
} }
message Response { message Response {
repeated bytes fetch_insts = 1; repeated bytes fetch_insts = 1;
repeated string fetch_var_names = 2; repeated string fetch_var_names = 2;
required int32 is_error = 3; required int32 ecode = 3;
optional string error_info = 4; optional string error_info = 4;
repeated bytes shape = 5;
repeated string type = 6;
} }
...@@ -13,17 +13,19 @@ ...@@ -13,17 +13,19 @@
// limitations under the License. // limitations under the License.
syntax = "proto2"; syntax = "proto2";
package baidu.paddle_serving.pyserving;
message ChannelData { message ChannelData {
repeated Inst insts = 1; repeated Inst insts = 1;
required int32 id = 2; required int32 id = 2;
optional string type = 3 required int32 type = 3 [ default = 0 ];
[ default = "CD" ]; // CD(channel data), CF(channel futures) required int32 ecode = 4;
required int32 is_error = 4;
optional string error_info = 5; optional string error_info = 5;
} }
message Inst { message Inst {
required bytes data = 1; required bytes data = 1;
required string name = 2; required string name = 2;
required bytes shape = 3;
required string type = 4;
} }
...@@ -30,8 +30,7 @@ lp = LineProfiler() ...@@ -30,8 +30,7 @@ lp = LineProfiler()
lp_wrapper = lp(client.predict) lp_wrapper = lp(client.predict)
for i in range(1): for i in range(1):
fetch_map = lp_wrapper( fetch_map = lp_wrapper(feed={"x": x}, fetch=["combine_op_output"])
feed={"x": x}, fetch_with_type={"combine_op_output": "float"})
# fetch_map = client.predict( # fetch_map = client.predict(
# feed={"x": x}, fetch_with_type={"combine_op_output": "float"}) # feed={"x": x}, fetch_with_type={"combine_op_output": "float"})
print(fetch_map) print(fetch_map)
......
...@@ -16,29 +16,26 @@ ...@@ -16,29 +16,26 @@
from paddle_serving_server.pyserver import Op from paddle_serving_server.pyserver import Op
from paddle_serving_server.pyserver import Channel from paddle_serving_server.pyserver import Channel
from paddle_serving_server.pyserver import PyServer from paddle_serving_server.pyserver import PyServer
from paddle_serving_server import python_service_channel_pb2
import numpy as np import numpy as np
import logging import logging
logging.basicConfig( logging.basicConfig(
format='%(asctime)s %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s', format='%(asctime)s %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s',
datefmt='%Y-%m-%d %H:%M', datefmt='%Y-%m-%d %H:%M',
#level=logging.DEBUG)
level=logging.INFO) level=logging.INFO)
# channel data: {name(str): data(bytes)} # channel data: {name(str): data(narray)}
class CombineOp(Op): class CombineOp(Op):
def preprocess(self, input_data): def preprocess(self, input_data):
cnt = 0 cnt = 0
for op_name, data in input_data.items(): for op_name, channeldata in input_data.items():
logging.debug("CombineOp preprocess: {}".format(op_name)) logging.debug("CombineOp preprocess: {}".format(op_name))
cnt += np.frombuffer(data.insts[0].data, dtype='float') data = channeldata.parse()
data = python_service_channel_pb2.ChannelData() cnt += data["prediction"]
inst = python_service_channel_pb2.Inst() data = {"combine_op_output": cnt}
inst.data = np.ndarray.tobytes(cnt)
inst.name = "combine_op_output"
data.insts.append(inst)
return data return data
def postprocess(self, output_data): def postprocess(self, output_data):
...@@ -47,12 +44,8 @@ class CombineOp(Op): ...@@ -47,12 +44,8 @@ class CombineOp(Op):
class UciOp(Op): class UciOp(Op):
def postprocess(self, output_data): def postprocess(self, output_data):
data = python_service_channel_pb2.ChannelData() pred = np.array(output_data["price"][0][0], dtype='float32')
inst = python_service_channel_pb2.Inst() data = {"prediction": pred}
pred = np.array(output_data["price"][0][0], dtype='float')
inst.data = np.ndarray.tobytes(pred)
inst.name = "prediction"
data.insts.append(inst)
return data return data
...@@ -60,12 +53,10 @@ read_channel = Channel(name="read_channel") ...@@ -60,12 +53,10 @@ read_channel = Channel(name="read_channel")
combine_channel = Channel(name="combine_channel") combine_channel = Channel(name="combine_channel")
out_channel = Channel(name="out_channel") out_channel = Channel(name="out_channel")
cnn_op = UciOp( uci1_op = UciOp(
name="cnn", name="uci1",
input=read_channel, input=read_channel,
in_dtype='float',
outputs=[combine_channel], outputs=[combine_channel],
out_dtype='float',
server_model="./uci_housing_model", server_model="./uci_housing_model",
server_port="9393", server_port="9393",
device="cpu", device="cpu",
...@@ -73,15 +64,13 @@ cnn_op = UciOp( ...@@ -73,15 +64,13 @@ cnn_op = UciOp(
server_name="127.0.0.1:9393", server_name="127.0.0.1:9393",
fetch_names=["price"], fetch_names=["price"],
concurrency=1, concurrency=1,
timeout=0.01, timeout=0.1,
retry=2) retry=2)
bow_op = UciOp( uci2_op = UciOp(
name="bow", name="uci2",
input=read_channel, input=read_channel,
in_dtype='float',
outputs=[combine_channel], outputs=[combine_channel],
out_dtype='float',
server_model="./uci_housing_model", server_model="./uci_housing_model",
server_port="9292", server_port="9292",
device="cpu", device="cpu",
...@@ -95,9 +84,7 @@ bow_op = UciOp( ...@@ -95,9 +84,7 @@ bow_op = UciOp(
combine_op = CombineOp( combine_op = CombineOp(
name="combine", name="combine",
input=combine_channel, input=combine_channel,
in_dtype='float',
outputs=[out_channel], outputs=[out_channel],
out_dtype='float',
concurrency=1, concurrency=1,
timeout=-1, timeout=-1,
retry=1) retry=1)
...@@ -109,8 +96,8 @@ pyserver = PyServer(profile=False, retry=1) ...@@ -109,8 +96,8 @@ pyserver = PyServer(profile=False, retry=1)
pyserver.add_channel(read_channel) pyserver.add_channel(read_channel)
pyserver.add_channel(combine_channel) pyserver.add_channel(combine_channel)
pyserver.add_channel(out_channel) pyserver.add_channel(out_channel)
pyserver.add_op(cnn_op) pyserver.add_op(uci1_op)
pyserver.add_op(bow_op) pyserver.add_op(uci2_op)
pyserver.add_op(combine_op) pyserver.add_op(combine_op)
pyserver.prepare_server(port=8080, worker_num=2) pyserver.prepare_server(port=8080, worker_num=2)
pyserver.run_server() pyserver.run_server()
// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto2";
service GeneralPythonService {
rpc inference(Request) returns (Response) {}
}
message Request {
repeated bytes feed_insts = 1;
repeated string feed_var_names = 2;
}
message Response {
repeated bytes fetch_insts = 1;
repeated string fetch_var_names = 2;
}
...@@ -13,8 +13,8 @@ ...@@ -13,8 +13,8 @@
# limitations under the License. # limitations under the License.
# pylint: disable=doc-string-missing # pylint: disable=doc-string-missing
import grpc import grpc
import general_python_service_pb2 from .proto import general_python_service_pb2
import general_python_service_pb2_grpc from .proto import general_python_service_pb2_grpc
import numpy as np import numpy as np
...@@ -33,24 +33,27 @@ class PyClient(object): ...@@ -33,24 +33,27 @@ class PyClient(object):
if not isinstance(data, np.ndarray): if not isinstance(data, np.ndarray):
raise TypeError( raise TypeError(
"only numpy array type is supported temporarily.") "only numpy array type is supported temporarily.")
data2bytes = np.ndarray.tobytes(data)
req.feed_var_names.append(name) req.feed_var_names.append(name)
req.feed_insts.append(data2bytes) req.feed_insts.append(data.tobytes())
req.shape.append(np.array(data.shape, dtype="int32").tobytes())
req.type.append(str(data.dtype))
return req return req
def predict(self, feed, fetch_with_type): def predict(self, feed, fetch):
if not isinstance(feed, dict): if not isinstance(feed, dict):
raise TypeError( raise TypeError(
"feed must be dict type with format: {name: value}.") "feed must be dict type with format: {name: value}.")
if not isinstance(fetch_with_type, dict): if not isinstance(fetch, list):
raise TypeError( raise TypeError(
"fetch_with_type must be dict type with format: {name : type}.") "fetch_with_type must be list type with format: [name].")
req = self._pack_data_for_infer(feed) req = self._pack_data_for_infer(feed)
resp = self._stub.inference(req) resp = self._stub.inference(req)
fetch_map = {} fetch_map = {}
for idx, name in enumerate(resp.fetch_var_names): for idx, name in enumerate(resp.fetch_var_names):
if name not in fetch_with_type: if name not in fetch:
continue continue
fetch_map[name] = np.frombuffer( fetch_map[name] = np.frombuffer(
resp.fetch_insts[idx], dtype=fetch_with_type[name]) resp.fetch_insts[idx], dtype=resp.type[idx])
fetch_map[name].shape = np.frombuffer(
resp.shape[idx], dtype="int32")
return fetch_map return fetch_map
...@@ -49,6 +49,10 @@ def parse_args(): # pylint: disable=doc-string-missing ...@@ -49,6 +49,10 @@ def parse_args(): # pylint: disable=doc-string-missing
type=int, type=int,
default=512 * 1024 * 1024, default=512 * 1024 * 1024,
help="Limit sizes of messages") help="Limit sizes of messages")
parser.add_argument(
"--use_multilang",
action='store_true',
help="Use Multi-language-service")
return parser.parse_args() return parser.parse_args()
...@@ -63,6 +67,7 @@ def start_standard_model(): # pylint: disable=doc-string-missing ...@@ -63,6 +67,7 @@ def start_standard_model(): # pylint: disable=doc-string-missing
ir_optim = args.ir_optim ir_optim = args.ir_optim
max_body_size = args.max_body_size max_body_size = args.max_body_size
use_mkl = args.use_mkl use_mkl = args.use_mkl
use_multilang = args.use_multilang
if model == "": if model == "":
print("You must specify your serving model") print("You must specify your serving model")
...@@ -79,6 +84,11 @@ def start_standard_model(): # pylint: disable=doc-string-missing ...@@ -79,6 +84,11 @@ def start_standard_model(): # pylint: disable=doc-string-missing
op_seq_maker.add_op(general_infer_op) op_seq_maker.add_op(general_infer_op)
op_seq_maker.add_op(general_response_op) op_seq_maker.add_op(general_response_op)
server = None
if use_multilang:
server = serving.MultiLangServer()
server.set_op_sequence(op_seq_maker.get_op_sequence())
else:
server = serving.Server() server = serving.Server()
server.set_op_sequence(op_seq_maker.get_op_sequence()) server.set_op_sequence(op_seq_maker.get_op_sequence())
server.set_num_threads(thread_num) server.set_num_threads(thread_num)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册