# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
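"""Standalone server wrappers for Paddle Serving.

``Server`` configures and launches the native brpc serving binary;
``MultiLangServer`` exposes the same backend through a gRPC service.
"""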

import os
import tarfile
import socket
import paddle_serving_server
from .proto import server_configure_pb2 as server_sdk
from .proto import general_model_config_pb2 as m_config
# assumed location of the generated gRPC stubs used by MultiLangServer below
from .proto import multi_lang_general_model_service_pb2_grpc
import google.protobuf.text_format
import time
from .version import serving_server_version, version_suffix, device_type
from contextlib import closing
import argparse

import sys
if not sys.platform.startswith('win'):
    import fcntl
import shutil
import platform
import numpy as np
import grpc

from multiprocessing import Pool, Process
from concurrent import futures


class Server(object):
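    """Configure and launch the native brpc inference server.

    A minimal usage sketch; the model directory is hypothetical and the op
    sequence is typically built with paddle_serving_server's OpSeqMaker:

        server = Server()
        server.set_op_sequence(op_seq)
        server.load_model_config("uci_housing_model")
        server.prepare_server(workdir="workdir", port=9292, device="cpu")
        server.run_server()
    """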
    def __init__(self):
        self.server_handle_ = None
        self.infer_service_conf = None
        self.model_toolkit_conf = None
        self.resource_conf = None
        self.memory_optimization = False
        self.ir_optimization = False
        self.model_conf = None
        self.workflow_fn = "workflow.prototxt"
        self.resource_fn = "resource.prototxt"
        self.infer_service_fn = "infer_service.prototxt"
        self.model_toolkit_fn = "model_toolkit.prototxt"
        self.general_model_config_fn = "general_model.prototxt"
        self.cube_config_fn = "cube.conf"
        self.workdir = ""
        self.max_concurrency = 0
        self.num_threads = 2
        self.port = 8080
        self.reload_interval_s = 10
        self.max_body_size = 64 * 1024 * 1024
        self.module_path = os.path.dirname(paddle_serving_server.__file__)
        self.cur_path = os.getcwd()
        self.use_local_bin = False
        self.mkl_flag = False
        self.device = "cpu"
        self.gpuid = 0
        self.use_trt = False
        self.use_lite = False
        self.use_xpu = False
        self.model_config_paths = None  # for multi-model in a workflow
        self.product_name = None
        self.container_id = None

    def get_fetch_list(self):
        fetch_names = [var.alias_name for var in self.model_conf.fetch_var]
        return fetch_names

    def set_max_concurrency(self, concurrency):
        self.max_concurrency = concurrency

    def set_num_threads(self, threads):
        self.num_threads = threads

    def set_max_body_size(self, body_size):
        if body_size >= self.max_body_size:
            self.max_body_size = body_size
        else:
            print(
                "max_body_size is less than default value, will use default value in service."
            )

    def use_encryption_model(self, flag=False):
        self.encryption_model = flag

    def set_port(self, port):
        self.port = port

    def set_reload_interval(self, interval):
        self.reload_interval_s = interval

    def set_op_sequence(self, op_seq):
        self.workflow_conf = op_seq

    def set_op_graph(self, op_graph):
        self.workflow_conf = op_graph

    def set_memory_optimize(self, flag=False):
        self.memory_optimization = flag

    def set_ir_optimize(self, flag=False):
        self.ir_optimization = flag

    def set_product_name(self, product_name=None):
        if product_name is None:
            raise ValueError("product_name can't be None.")
        self.product_name = product_name

    def set_container_id(self, container_id):
        if container_id is None:
            raise ValueError("container_id can't be None.")
        self.container_id = container_id

    def check_local_bin(self):
        if "SERVING_BIN" in os.environ:
            self.use_local_bin = True
            self.bin_path = os.environ["SERVING_BIN"]

    def check_cuda(self):
        if os.system("ls /dev/ | grep nvidia > /dev/null") != 0:
            raise SystemExit(
                "GPU not found, please check your environment or use cpu version by \"pip install paddle_serving_server\""
            )

    def set_device(self, device="cpu"):
        self.device = device

    def set_gpuid(self, gpuid=0):
        self.gpuid = gpuid

    def set_trt(self):
        self.use_trt = True

    def set_lite(self):
        self.use_lite = True

    def set_xpu(self):
        self.use_xpu = True

    def _prepare_engine(self, model_config_paths, device, use_encryption_model):
        if self.model_toolkit_conf is None:
            self.model_toolkit_conf = server_sdk.ModelToolkitConf()

        for engine_name, model_config_path in model_config_paths.items():
            engine = server_sdk.EngineDesc()
            engine.name = engine_name
            # engine.reloadable_meta = model_config_path + "/fluid_time_file"
            engine.reloadable_meta = self.workdir + "/fluid_time_file"
            os.system("touch {}".format(engine.reloadable_meta))
            engine.reloadable_type = "timestamp_ne"
            engine.runtime_thread_num = 0
            engine.batch_infer_size = 0
            engine.enable_batch_align = 0
            engine.model_dir = model_config_path
            engine.enable_memory_optimization = self.memory_optimization
            engine.enable_ir_optimization = self.ir_optimization
            engine.use_trt = self.use_trt
            engine.use_lite = self.use_lite
            engine.use_xpu = self.use_xpu
            if not os.path.exists('{}/__params__'.format(model_config_path)):
                engine.combined_model = True
            if use_encryption_model:
                engine.encrypted_model = True
            engine.type = "PaddleInfer"

            self.model_toolkit_conf.engines.extend([engine])

    def _prepare_infer_service(self, port):
        if self.infer_service_conf is None:
            self.infer_service_conf = server_sdk.InferServiceConf()
            self.infer_service_conf.port = port
            infer_service = server_sdk.InferService()
            infer_service.name = "GeneralModelService"
            infer_service.workflows.extend(["workflow1"])
            self.infer_service_conf.services.extend([infer_service])

    def _prepare_resource(self, workdir, cube_conf):
        self.workdir = workdir
        if self.resource_conf is None:
            with open("{}/{}".format(workdir, self.general_model_config_fn),
                      "w") as fout:
                fout.write(str(self.model_conf))
            self.resource_conf = server_sdk.ResourceConf()
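            # ops whose name contains "dist_kv" look up sparse parameters in a
            # cube service, which requires a cube.conf to be supplied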
            for workflow in self.workflow_conf.workflows:
                for node in workflow.nodes:
                    if "dist_kv" in node.name:
                        self.resource_conf.cube_config_path = workdir
                        self.resource_conf.cube_config_file = self.cube_config_fn
                        if cube_conf is None:
                            raise ValueError(
                                "Please set the path of cube.conf while use dist_kv op."
                            )
                        shutil.copy(cube_conf, workdir)
            self.resource_conf.model_toolkit_path = workdir
            self.resource_conf.model_toolkit_file = self.model_toolkit_fn
            self.resource_conf.general_model_path = workdir
            self.resource_conf.general_model_file = self.general_model_config_fn
            if self.product_name is not None:
                self.resource_conf.auth_product_name = self.product_name
            if self.container_id is not None:
                self.resource_conf.auth_container_id = self.container_id

    def _write_pb_str(self, filepath, pb_obj):
        with open(filepath, "w") as fout:
            fout.write(str(pb_obj))

    def load_model_config(self, model_config_paths):
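        """Set the model path (str) or paths ({op: model_path} dict).

        A sketch with hypothetical paths; in the dict form, each key is a
        DAGNode definition in protobuf text format:

            server.load_model_config("uci_housing_model")
            server.load_model_config(
                {'name: "general_infer_0" type: "GeneralInferOp"': "model_a"})
        """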
        # At present, Serving needs the model path configured in the
        # resource.prototxt file to determine the input and output format
        # of the workflow, so the inputs and outputs of multiple models
        # in one workflow must be the same.
        workflow_oi_config_path = None
        if isinstance(model_config_paths, str):
            # If there is only one model path, use the default infer_op.
            # Because there are several infer_op type, we need to find
            # it from workflow_conf.
            default_engine_names = [
                'general_infer_0', 'general_dist_kv_infer_0',
                'general_dist_kv_quant_infer_0'
            ]
            engine_name = None
            for node in self.workflow_conf.workflows[0].nodes:
                if node.name in default_engine_names:
                    engine_name = node.name
                    break
            if engine_name is None:
                raise Exception(
                    "You have set a custom engine_name for the Op. Please use "
                    "the form {op: model_path} to configure the model path.")
            self.model_config_paths = {engine_name: model_config_paths}
            workflow_oi_config_path = self.model_config_paths[engine_name]
        elif isinstance(model_config_paths, dict):
            self.model_config_paths = {}
            for node_str, path in model_config_paths.items():
                node = server_sdk.DAGNode()
                google.protobuf.text_format.Parse(node_str, node)
                self.model_config_paths[node.name] = path
            print("You have specified multiple model paths, please ensure "
                  "that the input and output of multiple models are the same.")
            workflow_oi_config_path = list(
                self.model_config_paths.items())[0][1]
        else:
            raise Exception("The type of model_config_paths must be str or "
                            "dict({op: model_path}), not {}.".format(
                                type(model_config_paths)))

        self.model_conf = m_config.GeneralModelConfig()
        with open("{}/serving_server_conf.prototxt".format(
                workflow_oi_config_path), 'r') as f:
            self.model_conf = google.protobuf.text_format.Merge(
                str(f.read()), self.model_conf)
        # check config here
        # print config here

    def use_mkl(self, flag):
        self.mkl_flag = flag

    def get_device_version(self):
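        """Pick the CPU binary flavor (cpu-avx-mkl, cpu-avx-openblas or
        cpu-noavx-openblas) from the host's AVX support and the MKL flag."""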
        avx_flag = False
        mkl_flag = self.mkl_flag
        openblas_flag = False
        r = os.system("cat /proc/cpuinfo | grep avx > /dev/null 2>&1")
        if r == 0:
            avx_flag = True
        if avx_flag:
            if mkl_flag:
                device_version = "cpu-avx-mkl"
            else:
                device_version = "cpu-avx-openblas"
        else:
            if mkl_flag:
                print(
                    "Your CPU does not support AVX, server will running with noavx-openblas mode."
                )
            device_version = "cpu-noavx-openblas"
        return device_version

    def get_serving_bin_name(self):
        if device_type == "0":
            device_version = self.get_device_version()
        elif device_type == "1":
            if version_suffix == "101" or version_suffix == "102":
                device_version = "gpu-" + version_suffix
            else:
                device_version = "gpu-cuda" + version_suffix
        elif device_type == "2":
            device_version = "xpu-" + platform.machine()
        return device_version

    def download_bin(self):
        os.chdir(self.module_path)
        need_download = False

        # acquire lock: version.py doubles as the flock target so that
        # concurrent processes do not download the binary at the same time
        version_file = open("{}/version.py".format(self.module_path), "r")

        folder_name = "serving-%s-%s" % (self.get_serving_bin_name(),
                                         serving_server_version)
        tar_name = folder_name + ".tar.gz"
        bin_url = "https://paddle-serving.bj.bcebos.com/bin/" + tar_name
        self.server_path = os.path.join(self.module_path, folder_name)

        download_flag = "{}/{}.is_download".format(self.module_path,
                                                   folder_name)

        fcntl.flock(version_file, fcntl.LOCK_EX)

        if os.path.exists(download_flag):
            os.chdir(self.cur_path)
            self.bin_path = self.server_path + "/serving"
            version_file.close()  # release the lock before the early return
            return

        if not os.path.exists(self.server_path):
            os.system("touch {}/{}.is_download".format(self.module_path,
                                                       folder_name))
            print('First time run, downloading PaddleServing components ...')

            r = os.system('wget ' + bin_url + ' --no-check-certificate')
            if r != 0:
                if os.path.exists(tar_name):
                    os.remove(tar_name)
                raise SystemExit(
                    'Download failed, please check your network or permission of {}.'
                    .format(self.module_path))
            else:
                try:
                    print('Decompressing files ..')
                    tar = tarfile.open(tar_name)
                    tar.extractall()
                    tar.close()
                except:
                    # remove the partially extracted directory before aborting
                    if os.path.exists(self.server_path):
                        shutil.rmtree(self.server_path)
                    raise SystemExit(
                        'Decompressing failed, please check your permission of {} or disk space left.'
                        .format(self.module_path))
                finally:
                    os.remove(tar_name)
        # release lock
        version_file.close()
        os.chdir(self.cur_path)
        self.bin_path = self.server_path + "/serving"

    def prepare_server(self,
                       workdir=None,
                       port=9292,
                       device="cpu",
                       use_encryption_model=False,
                       cube_conf=None):
        if workdir is None:
            workdir = "./tmp"
        os.system("mkdir {}".format(workdir))
        os.system("touch {}/fluid_time_file".format(workdir))

        if not self.port_is_available(port):
            raise SystemExit("Port {} is already used".format(port))

        self.set_port(port)
        self._prepare_resource(workdir, cube_conf)
        self._prepare_engine(self.model_config_paths, device,
                             use_encryption_model)
        self._prepare_infer_service(port)
        self.workdir = workdir

        infer_service_fn = "{}/{}".format(workdir, self.infer_service_fn)
        workflow_fn = "{}/{}".format(workdir, self.workflow_fn)
        resource_fn = "{}/{}".format(workdir, self.resource_fn)
        model_toolkit_fn = "{}/{}".format(workdir, self.model_toolkit_fn)

        self._write_pb_str(infer_service_fn, self.infer_service_conf)
        self._write_pb_str(workflow_fn, self.workflow_conf)
        self._write_pb_str(resource_fn, self.resource_conf)
        self._write_pb_str(model_toolkit_fn, self.model_toolkit_conf)

    def port_is_available(self, port):
        with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
            sock.settimeout(2)
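            # connect_ex returns 0 when something is already listening on the
            # port, so a nonzero result means the port is free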
            result = sock.connect_ex(('0.0.0.0', port))
        return result != 0

    def run_server(self):
        # just run server with system command
        # currently we do not load cube
        self.check_local_bin()
        if not self.use_local_bin:
            self.download_bin()
            # wait for other process to download server bin
            while not os.path.exists(self.server_path):
                time.sleep(1)
        else:
            print("Use local bin : {}".format(self.bin_path))
        #self.check_cuda()
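        # Note: the CPU and GPU launch commands below differ only by the
        # extra -gpuid flag passed to the serving binary.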
        # TODO: merge the CPU and GPU code paths; move device into model_toolkit
        if self.device == "cpu" or self.device == "arm":
            command = "{} " \
                      "-enable_model_toolkit " \
                      "-inferservice_path {} " \
                      "-inferservice_file {} " \
                      "-max_concurrency {} " \
                      "-num_threads {} " \
                      "-port {} " \
                      "-reload_interval_s {} " \
                      "-resource_path {} " \
                      "-resource_file {} " \
                      "-workflow_path {} " \
                      "-workflow_file {} " \
                      "-bthread_concurrency {} " \
                      "-max_body_size {} ".format(
                          self.bin_path,
                          self.workdir,
                          self.infer_service_fn,
                          self.max_concurrency,
                          self.num_threads,
                          self.port,
                          self.reload_interval_s,
                          self.workdir,
                          self.resource_fn,
                          self.workdir,
                          self.workflow_fn,
                          self.num_threads,
                          self.max_body_size)
        else:
            command = "{} " \
                      "-enable_model_toolkit " \
                      "-inferservice_path {} " \
                      "-inferservice_file {} " \
                      "-max_concurrency {} " \
                      "-num_threads {} " \
                      "-port {} " \
                      "-reload_interval_s {} " \
                      "-resource_path {} " \
                      "-resource_file {} " \
                      "-workflow_path {} " \
                      "-workflow_file {} " \
                      "-bthread_concurrency {} " \
                      "-gpuid {} " \
                      "-max_body_size {} ".format(
                          self.bin_path,
                          self.workdir,
                          self.infer_service_fn,
                          self.max_concurrency,
                          self.num_threads,
                          self.port,
                          self.reload_interval_s,
                          self.workdir,
                          self.resource_fn,
                          self.workdir,
                          self.workflow_fn,
                          self.num_threads,
                          self.gpuid,
                          self.max_body_size)
        print("Going to Run Comand")
        print(command)

        os.system(command)


class MultiLangServer(object):
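    """gRPC front end that forwards requests to a local brpc Server.

    A minimal usage sketch; paths, port and op sequence are illustrative:

        server = MultiLangServer()
        server.set_op_sequence(op_seq)
        server.load_model_config("uci_housing_model")
        server.prepare_server(workdir="workdir", port=9393)
        server.run_server()
    """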
    def __init__(self):
        self.bserver_ = Server()
        self.worker_num_ = 4
        self.body_size_ = 64 * 1024 * 1024
        self.concurrency_ = 100000
        self.is_multi_model_ = False  # for model ensemble

    def set_max_concurrency(self, concurrency):
        self.concurrency_ = concurrency
        self.bserver_.set_max_concurrency(concurrency)

    def set_device(self, device="cpu"):
        self.device = device

    def set_num_threads(self, threads):
        self.worker_num_ = threads
        self.bserver_.set_num_threads(threads)

    def set_max_body_size(self, body_size):
        self.bserver_.set_max_body_size(body_size)
        if body_size >= self.body_size_:
            self.body_size_ = body_size
        else:
            print(
                "max_body_size is less than default value, will use default value in service."
            )

    def use_encryption_model(self, flag=False):
        self.encryption_model = flag

    def set_port(self, port):
        self.gport_ = port

    def set_reload_interval(self, interval):
        self.bserver_.set_reload_interval(interval)

    def set_op_sequence(self, op_seq):
        self.bserver_.set_op_sequence(op_seq)

    def set_op_graph(self, op_graph):
        self.bserver_.set_op_graph(op_graph)

    def use_mkl(self, flag):
        self.bserver_.use_mkl(flag)

    def set_memory_optimize(self, flag=False):
        self.bserver_.set_memory_optimize(flag)

    def set_ir_optimize(self, flag=False):
        self.bserver_.set_ir_optimize(flag)

    def set_gpuid(self, gpuid=0):
        self.bserver_.set_gpuid(gpuid)

    def load_model_config(self, server_config_paths, client_config_path=None):
        self.bserver_.load_model_config(server_config_paths)
        if client_config_path is None:
            if isinstance(server_config_paths, dict):
                self.is_multi_model_ = True
                client_config_path = '{}/serving_server_conf.prototxt'.format(
                    list(server_config_paths.items())[0][1])
            else:
                client_config_path = '{}/serving_server_conf.prototxt'.format(
                    server_config_paths)
        self.bclient_config_path_ = client_config_path

    def prepare_server(self,
                       workdir=None,
                       port=9292,
                       device="cpu",
                       use_encryption_model=False,
                       cube_conf=None):
        if not self._port_is_available(port):
            raise SystemExit("Prot {} is already used".format(port))
        default_port = 12000
        self.port_list_ = []
        for i in range(1000):
            if default_port + i != port and self._port_is_available(
                    default_port + i):
                self.port_list_.append(default_port + i)
                break
        self.bserver_.prepare_server(
            workdir=workdir,
            port=self.port_list_[0],
            device=device,
            use_encryption_model=use_encryption_model,
            cube_conf=cube_conf)
        self.set_port(port)

    def _launch_brpc_service(self, bserver):
        bserver.run_server()

    def _port_is_available(self, port):
        with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
            sock.settimeout(2)
            result = sock.connect_ex(('0.0.0.0', port))
        return result != 0

    def run_server(self):
        p_bserver = Process(
            target=self._launch_brpc_service, args=(self.bserver_, ))
        p_bserver.start()
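        # the brpc backend runs in a child process; this process serves the
        # public gRPC endpoint and forwards requests to it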
        options = [('grpc.max_send_message_length', self.body_size_),
                   ('grpc.max_receive_message_length', self.body_size_)]
        server = grpc.server(
            futures.ThreadPoolExecutor(max_workers=self.worker_num_),
            options=options,
            maximum_concurrent_rpcs=self.concurrency_)
        multi_lang_general_model_service_pb2_grpc.add_MultiLangGeneralModelServiceServicer_to_server(
            MultiLangServerServiceServicer(
                self.bclient_config_path_, self.is_multi_model_,
                ["0.0.0.0:{}".format(self.port_list_[0])]), server)
        server.add_insecure_port('[::]:{}'.format(self.gport_))
        server.start()
        p_bserver.join()
        server.wait_for_termination()