web_service.py 15.1 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#!flask/bin/python
B
barrierye 已提交
15 16
# pylint: disable=doc-string-missing

H
HexToString 已提交
17
# Now, this is only for Pipeline.
18
from flask import Flask, request, abort
M
MRXLT 已提交
19
from contextlib import closing
Z
zhangjun 已提交
20 21
from multiprocessing import Pool, Process, Queue
from paddle_serving_client import Client
Z
zhangjun 已提交
22 23
from paddle_serving_server import OpMaker, OpSeqMaker, Server
from paddle_serving_server.serve import start_multi_card
M
MRXLT 已提交
24
import socket
Z
zhangjun 已提交
25
import sys
W
wangjiawei04 已提交
26
import numpy as np
H
HexToString 已提交
27
import os
Z
zhangjun 已提交
28 29
from paddle_serving_server import pipeline
from paddle_serving_server.pipeline import Op
H
HexToString 已提交
30
from paddle_serving_server.serve import format_gpu_to_strlist
F
felixhjh 已提交
31
from paddle_serving_server.util import dump_pid_file
B
barrierye 已提交
32

H
HexToString 已提交
33

H
HexToString 已提交
34 35 36
def port_is_available(port):
    """Report whether nothing accepts TCP connections on 127.0.0.1:port.

    A connection attempt is made with a 2-second timeout; the port counts
    as available when that attempt does not succeed.

    Args:
        port: TCP port number to probe on the loopback interface.

    Returns:
        True if ``connect_ex`` returned a non-zero error code, else False.
    """
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    with closing(probe):
        probe.settimeout(2)
        status = probe.connect_ex(('127.0.0.1', port))
    return status != 0

H
HexToString 已提交
43

44 45 46
class WebService(object):
    def __init__(self, name="default_service"):
        """Create the service and its underlying pipeline server.

        Args:
            name: Service name; stored on ``self.name`` and handed to
                ``pipeline.PipelineServer``.
        """
        self.name = name

        # State kept only for the deprecated RPC-based code path.
        self.rpc_service_list = []  # deprecated
        self.gpus = ["-1"]  # deprecated

        # pipeline
        self._server = pipeline.PipelineServer(name)

B
barriery 已提交
53 54
    def get_pipeline_response(self, read_op):
        """Hook that builds the pipeline DAG; this default returns None.

        ``prepare_pipeline_config`` calls this with a ``pipeline.RequestOp``
        and raises ``ValueError`` unless the result is an ``Op``, so this
        default has to be replaced (e.g. in a subclass) before that method
        can succeed.

        Args:
            read_op: The request op that starts the DAG (unused here).

        Returns:
            None.
        """
        return None
55

B
barriery 已提交
56 57 58 59 60 61 62 63 64 65 66
    def prepare_pipeline_config(self, yaml_file):
        """Build the pipeline DAG and prepare the pipeline server.

        Args:
            yaml_file: Configuration passed to the pipeline server's
                ``prepare_server``.

        Raises:
            ValueError: If ``get_pipeline_response`` does not return an
                ``Op`` instance.
        """
        # build dag
        request_op = pipeline.RequestOp()
        tail_op = self.get_pipeline_response(request_op)
        if not isinstance(tail_op, Op):
            message = ("The return value type of `get_pipeline_response` "
                       "function is not Op type, please check function "
                       "`get_pipeline_response`.")
            raise ValueError(message)
        self._server.set_response_op(pipeline.ResponseOp(input_ops=[tail_op]))
        self._server.prepare_server(yaml_file)
67 68

    def run_service(self):
B
barriery 已提交
69
        self._server.run_server()
70

H
HexToString 已提交
71 72 73
    def load_model_config(self,
                          server_config_dir_paths,
                          client_config_path=None):
H
HexToString 已提交
74 75 76 77 78 79 80 81 82
        if isinstance(server_config_dir_paths, str):
            server_config_dir_paths = [server_config_dir_paths]
        elif isinstance(server_config_dir_paths, list):
            pass

        for single_model_config in server_config_dir_paths:
            if os.path.isdir(single_model_config):
                pass
            elif os.path.isfile(single_model_config):
H
HexToString 已提交
83 84
                raise ValueError(
                    "The input of --model should be a dir not file.")
