server.py 28.8 KB
Newer Older
Z
update  
zhangjun 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
M
MRXLT 已提交
14 15 16

import os
import tarfile
M
MRXLT 已提交
17
import socket
Z
zhangjun 已提交
18
import paddle_serving_server as paddle_serving_server
H
HexToString 已提交
19
from paddle_serving_server.serve import format_gpu_to_strlist
Z
update  
zhangjun 已提交
20 21 22
from .proto import server_configure_pb2 as server_sdk
from .proto import general_model_config_pb2 as m_config
import google.protobuf.text_format
23
import time
24
from .version import version_tag, version_suffix, device_type
M
MRXLT 已提交
25
from contextlib import closing
G
guru4elephant 已提交
26
import argparse
Z
zhangjun 已提交
27

J
Jiawei Wang 已提交
28
import sys
W
wangjiawei04 已提交
29 30
if sys.platform.startswith('win') is False:
    import fcntl
M
MRXLT 已提交
31
import shutil
Z
update  
zhangjun 已提交
32
import platform
B
barrierye 已提交
33
import numpy as np
B
barrierye 已提交
34
import sys
35
import collections
Z
update  
zhangjun 已提交
36
import subprocess
Z
zhangjun 已提交
37

B
barrierye 已提交
38 39 40
from multiprocessing import Pool, Process
from concurrent import futures

Z
update  
zhangjun 已提交
41

H
HexToString 已提交
42 43
# The whole file is about to be discarded.
# We will use default config-file to start C++Server.
class Server(object):
    def __init__(self):
        """Configuration holder for the C++ serving server.

        Attribute overview (quantities refer to InferOp/Engine count in one workflow):
          model_toolkit_conf: list, one entry per InferOp/Engine.
          model_conf: OrderedDict, serving_server_conf.prototxt content
              (feed/fetch info); a map for multi-model workflows.
          workflow_fn/resource_fn/infer_service_fn: one file each per Service.
          model_toolkit_fn/general_model_config_fn: lists, e.g.
              ["GeneralInferOp_0/model_toolkit.prototxt"], one per InferOp.
          subdirectory: list, one per InferOp; name == node.name == engine.name.
          model_config_paths: OrderedDict, model config dir per node.
          enable_dist_model: bool, enable distributed model, False default.
          dist_carrier_id: str, distributed model carrier name, "" default.
          dist_cfg_file: str, distributed configure file name, "" default.
          dist_nranks: int, number of distributed nodes, 0 default.
          dist_endpoints: list of "ip:port" of all distributed nodes, [] default.
          dist_subgraph_index: int, index of distributed subgraph model, -1 default;
              selects the endpoint of the current shard.
          dist_worker_serving_endpoints: endpoints of worker servings on this
              machine, [] default.
          dist_master_serving: bool, True when this serving receives client
              requests (only pp0 of pipeline parallel), False default.
        """
        # --- protobuf configuration objects (filled by _prepare_* methods) ---
        self.server_handle_ = None
        self.infer_service_conf = None
        self.model_toolkit_conf = []
        self.resource_conf = None
        self.memory_optimization = False
        self.ir_optimization = False
        self.model_conf = collections.OrderedDict()
        # --- generated config file names ---
        self.workflow_fn = "workflow.prototxt"
        self.resource_fn = "resource.prototxt"
        self.infer_service_fn = "infer_service.prototxt"
        self.model_toolkit_fn = []
        self.general_model_config_fn = []
        self.subdirectory = []
        self.cube_config_fn = "cube.conf"
        # --- runtime/server settings ---
        self.workdir = ""
        self.max_concurrency = 0
        self.num_threads = 2
        self.port = 8080
        self.precision = "fp32"
        self.use_calib = False
        self.reload_interval_s = 10
        self.max_body_size = 64 * 1024 * 1024
        self.module_path = os.path.dirname(paddle_serving_server.__file__)
        self.cur_path = os.getcwd()
        self.use_local_bin = False
        self.mkl_flag = False
        # --- device selection ---
        self.device = "cpu"
        self.gpuid = []
        self.runtime_thread_num = [0]
        self.batch_infer_size = [32]
        self.use_trt = False
        self.gpu_multi_stream = False
        self.use_lite = False
        self.use_xpu = False
        self.use_ascend_cl = False
        self.model_config_paths = collections.OrderedDict()
        self.product_name = None
        self.container_id = None
        # node types that load_model_config recognizes as inference engines
        self.default_engine_types = [
            'GeneralInferOp',
            'GeneralDistKVInferOp',
            'GeneralDistKVQuantInferOp',
            'GeneralDetectionOp',
        ]
        # --- monitoring / caching ---
        self.enable_prometheus = False
        self.prometheus_port = 19393
        self.request_cache_size = 0
        # --- distributed-model settings (see docstring above) ---
        self.enable_dist_model = False
        self.dist_carrier_id = ""
        self.dist_cfg_file = ""
        self.dist_nranks = 0
        self.dist_endpoints = []
        self.dist_subgraph_index = -1
        self.dist_worker_serving_endpoints = []
        self.dist_master_serving = False
