__init__.py 12.7 KB
Newer Older
G
guru4elephant 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
#   Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

G
guru4elephant 已提交
15
import os
16 17 18
from .proto import server_configure_pb2 as server_sdk
from .proto import general_model_config_pb2 as m_config
import google.protobuf.text_format
M
MRXLT 已提交
19
import tarfile
M
MRXLT 已提交
20
import socket
M
MRXLT 已提交
21
import paddle_serving_server as paddle_serving_server
22
from .version import serving_server_version
M
MRXLT 已提交
23
from contextlib import closing
M
MRXLT 已提交
24

G
guru4elephant 已提交
25 26 27

class OpMaker(object):
    def __init__(self):
28
        self.op_dict = {
M
MRXLT 已提交
29 30 31 32 33 34
            "general_infer": "GeneralInferOp",
            "general_reader": "GeneralReaderOp",
            "general_response": "GeneralResponseOp",
            "general_text_reader": "GeneralTextReaderOp",
            "general_text_response": "GeneralTextResponseOp",
            "general_single_kv": "GeneralSingleKVOp",
W
wangjiawei04 已提交
35
            "general_dist_kv_infer": "GeneralDistKVInferOp",
W
wangjiawei04 已提交
36
            "general_dist_kv_quant_infer": "GeneralDistKVQuantInferOp",
G
guru4elephant 已提交
37
            "general_copy": "GeneralCopyOp"
38
        }
G
guru4elephant 已提交
39

G
guru4elephant 已提交
40 41
    # currently, inputs and outputs are not used
    # when we have OpGraphMaker, inputs and outputs are necessary
B
barrierye 已提交
42 43 44 45
    def create(self, node_type, node_name=None, inputs=[], outputs=[]):
        if node_type not in self.op_dict:
            raise Exception("Op type {} is not supported right now".format(
                node_type))
G
guru4elephant 已提交
46
        node = server_sdk.DAGNode()
B
barrierye 已提交
47 48 49
        node.name = node_name if node_name is not None else "{}_op".format(
            node_type)
        node.type = self.op_dict[node_type]
G
guru4elephant 已提交
50 51
        return node

M
MRXLT 已提交
52

G
guru4elephant 已提交
53 54 55 56 57 58
class OpSeqMaker(object):
    def __init__(self):
        self.workflow = server_sdk.Workflow()
        self.workflow.name = "workflow1"
        self.workflow.workflow_type = "Sequence"

B
barrierye 已提交
59 60 61 62 63 64 65 66 67 68 69 70 71
    def add_op(self, node, dependent_nodes=None):
        if dependent_nodes is None:
            if len(self.workflow.nodes) >= 1:
                dep = server_sdk.DAGNodeDependency()
                dep.name = self.workflow.nodes[-1].name
                dep.mode = "RO"
                node.dependencies.extend([dep])
        else:
            for dep_node in dependent_nodes:
                dep = server_sdk.DAGNodeDependency()
                dep.name = dep_node.name
                dep.mode = "RO"
                node.dependencies.extend([dep])
G
guru4elephant 已提交
72 73 74 75 76 77 78
        self.workflow.nodes.extend([node])

    def get_op_sequence(self):
        workflow_conf = server_sdk.WorkflowConf()
        workflow_conf.workflows.extend([self.workflow])
        return workflow_conf

M
MRXLT 已提交
79

G
guru4elephant 已提交
80 81 82 83 84
class Server(object):
    def __init__(self):
        self.server_handle_ = None
        self.infer_service_conf = None
        self.model_toolkit_conf = None
G
guru4elephant 已提交
85
        self.resource_conf = None
86 87
        self.memory_optimization = False
        self.model_conf = None
G
guru4elephant 已提交
88 89 90 91
        self.workflow_fn = "workflow.prototxt"
        self.resource_fn = "resource.prototxt"
        self.infer_service_fn = "infer_service.prototxt"
        self.model_toolkit_fn = "model_toolkit.prototxt"
92
        self.general_model_config_fn = "general_model.prototxt"
W
wangjiawei04 已提交
93
        self.cube_config_fn = "cube.conf"
G
guru4elephant 已提交
94 95
        self.workdir = ""
        self.max_concurrency = 0
M
MRXLT 已提交
96
        self.num_threads = 4
G
guru4elephant 已提交
97 98
        self.port = 8080
        self.reload_interval_s = 10
M
MRXLT 已提交
99 100
        self.module_path = os.path.dirname(paddle_serving_server.__file__)
        self.cur_path = os.getcwd()
M
MRXLT 已提交
101
        self.use_local_bin = False
M
MRXLT 已提交
102
        self.mkl_flag = False
B
barrierye 已提交
103
        self.model_config_paths = None
G
guru4elephant 已提交
104 105 106 107 108 109 110 111 112 113 114 115

    def set_max_concurrency(self, concurrency):
        self.max_concurrency = concurrency

    def set_num_threads(self, threads):
        self.num_threads = threads

    def set_port(self, port):
        self.port = port

    def set_reload_interval(self, interval):
        self.reload_interval_s = interval
