server.py 31.0 KB
Newer Older
Z
update  
zhangjun 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
M
MRXLT 已提交
14 15 16

import os
import tarfile
M
MRXLT 已提交
17
import socket
Z
zhangjun 已提交
18
import paddle_serving_server as paddle_serving_server
H
HexToString 已提交
19
from paddle_serving_server.serve import format_gpu_to_strlist
Z
update  
zhangjun 已提交
20 21 22
from .proto import server_configure_pb2 as server_sdk
from .proto import general_model_config_pb2 as m_config
import google.protobuf.text_format
23
import time
24
from .version import version_tag, version_suffix, device_type
M
MRXLT 已提交
25
from contextlib import closing
G
guru4elephant 已提交
26
import argparse
Z
zhangjun 已提交
27

J
Jiawei Wang 已提交
28
import sys
W
wangjiawei04 已提交
29 30
if sys.platform.startswith('win') is False:
    import fcntl
M
MRXLT 已提交
31
import shutil
Z
update  
zhangjun 已提交
32
import platform
B
barrierye 已提交
33
import numpy as np
B
barrierye 已提交
34
import sys
35
import collections
Z
update  
zhangjun 已提交
36
import subprocess
Z
zhangjun 已提交
37

B
barrierye 已提交
38 39 40
from multiprocessing import Pool, Process
from concurrent import futures

Z
update  
zhangjun 已提交
41

H
HexToString 已提交
42 43
# The whole file is about to be discarded.
# We will use default config-file to start C++Server.
M
MRXLT 已提交
44 45
class Server(object):
    def __init__(self):
H
HexToString 已提交
46 47 48 49 50 51
        """
        self.model_toolkit_conf:'list'=[] # The quantity of self.model_toolkit_conf is equal to the InferOp quantity/Engine--OP
        self.model_conf:'collections.OrderedDict()' # Save the serving_server_conf.prototxt content (feed and fetch information) this is a map for multi-model in a workflow
        self.workflow_fn:'str'="workflow.prototxt" # Only one for one Service/Workflow
        self.resource_fn:'str'="resource.prototxt" # Only one for one Service,model_toolkit_fn and general_model_config_fn is recorded in this file
        self.infer_service_fn:'str'="infer_service.prototxt" # Only one for one Service,Service--Workflow
H
HexToString 已提交
52 53
        self.model_toolkit_fn:'list'=[] # ["GeneralInferOp_0/model_toolkit.prototxt"]The quantity is equal to the InferOp quantity,Engine--OP
        self.general_model_config_fn:'list'=[] # ["GeneralInferOp_0/general_model.prototxt"]The quantity is equal to the InferOp quantity,Feed and Fetch --OP
H
HexToString 已提交
54 55
        self.subdirectory:'list'=[] # The quantity is equal to the InferOp quantity, and name = node.name = engine.name
        self.model_config_paths:'collections.OrderedDict()' # Save the serving_server_conf.prototxt path (feed and fetch information) this is a map for multi-model in a workflow
56 57 58 59 60 61 62 63
        self.enable_dist_model: bool, enable distributed model, false default
        self.dist_carrier_id: string, mark distributed model carrier name, "" default.
        self.dist_cfg_file: string, file name of distributed configure, "" default.
        self.dist_nranks: int, number of distributed nodes, 0 default.
        self.dist_endpoints: list of string, all endpoints(ip:port) of distributed nodes, [] default.
        self.dist_subgraph_index: index of distributed subgraph model, -1 default. It is used to select the endpoint of the current shard in distribute model. -1 default.
        self.dist_worker_serving_endpoints: all endpoints of worker serving in the same machine. [] default.
        self.dist_master_serving: the master serving is used for receiving client requests, only in pp0 of pipeline parallel, False default.
H
HexToString 已提交
64
        """
M
MRXLT 已提交
65 66
        self.server_handle_ = None
        self.infer_service_conf = None
H
HexToString 已提交
67
        self.model_toolkit_conf = []
M
MRXLT 已提交
68 69
        self.resource_conf = None
        self.memory_optimization = False
M
MRXLT 已提交
70
        self.ir_optimization = False
Z
zhangjun 已提交
71
        self.model_conf = collections.OrderedDict()
H
HexToString 已提交
72 73 74
        self.workflow_fn = "workflow.prototxt"
        self.resource_fn = "resource.prototxt"
        self.infer_service_fn = "infer_service.prototxt"
Z
zhangjun 已提交
75 76 77
        self.model_toolkit_fn = []
        self.general_model_config_fn = []
        self.subdirectory = []
W
wangjiawei04 已提交
78
        self.cube_config_fn = "cube.conf"
M
MRXLT 已提交
79 80
        self.workdir = ""
        self.max_concurrency = 0
M
MRXLT 已提交
81
        self.num_threads = 2
