# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # pylint: disable=doc-string-missing from paddle_serving_server.pipeline import Op, RequestOp, ResponseOp from paddle_serving_server.pipeline import PipelineServer from paddle_serving_server.pipeline.proto import pipeline_service_pb2 from paddle_serving_server.pipeline.channel import ChannelDataEcode import numpy as np import logging from paddle_serving_app.reader import IMDBDataset _LOGGER = logging.getLogger(__name__) logging.basicConfig( format='%(asctime)s %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s', datefmt='%Y-%m-%d %H:%M', level=logging.DEBUG) class ImdbRequestOp(RequestOp): def init_op(self): self.imdb_dataset = IMDBDataset() self.imdb_dataset.load_resource('imdb.vocab') def unpack_request_package(self, request): dictdata = {} for idx, key in enumerate(request.key): if key != "words": continue words = request.value[idx] word_ids, _ = self.imdb_dataset.get_words_and_label(words) dictdata[key] = np.array(word_ids) return dictdata class CombineOp(Op): def preprocess(self, input_data, private_obj): combined_prediction = 0 for op_name, data in input_data.items(): _LOGGER.info("{}: {}".format(op_name, data["prediction"])) combined_prediction += data["prediction"] data = {"prediction": combined_prediction / 2} return data class ImdbResponseOp(ResponseOp): # Here ImdbResponseOp is consistent with the default ResponseOp implementation def pack_response_package(self, channeldata): resp = pipeline_service_pb2.Response() resp.ecode = channeldata.ecode if resp.ecode == ChannelDataEcode.OK.value: feed = channeldata.parse() # ndarray to string for name, var in feed.items(): resp.value.append(var.__repr__()) resp.key.append(name) else: resp.error_info = channeldata.error_info return resp read_op = ImdbRequestOp() bow_op = Op(name="bow", input_ops=[read_op], server_endpoints=["127.0.0.1:9393"], fetch_list=["prediction"], client_config="imdb_bow_client_conf/serving_client_conf.prototxt", concurrency=1, timeout=-1, retry=1) cnn_op = Op(name="cnn", input_ops=[read_op], server_endpoints=["127.0.0.1:9292"], fetch_list=["prediction"], client_config="imdb_cnn_client_conf/serving_client_conf.prototxt", concurrency=1, timeout=-1, retry=1) combine_op = CombineOp( name="combine", input_ops=[bow_op, cnn_op], concurrency=1, timeout=-1, retry=1) # fetch output of bow_op # response_op = ImdbResponseOp(input_ops=[bow_op]) # fetch output of combine_op response_op = ImdbResponseOp(input_ops=[combine_op]) # use default ResponseOp implementation # response_op = ResponseOp(input_ops=[combine_op]) server = PipelineServer() server.set_response_op(response_op) server.prepare_server('config.yml') server.run_server()