web_service.py 7.9 KB
Newer Older
1
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
M
MRXLT 已提交
2 3 4 5 6 7 8 9 10 11 12 13
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
14
# pylint: disable=doc-string-missing
B
barrierye 已提交
15

M
MRXLT 已提交
16
from flask import Flask, request, abort
M
MRXLT 已提交
17
from contextlib import closing
M
MRXLT 已提交
18
from multiprocessing import Pool, Process, Queue
M
MRXLT 已提交
19
from paddle_serving_client import Client
M
MRXLT 已提交
20
from paddle_serving_server_gpu import OpMaker, OpSeqMaker, Server
M
MRXLT 已提交
21
from paddle_serving_server_gpu.serve import start_multi_card
M
MRXLT 已提交
22
import socket
M
MRXLT 已提交
23 24
import sys
import numpy as np
M
MRXLT 已提交
25
import paddle_serving_server_gpu as serving
M
MRXLT 已提交
26 27 28 29 30


class WebService(object):
    """Flask-based HTTP front end for one or more RPC serving back ends.

    Typical call order, as implied by the attributes each method reads:
    ``load_model_config`` -> (optional) ``set_gpus`` -> ``prepare_server``
    -> ``run_rpc_service`` or ``run_debugger_service`` -> ``run_web_service``.

    Subclasses customise request handling by overriding ``preprocess`` and
    ``postprocess``.
    """

    def __init__(self, name="default_service"):
        """Create a service whose URL path is ``/<name>/prediction``."""
        self.name = name
        # GPU ids to serve on; an empty list selects the single CPU service.
        self.gpus = []
        # One prepared back-end server per GPU (or one CPU server).
        self.rpc_service_list = []

    def load_model_config(self, model_config):
        """Remember the model configuration path used by the back ends."""
        self.model_config = model_config

    def set_gpus(self, gpus):
        """Parse a comma-separated GPU id string (e.g. ``"0,1"``).

        Blank entries (an empty string, a trailing comma) are skipped rather
        than crashing on ``int("")``.

        Raises:
            ValueError: if a non-blank entry is not an integer.
        """
        self.gpus = [int(x) for x in gpus.split(",") if x.strip()]

    def default_rpc_service(self,
                            workdir="conf",
                            port=9292,
                            gpuid=0,
                            thread_num=2,
                            mem_optim=True,
                            ir_optim=False):
        """Build and prepare one back-end server (reader -> infer -> response).

        Args:
            workdir: working directory handed to ``Server.prepare_server``.
            port: port handed to ``Server.prepare_server``.
            gpuid: GPU id to bind; ``-1`` selects the CPU device.
            thread_num: value passed to ``Server.set_num_threads``.
            mem_optim: value passed to ``Server.set_memory_optimize``.
            ir_optim: value passed to ``Server.set_ir_optimize``.

        Returns:
            The prepared (not yet running) ``Server`` instance.
        """
        device = "cpu" if gpuid == -1 else "gpu"

        # Create every op first, then register them, in pipeline order.
        op_maker = OpMaker()
        pipeline_ops = [
            op_maker.create('general_reader'),
            op_maker.create('general_infer'),
            op_maker.create('general_response'),
        ]
        op_seq_maker = OpSeqMaker()
        for op in pipeline_ops:
            op_seq_maker.add_op(op)

        server = Server()
        server.set_op_sequence(op_seq_maker.get_op_sequence())
        server.set_num_threads(thread_num)
        server.set_memory_optimize(mem_optim)
        server.set_ir_optimize(ir_optim)

        server.load_model_config(self.model_config)
        if gpuid >= 0:
            server.set_gpuid(gpuid)
        server.prepare_server(workdir=workdir, port=port, device=device)
        return server

    def _launch_rpc_service(self, service_idx):
        """Run the ``service_idx``-th prepared back-end server."""
        self.rpc_service_list[service_idx].run_server()

    def port_is_available(self, port):
        """Return True when a TCP connect to ``port`` does not succeed.

        A failed connect (non-zero ``connect_ex`` result) is taken to mean
        that nothing is listening on the port.
        """
        with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
            sock.settimeout(2)
            result = sock.connect_ex(('0.0.0.0', port))
        return result != 0

    def prepare_server(self,
                       workdir="",
                       port=9393,
                       device="gpu",
                       inner_port=12000,
                       gpuid=0,
                       mem_optim=True,
                       ir_optim=False):
        """Pick free inner ports and prepare the back-end server(s).

        Args:
            workdir: base working directory; GPU services use
                ``"<workdir>_<index>"``.
            port: public port of the web service.
            device: stored on the instance; the actual device of each back
                end is derived from the GPU list.
            inner_port: first port probed for the back-end services.
            gpuid: stored on the instance.
            mem_optim: forwarded to ``default_rpc_service``.
            ir_optim: forwarded to ``default_rpc_service``.

        Raises:
            RuntimeError: if not enough free ports are found in the 1000
                ports starting at ``inner_port``.
        """
        self.workdir = workdir
        self.port = port
        self.device = device
        self.gpuid = gpuid

        # Probe up to 1000 consecutive ports, stopping once there is one more
        # free port than there are GPUs (so the CPU case gets exactly one).
        self.port_list = []
        for offset in range(1000):
            candidate = inner_port + offset
            if self.port_is_available(candidate):
                self.port_list.append(candidate)
            if len(self.port_list) > len(self.gpus):
                break

        # Fail with a clear message instead of an IndexError further down.
        required_ports = max(1, len(self.gpus))
        if len(self.port_list) < required_ports:
            raise RuntimeError(
                "only {} free port(s) found in [{}, {}), {} required".format(
                    len(self.port_list), inner_port, inner_port + 1000,
                    required_ports))

        if len(self.gpus) == 0:
            # init cpu service
            self.rpc_service_list.append(
                self.default_rpc_service(
                    self.workdir,
                    self.port_list[0],
                    -1,
                    thread_num=2,
                    mem_optim=mem_optim,
                    ir_optim=ir_optim))
        else:
            for i, gpuid in enumerate(self.gpus):
                self.rpc_service_list.append(
                    self.default_rpc_service(
                        "{}_{}".format(self.workdir, i),
                        self.port_list[i],
                        gpuid,
                        thread_num=2,
                        mem_optim=mem_optim,
                        ir_optim=ir_optim))

    def _launch_web_service(self):
        """Create the RPC client and connect it to the local back ends."""
        gpu_num = len(self.gpus)
        self.client = Client()
        self.client.load_client_config("{}/serving_server_conf.prototxt".format(
            self.model_config))
        if gpu_num > 0:
            # NOTE(review): the endpoints are sent as ONE comma-separated
            # string with a trailing comma, exactly as before; confirm that
            # Client.connect expects this form before changing it.
            endpoints = "".join("127.0.0.1:{},".format(self.port_list[i])
                                for i in range(gpu_num))
        else:
            endpoints = "127.0.0.1:{}".format(self.port_list[0])
        self.client.connect([endpoints])

    def get_prediction(self, request):
        """Handle one prediction request and return the response payload.

        The JSON body must contain both ``"feed"`` and ``"fetch"``; otherwise
        the request is aborted with HTTP 400.

        Returns:
            ``{"result": <postprocess output>}`` on success, or
            ``{"result": <error message>}`` when a ``ValueError`` is raised
            while handling the request (for example on empty input).
        """
        payload = request.json
        if not payload:
            abort(400)
        # "feed" is read unconditionally below, so require it up front too.
        if "feed" not in payload or "fetch" not in payload:
            abort(400)
        try:
            feed, fetch = self.preprocess(payload["feed"], payload["fetch"])
            if isinstance(feed, dict) and "fetch" in feed:
                del feed["fetch"]
            # len() rather than truthiness: feed may be any sized container.
            if len(feed) == 0:
                raise ValueError("empty input")
            fetch_map = self.client.predict(feed=feed, fetch=fetch)
            result = self.postprocess(
                feed=payload["feed"], fetch=fetch, fetch_map=fetch_map)
            result = {"result": result}
        except ValueError as err:
            # An exception object is not JSON-serializable; send its message.
            result = {"result": str(err)}
        return result

    def _print_service_address(self):
        """Print the URL at which the prediction endpoint is served."""
        local_ip = socket.gethostbyname(socket.gethostname())
        print("web service address:")
        print("http://{}:{}/{}/prediction".format(local_ip, self.port,
                                                  self.name))

    def _create_app(self, init_client):
        """Build the Flask app exposing ``POST /<name>/prediction``.

        Args:
            init_client: zero-argument callable that sets ``self.client``; it
                is registered to run before the first request.
        """
        app_instance = Flask(__name__)

        # NOTE(review): before_first_request was removed in Flask 2.3, so
        # this relies on an older Flask release; confirm the pinned version.
        @app_instance.before_first_request
        def init():
            init_client()

        service_name = "/" + self.name + "/prediction"

        @app_instance.route(service_name, methods=["POST"])
        def run():
            return self.get_prediction(request)

        return app_instance

    def run_rpc_service(self):
        """Start every back-end server in its own process and build the app.

        The Flask app is only created here; ``run_web_service`` serves it.
        """
        self._print_service_address()
        server_pros = [
            Process(target=self._launch_rpc_service, args=(idx, ))
            for idx, _ in enumerate(self.rpc_service_list)
        ]
        for p in server_pros:
            p.start()

        self.app_instance = self._create_app(self._launch_web_service)

    # TODO: maybe change another API name: maybe run_local_predictor?
    def run_debugger_service(self, gpu=False):
        """Build the app backed by an in-process predictor (no RPC servers).

        Args:
            gpu: forwarded to the local predictor's ``load_model_config``.
        """
        self._print_service_address()
        self.app_instance = self._create_app(
            lambda: self._launch_local_predictor(gpu))

    def _launch_local_predictor(self, gpu):
        """Use a local ``Debugger`` predictor as ``self.client``."""
        from paddle_serving_app.local_predict import Debugger
        self.client = Debugger()
        self.client.load_model_config(
            "{}".format(self.model_config), gpu=gpu, profile=False)

    def run_web_service(self):
        """Serve the prepared Flask app on all interfaces (blocking)."""
        self.app_instance.run(host="0.0.0.0",
                              port=self.port,
                              threaded=False,
                              processes=1)

    def get_app_instance(self):
        """Return the Flask app built by ``run_rpc_service`` or
        ``run_debugger_service``."""
        return self.app_instance

    def preprocess(self, feed=None, fetch=None):
        """Hook to transform the request's feed/fetch; identity by default.

        Omitted arguments yield fresh empty lists so callers never share a
        mutable default.
        """
        if feed is None:
            feed = []
        if fetch is None:
            fetch = []
        return feed, fetch

    def postprocess(self, feed=None, fetch=None, fetch_map=None):
        """Hook to transform the prediction result before it is returned.

        The default converts every value of ``fetch_map`` in place with its
        ``tolist()`` method and returns the same mapping. ``feed`` and
        ``fetch`` are unused here but available to overriding subclasses.
        """
        # items() works on Python 2 and 3; only values change, never the
        # key set, so updating while iterating is safe.
        for key, value in fetch_map.items():
            fetch_map[key] = value.tolist()
        return fetch_map