server.py 22.6 KB
Newer Older
Z
update  
zhangjun 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
M
MRXLT 已提交
14 15 16

import os
import tarfile
M
MRXLT 已提交
17
import socket
Z
zhangjun 已提交
18
import paddle_serving_server as paddle_serving_server
Z
update  
zhangjun 已提交
19 20 21
from .proto import server_configure_pb2 as server_sdk
from .proto import general_model_config_pb2 as m_config
import google.protobuf.text_format
22
import time
Z
update  
zhangjun 已提交
23
from .version import serving_server_version, version_suffix, device_type
M
MRXLT 已提交
24
from contextlib import closing
G
guru4elephant 已提交
25
import argparse
Z
zhangjun 已提交
26

J
Jiawei Wang 已提交
27
import sys
W
wangjiawei04 已提交
28 29
if sys.platform.startswith('win') is False:
    import fcntl
M
MRXLT 已提交
30
import shutil
Z
update  
zhangjun 已提交
31
import platform
B
barrierye 已提交
32 33
import numpy as np
import grpc
B
barrierye 已提交
34
import sys
Z
zhangjun 已提交
35

B
barrierye 已提交
36 37 38
from multiprocessing import Pool, Process
from concurrent import futures

Z
update  
zhangjun 已提交
39

M
MRXLT 已提交
40 41 42 43 44 45 46
class Server(object):
    def __init__(self):
        self.server_handle_ = None
        self.infer_service_conf = None
        self.model_toolkit_conf = None
        self.resource_conf = None
        self.memory_optimization = False
M
MRXLT 已提交
47
        self.ir_optimization = False
M
MRXLT 已提交
48 49 50 51 52 53
        self.model_conf = None
        self.workflow_fn = "workflow.prototxt"
        self.resource_fn = "resource.prototxt"
        self.infer_service_fn = "infer_service.prototxt"
        self.model_toolkit_fn = "model_toolkit.prototxt"
        self.general_model_config_fn = "general_model.prototxt"
W
wangjiawei04 已提交
54
        self.cube_config_fn = "cube.conf"
M
MRXLT 已提交
55 56
        self.workdir = ""
        self.max_concurrency = 0
M
MRXLT 已提交
57
        self.num_threads = 2
M
MRXLT 已提交
58 59
        self.port = 8080
        self.reload_interval_s = 10
M
MRXLT 已提交
60
        self.max_body_size = 64 * 1024 * 1024
M
MRXLT 已提交
61 62
        self.module_path = os.path.dirname(paddle_serving_server.__file__)
        self.cur_path = os.getcwd()
M
MRXLT 已提交
63
        self.use_local_bin = False
Z
zhangjun 已提交
64
        self.mkl_flag = False
Z
zhangjun 已提交
65
        self.device = "cpu"
M
MRXLT 已提交
66
        self.gpuid = 0
M
add trt  
MRXLT 已提交
67
        self.use_trt = False
Z
zhangjun 已提交
68 69
        self.use_lite = False
        self.use_xpu = False
B
barrierye 已提交
70
        self.model_config_paths = None  # for multi-model in a workflow
71 72
        self.product_name = None
        self.container_id = None
M
MRXLT 已提交
73

B
fix cpu  
barriery 已提交
74 75 76 77
    def get_fetch_list(self):
        fetch_names = [var.alias_name for var in self.model_conf.fetch_var]
        return fetch_names

M
MRXLT 已提交
78 79 80 81 82 83
    def set_max_concurrency(self, concurrency):
        self.max_concurrency = concurrency

    def set_num_threads(self, threads):
        self.num_threads = threads

M
MRXLT 已提交
84 85 86 87 88 89 90 91
    def set_max_body_size(self, body_size):
        if body_size >= self.max_body_size:
            self.max_body_size = body_size
        else:
            print(
                "max_body_size is less than default value, will use default value in service."
            )

92 93 94
    def use_encryption_model(self, flag=False):
        self.encryption_model = flag