G
guru4elephant 已提交
116 117 118 119

    def set_op_sequence(self, op_seq):
        self.workflow_conf = op_seq

120 121 122
    def set_memory_optimize(self, flag=False):
        self.memory_optimization = flag

M
MRXLT 已提交
123 124 125 126
    def check_local_bin(self):
        if "SERVING_BIN" in os.environ:
            self.use_local_bin = True
            self.bin_path = os.environ["SERVING_BIN"]
M
MRXLT 已提交
127

B
barrierye 已提交
128
    def _prepare_engine(self, model_config_paths, device):
G
guru4elephant 已提交
129 130
        if self.model_toolkit_conf == None:
            self.model_toolkit_conf = server_sdk.ModelToolkitConf()
131

B
barrierye 已提交
132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157
        if isinstance(model_config_paths, str):
            model_config_paths = {"general_infer_op": model_config_paths}
        elif not isinstance(model_config_paths, dict):
            raise Exception("model_config_paths can not be {}".format(
                type(model_config_paths)))

        for engine_name, model_config_path in model_config_paths.items():
            engine = server_sdk.EngineDesc()
            engine.name = engine_name
            engine.reloadable_meta = model_config_path + "/fluid_time_file"
            os.system("touch {}".format(engine.reloadable_meta))
            engine.reloadable_type = "timestamp_ne"
            engine.runtime_thread_num = 0
            engine.batch_infer_size = 0
            engine.enable_batch_align = 0
            engine.model_data_path = model_config_path
            engine.enable_memory_optimization = self.memory_optimization
            engine.static_optimization = False
            engine.force_update_static_cache = False

            if device == "cpu":
                engine.type = "FLUID_CPU_ANALYSIS_DIR"
            elif device == "gpu":
                engine.type = "FLUID_GPU_ANALYSIS_DIR"

            self.model_toolkit_conf.engines.extend([engine])
G
guru4elephant 已提交
158 159 160 161 162 163 164 165 166 167 168 169

    def _prepare_infer_service(self, port):
        if self.infer_service_conf == None:
            self.infer_service_conf = server_sdk.InferServiceConf()
            self.infer_service_conf.port = port
            infer_service = server_sdk.InferService()
            infer_service.name = "GeneralModelService"
            infer_service.workflows.extend(["workflow1"])
            self.infer_service_conf.services.extend([infer_service])

    def _prepare_resource(self, workdir):
        if self.resource_conf == None:
M
MRXLT 已提交
170 171
            with open("{}/{}".format(workdir, self.general_model_config_fn),
                      "w") as fout:
172
                fout.write(str(self.model_conf))
G
guru4elephant 已提交
173
            self.resource_conf = server_sdk.ResourceConf()
W
wangjiawei04 已提交
174 175 176 177 178
            for workflow in self.workflow_conf.workflows:
                for node in workflow.nodes:
                    if "dist_kv" in node.name:
                        self.resource_conf.cube_config_path = workdir
                        self.resource_conf.cube_config_file = self.cube_config_fn
W
wangjiawei04 已提交
179 180
                        if "quant" in node.name:
                            self.resource_conf.cube_quant_bits = 8
G
guru4elephant 已提交
181
            self.resource_conf.model_toolkit_path = workdir
G
guru4elephant 已提交
182
            self.resource_conf.model_toolkit_file = self.model_toolkit_fn
183 184
            self.resource_conf.general_model_path = workdir
            self.resource_conf.general_model_file = self.general_model_config_fn
G
guru4elephant 已提交
185 186 187 188 189

    def _write_pb_str(self, filepath, pb_obj):
        with open(filepath, "w") as fout:
            fout.write(str(pb_obj))

B
barrierye 已提交
190 191 192
    def load_model_config(self, model_config_paths):
        self.model_config_paths = model_config_paths
        path = model_config_paths.items()[0][1]
G
guru4elephant 已提交
193
        self.model_config_path = path
194 195 196 197
        self.model_conf = m_config.GeneralModelConfig()
        f = open("{}/serving_server_conf.prototxt".format(path), 'r')
        self.model_conf = google.protobuf.text_format.Merge(
            str(f.read()), self.model_conf)
G
guru4elephant 已提交
198
        # check config here
199
        # print config here
G
guru4elephant 已提交
200

M
MRXLT 已提交
201 202 203
    def use_mkl(self):
        self.mkl_flag = True

M
MRXLT 已提交
204 205
    def get_device_version(self):
        avx_flag = False
M
MRXLT 已提交
206
        mkl_flag = self.mkl_flag
M
MRXLT 已提交
207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227
        openblas_flag = False
        r = os.system("cat /proc/cpuinfo | grep avx > /dev/null 2>&1")
        if r == 0:
            avx_flag = True
        if avx_flag:
            if mkl_flag:
                device_version = "serving-cpu-avx-mkl-"
            else:
                device_version = "serving-cpu-avx-openblas-"
        else:
            device_version = "serving-cpu-noavx-openblas-"
        return device_version

    def download_bin(self):
        os.chdir(self.module_path)
        need_download = False
        device_version = self.get_device_version()
        floder_name = device_version + serving_server_version
        tar_name = floder_name + ".tar.gz"
        bin_url = "https://paddle-serving.bj.bcebos.com/bin/" + tar_name
        self.server_path = os.path.join(self.module_path, floder_name)
