web_service.py 9.8 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#!flask/bin/python
B
barrierye 已提交
15 16
# pylint: disable=doc-string-missing

17 18 19 20
from flask import Flask, request, abort
from multiprocessing import Pool, Process
from paddle_serving_server import OpMaker, OpSeqMaker, Server
from paddle_serving_client import Client
M
MRXLT 已提交
21 22
from contextlib import closing
import socket
W
wangjiawei04 已提交
23
import numpy as np
H
HexToString 已提交
24
import os
25
from paddle_serving_server import pipeline
B
barriery 已提交
26
from paddle_serving_server.pipeline import Op
27

B
barrierye 已提交
28

H
HexToString 已提交
29 30 31 32 33 34 35 36 37 38
def port_is_available(port):
    """Report whether nothing accepts TCP connections on ``port``.

    A short-lived IPv4 stream socket attempts to connect to
    ``0.0.0.0:port`` with a 2 second timeout. The port is treated as
    available when that connection attempt does not succeed.

    Args:
        port: TCP port number to probe.

    Returns:
        True if the connection attempt failed, False if it succeeded.
    """
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    with closing(probe):
        probe.settimeout(2)
        status = probe.connect_ex(('0.0.0.0', port))
    # connect_ex yields 0 only when the connection was established.
    return status != 0


39 40 41
class WebService(object):
    def __init__(self, name="default_service"):
        """Create a web service backed by a pipeline server.

        Args:
            name: Service name; stored on the instance and handed to
                ``pipeline.PipelineServer``.
        """
        self.name = name
        # pipeline: the server is created under the service's own name.
        self._server = pipeline.PipelineServer(name)
44

B
barriery 已提交
45 46
    def get_pipeline_response(self, read_op):
        """Hook that supplies the last Op of the pipeline graph.

        This base implementation returns None. ``prepare_pipeline_config``
        raises ``ValueError`` unless the value returned here is an ``Op``,
        so the hook has to be overridden before that method can succeed.

        Args:
            read_op: The request Op created by ``prepare_pipeline_config``.

        Returns:
            None in this base implementation.
        """
        return None
47

B
barriery 已提交
48 49 50 51 52 53 54 55 56 57 58
    def prepare_pipeline_config(self, yaml_file):
        """Build the pipeline DAG and prepare the pipeline server.

        A request Op is created and passed to ``get_pipeline_response``;
        the Op it returns becomes the single input of the response Op that
        is registered on the server.

        Args:
            yaml_file: Configuration forwarded to the server's
                ``prepare_server``.

        Raises:
            ValueError: If ``get_pipeline_response`` does not return an
                ``Op`` instance.
        """
        # build dag
        request_op = pipeline.RequestOp()
        tail_op = self.get_pipeline_response(request_op)
        if not isinstance(tail_op, Op):
            message = ("The return value type of `get_pipeline_response` "
                       "function is not Op type, please check function "
                       "`get_pipeline_response`.")
            raise ValueError(message)

        self._server.set_response_op(pipeline.ResponseOp(input_ops=[tail_op]))
        self._server.prepare_server(yaml_file)
59 60

    def run_service(self):
B
barriery 已提交
61
        self._server.run_server()
62

H
HexToString 已提交
63 64 65 66 67 68 69 70 71 72 73 74
    def load_model_config(self, server_config_dir_paths, client_config_path=None):
        if isinstance(server_config_dir_paths, str):
            server_config_dir_paths = [server_config_dir_paths]
        elif isinstance(server_config_dir_paths, list):
            pass

        for single_model_config in server_config_dir_paths:
            if os.path.isdir(single_model_config):
                pass
            elif os.path.isfile(single_model_config):
                raise ValueError("The input of --model should be a dir not file.")
        self.server_config_dir_paths = server_config_dir_paths
75 76
        from .proto import general_model_config_pb2 as m_config
        import google.protobuf.text_format
H
HexToString 已提交
77 78 79 80
        file_path_list = []
        for single_model_config in self.server_config_dir_paths:
            file_path_list.append( "{}/serving_server_conf.prototxt".format(single_model_config) )
        
81
        model_conf = m_config.GeneralModelConfig()
H
HexToString 已提交
82
        f = open(file_path_list[0], 'r')
83 84
        model_conf = google.protobuf.text_format.Merge(
            str(f.read()), model_conf)
W
wangjiawei04 已提交
85
        self.feed_vars = {var.name: var for var in model_conf.feed_var}
H
HexToString 已提交
86 87 88 89 90 91 92

        if len(file_path_list) > 1:
            model_conf = m_config.GeneralModelConfig()
            f = open(file_path_list[-1], 'r')
            model_conf = google.protobuf.text_format.Merge(
                str(f.read()), model_conf)