M
MRXLT 已提交
95 96 97 98 99 100 101 102 103
    def set_port(self, port):
        self.port = port

    def set_reload_interval(self, interval):
        self.reload_interval_s = interval

    def set_op_sequence(self, op_seq):
        self.workflow_conf = op_seq

B
barrierye 已提交
104 105 106
    def set_op_graph(self, op_graph):
        self.workflow_conf = op_graph

M
MRXLT 已提交
107 108 109
    def set_memory_optimize(self, flag=False):
        self.memory_optimization = flag

M
MRXLT 已提交
110 111 112
    def set_ir_optimize(self, flag=False):
        self.ir_optimization = flag

113 114 115 116 117 118 119 120 121 122
    def set_product_name(self, product_name=None):
        if product_name == None:
            raise ValueError("product_name can't be None.")
        self.product_name = product_name

    def set_container_id(self, container_id):
        if container_id == None:
            raise ValueError("container_id can't be None.")
        self.container_id = container_id

M
MRXLT 已提交
123 124 125 126
    def check_local_bin(self):
        if "SERVING_BIN" in os.environ:
            self.use_local_bin = True
            self.bin_path = os.environ["SERVING_BIN"]
M
MRXLT 已提交
127

M
MRXLT 已提交
128
    def check_cuda(self):
M
MRXLT 已提交
129 130 131
        if os.system("ls /dev/ | grep nvidia > /dev/null") == 0:
            pass
        else:
M
MRXLT 已提交
132
            raise SystemExit(
M
MRXLT 已提交
133
                "GPU not found, please check your environment or use cpu version by \"pip install paddle_serving_server\""
M
MRXLT 已提交
134 135
            )

Z
zhangjun 已提交
136 137 138
    def set_device(self, device="cpu"):
        self.device = device

M
MRXLT 已提交
139 140 141
    def set_gpuid(self, gpuid=0):
        self.gpuid = gpuid

M
bug fix  
MRXLT 已提交
142
    def set_trt(self):
M
add trt  
MRXLT 已提交
143 144
        self.use_trt = True

Z
zhangjun 已提交
145 146 147 148 149 150
    def set_lite(self):
        self.use_lite = True

    def set_xpu(self):
        self.use_xpu = True

H
HexToString 已提交
151
    def _prepare_engine(self, model_config_paths, device, use_encryption_model):
M
MRXLT 已提交
152 153 154
        if self.model_toolkit_conf == None:
            self.model_toolkit_conf = server_sdk.ModelToolkitConf()

B
barrierye 已提交
155 156 157 158 159 160 161 162 163 164
        for engine_name, model_config_path in model_config_paths.items():
            engine = server_sdk.EngineDesc()
            engine.name = engine_name
            # engine.reloadable_meta = model_config_path + "/fluid_time_file"
            engine.reloadable_meta = self.workdir + "/fluid_time_file"
            os.system("touch {}".format(engine.reloadable_meta))
            engine.reloadable_type = "timestamp_ne"
            engine.runtime_thread_num = 0
            engine.batch_infer_size = 0
            engine.enable_batch_align = 0
Z
update  
zhangjun 已提交
165
            engine.model_dir = model_config_path
B
barrierye 已提交
166
            engine.enable_memory_optimization = self.memory_optimization
M
MRXLT 已提交
167
            engine.enable_ir_optimization = self.ir_optimization
M
add trt  
MRXLT 已提交
168
            engine.use_trt = self.use_trt
Z
update  
zhangjun 已提交
169 170
            engine.use_lite = self.use_lite
            engine.use_xpu = self.use_xpu
Z
fix  
zhangjun 已提交
171
            if os.path.exists('{}/__params__'.format(model_config_path)):
Z
update  
zhangjun 已提交
172
                engine.combined_model = True
Z
fix  
zhangjun 已提交
173 174
            else:
                engine.combined_model = False
Z
update  
zhangjun 已提交
175 176
            if use_encryption_model:
                engine.encrypted_model = True