M
MRXLT 已提交
120

Z
zhangjun 已提交
121 122 123 124 125
    def get_fetch_list(self, infer_node_idx=-1):
        fetch_names = [
            var.alias_name
            for var in list(self.model_conf.values())[infer_node_idx].fetch_var
        ]
B
fix cpu  
barriery 已提交
126 127
        return fetch_names

M
MRXLT 已提交
128 129 130 131 132 133
    def set_max_concurrency(self, concurrency):
        self.max_concurrency = concurrency

    def set_num_threads(self, threads):
        self.num_threads = threads

M
MRXLT 已提交
134 135 136 137 138 139 140 141
    def set_max_body_size(self, body_size):
        if body_size >= self.max_body_size:
            self.max_body_size = body_size
        else:
            print(
                "max_body_size is less than default value, will use default value in service."
            )

142 143 144
    def use_encryption_model(self, flag=False):
        self.encryption_model = flag

M
MRXLT 已提交
145 146 147
    def set_port(self, port):
        self.port = port

148 149 150 151 152 153
    def set_precision(self, precision="fp32"):
        self.precision = precision

    def set_use_calib(self, use_calib=False):
        self.use_calib = use_calib

M
MRXLT 已提交
154 155 156 157 158 159
    def set_reload_interval(self, interval):
        self.reload_interval_s = interval

    def set_op_sequence(self, op_seq):
        self.workflow_conf = op_seq

B
barrierye 已提交
160 161 162
    def set_op_graph(self, op_graph):
        self.workflow_conf = op_graph

M
MRXLT 已提交
163 164 165
    def set_memory_optimize(self, flag=False):
        self.memory_optimization = flag

M
MRXLT 已提交
166 167 168
    def set_ir_optimize(self, flag=False):
        self.ir_optimization = flag

169
    # Multi-Server does not have this Function.
170 171 172 173 174
    def set_product_name(self, product_name=None):
        if product_name == None:
            raise ValueError("product_name can't be None.")
        self.product_name = product_name

175
    # Multi-Server does not have this Function.
176 177 178 179 180
    def set_container_id(self, container_id):
        if container_id == None:
            raise ValueError("container_id can't be None.")
        self.container_id = container_id

M
MRXLT 已提交
181 182 183 184
    def check_local_bin(self):
        if "SERVING_BIN" in os.environ:
            self.use_local_bin = True
            self.bin_path = os.environ["SERVING_BIN"]
M
MRXLT 已提交
185

M
MRXLT 已提交
186
    def check_cuda(self):
        """Exit with SystemExit when no NVIDIA device node is visible in /dev."""
        # non-zero shell status means no /dev/nvidia* entry was found
        gpu_present = os.system("ls /dev/ | grep nvidia > /dev/null") == 0
        if not gpu_present:
            raise SystemExit(
                "GPU not found, please check your environment or use cpu version by \"pip install paddle_serving_server\""
            )