M
MRXLT 已提交
82
        self.port = 8080
83 84
        self.precision = "fp32"
        self.use_calib = False
M
MRXLT 已提交
85
        self.reload_interval_s = 10
M
MRXLT 已提交
86
        self.max_body_size = 64 * 1024 * 1024
M
MRXLT 已提交
87 88
        self.module_path = os.path.dirname(paddle_serving_server.__file__)
        self.cur_path = os.getcwd()
M
MRXLT 已提交
89
        self.use_local_bin = False
Z
zhangjun 已提交
90
        self.mkl_flag = False
Z
zhangjun 已提交
91
        self.device = "cpu"
92
        self.gpuid = []
H
HexToString 已提交
93 94
        self.runtime_thread_num = [0]
        self.batch_infer_size = [32]
M
add trt  
MRXLT 已提交
95
        self.use_trt = False
96
        self.gpu_multi_stream = False
Z
zhangjun 已提交
97 98
        self.use_lite = False
        self.use_xpu = False
99
        self.use_ascend_cl = False
Z
zhangjun 已提交
100
        self.model_config_paths = collections.OrderedDict()
101 102
        self.product_name = None
        self.container_id = None
H
HexToString 已提交
103 104 105 106 107 108
        self.default_engine_types = [
            'GeneralInferOp',
            'GeneralDistKVInferOp',
            'GeneralDistKVQuantInferOp',
            'GeneralDetectionOp',
        ]
S
ShiningZhang 已提交
109 110
        self.enable_prometheus = False
        self.prometheus_port = 19393
S
ShiningZhang 已提交
111
        self.request_cache_size = 0
112 113 114 115 116 117 118 119
        self.enable_dist_model = False
        self.dist_carrier_id = ""
        self.dist_cfg_file = ""
        self.dist_nranks = 0
        self.dist_endpoints = []
        self.dist_subgraph_index = -1
        self.dist_worker_serving_endpoints = []
        self.dist_master_serving = False
S
ShiningZhang 已提交
120 121
        self.min_subgraph_size = []
        self.trt_dynamic_shape_info = []
T
TeslaZhao 已提交
122 123 124 125
        self.gpu_memory_mb = 50
        self.cpu_math_thread_num = 1
        self.trt_workspace_size = 33554432  # 1 << 25
        self.trt_use_static = False
M
MRXLT 已提交
126

Z
zhangjun 已提交
127 128 129 130 131
    def get_fetch_list(self, infer_node_idx=-1):
        fetch_names = [
            var.alias_name
            for var in list(self.model_conf.values())[infer_node_idx].fetch_var
        ]
B
fix cpu  
barriery 已提交
132 133
        return fetch_names

M
MRXLT 已提交
134 135 136 137 138 139
    def set_max_concurrency(self, concurrency):
        self.max_concurrency = concurrency

    def set_num_threads(self, threads):
        self.num_threads = threads

M
MRXLT 已提交
140 141 142 143 144 145 146 147
    def set_max_body_size(self, body_size):
        if body_size >= self.max_body_size:
            self.max_body_size = body_size
        else:
            print(
                "max_body_size is less than default value, will use default value in service."
            )

148 149 150
    def use_encryption_model(self, flag=False):
        self.encryption_model = flag

M
MRXLT 已提交
151 152 153
    def set_port(self, port):
        self.port = port

154 155 156 157 158 159
    def set_precision(self, precision="fp32"):
        self.precision = precision

    def set_use_calib(self, use_calib=False):
        self.use_calib = use_calib

M
MRXLT 已提交
160 161 162 163 164 165
    def set_reload_interval(self, interval):
        self.reload_interval_s = interval

    def set_op_sequence(self, op_seq):
        self.workflow_conf = op_seq

B
barrierye 已提交
166 167 168
    def set_op_graph(self, op_graph):
        self.workflow_conf = op_graph

M
MRXLT 已提交
169 170 171
    def set_memory_optimize(self, flag=False):
        self.memory_optimization = flag

M
MRXLT 已提交
172 173 174
    def set_ir_optimize(self, flag=False):
        self.ir_optimization = flag

175
    # Multi-Server does not have this Function.
176 177 178 179 180
    def set_product_name(self, product_name=None):
        if product_name == None:
            raise ValueError("product_name can't be None.")
        self.product_name = product_name

181
    # Multi-Server does not have this Function.
182 183 184 185 186
    def set_container_id(self, container_id):
        if container_id == None:
            raise ValueError("container_id can't be None.")
        self.container_id = container_id

M
MRXLT 已提交
187 188 189 190
    def check_local_bin(self):
        if "SERVING_BIN" in os.environ:
            self.use_local_bin = True
            self.bin_path = os.environ["SERVING_BIN"]
M
MRXLT 已提交
191

M
MRXLT 已提交
192
    def check_cuda(self):