H
HexToString 已提交
85
        self.server_config_dir_paths = server_config_dir_paths
86 87
        from .proto import general_model_config_pb2 as m_config
        import google.protobuf.text_format
H
HexToString 已提交
88 89
        file_path_list = []
        for single_model_config in self.server_config_dir_paths:
H
HexToString 已提交
90 91 92
            file_path_list.append("{}/serving_server_conf.prototxt".format(
                single_model_config))

93
        model_conf = m_config.GeneralModelConfig()
H
HexToString 已提交
94
        f = open(file_path_list[0], 'r')
95 96
        model_conf = google.protobuf.text_format.Merge(
            str(f.read()), model_conf)
T
TeslaZhao 已提交
97
        self.feed_vars = {var.alias_name: var for var in model_conf.feed_var}
H
HexToString 已提交
98 99 100 101 102 103 104

        if len(file_path_list) > 1:
            model_conf = m_config.GeneralModelConfig()
            f = open(file_path_list[-1], 'r')
            model_conf = google.protobuf.text_format.Merge(
                str(f.read()), model_conf)

T
TeslaZhao 已提交
105
        self.fetch_vars = {var.alias_name: var for var in model_conf.fetch_var}
H
HexToString 已提交
106
        if client_config_path == None:
H
HexToString 已提交
107
            self.client_config_path = file_path_list
108

H
HexToString 已提交
109
    # After this call, self.gpus should be a list of str (possibly empty).
Z
zhangjun 已提交
110 111
    def set_gpus(self, gpus):
        """Deprecated: store the GPU ids on ``self.gpus``.

        Prints a deprecation notice, then saves the result of passing
        ``gpus`` through ``format_gpu_to_strlist``.
        """
        notice = "This API will be deprecated later. Please do not use it"
        print(notice)
        gpu_list = format_gpu_to_strlist(gpus)
        self.gpus = gpu_list

    # default_rpc_service can be called directly by the user or by
    # create_rpc_config. A user may either call set_gpus beforehand or pass
    # the `gpus` argument. If `gpus` is None (i.e. not set at all), self.gpus
    # is used instead; otherwise `gpus` takes precedence, even when set_gpus
    # has also been called.
