From 45c8f8ce06692bee47243f14f8cc6ab8fe95f04d Mon Sep 17 00:00:00 2001
From: barrierye
Date: Thu, 26 Mar 2020 23:30:51 +0800
Subject: [PATCH] test model ensemble

---
 core/general-server/op/general_infer_op.cpp  |   7 +-
 core/general-server/op/general_reader_op.cpp |   4 +
 core/predictor/framework/dag.cpp             | 112 +++++++++++++++++--
 core/predictor/framework/dag_view.cpp        |  16 +++
 core/predictor/framework/infer.h             |   8 ++
 core/predictor/framework/service.cpp         |   5 +
 core/predictor/op/op.cpp                     |   7 ++
 core/predictor/op/op.h                       |  11 ++
 python/examples/fit_a_line/test_client.py    |   1 +
 python/examples/fit_a_line/test_server.py    |  12 +-
 python/paddle_serving_server/__init__.py     |  90 ++++++++-------
 11 files changed, 217 insertions(+), 56 deletions(-)

diff --git a/core/general-server/op/general_infer_op.cpp b/core/general-server/op/general_infer_op.cpp
index 6cec9d1c..8f16b088 100644
--- a/core/general-server/op/general_infer_op.cpp
+++ b/core/general-server/op/general_infer_op.cpp
@@ -59,8 +59,11 @@ int GeneralInferOp::inference() {
   int64_t start = timeline.TimeStampUS();
   timeline.Start();
 
-  if (InferManager::instance().infer(GENERAL_MODEL_NAME, in, out, batch_size)) {
-    LOG(ERROR) << "Failed do infer in fluid model: " << GENERAL_MODEL_NAME;
+  BLOG("engine name: %s", engine_name().c_str());
+  if (InferManager::instance().infer(
+          GeneralInferOp::engine_name().c_str(), in, out, batch_size)) {
+    LOG(ERROR) << "Failed to do infer in fluid model: "
+               << GeneralInferOp::engine_name();
     return -1;
   }
 
diff --git a/core/general-server/op/general_reader_op.cpp b/core/general-server/op/general_reader_op.cpp
index 8695da25..fde6f13a 100644
--- a/core/general-server/op/general_reader_op.cpp
+++ b/core/general-server/op/general_reader_op.cpp
@@ -21,6 +21,9 @@
 #include "core/predictor/framework/infer.h"
 #include "core/predictor/framework/memory.h"
 #include "core/util/include/timer.h"
+#define BLOG(fmt, ...) \
+  printf( \
+      "[%s:%s]:%d " fmt "\n", __FILE__, __FUNCTION__, __LINE__, ##__VA_ARGS__)
 
 namespace baidu {
 namespace paddle_serving {
@@ -98,6 +101,7 @@ int GeneralReaderOp::inference() {
       baidu::paddle_serving::predictor::Resource::instance();
 
   VLOG(2) << "get resource pointer done.";
+  BLOG("engine name: %s", engine_name().c_str());
   std::shared_ptr<PaddleGeneralModelConfig> model_config =
       resource.get_general_model_config();
 
diff --git a/core/predictor/framework/dag.cpp b/core/predictor/framework/dag.cpp
index 9aea595e..6d666a4a 100644
--- a/core/predictor/framework/dag.cpp
+++ b/core/predictor/framework/dag.cpp
@@ -18,6 +18,9 @@
 #include "core/predictor/common/inner_common.h"
 #include "core/predictor/framework/predictor_metric.h"  // PredictorMetric
 #include "core/predictor/op/op.h"
+#define BLOG(fmt, ...) \
+  printf( \
+      "[%s:%s]:%d " fmt "\n", __FILE__, __FUNCTION__, __LINE__, ##__VA_ARGS__)
 
 namespace baidu {
 namespace paddle_serving {
@@ -199,25 +202,110 @@ const DagStage* Dag::stage_by_index(uint32_t index) { return _stages[index]; }
 
 int Dag::topo_sort() {
   std::stringstream ss;
-  for (uint32_t nid = 0; nid < _index_nodes.size(); nid++) {
-    DagStage* stage = new (std::nothrow) DagStage();
-    if (stage == NULL) {
-      LOG(ERROR) << "Invalid stage!";
-      return ERR_MEM_ALLOC_FAILURE;
+  uint32_t nodes_size = _index_nodes.size();
+  std::vector<uint32_t> in_degree(nodes_size, 0);
+  std::vector<std::vector<uint32_t>> in_edge(nodes_size);
+  for (uint32_t nid = 0; nid < nodes_size; nid++) {
+    in_degree[nid] += _index_nodes[nid]->depends.size();
+    for (auto it = _index_nodes[nid]->depends.begin();
+         it != _index_nodes[nid]->depends.end();
+         ++it) {
+      uint32_t pnid = Dag::node_by_name(it->first)->id -
+                      1;  // id 0 is reserved for the beginner op
+      in_edge[pnid].push_back(nid);
+      BLOG("in_edge[%d]: %d", pnid, nid);
+    }
+  }
+  for (uint32_t i = 0; i < in_degree.size(); ++i) {
+    BLOG("(%s) in_degree[%d]: %d",
+         _index_nodes[i]->name.c_str(),
+         i,
+         in_degree[i]);
+  }
+  uint32_t sorted_num = 0;
+  DagStage* stage = new (std::nothrow) DagStage();
+  if (stage == NULL) {
+    LOG(ERROR) << "Invalid stage!";
+    return ERR_MEM_ALLOC_FAILURE;
+  }
+  ss.str("");
+  ss << _stages.size();
+  stage->name = ss.str();
+  stage->full_name = full_name() + NAME_DELIMITER + stage->name;
+  BLOG("stage->full_name: %s", stage->full_name.c_str());
+  for (uint32_t nid = 0; nid < nodes_size; ++nid) {
+    if (in_degree[nid] == 0) {
+      BLOG("nid: %d", nid);
+      ++sorted_num;
+      stage->nodes.push_back(_index_nodes[nid]);
+      // assign stage number after stage created
+      _index_nodes[nid]->stage = _stages.size();
+      // assign dag node full name after stage created
+      _index_nodes[nid]->full_name =
+          stage->full_name + NAME_DELIMITER + _index_nodes[nid]->name;
     }
-    stage->nodes.push_back(_index_nodes[nid]);
+  }
+
+  if (stage->nodes.size() == 0) {
+    LOG(ERROR) << "Invalid Dag!";
+    return ERR_INTERNAL_FAILURE;
+  }
+  _stages.push_back(stage);
+
+  while (sorted_num < nodes_size) {
+    auto pre_nodes = _stages.back()->nodes;
+    DagStage* stage = new (std::nothrow) DagStage();
     ss.str("");
     ss << _stages.size();
     stage->name = ss.str();
     stage->full_name = full_name() + NAME_DELIMITER + stage->name;
+    BLOG("stage->full_name: %s", stage->full_name.c_str());
+    for (uint32_t pi = 0; pi < pre_nodes.size(); ++pi) {
+      uint32_t pnid = pre_nodes[pi]->id - 1;
+      BLOG("pnid: %d", pnid);
+      for (uint32_t ei = 0; ei < in_edge[pnid].size(); ++ei) {
+        uint32_t nid = in_edge[pnid][ei];
+        --in_degree[nid];
+        BLOG("nid: %d, indeg: %d", nid, in_degree[nid]);
+        if (in_degree[nid] == 0) {
+          BLOG("nid: %d", nid);
+          ++sorted_num;
+          stage->nodes.push_back(_index_nodes[nid]);
+          // assign stage number after stage created
+          _index_nodes[nid]->stage = _stages.size();
+          // assign dag node full name after stage created
+          _index_nodes[nid]->full_name =
+              stage->full_name + NAME_DELIMITER + _index_nodes[nid]->name;
+        }
+      }
+    }
+    if (stage->nodes.size() == 0) {
+      LOG(ERROR) << "Invalid Dag!";
+      return ERR_INTERNAL_FAILURE;
+    }
     _stages.push_back(stage);
-
-    // assign stage number after stage created
-    _index_nodes[nid]->stage = nid;
-    // assign dag node full name after stage created
-    _index_nodes[nid]->full_name =
-        stage->full_name + NAME_DELIMITER + _index_nodes[nid]->name;
   }
+  /*std::stringstream ss;*/
+  // for (uint32_t nid = 0; nid < _index_nodes.size(); nid++) {
+  //   DagStage* stage = new (std::nothrow) DagStage();
+  //   if (stage == NULL) {
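The rewritten topo_sort() is a stage-by-stage Kahn's algorithm: every node whose dependencies are all satisfied joins the current DagStage, finishing a stage decrements the in-degree of each dependent, and a stage that comes up empty before all nodes are placed means the graph has a cycle. The following is a minimal Python sketch of the same grouping, not the C++ types; the function and variable names are illustrative, and the final count check is equivalent to the empty-stage check above.

from collections import defaultdict

def group_into_stages(depends):
    # depends: {node_name: [names it depends on]} -> list of parallel stages
    in_degree = {n: len(d) for n, d in depends.items()}
    out_edges = defaultdict(list)  # parent -> dependents, like in_edge above
    for n, deps in depends.items():
        for parent in deps:
            out_edges[parent].append(n)
    stages = []
    ready = sorted(n for n, deg in in_degree.items() if deg == 0)
    while ready:
        stages.append(ready)
        nxt = []
        for done in ready:
            for child in out_edges[done]:
                in_degree[child] -= 1
                if in_degree[child] == 0:
                    nxt.append(child)
        ready = sorted(nxt)
    if sum(len(s) for s in stages) != len(depends):
        raise ValueError("Invalid Dag: cycle detected")
    return stages

# The g1/g2 ensemble built in test_server.py below groups as three stages:
# [['general_reader_op'], ['g1', 'g2'], ['general_response_op']]
print(group_into_stages({
    'general_reader_op': [],
    'g1': ['general_reader_op'],
    'g2': ['general_reader_op'],
    'general_response_op': ['g1', 'g2'],
}))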
LOG(ERROR) << "Invalid stage!"; + // return ERR_MEM_ALLOC_FAILURE; + //} + // stage->nodes.push_back(_index_nodes[nid]); + // ss.str(""); + // ss << _stages.size(); + // stage->name = ss.str(); + // stage->full_name = full_name() + NAME_DELIMITER + stage->name; + // BLOG("stage->full_name: %s", stage->full_name.c_str()); + //_stages.push_back(stage); + + //// assign stage number after stage created + //_index_nodes[nid]->stage = nid; + //// assign dag node full name after stage created + //_index_nodes[nid]->full_name = + // stage->full_name + NAME_DELIMITER + _index_nodes[nid]->name; + /*}*/ return ERR_OK; } diff --git a/core/predictor/framework/dag_view.cpp b/core/predictor/framework/dag_view.cpp index 743e7341..11ce95cd 100644 --- a/core/predictor/framework/dag_view.cpp +++ b/core/predictor/framework/dag_view.cpp @@ -21,6 +21,12 @@ #include #include "core/predictor/common/inner_common.h" #include "core/predictor/framework/op_repository.h" +#define BLOG(fmt, ...) \ + printf("[%s:%s]:%d " fmt "\n", \ + __FILE__, \ + __FUNCTION__, \ + __LINE__, \ + ##__VA_ARGS__); namespace baidu { namespace paddle_serving { @@ -76,6 +82,11 @@ int DagView::init(Dag* dag, const std::string& service_name) { } op->set_full_name(service_name + NAME_DELIMITER + node->full_name); + + // Set the name of the Op as the key of the matching engine. + BLOG("op->set_engine_name(%s)", node->name.c_str()); + op->set_engine_name(node->name); + vnode->conf = node; vnode->op = op; vstage->nodes.push_back(vnode); @@ -121,6 +132,7 @@ int DagView::deinit() { int DagView::execute(butil::IOBufBuilder* debug_os) { uint32_t stage_size = _view.size(); for (uint32_t si = 0; si < stage_size; si++) { + BLOG("start to execute stage[%u] %s", si, _view[si]->full_name.c_str()); TRACEPRINTF("start to execute stage[%u]", si); int errcode = execute_one_stage(_view[si], debug_os); TRACEPRINTF("finish to execute stage[%u]", si); @@ -139,12 +151,16 @@ int DagView::execute_one_stage(ViewStage* vstage, butil::IOBufBuilder* debug_os) { butil::Timer stage_time(butil::Timer::STARTED); uint32_t node_size = vstage->nodes.size(); + BLOG("vstage->nodes.size(): %d", node_size); for (uint32_t ni = 0; ni < node_size; ni++) { ViewNode* vnode = vstage->nodes[ni]; DagNode* conf = vnode->conf; Op* op = vnode->op; + BLOG("start to execute op[%s]", op->name()); + BLOG("Op engine name: %s", op->engine_name().c_str()); TRACEPRINTF("start to execute op[%s]", op->name()); int errcode = op->process(debug_os != NULL); + BLOG("finish to execute op[%s]", op->name()); TRACEPRINTF("finish to execute op[%s]", op->name()); if (errcode < 0) { LOG(ERROR) << "Execute failed, Op:" << op->debug_string(); diff --git a/core/predictor/framework/infer.h b/core/predictor/framework/infer.h index 177b6274..b31f80cb 100644 --- a/core/predictor/framework/infer.h +++ b/core/predictor/framework/infer.h @@ -22,6 +22,9 @@ #include "core/predictor/framework/bsf.h" #include "core/predictor/framework/factory.h" #include "core/predictor/framework/infer_data.h" +#define BLOG(fmt, ...) 
+  printf( \
+      "[%s:%s]:%d " fmt "\n", __FILE__, __FUNCTION__, __LINE__, ##__VA_ARGS__)
 
 namespace baidu {
 namespace paddle_serving {
@@ -763,6 +766,9 @@ class InferManager {
     size_t engine_num = model_toolkit_conf.engines_size();
     for (size_t ei = 0; ei < engine_num; ++ei) {
+      BLOG("model_toolkit_conf.engines(%zu).name: %s",
+           ei,
+           model_toolkit_conf.engines(ei).name().c_str());
       std::string engine_name = model_toolkit_conf.engines(ei).name();
       VersionedInferEngine* engine = new (std::nothrow) VersionedInferEngine();
       if (!engine) {
@@ -846,8 +852,10 @@ class InferManager {
             void* out,
             uint32_t batch_size = -1) {
     auto it = _map.find(model_name);
+    BLOG("find model_name: %s", model_name);
     if (it == _map.end()) {
       LOG(WARNING) << "Cannot find engine in map, model name:" << model_name;
+      BLOG("Cannot find engine in map, model name: %s", model_name);
       return -1;
     }
     return it->second->infer(in, out, batch_size);
 
diff --git a/core/predictor/framework/service.cpp b/core/predictor/framework/service.cpp
index 95c7db9f..799b88ac 100644
--- a/core/predictor/framework/service.cpp
+++ b/core/predictor/framework/service.cpp
@@ -30,6 +30,9 @@
 #include "core/predictor/framework/predictor_metric.h"  // PredictorMetric
 #include "core/predictor/framework/resource.h"
 #include "core/predictor/framework/server.h"
+#define BLOG(fmt, ...) \
+  printf( \
+      "[%s:%s]:%d " fmt "\n", __FILE__, __FUNCTION__, __LINE__, ##__VA_ARGS__)
 
 namespace baidu {
 namespace paddle_serving {
@@ -161,6 +164,7 @@ int InferService::inference(const google::protobuf::Message* request,
     return ERR_INTERNAL_FAILURE;
   }
   TRACEPRINTF("start to execute workflow[%s]", workflow->name().c_str());
+  BLOG("start to execute workflow[%s]", workflow->name().c_str());
   int errcode = _execute_workflow(workflow, request, response, debug_os);
   TRACEPRINTF("finish to execute workflow[%s]", workflow->name().c_str());
   if (errcode < 0) {
@@ -220,6 +224,7 @@ int InferService::_execute_workflow(Workflow* workflow,
 
   // call actual inference interface
   int errcode = dv->execute(debug_os);
+  BLOG("execute_workflow");
   if (errcode < 0) {
     LOG(ERROR) << "Failed execute dag for workflow:" << workflow->name();
     return errcode;
 
diff --git a/core/predictor/op/op.cpp b/core/predictor/op/op.cpp
index d2e512eb..a48f5269 100644
--- a/core/predictor/op/op.cpp
+++ b/core/predictor/op/op.cpp
@@ -25,6 +25,12 @@
 #include "core/predictor/common/utils.h"
 #include "core/predictor/framework/channel.h"
 #include "core/predictor/framework/dag.h"
+#define BLOG(fmt, ...) \
+  printf("[%s:%s]:%d " fmt "\n", \
+         __FILE__, \
+         __FUNCTION__, \
+         __LINE__, \
+         ##__VA_ARGS__)
 
 namespace baidu {
 namespace paddle_serving {
@@ -133,6 +139,7 @@ int Op::process(bool debug) {
   }
 
   // 2. current inference
+  BLOG("Op: %s->inference()", _name.c_str());
  if (inference() != 0) {
     return ERR_OP_INFER_FAILURE;
   }
 
diff --git a/core/predictor/op/op.h b/core/predictor/op/op.h
index 84bcf445..3b3f71ff 100644
--- a/core/predictor/op/op.h
+++ b/core/predictor/op/op.h
@@ -144,6 +144,16 @@ class Op {
 
   uint32_t id() const;
 
+  // Set the name of the Op as the key of the matching engine.
+  // Note that this key is only used by infer ops (only an
+  // infer op needs to find its corresponding engine).
+  // At present, there is only general_infer_op.
+  void set_engine_name(const std::string& engine_name) {
+    _engine_name = engine_name;
+  }
+
+  const std::string& engine_name() const { return _engine_name; }
+
   // --------------- Default implements ----------------
 
   virtual int custom_init() { return 0; }
@@ -196,6 +206,7 @@ class Op {
   bool _has_calc;
   bool _has_init;
   TimerFlow* _timer;
+  std::string _engine_name;  // only for infer ops
 };
 
 template <typename T>
diff --git a/python/examples/fit_a_line/test_client.py b/python/examples/fit_a_line/test_client.py
index 442ed230..9b04d0af 100644
--- a/python/examples/fit_a_line/test_client.py
+++ b/python/examples/fit_a_line/test_client.py
@@ -29,3 +29,4 @@ test_reader = paddle.batch(
 for data in test_reader():
     fetch_map = client.predict(feed={"x": data[0][0]}, fetch=["price"])
     print("{} {}".format(fetch_map["price"][0], data[0][1][0]))
+    exit(0)
diff --git a/python/examples/fit_a_line/test_server.py b/python/examples/fit_a_line/test_server.py
index 3293be06..a4aa0570 100644
--- a/python/examples/fit_a_line/test_server.py
+++ b/python/examples/fit_a_line/test_server.py
@@ -21,16 +21,20 @@ from paddle_serving_server import Server
 
 op_maker = OpMaker()
 read_op = op_maker.create('general_reader')
-general_infer_op = op_maker.create('general_infer')
+g1_infer_op = op_maker.create('general_infer', node_name='g1')
+g2_infer_op = op_maker.create('general_infer', node_name='g2')
 response_op = op_maker.create('general_response')
 
 op_seq_maker = OpSeqMaker()
 op_seq_maker.add_op(read_op)
-op_seq_maker.add_op(general_infer_op)
-op_seq_maker.add_op(response_op)
+op_seq_maker.add_op(g1_infer_op, dependent_nodes=[read_op])
+op_seq_maker.add_op(g2_infer_op, dependent_nodes=[read_op])
+op_seq_maker.add_op(response_op, dependent_nodes=[g1_infer_op, g2_infer_op])
 
 server = Server()
 server.set_op_sequence(op_seq_maker.get_op_sequence())
-server.load_model_config(sys.argv[1])
+# server.load_model_config(sys.argv[1])
+model_configs = {'g1': 'uci_housing_model', 'g2': 'uci_housing_model'}
+server.load_model_config(model_configs)
 server.prepare_server(workdir="work_dir1", port=9393, device="cpu")
 server.run_server()
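With dependent_nodes, add_op() records one DAGNodeDependency per listed parent instead of chaining onto the previously added node, so the script above builds a diamond: the reader fans out to g1 and g2, and the response op joins them. A quick way to check the wiring is to dump the recorded dependencies; this is a small sketch, assuming the op_seq_maker object from test_server.py above and the .workflow, .nodes, and .dependencies fields used by the OpSeqMaker changes in python/paddle_serving_server/__init__.py below.

# Run after the add_op() calls in test_server.py.
for node in op_seq_maker.workflow.nodes:
    print("{} <- {}".format(node.name, [d.name for d in node.dependencies]))
# Expected output for the g1/g2 ensemble:
#   general_reader_op <- []
#   g1 <- ['general_reader_op']
#   g2 <- ['general_reader_op']
#   general_response_op <- ['g1', 'g2']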
diff --git a/python/paddle_serving_server/__init__.py b/python/paddle_serving_server/__init__.py
index d893861e..d6975a93 100644
--- a/python/paddle_serving_server/__init__.py
+++ b/python/paddle_serving_server/__init__.py
@@ -39,13 +39,14 @@ class OpMaker(object):
 
     # currently, inputs and outputs are not used
     # when we have OpGraphMaker, inputs and outputs are necessary
-    def create(self, name, inputs=[], outputs=[]):
-        if name not in self.op_dict:
-            raise Exception("Op name {} is not supported right now".format(
-                name))
+    def create(self, node_type, node_name=None, inputs=[], outputs=[]):
+        if node_type not in self.op_dict:
+            raise Exception("Op type {} is not supported right now".format(
+                node_type))
         node = server_sdk.DAGNode()
-        node.name = "{}_op".format(name)
-        node.type = self.op_dict[name]
+        node.name = node_name if node_name is not None else "{}_op".format(
+            node_type)
+        node.type = self.op_dict[node_type]
         return node
 
 
@@ -55,12 +56,19 @@ class OpSeqMaker(object):
         self.workflow.name = "workflow1"
         self.workflow.workflow_type = "Sequence"
 
-    def add_op(self, node):
-        if len(self.workflow.nodes) >= 1:
-            dep = server_sdk.DAGNodeDependency()
-            dep.name = self.workflow.nodes[-1].name
-            dep.mode = "RO"
-            node.dependencies.extend([dep])
+    def add_op(self, node, dependent_nodes=None):
+        if dependent_nodes is None:
+            if len(self.workflow.nodes) >= 1:
+                dep = server_sdk.DAGNodeDependency()
+                dep.name = self.workflow.nodes[-1].name
+                dep.mode = "RO"
+                node.dependencies.extend([dep])
+        else:
+            for dep_node in dependent_nodes:
+                dep = server_sdk.DAGNodeDependency()
+                dep.name = dep_node.name
+                dep.mode = "RO"
+                node.dependencies.extend([dep])
         self.workflow.nodes.extend([node])
 
     def get_op_sequence(self):
@@ -75,7 +83,6 @@ class Server(object):
         self.infer_service_conf = None
         self.model_toolkit_conf = None
         self.resource_conf = None
-        self.engine = None
         self.memory_optimization = False
         self.model_conf = None
         self.workflow_fn = "workflow.prototxt"
@@ -93,6 +100,7 @@ class Server(object):
         self.cur_path = os.getcwd()
         self.use_local_bin = False
         self.mkl_flag = False
+        self.model_config_paths = None
 
     def set_max_concurrency(self, concurrency):
         self.max_concurrency = concurrency
@@ -117,32 +125,36 @@ class Server(object):
         self.use_local_bin = True
         self.bin_path = os.environ["SERVING_BIN"]
 
-    def _prepare_engine(self, model_config_path, device):
+    def _prepare_engine(self, model_config_paths, device):
         if self.model_toolkit_conf == None:
             self.model_toolkit_conf = server_sdk.ModelToolkitConf()
-        if self.engine == None:
-            self.engine = server_sdk.EngineDesc()
-
-        self.model_config_path = model_config_path
-        self.engine.name = "general_model"
-        self.engine.reloadable_meta = model_config_path + "/fluid_time_file"
-        os.system("touch {}".format(self.engine.reloadable_meta))
-        self.engine.reloadable_type = "timestamp_ne"
-        self.engine.runtime_thread_num = 0
-        self.engine.batch_infer_size = 0
-        self.engine.enable_batch_align = 0
-        self.engine.model_data_path = model_config_path
-        self.engine.enable_memory_optimization = self.memory_optimization
-        self.engine.static_optimization = False
-        self.engine.force_update_static_cache = False
-
-        if device == "cpu":
-            self.engine.type = "FLUID_CPU_ANALYSIS_DIR"
-        elif device == "gpu":
-            self.engine.type = "FLUID_GPU_ANALYSIS_DIR"
-
-        self.model_toolkit_conf.engines.extend([self.engine])
+        if isinstance(model_config_paths, str):
+            model_config_paths = {"general_infer_op": model_config_paths}
+        elif not isinstance(model_config_paths, dict):
+            raise Exception("model_config_paths can not be {}".format(
+                type(model_config_paths)))
+
+        for engine_name, model_config_path in model_config_paths.items():
+            engine = server_sdk.EngineDesc()
+            engine.name = engine_name
+            engine.reloadable_meta = model_config_path + "/fluid_time_file"
+            os.system("touch {}".format(engine.reloadable_meta))
+            engine.reloadable_type = "timestamp_ne"
+            engine.runtime_thread_num = 0
+            engine.batch_infer_size = 0
+            engine.enable_batch_align = 0
+            engine.model_data_path = model_config_path
+            engine.enable_memory_optimization = self.memory_optimization
+            engine.static_optimization = False
+            engine.force_update_static_cache = False
+
+            if device == "cpu":
+                engine.type = "FLUID_CPU_ANALYSIS_DIR"
+            elif device == "gpu":
+                engine.type = "FLUID_GPU_ANALYSIS_DIR"
+
+            self.model_toolkit_conf.engines.extend([engine])
 
     def _prepare_infer_service(self, port):
         if self.infer_service_conf == None:
@@ -175,7 +187,10 @@ class Server(object):
         with open(filepath, "w") as fout:
             fout.write(str(pb_obj))
 
-    def load_model_config(self, path):
+    def load_model_config(self, model_config_paths):
+        self.model_config_paths = model_config_paths
+        path = (model_config_paths if isinstance(model_config_paths, str)
+                else list(model_config_paths.values())[0])
         self.model_config_path = path
         self.model_conf = m_config.GeneralModelConfig()
         f = open("{}/serving_server_conf.prototxt".format(path), 'r')
used".format(port)) self._prepare_resource(workdir) - self._prepare_engine(self.model_config_path, device) + self._prepare_engine(self.model_config_paths, device) self._prepare_infer_service(port) self.workdir = workdir -- GitLab