web_service.py 11.2 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#!flask/bin/python
B
barrierye 已提交
15 16
# pylint: disable=doc-string-missing

17
from flask import Flask, request, abort
M
MRXLT 已提交
18
from contextlib import closing
Z
zhangjun 已提交
19 20
from multiprocessing import Pool, Process, Queue
from paddle_serving_client import Client
Z
zhangjun 已提交
21 22
from paddle_serving_server import OpMaker, OpSeqMaker, Server
from paddle_serving_server.serve import start_multi_card
M
MRXLT 已提交
23
import socket
Z
zhangjun 已提交
24
import sys
W
wangjiawei04 已提交
25
import numpy as np
Z
zhangjun 已提交
26
import paddle_serving_server as serving
27

Z
zhangjun 已提交
28 29
from paddle_serving_server import pipeline
from paddle_serving_server.pipeline import Op
B
barrierye 已提交
30

H
HexToString 已提交
31 32 33 34 35 36 37 38 39
def port_is_available(port):
    """Report whether nothing is accepting TCP connections on ``port``.

    A short-lived IPv4 TCP socket (2 second timeout) tries to connect to
    ``('0.0.0.0', port)``. A non-zero ``connect_ex`` status means the
    connection did not succeed, and the port is then reported as available.

    Args:
        port: TCP port number to probe.

    Returns:
        True when the connection attempt fails, False when it succeeds.
    """
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    with closing(probe):
        probe.settimeout(2)
        status = probe.connect_ex(('0.0.0.0', port))
    # connect_ex returns 0 on success, i.e. something is already listening.
    return status != 0

40 41 42
class WebService(object):
    def __init__(self, name="default_service"):
        """Create a web service backed by a pipeline server.

        Args:
            name: Service name. It is passed to the pipeline server and is
                also used to build the legacy ``/<name>/prediction`` route.
        """
        self.name = name
        # Pipeline server that serves the DAG built by prepare_pipeline_config.
        self._server = pipeline.PipelineServer(self.name)

        # State below is only used by the deprecated RPC/Flask code paths.
        self.gpus, self.rpc_service_list = [], []  # deprecated

B
barriery 已提交
49 50
    def get_pipeline_response(self, read_op):
        """Return the last Op of the pipeline DAG rooted at ``read_op``.

        This base implementation returns None. ``prepare_pipeline_config``
        rejects any return value that is not an ``Op``, so a subclass has to
        override this method before a pipeline can be prepared.

        Args:
            read_op: The request Op created by ``prepare_pipeline_config``.

        Returns:
            None in this base class.
        """
        return None
51

B
barriery 已提交
52 53 54 55 56 57 58 59 60 61 62
    def prepare_pipeline_config(self, yaml_file):
        """Build the request -> ... -> response DAG and prepare the server.

        Args:
            yaml_file: Configuration forwarded unchanged to the pipeline
                server's ``prepare_server``.

        Raises:
            ValueError: If ``get_pipeline_response`` does not return an Op.
        """
        # The DAG starts at a request Op; the user-defined hook extends it
        # and hands back the final Op.
        request_op = pipeline.RequestOp()
        tail_op = self.get_pipeline_response(request_op)
        if not isinstance(tail_op, Op):
            message = ("The return value type of `get_pipeline_response` "
                       "function is not Op type, please check function "
                       "`get_pipeline_response`.")
            raise ValueError(message)

        self._server.set_response_op(pipeline.ResponseOp(input_ops=[tail_op]))
        self._server.prepare_server(yaml_file)
63 64

    def run_service(self):
B
barriery 已提交
65
        self._server.run_server()
66 67

    def load_model_config(self, model_config):
B
barriery 已提交
68
        print("This API will be deprecated later. Please do not use it")
69
        self.model_config = model_config
70 71 72 73
        import os
        from .proto import general_model_config_pb2 as m_config
        import google.protobuf.text_format
        if os.path.isdir(model_config):
74 75
            client_config = "{}/serving_server_conf.prototxt".format(
                model_config)
