web_service.py 4.8 KB
Newer Older
1
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
M
MRXLT 已提交
2 3 4 5 6 7 8 9 10 11 12 13
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
B
barrierye 已提交
14

M
MRXLT 已提交
15 16
from flask import Flask, request, abort
from paddle_serving_server_gpu import OpMaker, OpSeqMaker, Server
17
import paddle_serving_server_gpu as serving
M
MRXLT 已提交
18
from multiprocessing import Pool, Process, Queue
M
MRXLT 已提交
19
from paddle_serving_client import Client
M
MRXLT 已提交
20 21 22 23
from paddle_serving_server_gpu.serve import start_multi_card

import sys
import numpy as np
M
MRXLT 已提交
24 25 26 27 28


class WebService(object):
    """Front a set of local Paddle Serving RPC servers with a web endpoint.

    Typical call order, as far as this class shows:
    ``load_model_config`` -> ``set_gpus`` (optional) -> ``prepare_server``
    -> ``run_server`` -> ``_launch_web_service``; ``get_prediction`` then
    serves individual HTTP requests.

    Subclasses customise request handling by overriding ``preprocess`` and
    ``postprocess``.
    """

    def __init__(self, name="default_service"):
        """Create a service.

        Args:
            name: Path component used in the printed prediction URL.
        """
        self.name = name
        # GPU card ids; an empty list selects the single CPU server path.
        self.gpus = []
        # One prepared RPC server per card (or a single CPU server).
        self.rpc_service_list = []

    def load_model_config(self, model_config):
        """Remember the model config location used by servers and client."""
        self.model_config = model_config

    def set_gpus(self, gpus):
        """Parse a comma-separated list of GPU card ids.

        Empty tokens are ignored, so ``""`` yields no GPUs and a trailing
        comma (``"0,1,"``) is tolerated instead of raising ``ValueError``.

        Args:
            gpus: String such as ``"0,1,2"``.
        """
        self.gpus = [int(token) for token in gpus.split(",") if token.strip()]

    def default_rpc_service(self,
                            workdir="conf",
                            port=9292,
                            gpuid=0,
                            thread_num=10):
        """Build and prepare one RPC server with the default op pipeline.

        The pipeline is general_reader -> general_infer -> general_response.

        Args:
            workdir: Working directory handed to the server.
            port: Port the RPC server is prepared on.
            gpuid: GPU card id; ``-1`` selects the CPU device.
            thread_num: Number of server threads.

        Returns:
            The prepared (not yet running) server object.
        """
        device = "cpu" if gpuid == -1 else "gpu"

        op_maker = serving.OpMaker()
        op_seq_maker = serving.OpSeqMaker()
        for op_name in ('general_reader', 'general_infer',
                        'general_response'):
            op_seq_maker.add_op(op_maker.create(op_name))

        server = serving.Server()
        server.set_op_sequence(op_seq_maker.get_op_sequence())
        server.set_num_threads(thread_num)

        server.load_model_config(self.model_config)
        if gpuid >= 0:
            server.set_gpuid(gpuid)
        server.prepare_server(workdir=workdir, port=port, device=device)
        return server

    def _launch_rpc_service(self, service_idx):
        """Run the prepared RPC server at ``service_idx`` (process target)."""
        self.rpc_service_list[service_idx].run_server()

    def prepare_server(self, workdir="", port=9393, device="gpu", gpuid=0):
        """Record the settings and prepare the backing RPC server(s).

        With no GPUs configured a single CPU server is prepared on
        ``port + 1``. Otherwise one server per configured card is prepared
        on ``port + 1 + i`` with working directory ``"<workdir>_<i>"``.

        Args:
            workdir: Base working directory.
            port: Web port; RPC servers use the ports right after it.
            device: Stored on the instance; the device actually used is
                decided by whether ``set_gpus`` configured any cards.
            gpuid: Stored on the instance only.
        """
        self.workdir = workdir
        self.port = port
        self.device = device
        self.gpuid = gpuid
        # NOTE(review): services are appended, so a second call adds a
        # second set on the same ports - confirm single-call usage.
        if not self.gpus:
            # init cpu service
            self.rpc_service_list.append(
                self.default_rpc_service(
                    self.workdir, self.port + 1, -1, thread_num=10))
        else:
            for i, card_id in enumerate(self.gpus):
                self.rpc_service_list.append(
                    self.default_rpc_service(
                        "{}_{}".format(self.workdir, i),
                        self.port + 1 + i,
                        card_id,
                        thread_num=10))

    def _launch_web_service(self):
        """Create the client and connect it to every local RPC server.

        Endpoints mirror the ports chosen in ``prepare_server``: one per
        configured GPU, or a single one at ``port + 1`` for CPU serving.
        """
        self.client = Client()
        self.client.load_client_config("{}/serving_server_conf.prototxt".format(
            self.model_config))
        # At least one server always exists (the CPU one when no GPUs).
        server_num = max(len(self.gpus), 1)
        # One list entry per endpoint rather than a single comma-joined
        # string with a trailing comma.
        # NOTE(review): assumes Client.connect accepts several entries in
        # the list - confirm against the client implementation.
        endpoints = [
            "127.0.0.1:{}".format(self.port + 1 + i)
            for i in range(server_num)
        ]
        self.client.connect(endpoints)

    def get_prediction(self, request):
        """Handle one prediction request.

        Aborts with HTTP 400 when the request has no JSON body or the body
        has no ``"fetch"`` key.

        Args:
            request: Object exposing the parsed body as ``request.json``.

        Returns:
            ``{"result": fetch_map}`` where every value of the fetch map
            has been converted with its ``tolist()`` method.
        """
        payload = request.json
        if not payload or "fetch" not in payload:
            abort(400)
        feed, fetch = self.preprocess(payload, payload["fetch"])
        fetch_map_batch = self.client.predict(feed=feed, fetch=fetch)
        fetch_map_batch = self.postprocess(
            feed=payload, fetch=fetch, fetch_map=fetch_map_batch)
        # Convert in place so the response holds plain lists.
        for key in fetch_map_batch:
            fetch_map_batch[key] = fetch_map_batch[key].tolist()
        return {"result": fetch_map_batch}

    def run_server(self):
        """Print the web address and start one process per RPC server."""
        import socket
        localIP = socket.gethostbyname(socket.gethostname())
        print("web service address:")
        print("http://{}:{}/{}/prediction".format(localIP, self.port,
                                                  self.name))
        # Create every process first, then start them.
        server_pros = [
            Process(target=self._launch_rpc_service, args=(idx, ))
            for idx, _ in enumerate(self.rpc_service_list)
        ]
        for proc in server_pros:
            proc.start()

    def preprocess(self, feed=None, fetch=None):
        """Hook to turn the request body into ``(feed, fetch)``.

        The default implementation returns its arguments unchanged. Omitted
        arguments yield a fresh ``{}`` / ``[]`` on every call, so callers
        can mutate the result safely.
        """
        if feed is None:
            feed = {}
        if fetch is None:
            fetch = []
        return feed, fetch

    def postprocess(self, feed=None, fetch=None, fetch_map=None):
        """Hook to adjust the prediction result; returns ``fetch_map`` as is."""
        return fetch_map