提交 02cc6c96 编写于 作者: B barrierye

update gpu code

上级 c7ca29f0
......@@ -40,6 +40,7 @@ using baidu::paddle_serving::predictor::PaddleGeneralModelConfig;
int GeneralDistKVInferOp::inference() {
VLOG(2) << "Going to run inference";
const std::vector<std::string> pre_node_names = pre_names();
if (pre_node_names.size() != 1) {
LOG(ERROR) << "This op(" << op_name()
<< ") can only have one predecessor op, but received "
......
......@@ -41,6 +41,7 @@ using baidu::paddle_serving::predictor::PaddleGeneralModelConfig;
int GeneralDistKVQuantInferOp::inference() {
VLOG(2) << "Going to run inference";
const std::vector<std::string> pre_node_names = pre_names();
if (pre_node_names.size() != 1) {
LOG(ERROR) << "This op(" << op_name()
<< ") can only have one predecessor op, but received "
......
......@@ -40,8 +40,6 @@ class OpMaker(object):
}
self.node_name_suffix_ = collections.defaultdict(int)
# currently, inputs and outputs are not used
# when we have OpGraphMaker, inputs and outputs are necessary
def create(self, node_type, engine_name=None, inputs=[], outputs=[]):
if node_type not in self.op_dict:
raise Exception("Op type {} is not supported right now".format(
......
......@@ -24,6 +24,7 @@ import time
from .version import serving_server_version
from contextlib import closing
import argparse
import collections
def serve_args():
......@@ -61,17 +62,35 @@ class OpMaker(object):
"general_dist_kv_infer": "GeneralDistKVInferOp",
"general_dist_kv": "GeneralDistKVOp"
}
self.node_name_suffix_ = collections.defaultdict(int)
# currently, inputs and outputs are not used
# when we have OpGraphMaker, inputs and outputs are necessary
def create(self, name, inputs=[], outputs=[]):
if name not in self.op_dict:
raise Exception("Op name {} is not supported right now".format(
name))
def create(self, node_type, engine_name=None, inputs=[], outputs=[]):
if node_type not in self.op_dict:
raise Exception("Op type {} is not supported right now".format(
node_type))
node = server_sdk.DAGNode()
node.name = "{}_op".format(name)
node.type = self.op_dict[name]
return node
# node.name will be used as the infer engine name
if engine_name:
node.name = engine_name
else:
node.name = '{}_{}'.format(node_type,
self.node_name_suffix_[node_type])
self.node_name_suffix_[node_type] += 1
node.type = self.op_dict[node_type]
if inputs:
for dep_node_str in inputs:
dep_node = server_sdk.DAGNode()
google.protobuf.text_format.Parse(dep_node_str, dep_node)
dep = server_sdk.DAGNodeDependency()
dep.name = dep_node.name
dep.mode = "RO"
node.dependencies.extend([dep])
# Because the return value will be used as the key value of the
# dict, and the proto object is variable which cannot be hashed,
# so it is processed into a string. This has little effect on
# overall efficiency.
return google.protobuf.text_format.MessageToString(node)
class OpSeqMaker(object):
......@@ -80,12 +99,25 @@ class OpSeqMaker(object):
self.workflow.name = "workflow1"
self.workflow.workflow_type = "Sequence"
def add_op(self, node):
def add_op(self, node_str):
node = server_sdk.DAGNode()
google.protobuf.text_format.Parse(node_str, node)
if len(node.dependencies) > 1:
raise Exception(
'Set more than one predecessor for op in OpSeqMaker is not allowed.'
)
if len(self.workflow.nodes) >= 1:
dep = server_sdk.DAGNodeDependency()
dep.name = self.workflow.nodes[-1].name
dep.mode = "RO"
node.dependencies.extend([dep])
if len(node.dependencies) == 0:
dep = server_sdk.DAGNodeDependency()
dep.name = self.workflow.nodes[-1].name
dep.mode = "RO"
node.dependencies.extend([dep])
elif len(node.dependencies) == 1:
if node.dependencies[0].name != self.workflow.nodes[-1].name:
raise Exception(
'You must add op in order in OpSeqMaker. The previous op is {}, but the current op is followed by {}.'.
format(node.dependencies[0].name, self.workflow.nodes[
-1].name))
self.workflow.nodes.extend([node])
def get_op_sequence(self):
......@@ -94,13 +126,30 @@ class OpSeqMaker(object):
return workflow_conf
class OpGraphMaker(object):
def __init__(self):
self.workflow = server_sdk.Workflow()
self.workflow.name = "workflow1"
# Currently, SDK only supports "Sequence"
self.workflow.workflow_type = "Sequence"
def add_op(self, node_str):
node = server_sdk.DAGNode()
google.protobuf.text_format.Parse(node_str, node)
self.workflow.nodes.extend([node])
def get_op_graph(self):
workflow_conf = server_sdk.WorkflowConf()
workflow_conf.workflows.extend([self.workflow])
return workflow_conf
class Server(object):
def __init__(self):
self.server_handle_ = None
self.infer_service_conf = None
self.model_toolkit_conf = None
self.resource_conf = None
self.engine = None
self.memory_optimization = False
self.model_conf = None
self.workflow_fn = "workflow.prototxt"
......@@ -119,6 +168,7 @@ class Server(object):
self.check_cuda()
self.use_local_bin = False
self.gpuid = 0
self.model_config_paths = None # for multi-model in a workflow
def set_max_concurrency(self, concurrency):
self.max_concurrency = concurrency
......@@ -135,6 +185,9 @@ class Server(object):
def set_op_sequence(self, op_seq):
self.workflow_conf = op_seq
def set_op_graph(self, op_graph):
self.workflow_conf = op_graph
def set_memory_optimize(self, flag=False):
self.memory_optimization = flag
......@@ -153,33 +206,31 @@ class Server(object):
def set_gpuid(self, gpuid=0):
self.gpuid = gpuid
def _prepare_engine(self, model_config_path, device):
def _prepare_engine(self, model_config_paths, device):
if self.model_toolkit_conf == None:
self.model_toolkit_conf = server_sdk.ModelToolkitConf()
if self.engine == None:
self.engine = server_sdk.EngineDesc()
self.model_config_path = model_config_path
self.engine.name = "general_model"
#self.engine.reloadable_meta = model_config_path + "/fluid_time_file"
self.engine.reloadable_meta = self.workdir + "/fluid_time_file"
os.system("touch {}".format(self.engine.reloadable_meta))
self.engine.reloadable_type = "timestamp_ne"
self.engine.runtime_thread_num = 0
self.engine.batch_infer_size = 0
self.engine.enable_batch_align = 0
self.engine.model_data_path = model_config_path
self.engine.enable_memory_optimization = self.memory_optimization
self.engine.static_optimization = False
self.engine.force_update_static_cache = False
if device == "cpu":
self.engine.type = "FLUID_CPU_ANALYSIS_DIR"
elif device == "gpu":
self.engine.type = "FLUID_GPU_ANALYSIS_DIR"
self.model_toolkit_conf.engines.extend([self.engine])
for engine_name, model_config_path in model_config_paths.items():
engine = server_sdk.EngineDesc()
engine.name = engine_name
# engine.reloadable_meta = model_config_path + "/fluid_time_file"
engine.reloadable_meta = self.workdir + "/fluid_time_file"
os.system("touch {}".format(engine.reloadable_meta))
engine.reloadable_type = "timestamp_ne"
engine.runtime_thread_num = 0
engine.batch_infer_size = 0
engine.enable_batch_align = 0
engine.model_data_path = model_config_path
engine.enable_memory_optimization = self.memory_optimization
engine.static_optimization = False
engine.force_update_static_cache = False
if device == "cpu":
engine.type = "FLUID_CPU_ANALYSIS_DIR"
elif device == "gpu":
engine.type = "FLUID_GPU_ANALYSIS_DIR"
self.model_toolkit_conf.engines.extend([engine])
def _prepare_infer_service(self, port):
if self.infer_service_conf == None:
......@@ -211,10 +262,34 @@ class Server(object):
with open(filepath, "w") as fout:
fout.write(str(pb_obj))
def load_model_config(self, path):
self.model_config_path = path
def load_model_config(self, model_config_paths):
# At present, Serving needs to configure the model path in
# the resource.prototxt file to determine the input and output
# format of the workflow. To ensure that the input and output
# of multiple models are the same
workflow_oi_config_path = None
if isinstance(model_config_paths, str):
# the default engine name is "general_infer"
self.model_config_paths = {"general_infer_0": model_config_paths}
workflow_oi_config_path = self.model_config_paths["general_infer_0"]
elif isinstance(model_config_paths, dict):
self.model_config_paths = {}
for node_str, path in model_config_paths.items():
node = server_sdk.DAGNode()
google.protobuf.text_format.Parse(node_str, node)
self.model_config_paths[node.name] = path
print("You have specified multiple model paths, please ensure "
"that the input and output of multiple models are the same.")
workflow_oi_config_path = self.model_config_paths.items()[0][1]
else:
raise Exception("The type of model_config_paths must be str or "
"dict({op: model_path}), not {}.".format(
type(model_config_paths)))
self.model_conf = m_config.GeneralModelConfig()
f = open("{}/serving_server_conf.prototxt".format(path), 'r')
f = open(
"{}/serving_server_conf.prototxt".format(workflow_oi_config_path),
'r')
self.model_conf = google.protobuf.text_format.Merge(
str(f.read()), self.model_conf)
# check config here
......@@ -277,7 +352,7 @@ class Server(object):
self.set_port(port)
self._prepare_resource(workdir)
self._prepare_engine(self.model_config_path, device)
self._prepare_engine(self.model_config_paths, device)
self._prepare_infer_service(port)
self.workdir = workdir
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册