M
MRXLT 已提交
193 194 195
        if os.system("ls /dev/ | grep nvidia > /dev/null") == 0:
            pass
        else:
M
MRXLT 已提交
196
            raise SystemExit(
M
MRXLT 已提交
197
                "GPU not found, please check your environment or use cpu version by \"pip install paddle_serving_server\""
M
MRXLT 已提交
198 199
            )

Z
zhangjun 已提交
200 201 202
    def set_device(self, device="cpu"):
        self.device = device

203
    def set_gpuid(self, gpuid):
H
HexToString 已提交
204
        self.gpuid = format_gpu_to_strlist(gpuid)
M
MRXLT 已提交
205

H
HexToString 已提交
206 207
    def set_runtime_thread_num(self, runtime_thread_num):
        self.runtime_thread_num = runtime_thread_num
208

H
HexToString 已提交
209 210
    def set_batch_infer_size(self, batch_infer_size):
        self.batch_infer_size = batch_infer_size
211

M
bug fix  
MRXLT 已提交
212
    def set_trt(self):
M
add trt  
MRXLT 已提交
213 214
        self.use_trt = True

215 216 217
    def set_gpu_multi_stream(self):
        self.gpu_multi_stream = True

Z
zhangjun 已提交
218 219 220 221 222 223
    def set_lite(self):
        self.use_lite = True

    def set_xpu(self):
        self.use_xpu = True

224 225 226
    def set_ascend_cl(self):
        self.use_ascend_cl = True

S
ShiningZhang 已提交
227 228 229 230 231 232
    def set_enable_prometheus(self, flag=False):
        self.enable_prometheus = flag

    def set_prometheus_port(self, prometheus_port):
        self.prometheus_port = prometheus_port

S
ShiningZhang 已提交
233 234 235
    def set_request_cache_size(self, request_cache_size):
        self.request_cache_size = request_cache_size

236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284
    def set_enable_dist_model(self, status):
        self.enable_dist_model = status

    def set_dist_carrier_id(self, carrier_id):
        if isinstance(carrier_id, int):
            carrier_id = str(carrier_id)
        self.dist_carrier_id = carrier_id

    def set_dist_cfg_file(self, dist_cfg_file):
        self.dist_cfg_file = dist_cfg_file

    def set_dist_nranks(self, nranks):
        if isinstance(nranks, str):
            nranks = int(nranks)
        elif not isinstance(nranks, int):
            raise ValueError("dist_nranks type error! must be int or string")

        self.dist_nranks = nranks

    def set_dist_endpoints(self, endpoints):
        if isinstance(endpoints, list):
            self.dist_endpoints = endpoints
        elif isinstance(endpoints, str):
            self.dist_endpoints = [endpoints]
        else:
            raise ValueError(
                "dist_endpoints type error! must be list or string")

    def set_dist_subgraph_index(self, subgraph_index):
        if isinstance(subgraph_index, str):
            subgraph_index = int(subgraph_index)
        elif not isinstance(subgraph_index, int):
            raise ValueError("subgraph type error! must be int or string")

        self.dist_subgraph_index = subgraph_index

    def set_dist_worker_serving_endpoint(self, serving_endpoints):
        if isinstance(serving_endpoints, list):
            self.dist_worker_serving_endpoint = serving_endpoints
        elif not isinstance(serving_endpoints, str):
            self.dist_worker_serving_endpoint = [serving_endpoints]
        else:
            raise ValueError(
                "dist_worker_serving_endpoint type error! must be list or string"
            )

    def set_dist_master_serving(self, is_master):
        self.dist_master_serving = is_master

S
ShiningZhang 已提交
285
    def set_min_subgraph_size(self, min_subgraph_size):
S
ShiningZhang 已提交
286 287 288 289 290 291
        for s in min_subgraph_size:
            try:
                size = int(s)
            except:
                size = 3
            self.min_subgraph_size.append(size)
T
TeslaZhao 已提交
292

S
ShiningZhang 已提交
293 294 295
    def set_trt_dynamic_shape_info(self, info):
        self.trt_dynamic_shape_info = info

T
TeslaZhao 已提交
296 297 298 299 300 301 302 303 304 305 306 307
    def set_gpu_memory_mb(self, gpu_memory_mb):
        self.gpu_memory_mb = gpu_memory_mb

    def set_cpu_math_thread_num(self, cpu_math_thread_num):
        self.cpu_math_thread_num = cpu_math_thread_num

    def set_trt_workspace_size(self, trt_workspace_size):
        self.trt_workspace_size = trt_workspace_size

    def set_trt_use_static(self, trt_use_static):
        self.trt_use_static = trt_use_static

H
HexToString 已提交
308
    def _prepare_engine(self, model_config_paths, device, use_encryption_model):
H
HexToString 已提交
309
        self.device = device