Z
zhangjun 已提交
122 123

    def default_rpc_service(self,
                            workdir,
                            port=9292,
                            gpus=None,
                            thread_num=4,
                            mem_optim=True,
                            use_lite=False,
                            use_xpu=False,
                            ir_optim=False,
                            precision="fp32",
                            use_calib=False,
                            use_trt=False,
                            gpu_multi_stream=False,
                            runtime_thread_num=None,
                            batch_infer_size=None):
        """Build and prepare a ``Server`` for the loaded model(s).

        The op sequence is ``general_reader``, one inference op per entry of
        ``self.server_config_dir_paths``, then ``general_response``. When
        exactly two models are configured, the first inference op is
        ``general_detection``; every other one is ``general_infer``.

        Args:
            workdir: Passed to ``server.prepare_server``.
            port: Passed to ``server.prepare_server``.
            gpus: GPU ids. None means "not set", in which case ``self.gpus``
                is used; otherwise this value takes precedence.
            thread_num: Passed to ``server.set_num_threads``.
            mem_optim: Passed to ``server.set_memory_optimize``.
            use_lite: If true, ``server.set_lite()`` is called, and the
                device is "arm" instead of "cpu" when no GPU is selected.
            use_xpu: If true, ``server.set_xpu()`` is called.
            ir_optim: Passed to ``server.set_ir_optimize``.
            precision: Passed to ``server.set_precision``.
            use_calib: Passed to ``server.set_use_calib``.
            use_trt: On a "gpu" device, calls ``server.set_trt()`` and turns
                IR optimisation on.
            gpu_multi_stream: On a "gpu" device, calls
                ``server.set_gpu_multi_stream()``.
            runtime_thread_num: If truthy, passed to
                ``server.set_runtime_thread_num``.
            batch_infer_size: If truthy, passed to
                ``server.set_batch_infer_size``.

        Returns:
            The prepared ``Server`` instance.
        """
        server = Server()
        # Only when `gpus` is None (not set at all) do we fall back to
        # self.gpus. An identity check avoids surprises with objects that
        # overload `==`.
        if gpus is None:
            gpus = self.gpus

        gpus = format_gpu_to_strlist(gpus)
        server.set_gpuid(gpus)

        if len(gpus) == 0 or gpus == ["-1"]:
            device = "arm" if use_lite else "cpu"
        else:
            device = "gpu"

        op_maker = OpMaker()
        op_seq_maker = OpSeqMaker()

        read_op = op_maker.create('general_reader')
        op_seq_maker.add_op(read_op)

        model_count = len(self.server_config_dir_paths)
        for idx, _ in enumerate(self.server_config_dir_paths):
            if model_count == 2 and idx == 0:
                infer_op_name = "general_detection"
            else:
                infer_op_name = "general_infer"
            general_infer_op = op_maker.create(infer_op_name)
            op_seq_maker.add_op(general_infer_op)

        general_response_op = op_maker.create('general_response')
        op_seq_maker.add_op(general_response_op)

        server.set_op_sequence(op_seq_maker.get_op_sequence())
        server.set_num_threads(thread_num)
        server.set_memory_optimize(mem_optim)
        server.set_ir_optimize(ir_optim)
        server.set_device(device)
        server.set_precision(precision)
        server.set_use_calib(use_calib)

        if use_trt and device == "gpu":
            server.set_trt()
            server.set_ir_optimize(True)

        if gpu_multi_stream and device == "gpu":
            server.set_gpu_multi_stream()

        if runtime_thread_num:
            server.set_runtime_thread_num(runtime_thread_num)

        if batch_infer_size:
            server.set_batch_infer_size(batch_infer_size)

        if use_lite:
            server.set_lite()
        if use_xpu:
            server.set_xpu()

        # The brpc Server accepts the whole list of model directories.
        server.load_model_config(self.server_config_dir_paths)

        server.prepare_server(workdir=workdir, port=port, device=device)
        return server

    def _launch_rpc_service(self, service_idx):
        """Run the server stored at ``self.rpc_service_list[service_idx]``.

        Args:
            service_idx: Index into ``self.rpc_service_list``.
        """
        self.rpc_service_list[service_idx].run_server()
M
MRXLT 已提交
209

H
HexToString 已提交
210 211 212
    # self.gpus should be set before calling this function; if it is not,
    # the default value self.gpus = ["-1"] is used. Either way, self.gpus is
    # always passed as the `gpus` argument.
H
HexToString 已提交
213
    def create_rpc_config(self):
H
HexToString 已提交
214 215 216 217 218 219 220 221 222 223 224 225 226 227
        self.rpc_service_list.append(
            self.default_rpc_service(
                self.workdir,
                self.port_list[0],
                self.gpus,
                thread_num=self.thread_num,
                mem_optim=self.mem_optim,
                use_lite=self.use_lite,
                use_xpu=self.use_xpu,
                ir_optim=self.ir_optim,
                precision=self.precision,
                use_calib=self.use_calib,
                use_trt=self.use_trt,
                gpu_multi_stream=self.gpu_multi_stream,
H
HexToString 已提交
228 229
                runtime_thread_num=self.runtime_thread_num,
                batch_infer_size=self.batch_infer_size))
H
HexToString 已提交
230