Z
fix  
zhangjun 已提交
177
            engine.type = "PADDLE_INFER"
Z
update  
zhangjun 已提交
178

B
barrierye 已提交
179
            self.model_toolkit_conf.engines.extend([engine])
M
MRXLT 已提交
180 181 182 183 184 185 186 187 188 189

    def _prepare_infer_service(self, port):
        if self.infer_service_conf == None:
            self.infer_service_conf = server_sdk.InferServiceConf()
            self.infer_service_conf.port = port
            infer_service = server_sdk.InferService()
            infer_service.name = "GeneralModelService"
            infer_service.workflows.extend(["workflow1"])
            self.infer_service_conf.services.extend([infer_service])

M
MRXLT 已提交
190
    def _prepare_resource(self, workdir, cube_conf):
191
        self.workdir = workdir
M
MRXLT 已提交
192 193 194 195 196
        if self.resource_conf == None:
            with open("{}/{}".format(workdir, self.general_model_config_fn),
                      "w") as fout:
                fout.write(str(self.model_conf))
            self.resource_conf = server_sdk.ResourceConf()
W
wangjiawei04 已提交
197 198 199 200 201
            for workflow in self.workflow_conf.workflows:
                for node in workflow.nodes:
                    if "dist_kv" in node.name:
                        self.resource_conf.cube_config_path = workdir
                        self.resource_conf.cube_config_file = self.cube_config_fn
M
MRXLT 已提交
202 203 204 205 206
                        if cube_conf == None:
                            raise ValueError(
                                "Please set the path of cube.conf while use dist_kv op."
                            )
                        shutil.copy(cube_conf, workdir)
M
MRXLT 已提交
207 208 209 210
            self.resource_conf.model_toolkit_path = workdir
            self.resource_conf.model_toolkit_file = self.model_toolkit_fn
            self.resource_conf.general_model_path = workdir
            self.resource_conf.general_model_file = self.general_model_config_fn
211 212 213 214
            if self.product_name != None:
                self.resource_conf.auth_product_name = self.product_name
            if self.container_id != None:
                self.resource_conf.auth_container_id = self.container_id
M
MRXLT 已提交
215 216 217 218 219

    def _write_pb_str(self, filepath, pb_obj):
        with open(filepath, "w") as fout:
            fout.write(str(pb_obj))

B
barrierye 已提交
220 221 222 223
    def load_model_config(self, model_config_paths):
        # At present, Serving needs to configure the model path in
        # the resource.prototxt file to determine the input and output
        # format of the workflow. To ensure that the input and output
B
barrierye 已提交
224
        # of multiple models are the same.
B
barrierye 已提交
225 226
        workflow_oi_config_path = None
        if isinstance(model_config_paths, str):
B
barrierye 已提交
227
            # If there is only one model path, use the default infer_op.
M
MRXLT 已提交
228
            # Because there are several infer_op type, we need to find
B
barrierye 已提交
229 230 231
            # it from workflow_conf.
            default_engine_names = [
                'general_infer_0', 'general_dist_kv_infer_0',
B
barrierye 已提交
232
                'general_dist_kv_quant_infer_0'
B
barrierye 已提交
233 234
            ]
            engine_name = None
B
barrierye 已提交
235
            for node in self.workflow_conf.workflows[0].nodes:
B
barrierye 已提交
236 237 238 239 240 241 242 243 244
                if node.name in default_engine_names:
                    engine_name = node.name
                    break
            if engine_name is None:
                raise Exception(
                    "You have set the engine_name of Op. Please use the form {op: model_path} to configure model path"
                )
            self.model_config_paths = {engine_name: model_config_paths}
            workflow_oi_config_path = self.model_config_paths[engine_name]
