# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import tarfile
import socket
import paddle_serving_server
from paddle_serving_server.serve import format_gpu_to_strlist
from .proto import server_configure_pb2 as server_sdk
from .proto import general_model_config_pb2 as m_config
import google.protobuf.text_format
import time
from .version import version_tag, version_suffix, device_type
from contextlib import closing
import argparse

import sys
if not sys.platform.startswith('win'):
    import fcntl
import shutil
import platform
import numpy as np
import collections
import subprocess

from multiprocessing import Pool, Process
from concurrent import futures


# The whole file is about to be discarded.
# We will use default config-file to start C++Server.
class Server(object):
    def __init__(self):
        """
        self.model_toolkit_conf:'list'=[] # The quantity of self.model_toolkit_conf is equal to the InferOp quantity/Engine--OP
        self.model_conf:'collections.OrderedDict()' # Save the serving_server_conf.prototxt content (feed and fetch information) this is a map for multi-model in a workflow
        self.workflow_fn:'str'="workflow.prototxt" # Only one for one Service/Workflow
        self.resource_fn:'str'="resource.prototxt" # Only one for one Service,model_toolkit_fn and general_model_config_fn is recorded in this file
        self.infer_service_fn:'str'="infer_service.prototxt" # Only one for one Service,Service--Workflow
H
HexToString 已提交
52 53
        self.model_toolkit_fn:'list'=[] # ["GeneralInferOp_0/model_toolkit.prototxt"]The quantity is equal to the InferOp quantity,Engine--OP
        self.general_model_config_fn:'list'=[] # ["GeneralInferOp_0/general_model.prototxt"]The quantity is equal to the InferOp quantity,Feed and Fetch --OP
H
HexToString 已提交
54 55
        self.subdirectory:'list'=[] # The quantity is equal to the InferOp quantity, and name = node.name = engine.name
        self.model_config_paths:'collections.OrderedDict()' # Save the serving_server_conf.prototxt path (feed and fetch information) this is a map for multi-model in a workflow
56 57 58 59 60 61 62 63
        self.enable_dist_model: bool, enable distributed model, false default
        self.dist_carrier_id: string, mark distributed model carrier name, "" default.
        self.dist_cfg_file: string, file name of distributed configure, "" default.
        self.dist_nranks: int, number of distributed nodes, 0 default.
        self.dist_endpoints: list of string, all endpoints(ip:port) of distributed nodes, [] default.
        self.dist_subgraph_index: index of distributed subgraph model, -1 default. It is used to select the endpoint of the current shard in distribute model. -1 default.
        self.dist_worker_serving_endpoints: all endpoints of worker serving in the same machine. [] default.
        self.dist_master_serving: the master serving is used for receiving client requests, only in pp0 of pipeline parallel, False default.
H
HexToString 已提交
64
        """
        self.server_handle_ = None
        self.infer_service_conf = None
        self.model_toolkit_conf = []
        self.resource_conf = None
        self.memory_optimization = False
        self.ir_optimization = False
        self.model_conf = collections.OrderedDict()
        self.workflow_fn = "workflow.prototxt"
        self.resource_fn = "resource.prototxt"
        self.infer_service_fn = "infer_service.prototxt"
        self.model_toolkit_fn = []
        self.general_model_config_fn = []
        self.subdirectory = []
        self.cube_config_fn = "cube.conf"
        self.workdir = ""
        self.max_concurrency = 0
        self.num_threads = 2
        self.port = 8080
        self.precision = "fp32"
        self.use_calib = False
        self.reload_interval_s = 10
        self.max_body_size = 64 * 1024 * 1024
        self.module_path = os.path.dirname(paddle_serving_server.__file__)
        self.cur_path = os.getcwd()
        self.use_local_bin = False
        self.mkl_flag = False
        self.device = "cpu"
        self.gpuid = []
        self.runtime_thread_num = [0]
        self.batch_infer_size = [32]
        self.use_trt = False
        self.gpu_multi_stream = False
        self.use_lite = False
        self.use_xpu = False
        self.use_ascend_cl = False
        self.model_config_paths = collections.OrderedDict()
        self.product_name = None
        self.container_id = None
        self.default_engine_types = [
            'GeneralInferOp',
            'GeneralDistKVInferOp',
            'GeneralDistKVQuantInferOp',
            'GeneralDetectionOp',
        ]
        self.enable_prometheus = False
        self.prometheus_port = 19393
        self.request_cache_size = 0
        self.enable_dist_model = False
        self.dist_carrier_id = ""
        self.dist_cfg_file = ""
        self.dist_nranks = 0
        self.dist_endpoints = []
        self.dist_subgraph_index = -1
        self.dist_worker_serving_endpoints = []
        self.dist_master_serving = False
        self.min_subgraph_size = []
        self.trt_dynamic_shape_info = []

    def get_fetch_list(self, infer_node_idx=-1):
        fetch_names = [
            var.alias_name
            for var in list(self.model_conf.values())[infer_node_idx].fetch_var
        ]
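        # Illustrative example: if the selected InferOp's serving_server_conf.prototxt
        # declares fetch_var { alias_name: "price" }, this returns ["price"].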
        return fetch_names

    def set_max_concurrency(self, concurrency):
        self.max_concurrency = concurrency

    def set_num_threads(self, threads):
        self.num_threads = threads

    def set_max_body_size(self, body_size):
        if body_size >= self.max_body_size:
            self.max_body_size = body_size
        else:
            print(
                "max_body_size is less than default value, will use default value in service."
            )

    def use_encryption_model(self, flag=False):
        self.encryption_model = flag

    def set_port(self, port):
        self.port = port

    def set_precision(self, precision="fp32"):
        self.precision = precision

    def set_use_calib(self, use_calib=False):
        self.use_calib = use_calib

    def set_reload_interval(self, interval):
        self.reload_interval_s = interval

    def set_op_sequence(self, op_seq):
        self.workflow_conf = op_seq

    def set_op_graph(self, op_graph):
        self.workflow_conf = op_graph

    def set_memory_optimize(self, flag=False):
        self.memory_optimization = flag

    def set_ir_optimize(self, flag=False):
        self.ir_optimization = flag

    # Multi-Server does not have this function.
    def set_product_name(self, product_name=None):
        if product_name is None:
            raise ValueError("product_name can't be None.")
        self.product_name = product_name

    # Multi-Server does not have this function.
    def set_container_id(self, container_id):
        if container_id is None:
            raise ValueError("container_id can't be None.")
        self.container_id = container_id

    def check_local_bin(self):
        if "SERVING_BIN" in os.environ:
            self.use_local_bin = True
            self.bin_path = os.environ["SERVING_BIN"]
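        # e.g. exporting SERVING_BIN=/path/to/serving makes run_server() use
        # that local binary instead of the downloaded one.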

    def check_cuda(self):
        if os.system("ls /dev/ | grep nvidia > /dev/null") != 0:
            raise SystemExit(
                "GPU not found, please check your environment or use the CPU version by \"pip install paddle_serving_server\""
            )

    def set_device(self, device="cpu"):
        self.device = device

    def set_gpuid(self, gpuid):
        self.gpuid = format_gpu_to_strlist(gpuid)
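        # e.g. set_gpuid("0,1") or set_gpuid(["0,1", "2"]); format_gpu_to_strlist
        # (from paddle_serving_server.serve) is assumed to normalize these into
        # a list of gpu-id strings.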

    def set_runtime_thread_num(self, runtime_thread_num):
        self.runtime_thread_num = runtime_thread_num

    def set_batch_infer_size(self, batch_infer_size):
        self.batch_infer_size = batch_infer_size

    def set_trt(self):
        self.use_trt = True

    def set_gpu_multi_stream(self):
        self.gpu_multi_stream = True

    def set_lite(self):
        self.use_lite = True

    def set_xpu(self):
        self.use_xpu = True

    def set_ascend_cl(self):
        self.use_ascend_cl = True

    def set_enable_prometheus(self, flag=False):
        self.enable_prometheus = flag

    def set_prometheus_port(self, prometheus_port):
        self.prometheus_port = prometheus_port

    def set_request_cache_size(self, request_cache_size):
        self.request_cache_size = request_cache_size

    def set_enable_dist_model(self, status):
        self.enable_dist_model = status

    def set_dist_carrier_id(self, carrier_id):
        if isinstance(carrier_id, int):
            carrier_id = str(carrier_id)
        self.dist_carrier_id = carrier_id

    def set_dist_cfg_file(self, dist_cfg_file):
        self.dist_cfg_file = dist_cfg_file

    def set_dist_nranks(self, nranks):
        if isinstance(nranks, str):
            nranks = int(nranks)
        elif not isinstance(nranks, int):
            raise ValueError("dist_nranks type error! must be int or string")

        self.dist_nranks = nranks

    def set_dist_endpoints(self, endpoints):
        if isinstance(endpoints, list):
            self.dist_endpoints = endpoints
        elif isinstance(endpoints, str):
            self.dist_endpoints = [endpoints]
        else:
            raise ValueError(
                "dist_endpoints type error! must be list or string")

    def set_dist_subgraph_index(self, subgraph_index):
        if isinstance(subgraph_index, str):
            subgraph_index = int(subgraph_index)
        elif not isinstance(subgraph_index, int):
            raise ValueError("subgraph type error! must be int or string")

        self.dist_subgraph_index = subgraph_index

    def set_dist_worker_serving_endpoint(self, serving_endpoints):
        if isinstance(serving_endpoints, list):
            self.dist_worker_serving_endpoints = serving_endpoints
        elif isinstance(serving_endpoints, str):
            # a single endpoint string is wrapped into a list
            self.dist_worker_serving_endpoints = [serving_endpoints]
        else:
            raise ValueError(
                "dist_worker_serving_endpoints type error! must be list or string"
            )

    def set_dist_master_serving(self, is_master):
        self.dist_master_serving = is_master
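    # Illustrative distributed setup (the endpoint values are hypothetical):
    #     server.set_enable_dist_model(True)
    #     server.set_dist_nranks(2)
    #     server.set_dist_endpoints(["127.0.0.1:9400", "127.0.0.1:9401"])
    #     server.set_dist_subgraph_index(0)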

    def set_min_subgraph_size(self, min_subgraph_size):
        for s in min_subgraph_size:
            try:
                size = int(s)
            except (TypeError, ValueError):
                # fall back to the default minimum subgraph size
                size = 3
            self.min_subgraph_size.append(size)

    def set_trt_dynamic_shape_info(self, info):
        self.trt_dynamic_shape_info = info
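        # Expected layout, inferred from how _prepare_engine consumes it below
        # (the tensor name "x" is a hypothetical placeholder):
        #     server.set_trt_dynamic_shape_info([{
        #         "min_input_shape": {"x": [1, 3, 224, 224]},
        #         "max_input_shape": {"x": [8, 3, 224, 224]},
        #         "opt_input_shape": {"x": [4, 3, 224, 224]},
        #     }])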

    def _prepare_engine(self, model_config_paths, device, use_encryption_model):
        self.device = device
        if self.model_toolkit_conf is None:
            self.model_toolkit_conf = []

        # Generally, self.gpuid is a list of strings, possibly empty.
        # len(self.gpuid) == 0 means no gpuid was specified:
        # if self.device == "gpu" or self.use_trt is set,
        # we assume you forgot to set gpuid, so gpuid = ['0'].
        if len(self.gpuid) == 0 or self.gpuid == ["-1"]:
            if self.device == "gpu" or self.use_trt or self.gpu_multi_stream:
                self.gpuid = ["0"]
                self.device = "gpu"
            elif self.use_xpu or self.use_ascend_cl:
                self.gpuid = ["0"]
            else:
                self.gpuid = ["-1"]

        if isinstance(self.runtime_thread_num, int):
            self.runtime_thread_num = [self.runtime_thread_num]
        if len(self.runtime_thread_num) == 0:
            self.runtime_thread_num.append(0)

        if isinstance(self.batch_infer_size, int):
            self.batch_infer_size = [self.batch_infer_size]
        if len(self.batch_infer_size) == 0:
            self.batch_infer_size.append(32)

        index = 0

        for engine_name, model_config_path in model_config_paths.items():
            engine = server_sdk.EngineDesc()
            engine.name = engine_name
            # engine.reloadable_meta = model_config_path + "/fluid_time_file"
            engine.reloadable_meta = model_config_path + "/fluid_time_file"
            os.system("touch {}".format(engine.reloadable_meta))
            engine.reloadable_type = "timestamp_ne"
            engine.runtime_thread_num = self.runtime_thread_num[index % len(
                self.runtime_thread_num)]
            engine.batch_infer_size = self.batch_infer_size[index % len(
                self.batch_infer_size)]

            engine.enable_overrun = False
            engine.allow_split_request = True
            engine.model_dir = model_config_path
            engine.enable_memory_optimization = self.memory_optimization
            engine.enable_ir_optimization = self.ir_optimization
            engine.use_trt = self.use_trt
            engine.gpu_multi_stream = self.gpu_multi_stream
            engine.use_lite = self.use_lite
            engine.use_xpu = self.use_xpu
            engine.use_ascend_cl = self.use_ascend_cl
            engine.use_gpu = False

            # use distributed model.
            if self.dist_subgraph_index >= 0:
                engine.enable_dist_model = True
                engine.dist_carrier_id = self.dist_carrier_id
                engine.dist_cfg_file = self.dist_cfg_file
                engine.dist_nranks = self.dist_nranks
                engine.dist_endpoints.extend(self.dist_endpoints)
                engine.dist_subgraph_index = self.dist_subgraph_index

            if len(self.gpuid) == 0:
                raise ValueError("CPU: self.gpuid = -1, GPU: must set it ")
            op_gpu_list = self.gpuid[index % len(self.gpuid)].split(",")
            for ids in op_gpu_list:
                engine.gpu_ids.extend([int(ids)])

            if self.device == "gpu" or self.use_trt or self.gpu_multi_stream:
                engine.use_gpu = True
                # This handles mixed use of GPU and CPU:
                # e.g. model-1 uses GPU with device="gpu", but gpuid[1] = "-1"
                # means model-2 runs on CPU, so its GPU-related config must be
                # switched off below.
                # op_gpu_list = gpuid[index].split(",")
                # which is the gpuid for each engine.
                if len(op_gpu_list) == 1:
                    if int(op_gpu_list[0]) == -1:
                        engine.use_gpu = False
                        engine.gpu_multi_stream = False
                        engine.use_trt = False

            if os.path.exists('{}/__params__'.format(model_config_path)):
                engine.combined_model = True
            else:
                engine.combined_model = False
            if use_encryption_model:
                engine.encrypted_model = True
            engine.type = "PADDLE_INFER"
            if len(self.min_subgraph_size) > index:
                engine.min_subgraph_size = self.min_subgraph_size[index]
            if len(self.trt_dynamic_shape_info) > index:
                dynamic_shape_info = self.trt_dynamic_shape_info[index]
                try:
                    for key, value in dynamic_shape_info.items():
                        shape_type = key
                        if shape_type == "min_input_shape":
                            local_map = engine.min_input_shape
                        if shape_type == "max_input_shape":
                            local_map = engine.max_input_shape
                        if shape_type == "opt_input_shape":
                            local_map = engine.opt_input_shape
                        for name, shape in value.items():
                            local_value = ' '.join(str(i) for i in shape)
                            local_map[name] = local_value
                except Exception:
                    raise ValueError("Set TRT dynamic shape info error!")

            self.model_toolkit_conf.append(server_sdk.ModelToolkitConf())
            self.model_toolkit_conf[-1].engines.extend([engine])
            index = index + 1

    def _prepare_infer_service(self, port):
        if self.infer_service_conf is None:
            self.infer_service_conf = server_sdk.InferServiceConf()
            self.infer_service_conf.port = port
            infer_service = server_sdk.InferService()
            infer_service.name = "GeneralModelService"
            infer_service.workflows.extend(["workflow1"])
            self.infer_service_conf.services.extend([infer_service])

    def _prepare_resource(self, workdir, cube_conf):
        self.workdir = workdir
        if self.resource_conf is None:
            self.resource_conf = server_sdk.ResourceConf()
            for idx, op_general_model_config_fn in enumerate(
                    self.general_model_config_fn):
                with open("{}/{}".format(workdir, op_general_model_config_fn),
                          "w") as fout:
                    fout.write(str(list(self.model_conf.values())[idx]))
                for workflow in self.workflow_conf.workflows:
                    for node in workflow.nodes:
                        if "distkv" in node.name.lower():
                            self.resource_conf.cube_config_path = workdir
                            self.resource_conf.cube_config_file = self.cube_config_fn
                            if cube_conf is None:
                                raise ValueError(
                                    "Please set the path of cube.conf when using a dist_kv op."
                                )
                            shutil.copy(cube_conf, workdir)
                            if "quant" in node.name.lower():
                                self.resource_conf.cube_quant_bits = 8
                self.resource_conf.model_toolkit_path.extend([workdir])
                self.resource_conf.model_toolkit_file.extend(
                    [self.model_toolkit_fn[idx]])
                self.resource_conf.general_model_path.extend([workdir])
                self.resource_conf.general_model_file.extend(
                    [op_general_model_config_fn])
                # TODO: figure out the meaning of product_name and container_id.
                if self.product_name is not None:
                    self.resource_conf.auth_product_name = self.product_name
                if self.container_id is not None:
                    self.resource_conf.auth_container_id = self.container_id

    def _write_pb_str(self, filepath, pb_obj):
        with open(filepath, "w") as fout:
            fout.write(str(pb_obj))
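            # Note: str(pb_obj) renders the message in protobuf text format,
            # which is what the C++ server expects for these *.prototxt files.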

    def load_model_config(self, model_config_paths_args):
        # At present, Serving needs the model path configured in the
        # resource.prototxt file to determine the input and output format
        # of the workflow, and to ensure that the inputs and outputs of
        # multiple models are consistent.
        if isinstance(model_config_paths_args, str):
            model_config_paths_args = [model_config_paths_args]

        for single_model_config in model_config_paths_args:
            if os.path.isdir(single_model_config):
                pass
            elif os.path.isfile(single_model_config):
                raise ValueError(
                    "The input of --model should be a dir not file.")

        if isinstance(model_config_paths_args, list):
            # If there is only one model path, use the default infer_op.
            # Because there are several infer_op type, we need to find
            # it from workflow_conf.

            # now only support single-workflow.
            # TODO:support multi-workflow
            model_config_paths_list_idx = 0
            for node in self.workflow_conf.workflows[0].nodes:
                if node.type in self.default_engine_types:
                    if node.name is None:
                        raise Exception(
                            "You have set the engine_name of Op. Please use the form {op: model_path} to configure model path"
                        )

                    f = open("{}/serving_server_conf.prototxt".format(
                        model_config_paths_args[model_config_paths_list_idx]),
                             'r')
                    self.model_conf[
                        node.name] = google.protobuf.text_format.Merge(
                            str(f.read()), m_config.GeneralModelConfig())
                    self.model_config_paths[
                        node.name] = model_config_paths_args[
                            model_config_paths_list_idx]
                    self.general_model_config_fn.append(
                        node.name + "/general_model.prototxt")
                    self.model_toolkit_fn.append(node.name +
                                                 "/model_toolkit.prototxt")
                    self.subdirectory.append(node.name)
                    model_config_paths_list_idx += 1
                    if model_config_paths_list_idx == len(
                            model_config_paths_args):
                        break
        # Right now, this is not useful.
        elif isinstance(model_config_paths_args, dict):
            self.model_config_paths = collections.OrderedDict()
            for node_str, path in model_config_paths_args.items():
                node = server_sdk.DAGNode()
                google.protobuf.text_format.Parse(node_str, node)
                self.model_config_paths[node.name] = path
            print("You have specified multiple model paths, please ensure "
                  "that the input and output of multiple models are the same.")
            f = open("{}/serving_server_conf.prototxt".format(path), 'r')
            self.model_conf[node.name] = google.protobuf.text_format.Merge(
                str(f.read()), m_config.GeneralModelConfig())
        else:
            raise Exception(
                "The type of model_config_paths must be str or list or "
                "dict({op: model_path}), not {}.".format(
                    type(model_config_paths_args)))
        # check config here
        # print config here

    def use_mkl(self, flag):
        self.mkl_flag = flag

    def check_avx(self):
        p = subprocess.Popen(
            ['cat /proc/cpuinfo | grep avx 2>/dev/null'],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            shell=True)
        out, err = p.communicate()
        return err == b'' and len(out) > 0

    def get_device_version(self):
        avx_flag = False
        avx_support = self.check_avx()
        if avx_support:
            avx_flag = True
            self.use_mkl(True)
        mkl_flag = self.mkl_flag
        if avx_flag:
            if mkl_flag:
                device_version = "cpu-avx-mkl"
            else:
                device_version = "cpu-avx-openblas"
        else:
            if mkl_flag:
                print(
                    "Your CPU does not support AVX; the server will run in noavx-openblas mode."
                )
            device_version = "cpu-noavx-openblas"
        return device_version

    def get_serving_bin_name(self):
        if device_type == "0":
            device_version = self.get_device_version()
        elif device_type == "1":
            if version_suffix in ("101", "102", "1028", "112"):
                device_version = "gpu-" + version_suffix
            else:
                device_version = "gpu-cuda" + version_suffix
        elif device_type == "2":
            device_version = "xpu-" + platform.machine()
        elif device_type == "3":
            device_version = "rocm-" + platform.machine()
        elif device_type == "4":
            if self.use_lite:
                device_version = "ascendcl-lite-" + platform.machine()
            else:
                device_version = "ascendcl-" + platform.machine()
        return device_version

    def download_bin(self):
        os.chdir(self.module_path)

        # acquire lock
        version_file = open("{}/version.py".format(self.module_path), "r")

        folder_name = "serving-%s-%s" % (self.get_serving_bin_name(),
                                         version_tag)
        tar_name = "%s.tar.gz" % folder_name
        bin_url = "https://paddle-serving.bj.bcebos.com/test-dev/bin/%s" % tar_name

        self.server_path = os.path.join(self.module_path, folder_name)

        download_flag = "{}/{}.is_download".format(self.module_path,
                                                   folder_name)

        fcntl.flock(version_file, fcntl.LOCK_EX)

        if os.path.exists(download_flag):
            os.chdir(self.cur_path)
            self.bin_path = self.server_path + "/serving"
            return

        if not os.path.exists(self.server_path):
            print('First-time run, downloading PaddleServing components ...')

            r = os.system('wget ' + bin_url + ' --no-check-certificate')
            if r != 0:
                if os.path.exists(tar_name):
                    os.remove(tar_name)
                raise SystemExit(
                    'Download failed, please check your network or permission of {}.'
                    .format(self.module_path))
            else:
                try:
                    print('Decompressing files ..')
                    tar = tarfile.open(tar_name)
                    tar.extractall()
                    tar.close()
                    open(download_flag, "a").close()
                except Exception:
                    # clean up the partially extracted folder before aborting
                    if os.path.exists(self.server_path):
                        shutil.rmtree(self.server_path)
                    raise SystemExit(
                        'Decompressing failed, please check your permission of {} or disk space left.'
                        .format(self.module_path))
                finally:
                    os.remove(tar_name)
        # release lock
        version_file.close()
        os.chdir(self.cur_path)
        self.bin_path = self.server_path + "/serving"

    def prepare_server(self,
                       workdir=None,
                       port=9292,
                       device=None,
                       use_encryption_model=False,
                       cube_conf=None):
        # If `device` is not set, fall back to self.device, which is either
        # the default or whatever set_device() assigned.
        if device is None:
            device = self.device
        # If `device` is set, update self.device to match.
        else:
            self.device = device
        if workdir is None:
            workdir = "./tmp"
        os.system("mkdir -p {}".format(workdir))
        for subdir in self.subdirectory:
            os.system("mkdir -p {}/{}".format(workdir, subdir))
            os.system("touch {}/{}/fluid_time_file".format(workdir, subdir))

        if not self.port_is_available(port):
            raise SystemExit("Port {} is already used".format(port))

        self.set_port(port)
        self._prepare_resource(workdir, cube_conf)
        self._prepare_engine(self.model_config_paths, device,
                             use_encryption_model)
        self._prepare_infer_service(port)
        self.workdir = workdir

        infer_service_fn = "{}/{}".format(workdir, self.infer_service_fn)
        self._write_pb_str(infer_service_fn, self.infer_service_conf)

        workflow_fn = "{}/{}".format(workdir, self.workflow_fn)
        self._write_pb_str(workflow_fn, self.workflow_conf)

        resource_fn = "{}/{}".format(workdir, self.resource_fn)
        self._write_pb_str(resource_fn, self.resource_conf)

        for idx, single_model_toolkit_fn in enumerate(self.model_toolkit_fn):
            model_toolkit_fn = "{}/{}".format(workdir, single_model_toolkit_fn)
            self._write_pb_str(model_toolkit_fn, self.model_toolkit_conf[idx])

    def port_is_available(self, port):
        with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
            sock.settimeout(2)
            result = sock.connect_ex(('127.0.0.1', port))
        return result != 0

    def run_server(self):
        # just run server with system command
        # currently we do not load cube
        self.check_local_bin()
        if not self.use_local_bin:
            self.download_bin()
            # wait for other process to download server bin
            while not os.path.exists(self.server_path):
                time.sleep(1)
        else:
            print("Use local bin : {}".format(self.bin_path))
        #self.check_cuda()
        command = "{} " \
                    "-enable_model_toolkit " \
                    "-inferservice_path {} " \
                    "-inferservice_file {} " \
                    "-max_concurrency {} " \
                    "-num_threads {} " \
                    "-port {} " \
                    "-precision {} " \
                    "-use_calib={} " \
                    "-reload_interval_s {} " \
                    "-resource_path {} " \
                    "-resource_file {} " \
                    "-workflow_path {} " \
                    "-workflow_file {} " \
                    "-bthread_concurrency {} " \
                    "-max_body_size {} ".format(
                        self.bin_path,
                        self.workdir,
                        self.infer_service_fn,
                        self.max_concurrency,
                        self.num_threads,
                        self.port,
                        self.precision,
                        self.use_calib,
                        self.reload_interval_s,
                        self.workdir,
                        self.resource_fn,
                        self.workdir,
                        self.workflow_fn,
                        self.num_threads,
                        self.max_body_size)
        if self.enable_prometheus:
            command =   command + \
                        "-enable_prometheus={} " \
                        "-prometheus_port {} ".format(
                        self.enable_prometheus,
                        self.prometheus_port)
        if self.request_cache_size > 0:
            command =   command + \
                        "-request_cache_size {} ".format(
                            self.request_cache_size
                        )

        print("Going to Run Comand")
        print(command)

        os.system(command)
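
# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative, not part of the original file). The op
# names and the model path "uci_housing_model" are assumptions drawn from the
# public paddle_serving_server examples; adjust them to your deployment.
#
#     op_maker = OpMaker()
#     read_op = op_maker.create('general_reader')
#     infer_op = op_maker.create('general_infer')
#     response_op = op_maker.create('general_response')
#
#     op_seq_maker = OpSeqMaker()
#     op_seq_maker.add_op(read_op)
#     op_seq_maker.add_op(infer_op)
#     op_seq_maker.add_op(response_op)
#
#     server = Server()
#     server.set_op_sequence(op_seq_maker.get_op_sequence())
#     server.load_model_config("uci_housing_model")
#     server.prepare_server(workdir="workdir", port=9292, device="cpu")
#     server.run_server()
# ---------------------------------------------------------------------------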