web_service.py 9.8 KB
Newer Older
1
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
M
MRXLT 已提交
2 3 4 5 6 7 8 9 10 11 12 13
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
14
# pylint: disable=doc-string-missing
B
barrierye 已提交
15

M
MRXLT 已提交
16
from flask import Flask, request, abort
M
MRXLT 已提交
17
from contextlib import closing
M
MRXLT 已提交
18
from multiprocessing import Pool, Process, Queue
M
MRXLT 已提交
19
from paddle_serving_client import Client
M
MRXLT 已提交
20
from paddle_serving_server_gpu import OpMaker, OpSeqMaker, Server
M
MRXLT 已提交
21
from paddle_serving_server_gpu.serve import start_multi_card
M
MRXLT 已提交
22
import socket
M
MRXLT 已提交
23 24
import sys
import numpy as np
M
MRXLT 已提交
25
import paddle_serving_server_gpu as serving
M
MRXLT 已提交
26

B
barriery 已提交
27 28
from paddle_serving_server_gpu import pipeline
from paddle_serving_server_gpu.pipeline.util import AvailablePortGenerator
M
MRXLT 已提交
29

B
barriery 已提交
30 31 32 33 34

class DefaultRpcServer(object):
    """Manage one or more paddle-serving RPC inference servers.

    One server process is prepared per configured GPU id; when no GPUs are
    configured a single CPU server is prepared instead.
    """

    def __init__(self, available_port_generator):
        # Generator used to reserve a free local port for each server.
        self.available_port_generator = available_port_generator
        self.gpus = None  # list of GPU ids, or None until set_gpus() is called
        self.rpc_service_list = []  # prepared (not yet running) Server objects
        self.server_pros = []  # one multiprocessing.Process per Server
        self.port_list = []  # port reserved for each Server
        self.model_config = None
        self.workdir = None
        self.device = None

    def get_port_list(self):
        """Return the ports the prepared RPC servers will listen on."""
        return self.port_list

    def load_model_config(self, model_config):
        """Remember the model config directory shared by every server."""
        self.model_config = model_config

    def set_gpus(self, gpus):
        """Set GPU ids from a comma-separated string, e.g. "0,1"."""
        self.gpus = [int(x) for x in gpus.split(",")]

    def _prepare_one_server(self,
                            workdir="conf",
                            port=9292,
                            gpuid=0,
                            thread_num=2,
                            mem_optim=True,
                            ir_optim=False):
        """Build (but do not start) one Server bound to *port*.

        gpuid == -1 selects CPU mode; any other value selects that GPU.
        """
        device = "cpu" if gpuid == -1 else "gpu"
        op_maker = serving.OpMaker()
        read_op = op_maker.create('general_reader')
        general_infer_op = op_maker.create('general_infer')
        general_response_op = op_maker.create('general_response')

        op_seq_maker = OpSeqMaker()
        op_seq_maker.add_op(read_op)
        op_seq_maker.add_op(general_infer_op)
        op_seq_maker.add_op(general_response_op)

        server = Server()
        server.set_op_sequence(op_seq_maker.get_op_sequence())
        server.set_num_threads(thread_num)
        server.set_memory_optimize(mem_optim)
        server.set_ir_optimize(ir_optim)

        server.load_model_config(self.model_config)
        if gpuid >= 0:
            server.set_gpuid(gpuid)
        server.prepare_server(workdir=workdir, port=port, device=device)
        return server

    def _start_one_server(self, service_idx):
        """Process target: run the service_idx-th prepared server (blocking)."""
        self.rpc_service_list[service_idx].run_server()

    def prepare_server(self,
                       workdir="",
                       device="gpu",
                       mem_optim=True,
                       ir_optim=False):
        """Prepare one RPC server per GPU, or a single CPU server when no
        GPUs were configured via set_gpus()."""
        self.workdir = workdir
        self.device = device
        # BUG FIX: original crashed with TypeError when set_gpus() was never
        # called (self.gpus is None).
        if self.gpus is None:
            self.gpus = []
        # BUG FIX: reserve at least one port — the original reserved
        # len(self.gpus) ports, so the CPU branch below indexed an empty list.
        num_ports = max(len(self.gpus), 1)
        while len(self.port_list) < num_ports:
            self.port_list.append(self.available_port_generator.next())

        if len(self.gpus) == 0:
            # init cpu service
            # BUG FIX: original called self.default_rpc_service(), a method
            # that does not exist; the defined one is _prepare_one_server().
            self.rpc_service_list.append(
                self._prepare_one_server(
                    self.workdir,
                    self.port_list[0],
                    -1,
                    thread_num=2,
                    mem_optim=mem_optim,
                    ir_optim=ir_optim))
        else:
            for i, gpuid in enumerate(self.gpus):
                self.rpc_service_list.append(
                    self._prepare_one_server(
                        "{}_{}".format(self.workdir, i),
                        self.port_list[i],
                        gpuid,
                        thread_num=2,
                        mem_optim=mem_optim,
                        ir_optim=ir_optim))

    def start_server(self):
        """Fork one process per prepared server and start them all."""
        for i, service in enumerate(self.rpc_service_list):
            p = Process(target=self._start_one_server, args=(i, ))
            self.server_pros.append(p)
        for p in self.server_pros:
            p.start()

M
MRXLT 已提交
126