Z
zhangjun 已提交
194 195 196
    def set_device(self, device="cpu"):
        self.device = device

197
    def set_gpuid(self, gpuid):
H
HexToString 已提交
198
        self.gpuid = format_gpu_to_strlist(gpuid)
M
MRXLT 已提交
199

H
HexToString 已提交
200 201
    def set_runtime_thread_num(self, runtime_thread_num):
        self.runtime_thread_num = runtime_thread_num
202

H
HexToString 已提交
203 204
    def set_batch_infer_size(self, batch_infer_size):
        self.batch_infer_size = batch_infer_size
205

M
bug fix  
MRXLT 已提交
206
    def set_trt(self):
M
add trt  
MRXLT 已提交
207 208
        self.use_trt = True

209 210 211
    def set_gpu_multi_stream(self):
        self.gpu_multi_stream = True

Z
zhangjun 已提交
212 213 214 215 216 217
    def set_lite(self):
        self.use_lite = True

    def set_xpu(self):
        self.use_xpu = True

218 219 220
    def set_ascend_cl(self):
        self.use_ascend_cl = True

S
ShiningZhang 已提交
221 222 223 224 225 226
    def set_enable_prometheus(self, flag=False):
        self.enable_prometheus = flag

    def set_prometheus_port(self, prometheus_port):
        self.prometheus_port = prometheus_port

S
ShiningZhang 已提交
227 228 229
    def set_request_cache_size(self, request_cache_size):
        self.request_cache_size = request_cache_size

230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278
    def set_enable_dist_model(self, status):
        self.enable_dist_model = status

    def set_dist_carrier_id(self, carrier_id):
        if isinstance(carrier_id, int):
            carrier_id = str(carrier_id)
        self.dist_carrier_id = carrier_id

    def set_dist_cfg_file(self, dist_cfg_file):
        self.dist_cfg_file = dist_cfg_file

    def set_dist_nranks(self, nranks):
        if isinstance(nranks, str):
            nranks = int(nranks)
        elif not isinstance(nranks, int):
            raise ValueError("dist_nranks type error! must be int or string")

        self.dist_nranks = nranks

    def set_dist_endpoints(self, endpoints):
        if isinstance(endpoints, list):
            self.dist_endpoints = endpoints
        elif isinstance(endpoints, str):
            self.dist_endpoints = [endpoints]
        else:
            raise ValueError(
                "dist_endpoints type error! must be list or string")

    def set_dist_subgraph_index(self, subgraph_index):
        if isinstance(subgraph_index, str):
            subgraph_index = int(subgraph_index)
        elif not isinstance(subgraph_index, int):
            raise ValueError("subgraph type error! must be int or string")

        self.dist_subgraph_index = subgraph_index

    def set_dist_worker_serving_endpoint(self, serving_endpoints):
        if isinstance(serving_endpoints, list):
            self.dist_worker_serving_endpoint = serving_endpoints
        elif not isinstance(serving_endpoints, str):
            self.dist_worker_serving_endpoint = [serving_endpoints]
        else:
            raise ValueError(
                "dist_worker_serving_endpoint type error! must be list or string"
            )

    def set_dist_master_serving(self, is_master):
        self.dist_master_serving = is_master

