local_service_handler.py
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import logging
import multiprocessing
#from paddle_serving_server_gpu import OpMaker, OpSeqMaker
#from paddle_serving_server_gpu import Server as GpuServer
#from paddle_serving_server import Server as CpuServer
from . import util
#from paddle_serving_app.local_predict import LocalPredictor

_LOGGER = logging.getLogger(__name__)
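# Assumed behavior of util.NameGenerator (defined elsewhere): yields fresh
# names ("workdir_0", "workdir_1", ...) for servers started without an
# explicit workdir.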
_workdir_name_gen = util.NameGenerator("workdir_")


class LocalServiceHandler(object):
    """
    LocalServiceHandler is the processor of the local service. It supports
    three client types: brpc, grpc and local_predictor. If you use brpc or
    grpc, the ability to start a serving process is provided. If you use
    local_predictor, local prediction ability is provided by
    paddle_serving_app.
    """

    def __init__(self,
                 model_config,
                 client_type='local_predictor',
                 workdir="",
                 thread_num=2,
                 devices="",
                 fetch_names=None,
                 mem_optim=True,
                 ir_optim=False,
                 available_port_generator=None,
                 use_trt=False,
                 use_lite=False,
                 use_xpu=False,
                 use_profile=False):
        """
        Initialization of localservicehandler

        Args:
           model_config: model config path
           client_type: brpc, grpc and local_predictor[default]
           workdir: work directory
           thread_num: number of threads, concurrent quantity.
           devices: gpu id list[gpu], "" default[cpu]
           fetch_names: get fetch names out of LocalServiceHandler in 
               local_predictor mode. fetch_names_ is compatible for Client().
           mem_optim: use memory/graphics memory optimization, True default.
           ir_optim: use calculation chart optimization, False default.
           available_port_generator: generate available ports
           use_trt: use nvidia tensorRt engine, False default.
           use_lite: use Paddle-Lite engine, False default.
           use_xpu: run predict on Baidu Kunlun, False default.
           use_profile: use profiling, False default.

        Returns:
           None
        """
        if available_port_generator is None:
            available_port_generator = util.GetAvailablePortGenerator()

        self._model_config = model_config
        self._port_list = []
        self._device_type = "cpu"
        if devices == "":
            # cpu
            devices = [-1]
            if use_lite:
                self._device_type = "arm"
                self._port_list.append(available_port_generator.next())
                _LOGGER.info("Model({}) will be launch in arm device. Port({})"
                             .format(model_config, self._port_list))
            else:
                self._device_type = "cpu"
                self._port_list.append(available_port_generator.next())
                _LOGGER.info("Model({}) will be launch in cpu device. Port({})"
                             .format(model_config, self._port_list))
91 92
        else:
            # gpu
            self._device_type = "gpu"
            devices = [int(x) for x in devices.split(",")]
            for _ in devices:
                self._port_list.append(available_port_generator.next())
            _LOGGER.info("Model({}) will be launched on gpu device: {}. Port({})"
                         .format(model_config, devices, self._port_list))
        self._client_type = client_type
        self._workdir = workdir
        self._devices = devices
        self._thread_num = thread_num
        self._mem_optim = mem_optim
        self._ir_optim = ir_optim
        self._local_predictor_client = None
        self._rpc_service_list = []
        self._server_pros = []
        self._use_trt = use_trt
        self._use_lite = use_lite
        self._use_xpu = use_xpu
        self._use_profile = use_profile
        self.fetch_names_ = fetch_names

    def get_fetch_list(self):
        return self.fetch_names_

    def get_port_list(self):
        return self._port_list

    def get_client(self, concurrency_idx):
        """
        Function get_client is only used in the local_predictor case. It
        creates one LocalPredictor object and initializes the paddle
        predictor via load_model_config(). The concurrency_idx is used to
        select the running device.

        Args:
            concurrency_idx: process/thread index

        Returns:
            _local_predictor_client
        """

        # Check the legality of concurrency_idx.
        device_num = len(self._devices)
        if device_num <= 0:
            _LOGGER.error("device_num must be greater than 0. devices({})".
                          format(self._devices))
            raise ValueError("The number of self._devices is invalid")

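        # Fold the worker index onto the available devices: e.g. with two
        # devices (hypothetical), concurrency_idx 0, 2, 4, ... runs on
        # device 0 and 1, 3, 5, ... on device 1 via the modulo below.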
        if concurrency_idx < 0:
            _LOGGER.error("concurrency_idx({}) must be a non-negative number;"
                          " using 0 instead".format(concurrency_idx))
            concurrency_idx = 0
        elif concurrency_idx >= device_num:
            concurrency_idx = concurrency_idx % device_num

        _LOGGER.info("GET_CLIENT : concurrency_idx={}, device_num={}".format(
            concurrency_idx, device_num))
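        # LocalPredictor is imported lazily, presumably so that deployments
        # which never use the local_predictor client type do not require
        # paddle_serving_app at module import time.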
        from paddle_serving_app.local_predict import LocalPredictor
        if self._local_predictor_client is None:
            self._local_predictor_client = LocalPredictor()
            use_gpu = False
            use_lite = False
            if self._device_type == "gpu":
                use_gpu = True
            elif self._device_type == "arm":
                use_lite = True
            self._local_predictor_client.load_model_config(
                model_path=self._model_config,
                use_gpu=use_gpu,
                gpu_id=self._devices[concurrency_idx],
                use_profile=self._use_profile,
                thread_num=self._thread_num,
                mem_optim=self._mem_optim,
                ir_optim=self._ir_optim,
                use_trt=self._use_trt,
                use_lite=use_lite,
                use_xpu=self._use_xpu)
        return self._local_predictor_client
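
    # Follow-up sketch (hypothetical names; LocalPredictor.predict() is
    # assumed from paddle_serving_app, not defined in this file):
    #
    #   predictor = handler.get_client(concurrency_idx=0)
    #   result = predictor.predict(feed=feed_dict,
    #                              fetch=handler.get_fetch_list())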

    def get_client_config(self):
        return os.path.join(self._model_config, "serving_server_conf.prototxt")

    def _prepare_one_server(self, workdir, port, gpuid, thread_num, mem_optim,
                            ir_optim):
        """
        According to _device_type, generate one CpuServer or GpuServer, and
        set the model config and startup params.

        Args:
            workdir: work directory
            port: network port
            gpuid: gpu id
            thread_num: thread num
            mem_optim: use memory/graphics memory optimization
            ir_optim: use computation graph optimization

        Returns:
            server: CpuServer/GpuServer
        """
        if self._device_type == "cpu":
            from paddle_serving_server import OpMaker, OpSeqMaker, Server
            op_maker = OpMaker()
            read_op = op_maker.create('general_reader')
            general_infer_op = op_maker.create('general_infer')
            general_response_op = op_maker.create('general_response')

            op_seq_maker = OpSeqMaker()
            op_seq_maker.add_op(read_op)
            op_seq_maker.add_op(general_infer_op)
            op_seq_maker.add_op(general_response_op)

            server = Server()
        else:
            # gpu or arm
            from paddle_serving_server_gpu import OpMaker, OpSeqMaker, Server
            op_maker = OpMaker()
            read_op = op_maker.create('general_reader')
            general_infer_op = op_maker.create('general_infer')
            general_response_op = op_maker.create('general_response')

            op_seq_maker = OpSeqMaker()
            op_seq_maker.add_op(read_op)
            op_seq_maker.add_op(general_infer_op)
            op_seq_maker.add_op(general_response_op)

            server = Server()
            if gpuid >= 0:
                server.set_gpuid(gpuid)

        server.set_op_sequence(op_seq_maker.get_op_sequence())
        server.set_num_threads(thread_num)
        server.set_memory_optimize(mem_optim)
        server.set_ir_optimize(ir_optim)

        server.load_model_config(self._model_config)
        server.prepare_server(
            workdir=workdir, port=port, device=self._device_type)
        if self.fetch_names_ is None:
            self.fetch_names_ = server.get_fetch_list()
        return server

    def _start_one_server(self, service_idx):
        """
        Start one server
     
        Args:
            service_idx: server index
 
        Returns:
            None
        """
        self._rpc_service_list[service_idx].run_server()

    def prepare_server(self):
        """
        Prepare all servers to be started, and append them into list. 
        """
        for i, device_id in enumerate(self._devices):
            if self._workdir != "":
                workdir = "{}_{}".format(self._workdir, i)
            else:
                workdir = _workdir_name_gen.next()
            self._rpc_service_list.append(
                self._prepare_one_server(
                    workdir,
                    self._port_list[i],
                    device_id,
                    thread_num=self._thread_num,
                    mem_optim=self._mem_optim,
                    ir_optim=self._ir_optim))

    def start_server(self):
        """
        Start multiple processes and start one server in each process
        """
        for i, service in enumerate(self._rpc_service_list):
            p = multiprocessing.Process(
                target=self._start_one_server, args=(i, ))
            p.daemon = True
            self._server_pros.append(p)
        for p in self._server_pros:
            p.start()