# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#!flask/bin/python
# pylint: disable=doc-string-missing

import logging
import random
import time
from multiprocessing import Pool, Process, Queue

from flask import Flask, request, abort
import paddle_serving_server_gpu as serving
from paddle_serving_client import Client
from paddle_serving_server_gpu import OpMaker, OpSeqMaker, Server

from .serve import start_multi_card


class WebService(object):
    """Flask HTTP front end that forwards JSON requests to RPC servers.

    Typical call order, as far as this class shows:
    ``load_model_config`` -> (optional) ``set_gpus`` -> ``prepare_server``
    -> ``run_server``.

    One RPC server is created per configured GPU (or a single CPU server
    when no GPUs are configured).  The web process listens on ``port`` and
    the RPC servers on ``port + 1``, ``port + 2``, ...  Each RPC server is
    fed by its own producer process through a dedicated input queue, and
    all producers share a single output queue.

    Subclasses customise behaviour by overriding ``preprocess`` and
    ``postprocess``.
    """

    def __init__(self, name="default_service"):
        """Create a service whose URL is ``/<name>/prediction``."""
        self.name = name
        self.gpus = []
        self.rpc_service_list = []
        self.input_queues = []

    def load_model_config(self, model_config):
        """Remember the model config location.

        The value is passed to ``Server.load_model_config`` and is also used
        as the directory containing ``serving_server_conf.prototxt`` when
        the producer clients are configured.
        """
        self.model_config = model_config

    def set_gpus(self, gpus):
        """Set the GPU ids to serve on from a comma-separated string.

        Empty tokens (e.g. from ``""`` or a trailing comma) are ignored, so
        ``set_gpus("")`` yields an empty list instead of raising.
        """
        self.gpus = [int(x) for x in gpus.split(",") if x.strip()]

    def default_rpc_service(self,
                            workdir="conf",
                            port=9292,
                            gpuid=0,
                            thread_num=10):
        """Build and prepare one RPC server.

        The op sequence is ``general_reader`` -> ``general_infer`` ->
        ``general_response``.

        Args:
            workdir: working directory handed to ``Server.prepare_server``.
            port: port the RPC server is prepared on.
            gpuid: GPU id to bind; ``-1`` selects the CPU device.
            thread_num: value passed to ``Server.set_num_threads``.

        Returns:
            The prepared ``serving.Server`` instance (not yet running).
        """
        device = "cpu" if gpuid == -1 else "gpu"

        op_maker = serving.OpMaker()
        read_op = op_maker.create('general_reader')
        general_infer_op = op_maker.create('general_infer')
        general_response_op = op_maker.create('general_response')

        op_seq_maker = serving.OpSeqMaker()
        for op in (read_op, general_infer_op, general_response_op):
            op_seq_maker.add_op(op)

        server = serving.Server()
        server.set_op_sequence(op_seq_maker.get_op_sequence())
        server.set_num_threads(thread_num)

        server.load_model_config(self.model_config)
        if gpuid >= 0:
            server.set_gpuid(gpuid)
        server.prepare_server(workdir=workdir, port=port, device=device)
        return server

    def _launch_rpc_service(self, service_idx):
        """Run the RPC server at ``service_idx`` (blocks in ``run_server``).

        Only the first service checks for / downloads the server binary.
        The others wait three seconds first -- presumably so they start
        after service 0 has finished that step (TODO confirm).
        """
        service = self.rpc_service_list[service_idx]
        if service_idx == 0:
            service.check_local_bin()
            if not service.use_local_bin:
                service.download_bin()
        else:
            time.sleep(3)
        service.run_server()

    def prepare_server(self, workdir="", port=9393, device="gpu", gpuid=0):
        """Record web-server settings and create the backing RPC servers.

        Args:
            workdir: base working directory.  With GPUs configured, server
                ``i`` uses ``"<workdir>_<i>"``; the CPU server uses
                ``workdir`` unchanged.
            port: port of the web service; RPC server ``i`` uses
                ``port + 1 + i``.
            device: stored on the instance; the RPC servers' device is
                derived from ``self.gpus`` rather than from this value.
            gpuid: stored on the instance only.
        """
        self.workdir = workdir
        self.port = port
        self.device = device
        self.gpuid = gpuid
        if not self.gpus:
            # No GPUs configured: a single CPU-backed RPC server.
            self.rpc_service_list.append(
                self.default_rpc_service(
                    self.workdir, self.port + 1, -1, thread_num=10))
        else:
            for i, gpu in enumerate(self.gpus):
                self.rpc_service_list.append(
                    self.default_rpc_service(
                        "{}_{}".format(self.workdir, i),
                        self.port + 1 + i,
                        gpu,
                        thread_num=10))

    @staticmethod
    def _worker_count(gpu_num):
        """Return how many producer/queue pairs the web service needs.

        With no GPUs there is still one (CPU) RPC server listening on
        ``port + 1``, so at least one worker is always required.
        """
        return max(gpu_num, 1)

    def _handle_request(self, client, request_json):
        """Run preprocess -> predict -> postprocess for a single request.

        Args:
            client: object exposing ``predict(feed=..., fetch=...)`` and
                ``predict(feed_batch=..., fetch=...)``.
            request_json: decoded request body; must contain ``"fetch"``.

        Returns:
            ``{"result": <postprocessed batch>}`` when ``preprocess``
            returns a list feed, otherwise the ``postprocess`` result for a
            dict feed.

        Raises:
            ValueError: if ``preprocess`` returns a feed that is neither a
                list nor a dict.
        """
        feed, fetch = self.preprocess(request_json, request_json["fetch"])
        if isinstance(feed, list):
            fetch_map_batch = client.predict(feed_batch=feed, fetch=fetch)
            fetch_map_batch = self.postprocess(
                feed=request_json, fetch=fetch, fetch_map=fetch_map_batch)
            return {"result": fetch_map_batch}
        if isinstance(feed, dict):
            # "fetch" is request metadata, not a model input.
            feed.pop("fetch", None)
            fetch_map = client.predict(feed=feed, fetch=fetch)
            return self.postprocess(
                feed=request_json, fetch=fetch, fetch_map=fetch_map)
        raise ValueError("unsupported feed type: {}".format(
            type(feed).__name__))

    def producers(self, inputqueue, endpoint):
        """Worker loop: serve requests from ``inputqueue`` via ``endpoint``.

        Runs forever.  Exactly one item is put on ``self.output_queue`` per
        request taken from ``inputqueue``: the result on success, or ``-1``
        on any failure.  Always answering matters because the HTTP handler
        blocks on the output queue; a dead or silent worker would hang it.
        """
        client = Client()
        client.load_client_config("{}/serving_server_conf.prototxt".format(
            self.model_config))
        client.connect([endpoint])
        while True:
            request_json = inputqueue.get()
            try:
                result = self._handle_request(client, request_json)
            except ValueError:
                result = -1
            except Exception:  # worker boundary: never leave the caller hanging
                logging.getLogger(__name__).exception(
                    "prediction request failed")
                result = -1
            self.output_queue.put(result)

    def _launch_web_service(self, gpu_num):
        """Start the producer processes and run the Flask app (blocking).

        Args:
            gpu_num: number of configured GPUs; ``0`` means one CPU worker.
        """
        app_instance = Flask(__name__)
        service_name = "/" + self.name + "/prediction"

        self.input_queues = [
            Queue() for _ in range(self._worker_count(gpu_num))
        ]
        self.output_queue = Queue()

        # Producer i talks to the RPC server prepared on port + 1 + i.
        producer_list = [
            Process(
                target=self.producers,
                args=(
                    input_q,
                    "0.0.0.0:{}".format(self.port + 1 + i), ))
            for i, input_q in enumerate(self.input_queues)
        ]
        for p in producer_list:
            p.start()

        self.idx = 0

        @app_instance.route(service_name, methods=['POST'])
        def get_prediction():
            payload = request.json
            if not payload:
                abort(400)
            if not isinstance(payload, dict) or "fetch" not in payload:
                abort(400)

            # Round-robin over the worker queues.
            self.input_queues[self.idx].put(payload)
            self.idx = (self.idx + 1) % len(self.input_queues)

            result = self.output_queue.get()
            if not isinstance(result, dict) and result == -1:
                result = {"result": "Request Value Error"}
            return result

        # Single-threaded, single-process: one request is in flight at a
        # time, so the shared output queue cannot mix up responses.
        app_instance.run(host="0.0.0.0",
                         port=self.port,
                         threaded=False,
                         processes=1)

        for p in producer_list:
            p.join()

    def run_server(self):
        """Launch every RPC server and the web service; block until exit."""
        import socket
        localIP = socket.gethostbyname(socket.gethostname())
        print("web service address:")
        print("http://{}:{}/{}/prediction".format(localIP, self.port,
                                                  self.name))
        server_pros = [
            Process(target=self._launch_rpc_service, args=(i, ))
            for i in range(len(self.rpc_service_list))
        ]
        for p in server_pros:
            p.start()

        p_web = Process(
            target=self._launch_web_service, args=(len(self.gpus), ))
        p_web.start()
        p_web.join()
        for p in server_pros:
            p.join()

    def preprocess(self, feed=None, fetch=None):
        """Hook: map the request to ``(feed, fetch)`` for the client.

        The default returns its arguments unchanged (a fresh ``{}`` / ``[]``
        when omitted).  Return a list feed for batch prediction or a dict
        feed for a single prediction.
        """
        feed = {} if feed is None else feed
        fetch = [] if fetch is None else fetch
        return feed, fetch

    def postprocess(self, feed=None, fetch=None, fetch_map=None):
        """Hook: turn the prediction result into the response payload.

        The default returns ``fetch_map`` unchanged.
        """
        return fetch_map