H
HexToString 已提交
279
    def _prepare_engine(self, model_config_paths, device, use_encryption_model):
        """Build one server_sdk.EngineDesc per entry of model_config_paths and
        append each (wrapped in a ModelToolkitConf) to self.model_toolkit_conf.

        Args:
            model_config_paths: OrderedDict {engine_name: model_config_dir}.
            device: "cpu"/"gpu"/... ; stored into self.device.
            use_encryption_model: when True, mark every engine as encrypted.
        """
        self.device = device
        if self.model_toolkit_conf == None:
            self.model_toolkit_conf = []

        # Generally, self.gpuid = str[] or [].
        # when len(self.gpuid) means no gpuid is specified.
        # if self.device == "gpu" or self.use_trt:
        # we assume you forget to set gpuid, so set gpuid = ['0'];
        if len(self.gpuid) == 0 or self.gpuid == ["-1"]:
            if self.device == "gpu" or self.use_trt or self.gpu_multi_stream:
                self.gpuid = ["0"]
                self.device = "gpu"
            elif self.use_xpu or self.use_ascend_cl:
                self.gpuid = ["0"]
            else:
                self.gpuid = ["-1"]

        # normalize runtime_thread_num / batch_infer_size to non-empty lists so
        # they can be cycled per-engine with `index % len(...)` below
        if isinstance(self.runtime_thread_num, int):
            self.runtime_thread_num = [self.runtime_thread_num]
        if len(self.runtime_thread_num) == 0:
            self.runtime_thread_num.append(0)

        if isinstance(self.batch_infer_size, int):
            self.batch_infer_size = [self.batch_infer_size]
        if len(self.batch_infer_size) == 0:
            self.batch_infer_size.append(32)

        index = 0

        for engine_name, model_config_path in model_config_paths.items():
            engine = server_sdk.EngineDesc()
            engine.name = engine_name
            # engine.reloadable_meta = model_config_path + "/fluid_time_file"
            engine.reloadable_meta = model_config_path + "/fluid_time_file"
            # touch the timestamp file so timestamp_ne reload checks have a baseline
            os.system("touch {}".format(engine.reloadable_meta))
            engine.reloadable_type = "timestamp_ne"
            engine.runtime_thread_num = self.runtime_thread_num[index % len(
                self.runtime_thread_num)]
            engine.batch_infer_size = self.batch_infer_size[index % len(
                self.batch_infer_size)]

            engine.enable_overrun = False
            engine.allow_split_request = True
            engine.model_dir = model_config_path
            engine.enable_memory_optimization = self.memory_optimization
            engine.enable_ir_optimization = self.ir_optimization
            engine.use_trt = self.use_trt
            engine.gpu_multi_stream = self.gpu_multi_stream
            engine.use_lite = self.use_lite
            engine.use_xpu = self.use_xpu
            engine.use_ascend_cl = self.use_ascend_cl
            engine.use_gpu = False

            # use distributed model.
            if self.dist_subgraph_index >= 0:
                engine.enable_dist_model = True
                engine.dist_carrier_id = self.dist_carrier_id
                engine.dist_cfg_file = self.dist_cfg_file
                engine.dist_nranks = self.dist_nranks
                engine.dist_endpoints.extend(self.dist_endpoints)
                engine.dist_subgraph_index = self.dist_subgraph_index

            if len(self.gpuid) == 0:
                raise ValueError("CPU: self.gpuid = -1, GPU: must set it ")
            # each gpuid entry may be "0,1,2": one engine may span several GPUs
            op_gpu_list = self.gpuid[index % len(self.gpuid)].split(",")
            for ids in op_gpu_list:
                engine.gpu_ids.extend([int(ids)])

            if self.device == "gpu" or self.use_trt or self.gpu_multi_stream:
                engine.use_gpu = True
                # this is for Mixed use of GPU and CPU
                # if model-1 use GPU and set the device="gpu"
                # but gpuid[1] = "-1" which means use CPU in Model-2
                # so config about GPU should be False.
                # op_gpu_list = gpuid[index].split(",")
                # which is the gpuid for each engine.
                if len(op_gpu_list) == 1:
                    if int(op_gpu_list[0]) == -1:
                        engine.use_gpu = False
                        engine.gpu_multi_stream = False
                        engine.use_trt = False

            # a __params__ file marks a combined (single-file params) model
            if os.path.exists('{}/__params__'.format(model_config_path)):
                engine.combined_model = True
            else:
                engine.combined_model = False
            if use_encryption_model:
                engine.encrypted_model = True
            engine.type = "PADDLE_INFER"
            self.model_toolkit_conf.append(server_sdk.ModelToolkitConf())
            self.model_toolkit_conf[-1].engines.extend([engine])
            index = index + 1