M
MRXLT 已提交
310
        if self.model_toolkit_conf == None:
H
HexToString 已提交
311
            self.model_toolkit_conf = []
H
HexToString 已提交
312

H
HexToString 已提交
313
        # Generally, self.gpuid = str[] or [].
H
HexToString 已提交
314 315 316
        # when len(self.gpuid) means no gpuid is specified.
        # if self.device == "gpu" or self.use_trt:
        # we assume you forget to set gpuid, so set gpuid = ['0'];
H
HexToString 已提交
317 318 319 320
        if len(self.gpuid) == 0 or self.gpuid == ["-1"]:
            if self.device == "gpu" or self.use_trt or self.gpu_multi_stream:
                self.gpuid = ["0"]
                self.device = "gpu"
321 322
            elif self.use_xpu or self.use_ascend_cl:
                self.gpuid = ["0"]
323
            else:
H
HexToString 已提交
324
                self.gpuid = ["-1"]
325

H
HexToString 已提交
326 327 328 329
        if isinstance(self.runtime_thread_num, int):
            self.runtime_thread_num = [self.runtime_thread_num]
        if len(self.runtime_thread_num) == 0:
            self.runtime_thread_num.append(0)
330

H
HexToString 已提交
331 332 333 334
        if isinstance(self.batch_infer_size, int):
            self.batch_infer_size = [self.batch_infer_size]
        if len(self.batch_infer_size) == 0:
            self.batch_infer_size.append(32)
335 336

        index = 0
M
MRXLT 已提交
337

B
barrierye 已提交
338 339 340 341
        for engine_name, model_config_path in model_config_paths.items():
            engine = server_sdk.EngineDesc()
            engine.name = engine_name
            # engine.reloadable_meta = model_config_path + "/fluid_time_file"
342
            engine.reloadable_meta = model_config_path + "/fluid_time_file"
B
barrierye 已提交
343 344
            os.system("touch {}".format(engine.reloadable_meta))
            engine.reloadable_type = "timestamp_ne"
H
HexToString 已提交
345 346 347 348
            engine.runtime_thread_num = self.runtime_thread_num[index % len(
                self.runtime_thread_num)]
            engine.batch_infer_size = self.batch_infer_size[index % len(
                self.batch_infer_size)]
349

H
HexToString 已提交
350 351
            engine.enable_overrun = False
            engine.allow_split_request = True
Z
update  
zhangjun 已提交
352
            engine.model_dir = model_config_path
B
barrierye 已提交
353
            engine.enable_memory_optimization = self.memory_optimization
M
MRXLT 已提交
354
            engine.enable_ir_optimization = self.ir_optimization
M
add trt  
MRXLT 已提交
355
            engine.use_trt = self.use_trt
356
            engine.gpu_multi_stream = self.gpu_multi_stream
Z
update  
zhangjun 已提交
357 358
            engine.use_lite = self.use_lite
            engine.use_xpu = self.use_xpu
359
            engine.use_ascend_cl = self.use_ascend_cl
Z
zhangjun 已提交
360
            engine.use_gpu = False
T
TeslaZhao 已提交
361 362 363 364
            engine.gpu_memory_mb = self.gpu_memory_mb
            engine.cpu_math_thread_num = self.cpu_math_thread_num
            engine.trt_workspace_size = self.trt_workspace_size
            engine.trt_use_static = self.trt_use_static
Z
zhangjun 已提交
365

366 367 368 369 370 371 372 373 374
            # use distributed model.
            if self.dist_subgraph_index >= 0:
                engine.enable_dist_model = True
                engine.dist_carrier_id = self.dist_carrier_id
                engine.dist_cfg_file = self.dist_cfg_file
                engine.dist_nranks = self.dist_nranks
                engine.dist_endpoints.extend(self.dist_endpoints)
                engine.dist_subgraph_index = self.dist_subgraph_index

375 376 377 378 379 380
            if len(self.gpuid) == 0:
                raise ValueError("CPU: self.gpuid = -1, GPU: must set it ")
            op_gpu_list = self.gpuid[index % len(self.gpuid)].split(",")
            for ids in op_gpu_list:
                engine.gpu_ids.extend([int(ids)])

H
HexToString 已提交
381
            if self.device == "gpu" or self.use_trt or self.gpu_multi_stream:
H
HexToString 已提交
382 383 384 385 386
                engine.use_gpu = True
                # this is for Mixed use of GPU and CPU
                # if model-1 use GPU and set the device="gpu"
                # but gpuid[1] = "-1" which means use CPU in Model-2
                # so config about GPU should be False.
H
HexToString 已提交
387 388
                # op_gpu_list = gpuid[index].split(",")
                # which is the gpuid for each engine.