228

M
MRXLT 已提交
229 230 231 232 233 234
        if not os.path.exists(self.server_path):
            print('Frist time run, downloading PaddleServing components ...')
            r = os.system('wget ' + bin_url + ' --no-check-certificate')
            if r != 0:
                if os.path.exists(tar_name):
                    os.remove(tar_name)
M
MRXLT 已提交
235 236 237
                raise SystemExit(
                    'Download failed, please check your network or permission of {}.'.
                    format(self.module_path))
M
MRXLT 已提交
238 239 240 241 242 243 244 245 246
            else:
                try:
                    print('Decompressing files ..')
                    tar = tarfile.open(tar_name)
                    tar.extractall()
                    tar.close()
                except:
                    if os.path.exists(exe_path):
                        os.remove(exe_path)
M
MRXLT 已提交
247 248 249
                    raise SystemExit(
                        'Decompressing failed, please check your permission of {} or disk space left.'.
                        foemat(self.module_path))
M
MRXLT 已提交
250 251 252 253 254
                finally:
                    os.remove(tar_name)
        os.chdir(self.cur_path)
        self.bin_path = self.server_path + "/serving"

G
guru4elephant 已提交
255
    def prepare_server(self, workdir=None, port=9292, device="cpu"):
G
guru4elephant 已提交
256 257 258
        if workdir == None:
            workdir = "./tmp"
            os.system("mkdir {}".format(workdir))
G
guru4elephant 已提交
259 260
        else:
            os.system("mkdir {}".format(workdir))
G
guru4elephant 已提交
261
        os.system("touch {}/fluid_time_file".format(workdir))
G
guru4elephant 已提交
262

M
MRXLT 已提交
263
        if not self.port_is_available(port):
M
MRXLT 已提交
264
            raise SystemExit("Prot {} is already used".format(port))
G
guru4elephant 已提交
265
        self._prepare_resource(workdir)
B
barrierye 已提交
266
        self._prepare_engine(self.model_config_paths, device)
G
guru4elephant 已提交
267 268 269
        self._prepare_infer_service(port)
        self.workdir = workdir

G
guru4elephant 已提交
270 271 272 273
        infer_service_fn = "{}/{}".format(workdir, self.infer_service_fn)
        workflow_fn = "{}/{}".format(workdir, self.workflow_fn)
        resource_fn = "{}/{}".format(workdir, self.resource_fn)
        model_toolkit_fn = "{}/{}".format(workdir, self.model_toolkit_fn)
G
guru4elephant 已提交
274 275 276 277 278 279

        self._write_pb_str(infer_service_fn, self.infer_service_conf)
        self._write_pb_str(workflow_fn, self.workflow_conf)
        self._write_pb_str(resource_fn, self.resource_conf)
        self._write_pb_str(model_toolkit_fn, self.model_toolkit_conf)

M
MRXLT 已提交
280
    def port_is_available(self, port):
M
MRXLT 已提交
281 282
        with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
            sock.settimeout(2)
283
            result = sock.connect_ex(('0.0.0.0', port))
M
MRXLT 已提交
284 285 286 287 288
        if result != 0:
            return True
        else:
            return False

G
guru4elephant 已提交
289 290
    def run_server(self):
        # just run server with system command
G
guru4elephant 已提交
291
        # currently we do not load cube
M
MRXLT 已提交
292
        self.check_local_bin()
M
MRXLT 已提交
293 294
        if not self.use_local_bin:
            self.download_bin()
G
guru4elephant 已提交
295
        else:
M
MRXLT 已提交
296
            print("Use local bin : {}".format(self.bin_path))
M
MRXLT 已提交
297 298
        command = "{} " \
                  "-enable_model_toolkit " \
G
guru4elephant 已提交
299 300 301 302 303 304 305 306 307
                  "-inferservice_path {} " \
                  "-inferservice_file {} " \
                  "-max_concurrency {} " \
                  "-num_threads {} " \
                  "-port {} " \
                  "-reload_interval_s {} " \
                  "-resource_path {} " \
                  "-resource_file {} " \
                  "-workflow_path {} " \
M
MRXLT 已提交
308
                  "-workflow_file {} " \
M
MRXLT 已提交
309
                  "-bthread_concurrency {} ".format(
M
MRXLT 已提交
310
                      self.bin_path,
G
guru4elephant 已提交
311 312 313 314 315 316 317 318
                      self.workdir,
                      self.infer_service_fn,
                      self.max_concurrency,
                      self.num_threads,
                      self.port,
                      self.reload_interval_s,
                      self.workdir,
                      self.resource_fn,
M
MRXLT 已提交
319 320
                      self.workdir,
                      self.workflow_fn,
G
guru4elephant 已提交
321
                      self.num_threads)
W
wangjiawei04 已提交
322
        print("Going to Run Command")
G
guru4elephant 已提交
323
        print(command)
G
guru4elephant 已提交
324
        os.system(command)