M
MRXLT 已提交
231
    def prepare_server(self,
                       workdir,
                       port=9393,
                       device="cpu",
                       precision="fp32",
                       use_calib=False,
                       use_lite=False,
                       use_xpu=False,
                       ir_optim=False,
                       thread_num=4,
                       mem_optim=True,
                       use_trt=False,
                       gpu_multi_stream=False,
                       runtime_thread_num=None,
                       batch_infer_size=None,
                       gpuid=None):
        """Deprecated: store the settings used by the RPC-based web service.

        Prints a deprecation notice, saves the arguments on the instance,
        records the web port via ``dump_pid_file`` and picks the first
        available port in 12000..12999 for the backing RPC server.

        Args:
            workdir: Saved on ``self.workdir``.
            port: Port of the web service; saved on ``self.port``.
            device: Accepted but not used; the device is derived from the
                GPU ids.
            precision, use_calib, use_lite, use_xpu, ir_optim, thread_num,
            mem_optim, use_trt, gpu_multi_stream, runtime_thread_num,
            batch_infer_size: Saved on same-named attributes.
            gpuid: GPU ids. When not None it replaces ``self.gpus``;
                otherwise ``self.gpus`` (possibly set through ``set_gpus``)
                is left unchanged.
        """
        print("This API will be deprecated later. Please do not use it")
        self.workdir = workdir
        self.port = port
        self.thread_num = thread_num
        # self.device is not used at all.
        # device is set by gpuid.
        self.precision = precision
        self.use_calib = use_calib
        self.use_lite = use_lite
        self.use_xpu = use_xpu
        self.ir_optim = ir_optim
        self.mem_optim = mem_optim
        self.port_list = []
        self.use_trt = use_trt
        self.gpu_multi_stream = gpu_multi_stream
        self.runtime_thread_num = runtime_thread_num
        self.batch_infer_size = batch_infer_size

        # record port and pid info for stopping process
        dump_pid_file([self.port], "web_service")

        # An explicit gpuid wins; otherwise keep self.gpus as it is (it may
        # have been set through set_gpus).
        if gpuid is not None:
            self.gpus = format_gpu_to_strlist(gpuid)

        # Pick the first free port for the internal RPC server. If none is
        # free, self.port_list stays empty.
        default_port = 12000
        for offset in range(1000):
            candidate = default_port + offset
            if port_is_available(candidate):
                self.port_list.append(candidate)
                break
280 281

    def _launch_web_service(self):
        """Create the RPC client and connect it to the local RPC server.

        The client loads ``self.client_config_path`` and connects to
        127.0.0.1 on the first port of ``self.port_list``.
        """
        self.client = Client()
        self.client.load_client_config(self.client_config_path)
        rpc_endpoint = "127.0.0.1:{}".format(self.port_list[0])
        self.client.connect([rpc_endpoint])
B
barrierye 已提交
287

D
dongdaxiang 已提交
288
    def get_prediction(self, request):
D
dongdaxiang 已提交
289 290 291 292 293
        if not request.json:
            abort(400)
        if "fetch" not in request.json:
            abort(400)
        try:
294 295
            feed, fetch, is_batch = self.preprocess(request.json["feed"],
                                                    request.json["fetch"])
B
barrierye 已提交
296 297
            if isinstance(feed, dict) and "fetch" in feed:
                del feed["fetch"]
W
wangjiawei04 已提交
298 299
            if len(feed) == 0:
                raise ValueError("empty input")
300 301
            fetch_map = self.client.predict(
                feed=feed, fetch=fetch, batch=is_batch)
G
gongweibao 已提交
302
            result = self.postprocess(
M
MRXLT 已提交
303
                feed=request.json["feed"], fetch=fetch, fetch_map=fetch_map)
G
gongweibao 已提交
304
            result = {"result": result}
M
bug fix  
MRXLT 已提交
305
        except ValueError as err:
M
MRXLT 已提交
306
            result = {"result": str(err)}
D
dongdaxiang 已提交
307
        return result
308

M
MRXLT 已提交
309
    def run_rpc_service(self):
        """Deprecated: start the RPC server process(es) and build the Flask app.

        Prints the service URL, creates the RPC server configuration, starts
        one process per entry of ``self.rpc_service_list`` and stores a Flask
        app exposing ``/<name>/prediction`` (POST) on ``self.app_instance``.
        The app itself is not started here.
        """
        print("This API will be deprecated later. Please do not use it")
        host_ip = socket.gethostbyname(socket.gethostname())
        print("web service address:")
        print("http://{}:{}/{}/prediction".format(host_ip, self.port,
                                                  self.name))
        self.create_rpc_config()
        # Create every process first, then start them all.
        workers = [
            Process(target=self._launch_rpc_service, args=(idx, ))
            for idx, _ in enumerate(self.rpc_service_list)
        ]
        for worker in workers:
            worker.start()

        flask_app = Flask(__name__)

        # NOTE(review): `before_first_request` was deprecated in Flask 2.2
        # and removed in 2.3 -- confirm the Flask version this targets.
        @flask_app.before_first_request
        def init():
            self._launch_web_service()

        @flask_app.route("/" + self.name + "/prediction", methods=["POST"])
        def run():
            return self.get_prediction(request)

        self.app_instance = flask_app