H
HexToString 已提交
389 390 391 392 393 394
                if len(op_gpu_list) == 1:
                    if int(op_gpu_list[0]) == -1:
                        engine.use_gpu = False
                        engine.gpu_multi_stream = False
                        engine.use_trt = False

Z
fix  
zhangjun 已提交
395
            if os.path.exists('{}/__params__'.format(model_config_path)):
Z
update  
zhangjun 已提交
396
                engine.combined_model = True
Z
fix  
zhangjun 已提交
397 398
            else:
                engine.combined_model = False
Z
update  
zhangjun 已提交
399 400
            if use_encryption_model:
                engine.encrypted_model = True
Z
fix  
zhangjun 已提交
401
            engine.type = "PADDLE_INFER"
S
ShiningZhang 已提交
402 403 404 405 406
            if len(self.min_subgraph_size) > index:
                engine.min_subgraph_size = self.min_subgraph_size[index]
            if len(self.trt_dynamic_shape_info) > index:
                dynamic_shape_info = self.trt_dynamic_shape_info[index]
                try:
T
TeslaZhao 已提交
407
                    for key, value in dynamic_shape_info.items():
S
ShiningZhang 已提交
408 409 410 411 412 413 414
                        shape_type = key
                        if shape_type == "min_input_shape":
                            local_map = engine.min_input_shape
                        if shape_type == "max_input_shape":
                            local_map = engine.max_input_shape
                        if shape_type == "opt_input_shape":
                            local_map = engine.opt_input_shape
T
TeslaZhao 已提交
415
                        for name, shape in value.items():
S
ShiningZhang 已提交
416 417 418 419
                            local_value = ' '.join(str(i) for i in shape)
                            local_map[name] = local_value
                except:
                    raise ValueError("Set TRT dynamic shape info error!")
T
TeslaZhao 已提交
420

H
HexToString 已提交
421 422
            self.model_toolkit_conf.append(server_sdk.ModelToolkitConf())
            self.model_toolkit_conf[-1].engines.extend([engine])
423
            index = index + 1
M
MRXLT 已提交
424 425 426 427 428 429 430 431 432 433

    def _prepare_infer_service(self, port):
        if self.infer_service_conf == None:
            self.infer_service_conf = server_sdk.InferServiceConf()
            self.infer_service_conf.port = port
            infer_service = server_sdk.InferService()
            infer_service.name = "GeneralModelService"
            infer_service.workflows.extend(["workflow1"])
            self.infer_service_conf.services.extend([infer_service])

M
MRXLT 已提交
434
    def _prepare_resource(self, workdir, cube_conf):
435
        self.workdir = workdir
M
MRXLT 已提交
436 437
        if self.resource_conf == None:
            self.resource_conf = server_sdk.ResourceConf()
Z
zhangjun 已提交
438 439
            for idx, op_general_model_config_fn in enumerate(
                    self.general_model_config_fn):
H
HexToString 已提交
440
                with open("{}/{}".format(workdir, op_general_model_config_fn),
Z
zhangjun 已提交
441
                          "w") as fout:
H
HexToString 已提交
442 443 444
                    fout.write(str(list(self.model_conf.values())[idx]))
                for workflow in self.workflow_conf.workflows:
                    for node in workflow.nodes:
B
bjjwwang 已提交
445
                        if "distkv" in node.name.lower():
H
HexToString 已提交
446 447 448 449 450 451 452
                            self.resource_conf.cube_config_path = workdir
                            self.resource_conf.cube_config_file = self.cube_config_fn
                            if cube_conf == None:
                                raise ValueError(
                                    "Please set the path of cube.conf while use dist_kv op."
                                )
                            shutil.copy(cube_conf, workdir)
B
bjjwwang 已提交
453
                            if "quant" in node.name.lower():
H
HexToString 已提交
454 455
                                self.resource_conf.cube_quant_bits = 8
                self.resource_conf.model_toolkit_path.extend([workdir])
Z
zhangjun 已提交
456 457
                self.resource_conf.model_toolkit_file.extend(
                    [self.model_toolkit_fn[idx]])
H
HexToString 已提交
458
                self.resource_conf.general_model_path.extend([workdir])
Z
zhangjun 已提交
459 460
                self.resource_conf.general_model_file.extend(
                    [op_general_model_config_fn])
H
HexToString 已提交
461 462 463 464 465
                #TODO:figure out the meaning of product_name and container_id.
                if self.product_name != None:
                    self.resource_conf.auth_product_name = self.product_name
                if self.container_id != None:
                    self.resource_conf.auth_container_id = self.container_id
M
MRXLT 已提交
466 467 468 469 470

    def _write_pb_str(self, filepath, pb_obj):
        with open(filepath, "w") as fout:
            fout.write(str(pb_obj))

H
HexToString 已提交
471
    def load_model_config(self, model_config_paths_args):