T
TeslaZhao 已提交
76
        elif os.path.isfile(model_config):
77 78 79 80 81
            client_config = model_config
        model_conf = m_config.GeneralModelConfig()
        f = open(client_config, 'r')
        model_conf = google.protobuf.text_format.Merge(
            str(f.read()), model_conf)
W
wangjiawei04 已提交
82 83
        self.feed_vars = {var.name: var for var in model_conf.feed_var}
        self.fetch_vars = {var.name: var for var in model_conf.fetch_var}
84

Z
zhangjun 已提交
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
    def set_gpus(self, gpus):
        print("This API will be deprecated later. Please do not use it")
        self.gpus = [int(x) for x in gpus.split(",")]

    def default_rpc_service(self,
                            workdir="conf",
                            port=9292,
                            gpuid=0,
                            thread_num=2,
                            mem_optim=True,
                            use_lite=False,
                            use_xpu=False,
                            ir_optim=False):
        """Build and prepare a reader -> infer -> response RPC server.

        Args:
            workdir: Working directory handed to ``Server.prepare_server``.
            port: Port handed to ``Server.prepare_server``.
            gpuid: GPU id. ``-1`` selects a non-GPU device ("arm" when
                ``use_lite`` is set, otherwise "cpu"). Values ``>= 0`` are
                also passed to ``Server.set_gpuid``.
            thread_num: Value passed to ``Server.set_num_threads``.
            mem_optim: Value passed to ``Server.set_memory_optimize``.
            use_lite: When true, ``Server.set_lite`` is called.
            use_xpu: When true, ``Server.set_xpu`` is called.
            ir_optim: Value passed to ``Server.set_ir_optimize``.

        Returns:
            The prepared ``Server`` instance. The model configuration is read
            from ``self.model_config``.
        """
        if gpuid != -1:
            device = "gpu"
        elif use_lite:
            device = "arm"
        else:
            device = "cpu"

        # Create the three ops first, then chain them in order.
        op_maker = serving.OpMaker()
        op_names = ("general_reader", "general_infer", "general_response")
        ops = [op_maker.create(op_name) for op_name in op_names]

        op_seq_maker = OpSeqMaker()
        for op in ops:
            op_seq_maker.add_op(op)

        server = Server()
        server.set_op_sequence(op_seq_maker.get_op_sequence())
        server.set_num_threads(thread_num)
        server.set_memory_optimize(mem_optim)
        server.set_ir_optimize(ir_optim)
        server.set_device(device)

        if use_lite:
            server.set_lite()
        if use_xpu:
            server.set_xpu()

        server.load_model_config(self.model_config)
        if gpuid >= 0:
            server.set_gpuid(gpuid)
        server.prepare_server(workdir=workdir, port=port, device=device)
        return server

    def _launch_rpc_service(self, service_idx):
        self.rpc_service_list[service_idx].run_server()
M
MRXLT 已提交
134

M
MRXLT 已提交
135 136 137
    def prepare_server(self,
                       workdir="",
                       port=9393,
                       device="gpu",
                       use_lite=False,
                       use_xpu=False,
                       ir_optim=False,
                       gpuid=0,
                       mem_optim=True):
        """Prepare the backing RPC services for the web service (deprecated).

        Free ports are searched starting at 12000 (at most 1000 candidates)
        until one more port than there are configured GPUs has been found.
        One RPC service is then created per GPU in ``self.gpus``, or a single
        CPU service when no GPU is configured, and appended to
        ``self.rpc_service_list``.

        Args:
            workdir: Base working directory. GPU services use
                ``"<workdir>_<index>"``.
            port: Web service port, stored on the instance.
            device: Stored on the instance; not otherwise used here.
            use_lite: Forwarded to ``default_rpc_service``.
            use_xpu: Forwarded to ``default_rpc_service``.
            ir_optim: Forwarded to ``default_rpc_service``.
            gpuid: Stored on the instance; not otherwise used here.
            mem_optim: Forwarded to ``default_rpc_service``.
        """
        print("This API will be deprecated later. Please do not use it")
        self.workdir = workdir
        self.port = port
        self.device = device
        self.gpuid = gpuid
        self.port_list = []

        default_port = 12000
        for candidate in range(default_port, default_port + 1000):
            if port_is_available(candidate):
                self.port_list.append(candidate)
            # Stop once there is one more port than configured GPUs.
            if len(self.port_list) >= len(self.gpus) + 1:
                break

        if len(self.gpus) == 0:
            # init cpu service
            targets = [(self.workdir, -1)]
        else:
            targets = [("{}_{}".format(self.workdir, idx), gpu)
                       for idx, gpu in enumerate(self.gpus)]

        for idx, (service_workdir, service_gpuid) in enumerate(targets):
            service = self.default_rpc_service(
                service_workdir,
                self.port_list[idx],
                service_gpuid,
                thread_num=2,
                mem_optim=mem_optim,
                use_lite=use_lite,
                use_xpu=use_xpu,
                ir_optim=ir_optim)
            self.rpc_service_list.append(service)

