__init__.py 7.9 KB
Newer Older
G
guru4elephant 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
#   Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

G
guru4elephant 已提交
15
import os
16 17 18
from .proto import server_configure_pb2 as server_sdk
from .proto import general_model_config_pb2 as m_config
import google.protobuf.text_format
G
guru4elephant 已提交
19 20 21

class OpMaker(object):
    """Factory for ``server_sdk.DAGNode`` messages describing built-in ops.

    ``op_dict`` maps the short op name accepted by :meth:`create` to the op
    type string stored in the generated node's ``type`` field.
    """

    def __init__(self):
        self.op_dict = {
            "general_infer": "GeneralInferOp",
            "general_reader": "GeneralReaderOp",
            "general_single_kv": "GeneralSingleKVOp",
            "general_dist_kv": "GeneralDistKVOp",
        }

    # currently, inputs and outputs are not used
    # when we have OpGraphMaker, inputs and outputs are necessary
    def create(self, name, inputs=None, outputs=None):
        """Build a DAG node for the built-in op ``name``.

        Args:
            name: a key of ``self.op_dict`` (e.g. ``"general_reader"``).
            inputs: currently unused; reserved for a future graph maker.
            outputs: currently unused; reserved for a future graph maker.

        Returns:
            A ``server_sdk.DAGNode`` named ``"<name>_op"`` whose ``type`` is
            ``self.op_dict[name]``.

        Raises:
            ValueError: if ``name`` is not a supported op. ``ValueError`` is
                an ``Exception`` subclass, so existing ``except Exception``
                handlers keep working.
        """
        # ``None`` defaults avoid sharing one mutable list across calls.
        if name not in self.op_dict:
            raise ValueError(
                "Op name {} is not supported right now".format(name))
        node = server_sdk.DAGNode()
        node.name = "{}_op".format(name)
        node.type = self.op_dict[name]
        return node

class OpSeqMaker(object):
    """Assemble ops into one sequential workflow named ``workflow1``.

    Every node passed to :meth:`add_op` after the first is given an "RO"
    dependency on the node appended immediately before it, so the nodes
    form a linear chain.
    """

    def __init__(self):
        workflow = server_sdk.Workflow()
        workflow.name = "workflow1"
        workflow.workflow_type = "Sequence"
        self.workflow = workflow

    def add_op(self, node):
        """Append ``node``, chaining it after the current last node (if any)."""
        chain = self.workflow.nodes
        if len(chain) > 0:
            predecessor = server_sdk.DAGNodeDependency()
            predecessor.name = chain[-1].name
            predecessor.mode = "RO"
            node.dependencies.extend([predecessor])
        chain.extend([node])

    def get_op_sequence(self):
        """Wrap the workflow in a new ``server_sdk.WorkflowConf`` and return it."""
        conf = server_sdk.WorkflowConf()
        conf.workflows.extend([self.workflow])
        return conf

