import os
import tarfile
import socket
import paddle_serving_server_gpu as paddle_serving_server
import time
from .version import serving_server_version
from contextlib import closing
import argparse
import sys
# fcntl is POSIX-only; it is used below to serialize the binary download.
if not sys.platform.startswith('win'):
    import fcntl
import shutil
import re
import numpy as np
import grpc

from multiprocessing import Pool, Process
from concurrent import futures

# Protobuf config modules and text_format are used throughout this file.
# These .proto import paths follow the package's own layout elsewhere in
# Paddle Serving; adjust them if your build places the generated modules
# somewhere else.
from .proto import server_configure_pb2 as server_sdk
from .proto import general_model_config_pb2 as m_config
from .proto import multi_lang_general_model_service_pb2_grpc
import google.protobuf.text_format

class Server(object):
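    """Builder and launcher for the brpc inference service.

    The class collects server options, writes the prototxt configuration
    files (resource, model_toolkit, infer_service, workflow) into a working
    directory, and finally shells out to the pre-built `serving` binary;
    inference itself runs in that external process.
    """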
    def __init__(self):
        self.server_handle_ = None
        self.infer_service_conf = None
        self.model_toolkit_conf = None
        self.resource_conf = None
        self.memory_optimization = False
        self.ir_optimization = False
        self.model_conf = None
        self.workflow_fn = "workflow.prototxt"
        self.resource_fn = "resource.prototxt"
        self.infer_service_fn = "infer_service.prototxt"
        self.model_toolkit_fn = "model_toolkit.prototxt"
        self.general_model_config_fn = "general_model.prototxt"
        self.cube_config_fn = "cube.conf"
        self.workdir = ""
        self.max_concurrency = 0
        self.num_threads = 2
        self.port = 8080
        self.reload_interval_s = 10
        self.max_body_size = 64 * 1024 * 1024
        self.module_path = os.path.dirname(paddle_serving_server.__file__)
        self.cur_path = os.getcwd()
        self.use_local_bin = False
        self.mkl_flag = False
        self.device = "cpu"
        self.gpuid = 0
        self.use_trt = False
        self.use_lite = False
        self.use_xpu = False
        self.model_config_paths = None  # for multi-model in a workflow
        self.product_name = None
        self.container_id = None

    def get_fetch_list(self):
        fetch_names = [var.alias_name for var in self.model_conf.fetch_var]
        return fetch_names

    def set_max_concurrency(self, concurrency):
        self.max_concurrency = concurrency

    def set_num_threads(self, threads):
        self.num_threads = threads

    def set_max_body_size(self, body_size):
        if body_size >= self.max_body_size:
            self.max_body_size = body_size
        else:
            print(
                "max_body_size is less than default value, will use default value in service."
            )

    def use_encryption_model(self, flag=False):
        self.encryption_model = flag

    def set_port(self, port):
        self.port = port

    def set_reload_interval(self, interval):
        self.reload_interval_s = interval

    def set_op_sequence(self, op_seq):
        self.workflow_conf = op_seq

    def set_op_graph(self, op_graph):
        self.workflow_conf = op_graph

    def set_memory_optimize(self, flag=False):
        self.memory_optimization = flag

    def set_ir_optimize(self, flag=False):
        self.ir_optimization = flag

    def set_product_name(self, product_name=None):
        if product_name is None:
            raise ValueError("product_name can't be None.")
        self.product_name = product_name

    def set_container_id(self, container_id):
        if container_id is None:
            raise ValueError("container_id can't be None.")
        self.container_id = container_id

    def check_local_bin(self):
        if "SERVING_BIN" in os.environ:
            self.use_local_bin = True
            self.bin_path = os.environ["SERVING_BIN"]

    def check_cuda(self):
        if os.system("ls /dev/ | grep nvidia > /dev/null") == 0:
            pass
        else:
M
MRXLT 已提交
114
            raise SystemExit(
M
MRXLT 已提交
115
                "GPU not found, please check your environment or use cpu version by \"pip install paddle_serving_server\""
M
MRXLT 已提交
116 117
            )

    def set_device(self, device="cpu"):
        self.device = device

    def set_gpuid(self, gpuid=0):
        self.gpuid = gpuid

    def set_trt(self):
        self.use_trt = True

    def set_lite(self):
        self.use_lite = True

    def set_xpu(self):
        self.use_xpu = True

    def _prepare_engine(self, model_config_paths, device, use_encryption_model):
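        # Build one EngineDesc entry per (engine_name, model_config_path)
        # pair and register it in the model_toolkit configuration.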
        if self.model_toolkit_conf is None:
            self.model_toolkit_conf = server_sdk.ModelToolkitConf()

        for engine_name, model_config_path in model_config_paths.items():
            engine = server_sdk.EngineDesc()
            engine.name = engine_name
            # engine.reloadable_meta = model_config_path + "/fluid_time_file"
            engine.reloadable_meta = self.workdir + "/fluid_time_file"
            os.system("touch {}".format(engine.reloadable_meta))
            engine.reloadable_type = "timestamp_ne"
            engine.runtime_thread_num = 0
            engine.batch_infer_size = 0
            engine.enable_batch_align = 0
            engine.model_data_path = model_config_path
            engine.enable_memory_optimization = self.memory_optimization
            engine.enable_ir_optimization = self.ir_optimization
            engine.static_optimization = False
            engine.force_update_static_cache = False
            engine.use_trt = self.use_trt
            if os.path.exists('{}/__params__'.format(model_config_path)):
                suffix = ""
            else:
                suffix = "_DIR"
            if device == "arm":
                engine.use_lite = self.use_lite
                engine.use_xpu = self.use_xpu
            if device == "cpu":
                if use_encryption_model:
                    engine.type = "FLUID_CPU_ANALYSIS_ENCRPT"
                else:
                    engine.type = "FLUID_CPU_ANALYSIS" + suffix
            elif device == "gpu":
                if use_encryption_model:
                    engine.type = "FLUID_GPU_ANALYSIS_ENCRPT"
                else:
                    engine.type = "FLUID_GPU_ANALYSIS" + suffix
            elif device == "arm":
                engine.type = "FLUID_ARM_ANALYSIS" + suffix
            self.model_toolkit_conf.engines.extend([engine])

    def _prepare_infer_service(self, port):
        if self.infer_service_conf is None:
            self.infer_service_conf = server_sdk.InferServiceConf()
            self.infer_service_conf.port = port
            infer_service = server_sdk.InferService()
            infer_service.name = "GeneralModelService"
            infer_service.workflows.extend(["workflow1"])
            self.infer_service_conf.services.extend([infer_service])

    def _prepare_resource(self, workdir, cube_conf):
        self.workdir = workdir
        if self.resource_conf is None:
            with open("{}/{}".format(workdir, self.general_model_config_fn),
                      "w") as fout:
                fout.write(str(self.model_conf))
            self.resource_conf = server_sdk.ResourceConf()
            for workflow in self.workflow_conf.workflows:
                for node in workflow.nodes:
                    if "dist_kv" in node.name:
                        self.resource_conf.cube_config_path = workdir
                        self.resource_conf.cube_config_file = self.cube_config_fn
                        if cube_conf is None:
                            raise ValueError(
                                "Please set the path of cube.conf when using the dist_kv op."
                            )
                        shutil.copy(cube_conf, workdir)
            self.resource_conf.model_toolkit_path = workdir
            self.resource_conf.model_toolkit_file = self.model_toolkit_fn
            self.resource_conf.general_model_path = workdir
            self.resource_conf.general_model_file = self.general_model_config_fn
            if self.product_name is not None:
                self.resource_conf.auth_product_name = self.product_name
            if self.container_id is not None:
                self.resource_conf.auth_container_id = self.container_id

    def _write_pb_str(self, filepath, pb_obj):
        with open(filepath, "w") as fout:
            fout.write(str(pb_obj))

    def load_model_config(self, model_config_paths):
        # At present, Serving needs the model path configured in the
        # resource.prototxt file to determine the input and output format
        # of the workflow, so the input and output of all models in one
        # workflow must be the same.
        workflow_oi_config_path = None
        if isinstance(model_config_paths, str):
            # If there is only one model path, use the default infer_op.
            # Because there are several infer_op types, we need to find
            # it from workflow_conf.
            default_engine_names = [
                'general_infer_0', 'general_dist_kv_infer_0',
                'general_dist_kv_quant_infer_0'
            ]
            engine_name = None
            for node in self.workflow_conf.workflows[0].nodes:
                if node.name in default_engine_names:
                    engine_name = node.name
                    break
            if engine_name is None:
                raise Exception(
                    "You have set a custom engine_name for the Op. Please use "
                    "the {op: model_path} dict form to configure the model path."
                )
            self.model_config_paths = {engine_name: model_config_paths}
            workflow_oi_config_path = self.model_config_paths[engine_name]
        elif isinstance(model_config_paths, dict):
            self.model_config_paths = {}
            for node_str, path in model_config_paths.items():
                node = server_sdk.DAGNode()
                google.protobuf.text_format.Parse(node_str, node)
                self.model_config_paths[node.name] = path
            print("You have specified multiple model paths, please ensure "
                  "that the input and output of multiple models are the same.")
            workflow_oi_config_path = list(
                self.model_config_paths.values())[0]
        else:
            raise Exception("The type of model_config_paths must be str or "
                            "dict({op: model_path}), not {}.".format(
                                type(model_config_paths)))

        self.model_conf = m_config.GeneralModelConfig()
        with open("{}/serving_server_conf.prototxt".format(
                workflow_oi_config_path), 'r') as f:
            self.model_conf = google.protobuf.text_format.Merge(
                str(f.read()), self.model_conf)
        # check config here
        # print config here

    def use_mkl(self, flag):
        self.mkl_flag = flag

    def get_device_version(self):
        avx_flag = False
        mkl_flag = self.mkl_flag
        openblas_flag = False
        r = os.system("cat /proc/cpuinfo | grep avx > /dev/null 2>&1")
        if r == 0:
            avx_flag = True
        if avx_flag:
            if mkl_flag:
                device_version = "serving-cpu-avx-mkl-"
            else:
                device_version = "serving-cpu-avx-openblas-"
        else:
            if mkl_flag:
                print(
                    "Your CPU does not support AVX, server will running with noavx-openblas mode."
                )
            device_version = "serving-cpu-noavx-openblas-"
        return device_version

    def download_bin(self):
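        # Download and unpack the pre-built serving binary next to the
        # installed package. An exclusive flock on version.py plus an
        # `.is_download` marker file keeps concurrent processes from
        # fetching the same tarball twice.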
        os.chdir(self.module_path)
        need_download = False
        # acquire lock
        version_file = open("{}/version.py".format(self.module_path), "r")
        for line in version_file.readlines():
            if re.match("cuda_version", line):
                cuda_version = line.split("\"")[1]
                if cuda_version == "101" or cuda_version == "102":
                    device_version = "serving-gpu-" + cuda_version + "-"
                elif cuda_version == "arm" or cuda_version == "arm-xpu":
                    device_version = "serving-" + cuda_version + "-"
                else:
                    device_version = "serving-gpu-cuda" + cuda_version + "-"

        folder_name = device_version + serving_server_version
        tar_name = folder_name + ".tar.gz"
        bin_url = "https://paddle-serving.bj.bcebos.com/bin/" + tar_name
        self.server_path = os.path.join(self.module_path, folder_name)

        download_flag = "{}/{}.is_download".format(self.module_path,
                                                   folder_name)
        fcntl.flock(version_file, fcntl.LOCK_EX)

        if os.path.exists(download_flag):
            os.chdir(self.cur_path)
            self.bin_path = self.server_path + "/serving"
            return

        if not os.path.exists(self.server_path):
            os.system("touch {}/{}.is_download".format(self.module_path,
                                                       folder_name))
            print('First time run, downloading PaddleServing components ...')

            r = os.system('wget ' + bin_url + ' --no-check-certificate')
            if r != 0:
                if os.path.exists(tar_name):
                    os.remove(tar_name)
                raise SystemExit(
                    'Download failed, please check your network or permission of {}.'
                    .format(self.module_path))
            else:
                try:
                    print('Decompressing files ..')
                    tar = tarfile.open(tar_name)
                    tar.extractall()
                    tar.close()
                except Exception:
                    # clean up the partially extracted folder on failure
                    if os.path.exists(self.server_path):
                        shutil.rmtree(self.server_path)
                    raise SystemExit(
                        'Decompressing failed, please check your permission of {} or disk space left.'
                        .format(self.module_path))
                finally:
                    os.remove(tar_name)
        # release lock
        version_file.close()
        os.chdir(self.cur_path)
        self.bin_path = self.server_path + "/serving"

    def prepare_server(self,
                       workdir=None,
                       port=9292,
                       device="cpu",
                       use_encryption_model=False,
                       cube_conf=None):
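        # Create the working directory and write every prototxt config file
        # (general model, resource, engine, infer service, workflow) into it.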
        if workdir is None:
            workdir = "./tmp"
        os.system("mkdir {}".format(workdir))
        os.system("touch {}/fluid_time_file".format(workdir))

        if not self.port_is_available(port):
            raise SystemExit("Port {} is already used".format(port))

        self.set_port(port)
        self._prepare_resource(workdir, cube_conf)
        self._prepare_engine(self.model_config_paths, device,
                             use_encryption_model)
        self._prepare_infer_service(port)
        self.workdir = workdir

        infer_service_fn = "{}/{}".format(workdir, self.infer_service_fn)
        workflow_fn = "{}/{}".format(workdir, self.workflow_fn)
        resource_fn = "{}/{}".format(workdir, self.resource_fn)
        model_toolkit_fn = "{}/{}".format(workdir, self.model_toolkit_fn)

        self._write_pb_str(infer_service_fn, self.infer_service_conf)
        self._write_pb_str(workflow_fn, self.workflow_conf)
        self._write_pb_str(resource_fn, self.resource_conf)
        self._write_pb_str(model_toolkit_fn, self.model_toolkit_conf)

    def port_is_available(self, port):
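        # connect_ex returns 0 when something is already listening on the
        # port, so a non-zero result means the port is free.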
        with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
            sock.settimeout(2)
            result = sock.connect_ex(('0.0.0.0', port))
        return result != 0

    def run_server(self):
        # just run server with system command
        # currently we do not load cube
        self.check_local_bin()
        if not self.use_local_bin:
            self.download_bin()
            # wait for other process to download server bin
            while not os.path.exists(self.server_path):
                time.sleep(1)
        else:
            print("Use local bin : {}".format(self.bin_path))
        #self.check_cuda()
        # Todo: merge CPU and GPU code, remove device to model_toolkit
        if self.device == "cpu" or self.device == "arm":
            command = "{} " \
                      "-enable_model_toolkit " \
                      "-inferservice_path {} " \
                      "-inferservice_file {} " \
                      "-max_concurrency {} " \
                      "-num_threads {} " \
                      "-port {} " \
                      "-reload_interval_s {} " \
                      "-resource_path {} " \
                      "-resource_file {} " \
                      "-workflow_path {} " \
                      "-workflow_file {} " \
                      "-bthread_concurrency {} " \
                      "-max_body_size {} ".format(
                          self.bin_path,
                          self.workdir,
                          self.infer_service_fn,
                          self.max_concurrency,
                          self.num_threads,
                          self.port,
                          self.reload_interval_s,
                          self.workdir,
                          self.resource_fn,
                          self.workdir,
                          self.workflow_fn,
                          self.num_threads,
                          self.max_body_size)
        else:
            command = "{} " \
                      "-enable_model_toolkit " \
                      "-inferservice_path {} " \
                      "-inferservice_file {} " \
                      "-max_concurrency {} " \
                      "-num_threads {} " \
                      "-port {} " \
                      "-reload_interval_s {} " \
                      "-resource_path {} " \
                      "-resource_file {} " \
                      "-workflow_path {} " \
                      "-workflow_file {} " \
                      "-bthread_concurrency {} " \
                      "-gpuid {} " \
                      "-max_body_size {} ".format(
                          self.bin_path,
                          self.workdir,
                          self.infer_service_fn,
                          self.max_concurrency,
                          self.num_threads,
                          self.port,
                          self.reload_interval_s,
                          self.workdir,
                          self.resource_fn,
                          self.workdir,
                          self.workflow_fn,
                          self.num_threads,
                          self.gpuid,
                          self.max_body_size)
        print("Going to Run Comand")
        print(command)
464

M
MRXLT 已提交
465
        os.system(command)

class MultiLangServer(object):
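    """gRPC front end that wraps an internal brpc Server.

    The wrapped brpc server is started on a spare port picked from 12000
    upwards, while this class serves the user-facing gRPC port and forwards
    requests to it.
    """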
    def __init__(self):
        self.bserver_ = Server()
        self.worker_num_ = 4
        self.body_size_ = 64 * 1024 * 1024
        self.concurrency_ = 100000
        self.is_multi_model_ = False  # for model ensemble

    def set_max_concurrency(self, concurrency):
        self.concurrency_ = concurrency
        self.bserver_.set_max_concurrency(concurrency)

    def set_device(self, device="cpu"):
        self.device = device

    def set_num_threads(self, threads):
        self.worker_num_ = threads
        self.bserver_.set_num_threads(threads)

    def set_max_body_size(self, body_size):
        self.bserver_.set_max_body_size(body_size)
        if body_size >= self.body_size_:
            self.body_size_ = body_size
        else:
            print(
                "max_body_size is less than default value, will use default value in service."
            )

    def use_encryption_model(self, flag=False):
        self.encryption_model = flag

    def set_port(self, port):
        self.gport_ = port

    def set_reload_interval(self, interval):
        self.bserver_.set_reload_interval(interval)

    def set_op_sequence(self, op_seq):
        self.bserver_.set_op_sequence(op_seq)

    def set_op_graph(self, op_graph):
        self.bserver_.set_op_graph(op_graph)

    def use_mkl(self, flag):
        self.bserver_.use_mkl(flag)

    def set_memory_optimize(self, flag=False):
        self.bserver_.set_memory_optimize(flag)

    def set_ir_optimize(self, flag=False):
        self.bserver_.set_ir_optimize(flag)

    def set_gpuid(self, gpuid=0):
        self.bserver_.set_gpuid(gpuid)

    def load_model_config(self, server_config_paths, client_config_path=None):
        self.bserver_.load_model_config(server_config_paths)
        if client_config_path is None:
            if isinstance(server_config_paths, dict):
                self.is_multi_model_ = True
                client_config_path = '{}/serving_server_conf.prototxt'.format(
                    list(server_config_paths.items())[0][1])
            else:
                client_config_path = '{}/serving_server_conf.prototxt'.format(
                    server_config_paths)
        self.bclient_config_path_ = client_config_path

    def prepare_server(self,
                       workdir=None,
                       port=9292,
                       device="cpu",
                       use_encryption_model=False,
                       cube_conf=None):
        if not self._port_is_available(port):
            raise SystemExit("Port {} is already used".format(port))
        default_port = 12000
        self.port_list_ = []
        for i in range(1000):
            candidate = default_port + i
            if candidate != port and self._port_is_available(candidate):
                self.port_list_.append(candidate)
                break
        self.bserver_.prepare_server(
            workdir=workdir,
            port=self.port_list_[0],
            device=device,
            use_encryption_model=use_encryption_model,
            cube_conf=cube_conf)
        self.set_port(port)

    def _launch_brpc_service(self, bserver):
        bserver.run_server()

    def _port_is_available(self, port):
        with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
            sock.settimeout(2)
            result = sock.connect_ex(('0.0.0.0', port))
        return result != 0

    def run_server(self):
        p_bserver = Process(
            target=self._launch_brpc_service, args=(self.bserver_, ))
        p_bserver.start()
        options = [('grpc.max_send_message_length', self.body_size_),
                   ('grpc.max_receive_message_length', self.body_size_)]
        server = grpc.server(
            futures.ThreadPoolExecutor(max_workers=self.worker_num_),
            options=options,
            maximum_concurrent_rpcs=self.concurrency_)
        multi_lang_general_model_service_pb2_grpc.add_MultiLangGeneralModelServiceServicer_to_server(
            MultiLangServerServiceServicer(
                self.bclient_config_path_, self.is_multi_model_,
                ["0.0.0.0:{}".format(self.port_list_[0])]), server)
        server.add_insecure_port('[::]:{}'.format(self.gport_))
        server.start()
        p_bserver.join()
        server.wait_for_termination()
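

# A minimal usage sketch. OpMaker/OpSeqMaker and the model directory
# "uci_housing_model" come from the wider paddle_serving_server_gpu package
# and its examples rather than from this file, so treat the exact names as
# assumptions:
#
#     op_maker = paddle_serving_server.OpMaker()
#     op_seq_maker = paddle_serving_server.OpSeqMaker()
#     op_seq_maker.add_op(op_maker.create('general_reader'))
#     op_seq_maker.add_op(op_maker.create('general_infer'))
#     op_seq_maker.add_op(op_maker.create('general_response'))
#
#     server = Server()
#     server.set_op_sequence(op_seq_maker.get_op_sequence())
#     server.load_model_config("uci_housing_model")
#     server.set_gpuid(0)
#     server.prepare_server(workdir="workdir", port=9393, device="gpu")
#     server.run_server()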