diff --git a/core/configure/proto/server_configure.proto b/core/configure/proto/server_configure.proto
index cc508e8eaa9ccad1baa022a9c962094213ce4039..bbc4487d69757636178048da05ea608be0b9466a 100644
--- a/core/configure/proto/server_configure.proto
+++ b/core/configure/proto/server_configure.proto
@@ -52,9 +52,10 @@ message ModelToolkitConf { repeated EngineDesc engines = 1; };
 message ResourceConf {
   required string model_toolkit_path = 1;
   required string model_toolkit_file = 2;
-  optional string cube_config_file = 3;
-  optional string general_model_path = 4;
-  optional string general_model_file = 5;
+  optional string general_model_path = 3;
+  optional string general_model_file = 4;
+  optional string cube_config_path = 5;
+  optional string cube_config_file = 6;
 };
 
 // DAG node dependency info
diff --git a/core/cube/cube-api/include/cube_api.h b/core/cube/cube-api/include/cube_api.h
index 90a6c690e061a8e11697aa5a82b5d38200cf22cc..21d1fea38698c8f202e02d79e662d718a05f9499 100644
--- a/core/cube/cube-api/include/cube_api.h
+++ b/core/cube/cube-api/include/cube_api.h
@@ -99,6 +99,13 @@ class CubeAPI {
                std::function parse,
                std::string* version);
 
+  /**
+   * @brief: get all table names from cube server, thread safe.
+   * @return vector of table names
+   *
+   */
+  std::vector<std::string> get_table_names();
+
  public:
   static const char* error_msg(int error_code);
 
diff --git a/core/cube/cube-api/src/cube_api.cpp b/core/cube/cube-api/src/cube_api.cpp
index 4963d307171c7e32fc4add720c092c257d063e83..c38c8bb7f96ad205460400ff5540bc0deb47b835 100644
--- a/core/cube/cube-api/src/cube_api.cpp
+++ b/core/cube/cube-api/src/cube_api.cpp
@@ -682,5 +682,13 @@ int CubeAPI::opt_seek(const std::string& dict_name,
   return ret;
 }
 
+std::vector<std::string> CubeAPI::get_table_names() {
+  const std::vector<const MetaInfo*> metas = _meta->metas();
+  std::vector<std::string> table_names;
+  for (auto itr = metas.begin(); itr != metas.end(); ++itr) {
+    table_names.push_back((*itr)->dict_name);
+  }
+  return table_names;
+}
 }  // namespace mcube
 }  // namespace rec
diff --git a/core/general-server/CMakeLists.txt b/core/general-server/CMakeLists.txt
index 82ce53462cb0a5f775cb0ee85b827bba38f5c306..9056e229a51f56463dc2eec5629f219d00dc6a38 100644
--- a/core/general-server/CMakeLists.txt
+++ b/core/general-server/CMakeLists.txt
@@ -1,3 +1,4 @@
+include_directories(SYSTEM ${CMAKE_CURRENT_LIST_DIR}/../../)
 include(op/CMakeLists.txt)
 include(proto/CMakeLists.txt)
 add_executable(serving ${serving_srcs})
diff --git a/core/general-server/op/general_dist_kv_infer_op.cpp b/core/general-server/op/general_dist_kv_infer_op.cpp
new file mode 100755
index 0000000000000000000000000000000000000000..ac4e7bb23e9410aede4fd353099d3c90ce91bcd3
--- /dev/null
+++ b/core/general-server/op/general_dist_kv_infer_op.cpp
@@ -0,0 +1,167 @@
+// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
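+
+// GeneralDistKVInferOp gathers every INT64 (sparse id) input tensor into one
+// key list, fetches the corresponding embedding rows from a cube
+// sparse-parameter server, rebuilds FLOAT32 embedding tensors from the
+// returned buffers, and finally runs the local inference engine.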
+
+#include "core/general-server/op/general_dist_kv_infer_op.h"
+#include <algorithm>
+#include <iostream>
+#include <memory>
+#include <sstream>
+#include <unordered_map>
+#include <utility>
+#include "core/cube/cube-api/include/cube_api.h"
+#include "core/predictor/framework/infer.h"
+#include "core/predictor/framework/memory.h"
+#include "core/predictor/framework/resource.h"
+#include "core/util/include/timer.h"
+
+namespace baidu {
+namespace paddle_serving {
+namespace serving {
+
+using baidu::paddle_serving::Timer;
+using baidu::paddle_serving::predictor::MempoolWrapper;
+using baidu::paddle_serving::predictor::general_model::Tensor;
+using baidu::paddle_serving::predictor::general_model::Response;
+using baidu::paddle_serving::predictor::general_model::Request;
+using baidu::paddle_serving::predictor::general_model::FetchInst;
+using baidu::paddle_serving::predictor::InferManager;
+using baidu::paddle_serving::predictor::PaddleGeneralModelConfig;
+
+int GeneralDistKVInferOp::inference() {
+  VLOG(2) << "Going to run inference";
+  const GeneralBlob *input_blob = get_depend_argument<GeneralBlob>(pre_name());
+  VLOG(2) << "Get precedent op name: " << pre_name();
+  GeneralBlob *output_blob = mutable_data<GeneralBlob>();
+
+  if (!input_blob) {
+    LOG(ERROR) << "Failed to get depended argument, op: " << pre_name();
+    return -1;
+  }
+
+  const TensorVector *in = &input_blob->tensor_vector;
+  TensorVector *out = &output_blob->tensor_vector;
+  int batch_size = input_blob->GetBatchSize();
+  VLOG(2) << "input batch size: " << batch_size;
+  std::vector<uint64_t> keys;
+  std::vector<rec::mcube::CubeValue> values;
+  int sparse_count = 0;
+  int dense_count = 0;
+  std::vector<std::pair<int64_t *, size_t>> dataptr_size_pairs;
+  size_t key_len = 0;
+  for (size_t i = 0; i < in->size(); ++i) {
+    if (in->at(i).dtype != paddle::PaddleDType::INT64) {
+      ++dense_count;
+      continue;
+    }
+    ++sparse_count;
+    size_t elem_num = 1;
+    for (size_t s = 0; s < in->at(i).shape.size(); ++s) {
+      elem_num *= in->at(i).shape[s];
+    }
+    key_len += elem_num;
+    int64_t *data_ptr = static_cast<int64_t *>(in->at(i).data.data());
+    dataptr_size_pairs.push_back(std::make_pair(data_ptr, elem_num));
+  }
+  keys.resize(key_len);
+  int key_idx = 0;
+  for (size_t i = 0; i < dataptr_size_pairs.size(); ++i) {
+    std::copy(dataptr_size_pairs[i].first,
+              dataptr_size_pairs[i].first + dataptr_size_pairs[i].second,
+              keys.begin() + key_idx);
+    key_idx += dataptr_size_pairs[i].second;
+  }
+  rec::mcube::CubeAPI *cube = rec::mcube::CubeAPI::instance();
+  std::vector<std::string> table_names = cube->get_table_names();
+  if (table_names.size() == 0) {
+    LOG(ERROR) << "cube init error or cube config not given.";
+    return -1;
+  }
+  int ret = cube->seek(table_names[0], keys, &values);
+
+  if (ret != 0 || values.empty() || values.size() != keys.size() ||
+      values[0].buff.size() == 0) {
+    LOG(ERROR) << "cube seek failed, or cube returned empty values";
+    return -1;
+  }
+  size_t EMBEDDING_SIZE = values[0].buff.size() / sizeof(float);
+  TensorVector sparse_out;
+  sparse_out.resize(sparse_count);
+  TensorVector dense_out;
+  dense_out.resize(dense_count);
+  int cube_val_idx = 0;
+  int sparse_idx = 0;
+  int dense_idx = 0;
+  std::unordered_map<int, int> in_out_map;
+  baidu::paddle_serving::predictor::Resource &resource =
+      baidu::paddle_serving::predictor::Resource::instance();
+  std::shared_ptr<PaddleGeneralModelConfig> model_config =
+      resource.get_general_model_config();
+  for (size_t i = 0; i < in->size(); ++i) {
+    if (in->at(i).dtype != paddle::PaddleDType::INT64) {
+      dense_out[dense_idx] = in->at(i);
+      ++dense_idx;
+      continue;
+    }
+
+    sparse_out[sparse_idx].lod.resize(in->at(i).lod.size());
+    for (size_t x = 0; x < sparse_out[sparse_idx].lod.size(); ++x) {
+      sparse_out[sparse_idx].lod[x].resize(in->at(i).lod[x].size());
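+      // Copy the per-level LoD offsets verbatim so the output keeps the
+      // same variable-length layout as the sparse input tensor.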
+      std::copy(in->at(i).lod[x].begin(),
+                in->at(i).lod[x].end(),
+                sparse_out[sparse_idx].lod[x].begin());
+    }
+    sparse_out[sparse_idx].dtype = paddle::PaddleDType::FLOAT32;
+    sparse_out[sparse_idx].shape.push_back(
+        sparse_out[sparse_idx].lod[0].back());
+    sparse_out[sparse_idx].shape.push_back(EMBEDDING_SIZE);
+    sparse_out[sparse_idx].name = model_config->_feed_name[i];
+    sparse_out[sparse_idx].data.Resize(sparse_out[sparse_idx].lod[0].back() *
+                                       EMBEDDING_SIZE * sizeof(float));
+    float *dst_ptr = static_cast<float *>(sparse_out[sparse_idx].data.data());
+    for (int x = 0; x < sparse_out[sparse_idx].lod[0].back(); ++x) {
+      float *data_ptr = dst_ptr + x * EMBEDDING_SIZE;
+      memcpy(data_ptr,
+             values[cube_val_idx].buff.data(),
+             values[cube_val_idx].buff.size());
+      cube_val_idx++;
+    }
+    ++sparse_idx;
+  }
+  TensorVector infer_in;
+  infer_in.insert(infer_in.end(), dense_out.begin(), dense_out.end());
+  infer_in.insert(infer_in.end(), sparse_out.begin(), sparse_out.end());
+
+  output_blob->SetBatchSize(batch_size);
+
+  VLOG(2) << "infer batch size: " << batch_size;
+
+  Timer timeline;
+  int64_t start = timeline.TimeStampUS();
+  timeline.Start();
+
+  if (InferManager::instance().infer(
+          GENERAL_MODEL_NAME, &infer_in, out, batch_size)) {
+    LOG(ERROR) << "Failed to do inference in fluid model: "
+               << GENERAL_MODEL_NAME;
+    return -1;
+  }
+
+  int64_t end = timeline.TimeStampUS();
+  CopyBlobInfo(input_blob, output_blob);
+  AddBlobInfo(output_blob, start);
+  AddBlobInfo(output_blob, end);
+  return 0;
+}
+DEFINE_OP(GeneralDistKVInferOp);
+
+}  // namespace serving
+}  // namespace paddle_serving
+}  // namespace baidu
diff --git a/core/general-server/op/general_dist_kv_infer_op.h b/core/general-server/op/general_dist_kv_infer_op.h
new file mode 100644
index 0000000000000000000000000000000000000000..2dee5bca6f9e12dbb8b36a6c39aa0a8e77763d23
--- /dev/null
+++ b/core/general-server/op/general_dist_kv_infer_op.h
@@ -0,0 +1,46 @@
+// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
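+
+// Declared as an OpWithChannel so the framework threads a GeneralBlob
+// (a tensor vector plus timing info) between this op and its neighbors.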
+
+#pragma once
+#include <string>
+#include <vector>
+#ifdef BCLOUD
+#ifdef WITH_GPU
+#include "paddle/paddle_inference_api.h"
+#else
+#include "paddle/fluid/inference/api/paddle_inference_api.h"
+#endif
+#else
+#include "paddle_inference_api.h"  // NOLINT
+#endif
+#include "core/general-server/general_model_service.pb.h"
+#include "core/general-server/op/general_infer_helper.h"
+
+namespace baidu {
+namespace paddle_serving {
+namespace serving {
+
+class GeneralDistKVInferOp
+    : public baidu::paddle_serving::predictor::OpWithChannel<GeneralBlob> {
+ public:
+  typedef std::vector<paddle::PaddleTensor> TensorVector;
+
+  DECLARE_OP(GeneralDistKVInferOp);
+
+  int inference();
+};
+
+}  // namespace serving
+}  // namespace paddle_serving
+}  // namespace baidu
diff --git a/core/predictor/CMakeLists.txt b/core/predictor/CMakeLists.txt
index 38d977f1d2aeee556c46bb16da32bd2b20de3911..1b9dc7b29845a2b8c7f958c1d8e836cb57e91d41 100644
--- a/core/predictor/CMakeLists.txt
+++ b/core/predictor/CMakeLists.txt
@@ -3,7 +3,7 @@ include(common/CMakeLists.txt)
 include(op/CMakeLists.txt)
 include(mempool/CMakeLists.txt)
 include(framework/CMakeLists.txt)
-#include(plugin/CMakeLists.txt)
+include(tools/CMakeLists.txt)
 include(src/CMakeLists.txt)
 
diff --git a/core/predictor/common/CMakeLists.txt b/core/predictor/common/CMakeLists.txt
index 0eebb3a7b79ba1f71930ba8453fd541ecd312675..7beb2317f2b6e84c1424b8ae84de7ea8851e92dc 100644
--- a/core/predictor/common/CMakeLists.txt
+++ b/core/predictor/common/CMakeLists.txt
@@ -1,2 +1,2 @@
-FILE(GLOB common_srcs ${CMAKE_CURRENT_LIST_DIR}/*.cpp)
+FILE(GLOB common_srcs ${CMAKE_CURRENT_LIST_DIR}/constant.cpp)
 LIST(APPEND pdserving_srcs ${common_srcs})
diff --git a/core/predictor/framework/resource.cpp b/core/predictor/framework/resource.cpp
index f4f9091f689c36bc853730e0dd68cba089c7fb8b..11b015bfcb2f3c4f72a67a5687c4fdfc95481a31 100644
--- a/core/predictor/framework/resource.cpp
+++ b/core/predictor/framework/resource.cpp
@@ -141,6 +141,17 @@ int Resource::initialize(const std::string& path, const std::string& file) {
     LOG(ERROR) << "unable to create tls_bthread_key of thrd_data";
     return -1;
   }
+  // init the cube instance and record its config path for later cube init
+  if (resource_conf.has_cube_config_file() &&
+      resource_conf.has_cube_config_path()) {
+    LOG(INFO) << "init cube client, path[ " << resource_conf.cube_config_path()
+              << " ], config file [ " << resource_conf.cube_config_file()
+              << " ].";
+    rec::mcube::CubeAPI* cube = rec::mcube::CubeAPI::instance();
+    std::string cube_config_fullpath = "./" + resource_conf.cube_config_path() +
+                                       "/" + resource_conf.cube_config_file();
+    this->cube_config_fullpath = cube_config_fullpath;
+  }
 
   THREAD_SETSPECIFIC(_tls_bspec_key, NULL);
   return 0;
@@ -149,6 +160,15 @@ int Resource::initialize(const std::string& path, const std::string& file) {
 // model config
 int Resource::general_model_initialize(const std::string& path,
                                        const std::string& file) {
+  if (this->cube_config_fullpath.size() != 0) {
+    LOG(INFO) << "init cube by config file : " << this->cube_config_fullpath;
+    rec::mcube::CubeAPI* cube = rec::mcube::CubeAPI::instance();
+    int ret = cube->init(this->cube_config_fullpath.c_str());
+    if (ret != 0) {
+      LOG(ERROR) << "cube init error";
+      return -1;
+    }
+  }
   VLOG(2) << "general model path: " << path;
   VLOG(2) << "general model file: " << file;
   if (!FLAGS_enable_general_model) {
diff --git a/core/predictor/framework/resource.h b/core/predictor/framework/resource.h
index 3cf494b97667474157dc744caea4bc77a6b8b443..1a648f0fa363efa4eb915a64553949206ec96153 100644
--- a/core/predictor/framework/resource.h
+++ b/core/predictor/framework/resource.h
@@ -108,6 +108,7 @@ class Resource {
  private:
   int thread_finalize() { return 0; }
   std::shared_ptr<PaddleGeneralModelConfig> _config;
+  std::string cube_config_fullpath;
 
   THREAD_KEY_T _tls_bspec_key;
 };
diff --git a/core/predictor/proto/CMakeLists.txt b/core/predictor/proto/CMakeLists.txt
index 773b5bfccea2a787409eb9be4b5722dd85f5fe61..1a9e2d35ef350cbe3dfb62d02d08dad19794300c 100644
--- a/core/predictor/proto/CMakeLists.txt
+++ b/core/predictor/proto/CMakeLists.txt
@@ -7,6 +7,7 @@ LIST(APPEND protofiles
     ${CMAKE_CURRENT_LIST_DIR}/./builtin_format.proto
     ${CMAKE_CURRENT_LIST_DIR}/./msg_data.proto
     ${CMAKE_CURRENT_LIST_DIR}/./xrecord_format.proto
+    ${CMAKE_CURRENT_LIST_DIR}/./framework.proto
 )
 
 PROTOBUF_GENERATE_SERVING_CPP(TRUE PROTO_SRCS PROTO_HDRS ${protofiles})
diff --git a/core/predictor/proto/framework.proto b/core/predictor/proto/framework.proto
new file mode 100755
index 0000000000000000000000000000000000000000..c0fb3ada51bf7b7af41f602bf08d3c9596884aab
--- /dev/null
+++ b/core/predictor/proto/framework.proto
@@ -0,0 +1,217 @@
+/* Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License. */
+
+syntax = "proto2";
+option optimize_for = LITE_RUNTIME;
+package paddle.framework.proto;
+
+// Any incompatible changes to ProgramDesc and its dependencies should
+// raise the version defined in version.h.
+//
+// Serialization and deserialization code should be modified in a way
+// that supports old versions following the version and compatibility policy.
+message Version { optional int64 version = 1 [ default = 0 ]; }
+
+enum AttrType {
+  INT = 0;
+  FLOAT = 1;
+  STRING = 2;
+  INTS = 3;
+  FLOATS = 4;
+  STRINGS = 5;
+  BOOLEAN = 6;
+  BOOLEANS = 7;
+  BLOCK = 8;
+  LONG = 9;
+  BLOCKS = 10;
+  LONGS = 11;
+}
+
+// OpDesc describes an instance of a C++ framework::OperatorBase
+// derived class type.
+message OpDesc {
+
+  message Attr {
+    required string name = 1;
+    required AttrType type = 2;
+    optional int32 i = 3;
+    optional float f = 4;
+    optional string s = 5;
+    repeated int32 ints = 6;
+    repeated float floats = 7;
+    repeated string strings = 8;
+    optional bool b = 10;
+    repeated bool bools = 11;
+    optional int32 block_idx = 12;
+    optional int64 l = 13;
+    repeated int32 blocks_idx = 14;
+    repeated int64 longs = 15;
+  };
+
+  message Var {
+    required string parameter = 1;
+    repeated string arguments = 2;
+  };
+
+  required string type = 3;
+  repeated Var inputs = 1;
+  repeated Var outputs = 2;
+  repeated Attr attrs = 4;
+  optional bool is_target = 5 [ default = false ];
+};
+
+// OpProto describes a C++ framework::OperatorBase derived class.
+message OpProto {
+
+  // VarProto describes the C++ type framework::Variable.
+  message Var {
+    required string name = 1;
+    required string comment = 2;
+
+    optional bool duplicable = 3 [ default = false ];
+    optional bool intermediate = 4 [ default = false ];
+    optional bool dispensable = 5 [ default = false ];
+  }
+
+  // AttrProto describes the C++ type Attribute.
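+  // An attribute carries its name, its AttrType tag, and a comment that
+  // is surfaced in generated operator documentation.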
+  message Attr {
+    required string name = 1;
+    required AttrType type = 2;
+    required string comment = 3;
+    // If an attribute is generated, the Paddle third-party language
+    // binding is responsible for filling it; end users should not set it.
+    optional bool generated = 4 [ default = false ];
+  }
+
+  required string type = 1;
+  repeated Var inputs = 2;
+  repeated Var outputs = 3;
+  repeated Attr attrs = 4;
+  required string comment = 5;
+}
+
+message VarType {
+  enum Type {
+    // Pod Types
+    BOOL = 0;
+    INT16 = 1;
+    INT32 = 2;
+    INT64 = 3;
+    FP16 = 4;
+    FP32 = 5;
+    FP64 = 6;
+    // Tensor<size_t> is used in C++.
+    SIZE_T = 19;
+    UINT8 = 20;
+    INT8 = 21;
+
+    // Other types that may need additional descriptions
+    LOD_TENSOR = 7;
+    SELECTED_ROWS = 8;
+    FEED_MINIBATCH = 9;
+    FETCH_LIST = 10;
+    STEP_SCOPES = 11;
+    LOD_RANK_TABLE = 12;
+    LOD_TENSOR_ARRAY = 13;
+    PLACE_LIST = 14;
+    READER = 15;
+    // Any runtime-decided variable type is raw;
+    // raw variables should manage their own allocations
+    // in operators like nccl_op
+    RAW = 17;
+    TUPLE = 18;
+  }
+
+  required Type type = 1;
+
+  message TensorDesc {
+    // Should only be a POD type. Is enforced in C++
+    required Type data_type = 1;
+    repeated int64 dims = 2; // [UNK, 640, 480] is saved as [-1, 640, 480]
+  }
+  optional TensorDesc selected_rows = 2;
+
+  message LoDTensorDesc {
+    required TensorDesc tensor = 1;
+    optional int32 lod_level = 2 [ default = 0 ];
+  }
+  optional LoDTensorDesc lod_tensor = 3;
+
+  message LoDTensorArrayDesc {
+    required TensorDesc tensor = 1;
+    optional int32 lod_level = 2 [ default = 0 ];
+  }
+  optional LoDTensorArrayDesc tensor_array = 4;
+
+  message ReaderDesc { repeated LoDTensorDesc lod_tensor = 1; }
+  optional ReaderDesc reader = 5;
+
+  message Tuple { repeated Type element_type = 1; }
+  optional Tuple tuple = 7;
+}
+
+message VarDesc {
+  required string name = 1;
+  required VarType type = 2;
+  optional bool persistable = 3 [ default = false ];
+  // True if the variable is an input data feed whose
+  // shape and dtype have to be checked
+  optional bool need_check_feed = 4 [ default = false ];
+}
+
+message BlockDesc {
+  required int32 idx = 1;
+  required int32 parent_idx = 2;
+  repeated VarDesc vars = 3;
+  repeated OpDesc ops = 4;
+  optional int32 forward_block_idx = 5 [ default = -1 ];
+}
+
+// CompatibleInfo is used to determine if a feature is compatible and
+// provides the information.
+message CompatibleInfo {
+  enum Type {
+    COMPATIBLE = 0;
+    DEFINITELY_NOT = 1;
+    POSSIBLE = 2;
+    BUG_FIX = 3;
+    PRECISION_CHANGE = 4;
+  }
+  required string version = 1;
+  required Type type = 2;
+}
+
+// In some cases, Paddle Fluid iterates on an operator's definition;
+// the framework then uses OpCompatibleMap for compatibility testing.
+message OpCompatibleMap {
+  message OpCompatiblePair {
+    required string op_name = 1;
+    required CompatibleInfo compatible_info = 2;
+  }
+  repeated OpCompatiblePair pair = 1;
+  optional string default_required_version = 2;
+}
+
+// Please refer to
+// https://github.com/PaddlePaddle/Paddle/blob/develop/doc/design/program.md
+// for more details.
+// TODO(panyx0718): A model can have multiple programs. Need a
+// way to distinguish them. Maybe ID or name?
+message ProgramDesc {
+  reserved 2; // For backward compatibility.
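+  // Field number 2 was retired; keeping it reserved lets old serialized
+  // programs deserialize without clashing with newer fields.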
+  repeated BlockDesc blocks = 1;
+  optional Version version = 4;
+  optional OpCompatibleMap op_compatible_map = 3;
+}
diff --git a/core/predictor/tools/CMakeLists.txt b/core/predictor/tools/CMakeLists.txt
new file mode 100644
index 0000000000000000000000000000000000000000..f74e3ea571933665b4b8a3fc795ce4db3f1b1493
--- /dev/null
+++ b/core/predictor/tools/CMakeLists.txt
@@ -0,0 +1,4 @@
+set(seq_gen_src ${CMAKE_CURRENT_LIST_DIR}/seq_generator.cpp ${CMAKE_CURRENT_LIST_DIR}/seq_file.cpp)
+LIST(APPEND seq_gen_src ${PROTO_SRCS})
+add_executable(seq_generator ${seq_gen_src})
+target_link_libraries(seq_generator protobuf -lpthread)
diff --git a/core/predictor/tools/seq_file.cpp b/core/predictor/tools/seq_file.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..839b455d048752df060c5bc073d40906c2c05089
--- /dev/null
+++ b/core/predictor/tools/seq_file.cpp
@@ -0,0 +1,83 @@
+// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <arpa/inet.h>
+#include <cstdlib>
+#include <cstring>
+#include <ctime>
+#include <fstream>
+
+#include <iostream>
+#include "seq_file.h"
+
+SeqFileWriter::SeqFileWriter(const char *file) {
+  _fs = new std::ofstream(file, std::ios::binary);
+  std::srand(std::time(0));
+  for (int i = 0; i < SYNC_MARKER_SIZE; ++i) {
+    _sync_marker[i] = std::rand() % 255;
+  }
+
+  _write_seq_header();
+
+  _bytes_to_prev_sync = 0;
+}
+
+void SeqFileWriter::close() {
+  _fs->close();
+  delete _fs;
+}
+
+SeqFileWriter::~SeqFileWriter() { close(); }
+
+void SeqFileWriter::_write_sync_marker() {
+  char begin[] = {'\xFF', '\xFF', '\xFF', '\xFF'};
+  _fs->write(begin, 4);
+
+  _fs->write(_sync_marker, SYNC_MARKER_SIZE);
+}
+
+void SeqFileWriter::_write_seq_header() {
+  _fs->write(SEQ_HEADER, sizeof(SEQ_HEADER) - 1);
+  _fs->write(_sync_marker, SYNC_MARKER_SIZE);
+}
+
+int SeqFileWriter::write(const char *key,
+                         size_t key_len,
+                         const char *value,
+                         size_t value_len) {
+  if (key_len != sizeof(int64_t)) {
+    std::cout << "Key length not equal to " << sizeof(int64_t) << std::endl;
+    return -1;
+  }
+
+  uint32_t record_len = key_len + value_len;
+  // lengths are written big-endian, as Hadoop expects
+  uint32_t b_record_len = htonl(record_len);
+  uint32_t b_key_len = htonl((uint32_t)key_len);
+  _fs->write((char *)&b_record_len, sizeof(uint32_t));
+  _fs->write((char *)&b_key_len, sizeof(uint32_t));
+  _fs->write(key, key_len);
+  _fs->write(value, value_len);
+  _bytes_to_prev_sync += record_len;
+
+  if (_bytes_to_prev_sync >= SYNC_INTERVAL) {
+    _write_sync_marker();
+    _bytes_to_prev_sync = 0;
+  }
+
+  return 0;
+}
+
+/* vim: set expandtab ts=4 sw=4 sts=4 tw=100: */
diff --git a/core/predictor/tools/seq_file.h b/core/predictor/tools/seq_file.h
new file mode 100644
index 0000000000000000000000000000000000000000..ae3944265e2fa1b01a6d0d34c19c0710a45f6cac
--- /dev/null
+++ b/core/predictor/tools/seq_file.h
@@ -0,0 +1,52 @@
+// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef __SEQ_FILE_H_
+#define __SEQ_FILE_H_
+
+#include <fstream>
+
+const int SYNC_MARKER_SIZE = 16;
+const char SEQ_HEADER[] =
+    "SEQ\x06"
+    "\"org.apache.hadoop.io.BytesWritable\""
+    "org.apache.hadoop.io.BytesWritable"
+    "\x00\x00\x00\x00\x00\x00";
+const int SYNC_INTERVAL = 2000;
+
+class SeqFileWriter {
+ public:
+  SeqFileWriter(const char *file);
+  ~SeqFileWriter();
+
+ public:
+  int write(const char *key,
+            size_t key_len,
+            const char *value,
+            size_t value_len);
+
+ private:
+  void close();
+  void _write_sync_marker();
+  void _write_seq_header();
+
+ private:
+  char _sync_marker[SYNC_MARKER_SIZE];
+  int _bytes_to_prev_sync;
+  std::ofstream *_fs;
+};
+
+#endif  //__SEQ_FILE_H_
+
+/* vim: set expandtab ts=4 sw=4 sts=4 tw=100: */
diff --git a/core/predictor/tools/seq_generator.cpp b/core/predictor/tools/seq_generator.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..2efda77485b919a3fd14d05b43fa4729c97234fb
--- /dev/null
+++ b/core/predictor/tools/seq_generator.cpp
@@ -0,0 +1,116 @@
+// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
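+
+// seq_generator turns one saved Paddle parameter (a serialized LoDTensor)
+// into a Hadoop SequenceFile that cube-builder can pack into a lookup table.
+// The input layout parsed below: a uint32_t version (must be 0), LoD info
+// (uint64_t level count, then per-level offset arrays), a repeated uint32_t
+// version, an int32_t length plus a TensorDesc protobuf, then the raw
+// row-major tensor data. Each row i of the 2-D parameter becomes one record
+// with the int64 key i and the float row as the value.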
+
+#include <algorithm>
+#include <cstring>
+#include <fstream>
+#include <iostream>
+#include <map>
+#include <memory>
+#include <vector>
+#include "core/predictor/framework.pb.h"
+#include "seq_file.h"
+
+using paddle::framework::proto::VarType;
+
+std::map<int, size_t> var_type_size;
+
+void reg_var_types() {
+  var_type_size[static_cast<int>(VarType::FP16)] = sizeof(int16_t);
+  var_type_size[static_cast<int>(VarType::FP32)] = sizeof(float);
+  var_type_size[static_cast<int>(VarType::FP64)] = sizeof(double);
+  var_type_size[static_cast<int>(VarType::INT32)] = sizeof(int);
+  var_type_size[static_cast<int>(VarType::INT64)] = sizeof(int64_t);
+  var_type_size[static_cast<int>(VarType::BOOL)] = sizeof(bool);
+  var_type_size[static_cast<int>(VarType::SIZE_T)] = sizeof(size_t);
+  var_type_size[static_cast<int>(VarType::INT16)] = sizeof(int16_t);
+  var_type_size[static_cast<int>(VarType::UINT8)] = sizeof(uint8_t);
+  var_type_size[static_cast<int>(VarType::INT8)] = sizeof(int8_t);
+}
+
+int dump_parameter(const char *input_file, const char *output_file) {
+  std::ifstream is(input_file);
+  // the 1st field, uint32_t version for LoDTensor
+  uint32_t version;
+  is.read(reinterpret_cast<char *>(&version), sizeof(version));
+  if (version != 0) {
+    std::cout << "Version number " << version << " not supported" << std::endl;
+    return -1;
+  }
+  // the 2nd field, LoD information
+  uint64_t lod_level;
+  is.read(reinterpret_cast<char *>(&lod_level), sizeof(lod_level));
+  std::vector<std::vector<size_t>> lod;
+  lod.resize(lod_level);
+  for (uint64_t i = 0; i < lod_level; ++i) {
+    uint64_t size;
+    is.read(reinterpret_cast<char *>(&size), sizeof(size));
+    std::vector<size_t> tmp(size / sizeof(size_t));
+    is.read(reinterpret_cast<char *>(tmp.data()),
+            static_cast<std::streamsize>(size));
+    lod[i] = tmp;
+  }
+  // the 3rd field, Tensor
+  // Note: duplicate version field
+  is.read(reinterpret_cast<char *>(&version), sizeof(version));
+  if (version != 0) {
+    std::cout << "Version number " << version << " not supported" << std::endl;
+    return -1;
+  }
+  // int32_t size
+  // proto buffer
+  VarType::TensorDesc desc;
+  int32_t size;
+  is.read(reinterpret_cast<char *>(&size), sizeof(size));
+  std::unique_ptr<char[]> buf(new char[size]);
+  is.read(reinterpret_cast<char *>(buf.get()), size);
+  if (!desc.ParseFromArray(buf.get(), size)) {
+    std::cout << "Cannot parse tensor desc" << std::endl;
+    return -1;
+  }
+  // read tensor
+  std::vector<int64_t> dims;
+  dims.reserve(static_cast<size_t>(desc.dims().size()));
+  std::copy(desc.dims().begin(), desc.dims().end(), std::back_inserter(dims));
+  std::cout << "Dims:";
+  for (auto x : dims) {
+    std::cout << " " << x;
+  }
+  std::cout << std::endl;
+  if (dims.size() != 2) {
+    std::cout << "Parameter dims not 2D" << std::endl;
+    return -1;
+  }
+  size_t numel = 1;
+  for (auto x : dims) {
+    numel *= x;
+  }
+  size_t buf_size = numel * var_type_size[desc.data_type()];
+  char *tensor_buf = new char[buf_size];
+  is.read(static_cast<char *>(tensor_buf), buf_size);
+  is.close();
+  SeqFileWriter seq_file_writer(output_file);
+  int value_buf_len = var_type_size[desc.data_type()] * dims[1];
+  char *value_buf = new char[value_buf_len];
+  size_t offset = 0;
+  for (int64_t i = 0; i < dims[0]; ++i) {
+    // one record per embedding row: key is the row id, value is the row data
+    memcpy(value_buf, tensor_buf + offset, value_buf_len);
+    seq_file_writer.write((char *)&i, sizeof(i), value_buf, value_buf_len);
+    offset += value_buf_len;
+  }
+  delete[] value_buf;
+  delete[] tensor_buf;
+  return 0;
+}
+
+int main(int argc, char **argv) {
+  if (argc != 3) {
+    std::cout << "Usage: seq_generator PARAMETER_FILE OUTPUT_FILE" << std::endl;
+    return -1;
+  }
+  reg_var_types();
+  return dump_parameter(argv[1], argv[2]);
+}
+/* vim: set expandtab ts=4 sw=4 sts=4 tw=100: */
diff --git a/doc/criteo-cube-benchmark-avgcost.png b/doc/criteo-cube-benchmark-avgcost.png
new file mode 100644
index 0000000000000000000000000000000000000000..5db3a04ffb87ac57202aa6d1e92ead3449b0b5d1
Binary files /dev/null and b/doc/criteo-cube-benchmark-avgcost.png differ
diff --git a/doc/criteo-cube-benchmark-qps.png b/doc/criteo-cube-benchmark-qps.png
new file mode 100644
index 0000000000000000000000000000000000000000..4b7ce971ff5840c06beda2bd20a55cc585c9d11f
Binary files /dev/null and b/doc/criteo-cube-benchmark-qps.png differ
diff --git a/python/examples/criteo_ctr_with_cube/README.md b/python/examples/criteo_ctr_with_cube/README.md
new file mode 100755
index 0000000000000000000000000000000000000000..9c80f935454ca5b6c5de961c4f06c83ebca5a5b3
--- /dev/null
+++ b/python/examples/criteo_ctr_with_cube/README.md
@@ -0,0 +1,61 @@
+## CTR Prediction Service with a Sparse-Parameter Server
+
+### Get the sample data
+```
+sh get_data.sh
+```
+
+### Save the model and config files
+```
+python local_train.py
+```
+Running this script generates the ctr_serving_model and ctr_client_conf folders in the current directory, along with ctr_serving_model_kv and ctr_client_conf_kv.
+
+### Start the sparse-parameter server (cube)
+```
+cp ../../../build_server/core/predictor/seq_generator seq_generator
+cp ../../../build_server/output/bin/cube* ./cube/
+sh cube_prepare.sh &
+```
+
+### Start the RPC prediction service with 4 server threads (configurable in test_server.py)
+```
+python test_server.py ctr_serving_model_kv
+```
+
+### Run prediction
+```
+python test_client.py ctr_client_conf/serving_client_conf.prototxt ./raw_data
+```
+
+### Benchmark
+
+Device: Intel(R) Xeon(R) CPU 6148 @ 2.40GHz
+
+Model: [Criteo CTR](https://github.com/PaddlePaddle/Serving/blob/develop/python/examples/ctr_criteo_with_cube/network_conf.py)
+
+server core/thread num: 4/8
+
+Run
+```
+bash benchmark.sh
+```
+Each client thread sends 1000 batches.
+
+| client thread num | prepro | client infer | op0 | op1 | op2 | postpro | avg_latency | qps |
+| ------------------ | ------ | ------------ | ------ | ----- | ------ | ------- | ----- | ----- |
+| 1 | 0.035 | 1.596 | 0.021 | 0.518 | 0.0024 | 0.0025 | 6.774 | 147.7 |
+| 2 | 0.034 | 1.780 | 0.027 | 0.463 | 0.0020 | 0.0023 | 6.931 | 288.3 |
+| 4 | 0.038 | 2.954 | 0.025 | 0.455 | 0.0019 | 0.0027 | 8.378 | 477.5 |
+| 8 | 0.044 | 8.230 | 0.028 | 0.464 | 0.0023 | 0.0034 | 14.191 | 563.8 |
+| 16 | 0.048 | 21.037 | 0.028 | 0.455 | 0.0025 | 0.0041 | 27.236 | 587.5 |
+
+Average cost per thread:
+
+![avg cost](../../../doc/criteo-cube-benchmark-avgcost.png)
+
+QPS per thread:
+
+![qps](../../../doc/criteo-cube-benchmark-qps.png)
diff --git a/python/examples/criteo_ctr_with_cube/args.py b/python/examples/criteo_ctr_with_cube/args.py
new file mode 100755
index 0000000000000000000000000000000000000000..30124d4ebd9cd27cdb4580e654a8a47c55b178bf
--- /dev/null
+++ b/python/examples/criteo_ctr_with_cube/args.py
@@ -0,0 +1,105 @@
+# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
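+# Command-line flags reused from the original Criteo CTR training example;
+# local_train.py mainly reads --sparse_only, --embedding_size,
+# --sparse_feature_dim and --model_output_dir.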
+# pylint: disable=doc-string-missing
+import argparse
+
+
+def parse_args():
+    parser = argparse.ArgumentParser(description="PaddlePaddle CTR example")
+    parser.add_argument(
+        '--train_data_path',
+        type=str,
+        default='./data/raw/train.txt',
+        help="The path of training dataset")
+    parser.add_argument(
+        '--sparse_only',
+        type=bool,
+        default=False,
+        help="Whether we use sparse features only")
+    parser.add_argument(
+        '--test_data_path',
+        type=str,
+        default='./data/raw/valid.txt',
+        help="The path of testing dataset")
+    parser.add_argument(
+        '--batch_size',
+        type=int,
+        default=1000,
+        help="The size of mini-batch (default:1000)")
+    parser.add_argument(
+        '--embedding_size',
+        type=int,
+        default=10,
+        help="The size for embedding layer (default:10)")
+    parser.add_argument(
+        '--num_passes',
+        type=int,
+        default=10,
+        help="The number of passes to train (default: 10)")
+    parser.add_argument(
+        '--model_output_dir',
+        type=str,
+        default='models',
+        help='The path for model to store (default: models)')
+    parser.add_argument(
+        '--sparse_feature_dim',
+        type=int,
+        default=1000001,
+        help='sparse feature hashing space for index processing')
+    parser.add_argument(
+        '--is_local',
+        type=int,
+        default=1,
+        help='Local train or distributed train (default: 1)')
+    parser.add_argument(
+        '--cloud_train',
+        type=int,
+        default=0,
+        help='Local train or distributed train on paddlecloud (default: 0)')
+    parser.add_argument(
+        '--async_mode',
+        action='store_true',
+        default=False,
+        help='Whether to start pserver in async mode to support ASGD')
+    parser.add_argument(
+        '--no_split_var',
+        action='store_true',
+        default=False,
+        help='Whether to split variables into blocks when update_method is pserver')
+    parser.add_argument(
+        '--role',
+        type=str,
+        default='pserver',  # trainer or pserver
+        help='The role of this node: trainer or pserver (default: pserver)')
+    parser.add_argument(
+        '--endpoints',
+        type=str,
+        default='127.0.0.1:6000',
+        help='The pserver endpoints, like: 127.0.0.1:6000,127.0.0.1:6001')
+    parser.add_argument(
+        '--current_endpoint',
+        type=str,
+        default='127.0.0.1:6000',
+        help='The endpoint of the current pserver (default: 127.0.0.1:6000)')
+    parser.add_argument(
+        '--trainer_id',
+        type=int,
+        default=0,
+        help='The id of the current trainer (default: 0)')
+    parser.add_argument(
+        '--trainers',
+        type=int,
+        default=1,
+        help='The num of trainers (default: 1)')
+    return parser.parse_args()
diff --git a/python/examples/criteo_ctr_with_cube/benchmark.py b/python/examples/criteo_ctr_with_cube/benchmark.py
new file mode 100755
index 0000000000000000000000000000000000000000..e5bde9f996fccc41027fa6d255ca227cba212e22
--- /dev/null
+++ b/python/examples/criteo_ctr_with_cube/benchmark.py
@@ -0,0 +1,80 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
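+# Single-sample RPC benchmark: each thread loads the client config, replays
+# 1000 batches from the Criteo raw data, and reports its latency and QPS.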
+# pylint: disable=doc-string-missing
+
+from paddle_serving_client import Client
+import sys
+import os
+import criteo as criteo
+import time
+from paddle_serving_client.utils import MultiThreadRunner
+from paddle_serving_client.utils import benchmark_args
+from paddle_serving_client.metric import auc
+
+args = benchmark_args()
+
+
+def single_func(idx, resource):
+    client = Client()
+    client.load_client_config('ctr_client_conf/serving_client_conf.prototxt')
+    client.connect(['127.0.0.1:9292'])
+    batch = 1
+    buf_size = 100
+    dataset = criteo.CriteoDataset()
+    dataset.setup(1000001)
+    test_filelists = [
+        "./raw_data/part-%d" % x for x in range(len(os.listdir("./raw_data")))
+    ]
+    reader = dataset.infer_reader(test_filelists[len(test_filelists) - 40:],
+                                  batch, buf_size)
+    args.batch_size = 1
+    if args.request == "rpc":
+        fetch = ["prob"]
+        print("Start Time")
+        start = time.time()
+        itr = 1000
+        for ei in range(itr):
+            if args.batch_size == 1:
+                data = reader().next()
+                feed_dict = {}
+                feed_dict['dense_input'] = data[0][0]
+                for i in range(1, 27):
+                    feed_dict["embedding_{}.tmp_0".format(i - 1)] = data[0][i]
+                result = client.predict(feed=feed_dict, fetch=fetch)
+            else:
+                print("unsupported batch size {}".format(args.batch_size))
+
+    elif args.request == "http":
+        raise ValueError("http service is not supported")
+    end = time.time()
+    qps = itr / (end - start)
+    return [[end - start, qps]]
+
+
+if __name__ == '__main__':
+    multi_thread_runner = MultiThreadRunner()
+    endpoint_list = ["127.0.0.1:9292"]
+    #result = single_func(0, {"endpoint": endpoint_list})
+    result = multi_thread_runner.run(single_func, args.thread,
+                                     {"endpoint": endpoint_list})
+    avg_cost = 0
+    qps = 0
+    for i in range(args.thread):
+        avg_cost += result[0][i * 2 + 0]
+        qps += result[0][i * 2 + 1]
+    avg_cost = avg_cost / args.thread
+    print("average total cost {} s.".format(avg_cost))
+    print("qps {} ins/s".format(qps))
diff --git a/python/examples/criteo_ctr_with_cube/benchmark.sh b/python/examples/criteo_ctr_with_cube/benchmark.sh
new file mode 100755
index 0000000000000000000000000000000000000000..4bea258a5cfa4e12ed6848c61270fe44bbc7ba44
--- /dev/null
+++ b/python/examples/criteo_ctr_with_cube/benchmark.sh
@@ -0,0 +1,10 @@
+rm profile_log
+batch_size=1
+for thread_num in 1 2 4 8 16
+do
+    $PYTHONROOT/bin/python benchmark.py --thread $thread_num --model ctr_client_conf/serving_client_conf.prototxt --request rpc > profile 2>&1
+    echo "========================================"
+    echo "batch size : $batch_size" >> profile_log
+    $PYTHONROOT/bin/python ../util/show_profile.py profile $thread_num >> profile_log
+    tail -n 2 profile >> profile_log
+done
diff --git a/python/examples/criteo_ctr_with_cube/benchmark_batch.py b/python/examples/criteo_ctr_with_cube/benchmark_batch.py
new file mode 100755
index 0000000000000000000000000000000000000000..b4b15892375e830486afa320151fac619aab2ba7
--- /dev/null
+++ b/python/examples/criteo_ctr_with_cube/benchmark_batch.py
@@ -0,0 +1,85 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# pylint: disable=doc-string-missing
+
+from paddle_serving_client import Client
+import sys
+import os
+import criteo as criteo
+import time
+from paddle_serving_client.utils import MultiThreadRunner
+from paddle_serving_client.utils import benchmark_args
+from paddle_serving_client.metric import auc
+
+args = benchmark_args()
+
+
+def single_func(idx, resource):
+    client = Client()
+    print([resource["endpoint"][idx % len(resource["endpoint"])]])
+    client.load_client_config('ctr_client_conf/serving_client_conf.prototxt')
+    client.connect(['127.0.0.1:9292'])
+    batch = 1
+    buf_size = 100
+    dataset = criteo.CriteoDataset()
+    dataset.setup(1000001)
+    test_filelists = [
+        "./raw_data/part-%d" % x for x in range(len(os.listdir("./raw_data")))
+    ]
+    reader = dataset.infer_reader(test_filelists[len(test_filelists) - 40:],
+                                  batch, buf_size)
+    if args.request == "rpc":
+        fetch = ["prob"]
+        start = time.time()
+        itr = 1000
+        for ei in range(itr):
+            if args.batch_size > 1:
+                feed_batch = []
+                for bi in range(args.batch_size):
+                    data = reader().next()
+                    feed_dict = {}
+                    feed_dict['dense_input'] = data[0][0]
+                    for i in range(1, 27):
+                        feed_dict["embedding_{}.tmp_0".format(i - 1)] = data[0][
+                            i]
+                    feed_batch.append(feed_dict)
+                result = client.batch_predict(
+                    feed_batch=feed_batch, fetch=fetch)
+            else:
+                print("unsupported batch size {}".format(args.batch_size))
+
+    elif args.request == "http":
+        raise ValueError("http service is not supported")
+    end = time.time()
+    qps = itr * args.batch_size / (end - start)
+    return [[end - start, qps]]
+
+
+if __name__ == '__main__':
+    multi_thread_runner = MultiThreadRunner()
+    endpoint_list = ["127.0.0.1:9292"]
+    #result = single_func(0, {"endpoint": endpoint_list})
+    result = multi_thread_runner.run(single_func, args.thread,
+                                     {"endpoint": endpoint_list})
+    print(result)
+    avg_cost = 0
+    qps = 0
+    for i in range(args.thread):
+        avg_cost += result[0][i * 2 + 0]
+        qps += result[0][i * 2 + 1]
+    avg_cost = avg_cost / args.thread
+    print("average total cost {} s.".format(avg_cost))
+    print("qps {} ins/s".format(qps))
diff --git a/python/examples/criteo_ctr_with_cube/benchmark_batch.sh b/python/examples/criteo_ctr_with_cube/benchmark_batch.sh
new file mode 100755
index 0000000000000000000000000000000000000000..3a51c0de68bf47fb798c165d2fb34868056ddab6
--- /dev/null
+++ b/python/examples/criteo_ctr_with_cube/benchmark_batch.sh
@@ -0,0 +1,12 @@
+rm profile_log
+for thread_num in 1 2 4 8 16
+do
+for batch_size in 1 2 4 8 16 32 64 128 256 512
+do
+    $PYTHONROOT/bin/python benchmark_batch.py --thread $thread_num --batch_size $batch_size --model serving_client_conf/serving_client_conf.prototxt --request rpc > profile 2>&1
+    echo "========================================"
+    echo "batch size : $batch_size" >> profile_log
+    $PYTHONROOT/bin/python ../util/show_profile.py profile $thread_num >> profile_log
+    tail -n 2 profile >> profile_log
+done
+done
diff --git a/python/examples/criteo_ctr_with_cube/clean.sh b/python/examples/criteo_ctr_with_cube/clean.sh
new file mode 100755
index 0000000000000000000000000000000000000000..522a602a4e1ea8c9fb8902d8cd0d2d872cba6edd
--- /dev/null
+++ b/python/examples/criteo_ctr_with_cube/clean.sh
@@ -0,0 +1,5 @@
+ps -ef | grep cube | awk {'print $2'} | xargs kill -9
+ps -ef | grep SimpleHTTPServer | awk {'print $2'} | xargs kill -9
+rm -rf cube/cube_data cube/data cube/log* cube/nohup* cube/output/ cube/donefile cube/input cube/monitor cube/cube-builder.INFO
+ps -ef | grep test | awk {'print $2'} | xargs kill -9
+ps -ef | grep serving | awk {'print $2'} | xargs kill -9
diff --git a/python/examples/criteo_ctr_with_cube/criteo.py b/python/examples/criteo_ctr_with_cube/criteo.py
new file mode 100755
index 0000000000000000000000000000000000000000..f37eb1d2c1d8df6975ec0c28923c6e17c0272290
--- /dev/null
+++ b/python/examples/criteo_ctr_with_cube/criteo.py
@@ -0,0 +1,81 @@
+# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import sys
+
+
+class CriteoDataset(object):
+    def setup(self, sparse_feature_dim):
+        self.cont_min_ = [0, -3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
+        self.cont_max_ = [
+            20, 600, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50
+        ]
+        self.cont_diff_ = [
+            20, 603, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50
+        ]
+        self.hash_dim_ = sparse_feature_dim
+        # here, training data are lines with line_index < train_idx_
+        self.train_idx_ = 41256555
+        self.continuous_range_ = range(1, 14)
+        self.categorical_range_ = range(14, 40)
+
+    def _process_line(self, line):
+        features = line.rstrip('\n').split('\t')
+        dense_feature = []
+        sparse_feature = []
+        for idx in self.continuous_range_:
+            if features[idx] == '':
+                dense_feature.append(0.0)
+            else:
+                dense_feature.append((float(features[idx]) - self.cont_min_[idx - 1]) / \
+                    self.cont_diff_[idx - 1])
+        for idx in self.categorical_range_:
+            sparse_feature.append(
+                [hash(str(idx) + features[idx]) % self.hash_dim_])
+
+        return dense_feature, sparse_feature, [int(features[0])]
+
+    def infer_reader(self, filelist, batch, buf_size):
+        def local_iter():
+            for fname in filelist:
+                with open(fname.strip(), "r") as fin:
+                    for line in fin:
+                        dense_feature, sparse_feature, label = self._process_line(
+                            line)
+                        #yield dense_feature, sparse_feature, label
+                        yield [dense_feature] + sparse_feature + [label]
+
+        import paddle
+        batch_iter = paddle.batch(
+            paddle.reader.shuffle(
+                local_iter, buf_size=buf_size),
+            batch_size=batch)
+        return batch_iter
+
+    def generate_sample(self, line):
+        def data_iter():
+            dense_feature, sparse_feature, label = self._process_line(line)
+            feature_name = ["dense_input"]
+            for idx in self.categorical_range_:
+                feature_name.append("C" + str(idx - 13))
+            feature_name.append("label")
+            yield zip(feature_name, [dense_feature] + sparse_feature + [label])
+
+        return data_iter
diff --git a/python/examples/criteo_ctr_with_cube/criteo_reader.py b/python/examples/criteo_ctr_with_cube/criteo_reader.py
new file mode 100755
index 0000000000000000000000000000000000000000..2a80af78a9c2033bf246f703ca70a817ab786af3
--- /dev/null
+++ b/python/examples/criteo_ctr_with_cube/criteo_reader.py
@@ -0,0 +1,83 @@
+# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# pylint: disable=doc-string-missing
+
+import sys
+import paddle.fluid.incubate.data_generator as dg
+
+
+class CriteoDataset(dg.MultiSlotDataGenerator):
+    def setup(self, sparse_feature_dim):
+        self.cont_min_ = [0, -3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
+        self.cont_max_ = [
+            20, 600, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50
+        ]
+        self.cont_diff_ = [
+            20, 603, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50
+        ]
+        self.hash_dim_ = sparse_feature_dim
+        # here, training data are lines with line_index < train_idx_
+        self.train_idx_ = 41256555
+        self.continuous_range_ = range(1, 14)
+        self.categorical_range_ = range(14, 40)
+
+    def _process_line(self, line):
+        features = line.rstrip('\n').split('\t')
+        dense_feature = []
+        sparse_feature = []
+        for idx in self.continuous_range_:
+            if features[idx] == '':
+                dense_feature.append(0.0)
+            else:
+                dense_feature.append((float(features[idx]) - self.cont_min_[idx - 1]) / \
+                    self.cont_diff_[idx - 1])
+        for idx in self.categorical_range_:
+            sparse_feature.append(
+                [hash(str(idx) + features[idx]) % self.hash_dim_])
+
+        return dense_feature, sparse_feature, [int(features[0])]
+
+    def infer_reader(self, filelist, batch, buf_size):
+        def local_iter():
+            for fname in filelist:
+                with open(fname.strip(), "r") as fin:
+                    for line in fin:
+                        dense_feature, sparse_feature, label = self._process_line(
+                            line)
+                        #yield dense_feature, sparse_feature, label
+                        yield [dense_feature] + sparse_feature + [label]
+
+        import paddle
+        batch_iter = paddle.batch(
+            paddle.reader.shuffle(
+                local_iter, buf_size=buf_size),
+            batch_size=batch)
+        return batch_iter
+
+    def generate_sample(self, line):
+        def data_iter():
+            dense_feature, sparse_feature, label = self._process_line(line)
+            feature_name = ["dense_input"]
+            for idx in self.categorical_range_:
+                feature_name.append("C" + str(idx - 13))
+            feature_name.append("label")
+            yield zip(feature_name, [dense_feature] + sparse_feature + [label])
+
+        return data_iter
+
+
+if __name__ == "__main__":
+    criteo_dataset = CriteoDataset()
+    criteo_dataset.setup(int(sys.argv[1]))
+    criteo_dataset.run_from_stdin()
diff --git a/python/examples/criteo_ctr_with_cube/cube/conf/cube.conf b/python/examples/criteo_ctr_with_cube/cube/conf/cube.conf
new file mode 100755
index 0000000000000000000000000000000000000000..b70f6e34247e410f9b80054010338d3c8f452ec6
--- /dev/null
+++ b/python/examples/criteo_ctr_with_cube/cube/conf/cube.conf
@@ -0,0 +1,13 @@
+[{
+    "dict_name": "test_dict",
+    "shard": 1,
+    "dup": 1,
+    "timeout": 200,
+    "retry": 3,
+    "backup_request": 100,
+    "type": "ipport_list",
+    "load_balancer": "rr",
+    "nodes": [{
+        "ipport_list": "list://127.0.0.1:8027"
+    }]
+}]
diff --git a/python/examples/criteo_ctr_with_cube/cube/conf/gflags.conf b/python/examples/criteo_ctr_with_cube/cube/conf/gflags.conf
new file mode 100755
index 0000000000000000000000000000000000000000..21c7bddebd8f22b91d0ba26a6121007f96a4380b
--- /dev/null
+++ b/python/examples/criteo_ctr_with_cube/cube/conf/gflags.conf
@@ -0,0 +1,4 @@
+--port=8027
+--dict_split=1
+--in_mem=true
+--log_dir=./log/
diff --git a/python/examples/criteo_ctr_with_cube/cube/keys b/python/examples/criteo_ctr_with_cube/cube/keys
new file mode 100755
index 0000000000000000000000000000000000000000..f00c965d8307308469e537302baa73048488f162
--- /dev/null
+++ b/python/examples/criteo_ctr_with_cube/cube/keys
@@ -0,0 +1,10 @@
+1
+2
+3
+4
+5
+6
+7
+8
+9
+10
diff --git a/python/examples/criteo_ctr_with_cube/cube_prepare.sh b/python/examples/criteo_ctr_with_cube/cube_prepare.sh
new file mode 100755
index 0000000000000000000000000000000000000000..ceeda0a603e7474f0845333ef94e05d923bde4f4
--- /dev/null
+++ b/python/examples/criteo_ctr_with_cube/cube_prepare.sh
@@ -0,0 +1,7 @@
+
+mkdir -p cube_model
+mkdir -p cube/data
+./seq_generator ctr_serving_model/SparseFeatFactors ./cube_model/feature
+./cube/cube-builder -dict_name=test_dict -job_mode=base -last_version=0 -cur_version=0 -depend_version=0 -input_path=./cube_model -output_path=./cube/data -shard_num=1 -only_build=false
+mv ./cube/data/0_0/test_dict_part0/* ./cube/data/
+cd cube && ./cube
diff --git a/python/examples/criteo_ctr_with_cube/get_data.sh b/python/examples/criteo_ctr_with_cube/get_data.sh
new file mode 100755
index 0000000000000000000000000000000000000000..1f244b3a4aa81488bb493825576ba30c4b3bba22
--- /dev/null
+++ b/python/examples/criteo_ctr_with_cube/get_data.sh
@@ -0,0 +1,2 @@
+wget --no-check-certificate https://paddle-serving.bj.bcebos.com/data/ctr_prediction/ctr_data.tar.gz
+tar -zxvf ctr_data.tar.gz
diff --git a/python/examples/criteo_ctr_with_cube/local_train.py b/python/examples/criteo_ctr_with_cube/local_train.py
new file mode 100755
index 0000000000000000000000000000000000000000..d4a1bc930924e348048f7ac3e5c46381d9b6441b
--- /dev/null
+++ b/python/examples/criteo_ctr_with_cube/local_train.py
@@ -0,0 +1,100 @@
+# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
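+# Trains the CTR model for one pass over the sample data, then exports two
+# serving models: ctr_serving_model (embedding lookup inside the graph) and
+# ctr_serving_model_kv (embeddings fed externally, e.g. looked up from cube).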
+# pylint: disable=doc-string-missing
+
+from __future__ import print_function
+
+from args import parse_args
+import os
+import paddle.fluid as fluid
+import sys
+from network_conf import dnn_model
+
+dense_feature_dim = 13
+
+
+def train():
+    args = parse_args()
+    sparse_only = args.sparse_only
+    if not os.path.isdir(args.model_output_dir):
+        os.mkdir(args.model_output_dir)
+    dense_input = fluid.layers.data(
+        name="dense_input", shape=[dense_feature_dim], dtype='float32')
+    sparse_input_ids = [
+        fluid.layers.data(
+            name="C" + str(i), shape=[1], lod_level=1, dtype="int64")
+        for i in range(1, 27)
+    ]
+    label = fluid.layers.data(name='label', shape=[1], dtype='int64')
+
+    #nn_input = None if sparse_only else dense_input
+    nn_input = dense_input
+    predict_y, loss, auc_var, batch_auc_var, infer_vars = dnn_model(
+        nn_input, sparse_input_ids, label, args.embedding_size,
+        args.sparse_feature_dim)
+
+    optimizer = fluid.optimizer.SGD(learning_rate=1e-4)
+    optimizer.minimize(loss)
+
+    exe = fluid.Executor(fluid.CPUPlace())
+    exe.run(fluid.default_startup_program())
+    dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
+    dataset.set_use_var([dense_input] + sparse_input_ids + [label])
+
+    python_executable = "python"
+    pipe_command = "{} criteo_reader.py {}".format(python_executable,
+                                                   args.sparse_feature_dim)
+
+    dataset.set_pipe_command(pipe_command)
+    dataset.set_batch_size(128)
+    thread_num = 10
+    dataset.set_thread(thread_num)
+
+    whole_filelist = [
+        "raw_data/part-%d" % x for x in range(len(os.listdir("raw_data")))
+    ]
+
+    print(whole_filelist)
+    dataset.set_filelist(whole_filelist[:100])
+    dataset.load_into_memory()
+    fluid.layers.Print(auc_var)
+    epochs = 1
+    for i in range(epochs):
+        exe.train_from_dataset(
+            program=fluid.default_main_program(), dataset=dataset, debug=True)
+        print("epoch {} finished".format(i))
+
+    import paddle_serving_client.io as server_io
+    feed_var_dict = {}
+    feed_var_dict['dense_input'] = dense_input
+    for i, sparse in enumerate(sparse_input_ids):
+        feed_var_dict["embedding_{}.tmp_0".format(i)] = sparse
+    fetch_var_dict = {"prob": predict_y}
+
+    feed_kv_dict = {}
+    feed_kv_dict['dense_input'] = dense_input
+    for i, emb in enumerate(infer_vars):
+        feed_kv_dict["embedding_{}.tmp_0".format(i)] = emb
+    fetch_var_dict = {"prob": predict_y}
+
+    server_io.save_model("ctr_serving_model", "ctr_client_conf", feed_var_dict,
+                         fetch_var_dict, fluid.default_main_program())
+
+    server_io.save_model("ctr_serving_model_kv", "ctr_client_conf_kv",
+                         feed_kv_dict, fetch_var_dict,
+                         fluid.default_main_program())
+
+
+if __name__ == '__main__':
+    train()
diff --git a/python/examples/criteo_ctr_with_cube/network_conf.py b/python/examples/criteo_ctr_with_cube/network_conf.py
new file mode 100755
index 0000000000000000000000000000000000000000..2975533a72ad21d6dd5896446fd06c1f9bdfe8b4
--- /dev/null
+++ b/python/examples/criteo_ctr_with_cube/network_conf.py
@@ -0,0 +1,77 @@
+# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# pylint: disable=doc-string-missing
+
+import paddle.fluid as fluid
+import math
+
+
+def dnn_model(dense_input, sparse_inputs, label, embedding_size,
+              sparse_feature_dim):
+    def embedding_layer(input):
+        emb = fluid.layers.embedding(
+            input=input,
+            is_sparse=True,
+            is_distributed=False,
+            size=[sparse_feature_dim, embedding_size],
+            param_attr=fluid.ParamAttr(
+                name="SparseFeatFactors",
+                initializer=fluid.initializer.Uniform()))
+        x = fluid.layers.sequence_pool(input=emb, pool_type='sum')
+        return emb, x
+
+    def mlp_input_tensor(emb_sums, dense_tensor):
+        #if isinstance(dense_tensor, fluid.Variable):
+        #    return fluid.layers.concat(emb_sums, axis=1)
+        #else:
+        return fluid.layers.concat(emb_sums + [dense_tensor], axis=1)
+
+    def mlp(mlp_input):
+        fc1 = fluid.layers.fc(input=mlp_input,
+                              size=400,
+                              act='relu',
+                              param_attr=fluid.ParamAttr(
+                                  initializer=fluid.initializer.Normal(
+                                      scale=1 / math.sqrt(mlp_input.shape[1]))))
+        fc2 = fluid.layers.fc(input=fc1,
+                              size=400,
+                              act='relu',
+                              param_attr=fluid.ParamAttr(
+                                  initializer=fluid.initializer.Normal(
+                                      scale=1 / math.sqrt(fc1.shape[1]))))
+        fc3 = fluid.layers.fc(input=fc2,
+                              size=400,
+                              act='relu',
+                              param_attr=fluid.ParamAttr(
+                                  initializer=fluid.initializer.Normal(
+                                      scale=1 / math.sqrt(fc2.shape[1]))))
+        pre = fluid.layers.fc(input=fc3,
+                              size=2,
+                              act='softmax',
+                              param_attr=fluid.ParamAttr(
+                                  initializer=fluid.initializer.Normal(
+                                      scale=1 / math.sqrt(fc3.shape[1]))))
+        return pre
+
+    emb_pair_sums = list(map(embedding_layer, sparse_inputs))
+    emb_sums = [x[1] for x in emb_pair_sums]
+    infer_vars = [x[0] for x in emb_pair_sums]
+    mlp_in = mlp_input_tensor(emb_sums, dense_input)
+    predict = mlp(mlp_in)
+    cost = fluid.layers.cross_entropy(input=predict, label=label)
+    avg_cost = fluid.layers.reduce_sum(cost)
+    accuracy = fluid.layers.accuracy(input=predict, label=label)
+    auc_var, batch_auc_var, auc_states = \
+        fluid.layers.auc(input=predict, label=label, num_thresholds=2 ** 12, slide_steps=20)
+    return predict, avg_cost, auc_var, batch_auc_var, infer_vars
diff --git a/python/examples/criteo_ctr_with_cube/test_client.py b/python/examples/criteo_ctr_with_cube/test_client.py
new file mode 100755
index 0000000000000000000000000000000000000000..de205ebc68af02e8dd978da51a4c43bef0cec0d4
--- /dev/null
+++ b/python/examples/criteo_ctr_with_cube/test_client.py
@@ -0,0 +1,48 @@
+# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
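+# Usage: python test_client.py ctr_client_conf/serving_client_conf.prototxt \
+#            ./raw_data
+# Replays 10000 samples against the RPC service and prints the AUC.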
+# pylint: disable=doc-string-missing
+
+from paddle_serving_client import Client
+import sys
+import criteo as criteo
+import time
+from paddle_serving_client.metric import auc
+
+client = Client()
+client.load_client_config(sys.argv[1])
+client.connect(["127.0.0.1:9292"])
+
+batch = 1
+buf_size = 100
+dataset = criteo.CriteoDataset()
+dataset.setup(1000001)
+test_filelists = ["{}/part-0".format(sys.argv[2])]
+reader = dataset.infer_reader(test_filelists, batch, buf_size)
+label_list = []
+prob_list = []
+start = time.time()
+data_iter = reader()  # instantiate the generator once, not anew per request
+for ei in range(10000):
+    data = next(data_iter)
+    feed_dict = {}
+    feed_dict['dense_input'] = data[0][0]
+    for i in range(1, 27):
+        feed_dict["embedding_{}.tmp_0".format(i - 1)] = data[0][i]
+    fetch_map = client.predict(feed=feed_dict, fetch=["prob"])
+    prob_list.append(fetch_map['prob'][1])
+    label_list.append(data[0][-1][0])
+
+end = time.time()
+print(auc(label_list, prob_list))
+print(end - start)
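
The client accumulates one binary label and one positive-class probability per request, then hands the parallel lists to paddle_serving_client.metric.auc. A toy illustration of the expected inputs, with invented values:

    from paddle_serving_client.metric import auc

    labels = [0, 1, 1, 0, 1]           # ground-truth clicks
    probs = [0.2, 0.8, 0.6, 0.4, 0.9]  # scores taken from fetch_map['prob'][1]
    print(auc(labels, probs))          # same kind of value test_client.py writes to score

diff --git a/python/examples/criteo_ctr_with_cube/test_server.py b/python/examples/criteo_ctr_with_cube/test_server.py
new file mode 100755
index 0000000000000000000000000000000000000000..5399ace839a00071c0ed9ce384e5523b68db27fc
--- /dev/null
+++ b/python/examples/criteo_ctr_with_cube/test_server.py
@@ -0,0 +1,37 @@
+# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.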
+# pylint: disable=doc-string-missing + +import os +import sys +from paddle_serving_server import OpMaker +from paddle_serving_server import OpSeqMaker +from paddle_serving_server import Server + +op_maker = OpMaker() +read_op = op_maker.create('general_reader') +general_dist_kv_infer_op = op_maker.create('general_dist_kv_infer') +response_op = op_maker.create('general_response') + +op_seq_maker = OpSeqMaker() +op_seq_maker.add_op(read_op) +op_seq_maker.add_op(general_dist_kv_infer_op) +op_seq_maker.add_op(response_op) + +server = Server() +server.set_op_sequence(op_seq_maker.get_op_sequence()) +server.set_num_threads(4) +server.load_model_config(sys.argv[1]) +server.prepare_server(workdir="work_dir1", port=9292, device="cpu") +server.run_server() diff --git a/python/paddle_serving_server/__init__.py b/python/paddle_serving_server/__init__.py index 87c6b4960e439dd529cc0354c667c244369c133a..55c5f8b4ae2f4728be87a9f5ed15e28e584b66c6 100644 --- a/python/paddle_serving_server/__init__.py +++ b/python/paddle_serving_server/__init__.py @@ -32,6 +32,7 @@ class OpMaker(object): "general_text_reader": "GeneralTextReaderOp", "general_text_response": "GeneralTextResponseOp", "general_single_kv": "GeneralSingleKVOp", + "general_dist_kv_infer": "GeneralDistKVInferOp", "general_dist_kv": "GeneralDistKVOp", "general_copy": "GeneralCopyOp" } @@ -82,6 +83,7 @@ class Server(object): self.infer_service_fn = "infer_service.prototxt" self.model_toolkit_fn = "model_toolkit.prototxt" self.general_model_config_fn = "general_model.prototxt" + self.cube_config_fn = "cube.conf" self.workdir = "" self.max_concurrency = 0 self.num_threads = 4 @@ -157,6 +159,11 @@ class Server(object): "w") as fout: fout.write(str(self.model_conf)) self.resource_conf = server_sdk.ResourceConf() + for workflow in self.workflow_conf.workflows: + for node in workflow.nodes: + if "dist_kv" in node.name: + self.resource_conf.cube_config_path = workdir + self.resource_conf.cube_config_file = self.cube_config_fn self.resource_conf.model_toolkit_path = workdir self.resource_conf.model_toolkit_file = self.model_toolkit_fn self.resource_conf.general_model_path = workdir @@ -295,6 +302,6 @@ class Server(object): self.workdir, self.workflow_fn, self.num_threads) - print("Going to Run Comand") + print("Going to Run Command") print(command) os.system(command) diff --git a/tools/serving_build.sh b/tools/serving_build.sh index b810e3139803bd363c771c6f655cef6595177dc8..dd6a3f6da8b3e40f2e379cb9457c4e5f00bc900c 100644 --- a/tools/serving_build.sh +++ b/tools/serving_build.sh @@ -49,7 +49,7 @@ function build_server() { -DPYTHON_LIBRARIES=$PYTHONROOT/lib64/libpython2.7.so \ -DPYTHON_EXECUTABLE=$PYTHONROOT/bin/python \ -DCLIENT_ONLY=OFF .. - check_cmd "make -j2 >/dev/null" + check_cmd "make -j2 >/dev/null && make install -j2 >/dev/null" pip install python/dist/paddle_serving_server* >/dev/null ;; GPU) @@ -58,7 +58,7 @@ function build_server() { -DPYTHON_EXECUTABLE=$PYTHONROOT/bin/python \ -DCLIENT_ONLY=OFF \ -DWITH_GPU=ON .. - check_cmd "make -j2 >/dev/null" + check_cmd "make -j2 >/dev/null && make install -j2 >/dev/null" pip install python/dist/paddle_serving_server* >/dev/null ;; *) @@ -68,13 +68,13 @@ function build_server() { esac echo "build server $TYPE part finished as expected." cd .. - rm -rf $DIRNAME } function python_test_fit_a_line() { cd fit_a_line sh get_data.sh local TYPE=$1 + echo $TYPE case $TYPE in CPU) # test rpc @@ -102,12 +102,43 @@ function python_test_fit_a_line() { cd .. 
 }

+function python_run_criteo_ctr_with_cube() {
+    local TYPE=$1
+    yum install -y bc >/dev/null
+    cd criteo_ctr_with_cube
+    check_cmd "wget https://paddle-serving.bj.bcebos.com/unittest/ctr_cube_unittest.tar.gz"
+    check_cmd "tar xf ctr_cube_unittest.tar.gz"
+    check_cmd "mv models/ctr_client_conf ./"
+    check_cmd "mv models/ctr_serving_model_kv ./"
+    check_cmd "mv models/data ./cube/"
+    check_cmd "mv models/ut_data ./"
+    cp ../../../build-server-$TYPE/output/bin/cube* ./cube/
+    mkdir -p $PYTHONROOT/lib/python2.7/site-packages/paddle_serving_server/serving-cpu-avx-openblas-0.1.3/
+    yes | cp ../../../build-server-$TYPE/output/demo/serving/bin/serving $PYTHONROOT/lib/python2.7/site-packages/paddle_serving_server/serving-cpu-avx-openblas-0.1.3/
+
+    sh cube_prepare.sh &
+    check_cmd "mkdir work_dir1 && cp cube/conf/cube.conf ./work_dir1/"
+    python test_server.py ctr_serving_model_kv &
+    sleep 5  # give the serving process time to load the model before the client runs
+    check_cmd "python test_client.py ctr_client_conf/serving_client_conf.prototxt ./ut_data >score"
+    # the AUC is the second-to-last line of the client's output
+    AUC=$(tail -n 2 score | awk 'NR==1')
+    VAR2="0.70"
+    RES=$( echo "$AUC>$VAR2" | bc )
+    if [[ $RES -eq 0 ]]; then
+        echo "error with criteo_ctr_with_cube inference auc test, AUC should be > 0.70"
+        exit 1
+    fi
+    echo "criteo_ctr_with_cube inference auc test success"
+    ps -ef | grep "paddle_serving_server" | grep -v grep | awk '{print $2}' | xargs kill
+    ps -ef | grep "cube" | grep -v grep | awk '{print $2}' | xargs kill
+    cd ..  # restore cwd for the caller, matching python_test_fit_a_line
+}
+
 function python_run_test() {
     cd python/examples
     local TYPE=$1
     # First time run, downloading PaddleServing components ...
     python -c "from paddle_serving_server import Server; server = Server(); server.download_bin()"
     python_test_fit_a_line $TYPE
+    python_run_criteo_ctr_with_cube $TYPE
     echo "test python $TYPE part finished as expected."
     cd ../..
 }
@@ -117,6 +148,7 @@ function main() {
     init
     build_client $TYPE
     build_server $TYPE
+    cd Serving/
     python_run_test $TYPE
     echo "serving $TYPE part finished as expected."
 }
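
For local debugging outside serving_build.sh, the same round trip can be exercised from one script. This is a sketch only, assuming the model, client config, and cube service prepared above are already in place, and that a one-element id list is an acceptable dummy feed for each sparse slot:

    import subprocess
    import time

    from paddle_serving_client import Client

    # launch the dist-kv server exactly as the CI script does
    server = subprocess.Popen(["python", "test_server.py", "ctr_serving_model_kv"])
    time.sleep(5)  # give it a moment to load the model

    client = Client()
    client.load_client_config("ctr_client_conf/serving_client_conf.prototxt")
    client.connect(["127.0.0.1:9292"])

    feed = {"dense_input": [0.0] * 13}              # 13 dense features
    for i in range(26):                             # 26 sparse slots
        feed["embedding_{}.tmp_0".format(i)] = [0]  # dummy id per slot
    print(client.predict(feed=feed, fetch=["prob"]))

    server.terminate()  # the CI script kills by process name instead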