From 3ac52f6a2bd2b4eb5a4699197ed6bc9eceb12fbf Mon Sep 17 00:00:00 2001
From: barrierye
Date: Fri, 10 Apr 2020 18:50:42 +0800
Subject: [PATCH] update code for ensemble

---
 .../op/{ => bak}/general_copy_op.cpp          |  0
 .../op/{ => bak}/general_copy_op.h            |  0
 .../op/{ => bak}/general_dist_kv_infer_op.cpp |  0
 .../op/{ => bak}/general_dist_kv_infer_op.h   |  0
 .../general_dist_kv_quant_infer_op.cpp        |  0
 .../general_dist_kv_quant_infer_op.h          |  0
 core/general-server/op/general_infer_op.cpp   | 57 ++++++++++++++-----
 core/general-server/op/general_reader_op.cpp  |  4 --
 .../general-server/op/general_response_op.cpp |  7 ++-
 .../op/general_text_response_op.cpp           |  7 ++-
 core/predictor/framework/channel.h            |  8 ++-
 core/predictor/framework/dag_view.cpp         | 27 ++++++---
 core/predictor/framework/workflow.cpp         |  2 +
 core/predictor/op/op.cpp                      |  1 +
 core/predictor/op/op.h                        | 22 +++++--
 ensemble-demo/client.py                       | 37 ++++++++++++
 ensemble-demo/get_data.sh                     |  4 ++
 ensemble-demo/server.py                       | 42 ++++++++++++++
 ensemble-demo/server.seq.py                   | 38 +++++++++++++
 python/paddle_serving_server/__init__.py      |  3 +-
 20 files changed, 223 insertions(+), 36 deletions(-)
 rename core/general-server/op/{ => bak}/general_copy_op.cpp (100%)
 rename core/general-server/op/{ => bak}/general_copy_op.h (100%)
 rename core/general-server/op/{ => bak}/general_dist_kv_infer_op.cpp (100%)
 rename core/general-server/op/{ => bak}/general_dist_kv_infer_op.h (100%)
 rename core/general-server/op/{ => bak}/general_dist_kv_quant_infer_op.cpp (100%)
 rename core/general-server/op/{ => bak}/general_dist_kv_quant_infer_op.h (100%)
 create mode 100644 ensemble-demo/client.py
 create mode 100644 ensemble-demo/get_data.sh
 create mode 100644 ensemble-demo/server.py
 create mode 100644 ensemble-demo/server.seq.py

diff --git a/core/general-server/op/general_copy_op.cpp b/core/general-server/op/bak/general_copy_op.cpp
similarity index 100%
rename from core/general-server/op/general_copy_op.cpp
rename to core/general-server/op/bak/general_copy_op.cpp
diff --git a/core/general-server/op/general_copy_op.h b/core/general-server/op/bak/general_copy_op.h
similarity index 100%
rename from core/general-server/op/general_copy_op.h
rename to core/general-server/op/bak/general_copy_op.h
diff --git a/core/general-server/op/general_dist_kv_infer_op.cpp b/core/general-server/op/bak/general_dist_kv_infer_op.cpp
similarity index 100%
rename from core/general-server/op/general_dist_kv_infer_op.cpp
rename to core/general-server/op/bak/general_dist_kv_infer_op.cpp
diff --git a/core/general-server/op/general_dist_kv_infer_op.h b/core/general-server/op/bak/general_dist_kv_infer_op.h
similarity index 100%
rename from core/general-server/op/general_dist_kv_infer_op.h
rename to core/general-server/op/bak/general_dist_kv_infer_op.h
diff --git a/core/general-server/op/general_dist_kv_quant_infer_op.cpp b/core/general-server/op/bak/general_dist_kv_quant_infer_op.cpp
similarity index 100%
rename from core/general-server/op/general_dist_kv_quant_infer_op.cpp
rename to core/general-server/op/bak/general_dist_kv_quant_infer_op.cpp
diff --git a/core/general-server/op/general_dist_kv_quant_infer_op.h b/core/general-server/op/bak/general_dist_kv_quant_infer_op.h
similarity index 100%
rename from core/general-server/op/general_dist_kv_quant_infer_op.h
rename to core/general-server/op/bak/general_dist_kv_quant_infer_op.h
diff --git a/core/general-server/op/general_infer_op.cpp b/core/general-server/op/general_infer_op.cpp
index 8f16b088..1b550ee2 100644
--- a/core/general-server/op/general_infer_op.cpp
+++ b/core/general-server/op/general_infer_op.cpp
@@ -37,29 +37,55 @@ using baidu::paddle_serving::predictor::PaddleGeneralModelConfig;
 
 int GeneralInferOp::inference() {
   VLOG(2) << "Going to run inference";
-  const GeneralBlob *input_blob = get_depend_argument<GeneralBlob>(pre_name());
-  VLOG(2) << "Get precedent op name: " << pre_name();
+  // const GeneralBlob *input_blob = get_depend_argument<GeneralBlob>(pre_name());
   GeneralBlob *output_blob = mutable_data<GeneralBlob>();
-
-  if (!input_blob) {
-    LOG(ERROR) << "Failed mutable depended argument, op:" << pre_name();
-    return -1;
-  }
-
-  const TensorVector *in = &input_blob->tensor_vector;
+  VLOG(2) << "finish get output_blob";
   TensorVector *out = &output_blob->tensor_vector;
-  int batch_size = input_blob->GetBatchSize();
-  VLOG(2) << "input batch size: " << batch_size;
+  VLOG(2) << "finish get *out";
+
+  const std::vector<std::string> pre_node_names = pre_names();
+  VLOG(2) << "pre node names size: " << pre_node_names.size();
+  TensorVector input;
+  int batch_size = 0;
+  const GeneralBlob *input_blob;
+  for (int i = 0; i < (int)pre_node_names.size(); ++i) {
+    VLOG(2) << "pre names[" << i << "]: "
+            << pre_node_names[i];
+    input_blob =
+        get_depend_argument<GeneralBlob>(pre_node_names[i]);
+    fprintf(stderr, "input blob address %p\n", static_cast<const void *>(input_blob));
+    if (!input_blob) {
+      LOG(ERROR) << "Failed mutable depended argument, op:" << pre_node_names[i];
+      return -1;
+    }
+    batch_size = input_blob->GetBatchSize();
+    VLOG(2) << "batch size of input: " << batch_size;
+    for (size_t j = 0; j < input_blob->tensor_vector.size(); ++j) {
+      VLOG(2) << "input tensor[" << j << "]: "
+              << input_blob->tensor_vector[j].name;
+      input.push_back(input_blob->tensor_vector[j]);
+      VLOG(2) << "add an input tensor name: " << input_blob->tensor_vector[j].name;
+    }
+  }
 
-  output_blob->SetBatchSize(batch_size);
+  VLOG(2) << "get output blob done.";
+  const TensorVector *in = &input;
+  VLOG(2) << "get input done.";
+  batch_size = 1;
 
   VLOG(2) << "infer batch size: " << batch_size;
+  output_blob->SetBatchSize(batch_size);
 
   Timer timeline;
   int64_t start = timeline.TimeStampUS();
   timeline.Start();
 
-  BLOG("engine name: %s", engine_name().c_str());
+  VLOG(2) << "input of op " << op_name();
+  for (size_t i = 0; i < in->size(); ++i) {
+    VLOG(2) << in->at(i).name;
+  }
+
+  VLOG(2) << "get engine name: " << engine_name().c_str();
   if (InferManager::instance().infer(
           GeneralInferOp::engine_name().c_str(), in, out, batch_size)) {
     LOG(ERROR) << "Failed do infer in fluid model: "
@@ -67,6 +93,11 @@ int GeneralInferOp::inference() {
     return -1;
   }
 
+  VLOG(2) << "output of op " << op_name();
+  for (size_t i = 0; i < out->size(); ++i) {
+    VLOG(2) << out->at(i).name;
+  }
+
   int64_t end = timeline.TimeStampUS();
   CopyBlobInfo(input_blob, output_blob);
   AddBlobInfo(output_blob, start);
diff --git a/core/general-server/op/general_reader_op.cpp b/core/general-server/op/general_reader_op.cpp
index fde6f13a..8695da25 100644
--- a/core/general-server/op/general_reader_op.cpp
+++ b/core/general-server/op/general_reader_op.cpp
@@ -21,9 +21,6 @@
 #include "core/predictor/framework/infer.h"
 #include "core/predictor/framework/memory.h"
 #include "core/util/include/timer.h"
-#define BLOG(fmt, ...) \
-  printf( \
-      "[%s:%s]:%d " fmt "\n", __FILE__, __FUNCTION__, __LINE__, ##__VA_ARGS__)
 
 namespace baidu {
 namespace paddle_serving {
@@ -101,7 +98,6 @@ int GeneralReaderOp::inference() {
       baidu::paddle_serving::predictor::Resource::instance();
 
   VLOG(2) << "get resource pointer done.";
-  BLOG("engine name: %s", engine_name().c_str());
   std::shared_ptr<PaddleGeneralModelConfig> model_config =
       resource.get_general_model_config();
diff --git a/core/general-server/op/general_response_op.cpp b/core/general-server/op/general_response_op.cpp
index c5248227..3e29a63c 100644
--- a/core/general-server/op/general_response_op.cpp
+++ b/core/general-server/op/general_response_op.cpp
@@ -37,10 +37,13 @@ using baidu::paddle_serving::predictor::InferManager;
 using baidu::paddle_serving::predictor::PaddleGeneralModelConfig;
 
 int GeneralResponseOp::inference() {
-  const GeneralBlob *input_blob = get_depend_argument<GeneralBlob>(pre_name());
+  const std::vector<std::string> pre_node_names = pre_names();
+  VLOG(2) << "pre node names size: " << pre_node_names.size();
+
+  const GeneralBlob *input_blob = get_depend_argument<GeneralBlob>(pre_node_names[0]);
 
   if (!input_blob) {
-    LOG(ERROR) << "Failed mutable depended argument, op: " << pre_name();
+    LOG(ERROR) << "Failed mutable depended argument, op: " << pre_node_names[0];
     return -1;
   }
diff --git a/core/general-server/op/general_text_response_op.cpp b/core/general-server/op/general_text_response_op.cpp
index 43c7af77..d3a0f355 100644
--- a/core/general-server/op/general_text_response_op.cpp
+++ b/core/general-server/op/general_text_response_op.cpp
@@ -36,10 +36,13 @@ using baidu::paddle_serving::predictor::InferManager;
 using baidu::paddle_serving::predictor::PaddleGeneralModelConfig;
 
 int GeneralTextResponseOp::inference() {
-  const GeneralBlob *input_blob = get_depend_argument<GeneralBlob>(pre_name());
+  const std::vector<std::string> pre_node_names = pre_names();
+  VLOG(2) << "pre node names size: " << pre_node_names.size();
+
+  const GeneralBlob *input_blob = get_depend_argument<GeneralBlob>(pre_node_names[0]);
 
   if (!input_blob) {
-    LOG(ERROR) << "Failed mutable depended argument, op: " << pre_name();
+    LOG(ERROR) << "Failed mutable depended argument, op: " << pre_node_names[0];
     return -1;
   }
diff --git a/core/predictor/framework/channel.h b/core/predictor/framework/channel.h
index a4836832..2c9b41cb 100644
--- a/core/predictor/framework/channel.h
+++ b/core/predictor/framework/channel.h
@@ -152,9 +152,13 @@ class OpChannel : public Channel {
 
   // functions of derived class
 
-  T* data() { return &_data; }
+  T* data() {
+    LOG(INFO) << "get data from channel.";
+    return &_data; }
 
-  const T* data() const { return &_data; }
+  const T* data() const {
+    LOG(INFO) << "get data from channel.";
+    return &_data; }
 
   Channel& operator=(const T& obj) {
     _data = obj;
diff --git a/core/predictor/framework/dag_view.cpp b/core/predictor/framework/dag_view.cpp
index 11ce95cd..a8f7db9d 100644
--- a/core/predictor/framework/dag_view.cpp
+++ b/core/predictor/framework/dag_view.cpp
@@ -33,6 +33,7 @@ namespace paddle_serving {
 namespace predictor {
 
 int DagView::init(Dag* dag, const std::string& service_name) {
+  LOG(INFO) << "DagView::init.";
   _name = dag->name();
   _full_name = service_name + NAME_DELIMITER + dag->name();
   _bus = butil::get_object<Bus>();
@@ -89,20 +90,30 @@ int DagView::init(Dag* dag, const std::string& service_name) {
       vnode->conf = node;
       vnode->op = op;
+      // Record the names of all nodes this node depends on.
+      for (auto it = vnode->conf->depends.begin();
+           it != vnode->conf->depends.end(); ++it) {
+        std::string pre_node_name = it->first;
+        VLOG(2) << "add op pre name: \n"
+                << "current op name: " << vnode->op->op_name()
+                << ", previous op name: " << pre_node_name;
+        vnode->op->add_pre_node_name(pre_node_name);
+      }
       vstage->nodes.push_back(vnode);
     }
     // TODO(guru4elephant): this seems buggy, please review later
-    if (si > 0) {
-      VLOG(2) << "set op pre name: \n"
-              << "current op name: " << vstage->nodes.back()->op->op_name()
-              << " previous op name: "
-              << _view[si - 1]->nodes.back()->op->op_name();
-      vstage->nodes.back()->op->set_pre_node_name(
-          _view[si - 1]->nodes.back()->op->op_name());
-    }
+    // if (si > 0) {
+    //   VLOG(2) << "set op pre name: \n"
+    //           << "current op name: " << vstage->nodes.back()->op->op_name()
+    //           << " previous op name: "
+    //           << _view[si - 1]->nodes.back()->op->op_name();
+    //   vstage->nodes.back()->op->set_pre_node_name(
+    //       _view[si - 1]->nodes.back()->op->op_name());
+    // }
     _view.push_back(vstage);
   }
+  LOG(INFO) << "DagView::init finished.";
   return ERR_OK;
 }
diff --git a/core/predictor/framework/workflow.cpp b/core/predictor/framework/workflow.cpp
index 16c4a6e9..f6a73cb8 100644
--- a/core/predictor/framework/workflow.cpp
+++ b/core/predictor/framework/workflow.cpp
@@ -16,6 +16,7 @@
 #include <string>
 #include "core/predictor/common/inner_common.h"
 #include "core/predictor/framework/predictor_metric.h"  // PredictorMetric
+#define BLOG(fmt, ...) printf("[%s:%s]:%d " fmt "\n", __FILE__, __FUNCTION__, __LINE__, ##__VA_ARGS__)
 
 namespace baidu {
 namespace paddle_serving {
@@ -51,6 +52,7 @@ DagView* Workflow::fetch_dag_view(const std::string& service_name) {
 }
 
 void Workflow::return_dag_view(DagView* view) {
+  BLOG("Workflow::return_dag_view");
   view->deinit();
   if (_type == "Sequence") {
     butil::return_object<DagView>(view);
diff --git a/core/predictor/op/op.cpp b/core/predictor/op/op.cpp
index a48f5269..e4d359f2 100644
--- a/core/predictor/op/op.cpp
+++ b/core/predictor/op/op.cpp
@@ -66,6 +66,7 @@ int Op::init(Bus* bus,
     return -1;
   }
 
+  _pre_node_names.clear();
   return custom_init();
 }
diff --git a/core/predictor/op/op.h b/core/predictor/op/op.h
index 3b3f71ff..0c97e677 100644
--- a/core/predictor/op/op.h
+++ b/core/predictor/op/op.h
@@ -94,6 +94,10 @@ class Op {
   template <typename T>
   T* mutable_data() {
     Channel* channel = mutable_channel();
+    LOG(INFO) << "succ to get channel!";
+    auto x = (dynamic_cast<OpChannel<T>*>(channel))->data();
+    LOG(INFO) << "succ to get data!";
+    return x;
     return (dynamic_cast<OpChannel<T>*>(channel))->data();
   }
 
@@ -132,12 +136,16 @@ class Op {
 
   const std::string& full_name() const { return _full_name; }
 
-  const std::string& pre_name() const { return _pre_node_name; }
+  // const std::string& pre_name() const { return _pre_node_name; }
+  const std::vector<std::string>& pre_names() const { return _pre_node_names; }
 
   void set_full_name(const std::string full_name) { _full_name = full_name; }
-  void set_pre_node_name(const std::string pre_name) {
-    _pre_node_name = pre_name;
-  }
+  // void set_pre_node_name(const std::string pre_name) {
+  //   _pre_node_name = pre_name;
+  // }
+  void add_pre_node_name(const std::string pre_name) {
+    _pre_node_names.push_back(pre_name);
+  }
 
   const std::string& type() const;
@@ -199,7 +207,8 @@ class Op {
   Bus* _bus;
   Dag* _dag;
   uint32_t _id;
-  std::string _pre_node_name;  // only for sequential execution
+  // std::string _pre_node_name;  // only for sequential execution
+  std::vector<std::string> _pre_node_names;  // for dag execution
   std::string _name;
   std::string _full_name;  // service_workflow_stageindex_opname
   std::string _type;
@@ -222,15 +231,20 @@ class OpWithChannel : public Op {
 
   // ---------- Implements ----------
 
   Channel* mutable_channel() {
+    LOG(INFO) << "op->mutable_channel";
     if (_channel != NULL) {
+      LOG(INFO) << "op->mutable_channel: return _channel";
       return _channel;
     }
+    LOG(INFO) << "op->mutable_channel: _channel == NULL";
     _channel = butil::get_object<ChannelType>();
     if (!_channel) {
+      LOG(INFO) << "op->mutable_channel: failed to get _channel";
       LOG(ERROR) << "Failed mutable channel of type:" << typeid(T).name();
       return NULL;
     }
+    LOG(INFO) << "op->mutable_channel: succ to get _channel";
     _channel->init(this->id(), this->name());
     return _channel;
   }
diff --git a/ensemble-demo/client.py b/ensemble-demo/client.py
new file mode 100644
index 00000000..99c25dc2
--- /dev/null
+++ b/ensemble-demo/client.py
@@ -0,0 +1,37 @@
+# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# pylint: disable=doc-string-missing
+from paddle_serving_client import Client
+from imdb_reader import IMDBDataset
+import sys
+
+client = Client()
+client.load_client_config('imdb_bow_client_conf/serving_client_conf.prototxt')
+client.connect(["127.0.0.1:9393"])
+
+# You can feed any English sentence or dataset here.
+# This example reuses the IMDB reader from training; you
+# can define your own data preprocessing easily.
+imdb_dataset = IMDBDataset()
+imdb_dataset.load_resource('imdb.vocab')
+
+for i in range(40):
+    line = 'i am very sad | 0'
+    word_ids, label = imdb_dataset.get_words_and_label(line)
+    feed = {"words": word_ids}
+    fetch = ["acc", "cost", "prediction"]
+    fetch_map = client.predict(feed=feed, fetch=fetch)
+    print("{} {}".format(i, fetch_map["prediction"][1]))
+    exit(0)
+print('0.633530199528')
diff --git a/ensemble-demo/get_data.sh b/ensemble-demo/get_data.sh
new file mode 100644
index 00000000..81d8d5d3
--- /dev/null
+++ b/ensemble-demo/get_data.sh
@@ -0,0 +1,4 @@
+wget --no-check-certificate https://fleet.bj.bcebos.com/text_classification_data.tar.gz
+wget --no-check-certificate https://paddle-serving.bj.bcebos.com/imdb-demo/imdb_model.tar.gz
+tar -zxvf text_classification_data.tar.gz
+tar -zxvf imdb_model.tar.gz
diff --git a/ensemble-demo/server.py b/ensemble-demo/server.py
new file mode 100644
index 00000000..5fc75aee
--- /dev/null
+++ b/ensemble-demo/server.py
@@ -0,0 +1,42 @@
+# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# pylint: disable=doc-string-missing
+
+import os
+import sys
+from paddle_serving_server import OpMaker
+from paddle_serving_server import OpSeqMaker
+from paddle_serving_server import Server
+
+op_maker = OpMaker()
+read_op = op_maker.create('general_reader')
+g1_infer_op = op_maker.create('general_infer', node_name='g1')
+g2_infer_op = op_maker.create('general_infer', node_name='g2')
+add_op = op_maker.create('general_add')
+response_op = op_maker.create('general_response')
+
+op_seq_maker = OpSeqMaker()
+op_seq_maker.add_op(read_op)
+op_seq_maker.add_op(g1_infer_op, dependent_nodes=[read_op])
+op_seq_maker.add_op(g2_infer_op, dependent_nodes=[read_op])
+op_seq_maker.add_op(add_op, dependent_nodes=[g1_infer_op, g2_infer_op])
+op_seq_maker.add_op(response_op, dependent_nodes=[add_op])
+
+server = Server()
+server.set_op_sequence(op_seq_maker.get_op_sequence())
+# server.load_model_config(sys.argv[1])
+model_configs = {'g1': 'imdb_bow_model', 'g2': 'imdb_cnn_model'}
+server.load_model_config(model_configs)
+server.prepare_server(workdir="work_dir1", port=9393, device="cpu")
+server.run_server()
diff --git a/ensemble-demo/server.seq.py b/ensemble-demo/server.seq.py
new file mode 100644
index 00000000..d8f7275f
--- /dev/null
+++ b/ensemble-demo/server.seq.py
@@ -0,0 +1,38 @@
+# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# pylint: disable=doc-string-missing
+
+import os
+import sys
+from paddle_serving_server import OpMaker
+from paddle_serving_server import OpSeqMaker
+from paddle_serving_server import Server
+
+op_maker = OpMaker()
+read_op = op_maker.create('general_reader')
+infer_op = op_maker.create('general_infer')
+response_op = op_maker.create('general_response')
+
+op_seq_maker = OpSeqMaker()
+op_seq_maker.add_op(read_op)
+op_seq_maker.add_op(infer_op, dependent_nodes=[read_op])
+op_seq_maker.add_op(response_op, dependent_nodes=[infer_op])
+
+server = Server()
+server.set_op_sequence(op_seq_maker.get_op_sequence())
+# server.load_model_config(sys.argv[1])
+model_configs = {'general_infer_op': 'imdb_bow_model'}
+server.load_model_config(model_configs)
+server.prepare_server(workdir="work_dir1", port=9393, device="cpu")
+server.run_server()
diff --git a/python/paddle_serving_server/__init__.py b/python/paddle_serving_server/__init__.py
index c4e4aaf2..baa20c0f 100644
--- a/python/paddle_serving_server/__init__.py
+++ b/python/paddle_serving_server/__init__.py
@@ -34,7 +34,8 @@ class OpMaker(object):
             "general_single_kv": "GeneralSingleKVOp",
             "general_dist_kv_infer": "GeneralDistKVInferOp",
             "general_dist_kv_quant_infer": "GeneralDistKVQuantInferOp",
-            "general_copy": "GeneralCopyOp"
+            "general_copy": "GeneralCopyOp",
+            "general_add": "GeneralAddOp"
         }
 
     # currently, inputs and outputs are not used
-- 
GitLab
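
Editor's note on the ensemble op: server.py above wires a 'general_add' node between the two infer nodes, and __init__.py maps "general_add" to "GeneralAddOp", but the patch contains no C++ implementation of that op. The sketch below is a hypothetical illustration only, written against the conventions this patch establishes in GeneralInferOp (pre_names(), get_depend_argument<GeneralBlob>(), mutable_data<GeneralBlob>(), CopyBlobInfo()). The class name, the element-wise float32 averaging, and the assumption that every predecessor emits the same number of identically shaped tensors are assumptions of this note, not the patch author's code.

// Hypothetical sketch -- NOT part of this patch. Assumes all predecessor
// blobs hold the same number of float32 tensors with identical shapes, and
// that copying a paddle::PaddleTensor deep-copies its PaddleBuf payload.
int GeneralAddOp::inference() {
  const std::vector<std::string> pre_node_names = pre_names();
  if (pre_node_names.size() < 2) {
    LOG(ERROR) << "GeneralAddOp expects at least two predecessor nodes";
    return -1;
  }

  // Collect every predecessor's output blob.
  std::vector<const GeneralBlob *> input_blobs;
  for (size_t i = 0; i < pre_node_names.size(); ++i) {
    const GeneralBlob *blob =
        get_depend_argument<GeneralBlob>(pre_node_names[i]);
    if (!blob) {
      LOG(ERROR) << "Failed mutable depended argument, op: "
                 << pre_node_names[i];
      return -1;
    }
    input_blobs.push_back(blob);
  }

  GeneralBlob *output_blob = mutable_data<GeneralBlob>();
  TensorVector *out = &output_blob->tensor_vector;

  // Element-wise average of the predecessors' tensors into the output.
  const TensorVector &first = input_blobs[0]->tensor_vector;
  for (size_t i = 0; i < first.size(); ++i) {
    paddle::PaddleTensor t = first[i];  // copies shape, name and data
    float *dst = static_cast<float *>(t.data.data());
    size_t len = t.data.length() / sizeof(float);
    for (size_t k = 1; k < input_blobs.size(); ++k) {
      const float *src = static_cast<const float *>(
          input_blobs[k]->tensor_vector[i].data.data());
      for (size_t j = 0; j < len; ++j) {
        dst[j] += src[j];
      }
    }
    for (size_t j = 0; j < len; ++j) {
      dst[j] /= static_cast<float>(input_blobs.size());
    }
    out->push_back(t);
  }

  output_blob->SetBatchSize(input_blobs[0]->GetBatchSize());
  CopyBlobInfo(input_blobs[0], output_blob);
  return 0;
}

One design point this shape makes visible: GeneralResponseOp in this patch still reads only pre_node_names[0], so any fan-in from multiple models has to be collapsed by a single combining node placed directly before 'general_response' -- exactly the topology server.py builds.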