test_pipeline_server.py 3.7 KB
Newer Older
B
barrierye 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=doc-string-missing
B
barriery 已提交
15
import numpy as np
16
from paddle_serving_app.reader.imdb_reader import IMDBDataset
B
barriery 已提交
17
import logging
18 19 20 21 22
from paddle_serving_server.web_service import WebService
from paddle_serving_server.pipeline import Op, RequestOp, ResponseOp
from paddle_serving_server.pipeline import PipelineServer
from paddle_serving_server.pipeline.proto import pipeline_service_pb2
from paddle_serving_server.pipeline.channel import ChannelDataErrcode
B
barrierye 已提交
23

B
barrierye 已提交
24
# Attach a stream handler to the root logger so that messages logged by this
# script are printed with level, timestamp and source location.
_LOGGER = logging.getLogger()
user_handler = logging.StreamHandler()
# The handler itself only lets INFO and above through.
user_handler.setLevel(logging.INFO)
user_handler.setFormatter(
    logging.Formatter(
        "%(levelname)s %(asctime)s [%(filename)s:%(lineno)d] %(message)s"))
_LOGGER.addHandler(user_handler)
B
barrierye 已提交
31

B
barriery 已提交
32

B
barrierye 已提交
33
class ImdbRequestOp(RequestOp):
    """Request op that turns the "words" field of a request into feed arrays."""

    def init_op(self):
        """Create the IMDB dataset helper and load the 'imdb.vocab' resource."""
        self.imdb_dataset = IMDBDataset()
        self.imdb_dataset.load_resource('imdb.vocab')

    def unpack_request_package(self, request):
        """Build the feed dict for the downstream ops from ``request``.

        Only entries of ``request.key`` equal to "words" are used; every
        other key is ignored. The matching ``request.value`` entry is
        converted to word ids by the dataset helper and stored as a column
        array of shape (n, 1), together with a "words.lod" array ``[0, n]``.

        Returns:
            A 4-tuple ``(feed, log_id, None, "")``. The meaning of the last
            two items is defined by the pipeline framework (presumably an
            error code and an error message -- confirm against RequestOp).
        """
        feed = {}
        for position, name in enumerate(request.key):
            if name == "words":
                raw_text = request.value[position]
                # The label returned by the dataset helper is not needed here.
                ids, _ = self.imdb_dataset.get_words_and_label(raw_text)
                id_count = len(ids)
                feed[name] = np.array(ids).reshape(id_count, 1)
                feed["{}.lod".format(name)] = np.array([0, id_count])

        log_id = request.logid if request.logid is not None else None
        return feed, log_id, None, ""
B
barrierye 已提交
53 54


B
barrierye 已提交
55
class CombineOp(Op):
    """Op that merges the "prediction" outputs of its upstream ops."""

    def preprocess(self, input_data, data_id, log_id):
        """Average the "prediction" entries of all upstream outputs.

        Args:
            input_data: mapping of upstream op name to that op's output
                dict; each output dict must contain a "prediction" entry.
            data_id: not used by this op.
            log_id: not used by this op.

        Returns:
            A 4-tuple ``({"prediction": mean}, False, None, "")``. The last
            three items are passed through as constants; their meaning is
            defined by the pipeline framework (presumably a skip flag, an
            error code and an error message -- confirm against Op).
        """
        combined_prediction = 0
        for op_name, data in input_data.items():
            _LOGGER.info("{}: {}".format(op_name, data["prediction"]))
            combined_prediction += data["prediction"]
        # Divide by the real number of upstream ops instead of a hard-coded 2
        # so the result stays a mean if the graph wiring changes. max(..., 1)
        # keeps the empty-input result at 0, as before.
        input_count = max(len(input_data), 1)
        data = {"prediction": combined_prediction / input_count}
        return data, False, None, ""
B
barrierye 已提交
64

B
barriery 已提交
65

B
barrierye 已提交
66 67 68 69
class ImdbResponseOp(ResponseOp):
    # Here ImdbResponseOp is consistent with the default ResponseOp implementation
    def pack_response_package(self, channeldata):
        """Convert ``channeldata`` into a ``pipeline_service_pb2.Response``.

        The response's ``err_no`` is always copied from
        ``channeldata.error_code``. On a non-OK code only ``err_msg`` is
        filled in; otherwise each parsed (name, value) pair is appended to
        ``key`` / ``value``, with the value rendered through ``repr``.
        """
        response = pipeline_service_pb2.Response()
        response.err_no = channeldata.error_code

        # Error path: report the message and skip the payload.
        if response.err_no != ChannelDataErrcode.OK.value:
            response.err_msg = channeldata.error_info
            return response

        # Success path: values (ndarrays, per the original note) become strings.
        for field_name, field_value in channeldata.parse().items():
            response.value.append(repr(field_value))
            response.key.append(field_name)
        return response


# Request op instance; downstream ops reference it through their input_ops.
read_op = ImdbRequestOp()
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97


class BowOp(Op):
    """Op subclass with an empty init_op; everything else is inherited from Op."""

    def init_op(self):
        """Perform no op-specific initialisation."""


class CnnOp(Op):
    """Op subclass with an empty init_op; everything else is inherited from Op."""

    def init_op(self):
        """Perform no op-specific initialisation."""


# Wire the op graph: "bow" and "cnn" both take read_op as their input, and
# "combine" takes the outputs of both of them.
bow_op = BowOp("bow", input_ops=[read_op])
cnn_op = CnnOp("cnn", input_ops=[read_op])
combine_op = CombineOp("combine", input_ops=[bow_op, cnn_op])
B
barrierye 已提交
98

B
barrierye 已提交
99
# fetch output of bow_op
100
#response_op = ImdbResponseOp(input_ops=[bow_op])
B
barrierye 已提交
101 102 103 104 105

# Fetch the output of combine_op, packed by the custom ImdbResponseOp.
response_op = ImdbResponseOp(input_ops=[combine_op])

# use default ResponseOp implementation
106
#response_op = ResponseOp(input_ops=[combine_op])
B
barrierye 已提交
107

B
barrierye 已提交
108
# Create the pipeline server, register the response op that ends the graph,
# prepare it from 'config.yml', then start serving.
server = PipelineServer()
server.set_response_op(response_op)
server.prepare_server('config.yml')
# NOTE(review): run_server() presumably blocks until shutdown -- confirm.
server.run_server()