B
barrierye 已提交
245 246 247 248 249 250 251 252
        elif isinstance(model_config_paths, dict):
            self.model_config_paths = {}
            for node_str, path in model_config_paths.items():
                node = server_sdk.DAGNode()
                google.protobuf.text_format.Parse(node_str, node)
                self.model_config_paths[node.name] = path
            print("You have specified multiple model paths, please ensure "
                  "that the input and output of multiple models are the same.")
M
MRXLT 已提交
253 254
            workflow_oi_config_path = list(self.model_config_paths.items())[0][
                1]
B
barrierye 已提交
255 256 257 258 259
        else:
            raise Exception("The type of model_config_paths must be str or "
                            "dict({op: model_path}), not {}.".format(
                                type(model_config_paths)))

M
MRXLT 已提交
260
        self.model_conf = m_config.GeneralModelConfig()
B
barrierye 已提交
261 262 263
        f = open(
            "{}/serving_server_conf.prototxt".format(workflow_oi_config_path),
            'r')
M
MRXLT 已提交
264 265 266 267
        self.model_conf = google.protobuf.text_format.Merge(
            str(f.read()), self.model_conf)
        # check config here
        # print config here
Z
update  
zhangjun 已提交
268

Z
zhangjun 已提交
269 270 271 272 273 274 275 276 277 278 279 280
    def use_mkl(self, flag):
        self.mkl_flag = flag

    def get_device_version(self):
        avx_flag = False
        mkl_flag = self.mkl_flag
        openblas_flag = False
        r = os.system("cat /proc/cpuinfo | grep avx > /dev/null 2>&1")
        if r == 0:
            avx_flag = True
        if avx_flag:
            if mkl_flag:
Z
update  
zhangjun 已提交
281
                device_version = "cpu-avx-mkl"
Z
zhangjun 已提交
282
            else:
Z
update  
zhangjun 已提交
283
                device_version = "cpu-avx-openblas"
Z
zhangjun 已提交
284 285 286 287 288
        else:
            if mkl_flag:
                print(
                    "Your CPU does not support AVX, server will running with noavx-openblas mode."
                )
Z
update  
zhangjun 已提交
289 290 291 292 293 294 295 296 297 298 299 300 301
            device_version = "cpu-noavx-openblas"
        return device_version

    def get_serving_bin_name(self):
        if device_type == "0":
            device_version = self.get_device_version()
        elif device_type == "1":
            if version_suffix == "101" or version_suffix == "102":
                device_version = "gpu-" + version_suffix
            else:
                device_version = "gpu-cuda" + version_suffix
        elif device_type == "2":
            device_version = "xpu-" + platform.machine()
Z
zhangjun 已提交
302
        return device_version
M
MRXLT 已提交
303 304 305 306

    def download_bin(self):
        os.chdir(self.module_path)
        need_download = False
M
MRXLT 已提交
307 308 309

        #acquire lock
        version_file = open("{}/version.py".format(self.module_path), "r")
Z
update  
zhangjun 已提交
310

Z
fix  
zhangjun 已提交
311 312 313 314 315
        folder_name = "serving-%s-%s" % (self.get_serving_bin_name(),
                                         serving_server_version)
        tar_name = "%s.tar.gz" % folder_name
        bin_url = "https://paddle-serving.bj.bcebos.com/bin/%s" % tar_name

316 317 318 319
        self.server_path = os.path.join(self.module_path, folder_name)

        download_flag = "{}/{}.is_download".format(self.module_path,
                                                   folder_name)
M
MRXLT 已提交
320 321 322

        fcntl.flock(version_file, fcntl.LOCK_EX)

323 324 325 326 327
        if os.path.exists(download_flag):
            os.chdir(self.cur_path)
            self.bin_path = self.server_path + "/serving"
            return

M
MRXLT 已提交
328
        if not os.path.exists(self.server_path):
329 330
            os.system("touch {}/{}.is_download".format(self.module_path,
                                                       folder_name))
M
MRXLT 已提交
331
            print('Frist time run, downloading PaddleServing components ...')
M
MRXLT 已提交
332