B
barriery 已提交
127 128 129 130
class DefaultPipelineServer(object):
    """Thin wrapper that wires user pre-/post-processing callables into a
    pipeline.PipelineServer running a single inference op."""

    def __init__(self, available_port_generator):
        self.server = pipeline.PipelineServer()
        self.available_port_generator = available_port_generator

    def create_internel_op_class(self, f_preprocess, f_postprocess):
        """Return an Op subclass (not an instance) whose pre/post hooks
        delegate to the supplied callables."""

        class InternelOp(pipeline.Op):
            def init_op(self):
                pass

            def preprocess(self, input_dicts):
                # Exactly one upstream op feeds this one.
                (_, feed_dict), = input_dicts.items()
                return f_preprocess(feed_dict)

            def postprocess(self, input_dicts, fetch_dict):
                (_, feed_dict), = input_dicts.items()
                return f_postprocess(feed_dict, fetch_dict)

        return InternelOp

    def init_server(self,
                    internel_op_class,
                    internel_op_name,
                    internel_op_endpoints,
                    internel_op_client_config,
                    internel_op_concurrency,
                    internel_op_timeout=-1,
                    internel_op_retry=1,
                    internel_op_batch_size=1,
                    internel_op_auto_batching_timeout=None):
        """Assemble the request -> infer -> response DAG on the server."""
        request_op = pipeline.RequestOp()
        infer_op = internel_op_class(
            name=internel_op_name,
            input_ops=[request_op],
            server_endpoints=internel_op_endpoints,
            client_config=internel_op_client_config,
            concurrency=internel_op_concurrency,
            timeout=internel_op_timeout,
            retry=internel_op_retry,
            batch_size=internel_op_batch_size,
            auto_batching_timeout=internel_op_auto_batching_timeout)
        self.server.set_response_op(
            pipeline.ResponseOp(input_ops=[infer_op]))

    def prepare_server(self,
                       rpc_port,
                       http_port,
                       worker_num,
                       build_dag_each_worker=False,
                       is_thread_op=False,
                       client_type="brpc",
                       retry=1,
                       use_profile=False,
                       tracer_interval_s=-1):
        """Translate keyword options into the yml-style config dict the
        pipeline server expects, then hand it over."""
        server_conf = {
            "port": rpc_port,
            "worker_num": worker_num,
            "build_dag_each_worker": build_dag_each_worker,
            "grpc_gateway_port": http_port,
            "dag": {
                "is_thread_op": is_thread_op,
                "client_type": client_type,
                "retry": retry,
                "use_profile": use_profile,
                "tracer": {
                    "interval_s": tracer_interval_s,
                }
            }
        }
        self.server.prepare_server(yml_dict=server_conf)

    def start_server(self):
        """Run the pipeline server (blocking)."""
        self.server.run_server()
202 203


B
barriery 已提交
204 205 206 207 208 209 210 211 212 213
class PipelineWebService(object):
    """High-level web service: per-GPU RPC inference backends plus a
    pipeline HTTP/gRPC front end that forwards requests to them.

    Subclass and override preprocess()/postprocess() to customize the
    request/response transformation.
    """

    def __init__(self, name="default"):
        self.name = name  # op name; also appears in the prediction URL path
        self.port = None  # HTTP port, assigned in prepare_server()
        self.model_config = None
        self.available_port_generator = AvailablePortGenerator(12000)
        self.default_rpc_server = DefaultRpcServer(
            self.available_port_generator)
        self.default_pipeline_server = DefaultPipelineServer(
            self.available_port_generator)

    def load_model_config(self, model_config):
        """Set the model config directory used by the RPC backends."""
        self.model_config = model_config
        self.default_rpc_server.load_model_config(model_config)

    def set_gpus(self, gpus):
        """Comma-separated GPU ids, e.g. "0,1"."""
        self.default_rpc_server.set_gpus(gpus)

    def prepare_server(self,
                       workdir="",
                       port=9393,
                       device="gpu",
                       worker_num=4,
                       mem_optim=True,
                       ir_optim=False):
        """Prepare the RPC backends and the pipeline front end.

        Raises:
            SystemExit: if *port* is already in use.
        """
        if not self.available_port_generator.port_is_available(port):
            # BUG FIX: message previously read "prot({})".
            raise SystemExit(
                "Failed to prepare server: port({}) is not available".format(
                    port))
        self.port = port

        # rpc server
        self.default_rpc_server.prepare_server(
            workdir=workdir,
            device=device,
            mem_optim=mem_optim,
            ir_optim=ir_optim)
        rpc_endpoints = self.default_rpc_server.get_port_list()

        # pipeline server
        internel_op_class = self.default_pipeline_server.create_internel_op_class(
            self.preprocess, self.postprocess)
        # Renamed loop variable: original shadowed the `port` parameter.
        internel_op_endpoints = [
            "127.0.0.1:{}".format(rpc_port) for rpc_port in rpc_endpoints
        ]
        self.default_pipeline_server.init_server(
            internel_op_class=internel_op_class,
            internel_op_name=self.name,
            internel_op_endpoints=internel_op_endpoints,
            internel_op_client_config="{}/serving_server_conf.prototxt".format(
                self.model_config),
            internel_op_concurrency=worker_num)
        self.default_pipeline_server.prepare_server(
            rpc_port=self.available_port_generator.next(),
            http_port=self.port,
            worker_num=worker_num)

    def run_service(self):
        """Start the RPC backends, then run the pipeline server (blocking).

        Uses the module-level `socket` import; the original re-imported it
        locally for no reason.
        """
        local_ip = socket.gethostbyname(socket.gethostname())
        print("web service address: http://{}:{}/prediction"
              .format(local_ip, self.port))
        self.default_rpc_server.start_server()
        self.default_pipeline_server.start_server()

    def preprocess(self, feed_dict):
        """Identity by default; override to transform the request dict."""
        return feed_dict

    def postprocess(self, feed_dict, fetch_dict):
        """Identity by default; override to transform the fetch dict."""
        return fetch_dict