M
MRXLT 已提交
372 373 374 375 376 377 378 379 380 381

    def _prepare_infer_service(self, port):
        """Build the InferServiceConf exposing GeneralModelService on `port`.

        Idempotent: does nothing when the conf was already created.
        """
        if self.infer_service_conf == None:
            conf = server_sdk.InferServiceConf()
            conf.port = port
            service = server_sdk.InferService()
            service.name = "GeneralModelService"
            service.workflows.extend(["workflow1"])
            conf.services.extend([service])
            self.infer_service_conf = conf

M
MRXLT 已提交
382
    def _prepare_resource(self, workdir, cube_conf):
        """Build self.resource_conf and write per-node general-model config files
        under `workdir`. Copies `cube_conf` into workdir when a distkv op is used.

        Args:
            workdir: output directory for generated prototxt files.
            cube_conf: path to cube.conf; required when the workflow contains a
                distkv node, otherwise may be None.

        Raises:
            ValueError: a distkv node is present but cube_conf is None.
        """
        self.workdir = workdir
        # idempotent: only build once
        if self.resource_conf == None:
            self.resource_conf = server_sdk.ResourceConf()
            for idx, op_general_model_config_fn in enumerate(
                    self.general_model_config_fn):
                # dump the idx-th model's feed/fetch config next to workdir
                with open("{}/{}".format(workdir, op_general_model_config_fn),
                          "w") as fout:
                    fout.write(str(list(self.model_conf.values())[idx]))
                for workflow in self.workflow_conf.workflows:
                    for node in workflow.nodes:
                        if "distkv" in node.name.lower():
                            self.resource_conf.cube_config_path = workdir
                            self.resource_conf.cube_config_file = self.cube_config_fn
                            if cube_conf == None:
                                raise ValueError(
                                    "Please set the path of cube.conf while use dist_kv op."
                                )
                            shutil.copy(cube_conf, workdir)
                            if "quant" in node.name.lower():
                                # quantized distkv models use 8-bit cube values
                                self.resource_conf.cube_quant_bits = 8
                self.resource_conf.model_toolkit_path.extend([workdir])
                self.resource_conf.model_toolkit_file.extend(
                    [self.model_toolkit_fn[idx]])
                self.resource_conf.general_model_path.extend([workdir])
                self.resource_conf.general_model_file.extend(
                    [op_general_model_config_fn])
                #TODO:figure out the meaning of product_name and container_id.
                if self.product_name != None:
                    self.resource_conf.auth_product_name = self.product_name
                if self.container_id != None:
                    self.resource_conf.auth_container_id = self.container_id
M
MRXLT 已提交
414 415 416 417 418

    def _write_pb_str(self, filepath, pb_obj):
        with open(filepath, "w") as fout:
            fout.write(str(pb_obj))

H
HexToString 已提交
419
    def load_model_config(self, model_config_paths_args):
B
barrierye 已提交
420 421 422
        # At present, Serving needs to configure the model path in
        # the resource.prototxt file to determine the input and output
        # format of the workflow. To ensure that the input and output
B
barrierye 已提交
423
        # of multiple models are the same.
H
HexToString 已提交
424 425 426 427 428 429 430
        if isinstance(model_config_paths_args, str):
            model_config_paths_args = [model_config_paths_args]

        for single_model_config in model_config_paths_args:
            if os.path.isdir(single_model_config):
                pass
            elif os.path.isfile(single_model_config):
Z
zhangjun 已提交
431 432 433
                raise ValueError(
                    "The input of --model should be a dir not file.")

H
HexToString 已提交
434
        if isinstance(model_config_paths_args, list):
B
barrierye 已提交
435
            # If there is only one model path, use the default infer_op.
M
MRXLT 已提交
436
            # Because there are several infer_op type, we need to find
B
barrierye 已提交
437
            # it from workflow_conf.
H
HexToString 已提交
438

H
HexToString 已提交
439 440 441
            # now only support single-workflow.
            # TODO:support multi-workflow
            model_config_paths_list_idx = 0
