import os
import tarfile
import socket
import time
import argparse
import shutil
import sys
from contextlib import closing
from multiprocessing import Pool, Process
from concurrent import futures

import numpy as np
import grpc

import paddle_serving_server as paddle_serving_server
from .version import serving_server_version

# Protobuf-generated configuration and service modules. The import paths
# assume the package's generated "proto" subpackage, matching the server_sdk,
# m_config and multi_lang_general_model_service_pb2_grpc names used below.
from .proto import server_configure_pb2 as server_sdk
from .proto import general_model_config_pb2 as m_config
from .proto import multi_lang_general_model_service_pb2_grpc
import google.protobuf.text_format

if not sys.platform.startswith('win'):
    import fcntl

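# A minimal usage sketch for the Server class below (illustrative; OpMaker and
# OpSeqMaker are the op helpers from this package, and "uci_housing_model" is
# a placeholder for a saved model directory):
#
#   from paddle_serving_server import OpMaker, OpSeqMaker, Server
#   op_maker = OpMaker()
#   op_seq_maker = OpSeqMaker()
#   for op_name in ('general_reader', 'general_infer', 'general_response'):
#       op_seq_maker.add_op(op_maker.create(op_name))
#   server = Server()
#   server.set_op_sequence(op_seq_maker.get_op_sequence())
#   server.load_model_config("uci_housing_model")
#   server.prepare_server(workdir="workdir", port=9393, device="cpu")
#   server.run_server()
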
class Server(object):
    def __init__(self):
        self.server_handle_ = None
        self.infer_service_conf = None
        self.model_toolkit_conf = None
        self.resource_conf = None
        self.memory_optimization = False
        self.ir_optimization = False
        self.model_conf = None
        self.workflow_fn = "workflow.prototxt"
        self.resource_fn = "resource.prototxt"
        self.infer_service_fn = "infer_service.prototxt"
        self.model_toolkit_fn = "model_toolkit.prototxt"
        self.general_model_config_fn = "general_model.prototxt"
        self.cube_config_fn = "cube.conf"
        self.workdir = ""
        self.max_concurrency = 0
        self.num_threads = 2
        self.port = 8080
        self.reload_interval_s = 10
        self.max_body_size = 64 * 1024 * 1024
        self.module_path = os.path.dirname(paddle_serving_server.__file__)
        self.cur_path = os.getcwd()
        self.use_local_bin = False
        self.mkl_flag = False
        self.device = "cpu"
        self.gpuid = 0
        self.use_trt = False
        self.use_lite = False
        self.use_xpu = False
        self.model_config_paths = None  # for multi-model in a workflow
        self.product_name = None
        self.container_id = None

    def get_fetch_list(self):
        fetch_names = [var.alias_name for var in self.model_conf.fetch_var]
        return fetch_names

    def set_max_concurrency(self, concurrency):
        self.max_concurrency = concurrency

    def set_num_threads(self, threads):
        self.num_threads = threads

    def set_max_body_size(self, body_size):
        if body_size >= self.max_body_size:
            self.max_body_size = body_size
        else:
            print(
                "max_body_size is less than the default value; the default value will be used."
            )

    def use_encryption_model(self, flag=False):
        self.encryption_model = flag

    def set_port(self, port):
        self.port = port

    def set_reload_interval(self, interval):
        self.reload_interval_s = interval

    def set_op_sequence(self, op_seq):
        self.workflow_conf = op_seq

    def set_op_graph(self, op_graph):
        self.workflow_conf = op_graph

    def set_memory_optimize(self, flag=False):
        self.memory_optimization = flag

    def set_ir_optimize(self, flag=False):
        self.ir_optimization = flag

    def set_product_name(self, product_name=None):
        if product_name is None:
            raise ValueError("product_name can't be None.")
        self.product_name = product_name

    def set_container_id(self, container_id):
        if container_id is None:
            raise ValueError("container_id can't be None.")
        self.container_id = container_id

    def check_local_bin(self):
        if "SERVING_BIN" in os.environ:
            self.use_local_bin = True
            self.bin_path = os.environ["SERVING_BIN"]

    def check_cuda(self):
        if os.system("ls /dev/ | grep nvidia > /dev/null") != 0:
            raise SystemExit(
                "GPU not found, please check your environment or use cpu version by \"pip install paddle_serving_server\""
            )

    def set_device(self, device="cpu"):
        self.device = device

    def set_gpuid(self, gpuid=0):
        self.gpuid = gpuid

    def set_trt(self):
        self.use_trt = True

    def set_lite(self):
        self.use_lite = True

    def set_xpu(self):
        self.use_xpu = True

    def _prepare_engine(self, model_config_paths, device, use_encryption_model):
        if self.model_toolkit_conf is None:
            self.model_toolkit_conf = server_sdk.ModelToolkitConf()

        for engine_name, model_config_path in model_config_paths.items():
            engine = server_sdk.EngineDesc()
            engine.name = engine_name
            # engine.reloadable_meta = model_config_path + "/fluid_time_file"
            engine.reloadable_meta = self.workdir + "/fluid_time_file"
            os.system("touch {}".format(engine.reloadable_meta))
            engine.reloadable_type = "timestamp_ne"
            engine.runtime_thread_num = 0
            engine.batch_infer_size = 0
            engine.enable_batch_align = 0
            engine.model_data_path = model_config_path
            engine.enable_memory_optimization = self.memory_optimization
            engine.enable_ir_optimization = self.ir_optimization
            engine.static_optimization = False
            engine.force_update_static_cache = False
            engine.use_trt = self.use_trt
            if os.path.exists('{}/__params__'.format(model_config_path)):
                suffix = ""
            else:
                suffix = "_DIR"
            if device == "arm":
                engine.use_lite = self.use_lite
                engine.use_xpu = self.use_xpu
            # The legacy FLUID_*_ANALYSIS engine types (CPU/GPU/ARM variants,
            # with optional encryption and "_DIR" suffix) are now unified
            # under a single engine type:
            engine.type = "PaddleInferenceEngine"
            self.model_toolkit_conf.engines.extend([engine])
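
        # Each loop iteration above appends one engine entry; in text form a
        # model_toolkit.prototxt entry looks roughly like (illustrative
        # values, field names taken from the assignments above):
        #
        #   engines {
        #     name: "general_infer_0"
        #     type: "PaddleInferenceEngine"
        #     reloadable_meta: "workdir/fluid_time_file"
        #     reloadable_type: "timestamp_ne"
        #     model_data_path: "uci_housing_model"
        #     enable_memory_optimization: true
        #   }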

    def _prepare_infer_service(self, port):
        if self.infer_service_conf is None:
            self.infer_service_conf = server_sdk.InferServiceConf()
            self.infer_service_conf.port = port
            infer_service = server_sdk.InferService()
            infer_service.name = "GeneralModelService"
            infer_service.workflows.extend(["workflow1"])
            self.infer_service_conf.services.extend([infer_service])

    def _prepare_resource(self, workdir, cube_conf):
        self.workdir = workdir
        if self.resource_conf is None:
            with open("{}/{}".format(workdir, self.general_model_config_fn),
                      "w") as fout:
                fout.write(str(self.model_conf))
            self.resource_conf = server_sdk.ResourceConf()
            for workflow in self.workflow_conf.workflows:
                for node in workflow.nodes:
                    if "dist_kv" in node.name:
                        self.resource_conf.cube_config_path = workdir
                        self.resource_conf.cube_config_file = self.cube_config_fn
                        if cube_conf is None:
                            raise ValueError(
                                "Please set the path of cube.conf while use dist_kv op."
                            )
                        shutil.copy(cube_conf, workdir)
            self.resource_conf.model_toolkit_path = workdir
            self.resource_conf.model_toolkit_file = self.model_toolkit_fn
            self.resource_conf.general_model_path = workdir
            self.resource_conf.general_model_file = self.general_model_config_fn
            if self.product_name is not None:
                self.resource_conf.auth_product_name = self.product_name
            if self.container_id is not None:
                self.resource_conf.auth_container_id = self.container_id

    def _write_pb_str(self, filepath, pb_obj):
        with open(filepath, "w") as fout:
            fout.write(str(pb_obj))
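
    # For reference, _write_pb_str emits protobuf text format; an illustrative
    # workflow.prototxt for a single sequential workflow looks like:
    #
    #   workflows {
    #     name: "workflow1"
    #     workflow_type: "Sequence"
    #     nodes {
    #       name: "general_reader_0"
    #       type: "GeneralReaderOp"
    #     }
    #     nodes {
    #       name: "general_infer_0"
    #       type: "GeneralInferOp"
    #       dependencies {
    #         name: "general_reader_0"
    #         mode: "RO"
    #       }
    #     }
    #   }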

    def load_model_config(self, model_config_paths):
        # At present, Serving needs the model path configured in
        # resource.prototxt to determine the input and output format of the
        # workflow, so the inputs and outputs of multiple models must match.
        workflow_oi_config_path = None
        if isinstance(model_config_paths, str):
            # If there is only one model path, use the default infer op.
            # Because there are several infer op types, we need to find
            # the right one from workflow_conf.
            default_engine_names = [
                'general_infer_0', 'general_dist_kv_infer_0',
                'general_dist_kv_quant_infer_0'
            ]
            engine_name = None
            for node in self.workflow_conf.workflows[0].nodes:
                if node.name in default_engine_names:
                    engine_name = node.name
                    break
            if engine_name is None:
                raise Exception(
                    "You have set a custom engine_name for the Op. Please use "
                    "the form {op: model_path} to configure the model path.")
            self.model_config_paths = {engine_name: model_config_paths}
            workflow_oi_config_path = self.model_config_paths[engine_name]
        elif isinstance(model_config_paths, dict):
            self.model_config_paths = {}
            for node_str, path in model_config_paths.items():
                node = server_sdk.DAGNode()
                google.protobuf.text_format.Parse(node_str, node)
                self.model_config_paths[node.name] = path
            print("You have specified multiple model paths, please ensure "
                  "that the input and output of multiple models are the same.")
            workflow_oi_config_path = list(
                self.model_config_paths.items())[0][1]
        else:
            raise Exception("The type of model_config_paths must be str or "
                            "dict({op: model_path}), not {}.".format(
                                type(model_config_paths)))

        self.model_conf = m_config.GeneralModelConfig()
        with open("{}/serving_server_conf.prototxt".format(
                workflow_oi_config_path), 'r') as f:
            self.model_conf = google.protobuf.text_format.Merge(
                str(f.read()), self.model_conf)
        # TODO: check and print the parsed config here
    def use_mkl(self, flag):
        self.mkl_flag = flag

    def get_device_version(self):
        avx_flag = False
        mkl_flag = self.mkl_flag
        openblas_flag = False
        r = os.system("cat /proc/cpuinfo | grep avx > /dev/null 2>&1")
        if r == 0:
            avx_flag = True
        if avx_flag:
            if mkl_flag:
                device_version = "serving-cpu-avx-mkl-"
            else:
                device_version = "serving-cpu-avx-openblas-"
        else:
            if mkl_flag:
                print(
                    "Your CPU does not support AVX; the server will run in noavx-openblas mode."
                )
            device_version = "serving-cpu-noavx-openblas-"
        return device_version
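
    # Example (illustrative): on an AVX-capable machine with use_mkl(True),
    # get_device_version() returns "serving-cpu-avx-mkl-"; combined with a
    # hypothetical serving_server_version of "0.6.0", that names a binary
    # package "serving-cpu-avx-mkl-0.6.0.tar.gz".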

    def download_bin(self):
        os.chdir(self.module_path)

        # acquire a file lock on version.py so that concurrent processes do
        # not download the serving binary twice
        version_file = open("{}/version.py".format(self.module_path), "r")
        import re
        for line in version_file.readlines():
            # TODO: also honor a version_suffix here
            if re.match("cuda_version", line):
                cuda_version = line.split("\"")[1]
                if cuda_version == "101" or cuda_version == "102":
                    device_version = "serving-gpu-" + cuda_version + "-"
                elif cuda_version == "arm" or cuda_version == "arm-xpu":
                    device_version = "serving-" + cuda_version + "-"
                else:
                    device_version = "serving-gpu-cuda" + cuda_version + "-"

        folder_name = device_version + serving_server_version
        tar_name = folder_name + ".tar.gz"
        bin_url = "https://paddle-serving.bj.bcebos.com/bin/" + tar_name
        self.server_path = os.path.join(self.module_path, folder_name)

        download_flag = "{}/{}.is_download".format(self.module_path,
                                                   folder_name)

        fcntl.flock(version_file, fcntl.LOCK_EX)

        if os.path.exists(download_flag):
            os.chdir(self.cur_path)
            self.bin_path = self.server_path + "/serving"
            version_file.close()
            return

        if not os.path.exists(self.server_path):
            os.system("touch {}/{}.is_download".format(self.module_path,
                                                       folder_name))
            print('First time run, downloading PaddleServing components ...')

            r = os.system('wget ' + bin_url + ' --no-check-certificate')
            if r != 0:
                if os.path.exists(tar_name):
                    os.remove(tar_name)
                raise SystemExit(
                    'Download failed, please check your network or permission of {}.'
                    .format(self.module_path))
            else:
                try:
                    print('Decompressing files ..')
                    tar = tarfile.open(tar_name)
                    tar.extractall()
                    tar.close()
                except Exception:
                    # clean up any partially extracted files before exiting
                    if os.path.exists(self.server_path):
                        shutil.rmtree(self.server_path)
                    raise SystemExit(
                        'Decompressing failed, please check your permission of {} or disk space left.'
                        .format(self.module_path))
                finally:
                    os.remove(tar_name)
        # release lock
        version_file.close()
        os.chdir(self.cur_path)
        self.bin_path = self.server_path + "/serving"

    def prepare_server(self,
                       workdir=None,
                       port=9292,
                       device="cpu",
                       use_encryption_model=False,
                       cube_conf=None):
        if workdir is None:
            workdir = "./tmp"
        os.system("mkdir {}".format(workdir))
        os.system("touch {}/fluid_time_file".format(workdir))

        if not self.port_is_available(port):
            raise SystemExit("Port {} is already used".format(port))

        self.set_port(port)
        self._prepare_resource(workdir, cube_conf)
        self._prepare_engine(self.model_config_paths, device,
                             use_encryption_model)
        self._prepare_infer_service(port)
        self.workdir = workdir

        infer_service_fn = "{}/{}".format(workdir, self.infer_service_fn)
        workflow_fn = "{}/{}".format(workdir, self.workflow_fn)
        resource_fn = "{}/{}".format(workdir, self.resource_fn)
        model_toolkit_fn = "{}/{}".format(workdir, self.model_toolkit_fn)

        self._write_pb_str(infer_service_fn, self.infer_service_conf)
        self._write_pb_str(workflow_fn, self.workflow_conf)
        self._write_pb_str(resource_fn, self.resource_conf)
        self._write_pb_str(model_toolkit_fn, self.model_toolkit_conf)

    def port_is_available(self, port):
        with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
            sock.settimeout(2)
            result = sock.connect_ex(('0.0.0.0', port))
        # connect_ex returns 0 when some process is already listening there
        return result != 0

    def run_server(self):
        # just run the server with a system command;
        # currently we do not load cube
        self.check_local_bin()
        if not self.use_local_bin:
            self.download_bin()
            # wait for another process to finish downloading the server binary
            while not os.path.exists(self.server_path):
                time.sleep(1)
        else:
            print("Use local bin : {}".format(self.bin_path))
        #self.check_cuda()
        # TODO: merge the CPU and GPU code paths; move device into model_toolkit
        if self.device == "cpu" or self.device == "arm":
            command = "{} " \
                      "-enable_model_toolkit " \
                      "-inferservice_path {} " \
                      "-inferservice_file {} " \
                      "-max_concurrency {} " \
                      "-num_threads {} " \
                      "-port {} " \
                      "-reload_interval_s {} " \
                      "-resource_path {} " \
                      "-resource_file {} " \
                      "-workflow_path {} " \
                      "-workflow_file {} " \
                      "-bthread_concurrency {} " \
                      "-max_body_size {} ".format(
                          self.bin_path,
                          self.workdir,
                          self.infer_service_fn,
                          self.max_concurrency,
                          self.num_threads,
                          self.port,
                          self.reload_interval_s,
                          self.workdir,
                          self.resource_fn,
                          self.workdir,
                          self.workflow_fn,
                          self.num_threads,
                          self.max_body_size)
        else:
            command = "{} " \
                      "-enable_model_toolkit " \
                      "-inferservice_path {} " \
                      "-inferservice_file {} " \
                      "-max_concurrency {} " \
                      "-num_threads {} " \
                      "-port {} " \
                      "-reload_interval_s {} " \
                      "-resource_path {} " \
                      "-resource_file {} " \
                      "-workflow_path {} " \
                      "-workflow_file {} " \
                      "-bthread_concurrency {} " \
                      "-gpuid {} " \
                      "-max_body_size {} ".format(
                          self.bin_path,
                          self.workdir,
                          self.infer_service_fn,
                          self.max_concurrency,
                          self.num_threads,
                          self.port,
                          self.reload_interval_s,
                          self.workdir,
                          self.resource_fn,
                          self.workdir,
                          self.workflow_fn,
                          self.num_threads,
                          self.gpuid,
                          self.max_body_size)
        print("Going to Run Command")
        print(command)

        os.system(command)
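
# MultiLangServer wraps the brpc Server above with a gRPC front end so that
# multi-language clients can connect. A minimal usage sketch (illustrative,
# mirroring the Server example above):
#
#   multi_server = MultiLangServer()
#   multi_server.set_op_sequence(op_seq_maker.get_op_sequence())
#   multi_server.load_model_config("uci_housing_model")
#   multi_server.prepare_server(workdir="workdir", port=9393, device="cpu")
#   multi_server.run_server()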

class MultiLangServer(object):
    def __init__(self):
        self.bserver_ = Server()
        self.worker_num_ = 4
        self.body_size_ = 64 * 1024 * 1024
        self.concurrency_ = 100000
        self.is_multi_model_ = False  # for model ensemble

    def set_max_concurrency(self, concurrency):
        self.concurrency_ = concurrency
        self.bserver_.set_max_concurrency(concurrency)

    def set_device(self, device="cpu"):
        self.device = device

    def set_num_threads(self, threads):
        self.worker_num_ = threads
        self.bserver_.set_num_threads(threads)

    def set_max_body_size(self, body_size):
        self.bserver_.set_max_body_size(body_size)
        if body_size >= self.body_size_:
            self.body_size_ = body_size
        else:
            print(
                "max_body_size is less than the default value; the default value will be used."
            )

    def use_encryption_model(self, flag=False):
        self.encryption_model = flag

    def set_port(self, port):
        self.gport_ = port

    def set_reload_interval(self, interval):
        self.bserver_.set_reload_interval(interval)

    def set_op_sequence(self, op_seq):
        self.bserver_.set_op_sequence(op_seq)

    def set_op_graph(self, op_graph):
        self.bserver_.set_op_graph(op_graph)

    def use_mkl(self, flag):
        self.bserver_.use_mkl(flag)

    def set_memory_optimize(self, flag=False):
        self.bserver_.set_memory_optimize(flag)

    def set_ir_optimize(self, flag=False):
        self.bserver_.set_ir_optimize(flag)

    def set_gpuid(self, gpuid=0):
        self.bserver_.set_gpuid(gpuid)

    def load_model_config(self, server_config_paths, client_config_path=None):
        self.bserver_.load_model_config(server_config_paths)
        if client_config_path is None:
            if isinstance(server_config_paths, dict):
                self.is_multi_model_ = True
                client_config_path = '{}/serving_server_conf.prototxt'.format(
                    list(server_config_paths.items())[0][1])
            else:
                client_config_path = '{}/serving_server_conf.prototxt'.format(
                    server_config_paths)
        self.bclient_config_path_ = client_config_path
    def prepare_server(self,
                       workdir=None,
                       port=9292,
                       device="cpu",
                       use_encryption_model=False,
                       cube_conf=None):
        if not self._port_is_available(port):
            raise SystemExit("Port {} is already used".format(port))
        # pick one free internal port (distinct from the public gRPC port)
        # for the underlying brpc server
        default_port = 12000
        self.port_list_ = []
        for i in range(1000):
            if default_port + i != port and self._port_is_available(default_port
                                                                    + i):
                self.port_list_.append(default_port + i)
                break
        self.bserver_.prepare_server(
            workdir=workdir,
            port=self.port_list_[0],
            device=device,
            use_encryption_model=use_encryption_model,
            cube_conf=cube_conf)
        self.set_port(port)

    def _launch_brpc_service(self, bserver):
        bserver.run_server()

    def _port_is_available(self, port):
        with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
            sock.settimeout(2)
            result = sock.connect_ex(('0.0.0.0', port))
        return result != 0

    def run_server(self):
        p_bserver = Process(
            target=self._launch_brpc_service, args=(self.bserver_, ))
        p_bserver.start()
        # the brpc engine runs in the child process started above, while this
        # process serves the multi-language gRPC front end
        options = [('grpc.max_send_message_length', self.body_size_),
                   ('grpc.max_receive_message_length', self.body_size_)]
        server = grpc.server(
            futures.ThreadPoolExecutor(max_workers=self.worker_num_),
            options=options,
            maximum_concurrent_rpcs=self.concurrency_)
        multi_lang_general_model_service_pb2_grpc.add_MultiLangGeneralModelServiceServicer_to_server(
            MultiLangServerServiceServicer(
                self.bclient_config_path_, self.is_multi_model_,
                ["0.0.0.0:{}".format(self.port_list_[0])]), server)
        server.add_insecure_port('[::]:{}'.format(self.gport_))
        server.start()
        p_bserver.join()
        server.wait_for_termination()