B
barrierye 已提交
472 473 474
        # At present, Serving needs to configure the model path in
        # the resource.prototxt file to determine the input and output
        # format of the workflow. To ensure that the input and output
B
barrierye 已提交
475
        # of multiple models are the same.
H
HexToString 已提交
476 477 478 479 480 481 482
        if isinstance(model_config_paths_args, str):
            model_config_paths_args = [model_config_paths_args]

        for single_model_config in model_config_paths_args:
            if os.path.isdir(single_model_config):
                pass
            elif os.path.isfile(single_model_config):
Z
zhangjun 已提交
483 484 485
                raise ValueError(
                    "The input of --model should be a dir not file.")

H
HexToString 已提交
486
        if isinstance(model_config_paths_args, list):
B
barrierye 已提交
487
            # If there is only one model path, use the default infer_op.
M
MRXLT 已提交
488
            # Because there are several infer_op type, we need to find
B
barrierye 已提交
489
            # it from workflow_conf.
H
HexToString 已提交
490

H
HexToString 已提交
491 492 493
            # now only support single-workflow.
            # TODO:support multi-workflow
            model_config_paths_list_idx = 0
B
barrierye 已提交
494
            for node in self.workflow_conf.workflows[0].nodes:
H
HexToString 已提交
495
                if node.type in self.default_engine_types:
H
HexToString 已提交
496 497 498 499
                    if node.name is None:
                        raise Exception(
                            "You have set the engine_name of Op. Please use the form {op: model_path} to configure model path"
                        )
Z
zhangjun 已提交
500

H
HexToString 已提交
501
                    f = open("{}/serving_server_conf.prototxt".format(
Z
zhangjun 已提交
502 503 504 505 506 507 508 509 510 511 512 513
                        model_config_paths_args[model_config_paths_list_idx]),
                             'r')
                    self.model_conf[
                        node.name] = google.protobuf.text_format.Merge(
                            str(f.read()), m_config.GeneralModelConfig())
                    self.model_config_paths[
                        node.name] = model_config_paths_args[
                            model_config_paths_list_idx]
                    self.general_model_config_fn.append(
                        node.name + "/general_model.prototxt")
                    self.model_toolkit_fn.append(node.name +
                                                 "/model_toolkit.prototxt")
H
HexToString 已提交
514 515
                    self.subdirectory.append(node.name)
                    model_config_paths_list_idx += 1
Z
zhangjun 已提交
516 517
                    if model_config_paths_list_idx == len(
                            model_config_paths_args):
H
HexToString 已提交
518 519 520 521 522
                        break
        #Right now, this is not useful.
        elif isinstance(model_config_paths_args, dict):
            self.model_config_paths = collections.OrderedDict()
            for node_str, path in model_config_paths_args.items():
B
barrierye 已提交
523 524 525 526 527
                node = server_sdk.DAGNode()
                google.protobuf.text_format.Parse(node_str, node)
                self.model_config_paths[node.name] = path
            print("You have specified multiple model paths, please ensure "
                  "that the input and output of multiple models are the same.")
H
HexToString 已提交
528 529
            f = open("{}/serving_server_conf.prototxt".format(path), 'r')
            self.model_conf[node.name] = google.protobuf.text_format.Merge(
Z
zhangjun 已提交
530
                str(f.read()), m_config.GeneralModelConfig())
B
barrierye 已提交
531
        else:
Z
zhangjun 已提交
532 533 534 535
            raise Exception(
                "The type of model_config_paths must be str or list or "
                "dict({op: model_path}), not {}.".format(
                    type(model_config_paths_args)))
M
MRXLT 已提交
536 537
        # check config here
        # print config here
Z
update  
zhangjun 已提交
538

Z
zhangjun 已提交
539 540 541
    def use_mkl(self, flag):
        self.mkl_flag = flag

Z
zhangjun 已提交
542
    def check_avx(self):
543 544 545 546 547
        p = subprocess.Popen(
            ['cat /proc/cpuinfo | grep avx 2>/dev/null'],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            shell=True)
Z
zhangjun 已提交
548
        out, err = p.communicate()
Z
zhangjun 已提交
549
        if err == b'' and len(out) > 0:
Z
zhangjun 已提交
550 551 552 553
            return True
        else:
            return False

Z
zhangjun 已提交
554 555
    def get_device_version(self):
        avx_flag = False
Z
zhangjun 已提交
556
        avx_support = self.check_avx()
Z
update  
zhangjun 已提交
557
        if avx_support:
Z
zhangjun 已提交
558
            avx_flag = True
Z
zhangjun 已提交
559 560
            self.use_mkl(True)
        mkl_flag = self.mkl_flag
Z
zhangjun 已提交
561 562
        if avx_flag:
            if mkl_flag:
Z
update  
zhangjun 已提交
563
                device_version = "cpu-avx-mkl"