B
barrierye 已提交
442
            for node in self.workflow_conf.workflows[0].nodes:
H
HexToString 已提交
443
                if node.type in self.default_engine_types:
H
HexToString 已提交
444 445 446 447
                    if node.name is None:
                        raise Exception(
                            "You have set the engine_name of Op. Please use the form {op: model_path} to configure model path"
                        )
Z
zhangjun 已提交
448

H
HexToString 已提交
449
                    f = open("{}/serving_server_conf.prototxt".format(
Z
zhangjun 已提交
450 451 452 453 454 455 456 457 458 459 460 461
                        model_config_paths_args[model_config_paths_list_idx]),
                             'r')
                    self.model_conf[
                        node.name] = google.protobuf.text_format.Merge(
                            str(f.read()), m_config.GeneralModelConfig())
                    self.model_config_paths[
                        node.name] = model_config_paths_args[
                            model_config_paths_list_idx]
                    self.general_model_config_fn.append(
                        node.name + "/general_model.prototxt")
                    self.model_toolkit_fn.append(node.name +
                                                 "/model_toolkit.prototxt")
H
HexToString 已提交
462 463
                    self.subdirectory.append(node.name)
                    model_config_paths_list_idx += 1
Z
zhangjun 已提交
464 465
                    if model_config_paths_list_idx == len(
                            model_config_paths_args):
H
HexToString 已提交
466 467 468 469 470
                        break
        #Right now, this is not useful.
        elif isinstance(model_config_paths_args, dict):
            self.model_config_paths = collections.OrderedDict()
            for node_str, path in model_config_paths_args.items():
B
barrierye 已提交
471 472 473 474 475
                node = server_sdk.DAGNode()
                google.protobuf.text_format.Parse(node_str, node)
                self.model_config_paths[node.name] = path
            print("You have specified multiple model paths, please ensure "
                  "that the input and output of multiple models are the same.")
H
HexToString 已提交
476 477
            f = open("{}/serving_server_conf.prototxt".format(path), 'r')
            self.model_conf[node.name] = google.protobuf.text_format.Merge(
Z
zhangjun 已提交
478
                str(f.read()), m_config.GeneralModelConfig())
B
barrierye 已提交
479
        else:
Z
zhangjun 已提交
480 481 482 483
            raise Exception(
                "The type of model_config_paths must be str or list or "
                "dict({op: model_path}), not {}.".format(
                    type(model_config_paths_args)))
M
MRXLT 已提交
484 485
        # check config here
        # print config here
Z
update  
zhangjun 已提交
486

Z
zhangjun 已提交
487 488 489
    def use_mkl(self, flag):
        self.mkl_flag = flag

Z
zhangjun 已提交
490
    def check_avx(self):
        """Return True when /proc/cpuinfo advertises the AVX flag."""
        proc = subprocess.Popen(
            ['cat /proc/cpuinfo | grep avx 2>/dev/null'],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            shell=True)
        out, err = proc.communicate()
        # success == no stderr output and at least one matching line
        return err == b'' and len(out) > 0

Z
zhangjun 已提交
502 503
    def get_device_version(self):
        """Pick the CPU package flavor from AVX support and the MKL flag.

        Side effect: when AVX is available, MKL is switched on via use_mkl(True).
        """
        avx_flag = False
        if self.check_avx():
            avx_flag = True
            self.use_mkl(True)
        mkl_flag = self.mkl_flag
        if avx_flag:
            device_version = "cpu-avx-mkl" if mkl_flag else "cpu-avx-openblas"
        else:
            if mkl_flag:
                print(
                    "Your CPU does not support AVX, server will running with noavx-openblas mode."
                )
            device_version = "cpu-noavx-openblas"
        return device_version

    def get_serving_bin_name(self):
        """Map (device_type, version_suffix, platform) to a serving binary flavor.

        device_type: "0"=CPU, "1"=GPU, "2"=XPU, "3"=ROCm, "4"=Ascend CL
        (values come from the package's .version module).
        """
        if device_type == "0":
            device_version = self.get_device_version()
        elif device_type == "1":
            # a few CUDA builds are published under their bare suffix
            if version_suffix in ("101", "102", "1028", "112"):
                device_version = "gpu-" + version_suffix
            else:
                device_version = "gpu-cuda" + version_suffix
        elif device_type == "2":
            device_version = "xpu-" + platform.machine()
        elif device_type == "3":
            device_version = "rocm-" + platform.machine()
        elif device_type == "4":
            prefix = "ascendcl-lite-" if self.use_lite else "ascendcl-"
            device_version = prefix + platform.machine()
        return device_version