M
MRXLT 已提交
333 334 335 336
            r = os.system('wget ' + bin_url + ' --no-check-certificate')
            if r != 0:
                if os.path.exists(tar_name):
                    os.remove(tar_name)
M
MRXLT 已提交
337
                raise SystemExit(
T
TeslaZhao 已提交
338 339
                    'Download failed, please check your network or permission of {}.'
                    .format(self.module_path))
M
MRXLT 已提交
340 341 342 343 344 345 346 347 348
            else:
                try:
                    print('Decompressing files ..')
                    tar = tarfile.open(tar_name)
                    tar.extractall()
                    tar.close()
                except:
                    if os.path.exists(exe_path):
                        os.remove(exe_path)
M
MRXLT 已提交
349
                    raise SystemExit(
T
TeslaZhao 已提交
350 351
                        'Decompressing failed, please check your permission of {} or disk space left.'
                        .format(self.module_path))
M
MRXLT 已提交
352 353
                finally:
                    os.remove(tar_name)
M
MRXLT 已提交
354
        #release lock
B
barrierye 已提交
355
        version_file.close()
M
MRXLT 已提交
356 357
        os.chdir(self.cur_path)
        self.bin_path = self.server_path + "/serving"
Z
update  
zhangjun 已提交
358

M
MRXLT 已提交
359 360 361 362
    def prepare_server(self,
                       workdir=None,
                       port=9292,
                       device="cpu",
W
wangjiawei04 已提交
363
                       use_encryption_model=False,
M
MRXLT 已提交
364
                       cube_conf=None):
M
MRXLT 已提交
365 366 367 368 369 370 371
        if workdir == None:
            workdir = "./tmp"
            os.system("mkdir {}".format(workdir))
        else:
            os.system("mkdir {}".format(workdir))
        os.system("touch {}/fluid_time_file".format(workdir))

M
MRXLT 已提交
372
        if not self.port_is_available(port):
G
gongweibao 已提交
373
            raise SystemExit("Port {} is already used".format(port))
M
MRXLT 已提交
374

G
guru4elephant 已提交
375
        self.set_port(port)
M
MRXLT 已提交
376
        self._prepare_resource(workdir, cube_conf)
H
HexToString 已提交
377 378
        self._prepare_engine(self.model_config_paths, device,
                             use_encryption_model)
M
MRXLT 已提交
379 380 381 382 383 384 385 386 387 388 389 390 391
        self._prepare_infer_service(port)
        self.workdir = workdir

        infer_service_fn = "{}/{}".format(workdir, self.infer_service_fn)
        workflow_fn = "{}/{}".format(workdir, self.workflow_fn)
        resource_fn = "{}/{}".format(workdir, self.resource_fn)
        model_toolkit_fn = "{}/{}".format(workdir, self.model_toolkit_fn)

        self._write_pb_str(infer_service_fn, self.infer_service_conf)
        self._write_pb_str(workflow_fn, self.workflow_conf)
        self._write_pb_str(resource_fn, self.resource_conf)
        self._write_pb_str(model_toolkit_fn, self.model_toolkit_conf)

M
MRXLT 已提交
392
    def port_is_available(self, port):
M
MRXLT 已提交
393 394
        with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
            sock.settimeout(2)
395
            result = sock.connect_ex(('0.0.0.0', port))
M
MRXLT 已提交
396 397 398 399 400
        if result != 0:
            return True
        else:
            return False

M
MRXLT 已提交
401 402 403
    def run_server(self):
        # just run server with system command
        # currently we do not load cube
M
MRXLT 已提交
404
        self.check_local_bin()
M
MRXLT 已提交
405 406
        if not self.use_local_bin:
            self.download_bin()
B
fix bug  
barrierye 已提交
407 408 409
            # wait for other process to download server bin
            while not os.path.exists(self.server_path):
                time.sleep(1)
M
MRXLT 已提交
410 411
        else:
            print("Use local bin : {}".format(self.bin_path))
Z
zhangjun 已提交
412
        #self.check_cuda()