Z
zhangjun 已提交
564
            else:
Z
update  
zhangjun 已提交
565
                device_version = "cpu-avx-openblas"
Z
zhangjun 已提交
566 567 568 569 570
        else:
            if mkl_flag:
                print(
                    "Your CPU does not support AVX, server will running with noavx-openblas mode."
                )
Z
update  
zhangjun 已提交
571 572 573 574 575 576 577
            device_version = "cpu-noavx-openblas"
        return device_version

    def get_serving_bin_name(self):
        if device_type == "0":
            device_version = self.get_device_version()
        elif device_type == "1":
B
bjjwwang 已提交
578
            if version_suffix == "101" or version_suffix == "102" or version_suffix == "1028" or version_suffix == "112":
Z
update  
zhangjun 已提交
579 580 581 582 583
                device_version = "gpu-" + version_suffix
            else:
                device_version = "gpu-cuda" + version_suffix
        elif device_type == "2":
            device_version = "xpu-" + platform.machine()
S
ShiningZhang 已提交
584 585
        elif device_type == "3":
            device_version = "rocm-" + platform.machine()
586 587 588 589 590
        elif device_type == "4":
            if self.use_lite:
                device_version = "ascendcl-lite-" + platform.machine()
            else:
                device_version = "ascendcl-" + platform.machine()
Z
zhangjun 已提交
591
        return device_version
M
MRXLT 已提交
592 593 594

    def download_bin(self):
        os.chdir(self.module_path)
M
MRXLT 已提交
595 596 597

        #acquire lock
        version_file = open("{}/version.py".format(self.module_path), "r")
Z
update  
zhangjun 已提交
598

Z
fix  
zhangjun 已提交
599
        folder_name = "serving-%s-%s" % (self.get_serving_bin_name(),
600
                                         version_tag)
Z
fix  
zhangjun 已提交
601
        tar_name = "%s.tar.gz" % folder_name
602
        bin_url = "https://paddle-serving.bj.bcebos.com/test-dev/bin/%s" % tar_name
Z
fix  
zhangjun 已提交
603

604 605 606 607
        self.server_path = os.path.join(self.module_path, folder_name)

        download_flag = "{}/{}.is_download".format(self.module_path,
                                                   folder_name)
M
MRXLT 已提交
608 609 610

        fcntl.flock(version_file, fcntl.LOCK_EX)

611 612 613 614 615
        if os.path.exists(download_flag):
            os.chdir(self.cur_path)
            self.bin_path = self.server_path + "/serving"
            return

M
MRXLT 已提交
616 617
        if not os.path.exists(self.server_path):
            print('Frist time run, downloading PaddleServing components ...')
M
MRXLT 已提交
618

M
MRXLT 已提交
619 620 621 622
            r = os.system('wget ' + bin_url + ' --no-check-certificate')
            if r != 0:
                if os.path.exists(tar_name):
                    os.remove(tar_name)
M
MRXLT 已提交
623
                raise SystemExit(
T
TeslaZhao 已提交
624 625
                    'Download failed, please check your network or permission of {}.'
                    .format(self.module_path))
M
MRXLT 已提交
626 627 628 629 630 631
            else:
                try:
                    print('Decompressing files ..')
                    tar = tarfile.open(tar_name)
                    tar.extractall()
                    tar.close()
Z
zhangjun 已提交
632
                    open(download_flag, "a").close()
M
MRXLT 已提交
633
                except:
Z
zhangjun 已提交
634 635
                    if os.path.exists(self.server_path):
                        os.remove(self.server_path)
M
MRXLT 已提交
636
                    raise SystemExit(
T
TeslaZhao 已提交
637 638
                        'Decompressing failed, please check your permission of {} or disk space left.'
                        .format(self.module_path))
M
MRXLT 已提交
639 640
                finally:
                    os.remove(tar_name)
M
MRXLT 已提交
641
        #release lock
B
barrierye 已提交
642
        version_file.close()
M
MRXLT 已提交
643 644
        os.chdir(self.cur_path)
        self.bin_path = self.server_path + "/serving"
Z
update  
zhangjun 已提交
645

M
MRXLT 已提交
646 647 648
    def prepare_server(self,
                       workdir=None,
                       port=9292,
H
HexToString 已提交
649
                       device=None,
W
wangjiawei04 已提交
650
                       use_encryption_model=False,
M
MRXLT 已提交
651
                       cube_conf=None):
H
HexToString 已提交
652 653 654 655 656 657 658 659
        # if `device` is not set, use self.device
        # self.device may not be changed.
        # or self.device may have changed by set_device.
        if device == None:
            device = self.device
        # if `device` is set, let self.device = device.
        else:
            self.device = device
M
MRXLT 已提交
660 661
        if workdir == None:
            workdir = "./tmp"
