# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import logging
import multiprocessing
from .error_catch import ErrorCatch, CustomException, CustomExceptionCode
#from paddle_serving_server import OpMaker, OpSeqMaker
#from paddle_serving_server import Server as GpuServer
#from paddle_serving_server import Server as CpuServer
from . import util
#from paddle_serving_app.local_predict import LocalPredictor

_LOGGER = logging.getLogger(__name__)
_workdir_name_gen = util.NameGenerator("workdir_")


class LocalServiceHandler(object):
    """
    LocalServiceHandler is the processor of the local service, contains
    three client types, brpc, grpc and local_predictor.If you use the 
    brpc or grpc, serveing startup ability is provided.If you use
    local_predictor, local predict ability is provided by paddle_serving_app.
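
    Example (a minimal sketch; "uci_housing_model" is a hypothetical
    model config path):

        # local_predictor mode: no server process is launched
        handler = LocalServiceHandler(
            model_config="uci_housing_model",
            client_type="local_predictor",
            devices="")  # "" means CPU
        predictor = handler.get_client(concurrency_idx=0)

        # brpc mode: one server process is launched per device
        handler = LocalServiceHandler(
            model_config="uci_housing_model",
            client_type="brpc",
            devices="0,1")
        handler.prepare_server()
        handler.start_server()
        ports = handler.get_port_list()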
    """

    def __init__(self,
                 model_config,
                 client_type='local_predictor',
                 workdir="",
                 thread_num=2,
                 device_type=-1,
                 devices="",
                 fetch_names=None,
                 mem_optim=True,
                 ir_optim=False,
                 available_port_generator=None,
                 use_profile=False,
                 precision="fp32",
                 use_mkldnn=False,
                 mkldnn_cache_capacity=0,
                 mkldnn_op_list=None,
                 mkldnn_bf16_op_list=None,
                 min_subgraph_size=3,
                 dynamic_shape_info={},
                 use_calib=False):
        """
        Initialization of localservicehandler

        Args:
           model_config: model config path
           client_type: brpc, grpc and local_predictor[default]
           workdir: work directory
           thread_num: number of threads, concurrent quantity.
           device_type: support multiple devices. -1=Not set, determined by
               `devices`. 0=cpu, 1=gpu, 2=tensorRT, 3=arm cpu, 4=kunlun xpu
           devices: comma-separated gpu id list for gpu, "" default for cpu
           fetch_names: get fetch names out of LocalServiceHandler in
               local_predictor mode. fetch_names_ is compatible for Client().
           mem_optim: use memory/graphics memory optimization, True default.
           ir_optim: use computation graph optimization, False default.
           available_port_generator: generate available ports
           use_profile: use profiling, False default.
           precision: inference precision, e.g. "fp32", "fp16", "int8"
           use_mkldnn: use mkldnn, default False.
           mkldnn_cache_capacity: cache capacity of mkldnn, 0 means no limit.
           mkldnn_op_list: OP list optimized by mkldnn, None default.
           mkldnn_bf16_op_list: OP list optimized by mkldnn bf16, None default.
           min_subgraph_size: min subgraph size for TensorRT, 3 default.
           dynamic_shape_info: dynamic shape info for TensorRT, {} default.
           use_calib: set inference use_calib_mode param, False default.
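           Example (hypothetical values): device_type=2 with devices="0"
           enables TensorRT on GPU card 0, while device_type=-1 with
           devices="" falls back to plain CPU inference.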

        Returns:
           None
        """
        if available_port_generator is None:
            available_port_generator = util.GetAvailablePortGenerator()

        self._model_config = model_config
        self._port_list = []
        self._device_name = "cpu"
        self._use_gpu = False
        self._use_trt = False
        self._use_lite = False
        self._use_xpu = False
        self._use_ascend_cl = False
        self._use_mkldnn = False
        self._mkldnn_cache_capacity = 0
        self._mkldnn_op_list = None
        self._mkldnn_bf16_op_list = None
        self.min_subgraph_size = 3
        self.dynamic_shape_info = {}
        self._use_calib = False

        if device_type == -1:
            # device_type is not set, determined by `devices`
            if devices == "":
                # CPU
                self._device_name = "cpu"
                devices = [-1]
            else:
                # GPU
                self._device_name = "gpu"
                self._use_gpu = True
                devices = [int(x) for x in devices.split(",")]

        elif device_type == 0:
            # CPU
            self._device_name = "cpu"
            devices = [-1]
        elif device_type == 1:
            # GPU
            self._device_name = "gpu"
            self._use_gpu = True
            devices = [int(x) for x in devices.split(",")]
        elif device_type == 2:
            # Nvidia Tensor RT
            self._device_name = "gpu"
            self._use_gpu = True
            devices = [int(x) for x in devices.split(",")]
            self._use_trt = True
            self.min_subgraph_size = min_subgraph_size
            self.dynamic_shape_info = dynamic_shape_info
        elif device_type == 3:
            # ARM CPU
            self._device_name = "arm"
            devices = [-1]
            self._use_lite = True
        elif device_type == 4:
            # Kunlun XPU
            self._device_name = "arm"
            devices = [int(x) for x in devices.split(",")]
            self._use_lite = True
            self._use_xpu = True
        elif device_type == 5:
            # Ascend 310 ARM CPU
            self._device_name = "arm"
            devices = [int(x) for x in devices.split(",")]
            self._use_lite = True
            self._use_ascend_cl = True
        elif device_type == 6:
            # Ascend 910 ARM CPU
            self._device_name = "arm"
            devices = [int(x) for x in devices.split(",")]
            self._use_ascend_cl = True
        else:
            _LOGGER.error(
                "LocalServiceHandler initialization failed. Unsupported "
                "device_type={}".format(device_type))

        if client_type == "brpc" or client_type == "grpc":
            for _ in devices:
                self._port_list.append(available_port_generator.next())
            _LOGGER.info("Create ports for devices:{}. Port:{}"
                         .format(devices, self._port_list))

        self._client_type = client_type
        self._workdir = workdir
        self._devices = devices
        self._thread_num = thread_num
        self._mem_optim = mem_optim
        self._ir_optim = ir_optim
        self._local_predictor_client = None
        self._rpc_service_list = []
        self._server_pros = []
        self._use_profile = use_profile
        self._fetch_names = fetch_names
        self._precision = precision
        self._use_mkldnn = use_mkldnn
        self._mkldnn_cache_capacity = mkldnn_cache_capacity
        self._mkldnn_op_list = mkldnn_op_list
        self._mkldnn_bf16_op_list = mkldnn_bf16_op_list
        self._use_calib = use_calib

        _LOGGER.info(
            "Models({}) will be launched by device {}. use_gpu:{}, "
            "use_trt:{}, use_lite:{}, use_xpu:{}, device_type:{}, devices:{}, "
            "mem_optim:{}, ir_optim:{}, use_profile:{}, thread_num:{}, "
            "client_type:{}, fetch_names:{}, precision:{}, use_calib:{}, "
            "use_mkldnn:{}, mkldnn_cache_capacity:{}, mkldnn_op_list:{}, "
            "mkldnn_bf16_op_list:{}, use_ascend_cl:{}, min_subgraph_size:{}, "
            "is_set_dynamic_shape_info:{}".format(
                model_config, self._device_name, self._use_gpu, self._use_trt,
                self._use_lite, self._use_xpu, device_type, self._devices,
                self._mem_optim, self._ir_optim, self._use_profile,
                self._thread_num, self._client_type, self._fetch_names,
                self._precision, self._use_calib, self._use_mkldnn,
                self._mkldnn_cache_capacity, self._mkldnn_op_list,
                self._mkldnn_bf16_op_list, self._use_ascend_cl,
                self.min_subgraph_size, bool(len(self.dynamic_shape_info))))

    def get_fetch_list(self):
        return self._fetch_names

    def get_port_list(self):
        return self._port_list

    def get_client(self, concurrency_idx):
        """
        Function get_client is only used for local predictor case, creates one
        LocalPredictor object, and initializes the paddle predictor by function
209
        load_model_config.The concurrency_idx is used to select running devices.  
210 211

        Args:
212
            concurrency_idx: process/thread index
213 214 215 216

        Returns:
            _local_predictor_client
        """

        # Check the legality of concurrency_idx.
        device_num = len(self._devices)
        if device_num <= 0:
            _LOGGER.error("device_num must be greater than 0. devices({})".
                          format(self._devices))
            raise ValueError("The number of self._devices is invalid")

        if concurrency_idx < 0:
            _LOGGER.error("concurrency_idx({}) must be one positive number".
                          format(concurrency_idx))
            concurrency_idx = 0
        elif concurrency_idx >= device_num:
            concurrency_idx = concurrency_idx % device_num
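        # E.g. with devices=[0, 1], concurrency_idx values 0, 1, 2, 3 select
        # devices 0, 1, 0, 1 respectively (concurrency_idx % device_num).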

        _LOGGER.info("GET_CLIENT : concurrency_idx={}, device_num={}".format(
            concurrency_idx, device_num))
        from paddle_serving_app.local_predict import LocalPredictor
        if self._local_predictor_client is None:
            self._local_predictor_client = LocalPredictor()
            # load model config and init predictor
            self._local_predictor_client.load_model_config(
                model_path=self._model_config,
                use_gpu=self._use_gpu,
                gpu_id=self._devices[concurrency_idx],
                use_profile=self._use_profile,
                thread_num=self._thread_num,
                mem_optim=self._mem_optim,
                ir_optim=self._ir_optim,
                use_trt=self._use_trt,
                use_lite=self._use_lite,
                use_xpu=self._use_xpu,
                precision=self._precision,
                use_mkldnn=self._use_mkldnn,
                mkldnn_cache_capacity=self._mkldnn_cache_capacity,
                mkldnn_op_list=self._mkldnn_op_list,
                mkldnn_bf16_op_list=self._mkldnn_bf16_op_list,
                use_ascend_cl=self._use_ascend_cl,
                min_subgraph_size=self.min_subgraph_size,
                dynamic_shape_info=self.dynamic_shape_info,
                use_calib=self._use_calib)
        return self._local_predictor_client

    def get_client_config(self):
        return os.path.join(self._model_config, "serving_server_conf.prototxt")

    def _prepare_one_server(self, workdir, port, gpuid, thread_num, mem_optim,
                            ir_optim, precision):
        """
        According to self._device_name, generate one Cpu/Gpu/Arm Server and
        set the model config and startup params.

        Args:
            workdir: work directory
            port: network port
            gpuid: gpu id
            thread_num: thread num
            mem_optim: use memory/graphics memory optimization
            ir_optim: use calculation chart optimization
            precision: inference precision, e.g. "fp32", "fp16", "int8"

        Returns:
            server: CpuServer/GpuServer
        """
        if self._device_name == "cpu":
            from paddle_serving_server import OpMaker, OpSeqMaker, Server
            op_maker = OpMaker()
            read_op = op_maker.create('GeneralReaderOp')
            general_infer_op = op_maker.create('GeneralInferOp')
            general_response_op = op_maker.create('GeneralResponseOp')

            op_seq_maker = OpSeqMaker()
            op_seq_maker.add_op(read_op)
            op_seq_maker.add_op(general_infer_op)
            op_seq_maker.add_op(general_response_op)

            server = Server()
        else:
            # gpu or arm
            from paddle_serving_server import OpMaker, OpSeqMaker, Server
            op_maker = OpMaker()
            read_op = op_maker.create('GeneralReaderOp')
            general_infer_op = op_maker.create('GeneralInferOp')
            general_response_op = op_maker.create('GeneralResponseOp')

            op_seq_maker = OpSeqMaker()
            op_seq_maker.add_op(read_op)
            op_seq_maker.add_op(general_infer_op)
            op_seq_maker.add_op(general_response_op)

            server = Server()
            if gpuid >= 0:
                server.set_gpuid(gpuid)
            # TODO: support arm or arm + xpu later
            server.set_device(self._device_name)
            if self._use_xpu:
                server.set_xpu()
            if self._use_lite:
                server.set_lite()
            if self._use_ascend_cl:
                server.set_ascend_cl()

        server.set_op_sequence(op_seq_maker.get_op_sequence())
        server.set_num_threads(thread_num)
        server.set_memory_optimize(mem_optim)
        server.set_ir_optimize(ir_optim)
        server.set_precision(precision)

        server.load_model_config(self._model_config)
        server.prepare_server(
            workdir=workdir, port=port, device=self._device_name)
        if self._fetch_names is None:
            self._fetch_names = server.get_fetch_list()
        return server

    def _start_one_server(self, service_idx):
        """
        Start one server
     
        Args:
            service_idx: server index
 
        Returns:
            None
        """
        self._rpc_service_list[service_idx].run_server()

    def prepare_server(self):
        """
        Prepare all servers to be started and append them to the list.
        """
        for i, device_id in enumerate(self._devices):
            if self._workdir != "":
                workdir = "{}_{}".format(self._workdir, i)
            else:
                workdir = _workdir_name_gen.next()
            self._rpc_service_list.append(
                self._prepare_one_server(
                    workdir,
                    self._port_list[i],
                    device_id,
                    thread_num=self._thread_num,
                    mem_optim=self._mem_optim,
                    ir_optim=self._ir_optim,
                    precision=self._precision))

    def start_server(self):
        """
        Start multiple processes and start one server in each process
        """
        for i, _ in enumerate(self._rpc_service_list):
            p = multiprocessing.Process(
                target=self._start_one_server, args=(i, ))
            p.daemon = True
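            # Daemon processes are terminated automatically when the
            # parent process exits, so no explicit cleanup is needed here.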
            self._server_pros.append(p)
        for p in self._server_pros:
            p.start()