Z
zhangjun 已提交
413 414
        # Todo: merge CPU and GPU code, remove device to model_toolkit
        if self.device == "cpu" or self.device == "arm":
Z
zhangjun 已提交
415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471
            command = "{} " \
                      "-enable_model_toolkit " \
                      "-inferservice_path {} " \
                      "-inferservice_file {} " \
                      "-max_concurrency {} " \
                      "-num_threads {} " \
                      "-port {} " \
                      "-reload_interval_s {} " \
                      "-resource_path {} " \
                      "-resource_file {} " \
                      "-workflow_path {} " \
                      "-workflow_file {} " \
                      "-bthread_concurrency {} " \
                      "-max_body_size {} ".format(
                          self.bin_path,
                          self.workdir,
                          self.infer_service_fn,
                          self.max_concurrency,
                          self.num_threads,
                          self.port,
                          self.reload_interval_s,
                          self.workdir,
                          self.resource_fn,
                          self.workdir,
                          self.workflow_fn,
                          self.num_threads,
                          self.max_body_size)
        else:
            command = "{} " \
                      "-enable_model_toolkit " \
                      "-inferservice_path {} " \
                      "-inferservice_file {} " \
                      "-max_concurrency {} " \
                      "-num_threads {} " \
                      "-port {} " \
                      "-reload_interval_s {} " \
                      "-resource_path {} " \
                      "-resource_file {} " \
                      "-workflow_path {} " \
                      "-workflow_file {} " \
                      "-bthread_concurrency {} " \
                      "-gpuid {} " \
                      "-max_body_size {} ".format(
                          self.bin_path,
                          self.workdir,
                          self.infer_service_fn,
                          self.max_concurrency,
                          self.num_threads,
                          self.port,
                          self.reload_interval_s,
                          self.workdir,
                          self.resource_fn,
                          self.workdir,
                          self.workflow_fn,
                          self.num_threads,
                          self.gpuid,
                          self.max_body_size)
M
MRXLT 已提交
472 473
        print("Going to Run Comand")
        print(command)
474

M
MRXLT 已提交
475
        os.system(command)
B
barrierye 已提交
476

Z
update  
zhangjun 已提交
477

B
barrierye 已提交
478
class MultiLangServer(object):
B
barrierye 已提交
479
    def __init__(self):
B
barrierye 已提交
480
        self.bserver_ = Server()
B
barrierye 已提交
481 482 483 484 485
        self.worker_num_ = 4
        self.body_size_ = 64 * 1024 * 1024
        self.concurrency_ = 100000
        self.is_multi_model_ = False  # for model ensemble

B
barrierye 已提交
486
    def set_max_concurrency(self, concurrency):
B
barrierye 已提交
487
        self.concurrency_ = concurrency
B
barrierye 已提交
488 489
        self.bserver_.set_max_concurrency(concurrency)

490 491 492
    def set_device(self, device="cpu"):
        self.device = device

B
barrierye 已提交
493
    def set_num_threads(self, threads):
B
barrierye 已提交
494
        self.worker_num_ = threads
B
barrierye 已提交
495 496 497 498
        self.bserver_.set_num_threads(threads)

    def set_max_body_size(self, body_size):
        self.bserver_.set_max_body_size(body_size)
B
barrierye 已提交
499 500 501 502 503 504
        if body_size >= self.body_size_:
            self.body_size_ = body_size
        else:
            print(
                "max_body_size is less than default value, will use default value in service."
            )
B
barrierye 已提交
505

506 507 508
    def use_encryption_model(self, flag=False):
        self.encryption_model = flag

B
barrierye 已提交
509 510 511 512 513
    def set_port(self, port):
        self.gport_ = port

    def set_reload_interval(self, interval):
        self.bserver_.set_reload_interval(interval)
B
barrierye 已提交
514 515 516 517

    def set_op_sequence(self, op_seq):
        self.bserver_.set_op_sequence(op_seq)