M
MRXLT 已提交
540 541 542

    def download_bin(self):
        """Download and unpack the serving binary for this device flavor into the
        package directory, then point self.bin_path at it.

        Uses an exclusive flock on version.py so that concurrent processes do not
        download simultaneously; a "<folder>.is_download" flag file marks a
        completed download. Temporarily chdirs into the module path and restores
        the original working directory before returning.

        Raises:
            SystemExit: download (wget) or decompression fails.
        """
        os.chdir(self.module_path)

        #acquire lock
        version_file = open("{}/version.py".format(self.module_path), "r")

        folder_name = "serving-%s-%s" % (self.get_serving_bin_name(),
                                         version_tag)
        tar_name = "%s.tar.gz" % folder_name
        bin_url = "https://paddle-serving.bj.bcebos.com/test-dev/bin/%s" % tar_name

        self.server_path = os.path.join(self.module_path, folder_name)

        download_flag = "{}/{}.is_download".format(self.module_path,
                                                   folder_name)

        # block until no other process holds the download lock
        fcntl.flock(version_file, fcntl.LOCK_EX)

        # another process already finished the download: reuse it
        if os.path.exists(download_flag):
            os.chdir(self.cur_path)
            self.bin_path = self.server_path + "/serving"
            return

        if not os.path.exists(self.server_path):
            print('Frist time run, downloading PaddleServing components ...')

            r = os.system('wget ' + bin_url + ' --no-check-certificate')
            if r != 0:
                if os.path.exists(tar_name):
                    os.remove(tar_name)
                raise SystemExit(
                    'Download failed, please check your network or permission of {}.'
                    .format(self.module_path))
            else:
                try:
                    print('Decompressing files ..')
                    tar = tarfile.open(tar_name)
                    tar.extractall()
                    tar.close()
                    # mark the download as complete for other processes
                    open(download_flag, "a").close()
                except:
                    # NOTE(review): os.remove on a directory raises; probably
                    # shutil.rmtree was intended here — confirm before changing.
                    if os.path.exists(self.server_path):
                        os.remove(self.server_path)
                    raise SystemExit(
                        'Decompressing failed, please check your permission of {} or disk space left.'
                        .format(self.module_path))
                finally:
                    os.remove(tar_name)
        #release lock
        version_file.close()
        os.chdir(self.cur_path)
        self.bin_path = self.server_path + "/serving"
Z
update  
zhangjun 已提交
593

