web_service.py 5.0 KB
Newer Older
M
MRXLT 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#!flask/bin/python
from flask import Flask, request, abort
from multiprocessing import Pool, Process
from paddle_serving_server_gpu import OpMaker, OpSeqMaker, Server
18
import paddle_serving_server_gpu as serving
M
MRXLT 已提交
19
from paddle_serving_client import Client
20 21 22
from .serve import start_multi_card
import time
import random
M
MRXLT 已提交
23 24 25 26 27


class WebService(object):
    """HTTP front end that forwards prediction requests to local RPC servers.

    ``prepare_server`` builds one serving RPC server per GPU card configured
    through ``set_gpus`` (or a single CPU server when no card is configured).
    ``run_server`` then starts every RPC server plus a Flask application, each
    in its own process.  The Flask application exposes
    ``POST /<name>/prediction``.

    Subclasses customise request handling by overriding ``preprocess`` and
    ``postprocess``.
    """

    def __init__(self, name="default_service"):
        """Create a service whose URL prefix is ``/<name>``."""
        self.name = name
        # GPU card ids; an empty list means "serve on CPU".
        self.gpus = []
        # Prepared (not yet running) RPC servers, one per backend.
        self.rpc_service_list = []

    def load_model_config(self, model_config):
        """Remember the model directory used by both servers and clients."""
        self.model_config = model_config

    def set_gpus(self, gpus):
        """Set the list of GPU card ids to serve on."""
        self.gpus = gpus

    def default_rpc_service(self, workdir="conf", port=9292,
                            gpuid=0, thread_num=10):
        """Build and prepare one RPC server: reader -> infer -> response.

        Args:
            workdir: working directory handed to ``Server.prepare_server``.
            port: port handed to ``Server.prepare_server``.
            gpuid: GPU card id; ``-1`` selects the ``"cpu"`` device, any
                other value selects ``"gpu"``.
            thread_num: value handed to ``Server.set_num_threads``.

        Returns:
            The prepared ``serving.Server`` instance (it is not started here).
        """
        device = "cpu" if gpuid == -1 else "gpu"

        op_maker = serving.OpMaker()
        pipeline_ops = [
            op_maker.create(op_name)
            for op_name in ('general_reader', 'general_infer',
                            'general_response')
        ]

        op_seq_maker = serving.OpSeqMaker()
        for op in pipeline_ops:
            op_seq_maker.add_op(op)

        server = serving.Server()
        server.set_op_sequence(op_seq_maker.get_op_sequence())
        server.set_num_threads(thread_num)
        server.load_model_config(self.model_config)
        if gpuid >= 0:
            server.set_gpuid(gpuid)
        server.prepare_server(workdir=workdir, port=port, device=device)
        return server

    def _launch_rpc_service(self, service_idx):
        """Run the prepared RPC server at ``service_idx`` (process target)."""
        self.rpc_service_list[service_idx].run_server()

    def prepare_server(self, workdir="", port=9393, device="gpu", gpuid=0):
        """Record the web-service settings and prepare the RPC backends.

        The web service itself uses ``port``; backend ``i`` uses
        ``port + 1 + i``.  With no GPU configured a single CPU backend is
        prepared in ``workdir``; otherwise backend ``i`` gets the working
        directory ``"<workdir>_<i>"``.

        Args:
            workdir: base working directory for the RPC servers.
            port: port of the HTTP service.
            device: stored on the instance as ``self.device``.
            gpuid: stored on the instance as ``self.gpuid``.
        """
        self.workdir = workdir
        self.port = port
        self.device = device
        self.gpuid = gpuid

        if not self.gpus:
            # No card configured: a single CPU backend (gpuid -1).
            self.rpc_service_list.append(
                self.default_rpc_service(self.workdir, self.port + 1,
                                         -1, thread_num=10))
            return

        # Loop variable deliberately not named ``gpuid`` so it does not
        # shadow the parameter of the same name.
        for i, card_id in enumerate(self.gpus):
            self.rpc_service_list.append(
                self.default_rpc_service("{}_{}".format(self.workdir, i),
                                         self.port + 1 + i,
                                         card_id, thread_num=10))

    def _rpc_endpoints(self, gpu_num):
        """Return the local ``host:port`` endpoint of every RPC backend.

        Mirrors the port allocation of ``prepare_server``: there is always at
        least one backend (the CPU one when ``gpu_num`` is 0), and backend
        ``i`` listens on ``self.port + 1 + i``.
        """
        backend_count = max(gpu_num, 1)
        return ["127.0.0.1:{}".format(self.port + 1 + idx)
                for idx in range(backend_count)]

    def _launch_web_service(self, gpu_num):
        """Connect RPC clients and serve the Flask app (process target).

        Args:
            gpu_num: number of configured GPU cards; 0 means a single CPU
                backend.
        """
        app_instance = Flask(__name__)

        # One client per backend.  The previous code reset ``gpu_num`` to 0
        # when it was greater than 1 and created no client at all in CPU
        # mode, so ``client_list[0]`` below raised IndexError on every
        # request in both situations.
        client_list = []
        for endpoint in self._rpc_endpoints(gpu_num):
            client_service = Client()
            client_service.load_client_config(
                "{}/serving_server_conf.prototxt".format(self.model_config))
            client_service.connect([endpoint])
            client_list.append(client_service)
            # NOTE(review): pause kept from the original code; presumably it
            # spaces out client connections -- confirm whether it is needed.
            time.sleep(1)

        service_name = "/" + self.name + "/prediction"

        @app_instance.route(service_name, methods=['POST'])
        def get_prediction():
            # Reject requests without a JSON body or without a "fetch" key.
            if not request.json:
                abort(400)
            if "fetch" not in request.json:
                abort(400)
            feed, fetch = self.preprocess(request.json, request.json["fetch"])
            # Only the first client is used for prediction.
            fetch_map = client_list[0].predict(
                feed=feed, fetch=fetch)
            fetch_map = self.postprocess(
                feed=request.json, fetch=fetch, fetch_map=fetch_map)
            return fetch_map

        app_instance.run(host="127.0.0.1",
                         port=self.port,
                         threaded=False,
                         processes=1)

    def run_server(self):
        """Start every RPC backend and the web service, then wait for them."""
        import socket
        try:
            local_ip = socket.gethostbyname(socket.gethostname())
        except socket.error:
            # An unresolvable hostname must not prevent the service from
            # starting just because the banner cannot be printed.
            local_ip = "127.0.0.1"
        # NOTE(review): the Flask app binds to 127.0.0.1, so the address
        # printed here may not be reachable from other hosts -- confirm.
        print("web service address:")
        print("http://{}:{}/{}/prediction".format(local_ip, self.port,
                                                  self.name))

        rpc_processes = [
            Process(target=self._launch_rpc_service, args=(idx,))
            for idx in range(len(self.rpc_service_list))
        ]
        for proc in rpc_processes:
            proc.start()

        p_web = Process(target=self._launch_web_service,
                        args=(len(self.gpus),))
        p_web.start()

        for proc in rpc_processes:
            proc.join()
        p_web.join()

    def preprocess(self, feed=None, fetch=None):
        """Hook: turn the request JSON into ``(feed, fetch)`` for the client.

        The default implementation returns its arguments unchanged.  Omitted
        arguments become a fresh empty dict / list on every call, so callers
        never share state through the defaults.
        """
        feed = {} if feed is None else feed
        fetch = [] if fetch is None else fetch
        return feed, fetch

    def postprocess(self, feed=None, fetch=None, fetch_map=None):
        """Hook: turn the client's ``fetch_map`` into the HTTP response.

        The default implementation returns ``fetch_map`` unchanged, or a
        fresh empty dict when it is omitted.
        """
        return {} if fetch_map is None else fetch_map