Commit cccd953d authored by barrierye

make model ensemble succeed

Parent 9f44d825
@@ -32,11 +32,7 @@ for i in range(3):
     line = 'i am very sad | 0'
     word_ids, label = imdb_dataset.get_words_and_label(line)
     feed = {"words": word_ids}
-    fetch = ["acc", "cost", "prediction"]
+    fetch = ["prediction"]
     fetch_maps = client.predict(feed=feed, fetch=fetch)
-    if len(fetch_maps) == 1:
-        print("step: {}, res: {}".format(i, fetch_maps['prediction'][0][1]))
-    else:
-        for model, fetch_map in fetch_maps.items():
-            print("step: {}, model: {}, res: {}".format(i, model, fetch_map[
-                'prediction'][0][1]))
+    for model, fetch_map in fetch_maps.items():
+        print("step: {}, model: {}, res: {}".format(i, model, fetch_map))
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=doc-string-missing
from paddle_serving_client import MultiLangClient
from imdb_reader import IMDBDataset
client = MultiLangClient()
# If you have more than one model, make sure that the inputs
# and outputs of all the models are the same.
client.load_client_config('imdb_bow_client_conf/serving_client_conf.prototxt')
client.connect(["127.0.0.1:9393"])
# You can feed any English sentence or dataset here.
# This example reuses the IMDB reader from training; you
# can easily define your own data preprocessing.
imdb_dataset = IMDBDataset()
imdb_dataset.load_resource('imdb.vocab')
for i in range(3):
    line = 'i am very sad | 0'
    word_ids, label = imdb_dataset.get_words_and_label(line)
    feed = {"words": word_ids}
    fetch = ["prediction"]
    fetch_maps = client.predict(feed=feed, fetch=fetch)
    for model, fetch_map in fetch_maps.items():
        print("step: {}, model: {}, res: {}".format(i, model, fetch_map))
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=doc-string-missing
from paddle_serving_server import OpMaker
from paddle_serving_server import OpGraphMaker
from paddle_serving_server import MultiLangServer
op_maker = OpMaker()
read_op = op_maker.create('general_reader')
cnn_infer_op = op_maker.create(
    'general_infer', engine_name='cnn', inputs=[read_op])
bow_infer_op = op_maker.create(
    'general_infer', engine_name='bow', inputs=[read_op])
response_op = op_maker.create(
    'general_response', inputs=[cnn_infer_op, bow_infer_op])
op_graph_maker = OpGraphMaker()
op_graph_maker.add_op(read_op)
op_graph_maker.add_op(cnn_infer_op)
op_graph_maker.add_op(bow_infer_op)
op_graph_maker.add_op(response_op)
server = MultiLangServer()
server.set_op_graph(op_graph_maker.get_op_graph())
model_config = {cnn_infer_op: 'imdb_cnn_model', bow_infer_op: 'imdb_bow_model'}
server.load_model_config(model_config)
server.prepare_server(workdir="work_dir1", port=9393, device="cpu")
server.run_server()
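The op graph is a small DAG: general_reader fans out to both general_infer ops, and general_response joins their outputs. A third model could presumably be wired in the same way; a sketch that only reuses the OpMaker/OpGraphMaker calls shown above (the 'lstm' engine name and model path are hypothetical):

# Hypothetical third engine following the same pattern as above.
lstm_infer_op = op_maker.create(
    'general_infer', engine_name='lstm', inputs=[read_op])
response_op = op_maker.create(
    'general_response', inputs=[cnn_infer_op, bow_infer_op, lstm_infer_op])
op_graph_maker.add_op(lstm_infer_op)
model_config = {
    cnn_infer_op: 'imdb_cnn_model',
    bow_infer_op: 'imdb_bow_model',
    lstm_infer_op: 'imdb_lstm_model',  # hypothetical model path
}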
@@ -384,7 +384,7 @@ class Client(object):
 class MultiLangClient(object):
     def __init__(self):
         self.channel_ = None
-        self.rpc_timeout_ms_ = 2000
+        self.rpc_timeout_s_ = 2

     def load_client_config(self, path):
         if not isinstance(path, str):
@@ -393,17 +393,20 @@ class MultiLangClient(object):
     def add_variant(self, tag, cluster, variant_weight):
-        # TODO
-        pass
+        raise Exception("cannot support ABtest yet")

     def set_rpc_timeout_ms(self, rpc_timeout):
-        self.rpc_timeout_ms_ = rpc_timeout
+        if rpc_timeout > 2000:
+            print("WARN: you may also need to modify the Server-side timeout, "
+                  "because the default timeout on the Server side is 2000ms.")
+        self.rpc_timeout_s_ = rpc_timeout / 1000.0

     def connect(self, endpoints):
         # https://github.com/tensorflow/serving/issues/1382
         options = [('grpc.max_receive_message_length', 512 * 1024 * 1024),
                    ('grpc.max_send_message_length', 512 * 1024 * 1024),
                    ('grpc.lb_policy_name', 'round_robin')]
         # TODO: weight round robin
         g_endpoint = 'ipv4:{}'.format(','.join(endpoints))
         self.channel_ = grpc.insecure_channel(g_endpoint, options=options)
         self.stub_ = multi_lang_general_model_service_pb2_grpc.MultiLangGeneralModelServiceStub(
@@ -497,32 +500,42 @@ class MultiLangClient(object):
         return req

     def _unpack_resp(self, resp, fetch, is_python, need_variant_tag):
-        result_map = {}
-        inst = resp.outputs[0].insts[0]
         tag = resp.tag
-        for i, name in enumerate(fetch):
-            var = inst.tensor_array[i]
-            v_type = self.fetch_types_[name]
-            if is_python:
-                if v_type == 0:  # int64
-                    result_map[name] = np.frombuffer(var.data, dtype="int64")
-                elif v_type == 1:  # float32
-                    result_map[name] = np.frombuffer(var.data, dtype="float32")
-                else:
-                    raise Exception("error type.")
-            else:
-                if v_type == 0:  # int64
-                    result_map[name] = np.array(
-                        list(var.int64_data), dtype="int64")
-                elif v_type == 1:  # float32
-                    result_map[name] = np.array(
-                        list(var.float_data), dtype="float32")
-                else:
-                    raise Exception("error type.")
-            result_map[name].shape = list(var.shape)
-            if name in self.lod_tensor_set_:
-                result_map["{}.lod".format(name)] = np.array(list(var.lod))
-        return result_map if not need_variant_tag else [result_map, tag]
+        multi_result_map = {}
+        for model_result in resp.outputs:
+            inst = model_result.insts[0]
+            result_map = {}
+            for i, name in enumerate(fetch):
+                var = inst.tensor_array[i]
+                v_type = self.fetch_types_[name]
+                if is_python:
+                    if v_type == 0:  # int64
+                        result_map[name] = np.frombuffer(
+                            var.data, dtype="int64")
+                    elif v_type == 1:  # float32
+                        result_map[name] = np.frombuffer(
+                            var.data, dtype="float32")
+                    else:
+                        raise Exception("error type.")
+                else:
+                    if v_type == 0:  # int64
+                        result_map[name] = np.array(
+                            list(var.int64_data), dtype="int64")
+                    elif v_type == 1:  # float32
+                        result_map[name] = np.array(
+                            list(var.float_data), dtype="float32")
+                    else:
+                        raise Exception("error type.")
+                result_map[name].shape = list(var.shape)
+                if name in self.lod_tensor_set_:
+                    result_map["{}.lod".format(name)] = np.array(list(var.lod))
+            multi_result_map[model_result.engine_name] = result_map
+        ret = None
+        if len(resp.outputs) == 1:
+            ret = list(multi_result_map.values())[0]
+        else:
+            ret = multi_result_map
+        return ret if not need_variant_tag else [ret, tag]

     def _done_callback_func(self, fetch, is_python, need_variant_tag):
         def unpack_resp(resp):
@@ -539,9 +552,11 @@ class MultiLangClient(object):
                 need_variant_tag=False,
                 asyn=False,
                 is_python=True,
-                timeout=None):
-        if timeout is None:
-            timeout = self.rpc_timeout_ms_
+                timeout_ms=None):
+        if timeout_ms is None:
+            timeout = self.rpc_timeout_s_
+        else:
+            timeout = timeout_ms / 1000.0
         req = self._pack_feed_data(feed, fetch, is_python=is_python)
         if not asyn:
             resp = self.stub_.inference(req, timeout=timeout)
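With this change the public API stays in milliseconds while the gRPC deadline is applied in seconds. A minimal usage sketch against the MultiLangClient from the example above (the 5000ms value is illustrative):

# Raise the default client deadline; per the warning in set_rpc_timeout_ms,
# the server-side timeout (default 2000ms) may also need to be raised.
client.set_rpc_timeout_ms(5000)
# Or override the deadline for a single call:
fetch_maps = client.predict(feed=feed, fetch=["prediction"], timeout_ms=5000)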
@@ -442,10 +442,17 @@ class Server(object):
 class MultiLangServerService(
         multi_lang_general_model_service_pb2_grpc.MultiLangGeneralModelService):
-    def __init__(self, model_config_path, endpoints):
+    def __init__(self,
+                 model_config_path,
+                 is_multi_model,
+                 endpoints,
+                 timeout_ms=None):
+        self.is_multi_model_ = is_multi_model
         from paddle_serving_client import Client
         self._parse_model_config(model_config_path)
         self.bclient_ = Client()
+        if timeout_ms is not None:
+            self.bclient_.set_rpc_timeout_ms(timeout_ms)
         self.bclient_.load_client_config(
             "{}/serving_server_conf.prototxt".format(model_config_path))
         self.bclient_.connect(endpoints)
@@ -510,34 +517,39 @@ class MultiLangServerService(
             feed_batch.append(feed_dict)
         return feed_batch, fetch_names, is_python

-    def _pack_resp_package(self, result, fetch_names, is_python, tag):
+    def _pack_resp_package(self, results, fetch_names, is_python, tag):
+        if not self.is_multi_model_:
+            results = {'general_infer_0': results}
         resp = multi_lang_general_model_service_pb2.Response()
-        # Only one model is supported temporarily
-        model_output = multi_lang_general_model_service_pb2.ModelOutput()
-        inst = multi_lang_general_model_service_pb2.FetchInst()
-        for idx, name in enumerate(fetch_names):
-            tensor = multi_lang_general_model_service_pb2.Tensor()
-            v_type = self.fetch_types_[name]
-            if is_python:
-                tensor.data = result[name].tobytes()
-            else:
-                if v_type == 0:  # int64
-                    tensor.int64_data.extend(result[name].reshape(-1).tolist())
-                elif v_type == 1:  # float32
-                    tensor.float_data.extend(result[name].reshape(-1).tolist())
-                else:
-                    raise Exception("error type.")
-            tensor.shape.extend(list(result[name].shape))
-            if name in self.lod_tensor_set_:
-                tensor.lod.extend(result["{}.lod".format(name)].tolist())
-            inst.tensor_array.append(tensor)
-        model_output.insts.append(inst)
-        resp.outputs.append(model_output)
+        for model_name, model_result in results.items():
+            model_output = multi_lang_general_model_service_pb2.ModelOutput()
+            inst = multi_lang_general_model_service_pb2.FetchInst()
+            for idx, name in enumerate(fetch_names):
+                tensor = multi_lang_general_model_service_pb2.Tensor()
+                v_type = self.fetch_types_[name]
+                if is_python:
+                    tensor.data = model_result[name].tobytes()
+                else:
+                    if v_type == 0:  # int64
+                        tensor.int64_data.extend(model_result[name].reshape(-1)
+                                                 .tolist())
+                    elif v_type == 1:  # float32
+                        tensor.float_data.extend(model_result[name].reshape(-1)
+                                                 .tolist())
+                    else:
+                        raise Exception("error type.")
+                tensor.shape.extend(list(model_result[name].shape))
+                if name in self.lod_tensor_set_:
+                    tensor.lod.extend(model_result["{}.lod".format(name)]
+                                      .tolist())
+                inst.tensor_array.append(tensor)
+            model_output.insts.append(inst)
+            model_output.engine_name = model_name
+            resp.outputs.append(model_output)
         resp.tag = tag
         return resp

     def inference(self, request, context):
-        print("get inference")
         feed_dict, fetch_names, is_python = self._unpack_request(request)
         data, tag = self.bclient_.predict(
             feed=feed_dict, fetch=fetch_names, need_variant_tag=True)
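For reference, a rough sketch of the response layout this method now produces, using only field names that appear in the code above (this is a sketch, not the .proto definition):

# Response
#   tag: str
#   outputs: [ModelOutput]           # one entry per engine
#     engine_name: str               # e.g. 'cnn', 'bow', or 'general_infer_0'
#     insts: [FetchInst]
#       tensor_array: [Tensor]       # one Tensor per fetch name
#         data | int64_data | float_data, shape, lod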
@@ -550,6 +562,11 @@ class MultiLangServer(object):
         self.worker_num_ = 4
         self.body_size_ = 64 * 1024 * 1024
         self.concurrency_ = 100000
+        self.bclient_timeout_ms_ = 2000
+        self.is_multi_model_ = False  # for model ensemble
+
+    def set_bclient_timeout_ms(self, timeout):
+        self.bclient_timeout_ms_ = timeout

     def set_max_concurrency(self, concurrency):
         self.concurrency_ = concurrency
@@ -590,12 +607,15 @@ class MultiLangServer(object):
     def use_mkl(self, flag):
         self.bserver_.use_mkl(flag)

-    def load_model_config(self, model_config_path):
-        if not isinstance(model_config_path, str):
-            raise Exception(
-                "MultiLangServer only supports multi-model temporarily")
-        self.bserver_.load_model_config(model_config_path)
-        self.model_config_path_ = model_config_path
+    def load_model_config(self, model_config_paths):
+        self.bserver_.load_model_config(model_config_paths)
+        if isinstance(model_config_paths, dict):
+            print("You have specified multiple model paths; please ensure "
+                  "that the inputs and outputs of all models are the same.")
+            self.model_config_path_ = list(model_config_paths.items())[0][1]
+            self.is_multi_model_ = True
+        else:
+            self.model_config_path_ = model_config_paths

     def prepare_server(self, workdir=None, port=9292, device="cpu"):
         if not self._port_is_available(port):
@@ -631,8 +651,11 @@ class MultiLangServer(object):
             options=options,
             maximum_concurrent_rpcs=self.concurrency_)
         multi_lang_general_model_service_pb2_grpc.add_MultiLangGeneralModelServiceServicer_to_server(
-            MultiLangServerService(self.model_config_path_,
-                                   ["0.0.0.0:{}".format(self.port_list_[0])]),
+            MultiLangServerService(
+                self.model_config_path_,
+                self.is_multi_model_,
+                ["0.0.0.0:{}".format(self.port_list_[0])],
+                timeout_ms=self.bclient_timeout_ms_),
             server)
         server.add_insecure_port('[::]:{}'.format(self.gport_))
         server.start()
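To pair a larger client-side deadline with the server, the new set_bclient_timeout_ms knob raises the timeout of the internal client that this gRPC service uses to reach the underlying server. A minimal sketch against the MultiLangServer from the ensemble example above (the 5000ms value is illustrative):

# Raise the internal client timeout before preparing the server, so it
# matches a client that calls set_rpc_timeout_ms(5000).
server.set_bclient_timeout_ms(5000)
server.prepare_server(workdir="work_dir1", port=9393, device="cpu")
server.run_server()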