M
MRXLT 已提交
594 595 596
    def prepare_server(self,
                       workdir=None,
                       port=9292,
                       device=None,
                       use_encryption_model=False,
                       cube_conf=None):
        """Create the working directory and generate all server prototxt files
        (infer_service, workflow, resource, per-engine model_toolkit).

        Args:
            workdir: output directory, "./tmp" when None.
            port: listening port; must be free on 127.0.0.1.
            device: overrides self.device when given; otherwise self.device is used.
            use_encryption_model: forwarded to _prepare_engine.
            cube_conf: forwarded to _prepare_resource (needed for distkv ops).

        Raises:
            SystemExit: `port` is already in use.
        """
        # if `device` is not set, use self.device
        # self.device may not be changed.
        # or self.device may have changed by set_device.
        if device == None:
            device = self.device
        # if `device` is set, let self.device = device.
        else:
            self.device = device
        if workdir == None:
            workdir = "./tmp"
            os.system("mkdir -p {}".format(workdir))
        else:
            os.system("mkdir -p {}".format(workdir))
        # one subdirectory (with reload timestamp file) per engine/node
        for subdir in self.subdirectory:
            os.system("mkdir -p {}/{}".format(workdir, subdir))
            os.system("touch {}/{}/fluid_time_file".format(workdir, subdir))

        if not self.port_is_available(port):
            raise SystemExit("Port {} is already used".format(port))

        self.set_port(port)
        self._prepare_resource(workdir, cube_conf)
        self._prepare_engine(self.model_config_paths, device,
                             use_encryption_model)
        self._prepare_infer_service(port)
        self.workdir = workdir

        # serialize every generated conf into workdir
        infer_service_fn = "{}/{}".format(workdir, self.infer_service_fn)
        self._write_pb_str(infer_service_fn, self.infer_service_conf)

        workflow_fn = "{}/{}".format(workdir, self.workflow_fn)
        self._write_pb_str(workflow_fn, self.workflow_conf)

        resource_fn = "{}/{}".format(workdir, self.resource_fn)
        self._write_pb_str(resource_fn, self.resource_conf)

        for idx, single_model_toolkit_fn in enumerate(self.model_toolkit_fn):
            model_toolkit_fn = "{}/{}".format(workdir, single_model_toolkit_fn)
            self._write_pb_str(model_toolkit_fn, self.model_toolkit_conf[idx])
M
MRXLT 已提交
639

M
MRXLT 已提交
640
    def port_is_available(self, port):
        """Return True when nothing is listening on `port` at 127.0.0.1.

        Probes the port with a 2-second TCP connect attempt; a failed
        connection (non-zero connect_ex result) means the port is free.
        """
        with closing(socket.socket(socket.AF_INET,
                                   socket.SOCK_STREAM)) as probe:
            probe.settimeout(2)
            status = probe.connect_ex(('127.0.0.1', port))
        # connect_ex returns 0 on success, i.e. something already listens.
        return status != 0

    def run_server(self):
        """Launch the C++ serving binary with flags built from this object.

        Ensures the serving binary is present (downloading it unless a
        local bin is configured), assembles the full command line from the
        configs written by prepare_server, and starts it via os.system.
        Blocks for as long as the launched server runs.
        """
        # just run server with system command
        # currently we do not load cube
        self.check_local_bin()
        if not self.use_local_bin:
            self.download_bin()
            # wait for other process to download server bin
            while not os.path.exists(self.server_path):
                time.sleep(1)
        else:
            print("Use local bin : {}".format(self.bin_path))
        #self.check_cuda()
        command = "{} " \
                    "-enable_model_toolkit " \
                    "-inferservice_path {} " \
                    "-inferservice_file {} " \
                    "-max_concurrency {} " \
                    "-num_threads {} " \
                    "-port {} " \
                    "-precision {} " \
                    "-use_calib {} " \
                    "-reload_interval_s {} " \
                    "-resource_path {} " \
                    "-resource_file {} " \
                    "-workflow_path {} " \
                    "-workflow_file {} " \
                    "-bthread_concurrency {} " \
                    "-max_body_size {} ".format(
                        self.bin_path,
                        self.workdir,
                        self.infer_service_fn,
                        self.max_concurrency,
                        self.num_threads,
                        self.port,
                        self.precision,
                        self.use_calib,
                        self.reload_interval_s,
                        self.workdir,
                        self.resource_fn,
                        self.workdir,
                        self.workflow_fn,
                        self.num_threads,
                        self.max_body_size)
        # optional monitoring / caching flags are appended only when enabled
        if self.enable_prometheus:
            command = command + \
                        "-enable_prometheus={} " \
                        "-prometheus_port {} ".format(
                            self.enable_prometheus,
                            self.prometheus_port)
        if self.request_cache_size > 0:
            command = command + \
                        "-request_cache_size {} ".format(
                            self.request_cache_size)

        # fixed typo in user-facing message: "Comand" -> "Command"
        print("Going to Run Command")
        print(command)

        os.system(command)