W
wangjiawei04 已提交
93
        self.fetch_vars = {var.name: var for var in model_conf.fetch_var}
H
HexToString 已提交
94 95
        if client_config_path == None:
            self.client_config_path = self.server_config_dir_paths
96 97 98 99

    def _launch_rpc_service(self):
        """Assemble the op sequence and run the RPC server.

        The sequence is a reader op, one inference op per configured model
        directory, then a response op. When exactly two model directories
        are configured, the first inference op is ``general_detection``;
        every other one is ``general_infer``.

        Reads ``mem_optim``, ``ir_optim``, ``workdir``, ``device`` and
        ``port_list`` from the instance, as set by ``prepare_server``.
        """
        op_maker = OpMaker()
        op_seq_maker = OpSeqMaker()

        op_seq_maker.add_op(op_maker.create('general_reader'))

        for position, _model_dir in enumerate(self.server_config_dir_paths):
            is_detection_stage = (len(self.server_config_dir_paths) == 2 and
                                  position == 0)
            stage_op_name = ("general_detection"
                             if is_detection_stage else "general_infer")
            op_seq_maker.add_op(op_maker.create(stage_op_name))

        op_seq_maker.add_op(op_maker.create('general_response'))

        rpc_server = Server()
        rpc_server.set_op_sequence(op_seq_maker.get_op_sequence())
        rpc_server.set_num_threads(16)
        rpc_server.set_memory_optimize(self.mem_optim)
        rpc_server.set_ir_optimize(self.ir_optim)
        # The brpc Server is given the full list of model directories.
        rpc_server.load_model_config(self.server_config_dir_paths)
        rpc_server.prepare_server(
            workdir=self.workdir, port=self.port_list[0], device=self.device)
        rpc_server.run_server()

M
MRXLT 已提交
126 127 128 129 130 131 132 133 134
    def port_is_available(self, port):
        with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
            sock.settimeout(2)
            result = sock.connect_ex(('0.0.0.0', port))
        if result != 0:
            return True
        else:
            return False

M
MRXLT 已提交
135 136 137 138 139 140
    def prepare_server(self,
                       workdir="",
                       port=9393,
                       device="cpu",
                       mem_optim=True,
                       ir_optim=False):
        """Store server settings and pick a port for the internal RPC server.

        Prints a deprecation notice, records the arguments on the instance
        and probes ports 12000..12999 in order. The first one reported
        available by the module-level ``port_is_available`` becomes the
        only entry of ``self.port_list``; the list stays empty when none
        is available.

        Args:
            workdir: Stored as ``self.workdir``.
            port: Stored as ``self.port``.
            device: Stored as ``self.device``.
            mem_optim: Stored as ``self.mem_optim``.
            ir_optim: Stored as ``self.ir_optim``.
        """
        print("This API will be deprecated later. Please do not use it")
        self.workdir, self.port, self.device = workdir, port, device
        self.mem_optim, self.ir_optim = mem_optim, ir_optim
        self.port_list = []

        first_candidate = 12000
        candidates = range(first_candidate, first_candidate + 1000)
        free_port = next(
            (candidate for candidate in candidates
             if port_is_available(candidate)), None)
        if free_port is not None:
            self.port_list.append(free_port)
153 154

    def _launch_web_service(self):
        """Create the RPC client and connect it to the local RPC server.

        The client is stored as ``self.client``, loads
        ``self.client_config_path`` and connects to the first port in
        ``self.port_list`` on ``0.0.0.0``.
        """
        rpc_client = Client()
        self.client = rpc_client
        rpc_client.load_client_config(self.client_config_path)
        endpoint = "0.0.0.0:{}".format(self.port_list[0])
        rpc_client.connect([endpoint])
B
barrierye 已提交
158

D
dongdaxiang 已提交
159
    def get_prediction(self, request):
D
dongdaxiang 已提交
160 161 162 163 164
        if not request.json:
            abort(400)
        if "fetch" not in request.json:
            abort(400)
        try:
165 166
            feed, fetch, is_batch = self.preprocess(request.json["feed"],
                                                    request.json["fetch"])
B
barrierye 已提交
167 168
            if isinstance(feed, dict) and "fetch" in feed:
                del feed["fetch"]
W
wangjiawei04 已提交
169 170
            if len(feed) == 0:
                raise ValueError("empty input")
171 172
            fetch_map = self.client.predict(
                feed=feed, fetch=fetch, batch=is_batch)
G
gongweibao 已提交
173
            result = self.postprocess(
M
MRXLT 已提交
174
                feed=request.json["feed"], fetch=fetch, fetch_map=fetch_map)