182
    def _launch_web_service(self):
        """Create the RPC client and connect it to the local RPC services.

        The client configuration is resolved the same way as in
        ``load_model_config``: when ``self.model_config`` is an existing
        file it is used directly, otherwise it is treated as a directory
        containing ``serving_server_conf.prototxt``.
        """
        import os

        gpu_num = len(self.gpus)
        self.client = Client()

        # Previously a file-path model_config produced the invalid path
        # "<file>/serving_server_conf.prototxt".
        if os.path.isfile(self.model_config):
            client_config = self.model_config
        else:
            client_config = "{}/serving_server_conf.prototxt".format(
                self.model_config)
        self.client.load_client_config(client_config)

        if gpu_num > 0:
            # One endpoint per GPU service.
            # NOTE(review): the trailing comma is kept exactly as before;
            # confirm the client tolerates it before changing the format.
            endpoints = "".join("127.0.0.1:{},".format(self.port_list[i])
                                for i in range(gpu_num))
        else:
            endpoints = "127.0.0.1:{}".format(self.port_list[0])
        self.client.connect([endpoints])
B
barrierye 已提交
194

D
dongdaxiang 已提交
195
    def get_prediction(self, request):
D
dongdaxiang 已提交
196 197 198 199 200
        if not request.json:
            abort(400)
        if "fetch" not in request.json:
            abort(400)
        try:
201 202
            feed, fetch, is_batch = self.preprocess(request.json["feed"],
                                                    request.json["fetch"])
B
barrierye 已提交
203 204
            if isinstance(feed, dict) and "fetch" in feed:
                del feed["fetch"]
W
wangjiawei04 已提交
205 206
            if len(feed) == 0:
                raise ValueError("empty input")
207 208
            fetch_map = self.client.predict(
                feed=feed, fetch=fetch, batch=is_batch)
G
gongweibao 已提交
209
            result = self.postprocess(
M
MRXLT 已提交
210
                feed=request.json["feed"], fetch=fetch, fetch_map=fetch_map)
G
gongweibao 已提交
211
            result = {"result": result}
M
bug fix  
MRXLT 已提交
212
        except ValueError as err:
M
MRXLT 已提交
213
            result = {"result": str(err)}
D
dongdaxiang 已提交
214
        return result
215

M
MRXLT 已提交
216
    def run_rpc_service(self):
        """Start the RPC services and build the Flask app (deprecated).

        Every service in ``self.rpc_service_list`` is started in its own
        process. The Flask application is stored in ``self.app_instance``
        and exposes ``POST /<name>/prediction``.
        """
        print("This API will be deprecated later. Please do not use it")
        import socket
        host_ip = socket.gethostbyname(socket.gethostname())
        print("web service address:")
        print("http://{}:{}/{}/prediction".format(host_ip, self.port,
                                                  self.name))

        # Create every worker first, then start them.
        workers = [
            Process(target=self._launch_rpc_service, args=(idx, ))
            for idx in range(len(self.rpc_service_list))
        ]
        for worker in workers:
            worker.start()

        flask_app = Flask(__name__)

        # The RPC client is created lazily, on the first incoming request.
        # NOTE(review): Flask 2.3 removed `before_first_request`; confirm the
        # Flask version this deployment pins.
        @flask_app.before_first_request
        def init():
            self._launch_web_service()

        @flask_app.route("/" + self.name + "/prediction", methods=["POST"])
        def run():
            return self.get_prediction(request)

        self.app_instance = flask_app