B
barrierye 已提交
518 519
    def set_op_graph(self, op_graph):
        self.bserver_.set_op_graph(op_graph)
Z
update  
zhangjun 已提交
520

Z
zhangjun 已提交
521 522
    def use_mkl(self, flag):
        self.bserver_.use_mkl(flag)
Z
update  
zhangjun 已提交
523

B
barrierye 已提交
524 525 526 527 528
    def set_memory_optimize(self, flag=False):
        self.bserver_.set_memory_optimize(flag)

    def set_ir_optimize(self, flag=False):
        self.bserver_.set_ir_optimize(flag)
B
barrierye 已提交
529

B
barrierye 已提交
530 531 532
    def set_gpuid(self, gpuid=0):
        self.bserver_.set_gpuid(gpuid)

B
barrierye 已提交
533 534 535 536 537 538 539 540 541 542 543
    def load_model_config(self, server_config_paths, client_config_path=None):
        self.bserver_.load_model_config(server_config_paths)
        if client_config_path is None:
            if isinstance(server_config_paths, dict):
                self.is_multi_model_ = True
                client_config_path = '{}/serving_server_conf.prototxt'.format(
                    list(server_config_paths.items())[0][1])
            else:
                client_config_path = '{}/serving_server_conf.prototxt'.format(
                    server_config_paths)
        self.bclient_config_path_ = client_config_path
B
barrierye 已提交
544

M
MRXLT 已提交
545 546 547 548
    def prepare_server(self,
                       workdir=None,
                       port=9292,
                       device="cpu",
H
HexToString 已提交
549
                       use_encryption_model=False,
M
MRXLT 已提交
550
                       cube_conf=None):
B
barrierye 已提交
551 552
        if not self._port_is_available(port):
            raise SystemExit("Prot {} is already used".format(port))
B
barrierye 已提交
553 554 555 556 557 558 559 560
        default_port = 12000
        self.port_list_ = []
        for i in range(1000):
            if default_port + i != port and self._port_is_available(default_port
                                                                    + i):
                self.port_list_.append(default_port + i)
                break
        self.bserver_.prepare_server(
M
MRXLT 已提交
561 562 563
            workdir=workdir,
            port=self.port_list_[0],
            device=device,
H
HexToString 已提交
564
            use_encryption_model=use_encryption_model,
M
MRXLT 已提交
565
            cube_conf=cube_conf)
B
barrierye 已提交
566
        self.set_port(port)
B
barrierye 已提交
567 568 569 570 571 572 573 574 575 576 577 578 579 580

    def _launch_brpc_service(self, bserver):
        bserver.run_server()

    def _port_is_available(self, port):
        with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
            sock.settimeout(2)
            result = sock.connect_ex(('0.0.0.0', port))
        return result != 0

    def run_server(self):
        p_bserver = Process(
            target=self._launch_brpc_service, args=(self.bserver_, ))
        p_bserver.start()
B
barrierye 已提交
581 582
        options = [('grpc.max_send_message_length', self.body_size_),
                   ('grpc.max_receive_message_length', self.body_size_)]
B
barrierye 已提交
583
        server = grpc.server(
B
barrierye 已提交
584 585 586
            futures.ThreadPoolExecutor(max_workers=self.worker_num_),
            options=options,
            maximum_concurrent_rpcs=self.concurrency_)
B
barrierye 已提交
587
        multi_lang_general_model_service_pb2_grpc.add_MultiLangGeneralModelServiceServicer_to_server(
B
barrierye 已提交
588
            MultiLangServerServiceServicer(
B
barrierye 已提交
589
                self.bclient_config_path_, self.is_multi_model_,
B
barrierye 已提交
590
                ["0.0.0.0:{}".format(self.port_list_[0])]), server)
B
barrierye 已提交
591 592 593
        server.add_insecure_port('[::]:{}'.format(self.gport_))
        server.start()
        p_bserver.join()
Z
update  
zhangjun 已提交
594
        server.wait_for_termination()