G
gongweibao 已提交
175
            result = {"result": result}
M
bug fix  
MRXLT 已提交
176
        except ValueError as err:
M
MRXLT 已提交
177
            result = {"result": str(err)}
D
dongdaxiang 已提交
178
        return result
179

M
MRXLT 已提交
180
    def run_rpc_service(self):
        """Start the RPC server process and build the Flask front end.

        Prints a deprecation notice and the service URL, launches
        ``_launch_rpc_service`` in a separate ``Process`` and creates a
        Flask app with a POST route ``/<name>/prediction`` served by
        ``get_prediction``. The app is stored in ``self.app_instance``; it
        is not started here.
        """
        print("This API will be deprecated later. Please do not use it")
        host_ip = socket.gethostbyname(socket.gethostname())
        print("web service address:")
        print("http://{}:{}/{}/prediction".format(host_ip, self.port,
                                                  self.name))

        rpc_process = Process(target=self._launch_rpc_service)
        rpc_process.start()

        flask_app = Flask(__name__)

        def init():
            # Connect the RPC client lazily, before the first request.
            self._launch_web_service()

        flask_app.before_first_request(init)

        prediction_route = "/" + self.name + "/prediction"

        def run():
            return self.get_prediction(request)

        flask_app.route(prediction_route, methods=["POST"])(run)

        self.app_instance = flask_app

W
wangjiawei04 已提交
204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224
    def run_debugger_service(self):
        """Build the Flask front end backed by an in-process predictor.

        Prints the service URL and creates a Flask app with a POST route
        ``/<name>/prediction`` served by ``get_prediction``. Before the
        first request ``_launch_local_predictor`` is called to set up
        ``self.client``. The app is stored in ``self.app_instance``; it is
        not started here.
        """
        host_ip = socket.gethostbyname(socket.gethostname())
        print("web service address:")
        print("http://{}:{}/{}/prediction".format(host_ip, self.port,
                                                  self.name))
        flask_app = Flask(__name__)

        def init():
            # Load the local predictor lazily, before the first request.
            self._launch_local_predictor()

        flask_app.before_first_request(init)

        prediction_route = "/" + self.name + "/prediction"

        def run():
            return self.get_prediction(request)

        flask_app.route(prediction_route, methods=["POST"])(run)

        self.app_instance = flask_app

    def _launch_local_predictor(self):
        """Create a ``LocalPredictor`` and store it as ``self.client``.

        The predictor loads the first configured model directory with
        ``use_gpu=False``.
        """
        from paddle_serving_app.local_predict import LocalPredictor

        # LocalPredictor behaves like a server, but from the WebService's
        # point of view it is the object requests are sent to, so it takes
        # the client's place.
        local_predictor = LocalPredictor()
        self.client = local_predictor
        # Per the original author's note, LocalPredictor only supports a
        # single model directory given as a str, hence the first entry.
        local_predictor.load_model_config(
            self.server_config_dir_paths[0], use_gpu=False)
W
wangjiawei04 已提交
232

M
MRXLT 已提交
233
    def run_web_service(self):
B
barriery 已提交
234
        print("This API will be deprecated later. Please do not use it")
235
        self.app_instance.run(host="0.0.0.0", port=self.port, threaded=True)
M
MRXLT 已提交
236 237 238

    def get_app_instance(self):
        return self.app_instance
M
MRXLT 已提交
239

M
MRXLT 已提交
240
    def preprocess(self, feed=[], fetch=[]):
B
barriery 已提交
241
        print("This API will be deprecated later. Please do not use it")
242
        is_batch = True
W
wangjiawei04 已提交
243 244 245 246 247
        feed_dict = {}
        for var_name in self.feed_vars.keys():
            feed_dict[var_name] = []
        for feed_ins in feed:
            for key in feed_ins:
W
wangjiawei04 已提交
248 249 250
                feed_dict[key].append(
                    np.array(feed_ins[key]).reshape(
                        list(self.feed_vars[key].shape))[np.newaxis, :])
W
wangjiawei04 已提交
251 252
        feed = {}
        for key in feed_dict:
W
wangjiawei04 已提交
253
            feed[key] = np.concatenate(feed_dict[key], axis=0)
254
        return feed, fetch, is_batch
255

M
MRXLT 已提交
256
    def postprocess(self, feed=[], fetch=[], fetch_map=None):
B
barriery 已提交
257
        print("This API will be deprecated later. Please do not use it")
M
bug fix  
MRXLT 已提交
258 259
        for key in fetch_map:
            fetch_map[key] = fetch_map[key].tolist()
260
        return fetch_map