diff --git a/README.md b/README.md
index 2b5df8ad51ef2666da29c502ae6fb03b2d19a7b3..970010c53ac6f4bcfe25be1a834e435207df5c18 100644
--- a/README.md
+++ b/README.md
@@ -98,9 +98,12 @@ print(fetch_map)
Chinese Word Segmentation
-- **Description**: Chinese word segmentation HTTP service that can be deployed with one line command.
+- **Description**:
+``` shell
+Chinese word segmentation HTTP service that can be deployed with one line command.
+```
-- **Download**:
+- **Download Servable Package**:
``` shell
wget --no-check-certificate https://paddle-serving.bj.bcebos.com/lac/lac_model_jieba_web.tar.gz
```
@@ -120,9 +123,12 @@ curl -H "Content-Type:application/json" -X POST -d '{"words": "我爱北京天
Image Classification
-- **Description**: Image classification trained with Imagenet dataset. A label and corresponding probability will be returned.
+- **Description**:
+``` shell
+Image classification trained with Imagenet dataset. A label and corresponding probability will be returned.
+```
-- **Download**:
+- **Download Servable Package**:
``` shell
wget --no-check-certificate https://paddle-serving.bj.bcebos.com/imagenet-example/imagenet_demo.tar.gz
```
diff --git a/core/configure/proto/server_configure.proto b/core/configure/proto/server_configure.proto
index cc508e8eaa9ccad1baa022a9c962094213ce4039..bbc4487d69757636178048da05ea608be0b9466a 100644
--- a/core/configure/proto/server_configure.proto
+++ b/core/configure/proto/server_configure.proto
@@ -52,9 +52,10 @@ message ModelToolkitConf { repeated EngineDesc engines = 1; };
message ResourceConf {
required string model_toolkit_path = 1;
required string model_toolkit_file = 2;
- optional string cube_config_file = 3;
- optional string general_model_path = 4;
- optional string general_model_file = 5;
+ optional string general_model_path = 3;
+ optional string general_model_file = 4;
+ optional string cube_config_path = 5;
+ optional string cube_config_file = 6;
};
// DAG node depency info
diff --git a/core/cube/cube-api/include/cube_api.h b/core/cube/cube-api/include/cube_api.h
index 90a6c690e061a8e11697aa5a82b5d38200cf22cc..21d1fea38698c8f202e02d79e662d718a05f9499 100644
--- a/core/cube/cube-api/include/cube_api.h
+++ b/core/cube/cube-api/include/cube_api.h
@@ -99,6 +99,13 @@ class CubeAPI {
std::function parse,
std::string* version);
+ /**
+ * @brief: get all table names from cube server, thread safe.
+ * @param [out] vals: vector of table names
+ *
+ */
+ std::vector get_table_names();
+
public:
static const char* error_msg(int error_code);
diff --git a/core/cube/cube-api/src/cube_api.cpp b/core/cube/cube-api/src/cube_api.cpp
index 4963d307171c7e32fc4add720c092c257d063e83..c38c8bb7f96ad205460400ff5540bc0deb47b835 100644
--- a/core/cube/cube-api/src/cube_api.cpp
+++ b/core/cube/cube-api/src/cube_api.cpp
@@ -682,5 +682,13 @@ int CubeAPI::opt_seek(const std::string& dict_name,
return ret;
}
+std::vector CubeAPI::get_table_names() {
+ const std::vector metas = _meta->metas();
+ std::vector table_names;
+ for (auto itr = metas.begin(); itr != metas.end(); ++itr) {
+ table_names.push_back((*itr)->dict_name);
+ }
+ return table_names;
+}
} // namespace mcube
} // namespace rec
diff --git a/core/general-server/CMakeLists.txt b/core/general-server/CMakeLists.txt
index 82ce53462cb0a5f775cb0ee85b827bba38f5c306..9056e229a51f56463dc2eec5629f219d00dc6a38 100644
--- a/core/general-server/CMakeLists.txt
+++ b/core/general-server/CMakeLists.txt
@@ -1,3 +1,4 @@
+include_directories(SYSTEM ${CMAKE_CURRENT_LIST_DIR}/../../)
include(op/CMakeLists.txt)
include(proto/CMakeLists.txt)
add_executable(serving ${serving_srcs})
diff --git a/core/general-server/op/general_dist_kv_infer_op.cpp b/core/general-server/op/general_dist_kv_infer_op.cpp
new file mode 100755
index 0000000000000000000000000000000000000000..ac4e7bb23e9410aede4fd353099d3c90ce91bcd3
--- /dev/null
+++ b/core/general-server/op/general_dist_kv_infer_op.cpp
@@ -0,0 +1,167 @@
+// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "core/general-server/op/general_dist_kv_infer_op.h"
+#include
+#include
+#include
+#include
+#include
+#include
+#include "core/cube/cube-api/include/cube_api.h"
+#include "core/predictor/framework/infer.h"
+#include "core/predictor/framework/memory.h"
+#include "core/predictor/framework/resource.h"
+#include "core/util/include/timer.h"
+
+namespace baidu {
+namespace paddle_serving {
+namespace serving {
+
+using baidu::paddle_serving::Timer;
+using baidu::paddle_serving::predictor::MempoolWrapper;
+using baidu::paddle_serving::predictor::general_model::Tensor;
+using baidu::paddle_serving::predictor::general_model::Response;
+using baidu::paddle_serving::predictor::general_model::Request;
+using baidu::paddle_serving::predictor::general_model::FetchInst;
+using baidu::paddle_serving::predictor::InferManager;
+using baidu::paddle_serving::predictor::PaddleGeneralModelConfig;
+
+int GeneralDistKVInferOp::inference() {
+ VLOG(2) << "Going to run inference";
+ const GeneralBlob *input_blob = get_depend_argument(pre_name());
+ VLOG(2) << "Get precedent op name: " << pre_name();
+ GeneralBlob *output_blob = mutable_data();
+
+ if (!input_blob) {
+ LOG(ERROR) << "Failed mutable depended argument, op:" << pre_name();
+ return -1;
+ }
+
+ const TensorVector *in = &input_blob->tensor_vector;
+ TensorVector *out = &output_blob->tensor_vector;
+ int batch_size = input_blob->GetBatchSize();
+ VLOG(2) << "input batch size: " << batch_size;
+ std::vector keys;
+ std::vector values;
+ int sparse_count = 0;
+ int dense_count = 0;
+ std::vector> dataptr_size_pairs;
+ size_t key_len = 0;
+ for (size_t i = 0; i < in->size(); ++i) {
+ if (in->at(i).dtype != paddle::PaddleDType::INT64) {
+ ++dense_count;
+ continue;
+ }
+ ++sparse_count;
+ size_t elem_num = 1;
+ for (size_t s = 0; s < in->at(i).shape.size(); ++s) {
+ elem_num *= in->at(i).shape[s];
+ }
+ key_len += elem_num;
+ int64_t *data_ptr = static_cast(in->at(i).data.data());
+ dataptr_size_pairs.push_back(std::make_pair(data_ptr, elem_num));
+ }
+ keys.resize(key_len);
+ int key_idx = 0;
+ for (size_t i = 0; i < dataptr_size_pairs.size(); ++i) {
+ std::copy(dataptr_size_pairs[i].first,
+ dataptr_size_pairs[i].first + dataptr_size_pairs[i].second,
+ keys.begin() + key_idx);
+ key_idx += dataptr_size_pairs[i].second;
+ }
+ rec::mcube::CubeAPI *cube = rec::mcube::CubeAPI::instance();
+ std::vector table_names = cube->get_table_names();
+ if (table_names.size() == 0) {
+ LOG(ERROR) << "cube init error or cube config not given.";
+ return -1;
+ }
+ int ret = cube->seek(table_names[0], keys, &values);
+
+ if (values.size() != keys.size() || values[0].buff.size() == 0) {
+ LOG(ERROR) << "cube value return null";
+ }
+ size_t EMBEDDING_SIZE = values[0].buff.size() / sizeof(float);
+ TensorVector sparse_out;
+ sparse_out.resize(sparse_count);
+ TensorVector dense_out;
+ dense_out.resize(dense_count);
+ int cube_val_idx = 0;
+ int sparse_idx = 0;
+ int dense_idx = 0;
+ std::unordered_map in_out_map;
+ baidu::paddle_serving::predictor::Resource &resource =
+ baidu::paddle_serving::predictor::Resource::instance();
+ std::shared_ptr model_config =
+ resource.get_general_model_config();
+ for (size_t i = 0; i < in->size(); ++i) {
+ if (in->at(i).dtype != paddle::PaddleDType::INT64) {
+ dense_out[dense_idx] = in->at(i);
+ ++dense_idx;
+ continue;
+ }
+
+ sparse_out[sparse_idx].lod.resize(in->at(i).lod.size());
+ for (size_t x = 0; x < sparse_out[sparse_idx].lod.size(); ++x) {
+ sparse_out[sparse_idx].lod[x].resize(in->at(i).lod[x].size());
+ std::copy(in->at(i).lod[x].begin(),
+ in->at(i).lod[x].end(),
+ sparse_out[sparse_idx].lod[x].begin());
+ }
+ sparse_out[sparse_idx].dtype = paddle::PaddleDType::FLOAT32;
+ sparse_out[sparse_idx].shape.push_back(
+ sparse_out[sparse_idx].lod[0].back());
+ sparse_out[sparse_idx].shape.push_back(EMBEDDING_SIZE);
+ sparse_out[sparse_idx].name = model_config->_feed_name[i];
+ sparse_out[sparse_idx].data.Resize(sparse_out[sparse_idx].lod[0].back() *
+ EMBEDDING_SIZE * sizeof(float));
+ float *dst_ptr = static_cast(sparse_out[sparse_idx].data.data());
+ for (int x = 0; x < sparse_out[sparse_idx].lod[0].back(); ++x) {
+ float *data_ptr = dst_ptr + x * EMBEDDING_SIZE;
+ memcpy(data_ptr,
+ values[cube_val_idx].buff.data(),
+ values[cube_val_idx].buff.size());
+ cube_val_idx++;
+ }
+ ++sparse_idx;
+ }
+ TensorVector infer_in;
+ infer_in.insert(infer_in.end(), dense_out.begin(), dense_out.end());
+ infer_in.insert(infer_in.end(), sparse_out.begin(), sparse_out.end());
+
+ output_blob->SetBatchSize(batch_size);
+
+ VLOG(2) << "infer batch size: " << batch_size;
+
+ Timer timeline;
+ int64_t start = timeline.TimeStampUS();
+ timeline.Start();
+
+ if (InferManager::instance().infer(
+ GENERAL_MODEL_NAME, &infer_in, out, batch_size)) {
+ LOG(ERROR) << "Failed do infer in fluid model: " << GENERAL_MODEL_NAME;
+ return -1;
+ }
+
+ int64_t end = timeline.TimeStampUS();
+ CopyBlobInfo(input_blob, output_blob);
+ AddBlobInfo(output_blob, start);
+ AddBlobInfo(output_blob, end);
+ return 0;
+}
+DEFINE_OP(GeneralDistKVInferOp);
+
+} // namespace serving
+} // namespace paddle_serving
+} // namespace baidu
diff --git a/core/general-server/op/general_dist_kv_infer_op.h b/core/general-server/op/general_dist_kv_infer_op.h
new file mode 100644
index 0000000000000000000000000000000000000000..2dee5bca6f9e12dbb8b36a6c39aa0a8e77763d23
--- /dev/null
+++ b/core/general-server/op/general_dist_kv_infer_op.h
@@ -0,0 +1,46 @@
+// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+#include
+#include
+#ifdef BCLOUD
+#ifdef WITH_GPU
+#include "paddle/paddle_inference_api.h"
+#else
+#include "paddle/fluid/inference/api/paddle_inference_api.h"
+#endif
+#else
+#include "paddle_inference_api.h" // NOLINT
+#endif
+#include "core/general-server/general_model_service.pb.h"
+#include "core/general-server/op/general_infer_helper.h"
+
+namespace baidu {
+namespace paddle_serving {
+namespace serving {
+
+class GeneralDistKVInferOp
+ : public baidu::paddle_serving::predictor::OpWithChannel {
+ public:
+ typedef std::vector TensorVector;
+
+ DECLARE_OP(GeneralDistKVInferOp);
+
+ int inference();
+};
+
+} // namespace serving
+} // namespace paddle_serving
+} // namespace baidu
diff --git a/core/predictor/CMakeLists.txt b/core/predictor/CMakeLists.txt
index 38d977f1d2aeee556c46bb16da32bd2b20de3911..1b9dc7b29845a2b8c7f958c1d8e836cb57e91d41 100644
--- a/core/predictor/CMakeLists.txt
+++ b/core/predictor/CMakeLists.txt
@@ -3,7 +3,7 @@ include(common/CMakeLists.txt)
include(op/CMakeLists.txt)
include(mempool/CMakeLists.txt)
include(framework/CMakeLists.txt)
-#include(plugin/CMakeLists.txt)
+include(tools/CMakeLists.txt)
include(src/CMakeLists.txt)
diff --git a/core/predictor/common/CMakeLists.txt b/core/predictor/common/CMakeLists.txt
index 0eebb3a7b79ba1f71930ba8453fd541ecd312675..7beb2317f2b6e84c1424b8ae84de7ea8851e92dc 100644
--- a/core/predictor/common/CMakeLists.txt
+++ b/core/predictor/common/CMakeLists.txt
@@ -1,2 +1,2 @@
-FILE(GLOB common_srcs ${CMAKE_CURRENT_LIST_DIR}/*.cpp)
+FILE(GLOB common_srcs ${CMAKE_CURRENT_LIST_DIR}/constant.cpp)
LIST(APPEND pdserving_srcs ${common_srcs})
diff --git a/core/predictor/framework/resource.cpp b/core/predictor/framework/resource.cpp
index f4f9091f689c36bc853730e0dd68cba089c7fb8b..11b015bfcb2f3c4f72a67a5687c4fdfc95481a31 100644
--- a/core/predictor/framework/resource.cpp
+++ b/core/predictor/framework/resource.cpp
@@ -141,6 +141,17 @@ int Resource::initialize(const std::string& path, const std::string& file) {
LOG(ERROR) << "unable to create tls_bthread_key of thrd_data";
return -1;
}
+ // init rocksDB or cube instance
+ if (resource_conf.has_cube_config_file() &&
+ resource_conf.has_cube_config_path()) {
+ LOG(INFO) << "init cube client, path[ " << resource_conf.cube_config_path()
+ << " ], config file [ " << resource_conf.cube_config_file()
+ << " ].";
+ rec::mcube::CubeAPI* cube = rec::mcube::CubeAPI::instance();
+ std::string cube_config_fullpath = "./" + resource_conf.cube_config_path() +
+ "/" + resource_conf.cube_config_file();
+ this->cube_config_fullpath = cube_config_fullpath;
+ }
THREAD_SETSPECIFIC(_tls_bspec_key, NULL);
return 0;
@@ -149,6 +160,15 @@ int Resource::initialize(const std::string& path, const std::string& file) {
// model config
int Resource::general_model_initialize(const std::string& path,
const std::string& file) {
+ if (this->cube_config_fullpath.size() != 0) {
+ LOG(INFO) << "init cube by config file : " << this->cube_config_fullpath;
+ rec::mcube::CubeAPI* cube = rec::mcube::CubeAPI::instance();
+ int ret = cube->init(this->cube_config_fullpath.c_str());
+ if (ret != 0) {
+ LOG(ERROR) << "cube init error";
+ return -1;
+ }
+ }
VLOG(2) << "general model path: " << path;
VLOG(2) << "general model file: " << file;
if (!FLAGS_enable_general_model) {
diff --git a/core/predictor/framework/resource.h b/core/predictor/framework/resource.h
index 3cf494b97667474157dc744caea4bc77a6b8b443..1a648f0fa363efa4eb915a64553949206ec96153 100644
--- a/core/predictor/framework/resource.h
+++ b/core/predictor/framework/resource.h
@@ -108,6 +108,7 @@ class Resource {
private:
int thread_finalize() { return 0; }
std::shared_ptr _config;
+ std::string cube_config_fullpath;
THREAD_KEY_T _tls_bspec_key;
};
diff --git a/core/predictor/proto/CMakeLists.txt b/core/predictor/proto/CMakeLists.txt
index 773b5bfccea2a787409eb9be4b5722dd85f5fe61..1a9e2d35ef350cbe3dfb62d02d08dad19794300c 100644
--- a/core/predictor/proto/CMakeLists.txt
+++ b/core/predictor/proto/CMakeLists.txt
@@ -7,6 +7,7 @@ LIST(APPEND protofiles
${CMAKE_CURRENT_LIST_DIR}/./builtin_format.proto
${CMAKE_CURRENT_LIST_DIR}/./msg_data.proto
${CMAKE_CURRENT_LIST_DIR}/./xrecord_format.proto
+ ${CMAKE_CURRENT_LIST_DIR}/./framework.proto
)
PROTOBUF_GENERATE_SERVING_CPP(TRUE PROTO_SRCS PROTO_HDRS ${protofiles})
diff --git a/core/predictor/proto/framework.proto b/core/predictor/proto/framework.proto
new file mode 100755
index 0000000000000000000000000000000000000000..c0fb3ada51bf7b7af41f602bf08d3c9596884aab
--- /dev/null
+++ b/core/predictor/proto/framework.proto
@@ -0,0 +1,217 @@
+/* Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License. */
+
+syntax = "proto2";
+option optimize_for = LITE_RUNTIME;
+package paddle.framework.proto;
+
+// Any incompatible changes to ProgramDesc and its dependencies should
+// raise the version defined version.h.
+//
+// Serailization and Deserialization codes should be modified in a way
+// that supports old versions following the version and compatibility policy.
+message Version { optional int64 version = 1 [ default = 0 ]; }
+
+enum AttrType {
+ INT = 0;
+ FLOAT = 1;
+ STRING = 2;
+ INTS = 3;
+ FLOATS = 4;
+ STRINGS = 5;
+ BOOLEAN = 6;
+ BOOLEANS = 7;
+ BLOCK = 8;
+ LONG = 9;
+ BLOCKS = 10;
+ LONGS = 11;
+}
+
+// OpDesc describes an instance of a C++ framework::OperatorBase
+// derived class type.
+message OpDesc {
+
+ message Attr {
+ required string name = 1;
+ required AttrType type = 2;
+ optional int32 i = 3;
+ optional float f = 4;
+ optional string s = 5;
+ repeated int32 ints = 6;
+ repeated float floats = 7;
+ repeated string strings = 8;
+ optional bool b = 10;
+ repeated bool bools = 11;
+ optional int32 block_idx = 12;
+ optional int64 l = 13;
+ repeated int32 blocks_idx = 14;
+ repeated int64 longs = 15;
+ };
+
+ message Var {
+ required string parameter = 1;
+ repeated string arguments = 2;
+ };
+
+ required string type = 3;
+ repeated Var inputs = 1;
+ repeated Var outputs = 2;
+ repeated Attr attrs = 4;
+ optional bool is_target = 5 [ default = false ];
+};
+
+// OpProto describes a C++ framework::OperatorBase derived class.
+message OpProto {
+
+ // VarProto describes the C++ type framework::Variable.
+ message Var {
+ required string name = 1;
+ required string comment = 2;
+
+ optional bool duplicable = 3 [ default = false ];
+ optional bool intermediate = 4 [ default = false ];
+ optional bool dispensable = 5 [ default = false ];
+ }
+
+ // AttrProto describes the C++ type Attribute.
+ message Attr {
+ required string name = 1;
+ required AttrType type = 2;
+ required string comment = 3;
+ // If that attribute is generated, it means the Paddle third
+ // language binding has responsibility to fill that
+ // attribute. End-User should not set that attribute.
+ optional bool generated = 4 [ default = false ];
+ }
+
+ required string type = 1;
+ repeated Var inputs = 2;
+ repeated Var outputs = 3;
+ repeated Attr attrs = 4;
+ required string comment = 5;
+}
+
+message VarType {
+ enum Type {
+ // Pod Types
+ BOOL = 0;
+ INT16 = 1;
+ INT32 = 2;
+ INT64 = 3;
+ FP16 = 4;
+ FP32 = 5;
+ FP64 = 6;
+ // Tensor is used in C++.
+ SIZE_T = 19;
+ UINT8 = 20;
+ INT8 = 21;
+
+ // Other types that may need additional descriptions
+ LOD_TENSOR = 7;
+ SELECTED_ROWS = 8;
+ FEED_MINIBATCH = 9;
+ FETCH_LIST = 10;
+ STEP_SCOPES = 11;
+ LOD_RANK_TABLE = 12;
+ LOD_TENSOR_ARRAY = 13;
+ PLACE_LIST = 14;
+ READER = 15;
+ // Any runtime decided variable type is raw
+ // raw variables should manage their own allocations
+ // in operators like nccl_op
+ RAW = 17;
+ TUPLE = 18;
+ }
+
+ required Type type = 1;
+
+ message TensorDesc {
+ // Should only be PODType. Is enforced in C++
+ required Type data_type = 1;
+ repeated int64 dims = 2; // [UNK, 640, 480] is saved as [-1, 640, 480]
+ }
+ optional TensorDesc selected_rows = 2;
+
+ message LoDTensorDesc {
+ required TensorDesc tensor = 1;
+ optional int32 lod_level = 2 [ default = 0 ];
+ }
+ optional LoDTensorDesc lod_tensor = 3;
+
+ message LoDTensorArrayDesc {
+ required TensorDesc tensor = 1;
+ optional int32 lod_level = 2 [ default = 0 ];
+ }
+ optional LoDTensorArrayDesc tensor_array = 4;
+
+ message ReaderDesc { repeated LoDTensorDesc lod_tensor = 1; }
+ optional ReaderDesc reader = 5;
+
+ message Tuple { repeated Type element_type = 1; }
+ optional Tuple tuple = 7;
+}
+
+message VarDesc {
+ required string name = 1;
+ required VarType type = 2;
+ optional bool persistable = 3 [ default = false ];
+ // True if the variable is an input data and
+ // have to check the feed data shape and dtype
+ optional bool need_check_feed = 4 [ default = false ];
+}
+
+message BlockDesc {
+ required int32 idx = 1;
+ required int32 parent_idx = 2;
+ repeated VarDesc vars = 3;
+ repeated OpDesc ops = 4;
+ optional int32 forward_block_idx = 5 [ default = -1 ];
+}
+
+// CompatibleInfo is used to determine if a feature is compatible and
+// provides the information.
+message CompatibleInfo {
+ enum Type {
+ COMPATIBLE = 0;
+ DEFINITELY_NOT = 1;
+ POSSIBLE = 2;
+ BUG_FIX = 3;
+ PRECISION_CHANGE = 4;
+ }
+ required string version = 1;
+ required Type type = 2;
+}
+
+// In some cases, Paddle Fluid may perform operator definition iterations,
+// and the operator uses OpCompatibleMap for compatibility testing.
+message OpCompatibleMap {
+ message OpCompatiblePair {
+ required string op_name = 1;
+ required CompatibleInfo compatible_info = 2;
+ }
+ repeated OpCompatiblePair pair = 1;
+ optional string default_required_version = 2;
+}
+
+// Please refer to
+// https://github.com/PaddlePaddle/Paddle/blob/develop/doc/design/program.md
+// for more details.
+// TODO(panyx0718): A model can have multiple programs. Need a
+// way to distinguish them. Maybe ID or name?
+message ProgramDesc {
+ reserved 2; // For backward compatibility.
+ repeated BlockDesc blocks = 1;
+ optional Version version = 4;
+ optional OpCompatibleMap op_compatible_map = 3;
+}
diff --git a/core/predictor/tools/CMakeLists.txt b/core/predictor/tools/CMakeLists.txt
new file mode 100644
index 0000000000000000000000000000000000000000..f74e3ea571933665b4b8a3fc795ce4db3f1b1493
--- /dev/null
+++ b/core/predictor/tools/CMakeLists.txt
@@ -0,0 +1,4 @@
+set(seq_gen_src ${CMAKE_CURRENT_LIST_DIR}/seq_generator.cpp ${CMAKE_CURRENT_LIST_DIR}/seq_file.cpp)
+LIST(APPEND seq_gen_src ${PROTO_SRCS})
+add_executable(seq_generator ${seq_gen_src})
+target_link_libraries(seq_generator protobuf -lpthread)
diff --git a/core/predictor/tools/seq_file.cpp b/core/predictor/tools/seq_file.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..839b455d048752df060c5bc073d40906c2c05089
--- /dev/null
+++ b/core/predictor/tools/seq_file.cpp
@@ -0,0 +1,83 @@
+// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include "seq_file.h"
+
+SeqFileWriter::SeqFileWriter(const char *file) {
+ _fs = new std::ofstream(file, std::ios::binary);
+ std::srand(std::time(0));
+ for (int i = 0; i < SYNC_MARKER_SIZE; ++i) {
+ _sync_marker[i] = std::rand() % 255;
+ }
+
+ _write_seq_header();
+
+ _bytes_to_prev_sync = 0;
+}
+
+void SeqFileWriter::close() {
+ _fs->close();
+ delete _fs;
+}
+
+SeqFileWriter::~SeqFileWriter() { close(); }
+
+void SeqFileWriter::_write_sync_marker() {
+ char begin[] = {'\xFF', '\xFF', '\xFF', '\xFF'};
+ _fs->write(begin, 4);
+
+ _fs->write(_sync_marker, SYNC_MARKER_SIZE);
+}
+
+void SeqFileWriter::_write_seq_header() {
+ _fs->write(SEQ_HEADER, sizeof(SEQ_HEADER) - 1);
+ _fs->write(_sync_marker, SYNC_MARKER_SIZE);
+}
+
+int SeqFileWriter::write(const char *key,
+ size_t key_len,
+ const char *value,
+ size_t value_len) {
+ if (key_len != sizeof(int64_t)) {
+ std::cout << "Key length not equal to " << sizeof(int64_t) << std::endl;
+ return -1;
+ }
+
+ uint32_t record_len = key_len + value_len;
+ uint32_t b_record_len = htonl(record_len);
+ uint32_t b_key_len = htonl((uint32_t)key_len);
+ // std::cout << "b_record_len " << b_record_len << " record_len " <<
+ // record_len << std::endl;
+ _fs->write((char *)&b_record_len, sizeof(uint32_t));
+ _fs->write((char *)&b_key_len, sizeof(uint32_t));
+ _fs->write(key, key_len);
+ _fs->write(value, value_len);
+ _bytes_to_prev_sync += record_len;
+
+ if (_bytes_to_prev_sync >= SYNC_INTERVAL) {
+ _write_sync_marker();
+ _bytes_to_prev_sync = 0;
+ }
+
+ return 0;
+}
+
+/* vim: set expandtab ts=4 sw=4 sts=4 tw=100: */
diff --git a/core/predictor/tools/seq_file.h b/core/predictor/tools/seq_file.h
new file mode 100644
index 0000000000000000000000000000000000000000..ae3944265e2fa1b01a6d0d34c19c0710a45f6cac
--- /dev/null
+++ b/core/predictor/tools/seq_file.h
@@ -0,0 +1,52 @@
+// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef __SEQ_FILE_H_
+#define __SEQ_FILE_H_
+
+#include
+
+const int SYNC_MARKER_SIZE = 16;
+const char SEQ_HEADER[] =
+ "SEQ\x06"
+ "\"org.apache.hadoop.io.BytesWritable\""
+ "org.apache.hadoop.io.BytesWritable"
+ "\x00\x00\x00\x00\x00\x00";
+const int SYNC_INTERVAL = 2000;
+
+class SeqFileWriter {
+ public:
+ SeqFileWriter(const char *file);
+ ~SeqFileWriter();
+
+ public:
+ int write(const char *key,
+ size_t key_len,
+ const char *value,
+ size_t value_len);
+
+ private:
+ void close();
+ void _write_sync_marker();
+ void _write_seq_header();
+
+ private:
+ char _sync_marker[SYNC_MARKER_SIZE];
+ int _bytes_to_prev_sync;
+ std::ofstream *_fs;
+};
+
+#endif //__SEQ_FILE_H_
+
+/* vim: set expandtab ts=4 sw=4 sts=4 tw=100: */
diff --git a/core/predictor/tools/seq_generator.cpp b/core/predictor/tools/seq_generator.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..2efda77485b919a3fd14d05b43fa4729c97234fb
--- /dev/null
+++ b/core/predictor/tools/seq_generator.cpp
@@ -0,0 +1,116 @@
+// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include
+#include
+#include
+#include "core/predictor/framework.pb.h"
+#include "seq_file.h"
+using paddle::framework::proto::VarType;
+std::map var_type_size;
+void reg_var_types() {
+ var_type_size[static_cast(VarType::FP16)] = sizeof(int16_t);
+ var_type_size[static_cast(VarType::FP32)] = sizeof(float);
+ var_type_size[static_cast(VarType::FP64)] = sizeof(double);
+ var_type_size[static_cast(VarType::INT32)] = sizeof(int);
+ var_type_size[static_cast(VarType::INT64)] = sizeof(int64_t);
+ var_type_size[static_cast(VarType::BOOL)] = sizeof(bool);
+ var_type_size[static_cast(VarType::SIZE_T)] = sizeof(size_t);
+ var_type_size[static_cast(VarType::INT16)] = sizeof(int16_t);
+ var_type_size[static_cast(VarType::UINT8)] = sizeof(uint8_t);
+ var_type_size[static_cast(VarType::INT8)] = sizeof(int8_t);
+}
+int dump_parameter(const char *input_file, const char *output_file) {
+ std::ifstream is(input_file);
+ // the 1st field, unit32_t version for LoDTensor
+ uint32_t version;
+ is.read(reinterpret_cast(&version), sizeof(version));
+ if (version != 0) {
+ std::cout << "Version number " << version << " not supported" << std::endl;
+ return -1;
+ }
+ // the 2st field, LoD information
+ uint64_t lod_level;
+ is.read(reinterpret_cast(&lod_level), sizeof(lod_level));
+ std::vector> lod;
+ lod.resize(lod_level);
+ for (uint64_t i = 0; i < lod_level; ++i) {
+ uint64_t size;
+ is.read(reinterpret_cast(&size), sizeof(size));
+ std::vector tmp(size / sizeof(size_t));
+ is.read(reinterpret_cast(tmp.data()),
+ static_cast(size));
+ lod[i] = tmp;
+ }
+ // the 3st filed, Tensor
+ // Note: duplicate version field
+ is.read(reinterpret_cast(&version), sizeof(version));
+ if (version != 0) {
+ std::cout << "Version number " << version << " not supported" << std::endl;
+ return -1;
+ }
+ // int32_t size
+ // proto buffer
+ VarType::TensorDesc desc;
+ int32_t size;
+ is.read(reinterpret_cast(&size), sizeof(size));
+ std::unique_ptr buf(new char[size]);
+ is.read(reinterpret_cast(buf.get()), size);
+ if (!desc.ParseFromArray(buf.get(), size)) {
+ std::cout << "Cannot parse tensor desc" << std::endl;
+ return -1;
+ }
+ // read tensor
+ std::vector dims;
+ dims.reserve(static_cast(desc.dims().size()));
+ std::copy(desc.dims().begin(), desc.dims().end(), std::back_inserter(dims));
+ std::cout << "Dims:";
+ for (auto x : dims) {
+ std::cout << " " << x;
+ }
+ std::cout << std::endl;
+ if (dims.size() != 2) {
+ std::cout << "Parameter dims not 2D" << std::endl;
+ return -1;
+ }
+ size_t numel = 1;
+ for (auto x : dims) {
+ numel *= x;
+ }
+ size_t buf_size = numel * var_type_size[desc.data_type()];
+ char *tensor_buf = new char[buf_size];
+ is.read(static_cast(tensor_buf), buf_size);
+ is.close();
+ SeqFileWriter seq_file_writer(output_file);
+ int value_buf_len = var_type_size[desc.data_type()] * dims[1];
+ char *value_buf = new char[value_buf_len];
+ size_t offset = 0;
+ for (int64_t i = 0; i < dims[0]; ++i) {
+ // std::cout << "key_len " << key_len << " value_len " << value_buf_len <<
+ // std::endl;
+ memcpy(value_buf, tensor_buf + offset, value_buf_len);
+ seq_file_writer.write((char *)&i, sizeof(i), value_buf, value_buf_len);
+ offset += value_buf_len;
+ }
+ return 0;
+}
+int main(int argc, char **argv) {
+ if (argc != 3) {
+ std::cout << "Usage: seq_generator PARAMETER_FILE OUTPUT_FILE" << std::endl;
+ return -1;
+ }
+ reg_var_types();
+ dump_parameter(argv[1], argv[2]);
+}
+/* vim: set expandtab ts=4 sw=4 sts=4 tw=100: */
diff --git a/doc/criteo-cube-benchmark-avgcost.png b/doc/criteo-cube-benchmark-avgcost.png
new file mode 100644
index 0000000000000000000000000000000000000000..5db3a04ffb87ac57202aa6d1e92ead3449b0b5d1
Binary files /dev/null and b/doc/criteo-cube-benchmark-avgcost.png differ
diff --git a/doc/criteo-cube-benchmark-qps.png b/doc/criteo-cube-benchmark-qps.png
new file mode 100644
index 0000000000000000000000000000000000000000..4b7ce971ff5840c06beda2bd20a55cc585c9d11f
Binary files /dev/null and b/doc/criteo-cube-benchmark-qps.png differ
diff --git a/python/examples/criteo_ctr_with_cube/README.md b/python/examples/criteo_ctr_with_cube/README.md
new file mode 100755
index 0000000000000000000000000000000000000000..9c80f935454ca5b6c5de961c4f06c83ebca5a5b3
--- /dev/null
+++ b/python/examples/criteo_ctr_with_cube/README.md
@@ -0,0 +1,61 @@
+## 带稀疏参数服务器的CTR预测服务
+
+### 获取样例数据
+```
+sh get_data.sh
+```
+
+### 保存模型和配置文件
+```
+python local_train.py
+```
+执行脚本后会在当前目录生成ctr_server_model和ctr_client_config文件夹,以及ctr_server_model_kv, ctr_client_conf_kv。
+
+### 启动稀疏参数服务器
+```
+cp ../../../build_server/core/predictor/seq_generator seq_generator
+cp ../../../build_server/output/bin/cube* ./cube/
+sh cube_prepare.sh &
+```
+
+### 启动RPC预测服务,服务端线程数为4(可在test_server.py配置)
+
+```
+python test_server.py ctr_serving_model_kv
+```
+
+### 执行预测
+
+```
+python test_client.py ctr_client_conf/serving_client_conf.prototxt ./raw_data
+```
+
+### Benchmark
+
+设备 :Intel(R) Xeon(R) CPU 6148 @ 2.40GHz
+
+模型 :[Criteo CTR](https://github.com/PaddlePaddle/Serving/blob/develop/python/examples/ctr_criteo_with_cube/network_conf.py)
+
+server core/thread num : 4/8
+
+执行
+```
+bash benchmark.sh
+```
+客户端每个线程会发送1000个batch
+
+| client thread num | prepro | client infer | op0 | op1 | op2 | postpro | avg_latency | qps |
+| ------------------ | ------ | ------------ | ------ | ----- | ------ | ------- | ----- | ----- |
+| 1 | 0.035 | 1.596 | 0.021 | 0.518 | 0.0024 | 0.0025 | 6.774 | 147.7 |
+| 2 | 0.034 | 1.780 | 0.027 | 0.463 | 0.0020 | 0.0023 | 6.931 | 288.3 |
+| 4 | 0.038 | 2.954 | 0.025 | 0.455 | 0.0019 | 0.0027 | 8.378 | 477.5 |
+| 8 | 0.044 | 8.230 | 0.028 | 0.464 | 0.0023 | 0.0034 | 14.191 | 563.8 |
+| 16 | 0.048 | 21.037 | 0.028 | 0.455 | 0.0025 | 0.0041 | 27.236 | 587.5 |
+
+平均每个线程耗时图如下
+
+![avg cost](../../../doc/criteo-cube-benchmark-avgcost.png)
+
+每个线程QPS耗时如下
+
+![qps](../../../doc/criteo-cube-benchmark-qps.png)
diff --git a/python/examples/criteo_ctr_with_cube/args.py b/python/examples/criteo_ctr_with_cube/args.py
new file mode 100755
index 0000000000000000000000000000000000000000..30124d4ebd9cd27cdb4580e654a8a47c55b178bf
--- /dev/null
+++ b/python/examples/criteo_ctr_with_cube/args.py
@@ -0,0 +1,105 @@
+# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# pylint: disable=doc-string-missing
+import argparse
+
+
+def parse_args():
+ parser = argparse.ArgumentParser(description="PaddlePaddle CTR example")
+ parser.add_argument(
+ '--train_data_path',
+ type=str,
+ default='./data/raw/train.txt',
+ help="The path of training dataset")
+ parser.add_argument(
+ '--sparse_only',
+ type=bool,
+ default=False,
+ help="Whether we use sparse features only")
+ parser.add_argument(
+ '--test_data_path',
+ type=str,
+ default='./data/raw/valid.txt',
+ help="The path of testing dataset")
+ parser.add_argument(
+ '--batch_size',
+ type=int,
+ default=1000,
+ help="The size of mini-batch (default:1000)")
+ parser.add_argument(
+ '--embedding_size',
+ type=int,
+ default=10,
+ help="The size for embedding layer (default:10)")
+ parser.add_argument(
+ '--num_passes',
+ type=int,
+ default=10,
+ help="The number of passes to train (default: 10)")
+ parser.add_argument(
+ '--model_output_dir',
+ type=str,
+ default='models',
+ help='The path for model to store (default: models)')
+ parser.add_argument(
+ '--sparse_feature_dim',
+ type=int,
+ default=1000001,
+ help='sparse feature hashing space for index processing')
+ parser.add_argument(
+ '--is_local',
+ type=int,
+ default=1,
+ help='Local train or distributed train (default: 1)')
+ parser.add_argument(
+ '--cloud_train',
+ type=int,
+ default=0,
+ help='Local train or distributed train on paddlecloud (default: 0)')
+ parser.add_argument(
+ '--async_mode',
+ action='store_true',
+ default=False,
+ help='Whether start pserver in async mode to support ASGD')
+ parser.add_argument(
+ '--no_split_var',
+ action='store_true',
+ default=False,
+ help='Whether split variables into blocks when update_method is pserver')
+ parser.add_argument(
+ '--role',
+ type=str,
+ default='pserver', # trainer or pserver
+ help='The path for model to store (default: models)')
+ parser.add_argument(
+ '--endpoints',
+ type=str,
+ default='127.0.0.1:6000',
+ help='The pserver endpoints, like: 127.0.0.1:6000,127.0.0.1:6001')
+ parser.add_argument(
+ '--current_endpoint',
+ type=str,
+ default='127.0.0.1:6000',
+ help='The path for model to store (default: 127.0.0.1:6000)')
+ parser.add_argument(
+ '--trainer_id',
+ type=int,
+ default=0,
+ help='The path for model to store (default: models)')
+ parser.add_argument(
+ '--trainers',
+ type=int,
+ default=1,
+ help='The num of trianers, (default: 1)')
+ return parser.parse_args()
diff --git a/python/examples/criteo_ctr_with_cube/benchmark.py b/python/examples/criteo_ctr_with_cube/benchmark.py
new file mode 100755
index 0000000000000000000000000000000000000000..e5bde9f996fccc41027fa6d255ca227cba212e22
--- /dev/null
+++ b/python/examples/criteo_ctr_with_cube/benchmark.py
@@ -0,0 +1,80 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# pylint: disable=doc-string-missing
+
+from paddle_serving_client import Client
+import sys
+import os
+import criteo as criteo
+import time
+from paddle_serving_client.utils import MultiThreadRunner
+from paddle_serving_client.utils import benchmark_args
+from paddle_serving_client.metric import auc
+
+args = benchmark_args()
+
+
+def single_func(idx, resource):
+ client = Client()
+ client.load_client_config('ctr_client_conf/serving_client_conf.prototxt')
+ client.connect(['127.0.0.1:9292'])
+ batch = 1
+ buf_size = 100
+ dataset = criteo.CriteoDataset()
+ dataset.setup(1000001)
+ test_filelists = [
+ "./raw_data/part-%d" % x for x in range(len(os.listdir("./raw_data")))
+ ]
+ reader = dataset.infer_reader(test_filelists[len(test_filelists) - 40:],
+ batch, buf_size)
+ args.batch_size = 1
+ if args.request == "rpc":
+ fetch = ["prob"]
+ print("Start Time")
+ start = time.time()
+ itr = 1000
+ for ei in range(itr):
+ if args.batch_size == 1:
+ data = reader().next()
+ feed_dict = {}
+ feed_dict['dense_input'] = data[0][0]
+ for i in range(1, 27):
+ feed_dict["embedding_{}.tmp_0".format(i - 1)] = data[0][i]
+ result = client.predict(feed=feed_dict, fetch=fetch)
+ else:
+ print("unsupport batch size {}".format(args.batch_size))
+
+ elif args.request == "http":
+ raise ("Not support http service.")
+ end = time.time()
+ qps = itr / (end - start)
+ return [[end - start, qps]]
+
+
+if __name__ == '__main__':
+ multi_thread_runner = MultiThreadRunner()
+ endpoint_list = ["127.0.0.1:9292"]
+ #result = single_func(0, {"endpoint": endpoint_list})
+ result = multi_thread_runner.run(single_func, args.thread,
+ {"endpoint": endpoint_list})
+ avg_cost = 0
+ qps = 0
+ for i in range(args.thread):
+ avg_cost += result[0][i * 2 + 0]
+ qps += result[0][i * 2 + 1]
+ avg_cost = avg_cost / args.thread
+ print("average total cost {} s.".format(avg_cost))
+ print("qps {} ins/s".format(qps))
diff --git a/python/examples/criteo_ctr_with_cube/benchmark.sh b/python/examples/criteo_ctr_with_cube/benchmark.sh
new file mode 100755
index 0000000000000000000000000000000000000000..4bea258a5cfa4e12ed6848c61270fe44bbc7ba44
--- /dev/null
+++ b/python/examples/criteo_ctr_with_cube/benchmark.sh
@@ -0,0 +1,10 @@
+rm profile_log
+batch_size=1
+for thread_num in 1 2 4 8 16
+do
+ $PYTHONROOT/bin/python benchmark.py --thread $thread_num --model ctr_client_conf/serving_client_conf.prototxt --request rpc > profile 2>&1
+ echo "========================================"
+ echo "batch size : $batch_size" >> profile_log
+ $PYTHONROOT/bin/python ../util/show_profile.py profile $thread_num >> profile_log
+ tail -n 2 profile >> profile_log
+done
diff --git a/python/examples/criteo_ctr_with_cube/benchmark_batch.py b/python/examples/criteo_ctr_with_cube/benchmark_batch.py
new file mode 100755
index 0000000000000000000000000000000000000000..b4b15892375e830486afa320151fac619aab2ba7
--- /dev/null
+++ b/python/examples/criteo_ctr_with_cube/benchmark_batch.py
@@ -0,0 +1,85 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# pylint: disable=doc-string-missing
+
+from paddle_serving_client import Client
+import sys
+import os
+import criteo as criteo
+import time
+from paddle_serving_client.utils import MultiThreadRunner
+from paddle_serving_client.utils import benchmark_args
+from paddle_serving_client.metric import auc
+
+args = benchmark_args()
+
+
+def single_func(idx, resource):
+ client = Client()
+ print([resource["endpoint"][idx % len(resource["endpoint"])]])
+ client.load_client_config('ctr_client_conf/serving_client_conf.prototxt')
+ client.connect(['127.0.0.1:9292'])
+ batch = 1
+ buf_size = 100
+ dataset = criteo.CriteoDataset()
+ dataset.setup(1000001)
+ test_filelists = [
+ "./raw_data/part-%d" % x for x in range(len(os.listdir("./raw_data")))
+ ]
+ reader = dataset.infer_reader(test_filelists[len(test_filelists) - 40:],
+ batch, buf_size)
+ if args.request == "rpc":
+ fetch = ["prob"]
+ start = time.time()
+ itr = 1000
+ for ei in range(itr):
+ if args.batch_size > 1:
+ feed_batch = []
+ for bi in range(args.batch_size):
+ data = reader().next()
+ feed_dict = {}
+ feed_dict['dense_input'] = data[0][0]
+ for i in range(1, 27):
+ feed_dict["embedding_{}.tmp_0".format(i - 1)] = data[0][
+ i]
+ feed_batch.append(feed_dict)
+ result = client.batch_predict(
+ feed_batch=feed_batch, fetch=fetch)
+ else:
+ print("unsupport batch size {}".format(args.batch_size))
+
+ elif args.request == "http":
+ raise ("Not support http service.")
+ end = time.time()
+ qps = itr * args.batch_size / (end - start)
+ return [[end - start, qps]]
+
+
+if __name__ == '__main__':
+ multi_thread_runner = MultiThreadRunner()
+ endpoint_list = ["127.0.0.1:9292"]
+ #result = single_func(0, {"endpoint": endpoint_list})
+ result = multi_thread_runner.run(single_func, args.thread,
+ {"endpoint": endpoint_list})
+ print(result)
+ avg_cost = 0
+ qps = 0
+ for i in range(args.thread):
+ avg_cost += result[0][i * 2 + 0]
+ qps += result[0][i * 2 + 1]
+ avg_cost = avg_cost / args.thread
+ print("average total cost {} s.".format(avg_cost))
+ print("qps {} ins/s".format(qps))
diff --git a/python/examples/criteo_ctr_with_cube/benchmark_batch.sh b/python/examples/criteo_ctr_with_cube/benchmark_batch.sh
new file mode 100755
index 0000000000000000000000000000000000000000..3a51c0de68bf47fb798c165d2fb34868056ddab6
--- /dev/null
+++ b/python/examples/criteo_ctr_with_cube/benchmark_batch.sh
@@ -0,0 +1,12 @@
+rm profile_log
+for thread_num in 1 2 4 8 16
+do
+for batch_size in 1 2 4 8 16 32 64 128 256 512
+do
+ $PYTHONROOT/bin/python benchmark_batch.py --thread $thread_num --batch_size $batch_size --model serving_client_conf/serving_client_conf.prototxt --request rpc > profile 2>&1
+ echo "========================================"
+ echo "batch size : $batch_size" >> profile_log
+ $PYTHONROOT/bin/python ../util/show_profile.py profile $thread_num >> profile_log
+ tail -n 2 profile >> profile_log
+done
+done
diff --git a/python/examples/criteo_ctr_with_cube/clean.sh b/python/examples/criteo_ctr_with_cube/clean.sh
new file mode 100755
index 0000000000000000000000000000000000000000..522a602a4e1ea8c9fb8902d8cd0d2d872cba6edd
--- /dev/null
+++ b/python/examples/criteo_ctr_with_cube/clean.sh
@@ -0,0 +1,5 @@
+ps -ef | grep cube | awk {'print $2'} | xargs kill -9
+ps -ef | grep SimpleHTTPServer | awk {'print $2'} | xargs kill -9
+rm -rf cube/cube_data cube/data cube/log* cube/nohup* cube/output/ cube/donefile cube/input cube/monitor cube/cube-builder.INFO
+ps -ef | grep test | awk {'print $2'} | xargs kill -9
+ps -ef | grep serving | awk {'print $2'} | xargs kill -9
diff --git a/python/examples/criteo_ctr_with_cube/criteo.py b/python/examples/criteo_ctr_with_cube/criteo.py
new file mode 100755
index 0000000000000000000000000000000000000000..f37eb1d2c1d8df6975ec0c28923c6e17c0272290
--- /dev/null
+++ b/python/examples/criteo_ctr_with_cube/criteo.py
@@ -0,0 +1,81 @@
+# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import sys
+
+
+class CriteoDataset(object):
+ def setup(self, sparse_feature_dim):
+ self.cont_min_ = [0, -3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
+ self.cont_max_ = [
+ 20, 600, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50
+ ]
+ self.cont_diff_ = [
+ 20, 603, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50
+ ]
+ self.hash_dim_ = sparse_feature_dim
+ # here, training data are lines with line_index < train_idx_
+ self.train_idx_ = 41256555
+ self.continuous_range_ = range(1, 14)
+ self.categorical_range_ = range(14, 40)
+
+ def _process_line(self, line):
+ features = line.rstrip('\n').split('\t')
+ dense_feature = []
+ sparse_feature = []
+ for idx in self.continuous_range_:
+ if features[idx] == '':
+ dense_feature.append(0.0)
+ else:
+ dense_feature.append((float(features[idx]) - self.cont_min_[idx - 1]) / \
+ self.cont_diff_[idx - 1])
+ for idx in self.categorical_range_:
+ sparse_feature.append(
+ [hash(str(idx) + features[idx]) % self.hash_dim_])
+
+ return dense_feature, sparse_feature, [int(features[0])]
+
+ def infer_reader(self, filelist, batch, buf_size):
+ def local_iter():
+ for fname in filelist:
+ with open(fname.strip(), "r") as fin:
+ for line in fin:
+ dense_feature, sparse_feature, label = self._process_line(
+ line)
+ #yield dense_feature, sparse_feature, label
+ yield [dense_feature] + sparse_feature + [label]
+
+ import paddle
+ batch_iter = paddle.batch(
+ paddle.reader.shuffle(
+ local_iter, buf_size=buf_size),
+ batch_size=batch)
+ return batch_iter
+
+ def generate_sample(self, line):
+ def data_iter():
+ dense_feature, sparse_feature, label = self._process_line(line)
+ feature_name = ["dense_input"]
+ for idx in self.categorical_range_:
+ feature_name.append("C" + str(idx - 13))
+ feature_name.append("label")
+ yield zip(feature_name, [dense_feature] + sparse_feature + [label])
+
+ return data_iter
+
+
+if __name__ == "__main__":
+ criteo_dataset = CriteoDataset()
+ criteo_dataset.setup(int(sys.argv[1]))
+ criteo_dataset.run_from_stdin()
diff --git a/python/examples/criteo_ctr_with_cube/criteo_reader.py b/python/examples/criteo_ctr_with_cube/criteo_reader.py
new file mode 100755
index 0000000000000000000000000000000000000000..2a80af78a9c2033bf246f703ca70a817ab786af3
--- /dev/null
+++ b/python/examples/criteo_ctr_with_cube/criteo_reader.py
@@ -0,0 +1,83 @@
+# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# pylint: disable=doc-string-missing
+
+import sys
+import paddle.fluid.incubate.data_generator as dg
+
+
+class CriteoDataset(dg.MultiSlotDataGenerator):
+ def setup(self, sparse_feature_dim):
+ self.cont_min_ = [0, -3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
+ self.cont_max_ = [
+ 20, 600, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50
+ ]
+ self.cont_diff_ = [
+ 20, 603, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50
+ ]
+ self.hash_dim_ = sparse_feature_dim
+ # here, training data are lines with line_index < train_idx_
+ self.train_idx_ = 41256555
+ self.continuous_range_ = range(1, 14)
+ self.categorical_range_ = range(14, 40)
+
+ def _process_line(self, line):
+ features = line.rstrip('\n').split('\t')
+ dense_feature = []
+ sparse_feature = []
+ for idx in self.continuous_range_:
+ if features[idx] == '':
+ dense_feature.append(0.0)
+ else:
+ dense_feature.append((float(features[idx]) - self.cont_min_[idx - 1]) / \
+ self.cont_diff_[idx - 1])
+ for idx in self.categorical_range_:
+ sparse_feature.append(
+ [hash(str(idx) + features[idx]) % self.hash_dim_])
+
+ return dense_feature, sparse_feature, [int(features[0])]
+
+ def infer_reader(self, filelist, batch, buf_size):
+ def local_iter():
+ for fname in filelist:
+ with open(fname.strip(), "r") as fin:
+ for line in fin:
+ dense_feature, sparse_feature, label = self._process_line(
+ line)
+ #yield dense_feature, sparse_feature, label
+ yield [dense_feature] + sparse_feature + [label]
+
+ import paddle
+ batch_iter = paddle.batch(
+ paddle.reader.shuffle(
+ local_iter, buf_size=buf_size),
+ batch_size=batch)
+ return batch_iter
+
+ def generate_sample(self, line):
+ def data_iter():
+ dense_feature, sparse_feature, label = self._process_line(line)
+ feature_name = ["dense_input"]
+ for idx in self.categorical_range_:
+ feature_name.append("C" + str(idx - 13))
+ feature_name.append("label")
+ yield zip(feature_name, [dense_feature] + sparse_feature + [label])
+
+ return data_iter
+
+
+if __name__ == "__main__":
+ criteo_dataset = CriteoDataset()
+ criteo_dataset.setup(int(sys.argv[1]))
+ criteo_dataset.run_from_stdin()
diff --git a/python/examples/criteo_ctr_with_cube/cube/conf/cube.conf b/python/examples/criteo_ctr_with_cube/cube/conf/cube.conf
new file mode 100755
index 0000000000000000000000000000000000000000..b70f6e34247e410f9b80054010338d3c8f452ec6
--- /dev/null
+++ b/python/examples/criteo_ctr_with_cube/cube/conf/cube.conf
@@ -0,0 +1,13 @@
+[{
+ "dict_name": "test_dict",
+ "shard": 1,
+ "dup": 1,
+ "timeout": 200,
+ "retry": 3,
+ "backup_request": 100,
+ "type": "ipport_list",
+ "load_balancer": "rr",
+ "nodes": [{
+ "ipport_list": "list://127.0.0.1:8027"
+ }]
+}]
diff --git a/python/examples/criteo_ctr_with_cube/cube/conf/gflags.conf b/python/examples/criteo_ctr_with_cube/cube/conf/gflags.conf
new file mode 100755
index 0000000000000000000000000000000000000000..21c7bddebd8f22b91d0ba26a6121007f96a4380b
--- /dev/null
+++ b/python/examples/criteo_ctr_with_cube/cube/conf/gflags.conf
@@ -0,0 +1,4 @@
+--port=8027
+--dict_split=1
+--in_mem=true
+--log_dir=./log/
diff --git a/python/examples/criteo_ctr_with_cube/cube/keys b/python/examples/criteo_ctr_with_cube/cube/keys
new file mode 100755
index 0000000000000000000000000000000000000000..f00c965d8307308469e537302baa73048488f162
--- /dev/null
+++ b/python/examples/criteo_ctr_with_cube/cube/keys
@@ -0,0 +1,10 @@
+1
+2
+3
+4
+5
+6
+7
+8
+9
+10
diff --git a/python/examples/criteo_ctr_with_cube/cube_prepare.sh b/python/examples/criteo_ctr_with_cube/cube_prepare.sh
new file mode 100755
index 0000000000000000000000000000000000000000..ceeda0a603e7474f0845333ef94e05d923bde4f4
--- /dev/null
+++ b/python/examples/criteo_ctr_with_cube/cube_prepare.sh
@@ -0,0 +1,7 @@
+
+mkdir -p cube_model
+mkdir -p cube/data
+./seq_generator ctr_serving_model/SparseFeatFactors ./cube_model/feature
+./cube/cube-builder -dict_name=test_dict -job_mode=base -last_version=0 -cur_version=0 -depend_version=0 -input_path=./cube_model -output_path=./cube/data -shard_num=1 -only_build=false
+mv ./cube/data/0_0/test_dict_part0/* ./cube/data/
+cd cube && ./cube
diff --git a/python/examples/criteo_ctr_with_cube/get_data.sh b/python/examples/criteo_ctr_with_cube/get_data.sh
new file mode 100755
index 0000000000000000000000000000000000000000..1f244b3a4aa81488bb493825576ba30c4b3bba22
--- /dev/null
+++ b/python/examples/criteo_ctr_with_cube/get_data.sh
@@ -0,0 +1,2 @@
+wget --no-check-certificate https://paddle-serving.bj.bcebos.com/data/ctr_prediction/ctr_data.tar.gz
+tar -zxvf ctr_data.tar.gz
diff --git a/python/examples/criteo_ctr_with_cube/local_train.py b/python/examples/criteo_ctr_with_cube/local_train.py
new file mode 100755
index 0000000000000000000000000000000000000000..d4a1bc930924e348048f7ac3e5c46381d9b6441b
--- /dev/null
+++ b/python/examples/criteo_ctr_with_cube/local_train.py
@@ -0,0 +1,100 @@
+# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# pylint: disable=doc-string-missing
+
+from __future__ import print_function
+
+from args import parse_args
+import os
+import paddle.fluid as fluid
+import sys
+from network_conf import dnn_model
+
+dense_feature_dim = 13
+
+
+def train():
+ args = parse_args()
+ sparse_only = args.sparse_only
+ if not os.path.isdir(args.model_output_dir):
+ os.mkdir(args.model_output_dir)
+ dense_input = fluid.layers.data(
+ name="dense_input", shape=[dense_feature_dim], dtype='float32')
+ sparse_input_ids = [
+ fluid.layers.data(
+ name="C" + str(i), shape=[1], lod_level=1, dtype="int64")
+ for i in range(1, 27)
+ ]
+ label = fluid.layers.data(name='label', shape=[1], dtype='int64')
+
+ #nn_input = None if sparse_only else dense_input
+ nn_input = dense_input
+ predict_y, loss, auc_var, batch_auc_var, infer_vars = dnn_model(
+ nn_input, sparse_input_ids, label, args.embedding_size,
+ args.sparse_feature_dim)
+
+ optimizer = fluid.optimizer.SGD(learning_rate=1e-4)
+ optimizer.minimize(loss)
+
+ exe = fluid.Executor(fluid.CPUPlace())
+ exe.run(fluid.default_startup_program())
+ dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
+ dataset.set_use_var([dense_input] + sparse_input_ids + [label])
+
+ python_executable = "python"
+ pipe_command = "{} criteo_reader.py {}".format(python_executable,
+ args.sparse_feature_dim)
+
+ dataset.set_pipe_command(pipe_command)
+ dataset.set_batch_size(128)
+ thread_num = 10
+ dataset.set_thread(thread_num)
+
+ whole_filelist = [
+ "raw_data/part-%d" % x for x in range(len(os.listdir("raw_data")))
+ ]
+
+ print(whole_filelist)
+ dataset.set_filelist(whole_filelist[:100])
+ dataset.load_into_memory()
+ fluid.layers.Print(auc_var)
+ epochs = 1
+ for i in range(epochs):
+ exe.train_from_dataset(
+ program=fluid.default_main_program(), dataset=dataset, debug=True)
+ print("epoch {} finished".format(i))
+
+ import paddle_serving_client.io as server_io
+ feed_var_dict = {}
+ feed_var_dict['dense_input'] = dense_input
+ for i, sparse in enumerate(sparse_input_ids):
+ feed_var_dict["embedding_{}.tmp_0".format(i)] = sparse
+ fetch_var_dict = {"prob": predict_y}
+
+ feed_kv_dict = {}
+ feed_kv_dict['dense_input'] = dense_input
+ for i, emb in enumerate(infer_vars):
+ feed_kv_dict["embedding_{}.tmp_0".format(i)] = emb
+ fetch_var_dict = {"prob": predict_y}
+
+ server_io.save_model("ctr_serving_model", "ctr_client_conf", feed_var_dict,
+ fetch_var_dict, fluid.default_main_program())
+
+ server_io.save_model("ctr_serving_model_kv", "ctr_client_conf_kv",
+ feed_kv_dict, fetch_var_dict,
+ fluid.default_main_program())
+
+
+if __name__ == '__main__':
+ train()
diff --git a/python/examples/criteo_ctr_with_cube/network_conf.py b/python/examples/criteo_ctr_with_cube/network_conf.py
new file mode 100755
index 0000000000000000000000000000000000000000..2975533a72ad21d6dd5896446fd06c1f9bdfe8b4
--- /dev/null
+++ b/python/examples/criteo_ctr_with_cube/network_conf.py
@@ -0,0 +1,77 @@
+# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# pylint: disable=doc-string-missing
+
+import paddle.fluid as fluid
+import math
+
+
+def dnn_model(dense_input, sparse_inputs, label, embedding_size,
+ sparse_feature_dim):
+ def embedding_layer(input):
+ emb = fluid.layers.embedding(
+ input=input,
+ is_sparse=True,
+ is_distributed=False,
+ size=[sparse_feature_dim, embedding_size],
+ param_attr=fluid.ParamAttr(
+ name="SparseFeatFactors",
+ initializer=fluid.initializer.Uniform()))
+ x = fluid.layers.sequence_pool(input=emb, pool_type='sum')
+ return emb, x
+
+ def mlp_input_tensor(emb_sums, dense_tensor):
+ #if isinstance(dense_tensor, fluid.Variable):
+ # return fluid.layers.concat(emb_sums, axis=1)
+ #else:
+ return fluid.layers.concat(emb_sums + [dense_tensor], axis=1)
+
+ def mlp(mlp_input):
+ fc1 = fluid.layers.fc(input=mlp_input,
+ size=400,
+ act='relu',
+ param_attr=fluid.ParamAttr(
+ initializer=fluid.initializer.Normal(
+ scale=1 / math.sqrt(mlp_input.shape[1]))))
+ fc2 = fluid.layers.fc(input=fc1,
+ size=400,
+ act='relu',
+ param_attr=fluid.ParamAttr(
+ initializer=fluid.initializer.Normal(
+ scale=1 / math.sqrt(fc1.shape[1]))))
+ fc3 = fluid.layers.fc(input=fc2,
+ size=400,
+ act='relu',
+ param_attr=fluid.ParamAttr(
+ initializer=fluid.initializer.Normal(
+ scale=1 / math.sqrt(fc2.shape[1]))))
+ pre = fluid.layers.fc(input=fc3,
+ size=2,
+ act='softmax',
+ param_attr=fluid.ParamAttr(
+ initializer=fluid.initializer.Normal(
+ scale=1 / math.sqrt(fc3.shape[1]))))
+ return pre
+
+ emb_pair_sums = list(map(embedding_layer, sparse_inputs))
+ emb_sums = [x[1] for x in emb_pair_sums]
+ infer_vars = [x[0] for x in emb_pair_sums]
+ mlp_in = mlp_input_tensor(emb_sums, dense_input)
+ predict = mlp(mlp_in)
+ cost = fluid.layers.cross_entropy(input=predict, label=label)
+ avg_cost = fluid.layers.reduce_sum(cost)
+ accuracy = fluid.layers.accuracy(input=predict, label=label)
+ auc_var, batch_auc_var, auc_states = \
+ fluid.layers.auc(input=predict, label=label, num_thresholds=2 ** 12, slide_steps=20)
+ return predict, avg_cost, auc_var, batch_auc_var, infer_vars
diff --git a/python/examples/criteo_ctr_with_cube/test_client.py b/python/examples/criteo_ctr_with_cube/test_client.py
new file mode 100755
index 0000000000000000000000000000000000000000..de205ebc68af02e8dd978da51a4c43bef0cec0d4
--- /dev/null
+++ b/python/examples/criteo_ctr_with_cube/test_client.py
@@ -0,0 +1,48 @@
+# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# pylint: disable=doc-string-missing
+
+from paddle_serving_client import Client
+import sys
+import os
+import criteo as criteo
+import time
+from paddle_serving_client.metric import auc
+
+client = Client()
+client.load_client_config(sys.argv[1])
+client.connect(["127.0.0.1:9292"])
+
+batch = 1
+buf_size = 100
+dataset = criteo.CriteoDataset()
+dataset.setup(1000001)
+test_filelists = ["{}/part-0".format(sys.argv[2])]
+reader = dataset.infer_reader(test_filelists, batch, buf_size)
+label_list = []
+prob_list = []
+start = time.time()
+for ei in range(10000):
+ data = reader().next()
+ feed_dict = {}
+ feed_dict['dense_input'] = data[0][0]
+ for i in range(1, 27):
+ feed_dict["embedding_{}.tmp_0".format(i - 1)] = data[0][i]
+ fetch_map = client.predict(feed=feed_dict, fetch=["prob"])
+ prob_list.append(fetch_map['prob'][1])
+ label_list.append(data[0][-1][0])
+
+print(auc(label_list, prob_list))
+end = time.time()
+print(end - start)
diff --git a/python/examples/criteo_ctr_with_cube/test_server.py b/python/examples/criteo_ctr_with_cube/test_server.py
new file mode 100755
index 0000000000000000000000000000000000000000..5399ace839a00071c0ed9ce384e5523b68db27fc
--- /dev/null
+++ b/python/examples/criteo_ctr_with_cube/test_server.py
@@ -0,0 +1,37 @@
+# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# pylint: disable=doc-string-missing
+
+import os
+import sys
+from paddle_serving_server import OpMaker
+from paddle_serving_server import OpSeqMaker
+from paddle_serving_server import Server
+
+op_maker = OpMaker()
+read_op = op_maker.create('general_reader')
+general_dist_kv_infer_op = op_maker.create('general_dist_kv_infer')
+response_op = op_maker.create('general_response')
+
+op_seq_maker = OpSeqMaker()
+op_seq_maker.add_op(read_op)
+op_seq_maker.add_op(general_dist_kv_infer_op)
+op_seq_maker.add_op(response_op)
+
+server = Server()
+server.set_op_sequence(op_seq_maker.get_op_sequence())
+server.set_num_threads(4)
+server.load_model_config(sys.argv[1])
+server.prepare_server(workdir="work_dir1", port=9292, device="cpu")
+server.run_server()
diff --git a/python/paddle_serving_client/__init__.py b/python/paddle_serving_client/__init__.py
index c77034b5ce9c811d3a5b0c42701b191870ffe45c..ce0eb8c83d1eabb79e0e51608c9b2e906faa4c70 100644
--- a/python/paddle_serving_client/__init__.py
+++ b/python/paddle_serving_client/__init__.py
@@ -228,16 +228,17 @@ class Client(object):
fetch_names, result_batch, self.pid)
result_map_batch = []
- for index in range(batch_size):
- result_map = {}
- for i, name in enumerate(fetch_names):
- if self.fetch_names_to_type_[name] == int_type:
- result_map[name] = result_batch.get_int64_by_name(name)[
- index]
- elif self.fetch_names_to_type_[name] == float_type:
- result_map[name] = result_batch.get_float_by_name(name)[
- index]
- result_map_batch.append(result_map)
+ result_map = {}
+ for i, name in enumerate(fetch_names):
+ if self.fetch_names_to_type_[name] == int_type:
+ result_map[name] = result_batch.get_int64_by_name(name)
+ elif self.fetch_names_to_type_[name] == float_type:
+ result_map[name] = result_batch.get_float_by_name(name)
+ for i in range(batch_size):
+ single_result = {}
+ for key in result_map:
+ single_result[key] = result_map[key][i]
+ result_map_batch.append(single_result)
return result_map_batch
diff --git a/python/paddle_serving_server/__init__.py b/python/paddle_serving_server/__init__.py
index 87c6b4960e439dd529cc0354c667c244369c133a..55c5f8b4ae2f4728be87a9f5ed15e28e584b66c6 100644
--- a/python/paddle_serving_server/__init__.py
+++ b/python/paddle_serving_server/__init__.py
@@ -32,6 +32,7 @@ class OpMaker(object):
"general_text_reader": "GeneralTextReaderOp",
"general_text_response": "GeneralTextResponseOp",
"general_single_kv": "GeneralSingleKVOp",
+ "general_dist_kv_infer": "GeneralDistKVInferOp",
"general_dist_kv": "GeneralDistKVOp",
"general_copy": "GeneralCopyOp"
}
@@ -82,6 +83,7 @@ class Server(object):
self.infer_service_fn = "infer_service.prototxt"
self.model_toolkit_fn = "model_toolkit.prototxt"
self.general_model_config_fn = "general_model.prototxt"
+ self.cube_config_fn = "cube.conf"
self.workdir = ""
self.max_concurrency = 0
self.num_threads = 4
@@ -157,6 +159,11 @@ class Server(object):
"w") as fout:
fout.write(str(self.model_conf))
self.resource_conf = server_sdk.ResourceConf()
+ for workflow in self.workflow_conf.workflows:
+ for node in workflow.nodes:
+ if "dist_kv" in node.name:
+ self.resource_conf.cube_config_path = workdir
+ self.resource_conf.cube_config_file = self.cube_config_fn
self.resource_conf.model_toolkit_path = workdir
self.resource_conf.model_toolkit_file = self.model_toolkit_fn
self.resource_conf.general_model_path = workdir
@@ -295,6 +302,6 @@ class Server(object):
self.workdir,
self.workflow_fn,
self.num_threads)
- print("Going to Run Comand")
+ print("Going to Run Command")
print(command)
os.system(command)
diff --git a/tools/serving_build.sh b/tools/serving_build.sh
index da43eb47914190058f13e970d649b243d2538907..0fc2dcdf0a521c2088c9d1c03ad01f713f33a1f7 100644
--- a/tools/serving_build.sh
+++ b/tools/serving_build.sh
@@ -47,7 +47,7 @@ function build_server() {
-DPYTHON_LIBRARIES=$PYTHONROOT/lib64/libpython2.7.so \
-DPYTHON_EXECUTABLE=$PYTHONROOT/bin/python \
-DCLIENT_ONLY=OFF ..
- check_cmd "make -j2 >/dev/null"
+ check_cmd "make -j2 >/dev/null && make install -j2 >/dev/null"
pip install -U python/dist/paddle_serving_server* >/dev/null
;;
GPU)
@@ -56,7 +56,7 @@ function build_server() {
-DPYTHON_EXECUTABLE=$PYTHONROOT/bin/python \
-DCLIENT_ONLY=OFF \
-DWITH_GPU=ON ..
- check_cmd "make -j2 >/dev/null"
+ check_cmd "make -j2 >/dev/null && make install -j2 >/dev/null"
pip install -U python/dist/paddle_serving_server* >/dev/null
;;
*)
@@ -78,6 +78,7 @@ function python_test_fit_a_line() {
cd fit_a_line # pwd: /Serving/python/examples/fit_a_line
sh get_data.sh
local TYPE=$1
+ echo $TYPE
case $TYPE in
CPU)
# test rpc
@@ -127,12 +128,45 @@ function python_test_fit_a_line() {
cd .. # pwd: /Serving/python/examples
}
+function python_run_criteo_ctr_with_cube() {
+ # pwd: /Serving/python/examples
+ local TYPE=$1
+ yum install -y bc >/dev/null
+ cd criteo_ctr_with_cube # pwd: /Serving/python/examples/criteo_ctr_with_cube
+ check_cmd "wget https://paddle-serving.bj.bcebos.com/unittest/ctr_cube_unittest.tar.gz"
+ check_cmd "tar xf ctr_cube_unittest.tar.gz"
+ check_cmd "mv models/ctr_client_conf ./"
+ check_cmd "mv models/ctr_serving_model_kv ./"
+ check_cmd "mv models/data ./cube/"
+ check_cmd "mv models/ut_data ./"
+ cp ../../../build-server-$TYPE/output/bin/cube* ./cube/
+ mkdir -p $PYTHONROOT/lib/python2.7/site-packages/paddle_serving_server/serving-cpu-avx-openblas-0.1.3/
+ yes | cp ../../../build-server-$TYPE/output/demo/serving/bin/serving $PYTHONROOT/lib/python2.7/site-packages/paddle_serving_server/serving-cpu-avx-openblas-0.1.3/
+
+ sh cube_prepare.sh &
+ check_cmd "mkdir work_dir1 && cp cube/conf/cube.conf ./work_dir1/"
+ python test_server.py ctr_serving_model_kv &
+ check_cmd "python test_client.py ctr_client_conf/serving_client_conf.prototxt ./ut_data >score"
+ AUC=$(tail -n 2 score | awk 'NR==1')
+ VAR2="0.70"
+ RES=$( echo "$AUC>$VAR2" | bc )
+ if [[ $RES -eq 0 ]]; then
+ echo "error with criteo_ctr_with_cube inference auc test, auc should > 0.70"
+ exit 1
+ fi
+ echo "criteo_ctr_with_cube inference auc test success"
+ ps -ef | grep "paddle_serving_server" | grep -v grep | awk '{print $2}' | xargs kill
+ ps -ef | grep "cube" | grep -v grep | awk '{print $2}' | xargs kill
+ cd .. # pwd: /Serving/python/examples
+}
+
function python_run_test() {
# Using the compiled binary
local TYPE=$1 # pwd: /Serving
export SERVING_BIN=$PWD/build-server-${TYPE}/core/general-server/serving
cd python/examples # pwd: /Serving/python/examples
python_test_fit_a_line $TYPE # pwd: /Serving/python/examples
+ python_run_criteo_ctr_with_cube $TYPE # pwd: /Serving/python/examples
echo "test python $TYPE part finished as expected."
cd ../.. # pwd: /Serving
}