Z
zhangjun 已提交
244 245 246
    # TODO: consider renaming this API, e.g. to run_local_predictor.
    def run_debugger_service(self, gpu=False):
        """Build a Flask app served by an in-process predictor (deprecated).

        No RPC process is started. The local predictor is created on the
        first request. The Flask application is stored in
        ``self.app_instance`` and exposes ``POST /<name>/prediction``.

        Args:
            gpu: Forwarded to ``_launch_local_predictor``.
        """
        print("This API will be deprecated later. Please do not use it")
        import socket
        host_ip = socket.gethostbyname(socket.gethostname())
        print("web service address:")
        print("http://{}:{}/{}/prediction".format(host_ip, self.port,
                                                  self.name))
        flask_app = Flask(__name__)

        # NOTE(review): Flask 2.3 removed `before_first_request`; confirm the
        # Flask version this deployment pins.
        @flask_app.before_first_request
        def init():
            self._launch_local_predictor(gpu)

        @flask_app.route("/" + self.name + "/prediction", methods=["POST"])
        def run():
            return self.get_prediction(request)

        self.app_instance = flask_app

Z
zhangjun 已提交
266
    def _launch_local_predictor(self, gpu):
        """Create a ``LocalPredictor`` as ``self.client`` and load the model.

        Args:
            gpu: When true, the model is loaded on the first id in
                ``self.gpus``. If ``set_gpus`` was never called, ``self.gpus``
                is set to ``[0]`` first. Otherwise the model is loaded with
                ``use_gpu=False``.
        """
        from paddle_serving_app.local_predict import LocalPredictor
        self.client = LocalPredictor()
        model_path = "{}".format(self.model_config)

        if not gpu:
            self.client.load_model_config(model_path, use_gpu=False)
            return

        # Fall back to GPU 0 when the user never configured any GPU.
        if not self.gpus:
            self.gpus.append(0)
        self.client.load_model_config(
            model_path, use_gpu=True, gpu_id=self.gpus[0])
W
wangjiawei04 已提交
279

M
MRXLT 已提交
280
    def run_web_service(self):
B
barriery 已提交
281
        print("This API will be deprecated later. Please do not use it")
282
        self.app_instance.run(host="0.0.0.0", port=self.port, threaded=True)
M
MRXLT 已提交
283 284 285

    def get_app_instance(self):
        return self.app_instance
M
MRXLT 已提交
286

M
MRXLT 已提交
287
    def preprocess(self, feed=[], fetch=[]):
B
barriery 已提交
288
        print("This API will be deprecated later. Please do not use it")
289
        is_batch = True
W
wangjiawei04 已提交
290 291 292 293 294
        feed_dict = {}
        for var_name in self.feed_vars.keys():
            feed_dict[var_name] = []
        for feed_ins in feed:
            for key in feed_ins:
W
wangjiawei04 已提交
295 296 297
                feed_dict[key].append(
                    np.array(feed_ins[key]).reshape(
                        list(self.feed_vars[key].shape))[np.newaxis, :])
W
wangjiawei04 已提交
298 299
        feed = {}
        for key in feed_dict:
W
wangjiawei04 已提交
300
            feed[key] = np.concatenate(feed_dict[key], axis=0)
301
        return feed, fetch, is_batch
302

M
MRXLT 已提交
303
    def postprocess(self, feed=[], fetch=[], fetch_map=None):
B
barriery 已提交
304
        print("This API will be deprecated later. Please do not use it")
M
bug fix  
MRXLT 已提交
305 306
        for key in fetch_map:
            fetch_map[key] = fetch_map[key].tolist()
307
        return fetch_map