diff --git a/python/examples/imdb/test_ensemble_client.py b/python/examples/imdb/test_ensemble_client.py
index 6cafb3389fff5a25103bcb2b3a867b73b35b9e8e..eb1e29ddd6d5a02854e4859a35474306c1c4d073 100644
--- a/python/examples/imdb/test_ensemble_client.py
+++ b/python/examples/imdb/test_ensemble_client.py
@@ -32,11 +32,7 @@ for i in range(3):
     line = 'i am very sad | 0'
     word_ids, label = imdb_dataset.get_words_and_label(line)
     feed = {"words": word_ids}
-    fetch = ["acc", "cost", "prediction"]
+    fetch = ["prediction"]
     fetch_maps = client.predict(feed=feed, fetch=fetch)
-    if len(fetch_maps) == 1:
-        print("step: {}, res: {}".format(i, fetch_maps['prediction'][0][1]))
-    else:
-        for model, fetch_map in fetch_maps.items():
-            print("step: {}, model: {}, res: {}".format(i, model, fetch_map[
-                'prediction'][0][1]))
+    for model, fetch_map in fetch_maps.items():
+        print("step: {}, model: {}, res: {}".format(i, model, fetch_map))
diff --git a/python/examples/imdb/test_multilang_ensemble_client.py b/python/examples/imdb/test_multilang_ensemble_client.py
new file mode 100644
index 0000000000000000000000000000000000000000..6d745e8bb536235251bd2d95d65470aa09348445
--- /dev/null
+++ b/python/examples/imdb/test_multilang_ensemble_client.py
@@ -0,0 +1,38 @@
+# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# pylint: disable=doc-string-missing

+from paddle_serving_client import MultiLangClient
+from imdb_reader import IMDBDataset

+client = MultiLangClient()
+# If you serve more than one model, make sure that the inputs
+# and outputs of all models are the same.
+client.load_client_config('imdb_bow_client_conf/serving_client_conf.prototxt')
+client.connect(["127.0.0.1:9393"])

+# You can feed any English sentence or dataset here.
+# This example reuses the IMDB reader from training; you
+# can easily plug in your own data preprocessing instead.
+imdb_dataset = IMDBDataset()
+imdb_dataset.load_resource('imdb.vocab')

+for i in range(3):
+    line = 'i am very sad | 0'
+    word_ids, label = imdb_dataset.get_words_and_label(line)
+    feed = {"words": word_ids}
+    fetch = ["prediction"]
+    fetch_maps = client.predict(feed=feed, fetch=fetch)
+    for model, fetch_map in fetch_maps.items():
+        print("step: {}, model: {}, res: {}".format(i, model, fetch_map))
diff --git a/python/examples/imdb/test_multilang_ensemble_server.py b/python/examples/imdb/test_multilang_ensemble_server.py
new file mode 100644
index 0000000000000000000000000000000000000000..053aa06f0219de231415ba178135782334e56c1f
--- /dev/null
+++ b/python/examples/imdb/test_multilang_ensemble_server.py
@@ -0,0 +1,40 @@
+# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# pylint: disable=doc-string-missing

+from paddle_serving_server import OpMaker
+from paddle_serving_server import OpGraphMaker
+from paddle_serving_server import MultiLangServer

+op_maker = OpMaker()
+read_op = op_maker.create('general_reader')
+cnn_infer_op = op_maker.create(
+    'general_infer', engine_name='cnn', inputs=[read_op])
+bow_infer_op = op_maker.create(
+    'general_infer', engine_name='bow', inputs=[read_op])
+response_op = op_maker.create(
+    'general_response', inputs=[cnn_infer_op, bow_infer_op])

+op_graph_maker = OpGraphMaker()
+op_graph_maker.add_op(read_op)
+op_graph_maker.add_op(cnn_infer_op)
+op_graph_maker.add_op(bow_infer_op)
+op_graph_maker.add_op(response_op)

+server = MultiLangServer()
+server.set_op_graph(op_graph_maker.get_op_graph())
+model_config = {cnn_infer_op: 'imdb_cnn_model', bow_infer_op: 'imdb_bow_model'}
+server.load_model_config(model_config)
+server.prepare_server(workdir="work_dir1", port=9393, device="cpu")
+server.run_server()
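Note: a minimal sketch of the result shape these ensemble clients iterate over; it is not part of the patch, the engine names follow the example server above, and the scores are invented:

```python
import numpy as np

# Hypothetical predict() output for a two-model ensemble: one fetch map
# per infer op, keyed by engine name (see _unpack_resp further down).
fetch_maps = {
    "cnn": {"prediction": np.array([[0.42, 0.58]], dtype="float32")},
    "bow": {"prediction": np.array([[0.31, 0.69]], dtype="float32")},
}

for model, fetch_map in fetch_maps.items():
    # fetch_map["prediction"][0][1] is the positive-class probability,
    # the value the pre-patch client printed.
    print("model: {}, res: {}".format(i, model, fetch_map["prediction"][0][1])
          if False else
          "model: {}, res: {}".format(model, fetch_map["prediction"][0][1]))
```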
diff --git a/python/paddle_serving_client/__init__.py b/python/paddle_serving_client/__init__.py
index 4bfbb936c74de79223ce703655350efc93b84885..c3da5cd9e5f031287efdc4f7b5c05abf550dc761 100644
--- a/python/paddle_serving_client/__init__.py
+++ b/python/paddle_serving_client/__init__.py
@@ -384,7 +384,7 @@ class Client(object):
 class MultiLangClient(object):
     def __init__(self):
         self.channel_ = None
-        self.rpc_timeout_ms_ = 2000
+        self.rpc_timeout_s_ = 2

     def load_client_config(self, path):
         if not isinstance(path, str):
@@ -393,17 +393,20 @@

     def add_variant(self, tag, cluster, variant_weight):
         # TODO
-        pass
+        raise Exception("ABtest is not supported yet")

     def set_rpc_timeout_ms(self, rpc_timeout):
-        self.rpc_timeout_ms_ = rpc_timeout
+        if rpc_timeout > 2000:
+            print("WARN: you may also need to raise the server-side "
+                  "timeout, which defaults to 2000ms.")
+        self.rpc_timeout_s_ = rpc_timeout / 1000.0

     def connect(self, endpoints):
         # https://github.com/tensorflow/serving/issues/1382
         options = [('grpc.max_receive_message_length', 512 * 1024 * 1024),
                    ('grpc.max_send_message_length', 512 * 1024 * 1024),
                    ('grpc.lb_policy_name', 'round_robin')]
-
+        # TODO: weighted round robin
         g_endpoint = 'ipv4:{}'.format(','.join(endpoints))
         self.channel_ = grpc.insecure_channel(g_endpoint, options=options)
         self.stub_ = multi_lang_general_model_service_pb2_grpc.MultiLangGeneralModelServiceStub(
@@ -497,32 +500,42 @@
         return req

     def _unpack_resp(self, resp, fetch, is_python, need_variant_tag):
-        result_map = {}
-        inst = resp.outputs[0].insts[0]
         tag = resp.tag
-        for i, name in enumerate(fetch):
-            var = inst.tensor_array[i]
-            v_type = self.fetch_types_[name]
-            if is_python:
-                if v_type == 0:  # int64
-                    result_map[name] = np.frombuffer(var.data, dtype="int64")
-                elif v_type == 1:  # float32
-                    result_map[name] = np.frombuffer(var.data, dtype="float32")
-                else:
-                    raise Exception("error type.")
-            else:
-                if v_type == 0:  # int64
-                    result_map[name] = np.array(
-                        list(var.int64_data), dtype="int64")
-                elif v_type == 1:  # float32
-                    result_map[name] = np.array(
-                        list(var.float_data), dtype="float32")
+        multi_result_map = {}
+        for model_result in resp.outputs:
+            inst = model_result.insts[0]
+            result_map = {}
+            for i, name in enumerate(fetch):
+                var = inst.tensor_array[i]
+                v_type = self.fetch_types_[name]
+                if is_python:
+                    if v_type == 0:  # int64
+                        result_map[name] = np.frombuffer(
+                            var.data, dtype="int64")
+                    elif v_type == 1:  # float32
+                        result_map[name] = np.frombuffer(
+                            var.data, dtype="float32")
+                    else:
+                        raise Exception("error type.")
                 else:
-                    raise Exception("error type.")
-            result_map[name].shape = list(var.shape)
-            if name in self.lod_tensor_set_:
-                result_map["{}.lod".format(name)] = np.array(list(var.lod))
-        return result_map if not need_variant_tag else [result_map, tag]
+                    if v_type == 0:  # int64
+                        result_map[name] = np.array(
+                            list(var.int64_data), dtype="int64")
+                    elif v_type == 1:  # float32
+                        result_map[name] = np.array(
+                            list(var.float_data), dtype="float32")
+                    else:
+                        raise Exception("error type.")
+                result_map[name].shape = list(var.shape)
+                if name in self.lod_tensor_set_:
+                    result_map["{}.lod".format(name)] = np.array(list(var.lod))
+            multi_result_map[model_result.engine_name] = result_map
+        ret = None
+        if len(resp.outputs) == 1:
+            ret = list(multi_result_map.values())[0]  # list() for Python 3
+        else:
+            ret = multi_result_map
+        return ret if not need_variant_tag else [ret, tag]

     def _done_callback_func(self, fetch, is_python, need_variant_tag):
         def unpack_resp(resp):
@@ -539,9 +552,11 @@
                 need_variant_tag=False,
                 asyn=False,
                 is_python=True,
-                timeout=None):
-        if timeout is None:
-            timeout = self.rpc_timeout_ms_
+                timeout_ms=None):
+        if timeout_ms is None:
+            timeout = self.rpc_timeout_s_
+        else:
+            timeout = timeout_ms / 1000.0
         req = self._pack_feed_data(feed, fetch, is_python=is_python)
         if not asyn:
             resp = self.stub_.inference(req, timeout=timeout)
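Note: both timeout knobs above take milliseconds and are divided by 1000.0 internally, because grpc expresses deadlines in seconds. A usage sketch; the endpoint, config path, and toy input are illustrative and mirror the imdb example:

```python
from paddle_serving_client import MultiLangClient

client = MultiLangClient()
client.load_client_config("imdb_bow_client_conf/serving_client_conf.prototxt")
client.connect(["127.0.0.1:9393"])

# Default timeout for all calls, in ms; values above 2000 trigger the
# warning because the server-side timeout defaults to 2000ms.
client.set_rpc_timeout_ms(5000)

word_ids = [8, 233, 52]  # toy ids; real ones come from IMDBDataset
# Per-call override, also in ms. With one model this returns a single
# fetch map; with an ensemble, a dict of fetch maps keyed by engine name.
fetch_map = client.predict(
    feed={"words": word_ids}, fetch=["prediction"], timeout_ms=1000)
```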
diff --git a/python/paddle_serving_server/__init__.py b/python/paddle_serving_server/__init__.py
index 33448c1bc927b9145d9ffae8e03f09b0e8cc7f3b..dfa493741fca47e1351949ea13deaabaf183dfb6 100644
--- a/python/paddle_serving_server/__init__.py
+++ b/python/paddle_serving_server/__init__.py
@@ -442,10 +442,17 @@

 class MultiLangServerService(
         multi_lang_general_model_service_pb2_grpc.MultiLangGeneralModelService):
-    def __init__(self, model_config_path, endpoints):
+    def __init__(self,
+                 model_config_path,
+                 is_multi_model,
+                 endpoints,
+                 timeout_ms=None):
+        self.is_multi_model_ = is_multi_model
         from paddle_serving_client import Client
         self._parse_model_config(model_config_path)
         self.bclient_ = Client()
+        if timeout_ms is not None:
+            self.bclient_.set_rpc_timeout_ms(timeout_ms)
         self.bclient_.load_client_config(
             "{}/serving_server_conf.prototxt".format(model_config_path))
         self.bclient_.connect(endpoints)
@@ -510,34 +517,39 @@
             feed_batch.append(feed_dict)
         return feed_batch, fetch_names, is_python

-    def _pack_resp_package(self, result, fetch_names, is_python, tag):
+    def _pack_resp_package(self, results, fetch_names, is_python, tag):
+        if not self.is_multi_model_:
+            results = {'general_infer_0': results}
         resp = multi_lang_general_model_service_pb2.Response()
-        # Only one model is supported temporarily
-        model_output = multi_lang_general_model_service_pb2.ModelOutput()
-        inst = multi_lang_general_model_service_pb2.FetchInst()
-        for idx, name in enumerate(fetch_names):
-            tensor = multi_lang_general_model_service_pb2.Tensor()
-            v_type = self.fetch_types_[name]
-            if is_python:
-                tensor.data = result[name].tobytes()
-            else:
-                if v_type == 0:  # int64
-                    tensor.int64_data.extend(result[name].reshape(-1).tolist())
-                elif v_type == 1:  # float32
-                    tensor.float_data.extend(result[name].reshape(-1).tolist())
+        for model_name, model_result in results.items():
+            model_output = multi_lang_general_model_service_pb2.ModelOutput()
+            inst = multi_lang_general_model_service_pb2.FetchInst()
+            for idx, name in enumerate(fetch_names):
+                tensor = multi_lang_general_model_service_pb2.Tensor()
+                v_type = self.fetch_types_[name]
+                if is_python:
+                    tensor.data = model_result[name].tobytes()
                 else:
-                    raise Exception("error type.")
-            tensor.shape.extend(list(result[name].shape))
-            if name in self.lod_tensor_set_:
-                tensor.lod.extend(result["{}.lod".format(name)].tolist())
-            inst.tensor_array.append(tensor)
-        model_output.insts.append(inst)
-        resp.outputs.append(model_output)
+                    if v_type == 0:  # int64
+                        tensor.int64_data.extend(model_result[name].reshape(-1)
+                                                 .tolist())
+                    elif v_type == 1:  # float32
+                        tensor.float_data.extend(model_result[name].reshape(-1)
+                                                 .tolist())
+                    else:
+                        raise Exception("error type.")
+                tensor.shape.extend(list(model_result[name].shape))
+                if name in self.lod_tensor_set_:
+                    tensor.lod.extend(model_result["{}.lod".format(name)]
+                                      .tolist())
+                inst.tensor_array.append(tensor)
+            model_output.insts.append(inst)
+            model_output.engine_name = model_name
+            resp.outputs.append(model_output)
         resp.tag = tag
         return resp

     def inference(self, request, context):
-        print("get inference")
         feed_dict, fetch_names, is_python = self._unpack_request(request)
         data, tag = self.bclient_.predict(
             feed=feed_dict, fetch=fetch_names, need_variant_tag=True)
@@ -550,6 +562,11 @@ class MultiLangServer(object):
         self.worker_num_ = 4
         self.body_size_ = 64 * 1024 * 1024
         self.concurrency_ = 100000
+        self.bclient_timeout_ms_ = 2000
+        self.is_multi_model_ = False  # for model ensemble
+
+    def set_bclient_timeout_ms(self, timeout):
+        self.bclient_timeout_ms_ = timeout

     def set_max_concurrency(self, concurrency):
         self.concurrency_ = concurrency
@@ -590,12 +607,15 @@
     def use_mkl(self, flag):
         self.bserver_.use_mkl(flag)

-    def load_model_config(self, model_config_path):
-        if not isinstance(model_config_path, str):
-            raise Exception(
-                "MultiLangServer only supports multi-model temporarily")
-        self.bserver_.load_model_config(model_config_path)
-        self.model_config_path_ = model_config_path
+    def load_model_config(self, model_config_paths):
+        self.bserver_.load_model_config(model_config_paths)
+        if isinstance(model_config_paths, dict):
+            print("You have specified multiple model paths; please make sure "
+                  "that the inputs and outputs of all models are the same.")
+            self.model_config_path_ = list(model_config_paths.items())[0][1]
+            self.is_multi_model_ = True
+        else:
+            self.model_config_path_ = model_config_paths

     def prepare_server(self, workdir=None, port=9292, device="cpu"):
         if not self._port_is_available(port):
@@ -631,8 +651,11 @@
             options=options,
             maximum_concurrent_rpcs=self.concurrency_)
         multi_lang_general_model_service_pb2_grpc.add_MultiLangGeneralModelServiceServicer_to_server(
-            MultiLangServerService(self.model_config_path_,
-                                   ["0.0.0.0:{}".format(self.port_list_[0])]),
+            MultiLangServerService(
+                self.model_config_path_,
+                self.is_multi_model_,
+                ["0.0.0.0:{}".format(self.port_list_[0])],
+                timeout_ms=self.bclient_timeout_ms_),
             server)
         server.add_insecure_port('[::]:{}'.format(self.gport_))
         server.start()
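Note: a sketch of how the new server-side surface fits together, mirroring test_multilang_ensemble_server.py above; the timeout value is illustrative:

```python
from paddle_serving_server import OpMaker, OpGraphMaker, MultiLangServer

# Build the same two-model graph as in the imdb example above.
op_maker = OpMaker()
read_op = op_maker.create('general_reader')
cnn_op = op_maker.create('general_infer', engine_name='cnn', inputs=[read_op])
bow_op = op_maker.create('general_infer', engine_name='bow', inputs=[read_op])
response_op = op_maker.create('general_response', inputs=[cnn_op, bow_op])

graph_maker = OpGraphMaker()
for op in [read_op, cnn_op, bow_op, response_op]:
    graph_maker.add_op(op)

server = MultiLangServer()
server.set_op_graph(graph_maker.get_op_graph())
# A dict of {infer_op: model_dir} switches on is_multi_model_, so the
# response carries one ModelOutput per engine; a plain string path keeps
# the old single-model behavior.
server.load_model_config({cnn_op: 'imdb_cnn_model', bow_op: 'imdb_bow_model'})
# The gRPC service forwards to an internal client whose rpc timeout
# defaults to 2000ms; raise it for slower models.
server.set_bclient_timeout_ms(5000)
server.prepare_server(workdir="work_dir1", port=9393, device="cpu")
server.run_server()
```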