提交 1bb982eb 编写于 作者: W wangguibao

Codestyle check

Change-Id: Iff5fdd63bb96832212d31bfa114f663f1190c678
上级 3f2c56e7
...@@ -70,7 +70,8 @@ include(generic) ...@@ -70,7 +70,8 @@ include(generic)
include(paddlepaddle) include(paddlepaddle)
include(external/opencv) include(external/opencv)
include_directories("${PADDLE_SERVING_SOURCE_DIR}") include_directories(${PADDLE_SERVING_SOURCE_DIR})
include_directories(${PADDLE_SERVING_BINARY_DIR})
set(EXTERNAL_LIBS set(EXTERNAL_LIBS
gflags gflags
...@@ -94,6 +95,3 @@ add_subdirectory(predictor) ...@@ -94,6 +95,3 @@ add_subdirectory(predictor)
add_subdirectory(inferencer-fluid-cpu) add_subdirectory(inferencer-fluid-cpu)
add_subdirectory(serving) add_subdirectory(serving)
add_subdirectory(sdk-cpp) add_subdirectory(sdk-cpp)
...@@ -336,4 +336,3 @@ Paddle serving框架为策略工程师提供以下三层面的功能性扩展: ...@@ -336,4 +336,3 @@ Paddle serving框架为策略工程师提供以下三层面的功能性扩展:
{:name => 'main', :conf => 'predictor_valid.conf', :target => 'port'}, // valid工具向这个端口发送测试请求,确保服务已正常启动 {:name => 'main', :conf => 'predictor_valid.conf', :target => 'port'}, // valid工具向这个端口发送测试请求,确保服务已正常启动
] ]
``` ```
...@@ -11,6 +11,9 @@ list(APPEND configure_srcs ${CMAKE_CURRENT_LIST_DIR}/src/configure_parser.cpp) ...@@ -11,6 +11,9 @@ list(APPEND configure_srcs ${CMAKE_CURRENT_LIST_DIR}/src/configure_parser.cpp)
add_library(configure ${configure_srcs}) add_library(configure ${configure_srcs})
add_dependencies(configure brpc) add_dependencies(configure brpc)
target_include_directories(configure PUBLIC
${CMAKE_CURRENT_LIST_DIR}/
)
add_executable(test_configure add_executable(test_configure
${CMAKE_CURRENT_LIST_DIR}/tests/test_configure.cpp) ${CMAKE_CURRENT_LIST_DIR}/tests/test_configure.cpp)
...@@ -23,4 +26,3 @@ target_link_libraries(test_configure configure protobuf) ...@@ -23,4 +26,3 @@ target_link_libraries(test_configure configure protobuf)
install(TARGETS configure install(TARGETS configure
ARCHIVE DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/lib ARCHIVE DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/lib
) )
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once #pragma once
#include <google/protobuf/message.h> #include <google/protobuf/message.h>
#include <string>
namespace baidu { namespace baidu {
namespace paddle_serving { namespace paddle_serving {
namespace configure { namespace configure {
int read_proto_conf(const std::string &conf_path, int read_proto_conf(const std::string &conf_path,
const std::string &conf_file, const std::string &conf_file,
google::protobuf::Message *conf); google::protobuf::Message *conf);
int write_proto_conf(google::protobuf::Message *message, int write_proto_conf(google::protobuf::Message *message,
const std::string &output_path, const std::string &output_path,
const std::string &output_file); const std::string &output_file);
} // configure } // namespace configure
} // paddle_serving } // namespace paddle_serving
} // baidu } // namespace baidu
syntax="proto2"; // Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto2";
package baidu.paddle_serving.configure; package baidu.paddle_serving.configure;
message SigmoidConf { message SigmoidConf {
required string dnn_model_path = 1; required string dnn_model_path = 1;
required string sigmoid_w_file = 2; required string sigmoid_w_file = 2;
required string sigmoid_b_file = 3; required string sigmoid_b_file = 3;
required float exp_max_input = 4; required float exp_max_input = 4;
required float exp_min_input = 5; required float exp_min_input = 5;
}; };
syntax="proto2"; // Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto2";
package baidu.paddle_serving.configure; package baidu.paddle_serving.configure;
message ConnectionConf { message ConnectionConf {
required uint32 connect_timeout_ms = 1; required uint32 connect_timeout_ms = 1;
required uint32 rpc_timeout_ms = 2; required uint32 rpc_timeout_ms = 2;
required uint32 connect_retry_count = 3; required uint32 connect_retry_count = 3;
required uint32 max_connection_per_host = 4; required uint32 max_connection_per_host = 4;
required uint32 hedge_request_timeout_ms = 5; required uint32 hedge_request_timeout_ms = 5;
required uint32 hedge_fetch_retry_count = 6; required uint32 hedge_fetch_retry_count = 6;
required string connection_type = 7; required string connection_type = 7;
}; };
message NamingConf { message NamingConf {
optional string cluster_filter_strategy = 1; optional string cluster_filter_strategy = 1;
optional string load_balance_strategy = 2; optional string load_balance_strategy = 2;
optional string cluster = 3; optional string cluster = 3;
}; };
message RpcParameter { message RpcParameter {
// 0-NONE, 1-SNAPPY, 2-GZIP, 3-ZLIB, 4-LZ4 // 0-NONE, 1-SNAPPY, 2-GZIP, 3-ZLIB, 4-LZ4
required uint32 compress_type = 1; required uint32 compress_type = 1;
required uint32 package_size = 2; required uint32 package_size = 2;
required string protocol = 3; required string protocol = 3;
required uint32 max_channel_per_request = 4; required uint32 max_channel_per_request = 4;
}; };
message SplitConf{ message SplitConf {
optional string split_tag_name = 1; optional string split_tag_name = 1;
optional string tag_candidates = 2; optional string tag_candidates = 2;
}; };
message VariantConf { message VariantConf {
required string tag = 1; required string tag = 1;
optional ConnectionConf connection_conf = 2; optional ConnectionConf connection_conf = 2;
optional NamingConf naming_conf = 3; optional NamingConf naming_conf = 3;
optional RpcParameter rpc_parameter = 4; optional RpcParameter rpc_parameter = 4;
optional SplitConf split_conf = 5; optional SplitConf split_conf = 5;
optional string variant_router = 6; optional string variant_router = 6;
}; };
message WeightedRandomRenderConf { message WeightedRandomRenderConf { required string variant_weight_list = 1; };
required string variant_weight_list = 1;
};
message Predictor { message Predictor {
required string name = 1; required string name = 1;
required string service_name = 2; required string service_name = 2;
required string endpoint_router = 3; required string endpoint_router = 3;
required WeightedRandomRenderConf weighted_random_render_conf = 4; required WeightedRandomRenderConf weighted_random_render_conf = 4;
repeated VariantConf variants = 5; repeated VariantConf variants = 5;
}; };
// SDK conf // SDK conf
message SDKConf { message SDKConf {
required VariantConf default_variant_conf = 1; required VariantConf default_variant_conf = 1;
repeated Predictor predictors = 2; repeated Predictor predictors = 2;
}; };
syntax="proto2"; // Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto2";
package baidu.paddle_serving.configure; package baidu.paddle_serving.configure;
message EngineDesc { message EngineDesc {
required string name = 1; required string name = 1;
required string type = 2; required string type = 2;
required string reloadable_meta = 3; required string reloadable_meta = 3;
required string reloadable_type = 4; required string reloadable_type = 4;
required string model_data_path = 5; required string model_data_path = 5;
required uint32 runtime_thread_num = 6; required uint32 runtime_thread_num = 6;
required uint32 batch_infer_size = 7; required uint32 batch_infer_size = 7;
required uint32 enable_batch_align = 8; required uint32 enable_batch_align = 8;
optional string version_file = 9; optional string version_file = 9;
optional string version_type = 10; optional string version_type = 10;
}; };
// model_toolkit conf // model_toolkit conf
message ModelToolkitConf { message ModelToolkitConf { repeated EngineDesc engines = 1; };
repeated EngineDesc engines = 1;
};
// reource conf // reource conf
message ResourceConf { message ResourceConf {
required string model_toolkit_path = 1; required string model_toolkit_path = 1;
required string model_toolkit_file = 2; required string model_toolkit_file = 2;
}; };
// DAG node depency info // DAG node depency info
message DAGNodeDependency { message DAGNodeDependency {
required string name = 1; required string name = 1;
required string mode = 2; required string mode = 2;
}; };
// DAG Node // DAG Node
message DAGNode { message DAGNode {
required string name = 1; required string name = 1;
required string type = 2; required string type = 2;
repeated DAGNodeDependency dependencies = 3; repeated DAGNodeDependency dependencies = 3;
}; };
// workflow entry // workflow entry
message Workflow { message Workflow {
required string name = 1; required string name = 1;
required string workflow_type = 2; required string workflow_type = 2;
repeated DAGNode nodes = 3; repeated DAGNode nodes = 3;
}; };
// Workflow conf // Workflow conf
message WorkflowConf { message WorkflowConf { repeated Workflow workflows = 1; }
repeated Workflow workflows = 1;
}
// request_field_key: specifies use which request field as mapping key (see // request_field_key: specifies use which request field as mapping key (see
// request_field_key in InferService below) // request_field_key in InferService below)
...@@ -58,32 +67,32 @@ message WorkflowConf { ...@@ -58,32 +67,32 @@ message WorkflowConf {
// matches the value of `request_field_value` in one of the // matches the value of `request_field_value` in one of the
// ValueMappedWorkflows, the request will be directed to the workflow specified // ValueMappedWorkflows, the request will be directed to the workflow specified
// in the `workflow` field of that ValueMappedWorkflow // in the `workflow` field of that ValueMappedWorkflow
// //
message ValueMappedWorkflow { message ValueMappedWorkflow {
required string request_field_value = 1; required string request_field_value = 1;
required string workflow = 2; required string workflow = 2;
}; };
message InferService { message InferService {
required string name = 1; required string name = 1;
optional string merger = 2; optional string merger = 2;
optional bool enable_map_request_to_workflow = 3 [default = false]; optional bool enable_map_request_to_workflow = 3 [ default = false ];
// If enable_map_request_to_workfow = true // If enable_map_request_to_workfow = true
// //
// Each request will be mapped to a workflow according to the value in // Each request will be mapped to a workflow according to the value in
// in user request field specified by `request_field_key` (see the // in user request field specified by `request_field_key` (see the
// comments for ValueMappedWorkflow above) // comments for ValueMappedWorkflow above)
optional string request_field_key = 4; optional string request_field_key = 4;
repeated ValueMappedWorkflow value_mapped_workflows = 5; repeated ValueMappedWorkflow value_mapped_workflows = 5;
// If enable_map_request_to_workflow = false // If enable_map_request_to_workflow = false
repeated string workflows = 6; repeated string workflows = 6;
}; };
// InferService conf // InferService conf
message InferServiceConf { message InferServiceConf {
optional uint32 port = 1; optional uint32 port = 1;
repeated InferService services = 2; repeated InferService services = 2;
}; };
#include <sys/types.h> // Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#include <sys/stat.h> //
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "include/configure_parser.h"
#include <fcntl.h> #include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <fstream> #include <fstream>
#include "butil/logging.h" #include "butil/logging.h"
#include <google/protobuf/text_format.h> #include "google/protobuf/io/zero_copy_stream_impl.h"
#include <google/protobuf/io/zero_copy_stream_impl.h> #include "google/protobuf/text_format.h"
namespace baidu { namespace baidu {
namespace paddle_serving { namespace paddle_serving {
...@@ -13,46 +28,44 @@ namespace configure { ...@@ -13,46 +28,44 @@ namespace configure {
int read_proto_conf(const std::string &conf_path, int read_proto_conf(const std::string &conf_path,
const std::string &conf_file, const std::string &conf_file,
google::protobuf::Message *conf) google::protobuf::Message *conf) {
{ std::string file_str = conf_path + "/" + conf_file;
std::string file_str = conf_path + "/" + conf_file; int fd = open(file_str.c_str(), O_RDONLY);
int fd = open(file_str.c_str(), O_RDONLY); if (fd == -1) {
if (fd == -1) { LOG(WARNING) << "File not found: " << file_str.c_str();
LOG(WARNING) << "File not found: " << file_str.c_str(); return -1;
return -1; }
}
google::protobuf::io::FileInputStream input(fd);
google::protobuf::io::FileInputStream input(fd); bool success = google::protobuf::TextFormat::Parse(&input, conf);
bool success = google::protobuf::TextFormat::Parse(&input, conf); close(fd);
close(fd); if (!success) {
if (!success) { return -1;
return -1; }
}
return 0;
return 0;
} }
int write_proto_conf(google::protobuf::Message *message, int write_proto_conf(google::protobuf::Message *message,
const std::string &output_path, const std::string &output_path,
const std::string &output_file) const std::string &output_file) {
{ std::string binary_str;
std::string binary_str; google::protobuf::TextFormat::PrintToString(*message, &binary_str);
google::protobuf::TextFormat::PrintToString(*message, &binary_str);
std::string file_str = output_path + "/" + output_file;
std::string file_str = output_path + "/" + output_file; std::ofstream fout_bin((file_str.c_str()));
std::ofstream fout_bin((file_str.c_str())); if (!fout_bin) {
if (!fout_bin) { LOG(WARNING) << "Open file error: " << file_str.c_str();
LOG(WARNING) << "Open file error: " << file_str.c_str(); return -1;
return -1; }
}
fout_bin.write(binary_str.c_str(), binary_str.size());
fout_bin.write((char *)binary_str.c_str(), binary_str.size()); fout_bin.close();
fout_bin.close();
return 0;
return 0;
} }
} // configure } // namespace configure
} // paddle_serving } // namespace paddle_serving
} // baidu } // namespace baidu
/* vim: set expandtab ts=4 sw=4 sts=4 tw=100: */ /* vim: set expandtab ts=2 sw=2 sts=2 tw=100: */
#include <sys/types.h> // Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <sys/stat.h> #include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h> #include <unistd.h>
#include <iostream> #include <iostream>
#include "server_configure.pb.h" #include "configure/inferencer_configure.pb.h"
#include "sdk_configure.pb.h" #include "configure/sdk_configure.pb.h"
#include "inferencer_configure.pb.h" #include "configure/server_configure.pb.h"
#include "configure_parser.h" #include "include/configure_parser.h"
using baidu::paddle_serving::configure::EngineDesc; using baidu::paddle_serving::configure::EngineDesc;
using baidu::paddle_serving::configure::ModelToolkitConf; using baidu::paddle_serving::configure::ModelToolkitConf;
...@@ -30,234 +44,247 @@ using baidu::paddle_serving::configure::SDKConf; ...@@ -30,234 +44,247 @@ using baidu::paddle_serving::configure::SDKConf;
using baidu::paddle_serving::configure::SigmoidConf; using baidu::paddle_serving::configure::SigmoidConf;
const std::string output_dir = "./conf/"; const char *output_dir = "./conf/";
const std::string model_toolkit_conf_file = "model_toolkit.prototxt"; const char *model_toolkit_conf_file = "model_toolkit.prototxt";
const std::string resource_conf_file = "resource.prototxt"; const char *resource_conf_file = "resource.prototxt";
const std::string workflow_conf_file = "workflow.prototxt"; const char *workflow_conf_file = "workflow.prototxt";
const std::string service_conf_file = "service.prototxt"; const char *service_conf_file = "service.prototxt";
const std::string sdk_conf_file = "predictors.prototxt"; const char *sdk_conf_file = "predictors.prototxt";
const std::string sigmoid_conf_file = "inferencer.prototxt"; const char *sigmoid_conf_file = "inferencer.prototxt";
int test_write_conf() int test_write_conf() {
{ // model_toolkit conf
// model_toolkit conf ModelToolkitConf model_toolkit_conf;
ModelToolkitConf model_toolkit_conf;
// This engine has a default version
// This engine has a default version EngineDesc *engine = model_toolkit_conf.add_engines();
EngineDesc *engine = model_toolkit_conf.add_engines(); engine->set_name("image_classification_resnet");
engine->set_name("image_classification_resnet"); engine->set_type("FLUID_CPU_NATIVE_DIR");
engine->set_type("FLUID_CPU_NATIVE_DIR"); engine->set_reloadable_meta("./data/model/paddle/fluid_time_file");
engine->set_reloadable_meta("./data/model/paddle/fluid_time_file"); engine->set_reloadable_type("timestamp_ne");
engine->set_reloadable_type("timestamp_ne"); engine->set_model_data_path("./data/model/paddle/fluid/SE_ResNeXt50_32x4d");
engine->set_model_data_path("./data/model/paddle/fluid/SE_ResNeXt50_32x4d"); engine->set_runtime_thread_num(0);
engine->set_runtime_thread_num(0); engine->set_batch_infer_size(0);
engine->set_batch_infer_size(0); engine->set_enable_batch_align(0);
engine->set_enable_batch_align(0);
int ret = baidu::paddle_serving::configure::write_proto_conf(
int ret = baidu::paddle_serving::configure::write_proto_conf(&model_toolkit_conf, output_dir, model_toolkit_conf_file); &model_toolkit_conf, output_dir, model_toolkit_conf_file);
if (ret != 0) { if (ret != 0) {
return ret; return ret;
} }
// resource conf // resource conf
ResourceConf resource_conf; ResourceConf resource_conf;
resource_conf.set_model_toolkit_path(output_dir); resource_conf.set_model_toolkit_path(output_dir);
resource_conf.set_model_toolkit_file("model_toolkit.prototxt"); resource_conf.set_model_toolkit_file("model_toolkit.prototxt");
ret = baidu::paddle_serving::configure::write_proto_conf(&resource_conf, output_dir, resource_conf_file); ret = baidu::paddle_serving::configure::write_proto_conf(
if (ret != 0) { &resource_conf, output_dir, resource_conf_file);
return ret; if (ret != 0) {
} return ret;
}
// workflow entries conf
WorkflowConf workflow_conf; // workflow entries conf
Workflow *workflow = workflow_conf.add_workflows(); WorkflowConf workflow_conf;
workflow->set_name("workflow1"); Workflow *workflow = workflow_conf.add_workflows();
workflow->set_workflow_type("Sequence"); workflow->set_name("workflow1");
workflow->set_workflow_type("Sequence");
DAGNode *dag_node = workflow->add_nodes();
dag_node->set_name("image_reader_op"); DAGNode *dag_node = workflow->add_nodes();
dag_node->set_type("ReaderOp"); dag_node->set_name("image_reader_op");
dag_node->set_type("ReaderOp");
dag_node = workflow->add_nodes();
dag_node->set_name("imag_classify_op"); dag_node = workflow->add_nodes();
dag_node->set_type("ClassifyOp"); dag_node->set_name("imag_classify_op");
DAGNodeDependency *node_dependency = dag_node->add_dependencies(); dag_node->set_type("ClassifyOp");
node_dependency->set_name("image_reader_op"); DAGNodeDependency *node_dependency = dag_node->add_dependencies();
node_dependency->set_mode("RO"); node_dependency->set_name("image_reader_op");
node_dependency->set_mode("RO");
dag_node = workflow->add_nodes();
dag_node->set_name("write_json_op"); dag_node = workflow->add_nodes();
dag_node->set_type("WriteOp"); dag_node->set_name("write_json_op");
node_dependency = dag_node->add_dependencies(); dag_node->set_type("WriteOp");
node_dependency->set_name("image_classify_op"); node_dependency = dag_node->add_dependencies();
node_dependency->set_mode("RO"); node_dependency->set_name("image_classify_op");
node_dependency->set_mode("RO");
workflow = workflow_conf.add_workflows();
workflow->set_name("workflow2"); workflow = workflow_conf.add_workflows();
workflow->set_workflow_type("Sequence"); workflow->set_name("workflow2");
workflow->set_workflow_type("Sequence");
dag_node = workflow->add_nodes();
dag_node->set_name("dense_op"); dag_node = workflow->add_nodes();
dag_node->set_type("DenseOp"); dag_node->set_name("dense_op");
dag_node->set_type("DenseOp");
ret = baidu::paddle_serving::configure::write_proto_conf(&workflow_conf, output_dir, workflow_conf_file);
if (ret != 0) { ret = baidu::paddle_serving::configure::write_proto_conf(
return ret; &workflow_conf, output_dir, workflow_conf_file);
} if (ret != 0) {
return ret;
InferServiceConf infer_service_conf; }
infer_service_conf.set_port(0);
InferService *infer_service = infer_service_conf.add_services(); InferServiceConf infer_service_conf;
infer_service->set_name("ImageClassifyService"); infer_service_conf.set_port(0);
infer_service->add_workflows("workflow1"); InferService *infer_service = infer_service_conf.add_services();
infer_service->add_workflows("workflow2"); infer_service->set_name("ImageClassifyService");
infer_service->add_workflows("workflow1");
infer_service = infer_service_conf.add_services(); infer_service->add_workflows("workflow2");
infer_service->set_name("BuiltinDenseFormatService");
infer_service->add_workflows("workflow2"); infer_service = infer_service_conf.add_services();
infer_service->set_name("BuiltinDenseFormatService");
ret = baidu::paddle_serving::configure::write_proto_conf(&infer_service_conf, output_dir, service_conf_file); infer_service->add_workflows("workflow2");
if (ret != 0) {
return ret; ret = baidu::paddle_serving::configure::write_proto_conf(
} &infer_service_conf, output_dir, service_conf_file);
if (ret != 0) {
SDKConf sdk_conf; return ret;
VariantConf *default_variant_conf = sdk_conf.mutable_default_variant_conf(); }
default_variant_conf->set_tag("default");
SDKConf sdk_conf;
ConnectionConf *connection_conf = default_variant_conf->mutable_connection_conf(); VariantConf *default_variant_conf = sdk_conf.mutable_default_variant_conf();
connection_conf->set_connect_timeout_ms(2000); default_variant_conf->set_tag("default");
connection_conf->set_rpc_timeout_ms(20000);
connection_conf->set_connect_retry_count(2); ConnectionConf *connection_conf =
connection_conf->set_max_connection_per_host(100); default_variant_conf->mutable_connection_conf();
connection_conf->set_hedge_request_timeout_ms(-1); connection_conf->set_connect_timeout_ms(2000);
connection_conf->set_hedge_fetch_retry_count(2); connection_conf->set_rpc_timeout_ms(20000);
connection_conf->set_connection_type("pooled"); connection_conf->set_connect_retry_count(2);
connection_conf->set_max_connection_per_host(100);
NamingConf *naming_conf = default_variant_conf->mutable_naming_conf(); connection_conf->set_hedge_request_timeout_ms(-1);
naming_conf->set_cluster_filter_strategy("Default"); connection_conf->set_hedge_fetch_retry_count(2);
naming_conf->set_load_balance_strategy("la"); connection_conf->set_connection_type("pooled");
RpcParameter *rpc_parameter = default_variant_conf->mutable_rpc_parameter(); NamingConf *naming_conf = default_variant_conf->mutable_naming_conf();
rpc_parameter->set_compress_type(0); naming_conf->set_cluster_filter_strategy("Default");
rpc_parameter->set_package_size(20); naming_conf->set_load_balance_strategy("la");
rpc_parameter->set_protocol("baidu_std");
rpc_parameter->set_max_channel_per_request(3); RpcParameter *rpc_parameter = default_variant_conf->mutable_rpc_parameter();
rpc_parameter->set_compress_type(0);
Predictor *predictor = sdk_conf.add_predictors(); rpc_parameter->set_package_size(20);
predictor->set_name("ximage"); rpc_parameter->set_protocol("baidu_std");
predictor->set_service_name("baidu.paddle_serving.predictor.image_classification.ImageClassifyService"); rpc_parameter->set_max_channel_per_request(3);
predictor->set_endpoint_router("WeightedRandomRender");
Predictor *predictor = sdk_conf.add_predictors();
WeightedRandomRenderConf *weighted_random_render_conf = predictor->mutable_weighted_random_render_conf(); predictor->set_name("ximage");
weighted_random_render_conf->set_variant_weight_list("50"); predictor->set_service_name(
"baidu.paddle_serving.predictor.image_classification."
VariantConf *variant_conf = predictor->add_variants(); "ImageClassifyService");
variant_conf->set_tag("var1"); predictor->set_endpoint_router("WeightedRandomRender");
naming_conf = variant_conf->mutable_naming_conf();
naming_conf->set_cluster("list://127.0.0.1:8010"); WeightedRandomRenderConf *weighted_random_render_conf =
predictor->mutable_weighted_random_render_conf();
ret = baidu::paddle_serving::configure::write_proto_conf(&sdk_conf, output_dir, sdk_conf_file); weighted_random_render_conf->set_variant_weight_list("50");
if (ret != 0) {
return ret; VariantConf *variant_conf = predictor->add_variants();
} variant_conf->set_tag("var1");
naming_conf = variant_conf->mutable_naming_conf();
SigmoidConf sigmoid_conf; naming_conf->set_cluster("list://127.0.0.1:8010");
sigmoid_conf.set_dnn_model_path("data/dnn_model");
sigmoid_conf.set_sigmoid_w_file("data/dnn_model/_sigmoid_.w_0"); ret = baidu::paddle_serving::configure::write_proto_conf(
sigmoid_conf.set_sigmoid_b_file("data/dnn_model/_sigmoid_.b_0"); &sdk_conf, output_dir, sdk_conf_file);
sigmoid_conf.set_exp_max_input(0.75); if (ret != 0) {
sigmoid_conf.set_exp_min_input(0.25); return ret;
}
ret = baidu::paddle_serving::configure::write_proto_conf(&sigmoid_conf, output_dir, sigmoid_conf_file);
if (ret != 0) { SigmoidConf sigmoid_conf;
return ret; sigmoid_conf.set_dnn_model_path("data/dnn_model");
} sigmoid_conf.set_sigmoid_w_file("data/dnn_model/_sigmoid_.w_0");
sigmoid_conf.set_sigmoid_b_file("data/dnn_model/_sigmoid_.b_0");
return 0; sigmoid_conf.set_exp_max_input(0.75);
sigmoid_conf.set_exp_min_input(0.25);
ret = baidu::paddle_serving::configure::write_proto_conf(
&sigmoid_conf, output_dir, sigmoid_conf_file);
if (ret != 0) {
return ret;
}
return 0;
} }
int test_read_conf() int test_read_conf() {
{ int ret = 0;
int ret = 0;
ModelToolkitConf model_toolkit_conf;
ModelToolkitConf model_toolkit_conf; ret = baidu::paddle_serving::configure::read_proto_conf(
ret = baidu::paddle_serving::configure::read_proto_conf(output_dir, model_toolkit_conf_file, &model_toolkit_conf); output_dir, model_toolkit_conf_file, &model_toolkit_conf);
if (ret != 0) { if (ret != 0) {
std::cout << "Read conf fail: " << model_toolkit_conf_file << std::endl; std::cout << "Read conf fail: " << model_toolkit_conf_file << std::endl;
return -1; return -1;
} }
ResourceConf resource_conf; ResourceConf resource_conf;
ret = baidu::paddle_serving::configure::read_proto_conf(output_dir, resource_conf_file, &resource_conf); ret = baidu::paddle_serving::configure::read_proto_conf(
if (ret != 0) { output_dir, resource_conf_file, &resource_conf);
std::cout << "Read conf fail: " << resource_conf_file << std::endl; if (ret != 0) {
return -1; std::cout << "Read conf fail: " << resource_conf_file << std::endl;
} return -1;
}
WorkflowConf workflow_conf;
ret = baidu::paddle_serving::configure::read_proto_conf(output_dir, workflow_conf_file, &workflow_conf); WorkflowConf workflow_conf;
if (ret != 0) { ret = baidu::paddle_serving::configure::read_proto_conf(
std::cout << "Read conf fail: " << workflow_conf_file << std::endl; output_dir, workflow_conf_file, &workflow_conf);
return -1; if (ret != 0) {
} std::cout << "Read conf fail: " << workflow_conf_file << std::endl;
return -1;
InferServiceConf service_conf; }
ret = baidu::paddle_serving::configure::read_proto_conf(output_dir, service_conf_file, &service_conf);
if (ret != 0) { InferServiceConf service_conf;
std::cout << "Read conf fail: " << service_conf_file << std::endl; ret = baidu::paddle_serving::configure::read_proto_conf(
return -1; output_dir, service_conf_file, &service_conf);
} if (ret != 0) {
std::cout << "Read conf fail: " << service_conf_file << std::endl;
return -1;
}
SDKConf sdk_conf;
ret = baidu::paddle_serving::configure::read_proto_conf(
output_dir, sdk_conf_file, &sdk_conf);
if (ret != 0) {
std::cout << "Read conf fail: " << sdk_conf_file << std::endl;
return -1;
}
SigmoidConf sigmoid_conf;
ret = baidu::paddle_serving::configure::read_proto_conf(
output_dir, sigmoid_conf_file, &sigmoid_conf);
if (ret != 0) {
std::cout << "Read conf fail: " << sdk_conf_file << std::endl;
return -1;
}
return 0;
}
SDKConf sdk_conf; int main() {
ret = baidu::paddle_serving::configure::read_proto_conf(output_dir, sdk_conf_file, &sdk_conf); int ret = 0;
struct stat stat_buf;
if (stat(output_dir, &stat_buf) != 0) {
int ret = mkdir("./conf", 0777);
if (ret != 0) { if (ret != 0) {
std::cout << "Read conf fail: " << sdk_conf_file << std::endl; std::cout << "mkdir ./conf fail" << std::endl;
return -1; return -1;
} }
if (stat("./conf", &stat_buf) != 0) {
SigmoidConf sigmoid_conf; std::cout << "./conf not exist and creating it failed" << std::endl;
ret = baidu::paddle_serving::configure::read_proto_conf(output_dir, sigmoid_conf_file, &sigmoid_conf); return -1;
if (ret != 0) {
std::cout << "Read conf fail: " << sdk_conf_file << std::endl;
return -1;
} }
}
return 0; ret = test_write_conf();
} if (ret != 0) {
std::cout << "test_write_conf fail" << std::endl;
return -1;
}
int main() std::cout << "test_write_conf success" << std::endl;
{
int ret = 0;
struct stat stat_buf;
if (stat(output_dir.c_str(), &stat_buf) != 0) {
int ret = mkdir("./conf", 0777);
if (ret != 0) {
std::cout << "mkdir ./conf fail" << std::endl;
return -1;
}
if (stat("./conf", &stat_buf) != 0) {
std::cout << "./conf not exist and creating it failed" << std::endl;
return -1;
}
}
ret = test_write_conf(); ret = test_read_conf();
if (ret != 0) { if (ret != 0) {
std::cout << "test_write_conf fail" << std::endl; std::cout << "test_read_conf fail" << std::endl;
return -1; return -1;
} }
std::cout << "test_read_conf success" << std::endl;
std::cout << "test_write_conf success" << std::endl;
ret = test_read_conf();
if (ret != 0) {
std::cout << "test_read_conf fail" << std::endl;
return -1;
}
std::cout << "test_read_conf success" << std::endl;
return 0; return 0;
} }
/* vim: set expandtab ts=4 sw=4 sts=4 tw=100: */ /* vim: set expandtab ts=4 sw=4 sts=4 tw=100: */
...@@ -10,4 +10,3 @@ target_link_libraries(fluid_cpu_engine pdserving paddle_fluid -liomp5 -lmklml_in ...@@ -10,4 +10,3 @@ target_link_libraries(fluid_cpu_engine pdserving paddle_fluid -liomp5 -lmklml_in
install(TARGETS fluid_cpu_engine install(TARGETS fluid_cpu_engine
ARCHIVE DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/lib ARCHIVE DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/lib
) )
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "inferencer-fluid-cpu/include/fluid_cpu_engine.h"
#include "framework/factory.h" #include "framework/factory.h"
#include "fluid_cpu_engine.h"
namespace baidu { namespace baidu {
namespace paddle_serving { namespace paddle_serving {
namespace fluid_cpu { namespace fluid_cpu {
REGIST_FACTORY_OBJECT_IMPL_WITH_NAME( REGIST_FACTORY_OBJECT_IMPL_WITH_NAME(
::baidu::paddle_serving::predictor::FluidInferEngine<FluidCpuAnalysisCore>, ::baidu::paddle_serving::predictor::FluidInferEngine<FluidCpuAnalysisCore>,
::baidu::paddle_serving::predictor::InferEngine, "FLUID_CPU_ANALYSIS"); ::baidu::paddle_serving::predictor::InferEngine,
"FLUID_CPU_ANALYSIS");
REGIST_FACTORY_OBJECT_IMPL_WITH_NAME( REGIST_FACTORY_OBJECT_IMPL_WITH_NAME(
::baidu::paddle_serving::predictor::FluidInferEngine<FluidCpuAnalysisDirCore>, ::baidu::paddle_serving::predictor::FluidInferEngine<
::baidu::paddle_serving::predictor::InferEngine, "FLUID_CPU_ANALYSIS_DIR"); FluidCpuAnalysisDirCore>,
::baidu::paddle_serving::predictor::InferEngine,
"FLUID_CPU_ANALYSIS_DIR");
REGIST_FACTORY_OBJECT_IMPL_WITH_NAME( REGIST_FACTORY_OBJECT_IMPL_WITH_NAME(
::baidu::paddle_serving::predictor::FluidInferEngine<FluidCpuAnalysisDirWithSigmoidCore>, ::baidu::paddle_serving::predictor::FluidInferEngine<
::baidu::paddle_serving::predictor::InferEngine, "FLUID_CPU_ANALYSIS_DIR_SIGMOID"); FluidCpuAnalysisDirWithSigmoidCore>,
::baidu::paddle_serving::predictor::InferEngine,
"FLUID_CPU_ANALYSIS_DIR_SIGMOID");
REGIST_FACTORY_OBJECT_IMPL_WITH_NAME( REGIST_FACTORY_OBJECT_IMPL_WITH_NAME(
::baidu::paddle_serving::predictor::FluidInferEngine<FluidCpuNativeCore>, ::baidu::paddle_serving::predictor::FluidInferEngine<FluidCpuNativeCore>,
::baidu::paddle_serving::predictor::InferEngine, "FLUID_CPU_NATIVE"); ::baidu::paddle_serving::predictor::InferEngine,
"FLUID_CPU_NATIVE");
REGIST_FACTORY_OBJECT_IMPL_WITH_NAME( REGIST_FACTORY_OBJECT_IMPL_WITH_NAME(
::baidu::paddle_serving::predictor::FluidInferEngine<FluidCpuNativeDirCore>, ::baidu::paddle_serving::predictor::FluidInferEngine<FluidCpuNativeDirCore>,
::baidu::paddle_serving::predictor::InferEngine, "FLUID_CPU_NATIVE_DIR"); ::baidu::paddle_serving::predictor::InferEngine,
"FLUID_CPU_NATIVE_DIR");
REGIST_FACTORY_OBJECT_IMPL_WITH_NAME( REGIST_FACTORY_OBJECT_IMPL_WITH_NAME(
::baidu::paddle_serving::predictor::FluidInferEngine<FluidCpuNativeDirWithSigmoidCore>, ::baidu::paddle_serving::predictor::FluidInferEngine<
::baidu::paddle_serving::predictor::InferEngine, "FLUID_CPU_NATIVE_DIR_SIGMOID"); FluidCpuNativeDirWithSigmoidCore>,
::baidu::paddle_serving::predictor::InferEngine,
} // namespace fluid_cpu "FLUID_CPU_NATIVE_DIR_SIGMOID");
} // namespace paddle_serving
} // namespace baidu } // namespace fluid_cpu
} // namespace paddle_serving
} // namespace baidu
...@@ -34,5 +34,3 @@ install(TARGETS pdserving pdcodegen ...@@ -34,5 +34,3 @@ install(TARGETS pdserving pdcodegen
ARCHIVE DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/lib ARCHIVE DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/lib
LIBRARY DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/so LIBRARY DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/so
) )
[TOC]
# 概述
PaddlePaddle是公司开源的机器学习框架,广泛支持各种深度学习模型的定制化开发;
Paddle cloud是基于PaddlePaddle框架实现的一整套云平台,对外提供全流程的AI开发平台,对内托管集团内各产品线的机器学习云服务。
Paddle serving是Paddle cloud的在线预测部分,与Paddle cloud模型训练环节无缝衔接,对外提供机器学习预测共有云服务,对内为公司各业务线提供统一的模型预测开发框架和云服务。
# Getting Started
## 运行示例
说明:Imagenet图像分类模型,默认采用CPU模式(GPU模式请修改BCLOUD配置项,并用Dockerfile构建运行环境,[Docker部署请参考Wiki](http://agroup.baidu.com/share/md/044f552e866f4078900be503784e2468))。
Step1:启动Server端:
```shell
git clone ssh://icode.baidu.com:8235/baidu/paddle-serving/serving ~/my_paddle_serving/baidu/paddle-serving/serving && cd ~/my_paddle_serving/baidu/paddle-serving/serving && bcloud build && ./output/bin/image_class &
```
Step2:启动Client端:
```shell
git clone ssh://icode.baidu.com:8235/baidu/paddle-serving/sdk-cpp ~/my_paddle_serving/baidu/paddle-serving/sdk-cpp && cd ~/my_paddle_serving/baidu/paddle-serving/sdk-cpp && bcloud build && ./output/bin/ximage && pkill image_class
```
## 示例说明
### 预测接口定义
```c++
syntax="proto2";
package baidu.paddle_serving.predictor.image_class;
option cc_generic_services = true;
// x-image request相关(批量接口)
message XImageReqInstance {
required bytes image_binary = 1;
required uint32 image_length = 2;
};
message Request {
repeated XImageReqInstance instances = 1;
};
// x-image response相关(批量接口)
message DensePrediction {
repeated float categories = 1;
};
message ClassResponse {
repeated DensePrediction predictions = 1;
};
message XImageResInstance {
required string response_json = 1;
};
message Response {
// Each json string is serialized from ClassResponse
repeated XImageResInstance predictions = 1;
};
// Service/method相关
service ImageClassifyService {
rpc inference(Request) returns (Response);
rpc debug(Request) returns (Response);
};
```
### Server端实现
用户只需定制或配置以下三类信息的实现,即可快速搭建完整的Paddle-Serving预测模块。
#### 接口改造([proto目录](http://icode.baidu.com/repos/baidu/paddle-serving/serving/tree/master:proto/))
Server端需对预测接口作如下修改即可:
```c++
// 改动1:依赖paddle-serving option接口文件
import "pds_option.proto";
...
service ClassService {
rpc inference(Request) returns (Response);
rpc debug(Request) returns (Response);
// 改动2:打开generate_impl开关(以支持配置驱动)
option (pds.options).generate_impl = true;
};
```
#### 示例配置([conf目录](http://icode.baidu.com/repos/baidu/paddle-serving/serving/tree/master:conf/))
- gflags配置项
| name | 默认值 | 含义 |
|------|--------|------|
| workflow_path | ./conf | workflow配置目录名 |
|workflow_file|workflow.conf|workflow配置文件名|
|inferservice_path|./conf|service配置目录名|
|inferservice_file|service.conf|service配置文件名|
|logger_path|./conf|日志配置目录名|
|logger_file|log.conf|日志配置文件名|
|resource_path|./conf|资源管理器目录名|
|resource_file|resource.conf|资源管理器文件名|
|reload_interval_s|10|重载线程间隔时间(s)|
- 配置文件实例(Image图像分类demo)
```shell
# >>> service.conf
[@Service]
name: ImageClassifyService
@workflow: workflow_image_classification
# >>> workflow.conf
[@Workflow]
name: workflow_image_classification
path: ./conf
file: imagec_dag.conf
# >>> imagec_dag.conf
workflow_type: Sequence
[@Node]
name: image_reader_op
type: ImageReaderOp
[@Node]
name: image_classify_op
type: ImageClassifyOp
[.@Depend]
name: image_reader_op
mode: RO
[@Node]
name: write_json_op
type: WriteJsonOp
[.@Depend]
name: image_classify_op
mode: RO
# >>> resource.conf
model_manager_path: ./conf
model_manager_file: model_toolkit.conf
```
#### 定制Op算子([op目录](http://icode.baidu.com/repos/baidu/paddle-serving/serving/tree/master:op/))
- 预处理算子(ImageReaderOp):从Request中读取图像字节流,通过opencv解码,填充tensor对象并输出到channel;
- 预测调用算子(ImageClassifyOp):从ImageReaderOp的channel获得输入tensor,临时申请输出tensor,调用ModelToolkit进行预测,并将输出tensor写入channel
- 后处理算子(WriteJsonOp):从ImageClassifyop的channel获得输出tensor,将其序列化为json字符串,写入作为rpc的output;
### Client端实现
用户只需定制或配置以下三类信息,即可方便的接入预估请求,并在本地配置多套服务连接:
#### 接口改造([proto目录](http://icode.baidu.com/repos/baidu/paddle-serving/sdk-cpp/tree/master:proto))
Client端接口只需对预测接口作如下修改即可:
```c++
// 改动1:依赖paddle-serving option接口文件
import "pds_option.proto";
...
service ImageClassifyService {
rpc inference(Request) returns (Response);
rpc debug(Request) returns (Response);
// 改动2:打开generate_stub开关(以支持配置驱动)
option (pds.options).generate_stub = true;
};
```
#### 连接配置([conf目录](http://icode.baidu.com/repos/baidu/paddle-serving/sdk-cpp/tree/master:conf))
```shell
# predictions.conf
## 默认配置共享
[DefaultVariantInfo]
Tag : default
[.Connection]
ConnectTimeoutMicroSec : 200
ReadTimeoutMicroSec : 2000
WriteTimeoutMicroSec : 500
ConnectRetryCount : 2
MaxConnectionPerHost : 100
HedgeRequestTimeoutMicroSec : -1
HedgeFetchRetryCount : 2
BnsReloadIntervalSeconds : 10
ConnectionType : pooled
[.NamingInfo]
ClusterFilterStrategy : Default
LoadBalanceStrategy : la
[.RpcParameter]
# 0-NONE, 1-SNAPPY, 2-GZIP, 3-ZLIB, 4-LZ4
CompressType : 0
Protocol : baidu_std
MaxChannelPerRequest : 3
[@Predictor]
name : ximage
service_name : baidu.paddle_serving.predictor.image_class.ImageClassifyService
endpoint_router : WeightedRandomRender
[.WeightedRandomRender]
VariantWeightList : 30|70 # 30% vs 70% pvs
[.@VariantInfo]
Tag : var1 # 变体版本标识,提供上游辨识
[..NamingInfo]
Cluster : list://127.0.0.1:8010
[.@VariantInfo]
Tag : var2
[..NamingInfo]
Cluster : list://127.0.0.1:8011
```
#### 请求逻辑([demo/ximage.cpp](http://icode.baidu.com/repos/baidu/paddle-serving/sdk-cpp/blob/master:demo/ximage.cpp))
```c++
// 进程级初始化
assert(PredictorAPI::instance().create("./conf/predictions.conf") == 0);
// 线程级预测调用:
Request req;
// fill request
// ...
Response res;
Predictor* ximage = PredictorAPI::instance().fetch_predictor("ximage");
assert(ximage != NULL);
ximage->inference(req, res);
// parse response
// ...
assert(PredictorAPI::instance().free_predictor(ximage) == 0);
// 进程级销毁
assert(PredictorAPI::instance().destroy() == 0);
```
## 凤巢协议兼容
Paddle Serving由凤巢观星框架发展而来,而之前框架的通信协议是nshead+compack+idl,为方便新老接口的兼容,Paddle Serving的server和client均支持向后兼容:
- 老API访问新Server,为适配老观星客户端数据包格式,新Server需通过mcpack2pb生成能解析idl格式的pb对象,详见:[wtitleq server实现](http://icode.baidu.com/repos/baidu/paddle-serving/lr-model/tree/master)
- 新SDK访问老Server,为能够访问老观星server服务,SDK需通过mcpack2pb插件生成基于idl格式的序列化逻辑;详见:[wtitleq api实现](http://icode.baidu.com/repos/baidu/infinite-inference/as-wtitleq-demo/tree/master)
凤巢广告拆包支持:Paddle Serving的SDK-Cpp为用户提供了简单易用的拆包功能,通过修改proto/conf文件开启:
```c++
// interface.proto文件
message PredictorRequest {
message AdvRequest {
// 广告级别字段
repeated uint32 ideaid = 1;
repeated string title = 2;
}
// query级别字段
required uint64 sid = 1;
required string query = 2;
// ...
// 广告级别字段
repeated AdvRequest advs = 3 [(pds.pack_on)=true]; // 改动1:对advs字段进行拆包
}
// ...
service WtitleqService {
rpc ...
rpc ...
option (pds.options).package_size = 10; // 改动2:限制单包大小
}
```
[wtitleq sdk的proto实例](http://icode.baidu.com/repos/baidu/infinite-inference/as-wtitleq-demo/blob/master:proto/predictor_api.proto)
```bash
# predictions.conf文件
[@Predictor]
# ...
[.@VariantInfo]
#...
[..RpcParameter]
Protocol : itp # 改动3:修改rpc请求参数为itp协议
```
[wtitleq sdk的conf实例](http://icode.baidu.com/repos/baidu/infinite-inference/as-wtitleq-demo/blob/master:conf/predictors.conf)
# 框架简介
![图片](http://agroup-bos.cdn.bcebos.com/63a5076471e96a08124b89101e12c1a0ec7b642a)
- 基础框架:屏蔽一个RPC服务所需的所有元素,让用户只关注自己的业务算子的开发;
- 业务框架:基于Protobuf定制请求接口,基于有限DAG定制业务逻辑,并行化调度;
- 模型框架:CPU/FPGA/GPU等硬件异构,多模型间异步优先级调度,新引擎灵活扩展,配置化驱动;
- 用户接口:搭建服务=定义proto文件+实现/复用Op+撰写配置,支持sdk/http请求;
## 名词解释
- 预测引擎:对PaddlePaddle/Abacus/Tensorflow等各种推理计算Lib的封装,屏蔽预测模型动态Reload细节,对上层暴露统一的预测接口;
- 预测模型:由离线训练框架生成、在线预测引擎加载的数据文件或目录,以PaddleFluid模型为例,通常包括拓扑文件和参数文件;
- Op 算子:Paddle-serving对在线(预处理/后处理等)业务逻辑的最小粒度封装,框架提供OpWithChannel和OpWithChannelAndConf这两种常用的Op基类;框架默认实现通用Op算子;
- Node:由某个Op算子类结合参数配置组成的Op算子实例,也是Workflow中的一个执行单元;
- DAG/Workflow:由若干个相互依赖的Node组成,每个Node均可通过特定接口获得Request对象,节点Op通过依赖关系获得其前置Op的输出对象,最后一个Node的输出默认就是Response对象;
- Service:对一次pv的请求封装,可配置若干条Workflow,彼此之间复用当前PV的Request对象,然后各自并行/串行执行,最后将Response写入对应的输出slot中;一个Paddle-serving进程可配置多套Service接口,上游根据ServiceName决定当前访问的Service接口。
![图片](http://agroup-bos.cdn.bcebos.com/2e5e3cdcc9426d16e2090e64e7d33098ae5ad826)
## 主要功能
Paddle serving框架为策略工程师提供以下三层面的功能性扩展:
### 模型
- 预测引擎:集成PaddlePaddle、Abacus、Tensorrt、Anakin、Tensorflow等常用机器学习框架的预测Lib;
- 模型种类:支持PaddlePaddle(V1、V2、Fluid)、TensorrtUFF、Anakin、Tensorflow、Caffe等常见模型格式;
- 用户接口:支持模型加载、重载的配置化驱动,不同种类模型的预测接口完全一致;
- 模型调度:支持基于异步线程模型的多模型预估调度,实现异构资源的优先级调度;
### 业务
- 预测流程:通过有限DAG图描述一次预测从Request到Response的业务流程,节点Node是一个最小逻辑单元——OP;
- 预测逻辑:框架封装常用预处理、预测计算、后处理等常用OP,用户通过自定义OP算子实现特化处理逻辑;
### 服务
- RPC:底层通过Baidu-rpc封装网络交互,Server端可配置化启动多个独立Service,框架会搜集Service粒度的详细业务指标,并按照BVar接口对接到Noah等监控平台;
- SDK:基于Baidu-rpc的client进行封装,提供多下游连接管理、可扩展路由策略、可定制参数实验、自动分包等机制,支持同步、半同步、纯异步等交互模式,以及多种兼容协议,所有连接策略均通过配置驱动
# 平台简介
![图片](http://agroup-bos.cdn.bcebos.com/42a0e34a7c6b36976e3932639209fd823d8f25e0)
- [运维API](http://agroup.baidu.com/share/md/e582f543fb574e9b92445286955a976d)
- [预测API](http://agroup.baidu.com/share/md/eb91a51739514319844ceccdb331564c)
## 名词解释
- 用户(User):云平台注册用户,可基于平台Dashboard对账户下的端点信息进行增、删、查、改;
- 端点(Endpoit):对一个预测需求的逻辑抽象,通常包含一到多个服务变体,以方便多版本模型管理;
- 变体(Variant):一套同质化的Paddle-serving集群服务,每个实例起一个Paddle-serving进程;
- 实验(A/B Test):支持变体实验和参数化实验两种模式,变体实验根据Endpoint所属变体流量百分比实现流量随机抽样;参数化实验通过对pv绑定实验参数、由Paddle-serving进程解析参数、选择不同的代码分支进行实验;
## 主要功能
在公有云落地场景为Infinite(天衍)云平台,主要为策略工程师提供以下三方面的全流程托管:
- 统一接入代理:提供代理服务,通过zk和云平台实时同步元信息,支持多模型版本管理和A/B测试路由策略,提供统一入口和标准预测API;
- 自动化部署:对接K8S/Opera等常见PaaS部署平台,支持服务的一键部署、回滚、下线等运维操作,支持endpoint/variant/model等维度的资源管理;
- 可视化运维:对接console、notebook、dashboard等前端工具和页面,满足可视化运维需求;
# 设计文档
- [总体设计文档](http://agroup.baidu.com/paddleserving/view/office/895070)
- [框架详设文档](http://agroup.baidu.com:8964/static/a3/e40876e464ba08ae5de14aa7710cf326456751.pdf?filename=PaddleServing%E6%9C%8D%E5%8A%A1%E6%A1%86%E6%9E%B6%E8%AF%A6%E7%BB%86%E8%AE%BE%E8%AE%A1%E6%96%87%E6%A1%A3v0_1.pdf)
- [平台详设文档](http://agroup.baidu.com/share/office/042a0941579e49adb8c255c8b5e92d51)
# FAQ
1. 如何修改端口配置?
- 使用该框架搭建的服务需要申请一个端口,可以通过以下方式修改端口号:
- 如果在inferservice_file里指定了port:xxx,那么就去申请该端口号;
- 否则,如果在gflags.conf里指定了--port:xxx,那就去申请该端口号;
- 否则,使用程序里指定的默认端口号:8010。
2. 如何在部署的时候配置动态端口?
- 如果使用FCCI部署协议(凤巢检索端内部的部署协议),需要(1)通过inferservice_file指定端口号;(2)修改[Rakefile.opera](http://wiki.baidu.com/pages/viewpage.action?pageId=399979183#id-%E4%BB%8E%E9%9B%B6%E5%BC%80%E5%A7%8B%E5%86%99production-%E7%BC%96%E5%86%99Rakefile)的dynamic_port_config配置
- `@dynamic_port_config为动态端口配置,向Opera申请名为:name的动态端口,其端口号会被写到:conf文件中的:target配置项。`例子如下:
```
@dynamic_port_config = [
{:name => 'main', :conf => 'framework/service.conf', :target => 'port'}, // 部署时自动向Opera申请端口,服务将会监听这个端口
{:name => 'main', :conf => 'predictor_valid.conf', :target => 'port'}, // valid工具向这个端口发送测试请求,确保服务已正常启动
]
```
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "common/constant.h" #include "common/constant.h"
namespace baidu { namespace baidu {
...@@ -16,23 +30,28 @@ DEFINE_string(logger_path, "./conf", ""); ...@@ -16,23 +30,28 @@ DEFINE_string(logger_path, "./conf", "");
DEFINE_string(logger_file, "log.conf", ""); DEFINE_string(logger_file, "log.conf", "");
DEFINE_string(resource_path, "./conf", ""); DEFINE_string(resource_path, "./conf", "");
DEFINE_string(resource_file, "resource.prototxt", ""); DEFINE_string(resource_file, "resource.prototxt", "");
DEFINE_int32(max_concurrency, 0, "Limit of request processing in parallel, 0: unlimited"); DEFINE_int32(max_concurrency,
DEFINE_int32(num_threads, 0, "Number of pthreads that server runs on, not change if this value <= 0"); 0,
"Limit of request processing in parallel, 0: unlimited");
DEFINE_int32(
num_threads,
0,
"Number of pthreads that server runs on, not change if this value <= 0");
DEFINE_int32(reload_interval_s, 10, ""); DEFINE_int32(reload_interval_s, 10, "");
DEFINE_bool(enable_model_toolkit, false, "enable model toolkit"); DEFINE_bool(enable_model_toolkit, false, "enable model toolkit");
DEFINE_string(enable_protocol_list, "baidu_std", "set protocol list"); DEFINE_string(enable_protocol_list, "baidu_std", "set protocol list");
const char* START_OP_NAME = "startup_op"; const char* START_OP_NAME = "startup_op";
} // predictor } // namespace predictor
} // paddle_serving } // namespace paddle_serving
} // baidu } // namespace baidu
// Baidurpc // Baidurpc
BAIDU_REGISTER_ERRNO(baidu::paddle_serving::predictor::ERR_INTERNAL_FAILURE, BAIDU_REGISTER_ERRNO(baidu::paddle_serving::predictor::ERR_INTERNAL_FAILURE,
"Paddle Serving Framework Internal Error."); "Paddle Serving Framework Internal Error.");
BAIDU_REGISTER_ERRNO(baidu::paddle_serving::predictor::ERR_MEM_ALLOC_FAILURE, BAIDU_REGISTER_ERRNO(baidu::paddle_serving::predictor::ERR_MEM_ALLOC_FAILURE,
"Paddle Serving Memory Alloc Error."); "Paddle Serving Memory Alloc Error.");
BAIDU_REGISTER_ERRNO(baidu::paddle_serving::predictor::ERR_OVERFLOW_FAILURE, BAIDU_REGISTER_ERRNO(baidu::paddle_serving::predictor::ERR_OVERFLOW_FAILURE,
"Paddle Serving Array Overflow Error."); "Paddle Serving Array Overflow Error.");
BAIDU_REGISTER_ERRNO(baidu::paddle_serving::predictor::ERR_OP_INFER_FAILURE, BAIDU_REGISTER_ERRNO(baidu::paddle_serving::predictor::ERR_OP_INFER_FAILURE,
"Paddle Serving Op Inference Error."); "Paddle Serving Op Inference Error.");
#ifndef BAIDU_PADDLE_SERVING_PREDICTOR_CONSTANT_H // Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#define BAIDU_PADDLE_SERVING_PREDICTOR_CONSTANT_H //
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include "common/inner_common.h" #include "common/inner_common.h"
namespace baidu { namespace baidu {
...@@ -34,26 +45,24 @@ extern const char* START_OP_NAME; ...@@ -34,26 +45,24 @@ extern const char* START_OP_NAME;
// ERRORCODE // ERRORCODE
enum { enum {
// internal error // internal error
ERR_INTERNAL_FAILURE = -5000, ERR_INTERNAL_FAILURE = -5000,
ERR_MEM_ALLOC_FAILURE = -5001, ERR_MEM_ALLOC_FAILURE = -5001,
ERR_OVERFLOW_FAILURE = -5002, ERR_OVERFLOW_FAILURE = -5002,
// op error // op error
ERR_OP_INFER_FAILURE = -5100, ERR_OP_INFER_FAILURE = -5100,
// no error // no error
ERR_OK = 0, ERR_OK = 0,
// internal ignore // internal ignore
ERR_IGNORE_FAILURE = 5000, ERR_IGNORE_FAILURE = 5000,
// op ignore // op ignore
ERR_OP_IGNORE_FAILURE = 5100, ERR_OP_IGNORE_FAILURE = 5100,
}; };
static const size_t MAX_WORKFLOW_NUM_IN_ONE_SERVICE = 20; static const size_t MAX_WORKFLOW_NUM_IN_ONE_SERVICE = 20;
static const uint32_t DEFAULT_CACHE_CAPACITY = 10000; static const uint32_t DEFAULT_CACHE_CAPACITY = 10000;
static const uint32_t DEFAULT_CACHE_UNITSIZE = 8192; static const uint32_t DEFAULT_CACHE_UNITSIZE = 8192;
} // predictor } // namespace predictor
} // paddle_serving } // namespace paddle_serving
} // baidu } // namespace baidu
#endif
#ifndef BAIDU_PADDLE_SERVING_PREDICTOR_INNER_COMMON_H // Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#define BAIDU_PADDLE_SERVING_PREDICTOR_INNER_COMMON_H //
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <sys/types.h> #include <error.h>
#include <unistd.h> #include <getopt.h>
#include <stdlib.h>
#include <stdint.h>
#include <pthread.h> #include <pthread.h>
#include <stdint.h>
#include <stdlib.h>
#include <strings.h> #include <strings.h>
#include <getopt.h> #include <sys/types.h>
#include <unistd.h>
#include <typeinfo> #include <typeinfo>
#include <google/protobuf/text_format.h> #include "boost/algorithm/string.hpp" // for boost::split&trim
#include <boost/unordered_map.hpp> #include "boost/function.hpp"
#include <boost/function.hpp> #include "boost/unordered_map.hpp"
#include <boost/algorithm/string.hpp> // for boost::split&trim #include "google/protobuf/text_format.h"
#include <gflags/gflags.h> #include "gflags/gflags.h"
#include <butil/logging.h> #include "brpc/channel.h"
#include <butil/time.h> #include "brpc/policy/giano_authenticator.h"
#include <butil/object_pool.h> #include "brpc/server.h"
#include <brpc/channel.h> #include "bthread/bthread.h"
#include <brpc/server.h> #include "butil/logging.h"
#include <brpc/policy/giano_authenticator.h> #include "butil/object_pool.h"
#include <bthread/bthread.h> #include "butil/time.h"
#include <error.h>
#include "server_configure.pb.h" #include "configure/include/configure_parser.h"
#include "configure_parser.h" #include "configure/server_configure.pb.h"
#include "common/utils.h"
#include "common/types.h"
#include "common/constant.h" #include "common/constant.h"
#include "common/types.h"
#endif #include "common/utils.h"
#ifndef BAIDU_PADDLE_SERVING_PREDICTOR_MACROS_H // Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#define BAIDU_PADDLE_SERVING_PREDICTOR_MACROS_H //
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include "common/inner_common.h" #include "common/inner_common.h"
namespace baidu { namespace baidu {
...@@ -8,11 +20,11 @@ namespace paddle_serving { ...@@ -8,11 +20,11 @@ namespace paddle_serving {
namespace predictor { namespace predictor {
#ifndef CATCH_ANY_AND_RET #ifndef CATCH_ANY_AND_RET
#define CATCH_ANY_AND_RET(errno) \ #define CATCH_ANY_AND_RET(errno) \
catch (...) { \ catch (...) { \
LOG(ERROR) << "exception catched"; \ LOG(ERROR) << "exception catched"; \
return errno; \ return errno; \
} }
#endif #endif
#ifdef USE_PTHREAD #ifdef USE_PTHREAD
...@@ -26,7 +38,7 @@ namespace predictor { ...@@ -26,7 +38,7 @@ namespace predictor {
#define THREAD_CREATE pthread_create #define THREAD_CREATE pthread_create
#define THREAD_CANCEL pthread_cancel #define THREAD_CANCEL pthread_cancel
#define THREAD_JOIN pthread_join #define THREAD_JOIN pthread_join
#define THREAD_KEY_DELETE pthread_key_delete #define THREAD_KEY_DELETE pthread_key_delete
#define THREAD_MUTEX_INIT pthread_mutex_init #define THREAD_MUTEX_INIT pthread_mutex_init
#define THREAD_MUTEX_LOCK pthread_mutex_lock #define THREAD_MUTEX_LOCK pthread_mutex_lock
#define THREAD_MUTEX_UNLOCK pthread_mutex_unlock #define THREAD_MUTEX_UNLOCK pthread_mutex_unlock
...@@ -48,7 +60,7 @@ namespace predictor { ...@@ -48,7 +60,7 @@ namespace predictor {
#define THREAD_CREATE bthread_start_background #define THREAD_CREATE bthread_start_background
#define THREAD_CANCEL bthread_stop #define THREAD_CANCEL bthread_stop
#define THREAD_JOIN bthread_join #define THREAD_JOIN bthread_join
#define THREAD_KEY_DELETE bthread_key_delete #define THREAD_KEY_DELETE bthread_key_delete
#define THREAD_MUTEX_INIT bthread_mutex_init #define THREAD_MUTEX_INIT bthread_mutex_init
#define THREAD_MUTEX_LOCK bthread_mutex_lock #define THREAD_MUTEX_LOCK bthread_mutex_lock
#define THREAD_MUTEX_UNLOCK bthread_mutex_unlock #define THREAD_MUTEX_UNLOCK bthread_mutex_unlock
...@@ -61,8 +73,6 @@ namespace predictor { ...@@ -61,8 +73,6 @@ namespace predictor {
#endif #endif
} // predictor } // namespace predictor
} // paddle_serving } // namespace paddle_serving
} // baidu } // namespace baidu
#endif
#ifndef BAIDU_PADDLE_SERVING_PREDICTOR_TYPES_H // Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#define BAIDU_PADDLE_SERVING_PREDICTOR_TYPES_H //
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
namespace baidu { namespace baidu {
namespace paddle_serving { namespace paddle_serving {
namespace predictor { namespace predictor {
...@@ -10,12 +22,10 @@ typedef size_t Size; ...@@ -10,12 +22,10 @@ typedef size_t Size;
typedef const char* ConstByte; typedef const char* ConstByte;
struct Sequence { struct Sequence {
Byte data; Byte data;
Size size; Size size;
}; };
} // predictor } // namespace predictor
} // paddle_serving } // namespace paddle_serving
} // baidu } // namespace baidu
#endif // BAIDU_PADDLE_SERVING_PREDICTOR_TYPES_H
#ifndef BAIDU_PADDLE_SERVING_PREDICTOR_UTILS_H // Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#define BAIDU_PADDLE_SERVING_PREDICTOR_UTILS_H //
// Licensed under the Apache License, Version 2.0 (the "License");
#include "common/macros.h" // you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <string>
#include "common/inner_common.h" #include "common/inner_common.h"
#include "common/macros.h"
namespace baidu { namespace baidu {
namespace paddle_serving { namespace paddle_serving {
namespace predictor { namespace predictor {
class TimerFlow { class TimerFlow {
public: public:
static const int MAX_SIZE = 1024; static const int MAX_SIZE = 1024;
TimerFlow() { TimerFlow() { init(); }
init();
void init() {
_csize = 0;
_name = NULL;
_started = false;
_auto = false;
}
explicit TimerFlow(const char* name) : _csize(0), _name(name) {
_last = _start = butil::cpuwide_time_us();
_auto = true;
_started = true;
}
void set_name(const char* name) { _name = name; }
void start() {
_last = _start = butil::cpuwide_time_us();
_started = true;
}
bool check(const char* tag) {
if (!_started) {
LOG(WARNING) << "Timer not started yet!";
return false;
} }
uint64_t now = butil::cpuwide_time_us();
void init() { if (!appendf("%s:%lu|", tag, now - _last)) {
_csize = 0; LOG(WARNING) << "Failed check timer: " << _name << ", value = [" << tag
_name = NULL; << ":" << (now - _last) << "]!";
_started = false; return false;
_auto = false;
} }
TimerFlow(const char* name) : _csize(0), _name(name) { _last = now;
_last = _start = butil::cpuwide_time_us(); return true;
_auto = true; }
_started = true;
}
void set_name(const char* name) { std::string info() { return std::string(_buf); }
_name = name;
}
void start() {
_last = _start = butil::cpuwide_time_us();
_started = true;
}
bool check(const char* tag) {
if (!_started) {
LOG(WARNING) << "Timer not started yet!";
return false;
}
uint64_t now = butil::cpuwide_time_us();
if (!appendf("%s:%lu|", tag, now - _last)) {
LOG(WARNING)
<< "Failed check timer: " << _name
<< ", value = [" << tag << ":"
<< (now - _last) << "]!";
return false;
}
_last = now;
return true;
}
std::string info() { void end() {
return std::string(_buf); uint64_t now = butil::cpuwide_time_us();
if (!appendf("total:%lu", now - _start)) {
LOG(WARNING) << "Failed dump time_info[" << _name << "]";
} }
_started = false;
}
void end() { ~TimerFlow() {
uint64_t now = butil::cpuwide_time_us(); if (!_auto) {
if (!appendf("total:%lu", now - _start)) { return;
LOG(WARNING) << "Failed dump time_info[" << _name << "]";
}
_started = false;
} }
uint64_t now = butil::cpuwide_time_us();
~TimerFlow() { if (appendf("total:%lu,%s", now - _start, _name)) {
if (!_auto) { LOG(INFO) << " " << _name << "_tc=[" << _buf << "]";
return; } else {
} LOG(WARNING) << "Failed dump time_info[" << _name << "]";
uint64_t now = butil::cpuwide_time_us();
if (appendf("total:%lu,%s", now - _start, _name)) {
LOG(INFO)
<< " " << _name << "_tc=[" << _buf << "]";
} else {
LOG(WARNING) << "Failed dump time_info[" << _name << "]";
}
} }
}
private:
bool appendf(const char* fmt, ...) { private:
va_list ap; bool appendf(const char* fmt, ...) {
va_start(ap, fmt); va_list ap;
try { va_start(ap, fmt);
int bytes = vsnprintf(_buf + _csize, MAX_SIZE - _csize, fmt, ap); try {
if (bytes >= MAX_SIZE - _csize || bytes < 0) { int bytes = vsnprintf(_buf + _csize, MAX_SIZE - _csize, fmt, ap);
LOG(WARNING) << "Overflow when appendf!"; if (bytes >= MAX_SIZE - _csize || bytes < 0) {
return false; LOG(WARNING) << "Overflow when appendf!";
} return false;
_csize += bytes; }
} CATCH_ANY_AND_RET(false); _csize += bytes;
va_end(ap);
return true;
} }
CATCH_ANY_AND_RET(false);
private:
char _buf[1024]; va_end(ap);
int _csize; return true;
uint64_t _start; }
uint64_t _last;
const char* _name; private:
bool _started; char _buf[1024];
bool _auto; int _csize;
uint64_t _start;
uint64_t _last;
const char* _name;
bool _started;
bool _auto;
}; };
template<bool flag> template <bool flag>
struct derived_from_message {}; struct derived_from_message {};
template<typename T, typename TBase> template <typename T, typename TBase>
class TIsDerivedFromB { class TIsDerivedFromB {
private: private:
static uint8_t check(TBase*) { static uint8_t check(TBase*) { return 1; }
return 1;
}
static uint32_t check(void*) { static uint32_t check(void*) { return 0; }
return 0;
}
public: public:
enum { enum {
// function call cannot apprear in a constant-expression // function call cannot apprear in a constant-expression
RESULT = (sizeof(uint8_t) == sizeof(check((T*)(NULL)))), RESULT = (sizeof(uint8_t) == sizeof(check(reinterpret_cast<T*>(NULL)))),
}; };
}; };
template<typename TBase> template <typename TBase>
class IsDerivedFrom { class IsDerivedFrom {
private: private:
static bool check(TBase*) { static bool check(TBase*) { return true; }
return true;
}
static bool check(void*) { static bool check(void*) { return false; }
return false;
}
public: public:
template<typename T> template <typename T>
static bool yes(T* x) { static bool yes(T* x) {
return check(x); return check(x);
} }
}; };
} // predictor } // namespace predictor
} // paddle_serving } // namespace paddle_serving
} // baidu } // namespace baidu
#endif
#!/bin/bash
# 启动路径
start_path="$(pwd)"
sh build.sh stop
# 定位到cts目录下
cd "$(dirname "$0")"/
if [[ "x"$@ = x*--module_name=* ]]
then
all_arg=$@
tmp=${all_arg##*--module_name=}
mod_name=${tmp%% *}
sed -i "/^run_mod=/s/run_mod.*/run_mod=$mod_name/" install-all.conf
else
sed -i "/^run_mod=/s/run_mod.*/run_mod=lr_engine/" install-all.conf
fi
env_num=`grep env_num install-all.conf | awk -F '=' '{print $2}'`
# 设置环境变量
export PATH="$(pwd)"/frame/tools/python27/bin:$PATH
export PYTHONPATH="$(pwd)"
alias | grep "alias cp=" >/dev/null
if [ $? -eq 0 ];then
unalias cp
fi
# 回到启动路径,执行main.py
cd "$start_path"
mem_free=`free -m | awk '{print $4}'| head -3 | awk 'END{print}'`
let thread_max=$mem_free/5000
if [ $thread_max -eq 0 ];then
echo "系统内存不足, 不能运行任何case"
exit 1
fi
if [ $thread_max -lt $env_num ];then
env_num=$thread_max
echo "目前系统内存最多支持运行$env_num个线程"
fi
temp_args="--paral=$env_num"
python "$(dirname "$0")"/control/main.py $temp_args $@
ret=$?
sh build.sh stop
if [ $ret -ne 0 ]
then
exit 1
fi
#!/bin/bash
function cfont()
{
while (($#!=0))
do
case $1 in
-b)
echo -ne " ";
;;
-t)
echo -ne "\t";
;;
-n)
echo -ne "\n";
;;
-black)
echo -ne "\033[30m";
;;
-red)
echo -ne "\033[31m";
echo -ne "\033[1m";
;;
-green)
echo -ne "\033[32m";
echo -ne "\033[1m";
;;
-yellow)
echo -ne "\033[33m";
;;
-blue)
echo -ne "\033[34m";
echo -ne "\033[1m";
;;
-purple)
echo -ne "\033[35m";
;;
-cyan)
echo -ne "\033[36m";
echo -ne "\033[1m";
;;
-white|-gray)
echo -ne "\033[37m";
;;
-reset)
echo -ne "\033[0m";
;;
-h|-help|--help)
echo "Usage: cfont -color1 message1 -color2 message2 ...";
echo "eg: cfont -red [ -blue message1 message2 -red ]";
;;
*)
echo -ne "$1"
;;
esac
shift
done
echo -ne "\033[0m";
}
cur_path=`pwd`
work_root=${cur_path%%/baidu/*}
CITOOLS="${work_root}/baidu/fengchao-qa/citools"
if [ ! -e ${CITOOLS}/lib/localbuild_lib.sh ];then
cfont -blue "=============== localbuild_lib.sh is not exist, downloading ...================" -n
git clone ssh://git@icode.baidu.com:8235/baidu/fengchao-qa/citools $CITOOLS >/dev/null
fi
source ${CITOOLS}/lib/localbuild_lib.sh
function get_framework_baseenv()
{
onlineFtp="ftp://tc-orp-app2.tc.baidu.com/home/heqing"
wgetOptions="--tries=3 --retry-connrefused -r -l0 -nv --limit-rate=50m -nH"
cfont -blue "##################################################" -n ;
cfont -blue "### build pdserving_framework xts base env ###" -n ;
cfont -blue "##################################################" -n ;
cfont -reset;
run_path="$(grep "run_path" "./install-all.conf" | cut -d "=" -f 2)"
cd $run_path
wget $wgetOptions --cut-dirs=4 "$onlineFtp"/scmbak/pdserving/framework_tester -o wget.log
ret=$?
retry=0
while [[ $retry -lt 3 ]]; do
if [[ $ret -eq 0 ]];then
break;
fi
wget $wgetOptions --cut-dirs=4 "$onlineFtp"/scmbak/pdserving/framework_tester -o wget.log
ret=$?
((retry++))
done
[[ $ret -ne 0 ]] && return 1
cfont -blue "[XTS] " -green "[ finish download: pdserving-framework ]" -n
cd -
return 0
}
# 搭建cts环境
function build_ctsenv()
{
# 搭建cts环境
if [ -z $1 ]; then
ENV_NUM=0
else
ENV_NUM=$1
fi
#更新安装配置设置
hostname=$(uname -n)
username="$(echo "`whoami`" | awk '{print $1}')"
LIBPATH=${PWD}/lib
echo "libpath is : $LIBPATH"
# 生成install-all.conf
{
echo "[config]"
echo "host=$hostname"
echo "user=$username"
echo "passwd=CAPHI2008"
echo "env_file=${PWD}/envfile"
echo "lib_path=$LIBPATH"
echo "run_path=${PWD}/run_env"
echo "env_num=$ENV_NUM"
} > ./install-all.conf
# 安装cts环境
{
cfont -blue "============= predictor env install =============" -n
rm -rf run_env && mkdir -p run_env
echo "current path is :${cur_path}"
#get_framework_baseenv
#if [ $? -ne 0 ]; then
# echo "pdserving-framework is not ready!!!"
# exit 1
#fi
mkdir -p run_env/predictor/bin
mkdir -p run_env/predictor/conf
# 拷贝pdserving到环境中
[[ -e ../output/bin/pdserving ]] && cp -rf ../output/bin/pdserving run_env/predictor/bin/predictor
[[ -e ../output/lib ]] && cp -rf ../output/lib/ run_env/predictor/
[[ -e ../conf ]] && cp -rf ../conf/* run_env/predictor/conf/
#搭建并行环境
if [ $ENV_NUM -ne 0 ]; then
cfont -blue "=============== build multi env ===============" -n
mkdir -p ${PWD}/run_env/1
mv -f ${PWD}/run_env/framework_tester ${PWD}/run_env/1/framework_tester
mv -f ${PWD}/run_env/model ${PWD}/run_env/1/model
mv -f ${PWD}/run_env/dict ${PWD}/run_env/1/dict
for ((i=2; i<=$ENV_NUM; i=i+1))
do
cp -rf ${PWD}/run_env/1 ${PWD}/run_env/$i
done
fi
}
#安装XTS环境
{
echo "now pwd is :`pwd`"
cfont -blue "=============== XTS(cts) install ================" -n
svn co https://svn.baidu.com/general-test/trunk/xts/frame frame> /dev/null
svn co https://svn.baidu.com/general-test/trunk/xts/im/core/control control>/dev/null
echo "now dir list is :`ls`"
cd lib
svn co https://svn.baidu.com/general-test/trunk/xts/im/core/lib/commonlib commonlib>/dev/null
cd -
}
cfont -blue "[XTS] " -green "[ finish XTS(cts) install ]" -n
onlineFtp="ftp://tc-orp-app2.tc.baidu.com/home/heqing"
wgetOptions="--tries=3 --retry-connrefused -r -l0 -nv --limit-rate=50m -nH"
#安装bidinfo 和基础protolib
{
cd lib
[[ -e bidinfo ]] && rm -rf bidinfo
[[ -e protolib ]] && rm -rf protolib
[[ -e pluginlib ]] && rm -rf pluginlib
wget $wgetOptions --cut-dirs=5 "$onlineFtp"/scmbak/common_lib/pdserving_cts/bidinfo -o wget.log
wget $wgetOptions --cut-dirs=5 "$onlineFtp"/scmbak/common_lib/pdserving_cts/protolib -o wget.log
wget $wgetOptions --cut-dirs=6 "$onlineFtp"/scmbak/common_lib/pdserving_cts/framework/pluginlib -o wget.log
cd -
}
#安装protolib
{
cfont -blue "============== protoc install ==================" -n
[[ -e protoc_tools ]] && rm -rf protoc_tools
wget $wgetOptions --cut-dirs=5 "$onlineFtp"/scmbak/common_lib/pdserving_cts/protoc_tools -o wget.log
[[ -e ../proto ]] && cp -rf ../proto/* ./protoc_tools/proto/
cd protoc_tools
chmod +x ./protobuf-2.4.1/bin/protoc
chmod +x ./protobuf-2.4.1/lib/*
[[ -e protolib ]] && rm -rf protolib
mkdir ./protolib
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:`pwd`/protobuf-2.4.1/lib
./protobuf-2.4.1/bin/protoc -I=./proto --python_out=./protolib/ ./proto/*.proto
cd -
cp ./protoc_tools/protolib/*.py ./lib/protolib/
}
cfont -reset
return 0
}
function get_pid
{
local prog=$1
local user=$2
local prog_path=$3
local ret=-1
local trash_path="/home/$(echo "`whoami`" | awk '{print $1}')/.__trash/"
pids=`pgrep $prog -u $user`
for pid in $pids
do
tmp_path=`ls -l /proc/$pid/exe 2>/dev/null | awk '{print $NF}'`
if [ "$tmp_path" == "$prog_path" ] || [ ! -e $tmp_path ] || [ 0 == `echo $tmp_path | grep -qs $trash_path;echo $?` ]
then
echo $pid
ret=0
fi
done
return $ret
}
function kill_prog()
{
name=$1
username=$2
prog_path=$3
pids=`get_pid $name $username $prog_path`
echo $pids>/dev/null
if [ $? -eq 0 ] ;then
for pid in $pids
do
#echo "$name,$pid"
kill -9 $pid
done
fi
}
function kill_predictor_prog()
{
username="$(echo "`whoami`" | awk '{print $1}')"
if [ -f install-all.conf ]
then
env_num=`grep env_num= install-all.conf|awk -F '=' '{print $2}'`
else
env_num=0
fi
for ((i=0; i<=$env_num; i=i+1))
do
if [ $i -eq 0 ]
then
run_path="${PWD}/run_env"
else
run_path="${PWD}/run_env/$i"
fi
kill_prog predictor $username $run_path/framework_tester/bin/predictor
done
}
function clean_ctsenv()
{
rm -rf install-all.conf ccover
rm -rf run_env fail_env output log frame control lib/commonlib lib/protolib
return 0
}
if [ $# -eq 1 ] && [ $1 == "clean" ]
then
clean_ctsenv
exit 0
fi
if [ $# -eq 1 ] && [ $1 == "stop" ]
then
kill_predictor_prog
exit 0
fi
clean_ctsenv
build_ctsenv "$1"
exit $?
#!/usr/bin/env python
# -*- coding:gbk -*-
"""
case created by templete
"""
import sys
sys.path.append(r'./lib/protolib')
print("sys path is : %s " % str(sys.path))
import os
import json
import commands
from lib.protolib.dense_service_pb2 import Request
from lib.protolib.dense_service_pb2 import Response
from lib.pluginlib.plugin_util import Util as ut
from lib.pluginlib.plugin_case import PluginCase
from lib.pluginlib.plugin_module import PluginModule
from lib.pluginlib.plugin_apistub import ApiStub
class TestDenseService(PluginCase):
"""test wtitleq case class"""
OWNER="zhangwenbo03"
quick=['ALL']
low=[]
daily=[]
ignorelist=[]
RESTART=True
def setUp(self):
"""setup something before run case"""
pass
def tearDown(self):
"""tear down after run case"""
self.t.stop()
print "stop finished"
pass
def testDemoCase(self):
"""demo case"""
req = Request()
denseIns = req.instances.add()
denseIns.features.append(10)
denseIns.features.append(13)
denseIns.features.append(200)
service = "BuiltinDenseFormatService"
type = "debug"
ut_obj = ut()
dict_val = ut_obj.pb2dict(req)
json_val = ut_obj.dict2json(dict_val)
self.t.restart()
self.t.tester.sendJsonData(json_val, service, type)
print "execute demo case"
"""plugin register """
from lib.plugin_tester import *
#!/usr/bin/env python
# -*- coding:gbk -*-
"""
注册类:RegxxxConfData,RegxxxReq,RegxxxXbox,RegxxxAd,xxx为组件名
"""
from lib.pluginlib.plugin_common import ConfData
from lib.pluginlib.plugin_common import TreeConfData
from lib.pluginlib.plugin_common import CommonIndex
class RegpredictorConfData(object):
"""
注册wtitleq组件的conf和data文件
"""
def __init__(self, path):
self.path = path
self.conf = {}
self.data = {}
self.conf['ub'] = ConfData(path=self.path + "/conf/ub.conf", connect_flag=":")
self.data['lr_model'] = CommonIndex(path=self.path + \
'/data/lr-model/wtitleq_model_file.sign',
col_list=['key', 'value'],
format='B')
class RegpredictorReq(object):
"""
注册wtitleq组件的默认请求
"""
def __init__(self):
self.plugin_term = {}
cmd_tag = 'cmd_tag0'
query_schema_list = []
query_value_list = []
pair_schema_list = ['query',
'wadptid',
'wbwsid',
'omit_buf',
'title',
'desc',
'cmatch',
'bidword',
'dynamic_new_title']
pair_value_list = ['鲜花',
'0',
'3',
'鲜花',
'鲜花%2C本地实体鲜花店100%25保证%21',
'鲜花品质100%25%2C主城最快2小时送到%2C全天24时在线订花%21市区内免费送花上门%21鲜%2E%2E',
'223',
'鲜花',
'美丽鲜花']
cmd_str = '/titleq/wise/ctr'
req_term = {"query_schema": query_schema_list,
"pair_schema": pair_schema_list,
"query_value": query_value_list,
"pair_value": pair_value_list,
"cmd": cmd_str}
self.plugin_term.update({cmd_tag: req_term})
self.plugin_list = self.plugin_term.keys()
class RegpredictorNewXbox(object):
"""
注册wtitleq组件的xbox
"""
def __init__(self):
self.need_xbox = True
self.stub_conf = 'xboxstub.conf'
self.stub_name = 'xboxstub'
self.conf_list = ['xbox-wtitleq_pegasus.conf']
class RegpredictorAd(object):
"""
注册wtitleq组件是否需要构造广告库
"""
def __init__(self):
self.need_adstub = False
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once #pragma once
#include <boost/bind.hpp>
#include <butil/atomicops.h> #include <butil/atomicops.h>
#include <sys/syscall.h>
#include <boost/bind.hpp>
#include "common/inner_common.h" #include "common/inner_common.h"
#include <sys/syscall.h>
namespace im { namespace im {
namespace bsf { namespace bsf {
template<typename TaskT> template <typename TaskT>
void* TaskExecutor<TaskT>::thread_entry(void* args) { void* TaskExecutor<TaskT>::thread_entry(void* args) {
ThreadContext<TaskT>* context = static_cast<ThreadContext<TaskT>*>(args); ThreadContext<TaskT>* context = static_cast<ThreadContext<TaskT>*>(args);
TaskExecutor<TaskT>* executor = static_cast<TaskExecutor<TaskT>*>(context->executor); TaskExecutor<TaskT>* executor =
executor->work(context); static_cast<TaskExecutor<TaskT>*>(context->executor);
executor->work(context);
return NULL; return NULL;
} }
template<typename TaskT> template <typename TaskT>
int TaskExecutor<TaskT>::start(uint32_t thread_num, uint32_t init_timeout_sec) { int TaskExecutor<TaskT>::start(uint32_t thread_num, uint32_t init_timeout_sec) {
_stop = false; _stop = false;
if (!_thread_contexts.empty()) { if (!_thread_contexts.empty()) {
LOG(WARNING) << "BSF has started"; LOG(WARNING) << "BSF has started";
return 0; return 0;
} }
if (thread_num == 0) { if (thread_num == 0) {
LOG(ERROR) << "cannot init BSF with zero thread"; LOG(ERROR) << "cannot init BSF with zero thread";
return -1; return -1;
} }
ThreadContext<TaskT>* contexts = new ThreadContext<TaskT>[thread_num]; ThreadContext<TaskT>* contexts = new ThreadContext<TaskT>[thread_num];
for (uint32_t i = 0; i < thread_num; ++i) { for (uint32_t i = 0; i < thread_num; ++i) {
contexts[i].executor = this; contexts[i].executor = this;
if (_user_thread_contexts != NULL) { if (_user_thread_contexts != NULL) {
contexts[i].user_thread_context = _user_thread_contexts[i]; contexts[i].user_thread_context = _user_thread_contexts[i];
}
int rc = THREAD_CREATE(
&contexts[i].tid, NULL, &TaskExecutor::thread_entry, &contexts[i]);
if (rc != 0) {
LOG(ERROR) << "failed to create BSF worker thread: index=" << i << ", rc=" << rc << ", errno=" << errno << ":" << strerror(errno);
return -1;
}
_thread_contexts.push_back(&contexts[i]);
} }
int init_timeout = init_timeout_sec * 1000 * 1000; int rc = THREAD_CREATE(
bool has_error = false; &contexts[i].tid, NULL, &TaskExecutor::thread_entry, &contexts[i]);
if (rc != 0) {
bool has_timeout = true; LOG(ERROR) << "failed to create BSF worker thread: index=" << i
if (init_timeout == 0) { << ", rc=" << rc << ", errno=" << errno << ":"
has_timeout = false; << strerror(errno);
return -1;
} }
while (!has_timeout || init_timeout > 0) { _thread_contexts.push_back(&contexts[i]);
bool done = true; }
for (size_t i = 0; i < _thread_contexts.size(); ++i) {
if (_thread_contexts[i]->init_status < 0) {
has_error = true;
break;
}
if (_thread_contexts[i]->init_status == 0) {
done = false;
}
}
if (has_error) {
LOG(ERROR) << "BSF thread init error";
return -1;
}
if (done) {
LOG(INFO) << "BSF thread init done";
return 0;
}
// 100ms
const int sleep_interval = 100 * 1000;
usleep(sleep_interval);
init_timeout -= sleep_interval;
}
LOG(ERROR) << "BSF thread init timed out"; int init_timeout = init_timeout_sec * 1000 * 1000;
return -1; bool has_error = false;
}
template<typename TaskT> bool has_timeout = true;
void TaskExecutor<TaskT>::stop() { if (init_timeout == 0) {
_stop = true; has_timeout = false;
for (size_t i = 0; i < _thread_contexts.size(); ++i) { }
THREAD_CANCEL(_thread_contexts[i]->tid);
}
while (!has_timeout || init_timeout > 0) {
bool done = true;
for (size_t i = 0; i < _thread_contexts.size(); ++i) { for (size_t i = 0; i < _thread_contexts.size(); ++i) {
THREAD_JOIN(_thread_contexts[i]->tid, NULL); if (_thread_contexts[i]->init_status < 0) {
} has_error = true;
_thread_contexts.clear(); break;
} }
template<typename TaskT> if (_thread_contexts[i]->init_status == 0) {
TaskHandler<TaskT> TaskExecutor<TaskT>::schedule( done = false;
const InArrayT& in, OutArrayT& out) { }
TaskT* task = butil::get_object<TaskT>();
if (!task) {
LOG(ERROR) << "Failed get TaskT from object pool";
return TaskHandler<TaskT>::valid_handle();
} }
if (!BatchTasks<TaskT>::check_valid(in, out, _batch_align)) { if (has_error) {
LOG(ERROR) << "Invalid input & output"; LOG(ERROR) << "BSF thread init error";
return TaskHandler<TaskT>::valid_handle(); return -1;
} }
int fds[2]; if (done) {
int rc = pipe(fds); LOG(INFO) << "BSF thread init done";
if (rc != 0) { return 0;
LOG(ERROR) << "call pipe() failed, errno=" << errno << ":" << strerror(errno);
return TaskHandler<TaskT>::valid_handle();
} }
task->read_fd = fds[0]; // 100ms
task->write_fd = fds[1]; const int sleep_interval = 100 * 1000;
task->owner_tid = ::syscall(SYS_gettid); usleep(sleep_interval);
init_timeout -= sleep_interval;
}
task->in = &in; LOG(ERROR) << "BSF thread init timed out";
task->out = &out; return -1;
task->rem = in.size(); }
task->size = in.size();
task->index.store(0, butil::memory_order_relaxed);
AutoMutex lock(_mut); template <typename TaskT>
_task_queue.push_back(task); void TaskExecutor<TaskT>::stop() {
THREAD_COND_SIGNAL(&_cond); _stop = true;
for (size_t i = 0; i < _thread_contexts.size(); ++i) {
THREAD_CANCEL(_thread_contexts[i]->tid);
}
for (size_t i = 0; i < _thread_contexts.size(); ++i) {
THREAD_JOIN(_thread_contexts[i]->tid, NULL);
}
_thread_contexts.clear();
}
return TaskHandler<TaskT>(*task); template <typename TaskT>
TaskHandler<TaskT> TaskExecutor<TaskT>::schedule(const InArrayT& in,
OutArrayT& out) { // NOLINT
TaskT* task = butil::get_object<TaskT>();
if (!task) {
LOG(ERROR) << "Failed get TaskT from object pool";
return TaskHandler<TaskT>::valid_handle();
}
if (!BatchTasks<TaskT>::check_valid(in, out, _batch_align)) {
LOG(ERROR) << "Invalid input & output";
return TaskHandler<TaskT>::valid_handle();
}
int fds[2];
int rc = pipe(fds);
if (rc != 0) {
LOG(ERROR) << "call pipe() failed, errno=" << errno << ":"
<< strerror(errno);
return TaskHandler<TaskT>::valid_handle();
}
task->read_fd = fds[0];
task->write_fd = fds[1];
task->owner_tid = ::syscall(SYS_gettid);
task->in = &in;
task->out = &out;
task->rem = in.size();
task->size = in.size();
task->index.store(0, butil::memory_order_relaxed);
AutoMutex lock(_mut);
_task_queue.push_back(task);
THREAD_COND_SIGNAL(&_cond);
return TaskHandler<TaskT>(*task);
} }
template<typename TaskT> template <typename TaskT>
bool TaskExecutor<TaskT>::fetch_batch(BatchTasks<TaskT>& batch) { bool TaskExecutor<TaskT>::fetch_batch(BatchTasks<TaskT>& batch) { // NOLINT
AutoMutex lock(_mut); AutoMutex lock(_mut);
while (_task_queue.empty()) { while (_task_queue.empty()) {
THREAD_COND_WAIT(&_cond, &_mut); THREAD_COND_WAIT(&_cond, &_mut);
} }
if (_task_queue.empty()) { if (_task_queue.empty()) {
LOG(ERROR) << "invalid task queue!"; LOG(ERROR) << "invalid task queue!";
return false; return false;
} }
while (!_task_queue.empty()) { while (!_task_queue.empty()) {
TaskT* task = _task_queue.front(); TaskT* task = _task_queue.front();
size_t rem = batch.append_task(task); size_t rem = batch.append_task(task);
if (task->rem <= 0) { if (task->rem <= 0) {
_task_queue.pop_front(); _task_queue.pop_front();
}
if (rem <= 0) break;
} }
if (rem <= 0) break;
}
return true; return true;
} }
template<typename TaskT> template <typename TaskT>
int TaskExecutor<TaskT>::work(ThreadContext<TaskT>* context) { int TaskExecutor<TaskT>::work(ThreadContext<TaskT>* context) {
if (_thread_init_fn != NULL) { if (_thread_init_fn != NULL) {
if (_thread_init_fn(context->user_thread_context) != 0) { if (_thread_init_fn(context->user_thread_context) != 0) {
LOG(ERROR) << "execute thread init thunk failed, BSF thread will exit"; LOG(ERROR) << "execute thread init thunk failed, BSF thread will exit";
context->init_status = -1; context->init_status = -1;
return -1; return -1;
} else { } else {
LOG(INFO) << "execute thread init thunk succeed"; LOG(INFO) << "execute thread init thunk succeed";
}
} }
}
context->init_status = 1; context->init_status = 1;
while (!_stop) { while (!_stop) {
if (_thread_reset_fn != NULL) { if (_thread_reset_fn != NULL) {
if (_thread_reset_fn(context->user_thread_context) != 0) { if (_thread_reset_fn(context->user_thread_context) != 0) {
LOG(ERROR) << "execute user thread reset failed"; LOG(ERROR) << "execute user thread reset failed";
} }
}
BatchTasks<TaskT> batch(_batch_size, _batch_align);
if (fetch_batch(batch)) {
batch.merge_tasks();
_fn(batch.in(), batch.out());
batch.notify_tasks();
}
} }
return 0; BatchTasks<TaskT> batch(_batch_size, _batch_align);
if (fetch_batch(batch)) {
batch.merge_tasks();
_fn(batch.in(), batch.out());
batch.notify_tasks();
}
}
return 0;
} }
template<typename InItemT, typename OutItemT> template <typename InItemT, typename OutItemT>
bool TaskManager<InItemT, OutItemT>::schedule(const InArrayT& in, bool TaskManager<InItemT, OutItemT>::schedule(const InArrayT& in,
OutArrayT& out) { OutArrayT& out) { // NOLINT
TaskHandler<TaskT> handler = _executor.schedule(in, out); TaskHandler<TaskT> handler = _executor.schedule(in, out);
if (handler.valid()) { if (handler.valid()) {
_task_owned = handler; _task_owned = handler;
return true; return true;
} else { } else {
LOG(ERROR) << "failed to schedule task"; LOG(ERROR) << "failed to schedule task";
return false; return false;
} }
} }
template<typename InItemT, typename OutItemT> template <typename InItemT, typename OutItemT>
void TaskManager<InItemT, OutItemT>::wait() { void TaskManager<InItemT, OutItemT>::wait() {
char buffer[128]; char buffer[128];
while (read(_task_owned.read_fd, buffer, sizeof(buffer)) < 0 while (read(_task_owned.read_fd, buffer, sizeof(buffer)) < 0 &&
&& errno == EINTR) { errno == EINTR) {
; }
}
close(_task_owned.read_fd);
close(_task_owned.write_fd);
_task_owned.read_fd = -1; close(_task_owned.read_fd);
_task_owned.write_fd = -1; close(_task_owned.write_fd);
return;
}
_task_owned.read_fd = -1;
_task_owned.write_fd = -1;
return;
} }
} } // namespace bsf
} // namespace im
此差异已折叠。
#ifndef BAIDU_PADDLE_SERVING_PREDICTOR_CHANNEL_H // Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#define BAIDU_PADDLE_SERVING_PREDICTOR_CHANNEL_H //
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <string>
#include <utility>
#include "common/inner_common.h" #include "common/inner_common.h"
namespace baidu { namespace baidu {
...@@ -10,242 +24,202 @@ namespace predictor { ...@@ -10,242 +24,202 @@ namespace predictor {
class Channel; class Channel;
class Bus { class Bus {
public: public:
Bus() { Bus() { clear(); }
clear();
}
int regist(const std::string& op, Channel* channel) { int regist(const std::string& op, Channel* channel) {
std::pair<boost::unordered_map<std::string, Channel*>::iterator, bool> r std::pair<boost::unordered_map<std::string, Channel*>::iterator, bool> r =
= _op_channels.insert(std::make_pair(op, channel)); _op_channels.insert(std::make_pair(op, channel));
if (!r.second) { if (!r.second) {
LOG(ERROR) << "Failed insert op&channel into bus:" << op; LOG(ERROR) << "Failed insert op&channel into bus:" << op;
return -1; return -1;
}
return 0;
} }
return 0;
}
Channel* channel_by_name(const std::string& op_name) { Channel* channel_by_name(const std::string& op_name) {
typename boost::unordered_map<std::string, Channel*>::iterator it typename boost::unordered_map<std::string, Channel*>::iterator it =
= _op_channels.find(op_name); _op_channels.find(op_name);
if (it == _op_channels.end()) { if (it == _op_channels.end()) {
LOG(WARNING) LOG(WARNING) << "Not found channel in bus, op_name:" << op_name << ".";
<< "Not found channel in bus, op_name:" return NULL;
<< op_name << ".";
return NULL;
}
return it->second;
} }
void clear() { return it->second;
_op_channels.clear(); }
}
size_t size() const { void clear() { _op_channels.clear(); }
return _op_channels.size();
} size_t size() const { return _op_channels.size(); }
private: private:
boost::unordered_map<std::string, Channel*> _op_channels; boost::unordered_map<std::string, Channel*> _op_channels;
}; };
class Channel { class Channel {
public: public:
Channel() {} Channel() {}
void init(uint32_t id, const char* op) { void init(uint32_t id, const char* op) {
_id = id; _id = id;
_op = std::string(op); _op = std::string(op);
clear_data(); clear_data();
} }
void deinit() { void deinit() { clear_data(); }
clear_data();
}
uint32_t id() const { uint32_t id() const { return _id; }
return _id;
}
const std::string& op() { const std::string& op() { return _op; }
return _op;
}
int share_to_bus(Bus* bus) { int share_to_bus(Bus* bus) {
if (bus->regist(_op, this) != 0) { if (bus->regist(_op, this) != 0) {
LOG(ERROR) LOG(ERROR) << "Failed regist channel[" << _op << "] to bus!";
<< "Failed regist channel[" << _op return -1;
<< "] to bus!";
return -1;
}
return 0;
} }
virtual void clear_data() = 0; return 0;
}
virtual void clear_data() = 0;
virtual void* param() = 0; virtual void* param() = 0;
virtual const void* param() const = 0; virtual const void* param() const = 0;
virtual google::protobuf::Message* message() = 0; virtual google::protobuf::Message* message() = 0;
virtual const google::protobuf::Message* message() const = 0; virtual const google::protobuf::Message* message() const = 0;
virtual Channel& operator=(const Channel& channel) = 0; virtual Channel& operator=(const Channel& channel) = 0;
virtual std::string debug_string() const = 0; virtual std::string debug_string() const = 0;
private: private:
uint32_t _id; uint32_t _id;
std::string _op; std::string _op;
}; };
template<typename T> template <typename T>
class OpChannel : public Channel { class OpChannel : public Channel {
public: public:
OpChannel() { OpChannel() {}
}
void clear_data() { void clear_data() { _data.Clear(); }
_data.Clear();
}
void* param() { void* param() { return &_data; }
return &_data;
}
const void* param() const { const void* param() const { return &_data; }
return &_data;
}
google::protobuf::Message* message() { google::protobuf::Message* message() {
return message_impl(derived_from_message< return message_impl(
TIsDerivedFromB<T, google::protobuf::Message>::RESULT>()); derived_from_message<
} TIsDerivedFromB<T, google::protobuf::Message>::RESULT>());
}
google::protobuf::Message* message_impl(derived_from_message<true>) { google::protobuf::Message* message_impl(derived_from_message<true>) {
return dynamic_cast<const google::protobuf::Message*>(&_data); return dynamic_cast<const google::protobuf::Message*>(&_data);
} }
google::protobuf::Message* message_impl(derived_from_message<false>) { google::protobuf::Message* message_impl(derived_from_message<false>) {
LOG(ERROR) << "Current type: " << typeid(T).name() LOG(ERROR) << "Current type: " << typeid(T).name()
<< " is not derived from protobuf."; << " is not derived from protobuf.";
return NULL; return NULL;
} }
const google::protobuf::Message* message() const { const google::protobuf::Message* message() const {
return message_impl(derived_from_message< return message_impl(
TIsDerivedFromB<T, google::protobuf::Message>::RESULT>()); derived_from_message<
} TIsDerivedFromB<T, google::protobuf::Message>::RESULT>());
}
const google::protobuf::Message* message_impl(derived_from_message<true>) const { const google::protobuf::Message* message_impl(
return dynamic_cast<const google::protobuf::Message*>(&_data); derived_from_message<true>) const {
} return dynamic_cast<const google::protobuf::Message*>(&_data);
}
const google::protobuf::Message* message_impl(derived_from_message<false>) const { const google::protobuf::Message* message_impl(
LOG(ERROR) << "Current type: " << typeid(T).name() derived_from_message<false>) const {
<< " is not derived from protobuf."; LOG(ERROR) << "Current type: " << typeid(T).name()
return NULL; << " is not derived from protobuf.";
} return NULL;
}
Channel& operator=(const Channel& channel) {
_data = *(dynamic_cast<const OpChannel<T>&>(channel)).data();
return *this;
}
std::string debug_string() const { Channel& operator=(const Channel& channel) {
return _data.ShortDebugString(); _data = *(dynamic_cast<const OpChannel<T>&>(channel)).data();
} return *this;
}
// functions of derived class
T* data() { std::string debug_string() const { return _data.ShortDebugString(); }
return &_data;
}
const T* data() const { // functions of derived class
return &_data;
}
Channel& operator=(const T& obj) { T* data() { return &_data; }
_data = obj;
return *this;
}
private: const T* data() const { return &_data; }
T _data;
Channel& operator=(const T& obj) {
_data = obj;
return *this;
}
private:
T _data;
}; };
template<> template <>
class OpChannel<google::protobuf::Message> : public Channel { class OpChannel<google::protobuf::Message> : public Channel {
public: public:
OpChannel<google::protobuf::Message>() : _data(NULL) { OpChannel<google::protobuf::Message>() : _data(NULL) {}
}
virtual ~OpChannel<google::protobuf::Message>() { virtual ~OpChannel<google::protobuf::Message>() { _data = NULL; }
_data = NULL;
}
void clear_data() { void clear_data() { _data = NULL; }
_data = NULL;
}
void* param() { void* param() { return const_cast<void*>((const void*)_data); }
return const_cast<void*>((const void*)_data);
}
const void* param() const { const void* param() const { return _data; }
return _data;
}
google::protobuf::Message* message() { google::protobuf::Message* message() {
return const_cast<google::protobuf::Message*>(_data); return const_cast<google::protobuf::Message*>(_data);
} }
const google::protobuf::Message* message() const { const google::protobuf::Message* message() const { return _data; }
return _data;
}
Channel& operator=(const Channel& channel) { Channel& operator=(const Channel& channel) {
_data = channel.message(); _data = channel.message();
return *this; return *this;
} }
std::string debug_string() const { std::string debug_string() const {
if (_data) { if (_data) {
return _data->ShortDebugString(); return _data->ShortDebugString();
} else { } else {
return "{\"Error\": \"Null Message Ptr\"}"; return "{\"Error\": \"Null Message Ptr\"}";
}
} }
}
// derived function imiplements // derived function imiplements
google::protobuf::Message* data() { google::protobuf::Message* data() {
return const_cast<google::protobuf::Message*>(_data); return const_cast<google::protobuf::Message*>(_data);
} }
const google::protobuf::Message* data() const { const google::protobuf::Message* data() const { return _data; }
return _data;
}
OpChannel<google::protobuf::Message>& operator=( OpChannel<google::protobuf::Message>& operator=(
google::protobuf::Message* message) { google::protobuf::Message* message) {
_data = message; _data = message;
return *this; return *this;
} }
OpChannel<google::protobuf::Message>& operator=( OpChannel<google::protobuf::Message>& operator=(
const google::protobuf::Message* message) { const google::protobuf::Message* message) {
_data = message; _data = message;
return *this; return *this;
} }
private: private:
const google::protobuf::Message* _data; const google::protobuf::Message* _data;
}; };
} // predictor } // namespace predictor
} // paddle_serving } // namespace paddle_serving
} // baidu } // namespace baidu
#endif
此差异已折叠。
#ifndef BAIDU_PADDLE_SERVING_PREDICTOR_DAG_H // Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#define BAIDU_PADDLE_SERVING_PREDICTOR_DAG_H //
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <string>
#include <vector>
#include "common/inner_common.h" #include "common/inner_common.h"
namespace baidu { namespace baidu {
namespace paddle_serving { namespace paddle_serving {
namespace predictor { namespace predictor {
enum EdgeMode { enum EdgeMode { RO = 0, RW = 1, UNKNOWN };
RO = 0,
RW = 1,
UNKNOWN
};
struct DagNode { struct DagNode {
uint32_t id; uint32_t id;
uint32_t stage; uint32_t stage;
std::string name; // opname std::string name; // opname
std::string full_name; // workflow_stageindex_opname std::string full_name; // workflow_stageindex_opname
std::string type; std::string type;
void* conf; void* conf;
boost::unordered_map<std::string, EdgeMode> depends; boost::unordered_map<std::string, EdgeMode> depends;
}; };
struct DagStage { struct DagStage {
std::vector<DagNode*> nodes; std::vector<DagNode*> nodes;
std::string name; // stageindex std::string name; // stageindex
std::string full_name; // workflow_stageindex std::string full_name; // workflow_stageindex
}; };
class Dag { class Dag {
public: public:
Dag(); Dag();
virtual ~Dag(); virtual ~Dag();
EdgeMode parse_mode(std::string& mode); EdgeMode parse_mode(std::string& mode); // NOLINT
int init(const char* path, const char* file, const std::string& name);
int init(const configure::Workflow& conf, const std::string& name); int init(const char* path, const char* file, const std::string& name);
int deinit();
uint32_t nodes_size(); int init(const configure::Workflow& conf, const std::string& name);
const DagNode* node_by_id(uint32_t id); int deinit();
const DagNode* node_by_id(uint32_t id) const; uint32_t nodes_size();
const DagNode* node_by_name(std::string& name); const DagNode* node_by_id(uint32_t id);
const DagNode* node_by_name(const std::string& name) const; const DagNode* node_by_id(uint32_t id) const;
uint32_t stage_size(); const DagNode* node_by_name(std::string& name); // NOLINT
const DagStage* stage_by_index(uint32_t index); const DagNode* node_by_name(const std::string& name) const;
const std::string& name() const { uint32_t stage_size();
return _dag_name;
}
const std::string& full_name() const { const DagStage* stage_by_index(uint32_t index);
return _dag_name;
}
void regist_metric(const std::string& service_name); const std::string& name() const { return _dag_name; }
private:
int topo_sort();
private: const std::string& full_name() const { return _dag_name; }
std::string _dag_name;
boost::unordered_map<std::string, DagNode*> _name_nodes;
std::vector<DagNode*> _index_nodes;
std::vector<DagStage*> _stages;
};
} // predictor void regist_metric(const std::string& service_name);
} // paddle_serving
} // baidu private:
int topo_sort();
private:
std::string _dag_name;
boost::unordered_map<std::string, DagNode*> _name_nodes;
std::vector<DagNode*> _index_nodes;
std::vector<DagStage*> _stages;
};
#endif // BAIDU_PADDLE_SERVING_PREDICTOR_DAG_H } // namespace predictor
} // namespace paddle_serving
} // namespace baidu
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
#include "predictor_metric.h" // Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "predictor/framework/predictor_metric.h"
#include "butil/memory/singleton.h" #include "butil/memory/singleton.h"
namespace baidu { namespace baidu {
...@@ -6,9 +20,9 @@ namespace paddle_serving { ...@@ -6,9 +20,9 @@ namespace paddle_serving {
namespace predictor { namespace predictor {
PredictorMetric* PredictorMetric::GetInstance() { PredictorMetric* PredictorMetric::GetInstance() {
return Singleton<PredictorMetric>::get(); return Singleton<PredictorMetric>::get();
} }
} // namespace predictor } // namespace predictor
} // namespace paddle_serving } // namespace paddle_serving
} // namespace baidu } // namespace baidu
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
//syntax="proto2"; // Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// syntax="proto2";
package pds.ut; package pds.ut;
message OpMessageData { message OpMessageData {
optional int32 a = 1 [default=33]; optional int32 a = 1 [ default = 33 ];
optional float b = 2 [default=4.4]; optional float b = 2 [ default = 4.4 ];
}; };
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册