class Server(object):
    """Write Paddle Serving prototxt configs and launch the serving binary.

    Intended call order, as implied by the methods below:
    ``load_model_config`` -> ``set_op_sequence`` -> optional ``set_*``
    tuning -> ``prepare_server`` (writes config files into a work directory)
    -> ``run_server`` (starts the binary against that directory and waits).
    """

    # Maps the ``device`` argument of prepare_server to the engine type.
    _DEVICE_ENGINE_TYPES = {
        "cpu": "FLUID_CPU_ANALYSIS_DIR",
        "gpu": "FLUID_GPU_ANALYSIS_DIR",
    }

    def __init__(self):
        self.server_handle_ = None
        self.infer_service_conf = None
        self.model_toolkit_conf = None
        self.resource_conf = None
        self.engine = None
        self.memory_optimization = False
        self.model_conf = None
        # Set by load_model_config / set_op_sequence. They are initialised here
        # so prepare_server can report a clear error when either was skipped.
        self.model_config_path = None
        self.workflow_conf = None
        self.workflow_fn = "workflow.prototxt"
        self.resource_fn = "resource.prototxt"
        self.infer_service_fn = "infer_service.prototxt"
        self.model_toolkit_fn = "model_toolkit.prototxt"
        self.general_model_config_fn = "general_model.prototxt"
        self.workdir = ""
        self.max_concurrency = 0
        self.num_threads = 0
        self.port = 8080
        self.reload_interval_s = 10
        # NOTE: developer-local default kept for backward compatibility;
        # call set_bin_path() to point at the serving executable elsewhere.
        self.bin_path = ("/home/users/dongdaxiang/github_develop/Serving/"
                         "build_server/core/general-server/serving")

    def set_max_concurrency(self, concurrency):
        """Set the value passed as ``-max_concurrency`` by run_server."""
        self.max_concurrency = concurrency

    def set_num_threads(self, threads):
        """Set the value passed as ``-num_threads`` by run_server."""
        self.num_threads = threads

    def set_port(self, port):
        """Set the value passed as ``-port`` (also overwritten by prepare_server)."""
        self.port = port

    def set_reload_interval(self, interval):
        """Set the value passed as ``-reload_interval_s`` by run_server."""
        self.reload_interval_s = interval

    def set_bin_path(self, path):
        """Set the path of the serving executable launched by run_server."""
        self.bin_path = path

    def set_op_sequence(self, op_seq):
        """Set the workflow config written to ``workflow.prototxt``.

        ``op_seq`` is e.g. the ``WorkflowConf`` returned by
        ``OpSeqMaker.get_op_sequence()``.
        """
        self.workflow_conf = op_seq

    def set_memory_optimize(self, flag=False):
        """Toggle ``enable_memory_optimization`` on the generated engine."""
        self.memory_optimization = flag

    @staticmethod
    def _touch(path):
        """Create ``path`` if missing and bump its modification time."""
        with open(path, "a"):
            pass
        os.utime(path, None)

    def _prepare_engine(self, model_config_path, device):
        """Fill ``self.engine`` and append it to the model toolkit config.

        Raises:
            ValueError: if ``device`` is not ``"cpu"`` or ``"gpu"``.
        """
        if device not in self._DEVICE_ENGINE_TYPES:
            raise ValueError("Unsupported device {}".format(device))
        if self.model_toolkit_conf is None:
            self.model_toolkit_conf = server_sdk.ModelToolkitConf()
        if self.engine is None:
            self.engine = server_sdk.EngineDesc()

        self.model_config_path = model_config_path
        self.engine.name = "general_model"
        # The engine is configured to reload when this file's timestamp
        # changes ("timestamp_ne"), so make sure the file exists.
        self.engine.reloadable_meta = model_config_path + "/fluid_time_file"
        self._touch(self.engine.reloadable_meta)
        self.engine.reloadable_type = "timestamp_ne"
        self.engine.runtime_thread_num = 0
        self.engine.batch_infer_size = 0
        self.engine.enable_batch_align = 0
        self.engine.model_data_path = model_config_path
        self.engine.enable_memory_optimization = self.memory_optimization
        self.engine.static_optimization = False
        self.engine.force_update_static_cache = False
        self.engine.type = self._DEVICE_ENGINE_TYPES[device]

        self.model_toolkit_conf.engines.extend([self.engine])

    def _prepare_infer_service(self, port):
        """Create the infer service config once, bound to ``workflow1``."""
        if self.infer_service_conf is None:
            self.infer_service_conf = server_sdk.InferServiceConf()
            self.infer_service_conf.port = port
            infer_service = server_sdk.InferService()
            infer_service.name = "GeneralModelService"
            infer_service.workflows.extend(["workflow1"])
            self.infer_service_conf.services.extend([infer_service])

    def _prepare_resource(self, workdir):
        """Write the general model config and build the resource config once."""
        if self.resource_conf is None:
            self._write_pb_str(
                "{}/{}".format(workdir, self.general_model_config_fn),
                self.model_conf)
            self.resource_conf = server_sdk.ResourceConf()
            self.resource_conf.model_toolkit_path = workdir
            self.resource_conf.model_toolkit_file = self.model_toolkit_fn
            self.resource_conf.general_model_path = workdir
            self.resource_conf.general_model_file = self.general_model_config_fn

    def _write_pb_str(self, filepath, pb_obj):
        """Write ``str(pb_obj)`` (protobuf text format) to ``filepath``."""
        with open(filepath, "w") as fout:
            fout.write(str(pb_obj))

    def load_model_config(self, path):
        """Parse ``<path>/serving_server_conf.prototxt`` into ``self.model_conf``."""
        self.model_config_path = path
        self.model_conf = m_config.GeneralModelConfig()
        with open("{}/serving_server_conf.prototxt".format(path), 'r') as f:
            self.model_conf = google.protobuf.text_format.Merge(
                str(f.read()), self.model_conf)
        # TODO: check and print the loaded config here

    def prepare_server(self, workdir=None, port=9292, device="cpu"):
        """Write all serving config files into ``workdir``.

        Args:
            workdir: output directory, created if missing (default ``./tmp``).
            port: written into the infer service config and also stored in
                ``self.port`` so run_server's ``-port`` flag matches it.
            device: ``"cpu"`` or ``"gpu"``; selects the engine type.

        Raises:
            RuntimeError: if load_model_config or set_op_sequence was not
                called first (previously this wrote a "None" config or
                failed with AttributeError).
            ValueError: if ``device`` is unsupported.
        """
        # Validate before touching the filesystem.
        if self.model_config_path is None:
            raise RuntimeError(
                "model config is not loaded; call load_model_config first")
        if self.workflow_conf is None:
            raise RuntimeError(
                "op sequence is not set; call set_op_sequence first")
        if device not in self._DEVICE_ENGINE_TYPES:
            raise ValueError("Unsupported device {}".format(device))

        if workdir is None:
            workdir = "./tmp"
        if not os.path.isdir(workdir):
            os.makedirs(workdir)
        self._touch("{}/fluid_time_file".format(workdir))

        # run_server passes self.port on the command line; keep it consistent
        # with the port written into the infer service config.
        self.port = port
        self._prepare_resource(workdir)
        self._prepare_engine(self.model_config_path, device)
        self._prepare_infer_service(port)
        self.workdir = workdir

        outputs = (
            (self.infer_service_fn, self.infer_service_conf),
            (self.workflow_fn, self.workflow_conf),
            (self.resource_fn, self.resource_conf),
            (self.model_toolkit_fn, self.model_toolkit_conf),
        )
        for filename, conf in outputs:
            self._write_pb_str("{}/{}".format(workdir, filename), conf)

    def run_server(self):
        """Launch the serving binary on the prepared workdir and wait for it.

        Cube is currently not loaded.
        """
        import subprocess  # only needed to launch the binary

        # An argv list (no shell) keeps paths containing spaces or shell
        # metacharacters intact.
        command = [
            self.bin_path,
            "-enable_model_toolkit",
            "-inferservice_path", str(self.workdir),
            "-inferservice_file", str(self.infer_service_fn),
            "-max_concurrency", str(self.max_concurrency),
            "-num_threads", str(self.num_threads),
            "-port", str(self.port),
            "-reload_interval_s", str(self.reload_interval_s),
            "-resource_path", str(self.workdir),
            "-resource_file", str(self.resource_fn),
            "-workflow_path", str(self.workdir),
            "-workflow_file", str(self.workflow_fn),
        ]
        subprocess.call(command)

G
guru4elephant 已提交
203