Z
zhangjun 已提交
338 339 340
    # TODO: maybe change another API name: maybe run_local_predictor?
    def run_debugger_service(self, gpu=False):
        """Deprecated: build a Flask app served by a local predictor.

        Prints the service URL and stores a Flask app exposing
        ``/<name>/prediction`` (POST) on ``self.app_instance``. The local
        predictor is created lazily, before the first request. The app itself
        is not started here.

        Args:
            gpu: Forwarded to ``_launch_local_predictor``.
        """
        print("This API will be deprecated later. Please do not use it")
        host_ip = socket.gethostbyname(socket.gethostname())
        print("web service address:")
        print("http://{}:{}/{}/prediction".format(host_ip, self.port,
                                                  self.name))
        flask_app = Flask(__name__)

        # NOTE(review): `before_first_request` was deprecated in Flask 2.2
        # and removed in 2.3 -- confirm the Flask version this targets.
        @flask_app.before_first_request
        def init():
            self._launch_local_predictor(gpu)

        @flask_app.route("/" + self.name + "/prediction", methods=["POST"])
        def run():
            return self.get_prediction(request)

        self.app_instance = flask_app

Z
zhangjun 已提交
360
    def _launch_local_predictor(self, gpu):
        """Create a ``LocalPredictor`` and use it as ``self.client``.

        From the web service's point of view the local predictor plays the
        client role. It is given a single model directory, so only
        ``self.server_config_dir_paths[0]`` is loaded.

        Args:
            gpu: When truthy, load the model on a GPU; otherwise on CPU.
        """
        from paddle_serving_app.local_predict import LocalPredictor
        self.client = LocalPredictor()
        if not gpu:
            self.client.load_model_config(
                self.server_config_dir_paths[0], use_gpu=False)
            return

        # If no GPU was configured (e.g. `set_gpus` was never called),
        # default to card "0".
        no_gpu_configured = len(self.gpus) == 0 or self.gpus == ["-1"]
        if no_gpu_configured:
            self.gpus = ["0"]
        # Only one card is used: the first id of the first entry, however
        # many ids are listed.
        first_entry = self.gpus[0]
        gpu_id = first_entry.split(",")[0]
        self.client.load_model_config(
            self.server_config_dir_paths[0], use_gpu=True, gpu_id=gpu_id)
W
wangjiawei04 已提交
380

M
MRXLT 已提交
381
    def run_web_service(self):
B
barriery 已提交
382
        print("This API will be deprecated later. Please do not use it")
383
        self.app_instance.run(host="0.0.0.0", port=self.port, threaded=True)
M
MRXLT 已提交
384 385 386

    def get_app_instance(self):
        """Return ``self.app_instance``.

        The attribute is assigned by ``run_rpc_service`` and
        ``run_debugger_service``.
        """
        return self.app_instance
M
MRXLT 已提交
387

M
MRXLT 已提交
388
    def preprocess(self, feed=[], fetch=[]):
B
barriery 已提交
389
        print("This API will be deprecated later. Please do not use it")
390
        is_batch = True
W
wangjiawei04 已提交
391 392 393 394 395
        feed_dict = {}
        for var_name in self.feed_vars.keys():
            feed_dict[var_name] = []
        for feed_ins in feed:
            for key in feed_ins:
W
wangjiawei04 已提交
396 397 398
                feed_dict[key].append(
                    np.array(feed_ins[key]).reshape(
                        list(self.feed_vars[key].shape))[np.newaxis, :])
W
wangjiawei04 已提交
399 400
        feed = {}
        for key in feed_dict:
W
wangjiawei04 已提交
401
            feed[key] = np.concatenate(feed_dict[key], axis=0)
402
        return feed, fetch, is_batch
403

M
MRXLT 已提交
404
    def postprocess(self, feed=[], fetch=[], fetch_map=None):
B
barriery 已提交
405
        print("This API will be deprecated later. Please do not use it")
M
bug fix  
MRXLT 已提交
406 407
        for key in fetch_map:
            fetch_map[key] = fetch_map[key].tolist()
408
        return fetch_map