Z
zhangjun 已提交
662
            os.system("mkdir -p {}".format(workdir))
M
MRXLT 已提交
663
        else:
Z
zhangjun 已提交
664
            os.system("mkdir -p {}".format(workdir))
H
HexToString 已提交
665
        for subdir in self.subdirectory:
666
            os.system("mkdir -p {}/{}".format(workdir, subdir))
H
HexToString 已提交
667 668
            os.system("touch {}/{}/fluid_time_file".format(workdir, subdir))

M
MRXLT 已提交
669
        if not self.port_is_available(port):
G
gongweibao 已提交
670
            raise SystemExit("Port {} is already used".format(port))
M
MRXLT 已提交
671

G
guru4elephant 已提交
672
        self.set_port(port)
M
MRXLT 已提交
673
        self._prepare_resource(workdir, cube_conf)
H
HexToString 已提交
674 675
        self._prepare_engine(self.model_config_paths, device,
                             use_encryption_model)
M
MRXLT 已提交
676 677 678 679 680
        self._prepare_infer_service(port)
        self.workdir = workdir

        infer_service_fn = "{}/{}".format(workdir, self.infer_service_fn)
        self._write_pb_str(infer_service_fn, self.infer_service_conf)
H
HexToString 已提交
681 682

        workflow_fn = "{}/{}".format(workdir, self.workflow_fn)
M
MRXLT 已提交
683
        self._write_pb_str(workflow_fn, self.workflow_conf)
H
HexToString 已提交
684 685

        resource_fn = "{}/{}".format(workdir, self.resource_fn)
M
MRXLT 已提交
686
        self._write_pb_str(resource_fn, self.resource_conf)
H
HexToString 已提交
687

Z
zhangjun 已提交
688
        for idx, single_model_toolkit_fn in enumerate(self.model_toolkit_fn):
H
HexToString 已提交
689 690
            model_toolkit_fn = "{}/{}".format(workdir, single_model_toolkit_fn)
            self._write_pb_str(model_toolkit_fn, self.model_toolkit_conf[idx])
M
MRXLT 已提交
691

M
MRXLT 已提交
692
    def port_is_available(self, port):
M
MRXLT 已提交
693 694
        with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
            sock.settimeout(2)
H
HexToString 已提交
695
            result = sock.connect_ex(('127.0.0.1', port))
M
MRXLT 已提交
696 697 698 699 700
        if result != 0:
            return True
        else:
            return False

M
MRXLT 已提交
701 702 703
    def run_server(self):
        # just run server with system command
        # currently we do not load cube
M
MRXLT 已提交
704
        self.check_local_bin()
M
MRXLT 已提交
705 706
        if not self.use_local_bin:
            self.download_bin()
B
fix bug  
barrierye 已提交
707 708 709
            # wait for other process to download server bin
            while not os.path.exists(self.server_path):
                time.sleep(1)
M
MRXLT 已提交
710 711
        else:
            print("Use local bin : {}".format(self.bin_path))
Z
zhangjun 已提交
712
        #self.check_cuda()
H
HexToString 已提交
713 714 715 716 717 718 719 720
        command = "{} " \
                    "-enable_model_toolkit " \
                    "-inferservice_path {} " \
                    "-inferservice_file {} " \
                    "-max_concurrency {} " \
                    "-num_threads {} " \
                    "-port {} " \
                    "-precision {} " \
721
                    "-use_calib={} " \
H
HexToString 已提交
722 723 724 725 726 727
                    "-reload_interval_s {} " \
                    "-resource_path {} " \
                    "-resource_file {} " \
                    "-workflow_path {} " \
                    "-workflow_file {} " \
                    "-bthread_concurrency {} " \
728
                    "-max_body_size {} ".format(
H
HexToString 已提交
729 730 731 732 733 734 735 736 737 738 739 740 741 742
                        self.bin_path,
                        self.workdir,
                        self.infer_service_fn,
                        self.max_concurrency,
                        self.num_threads,
                        self.port,
                        self.precision,
                        self.use_calib,
                        self.reload_interval_s,
                        self.workdir,
                        self.resource_fn,
                        self.workdir,
                        self.workflow_fn,
                        self.num_threads,
743
                        self.max_body_size)
S
ShiningZhang 已提交
744 745 746 747 748 749 750 751 752 753 754
        if self.enable_prometheus:
            command =   command + \
                        "-enable_prometheus={} " \
                        "-prometheus_port {} ".format(
                        self.enable_prometheus,
                        self.prometheus_port)
        if self.request_cache_size > 0:
            command =   command + \
                        "-request_cache_size {} ".format(
                            self.request_cache_size
                        )
H
HexToString 已提交
755

M
MRXLT 已提交
756 757
        print("Going to Run Comand")
        print(command)
758

M
MRXLT 已提交
759
        os.system(command)