提交 f3c782b8 编写于 作者: W wangguibao

Reorganize ops&protos for demo

Change-Id: Ie2d6e600edcd0c1c9757510540d28a980076354d
上级 06f45ada
...@@ -28,41 +28,8 @@ target_include_directories(pdserving PUBLIC ...@@ -28,41 +28,8 @@ target_include_directories(pdserving PUBLIC
target_link_libraries(pdserving target_link_libraries(pdserving
brpc protobuf boost leveldb configure -lpthread -lcrypto -lm -lrt -lssl -ldl -lz) brpc protobuf boost leveldb configure -lpthread -lcrypto -lm -lrt -lssl -ldl -lz)
add_executable(pdserving_exe ${pdserving_srcs})
set_source_files_properties(
${pdserving_srcs}
PROPERTIES
COMPILE_FLAGS "-Wno-strict-aliasing -Wno-unused-variable -Wno-non-virtual-dtor -Wno-error=non-virtual-dtor -Wno-error=delete-non-virtual-dtor")
add_dependencies(pdserving_exe
protobuf boost brpc leveldb pdcodegen configure)
target_include_directories(pdserving_exe PUBLIC
${CMAKE_CURRENT_LIST_DIR}/
${CMAKE_CURRENT_BINARY_DIR}/
${CMAKE_CURRENT_BINARY_DIR}/../configure
${CMAKE_CURRENT_LIST_DIR}/../configure/include
)
target_link_libraries(pdserving_exe brpc protobuf leveldb
configure -lpthread -lcrypto -lm -lrt -lssl
-ldl -lz)
add_library(pdclient ${pdclient_srcs})
set_source_files_properties(
${pdclient_srcs}
PROPERTIES
COMPILE_FLAGS "-Wno-strict-aliasing -Wno-unused-variable -Wno-non-virtual-dtor -Wno-error=non-virtual-dtor -Wno-error=delete-non-virtual-dtor")
add_dependencies(pdclient protobuf boost brpc pdcodegen configure)
target_include_directories(pdclient PUBLIC
${CMAKE_CURRENT_LIST_DIR}/
${CMAKE_CURRENT_BINARY_DIR}/
${CMAKE_CURRENT_BINARY_DIR}/../configure
${CMAKE_CURRENT_LIST_DIR}/../configure/include
)
target_link_libraries(pdclient protobuf boost brpc -lpthread -lcrypto -lm -lrt -lssl -ldl -lz)
# install # install
install(TARGETS pdclient pdserving pdcodegen install(TARGETS pdserving pdcodegen
RUNTIME DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/bin RUNTIME DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/bin
ARCHIVE DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/lib ARCHIVE DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/lib
LIBRARY DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/so LIBRARY DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/so
......
FILE(GLOB common_srcs ${CMAKE_CURRENT_LIST_DIR}/*.cpp) FILE(GLOB common_srcs ${CMAKE_CURRENT_LIST_DIR}/*.cpp)
LIST(APPEND pdserving_srcs ${common_srcs}) LIST(APPEND pdserving_srcs ${common_srcs})
LIST(APPEND pdclient_srcs ${common_srcs})
FILE(GLOB mempool_srcs ${CMAKE_CURRENT_LIST_DIR}/*.cpp) FILE(GLOB mempool_srcs ${CMAKE_CURRENT_LIST_DIR}/*.cpp)
LIST(APPEND pdserving_srcs ${mempool_srcs}) LIST(APPEND pdserving_srcs ${mempool_srcs})
LIST(APPEND pdclient_srcs ${mempool_srcs})
FILE(GLOB op_srcs ${CMAKE_CURRENT_LIST_DIR}/*.cpp) FILE(GLOB op_srcs ${CMAKE_CURRENT_LIST_DIR}/*.cpp)
LIST(APPEND pdserving_srcs ${op_srcs}) LIST(APPEND pdserving_srcs ${op_srcs})
LIST(APPEND pdclient_srcs ${op_srcs})
...@@ -5,15 +5,9 @@ LIST(APPEND pdcodegen_srcs ${pdcodegen_proto_srcs}) ...@@ -5,15 +5,9 @@ LIST(APPEND pdcodegen_srcs ${pdcodegen_proto_srcs})
LIST(APPEND protofiles LIST(APPEND protofiles
${CMAKE_CURRENT_LIST_DIR}/./builtin_format.proto ${CMAKE_CURRENT_LIST_DIR}/./builtin_format.proto
${CMAKE_CURRENT_LIST_DIR}/./dense_service.proto
${CMAKE_CURRENT_LIST_DIR}/./echo_service.proto
${CMAKE_CURRENT_LIST_DIR}/./image_classification.proto
${CMAKE_CURRENT_LIST_DIR}/./int64tensor_service.proto
${CMAKE_CURRENT_LIST_DIR}/./msg_data.proto ${CMAKE_CURRENT_LIST_DIR}/./msg_data.proto
${CMAKE_CURRENT_LIST_DIR}/./sparse_service.proto
${CMAKE_CURRENT_LIST_DIR}/./xrecord_format.proto ${CMAKE_CURRENT_LIST_DIR}/./xrecord_format.proto
) )
PROTOBUF_GENERATE_SERVING_CPP(PROTO_SRCS PROTO_HDRS ${protofiles}) PROTOBUF_GENERATE_SERVING_CPP(PROTO_SRCS PROTO_HDRS ${protofiles})
LIST(APPEND pdserving_srcs ${PROTO_SRCS} ${pdcodegen_proto_srcs}) LIST(APPEND pdserving_srcs ${PROTO_SRCS} ${pdcodegen_proto_srcs})
LIST(APPEND pdclient_srcs ${PROTO_SRCS} ${pdcodegen_proto_srcs})
syntax="proto2";
import "pds_option.proto";
import "builtin_format.proto";
package baidu.paddle_serving.predictor.image_classification;
option cc_generic_services = true;
message ClassifyResponse {
repeated baidu.paddle_serving.predictor.format.DensePrediction predictions = 1;
};
message Request {
repeated baidu.paddle_serving.predictor.format.XImageReqInstance instances = 1;
};
message Response {
// Each json string is serialized from ClassifyResponse predictions
repeated baidu.paddle_serving.predictor.format.XImageResInstance predictions = 1;
};
service ImageClassifyService {
rpc inference(Request) returns (Response);
rpc debug(Request) returns (Response);
option (pds.options).generate_impl = true;
};
LIST(APPEND pdcodegen_srcs ${CMAKE_CURRENT_LIST_DIR}/pdcodegen.cpp) LIST(APPEND pdcodegen_srcs ${CMAKE_CURRENT_LIST_DIR}/pdcodegen.cpp)
LIST(APPEND pdserving_srcs ${CMAKE_CURRENT_LIST_DIR}/pdserving.cpp) LIST(APPEND pdserving_srcs ${CMAKE_CURRENT_LIST_DIR}/pdserving.cpp)
LIST(APPEND pdclient_srcs ${CMAKE_CURRENT_LIST_DIR}/pdclient.cpp)
// Copyright (c) 2014 baidu-rpc authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// A client sending requests to server every 1 second.
//
#include <fstream>
#include "dense_service.pb.h"
#include "image_classification.pb.h"
#include "sparse_service.pb.h"
#include "int64tensor_service.pb.h"
#include "common/utils.h"
#include "common/inner_common.h"
#include "common/constant.h"
#include "butil/logging.h"
DEFINE_string(attachment, "foo", "Carry this along with requests");
DEFINE_string(protocol, "baidu_std", "Protocol type. Defined in protocol/baidu/rpc/options.proto");
DEFINE_bool(compress, true, "Enable compression");
//DEFINE_string(protocol, "http", "Protocol type. Defined in protocol/baidu/rpc/options.proto");
DEFINE_string(connection_type, "", "Connection type. Available values: single, pooled, short");
DEFINE_string(server, "0.0.0.0:8010", "IP Address of server");
DEFINE_string(load_balancer, "", "The algorithm for load balancing");
DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds");
DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)");
DEFINE_int32(interval_ms, 1000, "Milliseconds between consecutive requests");
DEFINE_string(http_content_type, "application/json", "Content type of http request");
using baidu::paddle_serving::predictor::FLAGS_logger_path;
using baidu::paddle_serving::predictor::FLAGS_logger_file;
namespace dense_format {
using baidu::paddle_serving::predictor::dense_service::BuiltinDenseFormatService_Stub;
using baidu::paddle_serving::predictor::dense_service::Request;
using baidu::paddle_serving::predictor::dense_service::Response;
using baidu::paddle_serving::predictor::format::DenseInstance;
using baidu::paddle_serving::predictor::format::DensePrediction;
void send_dense_format(BuiltinDenseFormatService_Stub& stub, int log_id) {
brpc::Controller cntl;
// We will receive response synchronously, safe to put variables
// on stack.
baidu::paddle_serving::predictor::TimerFlow timer("dense");
Request dense_request;
Response dense_response;
// set request header
DenseInstance* ins = NULL;
ins = dense_request.mutable_instances()->Add();
ins->add_features(1.5);
ins->add_features(16.0);
ins->add_features(14.0);
ins->add_features(23.0);
timer.check("fill");
cntl.set_log_id(log_id ++); // set by user
if (FLAGS_protocol != "http" && FLAGS_protocol != "h2c") {
// Set attachment which is wired to network directly instead of
// being serialized into protobuf messages.
cntl.request_attachment().append(FLAGS_attachment);
} else {
cntl.http_request().set_content_type(FLAGS_http_content_type);
}
if (FLAGS_compress) {
cntl.set_request_compress_type(brpc::COMPRESS_TYPE_SNAPPY);
}
timer.check("compress");
// Because `done'(last parameter) is NULL, this function waits until
// the response comes back or error occurs(including timedout).
stub.debug(&cntl, &dense_request, &dense_response, NULL);
timer.check("inference");
if (!cntl.Failed()) {
if (cntl.response_attachment().empty()) {
LOG(INFO) << "Received response from " << cntl.remote_side()
<< " to " << cntl.local_side()
<< ": " << dense_response.ShortDebugString()
<< " latency=" << cntl.latency_us() << "us";
} else {
LOG(INFO) << "Received response from " << cntl.remote_side()
<< " to " << cntl.local_side()
<< ": " << dense_response.ShortDebugString()
<< " (attached=" << cntl.response_attachment() << ")"
<< " latency=" << cntl.latency_us() << "us ";
}
} else {
LOG(WARNING) << cntl.ErrorText();
}
timer.check("dump");
}
} // namespace dense_format
namespace sparse_format {
using baidu::paddle_serving::predictor::sparse_service::BuiltinSparseFormatService_Stub;
using baidu::paddle_serving::predictor::sparse_service::Request;
using baidu::paddle_serving::predictor::sparse_service::Response;
using baidu::paddle_serving::predictor::format::SparseInstance;
using baidu::paddle_serving::predictor::format::SparsePrediction;
void send_sparse_format(BuiltinSparseFormatService_Stub& stub, int log_id) {
brpc::Controller cntl;
// We will receive response synchronously, safe to put variables
// on stack.
baidu::paddle_serving::predictor::TimerFlow timer("sparse");
Request sparse_request;
Response sparse_response;
// set request body
SparseInstance* ins = NULL;
ins = sparse_request.mutable_instances()->Add();
ins->add_keys(26);
ins->add_keys(182);
ins->add_keys(232);
ins->add_keys(243);
ins->add_keys(431);
ins->add_shape(2000);
ins->add_values(1);
ins->add_values(1);
ins->add_values(1);
ins->add_values(4);
ins->add_values(14);
ins = sparse_request.mutable_instances()->Add();
ins->add_keys(0);
ins->add_keys(182);
ins->add_keys(232);
ins->add_keys(243);
ins->add_keys(431);
ins->add_shape(2000);
ins->add_values(13);
ins->add_values(1);
ins->add_values(1);
ins->add_values(4);
ins->add_values(1);
timer.check("fill");
cntl.set_log_id(log_id ++); // set by user
if (FLAGS_protocol != "http" && FLAGS_protocol != "h2c") {
// Set attachment which is wired to network directly instead of
// being serialized into protobuf messages.
cntl.request_attachment().append(FLAGS_attachment);
} else {
cntl.http_request().set_content_type(FLAGS_http_content_type);
}
if (FLAGS_compress) {
cntl.set_request_compress_type(brpc::COMPRESS_TYPE_SNAPPY);
}
timer.check("compress");
// Because `done'(last parameter) is NULL, this function waits until
// the response comes back or error occurs(including timedout).
stub.inference(&cntl, &sparse_request, &sparse_response, NULL);
timer.check("inference");
if (!cntl.Failed()) {
if (cntl.response_attachment().empty()) {
LOG(INFO) << "Received response from " << cntl.remote_side()
<< " to " << cntl.local_side()
<< ": " << sparse_response.ShortDebugString()
<< " latency=" << cntl.latency_us() << "us";
} else {
LOG(INFO) << "Received response from " << cntl.remote_side()
<< " to " << cntl.local_side()
<< ": " << sparse_response.ShortDebugString()
<< " (attached=" << cntl.response_attachment() << ")"
<< " latency=" << cntl.latency_us() << "us";
}
} else {
LOG(WARNING) << cntl.ErrorText();
}
timer.check("dump");
}
}
namespace fluid_format {
using baidu::paddle_serving::predictor::int64tensor_service::BuiltinFluidService_Stub;
using baidu::paddle_serving::predictor::int64tensor_service::Request;
using baidu::paddle_serving::predictor::int64tensor_service::Response;
using baidu::paddle_serving::predictor::format::Int64TensorInstance;
using baidu::paddle_serving::predictor::format::Float32TensorPredictor;
void send_fluid_format(BuiltinFluidService_Stub& stub, int log_id) {
brpc::Controller cntl;
// We will receive response synchronously, safe to put variables
// on stack.
baidu::paddle_serving::predictor::TimerFlow timer("fluid");
Request fluid_request;
Response fluid_response;
// set request header
Int64TensorInstance* ins = NULL;
ins = fluid_request.mutable_instances()->Add();
ins->add_data(15);
ins->add_data(160);
ins->add_data(14);
ins->add_data(23);
ins->add_data(18);
ins->add_data(39);
ins->add_shape(2);
ins->add_shape(3);
timer.check("fill");
cntl.set_log_id(log_id); // set by user
if (FLAGS_protocol != "http" && FLAGS_protocol != "h2c") {
// Set attachment which is wired to network directly instead of
// being serialized into protobuf messages.
cntl.request_attachment().append(FLAGS_attachment);
} else {
cntl.http_request().set_content_type(FLAGS_http_content_type);
}
if (FLAGS_compress) {
cntl.set_request_compress_type(brpc::COMPRESS_TYPE_SNAPPY);
}
timer.check("compress");
// Because `done'(last parameter) is NULL, this function waits until
// the response comes back or error occurs(including timedout).
stub.debug(&cntl, &fluid_request, &fluid_response, NULL);
timer.check("inference");
if (!cntl.Failed()) {
if (cntl.response_attachment().empty()) {
LOG(INFO) << "Received response from " << cntl.remote_side()
<< " to " << cntl.local_side()
<< ": " << fluid_response.ShortDebugString()
<< " latency=" << cntl.latency_us() << "us";
} else {
LOG(INFO) << "Received response from " << cntl.remote_side()
<< " to " << cntl.local_side()
<< ": " << fluid_response.ShortDebugString()
<< " (attached=" << cntl.response_attachment() << ")"
<< " latency=" << cntl.latency_us() << "us ";
}
} else {
LOG(WARNING) << cntl.ErrorText();
}
timer.check("dump");
}
} // namespace fluid_format
namespace ximage_format {
char* g_image_buffer = NULL;
size_t g_image_size = 0;
std::string g_image_path = "./data/images/what.jpg";
using baidu::paddle_serving::predictor::image_classification::ImageClassifyService_Stub;
using baidu::paddle_serving::predictor::image_classification::Request;
using baidu::paddle_serving::predictor::image_classification::Response;
using baidu::paddle_serving::predictor::format::XImageReqInstance;
using baidu::paddle_serving::predictor::format::XImageResInstance;
void send_ximage_format(ImageClassifyService_Stub& stub, int log_id) {
brpc::Controller cntl;
// We will receive response synchronously, safe to put variables
// on stack.
baidu::paddle_serving::predictor::TimerFlow timer("ximage");
Request ximage_request;
Response ximage_response;
// set request header
std::ifstream fin(g_image_path.c_str(), std::ios::binary);
fin.seekg(0, std::ios::end);
int isize = fin.tellg();
if (g_image_size < isize || !g_image_buffer) {
g_image_buffer = new (std::nothrow) char[isize];
g_image_size = isize;
}
fin.seekg(0, std::ios::beg);
fin.read(g_image_buffer, sizeof(char) * isize);
fin.close();
timer.check("read");
XImageReqInstance* ins = ximage_request.mutable_instances()->Add();
ins->set_image_binary(g_image_buffer, isize);
ins->set_image_length(isize);
timer.check("fill");
cntl.set_log_id(log_id ++); // set by user
if (FLAGS_protocol != "http" && FLAGS_protocol != "h2c") {
// Set attachment which is wired to network directly instead of
// being serialized into protobuf messages.
cntl.request_attachment().append(FLAGS_attachment);
} else {
cntl.http_request().set_content_type(FLAGS_http_content_type);
}
if (FLAGS_compress) {
cntl.set_request_compress_type(brpc::COMPRESS_TYPE_SNAPPY);
}
timer.check("compress");
// Because `done'(last parameter) is NULL, this function waits until
// the response comes back or error occurs(including timedout).
stub.inference(&cntl, &ximage_request, &ximage_response, NULL);
timer.check("inference");
if (!cntl.Failed()) {
if (cntl.response_attachment().empty()) {
LOG(INFO) << "Received response from " << cntl.remote_side()
<< " to " << cntl.local_side()
<< ": " << ximage_response.ShortDebugString()
<< " latency=" << cntl.latency_us() << "us";
} else {
LOG(INFO) << "Received response from " << cntl.remote_side()
<< " to " << cntl.local_side()
<< ": " << ximage_response.ShortDebugString()
<< " (attached=" << cntl.response_attachment() << ")"
<< " latency=" << cntl.latency_us() << "us ";
}
} else {
LOG(WARNING) << cntl.ErrorText();
}
timer.check("dump");
if (g_image_buffer) {
delete[] g_image_buffer;
g_image_buffer = NULL;
}
}
} // namespace ximage_format
int main(int argc, char* argv[]) {
// Parse gflags. We recommend you to use gflags as well.
google::ParseCommandLineFlags(&argc, &argv, true);
// initialize logger instance
google::InitGoogleLogging(strdup(argv[0]));
// A Channel represents a communication line to a Server. Notice that
// Channel is thread-safe and can be shared by all threads in your program.
brpc::Channel channel;
// Initialize the channel, NULL means using default options.
brpc::ChannelOptions options;
options.protocol = FLAGS_protocol;
options.connection_type = FLAGS_connection_type;
options.timeout_ms = FLAGS_timeout_ms/*milliseconds*/;
options.max_retry = FLAGS_max_retry;
if (channel.Init(FLAGS_server.c_str(), FLAGS_load_balancer.c_str(), &options) != 0) {
LOG(ERROR) << "Fail to initialize channel";
return -1;
}
// Normally, you should not call a Channel directly, but instead construct
// a stub Service wrapping it. stub can be shared by all threads as well.
baidu::paddle_serving::predictor::sparse_service::BuiltinSparseFormatService_Stub
stub1(&channel);
baidu::paddle_serving::predictor::dense_service::BuiltinDenseFormatService_Stub
stub2(&channel);
baidu::paddle_serving::predictor::int64tensor_service::BuiltinFluidService_Stub
stub3(&channel);
baidu::paddle_serving::predictor::image_classification::ImageClassifyService_Stub
stub4(&channel);
// Send a request and wait for the response every 1 second.
int log_id = 0;
while (!brpc::IsAskedToQuit()) {
// We will receive response synchronously, safe to put variables
// on stack.
log_id++;
sparse_format::send_sparse_format(stub1, log_id);
usleep(FLAGS_interval_ms * 1000L);
log_id++;
dense_format::send_dense_format(stub2, log_id);
usleep(FLAGS_interval_ms * 1000L);
//log_id++;
//fluid_format::send_fluid_format(stub3, log_id);
//usleep(FLAGS_interval_ms * 1000L);
log_id++;
ximage_format::send_ximage_format(stub4, log_id);
usleep(FLAGS_interval_ms * 1000L);
}
LOG(INFO) << "Pdserving Client is going to quit";
google::ShutdownGoogleLogging();
return 0;
}
...@@ -17,28 +17,49 @@ target_include_directories(ximage PUBLIC ...@@ -17,28 +17,49 @@ target_include_directories(ximage PUBLIC
${CMAKE_CURRENT_BINARY_DIR}/../configure ${CMAKE_CURRENT_BINARY_DIR}/../configure
${CMAKE_CURRENT_LIST_DIR}/../configure/include ${CMAKE_CURRENT_LIST_DIR}/../configure/include
) )
target_link_libraries(ximage sdk-cpp -lpthread -lcrypto -lm -lrt -lssl -ldl target_link_libraries(ximage -Wl,--whole-archive sdk-cpp
-Wl,--no-whole-archive -lpthread -lcrypto -lm -lrt -lssl -ldl
-lz) -lz)
add_executable(mapcnn_dense ${CMAKE_CURRENT_LIST_DIR}/demo/mapcnn_dense.cpp) add_executable(echo ${CMAKE_CURRENT_LIST_DIR}/demo/echo.cpp)
target_include_directories(mapcnn_dense PUBLIC target_include_directories(echo PUBLIC
${CMAKE_CURRENT_LIST_DIR}/include ${CMAKE_CURRENT_LIST_DIR}/include
${CMAKE_CURRENT_BINARY_DIR}/ ${CMAKE_CURRENT_BINARY_DIR}
${CMAKE_CURRENT_BINARY_DIR}/../configure
${CMAKE_CURRENT_LIST_DIR}/../configure/include
)
target_link_libraries(echo -Wl,--whole-archive sdk-cpp -Wl,--no-whole-archive -lpthread -lcrypto -lm -lrt -lssl -ldl
-lz)
add_executable(dense_format ${CMAKE_CURRENT_LIST_DIR}/demo/dense_format.cpp)
target_include_directories(dense_format PUBLIC
${CMAKE_CURRENT_LIST_DIR}/include
${CMAKE_CURRENT_BINARY_DIR}
${CMAKE_CURRENT_BINARY_DIR}/../configure ${CMAKE_CURRENT_BINARY_DIR}/../configure
${CMAKE_CURRENT_LIST_DIR}/../configure/include ${CMAKE_CURRENT_LIST_DIR}/../configure/include
) )
target_link_libraries(mapcnn_dense sdk-cpp -lpthread -lcrypto -lm -lrt -lssl target_link_libraries(dense_format -Wl,--whole-archive sdk-cpp -Wl,--no-whole-archive -lpthread -lcrypto -lm -lrt -lssl -ldl
-ldl -lz) -lz)
add_executable(mapcnn_sparse ${CMAKE_CURRENT_LIST_DIR}/demo/mapcnn_sparse.cpp) add_executable(sparse_format ${CMAKE_CURRENT_LIST_DIR}/demo/sparse_format.cpp)
target_include_directories(mapcnn_sparse PUBLIC target_include_directories(sparse_format PUBLIC
${CMAKE_CURRENT_LIST_DIR}/include ${CMAKE_CURRENT_LIST_DIR}/include
${CMAKE_CURRENT_BINARY_DIR}/ ${CMAKE_CURRENT_BINARY_DIR}
${CMAKE_CURRENT_BINARY_DIR}/../configure ${CMAKE_CURRENT_BINARY_DIR}/../configure
${CMAKE_CURRENT_LIST_DIR}/../configure/include ${CMAKE_CURRENT_LIST_DIR}/../configure/include
) )
target_link_libraries(mapcnn_sparse sdk-cpp -lpthread -lcrypto -lm -lrt -lssl target_link_libraries(sparse_format -Wl,--whole-archive sdk-cpp -Wl,--no-whole-archive -lpthread -lcrypto -lm -lrt -lssl -ldl
-ldl -lz) -lz)
add_executable(int64tensor_format ${CMAKE_CURRENT_LIST_DIR}/demo/int64tensor_format.cpp)
target_include_directories(int64tensor_format PUBLIC
${CMAKE_CURRENT_LIST_DIR}/include
${CMAKE_CURRENT_BINARY_DIR}
${CMAKE_CURRENT_BINARY_DIR}/../configure
${CMAKE_CURRENT_LIST_DIR}/../configure/include
)
target_link_libraries(int64tensor_format -Wl,--whole-archive sdk-cpp -Wl,--no-whole-archive -lpthread -lcrypto -lm -lrt -lssl -ldl
-lz)
# install # install
install(TARGETS sdk-cpp install(TARGETS sdk-cpp
...@@ -46,8 +67,33 @@ install(TARGETS sdk-cpp ...@@ -46,8 +67,33 @@ install(TARGETS sdk-cpp
) )
install(TARGETS ximage install(TARGETS ximage
RUNTIME DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/demo/client/bin) RUNTIME DESTINATION
${PADDLE_SERVING_INSTALL_DIR}/demo/client/image_classification/bin)
install(DIRECTORY ${CMAKE_CURRENT_LIST_DIR}/conf DESTINATION install(DIRECTORY ${CMAKE_CURRENT_LIST_DIR}/conf DESTINATION
${PADDLE_SERVING_INSTALL_DIR}/demo/client/) ${PADDLE_SERVING_INSTALL_DIR}/demo/client/image_classification/)
install(DIRECTORY ${CMAKE_CURRENT_LIST_DIR}/data DESTINATION install(DIRECTORY ${CMAKE_CURRENT_LIST_DIR}/data DESTINATION
${PADDLE_SERVING_INSTALL_DIR}/demo/client/) ${PADDLE_SERVING_INSTALL_DIR}/demo/client/image_classification/)
install(TARGETS echo
RUNTIME DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/demo/client/echo/bin)
install(DIRECTORY ${CMAKE_CURRENT_LIST_DIR}/conf DESTINATION
${PADDLE_SERVING_INSTALL_DIR}/demo/client/echo/)
install(TARGETS dense_format
RUNTIME DESTINATION
${PADDLE_SERVING_INSTALL_DIR}/demo/client/dense_format/bin)
install(DIRECTORY ${CMAKE_CURRENT_LIST_DIR}/conf DESTINATION
${PADDLE_SERVING_INSTALL_DIR}/demo/client/dense_format/)
install(TARGETS sparse_format
RUNTIME DESTINATION
${PADDLE_SERVING_INSTALL_DIR}/demo/client/sparse_format/bin)
install(DIRECTORY ${CMAKE_CURRENT_LIST_DIR}/conf DESTINATION
${PADDLE_SERVING_INSTALL_DIR}/demo/client/sparse_format/)
install(TARGETS int64tensor_format
RUNTIME DESTINATION
${PADDLE_SERVING_INSTALL_DIR}/demo/client/int64tensor_format/bin)
install(DIRECTORY ${CMAKE_CURRENT_LIST_DIR}/conf DESTINATION
${PADDLE_SERVING_INSTALL_DIR}/demo/client/int64tensor_format/)
...@@ -34,3 +34,63 @@ predictors { ...@@ -34,3 +34,63 @@ predictors {
} }
} }
} }
predictors {
name: "echo_service"
service_name: "baidu.paddle_serving.predictor.echo_service.BuiltinTestEchoService"
endpoint_router: "WeightedRandomRender"
weighted_random_render_conf {
variant_weight_list: "50"
}
variants {
tag: "var1"
naming_conf {
cluster: "list://127.0.0.1:8010"
}
}
}
predictors {
name: "dense_service"
service_name: "baidu.paddle_serving.predictor.dense_service.BuiltinDenseFormatService"
endpoint_router: "WeightedRandomRender"
weighted_random_render_conf {
variant_weight_list: "50"
}
variants {
tag: "var1"
naming_conf {
cluster: "list://127.0.0.1:8010"
}
}
}
predictors {
name: "sparse_service"
service_name: "baidu.paddle_serving.predictor.sparse_service.BuiltinSparseFormatService"
endpoint_router: "WeightedRandomRender"
weighted_random_render_conf {
variant_weight_list: "50"
}
variants {
tag: "var1"
naming_conf {
cluster: "list://127.0.0.1:8010"
}
}
}
predictors {
name: "int64tensor_service"
service_name: "baidu.paddle_serving.predictor.int64tensor_service.BuiltinFluidService"
endpoint_router: "WeightedRandomRender"
weighted_random_render_conf {
variant_weight_list: "50"
}
variants {
tag: "var1"
naming_conf {
cluster: "list://127.0.0.1:8010"
}
}
}
/***************************************************************************
*
* Copyright (c) 2018 Baidu.com, Inc. All Rights Reserved
*
**************************************************************************/
/**
* @file demo.cpp
* @author wanlijin01(wanlijin01@baidu.com)
* @date 2018/07/09 20:12:44
* @brief
*
**/
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include "common.h"
#include <fstream>
#include "predictor_sdk.h"
#include "dense_service.pb.h"
#include "builtin_format.pb.h"
using baidu::paddle_serving::sdk_cpp::Predictor;
using baidu::paddle_serving::sdk_cpp::PredictorApi;
using baidu::paddle_serving::predictor::dense_service::Request;
using baidu::paddle_serving::predictor::dense_service::Response;
using baidu::paddle_serving::predictor::format::DensePrediction;
using baidu::paddle_serving::predictor::format::DenseInstance;
int create_req(Request& req) {
DenseInstance *ins = req.mutable_instances()->Add();
ins->add_features(1.5);
ins->add_features(16.0);
ins->add_features(14.0);
ins = req.mutable_instances()->Add();
ins->add_features(1.0);
ins->add_features(2.0);
ins->add_features(3.0);
return 0;
}
void print_res(
const Request& req,
const Response& res,
std::string route_tag,
uint64_t elapse_ms) {
for (uint32_t i = 0; i < res.predictions_size(); ++i) {
const DensePrediction &prediction = res.predictions(i);
std::ostringstream oss;
for (uint32_t j = 0; j < prediction.categories_size(); ++j) {
oss << prediction.categories(j) << " ";
}
LOG(INFO) << "Receive result " << oss.str();
}
LOG(INFO)
<< "Succ call predictor[dense_format], the tag is: "
<< route_tag << ", elapse_ms: " << elapse_ms;
}
int main(int argc, char** argv) {
PredictorApi api;
// initialize logger instance
struct stat st_buf;
int ret = 0;
if ((ret = stat("./log", &st_buf)) != 0) {
mkdir("./log", 0777);
ret = stat("./log", &st_buf);
if (ret != 0) {
LOG(WARNING) << "Log path ./log not exist, and create fail";
return -1;
}
}
FLAGS_log_dir = "./log";
google::InitGoogleLogging(strdup(argv[0]));
if (api.create("./conf", "predictors.prototxt") != 0) {
LOG(ERROR) << "Failed create predictors api!";
return -1;
}
Request req;
Response res;
api.thrd_initialize();
while (true) {
timeval start;
gettimeofday(&start, NULL);
api.thrd_clear();
Predictor* predictor = api.fetch_predictor("dense_service");
if (!predictor) {
LOG(ERROR) << "Failed fetch predictor: echo_service";
return -1;
}
req.Clear();
res.Clear();
if (create_req(req) != 0) {
return -1;
}
butil::IOBufBuilder debug_os;
if (predictor->debug(&req, &res, &debug_os) != 0) {
LOG(ERROR) << "failed call predictor with req:"
<< req.ShortDebugString();
return -1;
}
butil::IOBuf debug_buf;
debug_os.move_to(debug_buf);
LOG(INFO) << "Debug string: " << debug_buf;
timeval end;
gettimeofday(&end, NULL);
uint64_t elapse_ms = (end.tv_sec * 1000 + end.tv_usec / 1000)
- (start.tv_sec * 1000 + start.tv_usec / 1000);
print_res(req, res, predictor->tag(), elapse_ms);
res.Clear();
usleep(50);
} // while (true)
api.thrd_finalize();
api.destroy();
google::ShutdownGoogleLogging();
return 0;
}
/* vim: set expandtab ts=4 sw=4 sts=4 tw=100: */
/***************************************************************************
*
* Copyright (c) 2018 Baidu.com, Inc. All Rights Reserved
*
**************************************************************************/
/**
* @file demo.cpp
* @author wanlijin01(wanlijin01@baidu.com)
* @date 2018/07/09 20:12:44
* @brief
*
**/
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include "common.h"
#include <fstream>
#include "predictor_sdk.h"
#include "echo_service.pb.h"
#include "builtin_format.pb.h"
using baidu::paddle_serving::sdk_cpp::Predictor;
using baidu::paddle_serving::sdk_cpp::PredictorApi;
using baidu::paddle_serving::predictor::echo_service::RequestAndResponse;
int create_req(RequestAndResponse& req) {
req.set_a(1);
req.set_b(0.1);
return 0;
}
void print_res(
const RequestAndResponse& req,
const RequestAndResponse& res,
std::string route_tag,
uint64_t elapse_ms) {
LOG(INFO) << "Reqeive result: a = " << res.a() << ", b = " << res.b();
LOG(INFO)
<< "Succ call predictor[echo_service], the tag is: "
<< route_tag << ", elapse_ms: " << elapse_ms;
}
int main(int argc, char** argv) {
PredictorApi api;
// initialize logger instance
struct stat st_buf;
int ret = 0;
if ((ret = stat("./log", &st_buf)) != 0) {
mkdir("./log", 0777);
ret = stat("./log", &st_buf);
if (ret != 0) {
LOG(WARNING) << "Log path ./log not exist, and create fail";
return -1;
}
}
FLAGS_log_dir = "./log";
google::InitGoogleLogging(strdup(argv[0]));
if (api.create("./conf", "predictors.prototxt") != 0) {
LOG(ERROR) << "Failed create predictors api!";
return -1;
}
RequestAndResponse req;
RequestAndResponse res;
api.thrd_initialize();
while (true) {
timeval start;
gettimeofday(&start, NULL);
api.thrd_clear();
Predictor* predictor = api.fetch_predictor("echo_service");
if (!predictor) {
LOG(ERROR) << "Failed fetch predictor: echo_service";
return -1;
}
req.Clear();
res.Clear();
if (create_req(req) != 0) {
return -1;
}
butil::IOBufBuilder debug_os;
if (predictor->debug(&req, &res, &debug_os) != 0) {
LOG(ERROR) << "failed call predictor with req:"
<< req.ShortDebugString();
return -1;
}
butil::IOBuf debug_buf;
debug_os.move_to(debug_buf);
LOG(INFO) << "Debug string: " << debug_buf;
timeval end;
gettimeofday(&end, NULL);
uint64_t elapse_ms = (end.tv_sec * 1000 + end.tv_usec / 1000)
- (start.tv_sec * 1000 + start.tv_usec / 1000);
print_res(req, res, predictor->tag(), elapse_ms);
res.Clear();
usleep(50);
} // while (true)
api.thrd_finalize();
api.destroy();
google::ShutdownGoogleLogging();
return 0;
}
/* vim: set expandtab ts=4 sw=4 sts=4 tw=100: */
/***************************************************************************
*
* Copyright (c) 2018 Baidu.com, Inc. All Rights Reserved
*
**************************************************************************/
/**
* @file demo.cpp
* @author wanlijin01(wanlijin01@baidu.com)
* @date 2018/07/09 20:12:44
* @brief
*
**/
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include "common.h"
#include <fstream>
#include "predictor_sdk.h"
#include "int64tensor_service.pb.h"
#include "builtin_format.pb.h"
using baidu::paddle_serving::sdk_cpp::Predictor;
using baidu::paddle_serving::sdk_cpp::PredictorApi;
using baidu::paddle_serving::predictor::int64tensor_service::Request;
using baidu::paddle_serving::predictor::int64tensor_service::Response;
using baidu::paddle_serving::predictor::format::Float32TensorPredictor;
using baidu::paddle_serving::predictor::format::Int64TensorInstance;
int create_req(Request& req) {
Int64TensorInstance *ins = req.mutable_instances()->Add();
ins->add_data(1);
ins->add_data(2);
ins->add_data(3);
ins->add_data(4);
ins->add_shape(2);
ins->add_shape(2);
ins = req.mutable_instances()->Add();
ins->add_data(5);
ins->add_data(6);
ins->add_data(7);
ins->add_data(8);
ins->add_data(9);
ins->add_shape(5);
ins->add_shape(1);
return 0;
}
void print_res(
const Request& req,
const Response& res,
std::string route_tag,
uint64_t elapse_ms) {
for (uint32_t i = 0; i < res.predictions_size(); ++i) {
const Float32TensorPredictor &prediction = res.predictions(i);
std::ostringstream oss1;
for (uint32_t j = 0; j < prediction.data_size(); ++j) {
oss1 << prediction.data(j) << " ";
}
std::ostringstream oss2;
for (uint32_t j = 0; j < prediction.shape_size(); ++j) {
oss2 << prediction.shape(j) << " ";
}
LOG(INFO) << "Receive result " << oss1.str() << ", shape " << oss2.str();
}
LOG(INFO)
<< "Succ call predictor[int64tensor_format], the tag is: "
<< route_tag << ", elapse_ms: " << elapse_ms;
}
int main(int argc, char** argv) {
PredictorApi api;
// initialize logger instance
struct stat st_buf;
int ret = 0;
if ((ret = stat("./log", &st_buf)) != 0) {
mkdir("./log", 0777);
ret = stat("./log", &st_buf);
if (ret != 0) {
LOG(WARNING) << "Log path ./log not exist, and create fail";
return -1;
}
}
FLAGS_log_dir = "./log";
google::InitGoogleLogging(strdup(argv[0]));
if (api.create("./conf", "predictors.prototxt") != 0) {
LOG(ERROR) << "Failed create predictors api!";
return -1;
}
Request req;
Response res;
api.thrd_initialize();
while (true) {
timeval start;
gettimeofday(&start, NULL);
api.thrd_clear();
Predictor* predictor = api.fetch_predictor("int64tensor_service");
if (!predictor) {
LOG(ERROR) << "Failed fetch predictor: int64tensor_service";
return -1;
}
req.Clear();
res.Clear();
if (create_req(req) != 0) {
return -1;
}
butil::IOBufBuilder debug_os;
if (predictor->debug(&req, &res, &debug_os) != 0) {
LOG(ERROR) << "failed call predictor with req:"
<< req.ShortDebugString();
return -1;
}
butil::IOBuf debug_buf;
debug_os.move_to(debug_buf);
LOG(INFO) << "Debug string: " << debug_buf;
timeval end;
gettimeofday(&end, NULL);
uint64_t elapse_ms = (end.tv_sec * 1000 + end.tv_usec / 1000)
- (start.tv_sec * 1000 + start.tv_usec / 1000);
print_res(req, res, predictor->tag(), elapse_ms);
res.Clear();
usleep(50);
} // while (true)
api.thrd_finalize();
api.destroy();
google::ShutdownGoogleLogging();
return 0;
}
/* vim: set expandtab ts=4 sw=4 sts=4 tw=100: */
/***************************************************************************
*
* Copyright (c) 2018 Baidu.com, Inc. All Rights Reserved
*
**************************************************************************/
/**
* @file demo.cpp
* @author root(root@baidu.com)
* @date 2018/07/09 20:12:44
* @brief
*
**/
#include <fstream>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>
#include "common.h"
#include "predictor_sdk.h"
#include "map_cnn.pb.h"
using baidu::paddle_serving::sdk_cpp::Predictor;
using baidu::paddle_serving::sdk_cpp::PredictorApi;
using baidu::infinite::map_model::SparseTensor;
using baidu::infinite::map_model::SparseInstance;
using baidu::infinite::map_model::DensePrediction;
using baidu::infinite::map_model::Request;
using baidu::infinite::map_model::Response;
using baidu::infinite::map_model::MapCnnService;
static const uint32_t SELECT_VALID_UNIT = 1000;
class InputData {
public:
InputData() {}
~InputData() {}
int create(const std::string file_name, size_t buf_size,
size_t batch_size, int qps) {
pthread_mutex_init(&_mutex, NULL);
FILE* fp = fopen(file_name.c_str(), "r");
if (!fp) {
LOG(ERROR) << "Failed open data file: "
<< file_name;
return -1;
}
_data.clear();
char buffer[2048000];
std::vector<std::string> tokens;
while (fgets(buffer, sizeof(buffer), fp) != NULL) {
tokens.clear();
baidu::paddle_serving::sdk_cpp::str_split(
buffer, ",", &tokens);
std::vector<float> feature_one;
for (size_t i = 0; i < tokens.size(); i++){
feature_one.push_back(
strtof(tokens[i].c_str(), NULL));
}
_data.push_back(feature_one);
}
printf("succ load data, size:%ld\n", _data.size());
for (size_t ri = 0; ri < buf_size; ri++) {
Request* req = new Request();
if (generate_one_req(*req, batch_size) != 0) {
LOG(ERROR) << "Failed generate req at: " << ri;
fclose(fp);
return -1;
}
_req_list.push_back(req);
}
fclose(fp);
_current = 0;
_waitingtm = 0;
_lasttm.tv_sec = _lasttm.tv_usec = 0;
if (qps == 0) {
_interval = 0;
} else if (qps < 1) {
_interval = 1000 * 1000;
} else {
_interval = 1000 * 1000 / qps;
}
LOG(INFO) << "Succ create req, size: " << buf_size
<< ", batch_size: " << batch_size;
return 0;
}
void destroy() {
size_t ds = _data.size();
for (size_t di = 0; di < ds; di++) {
_data[di].clear();
}
_data.clear();
size_t rs = _req_list.size();
for (size_t ri = 0; ri < rs; ri++) {
delete _req_list[ri];
}
_req_list.clear();
}
Request* next_req() {
pthread_mutex_lock(&_mutex);
if (_interval != 0)
{
if (_lasttm.tv_sec == 0 && _lasttm.tv_usec == 0)
{
gettimeofday(&_lasttm, NULL);
}
else
{
timeval curtm;
gettimeofday(&curtm, NULL);
long elapse =
((curtm.tv_sec - _lasttm.tv_sec) * 1000*1000 +
(curtm.tv_usec - _lasttm.tv_usec));
_waitingtm += _interval - elapse;
_lasttm = curtm;
if (_waitingtm >= SELECT_VALID_UNIT) // select的最小响应单位
{
long tm_unit
= _waitingtm / SELECT_VALID_UNIT * SELECT_VALID_UNIT;
timeval tmp_tm = {tm_unit / 1000000, tm_unit % 1000000};
select(1, NULL, NULL, NULL, &tmp_tm); //延时以控制压力speed
}
else if (_waitingtm <= SELECT_VALID_UNIT * (-2))
{
_waitingtm = -SELECT_VALID_UNIT;
}
}
}
size_t rs = _req_list.size();
Request* req = _req_list[(_current++) % rs];
pthread_mutex_unlock(&_mutex);
return req;
}
int generate_one_req(Request& req, int batch) {
int batch_size = batch;
std::vector<std::vector<int> > shapes;
shapes.clear();
int p_shape[] = {batch_size, 37, 1, 1};
std::vector<int> shape(p_shape, p_shape + 4);
shapes.push_back(shape);
int p_shape1[] = {batch_size, 1, 50, 12};
std::vector<int> shape1(p_shape1, p_shape1 + 4);
shapes.push_back(shape1);
int p_shape2[] = {batch_size, 1, 50, 19};
std::vector<int> shape2(p_shape2, p_shape2 + 4);
shapes.push_back(shape2);
int p_shape3[] = {batch_size, 1, 50, 1};
std::vector<int> shape3(p_shape3, p_shape3 + 4);
shapes.push_back(shape3);
int p_shape4[] = {batch_size, 4, 50, 1};
std::vector<int> shape4(p_shape4, p_shape4 + 4);
shapes.push_back(shape4);
int p_shape5[] = {batch_size, 1, 50, 1};
std::vector<int> shape5(p_shape5, p_shape5 + 4);
shapes.push_back(shape5);
int p_shape6[] = {batch_size, 5, 50, 1};
std::vector<int> shape6(p_shape6, p_shape6 + 4);
shapes.push_back(shape6);
int p_shape7[] = {batch_size, 7, 50, 1};
std::vector<int> shape7(p_shape7, p_shape7 + 4);
shapes.push_back(shape7);
int p_shape8[] = {batch_size, 3, 50, 1};
std::vector<int> shape8(p_shape8, p_shape8 + 4);
shapes.push_back(shape8);
int p_shape9[] = {batch_size, 32, 50, 1}; // added
std::vector<int> shape9(p_shape9, p_shape9 + 4);
shapes.push_back(shape9);
std::vector<std::string> tensor_names;
tensor_names.push_back("input_0");
tensor_names.push_back("input_1");
tensor_names.push_back("input_2");
tensor_names.push_back("input_3");
tensor_names.push_back("input_4");
tensor_names.push_back("input_5");
tensor_names.push_back("input_6");
tensor_names.push_back("input_7");
tensor_names.push_back("input_8");
tensor_names.push_back("input_9");
SparseInstance* ins = req.add_instances();
for (int fi = 0; fi < _data.size(); ++fi) {
SparseTensor* tensor = ins->add_tensors();
tensor->set_name(tensor_names[fi]);
int len = 1;
for (int si = 0; si < shapes[fi].size(); ++si) {
len *= shapes[fi][si];
}
for (int si = 0; si < shapes[fi].size(); ++si) {
tensor->add_shape(shapes[fi][si]);
}
tensor->set_features(&(_data[fi][0]), len * sizeof(float));
}
return 0;
}
private:
std::vector<std::vector<float> > _data;
std::vector<Request*> _req_list;
pthread_mutex_t _mutex;
long _waitingtm;
long _interval;
timeval _lasttm;
int _current;
};
void print_res(
const Request* req,
const Response& res,
std::string route_tag,
uint64_t elapse_ms) {
uint32_t sample_size = res.predictions_size();
for (int i = 0; i < sample_size; ++i) {
const ::baidu::infinite::map_model::DensePrediction& prediction = res.predictions(i);
int cat_size = prediction.categories_size();
for (int j = 0; j < cat_size; ++j) {
//LOG(INFO) << "categories:" << prediction.categories(j);
}
}
LOG(INFO)
<< "Succ call predictor[wasq], res sample size: "
<< sample_size << ", the tag is: "
<< route_tag << ", elapse_ms: " << elapse_ms;
}
struct Arg {
PredictorApi* api;
InputData* input;
};
void* work(void* p) {
Arg* arg = (Arg*) p;
InputData* input = arg->input;
PredictorApi* api = arg->api;
Response res;
LOG(WARNING) << "Thread entry!";
while (true) {
Predictor* predictor = api->fetch_predictor("mapcnn");
if (!predictor) {
LOG(ERROR) << "Failed fetch predictor: wasq";
return NULL;
}
Request* req = input->next_req();
res.Clear();
timeval start;
gettimeofday(&start, NULL);
if (predictor->inference(req, &res) != 0) {
LOG(ERROR) << "failed call predictor with req:"
<< req->ShortDebugString();
return NULL;
}
timeval end;
gettimeofday(&end, NULL);
uint64_t elapse_ms = (end.tv_sec * 1000 + end.tv_usec / 1000)
- (start.tv_sec * 1000 + start.tv_usec / 1000);
print_res(req, res, predictor->tag(), elapse_ms);
if (api->free_predictor(predictor) != 0) {
printf("failed free predictor\n");
}
}
LOG(WARNING) << "Thread exit!";
return NULL;
}
int main(int argc, char** argv) {
if (argc != 5) {
printf("Usage: demo req_buf_size batch_size threads qps\n");
return -1;
}
int req_buffer = atoi(argv[1]);
int batch_size = atoi(argv[2]);
int thread_num = atoi(argv[3]);
int qps = atoi(argv[4]);
PredictorApi api;
if (api.create("./conf", "predictors.conf") != 0) {
LOG(ERROR) << "Failed create predictors api!";
return -1;
}
InputData data;
if (data.create(
"./data/pure_feature", req_buffer, batch_size, qps) != 0) {
LOG(ERROR) << "Failed create inputdata!";
return -1;
}
Arg arg = {&api, &data};
pthread_t* threads = new pthread_t[thread_num];
if (!threads) {
LOG(ERROR) << "Failed create threads, num:" << thread_num;
return -1;
}
for (int i = 0; i < thread_num; ++i) {
pthread_create(threads + i, NULL, work, &arg);
}
for (int i = 0; i < thread_num; ++i) {
pthread_join(threads[i], NULL);
}
delete[] threads;
data.destroy();
return 0;
}
/* vim: set expandtab ts=4 sw=4 sts=4 tw=100: */
/***************************************************************************
*
* Copyright (c) 2018 Baidu.com, Inc. All Rights Reserved
*
**************************************************************************/
/**
* @file demo.cpp
* @author root(root@baidu.com)
* @date 2018/07/09 20:12:44
* @brief
*
**/
#include <fstream>
#include <unistd.h>
#include <stdlib.h>
#include <bthread/bthread.h>
#include "common.h"
#include "predictor_sdk.h"
#include "default_schema.pb.h"
using baidu::paddle_serving::sdk_cpp::Predictor;
using baidu::paddle_serving::sdk_cpp::PredictorApi;
using baidu::paddle_serving::fluid_engine::SparseTensor;
using baidu::paddle_serving::fluid_engine::SparseInstance;
using baidu::paddle_serving::fluid_engine::Prediction;
using baidu::paddle_serving::fluid_engine::SparseRequest;
using baidu::paddle_serving::fluid_engine::Response;
static const uint32_t SELECT_VALID_UNIT = 1000;
class InputData {
public:
InputData() {}
~InputData() {}
int create(const std::string file_name, size_t buf_size,
size_t batch_size, int qps) {
bthread_mutex_init(&_mutex, NULL);
FILE* fp = fopen(file_name.c_str(), "r");
if (!fp) {
LOG(ERROR) << "Failed open data file: "
<< file_name;
return -1;
}
_data.clear();
char buffer[2048000];
std::vector<std::string> tokens;
while (fgets(buffer, sizeof(buffer), fp) != NULL) {
tokens.clear();
baidu::paddle_serving::sdk_cpp::str_split(
buffer, ",", &tokens);
std::vector<float> feature_one;
for (size_t i = 0; i < tokens.size(); i++){
feature_one.push_back(
strtof(tokens[i].c_str(), NULL));
}
_data.push_back(feature_one);
}
printf("succ load data, size:%ld\n", _data.size());
for (size_t ri = 0; ri < buf_size; ri++) {
SparseRequest* req = new SparseRequest();
if (generate_one_req(*req, batch_size) != 0) {
LOG(ERROR) << "Failed generate req at: " << ri;
fclose(fp);
return -1;
}
_req_list.push_back(req);
}
fclose(fp);
_current = 0;
_waitingtm = 0;
_lasttm.tv_sec = _lasttm.tv_usec = 0;
if (qps == 0) {
_interval = 0;
} else if (qps < 1) {
_interval = 1000 * 1000;
} else {
_interval = 1000 * 1000 / qps;
}
LOG(INFO) << "Succ create req, size: " << buf_size
<< ", batch_size: " << batch_size;
return 0;
}
void destroy() {
size_t ds = _data.size();
for (size_t di = 0; di < ds; di++) {
_data[di].clear();
}
_data.clear();
size_t rs = _req_list.size();
for (size_t ri = 0; ri < rs; ri++) {
delete _req_list[ri];
}
_req_list.clear();
}
SparseRequest* next_req() {
bthread_mutex_lock(&_mutex);
/*
if (_interval != 0)
{
if (_lasttm.tv_sec == 0 && _lasttm.tv_usec == 0)
{
gettimeofday(&_lasttm, NULL);
}
else
{
timeval curtm;
gettimeofday(&curtm, NULL);
long elapse =
((curtm.tv_sec - _lasttm.tv_sec) * 1000*1000 +
(curtm.tv_usec - _lasttm.tv_usec));
_waitingtm += _interval - elapse;
_lasttm = curtm;
if (_waitingtm >= SELECT_VALID_UNIT) // select的最小响应单位
{
long tm_unit
= _waitingtm / SELECT_VALID_UNIT * SELECT_VALID_UNIT;
timeval tmp_tm = {tm_unit / 1000000, tm_unit % 1000000};
select(1, NULL, NULL, NULL, &tmp_tm); //延时以控制压力speed
}
else if (_waitingtm <= SELECT_VALID_UNIT * (-2))
{
_waitingtm = -SELECT_VALID_UNIT;
}
}
}*/
size_t rs = _req_list.size();
SparseRequest* req = _req_list[(_current++) % rs];
bthread_mutex_unlock(&_mutex);
return req;
}
int generate_one_req(SparseRequest& req, int batch) {
int batch_size = batch;
std::vector<std::vector<int> > shapes;
shapes.clear();
int p_shape[] = {batch_size, 37, 1, 1};
std::vector<int> shape(p_shape, p_shape + 4);
shapes.push_back(shape);
int p_shape1[] = {batch_size, 1, 50, 12};
std::vector<int> shape1(p_shape1, p_shape1 + 4);
shapes.push_back(shape1);
int p_shape2[] = {batch_size, 1, 50, 19};
std::vector<int> shape2(p_shape2, p_shape2 + 4);
shapes.push_back(shape2);
int p_shape3[] = {batch_size, 1, 50, 1};
std::vector<int> shape3(p_shape3, p_shape3 + 4);
shapes.push_back(shape3);
int p_shape4[] = {batch_size, 4, 50, 1};
std::vector<int> shape4(p_shape4, p_shape4 + 4);
shapes.push_back(shape4);
int p_shape5[] = {batch_size, 1, 50, 1};
std::vector<int> shape5(p_shape5, p_shape5 + 4);
shapes.push_back(shape5);
int p_shape6[] = {batch_size, 5, 50, 1};
std::vector<int> shape6(p_shape6, p_shape6 + 4);
shapes.push_back(shape6);
int p_shape7[] = {batch_size, 7, 50, 1};
std::vector<int> shape7(p_shape7, p_shape7 + 4);
shapes.push_back(shape7);
int p_shape8[] = {batch_size, 3, 50, 1};
std::vector<int> shape8(p_shape8, p_shape8 + 4);
shapes.push_back(shape8);
int p_shape9[] = {batch_size, 32, 50, 1}; // added
std::vector<int> shape9(p_shape9, p_shape9 + 4);
shapes.push_back(shape9);
std::vector<std::string> tensor_names;
/*
tensor_names.push_back("input_0");
tensor_names.push_back("input_1");
tensor_names.push_back("input_2");
tensor_names.push_back("input_3");
tensor_names.push_back("input_4");
tensor_names.push_back("input_5");
tensor_names.push_back("input_6");
tensor_names.push_back("input_7");
tensor_names.push_back("input_8");
tensor_names.push_back("input_9");
*/
tensor_names.push_back("attr_f");
tensor_names.push_back("realtime_f");
tensor_names.push_back("static_f");
tensor_names.push_back("eta_cost_f");
tensor_names.push_back("lukuang_f");
tensor_names.push_back("length_f");
tensor_names.push_back("path_f");
tensor_names.push_back("speed_f");
tensor_names.push_back("lane_f");
tensor_names.push_back("roadid_f");
std::vector<float> tensor_values;
SparseInstance* ins = req.add_instances();
for (int fi = 0; fi < _data.size(); ++fi) {
SparseTensor* tensor = ins->add_tensors();
tensor->set_name(tensor_names[fi]);
int len = 1;
for (int si = 0; si < shapes[fi].size(); ++si) {
len *= shapes[fi][si];
tensor->add_shape(shapes[fi][si]);
}
tensor_values.clear();
for (int vi = 0; vi < len; ++vi) {
if (std::abs(_data[fi][vi]) > 0.000001f) {
tensor_values.push_back(_data[fi][vi]);
tensor->add_keys(vi);
}
}
tensor->set_features(
&tensor_values[0], tensor_values.size() * sizeof(float));
}
tensor_values.clear();
return 0;
}
private:
std::vector<std::vector<float> > _data;
std::vector<SparseRequest*> _req_list;
bthread_mutex_t _mutex;
long _waitingtm;
long _interval;
timeval _lasttm;
int _current;
};
void print_res(
const SparseRequest* req,
const Response& res,
std::string route_tag,
uint64_t elapse_ms) {
uint32_t feature_size = res.predictions_size();
size_t sample_size = 0;
for (int i = 0; i < feature_size; ++i) {
const ::baidu::paddle_serving::fluid_engine::Prediction& prediction = res.predictions(i);
if (i == 0) {
sample_size = prediction.categories_size();
}
for (int j = 0; j < sample_size; ++j) {
//LOG(TRACE) << "categories:" << prediction.categories(j);
}
}
LOG(INFO)
<< "Succ call predictor[sparse_cnn], res sample size: "
<< sample_size << ", the tag is: "
<< route_tag << ", elapse_ms: " << elapse_ms;
}
struct Arg {
PredictorApi* api;
InputData* input;
};
void* work(void* p) {
Arg* arg = (Arg*) p;
InputData* input = arg->input;
PredictorApi* api = arg->api;
if (api->thrd_initialize() != 0) {
LOG(ERROR) << "Failed init api in thrd:" << bthread_self();
return NULL;
}
Response res;
LOG(WARNING) << "Thread entry!";
while (true) {
api->thrd_clear();
Predictor* predictor = api->fetch_predictor("sparse_cnn");
if (!predictor) {
LOG(ERROR) << "Failed fetch predictor: sparse_cnn";
continue;
}
SparseRequest* req = input->next_req();
res.Clear();
timeval start;
gettimeofday(&start, NULL);
if (predictor->inference(req, &res) != 0) {
LOG(ERROR) << "failed call predictor with req:"
<< req->ShortDebugString();
api->free_predictor(predictor);
continue;
}
timeval end;
gettimeofday(&end, NULL);
uint64_t elapse_ms = (end.tv_sec * 1000 + end.tv_usec / 1000)
- (start.tv_sec * 1000 + start.tv_usec / 1000);
print_res(req, res, predictor->tag(), elapse_ms);
}
api->thrd_finalize();
LOG(WARNING) << "Thread exit!";
return NULL;
}
int main(int argc, char** argv) {
if (argc != 5) {
printf("Usage: demo req_buf_size batch_size threads qps\n");
return -1;
}
int req_buffer = atoi(argv[1]);
int batch_size = atoi(argv[2]);
int thread_num = atoi(argv[3]);
int qps = atoi(argv[4]);
PredictorApi api;
if (api.create("./conf", "predictors.conf") != 0) {
LOG(ERROR) << "Failed create predictors api!";
return -1;
}
InputData data;
if (data.create(
//"./data/feature", req_buffer, batch_size, qps) != 0) {
"./data/pure_feature", req_buffer, batch_size, qps) != 0) {
LOG(ERROR) << "Failed create inputdata!";
return -1;
}
Arg arg = {&api, &data};
bthread_t* threads = new bthread_t[thread_num];
if (!threads) {
LOG(ERROR) << "Failed create threads, num:" << thread_num;
return -1;
}
for (int i = 0; i < thread_num; ++i) {
bthread_start_background(threads + i, NULL, work, &arg);
}
for (int i = 0; i < thread_num; ++i) {
bthread_join(threads[i], NULL);
}
delete[] threads;
data.destroy();
return 0;
};
/* vim: set expandtab ts=4 sw=4 sts=4 tw=100: */
/***************************************************************************
*
* Copyright (c) 2018 Baidu.com, Inc. All Rights Reserved
*
**************************************************************************/
/**
* @file demo.cpp
* @author root(root@baidu.com)
* @date 2018/07/09 20:12:44
* @brief
*
**/
#include <fstream>
#include <unistd.h>
#include <stdlib.h>
#include <bthread.h>
#include "common.h"
#include "predictor_sdk.h"
#include "map_rnn.pb.h"
using baidu::paddle_serving::sdk_cpp::Predictor;
using baidu::paddle_serving::sdk_cpp::PredictorApi;
//using baidu::infinite::map_rnn::Tensor;
using baidu::infinite::map_rnn::DenseInstance;
using baidu::infinite::map_rnn::DensePrediction;
using baidu::infinite::map_rnn::Request;
using baidu::infinite::map_rnn::Response;
using baidu::infinite::map_rnn::MapRnnService;
static const uint32_t SELECT_VALID_UNIT = 1000;
int split(std::string source, char spliter, std::vector<std::string>& result)
{
result.clear();
std::string::size_type pos;
std::string::size_type start = 0;
while ((pos = source.find(spliter, start)) != std::string::npos)
{
result.insert(result.end(), source.substr(start, pos-start));
start = pos+1;
}
result.insert(result.end(), source.substr(start));
return (int)result.size();
}
int load_data(std::string data_file_name, std::vector<std::vector<float> >& data){
std::ifstream data_file;
std::vector<std::string> token;
data_file.open(data_file_name, std::ios::in);
std::string input_line;
while (std::getline(data_file, input_line)) {
split(input_line, ',', token);
std::vector<float> feature_one;
for (size_t i = 0; i < token.size(); i++){
feature_one.push_back(std::stof(token[i]));
}
data.push_back(feature_one);
}
return 0;
}
void split(const std::string &str, char sep, std::vector<std::string> *pieces) {
pieces->clear();
if (str.empty()) {
return;
}
size_t pos = 0;
size_t next = str.find(sep, pos);
while (next != std::string::npos) {
pieces->push_back(str.substr(pos, next - pos));
pos = next + 1;
next = str.find(sep, pos);
}
if (!str.substr(pos).empty()) {
pieces->push_back(str.substr(pos));
}
}
void split_to_float(const std::string &str, char sep, std::vector<float> *fs) {
std::vector<std::string> pieces;
split(str, sep, &pieces);
std::transform(pieces.begin(), pieces.end(), std::back_inserter(*fs),
[](const std::string &v) {
return std::stof(v);
});
}
// clang-format off
/*void TensorAssignData(paddle::PaddleTensor *tensor, const std::vector<std::vector<float>> &data) {
// Assign buffer
int dim = std::accumulate(tensor->shape.begin(), tensor->shape.end(), 1, [](int a, int b) { return a * b; });
tensor->data.Resize(sizeof(float) * dim);
int c = 0;
for (const auto &f : data) {
for (float v : f) { static_cast<float *>(tensor->data.data())[c++] = v; }
}
}*/
// clang-format on
struct DataRecord {
std::vector<std::vector<std::vector<float>>> link_step_data_all;
std::vector<std::vector<float>> week_data_all, minute_data_all;
std::vector<std::vector<std::vector<char>>> ch_link_step_data_all;
std::vector<std::vector<char>> ch_week_data_all, ch_minute_data_all;
std::vector<size_t> lod1, lod2, lod3;
std::vector<std::vector<float>> rnn_link_data, rnn_week_datas,
rnn_minute_datas;
size_t batch_iter{0};
size_t batch_size{1};
DataRecord() = default;
DataRecord(const std::string &path, int batch_size = 1)
: batch_size(batch_size) {
Load(path);
for (std::vector<std::vector<std::vector<float>>>::iterator it1 = link_step_data_all.begin();
it1 != link_step_data_all.end(); ++it1) {
std::vector<std::vector<char>> links;
for (std::vector<std::vector<float>>::iterator it2 = it1->begin(); it2 != it1->end(); ++it2) {
int len = it2->size() * sizeof(float);
char* ch = (char*)malloc(len);
memcpy(ch, it2->data(), len);
std::vector<char> tmp(ch, ch + len);
links.push_back(tmp);
free(ch);
}
ch_link_step_data_all.push_back(links);
}
for (std::vector<std::vector<float>>::iterator it1 = week_data_all.begin(); it1 != week_data_all.end(); ++it1) {
int len = it1->size() * sizeof(float);
char* ch = (char*)malloc(len);
memcpy(ch, it1->data(), len);
std::vector<char> tmp(ch, ch + len);
ch_week_data_all.push_back(tmp);
free(ch);
}
for (std::vector<std::vector<float>>::iterator it1 = minute_data_all.begin(); it1 != minute_data_all.end(); ++it1) {
int len = it1->size() * sizeof(float);
char* ch = (char*)malloc(len);
memcpy(ch, it1->data(), len);
std::vector<char> tmp(ch, ch + len);
ch_minute_data_all.push_back(tmp);
free(ch);
}
}
DataRecord NextBatch() {
DataRecord data;
size_t batch_end = batch_iter + batch_size;
// NOTE skip the final batch, if no enough data is provided.
if (batch_end <= link_step_data_all.size()) {
data.link_step_data_all.assign(link_step_data_all.begin() + batch_iter,
link_step_data_all.begin() + batch_end);
data.week_data_all.assign(week_data_all.begin() + batch_iter,
week_data_all.begin() + batch_end);
data.minute_data_all.assign(minute_data_all.begin() + batch_iter,
minute_data_all.begin() + batch_end);
// Prepare LoDs
data.lod1.emplace_back(0);
data.lod2.emplace_back(0);
data.lod3.emplace_back(0);
//CHECK(!data.link_step_data_all.empty()) << "empty";
//CHECK(!data.week_data_all.empty());
//CHECK(!data.minute_data_all.empty());
//CHECK_EQ(data.link_step_data_all.size(), data.week_data_all.size());
//CHECK_EQ(data.minute_data_all.size(), data.link_step_data_all.size());
for (size_t j = 0; j < data.link_step_data_all.size(); j++) {
for (const auto &d : data.link_step_data_all[j]) {
data.rnn_link_data.push_back(d);
}
data.rnn_week_datas.push_back(data.week_data_all[j]);
data.rnn_minute_datas.push_back(data.minute_data_all[j]);
// calculate lod
data.lod1.push_back(data.lod1.back() +
data.link_step_data_all[j].size());
data.lod3.push_back(data.lod3.back() + 1);
for (size_t i = 1; i < data.link_step_data_all[j].size() + 1; i++) {
data.lod2.push_back(data.lod2.back() +
data.link_step_data_all[j].size());
}
}
}
batch_iter += batch_size;
return data;
}
void Load(const std::string &path) {
std::ifstream file(path);
std::string line;
int num_lines = 0;
while (std::getline(file, line)) {
num_lines++;
std::vector<std::string> data;
split(line, ':', &data);
std::vector<std::vector<float>> link_step_data;
std::vector<std::string> link_datas;
split(data[0], '|', &link_datas);
for (auto &step_data : link_datas) {
std::vector<float> tmp;
split_to_float(step_data, ',', &tmp);
link_step_data.emplace_back(tmp);
}
// load week data
std::vector<float> week_data;
split_to_float(data[2], ',', &week_data);
// load minute data
std::vector<float> minute_data;
split_to_float(data[1], ',', &minute_data);
link_step_data_all.emplace_back(std::move(link_step_data));
week_data_all.emplace_back(std::move(week_data));
minute_data_all.emplace_back(std::move(minute_data));
}
}
};
/*void PrepareInputs(std::vector<paddle::PaddleTensor> *input_slots, DataRecord *data,
int batch_size) {
// DataRecord data(FLAGS_datapath, batch_size);
paddle::PaddleTensor lod_attention_tensor, init_zero_tensor, lod_tensor_tensor,
week_tensor, minute_tensor;
lod_attention_tensor.name = "lod_attention";
init_zero_tensor.name = "init_zero";
lod_tensor_tensor.name = "lod_tensor";
week_tensor.name = "week";
minute_tensor.name = "minute";
auto one_batch = data->NextBatch();
printf("rnn_link_data.size:%lu,\n", one_batch.rnn_link_data.size());
printf("rnn_link_data.front().size:%lu\n", one_batch.rnn_link_data.front().size());
// clang-format off
std::vector<int> rnn_link_data_shape
({static_cast<int>(one_batch.rnn_link_data.size()), static_cast<int>(one_batch.rnn_link_data.front().size())});
//LOG(INFO) << "set 1";
lod_attention_tensor.shape.assign({1, 2});
lod_attention_tensor.lod.assign({one_batch.lod1, one_batch.lod2});
//LOG(INFO) << "set 1";
init_zero_tensor.shape.assign({batch_size, 15});
init_zero_tensor.lod.assign({one_batch.lod3});
//LOG(INFO) << "set 1";
lod_tensor_tensor.shape = rnn_link_data_shape;
lod_tensor_tensor.lod.assign({one_batch.lod1});
//LOG(INFO) << "set 1";
week_tensor.shape.assign({(int) one_batch.rnn_week_datas.size(), (int) one_batch.rnn_week_datas.front().size()});
week_tensor.lod.assign({one_batch.lod3});
//LOG(INFO) << "set 1";
minute_tensor.shape.assign({(int) one_batch.rnn_minute_datas.size(),
(int) one_batch.rnn_minute_datas.front().size()});
minute_tensor.lod.assign({one_batch.lod3});
// assign data
TensorAssignData(&lod_attention_tensor, std::vector<std::vector<float>>({{0, 0}}));
std::vector<float> tmp_zeros(batch_size * 15, 0.);
TensorAssignData(&init_zero_tensor, {tmp_zeros});
TensorAssignData(&lod_tensor_tensor, one_batch.rnn_link_data);
TensorAssignData(&week_tensor, one_batch.rnn_week_datas);
TensorAssignData(&minute_tensor, one_batch.rnn_minute_datas);
// clang-format on
input_slots->assign({lod_tensor_tensor, lod_attention_tensor,
init_zero_tensor, init_zero_tensor, week_tensor,
minute_tensor});
for (auto &tensor : *input_slots) {
tensor.dtype = paddle::PaddleDType::FLOAT32;
// LOG(INFO) << DescribeTensor(tensor);
}
}*/
class InputData {
public:
InputData() {}
~InputData() {}
int create(const std::string file_name, size_t buf_size,
size_t batch_size, int qps) {
bthread_mutex_init(&_mutex, NULL);
std::string datapath = "./data/test_features_sys";
DataRecord data(datapath, batch_size);
_data_record = data;
/*FILE* fp = fopen(file_name.c_str(), "r");
if (!fp) {
LOG(ERROR) << "Failed open data file: "
<< file_name;
return -1;
}
_data.clear();
char buffer[2048000];
std::vector<std::string> tokens;
while (fgets(buffer, sizeof(buffer), fp) != NULL) {
tokens.clear();
baidu::paddle_serving::sdk_cpp::str_split(
buffer, ",", &tokens);
std::vector<float> feature_one;
for (size_t i = 0; i < tokens.size(); i++){
feature_one.push_back(
strtof(tokens[i].c_str(), NULL));
}
_data.push_back(feature_one);
}
printf("succ load data, size:%ld\n", _data.size());
*/
for (size_t ri = 0; ri < buf_size; ri++) {
Request* req = new Request();
if (generate_one_req(*req, batch_size) != 0) {
LOG(ERROR) << "Failed generate req at: " << ri;
//fclose(fp);
return -1;
}
_req_list.push_back(req);
}
//fclose(fp);
_current = 0;
_waitingtm = 0;
_lasttm.tv_sec = _lasttm.tv_usec = 0;
if (qps == 0) {
_interval = 0;
} else if (qps < 1) {
_interval = 1000 * 1000;
} else {
_interval = 1000 * 1000 / qps;
}
LOG(INFO) << "Succ create req, size: " << buf_size
<< ", batch_size: " << batch_size;
return 0;
}
void destroy() {
size_t ds = _data.size();
for (size_t di = 0; di < ds; di++) {
_data[di].clear();
}
_data.clear();
size_t rs = _req_list.size();
for (size_t ri = 0; ri < rs; ri++) {
delete _req_list[ri];
}
_req_list.clear();
}
Request* next_req() {
bthread_mutex_lock(&_mutex);
if (_interval != 0)
{
if (_lasttm.tv_sec == 0 && _lasttm.tv_usec == 0)
{
gettimeofday(&_lasttm, NULL);
}
else
{
timeval curtm;
gettimeofday(&curtm, NULL);
long elapse =
((curtm.tv_sec - _lasttm.tv_sec) * 1000*1000 +
(curtm.tv_usec - _lasttm.tv_usec));
_waitingtm += _interval - elapse;
_lasttm = curtm;
if (_waitingtm >= SELECT_VALID_UNIT) // select的最小响应单位
{
long tm_unit
= _waitingtm / SELECT_VALID_UNIT * SELECT_VALID_UNIT;
timeval tmp_tm = {tm_unit / 1000000, tm_unit % 1000000};
select(1, NULL, NULL, NULL, &tmp_tm); //延时以控制压力speed
}
else if (_waitingtm <= SELECT_VALID_UNIT * (-2))
{
_waitingtm = -SELECT_VALID_UNIT;
}
}
}
size_t rs = _req_list.size();
Request* req = _req_list[(_current++) % rs];
bthread_mutex_unlock(&_mutex);
return req;
}
int generate_one_req(Request& req, int batch) {
int batch_size = batch;
int i = 0;
DenseInstance* ins = req.add_instances();
ins->set_batch_size(batch_size);
for (std::vector<std::vector<std::vector<char>>>::iterator it1 = _data_record.ch_link_step_data_all.begin();
it1 != _data_record.ch_link_step_data_all.end(); ++it1) {
::baidu::infinite::map_rnn::Lines* step_data = ins->add_step_data();
for (std::vector<std::vector<char>>::iterator it2 = it1->begin(); it2 != it1->end(); ++it2) {
::baidu::infinite::map_rnn::Line* line = step_data->add_line();
line->set_value(it2->data(), it2->size());
}
if (++i == batch_size) {
break;
}
}
i = 0;
::baidu::infinite::map_rnn::Lines* week_data = ins->mutable_week_data();
for (std::vector<std::vector<char>>::iterator it1 = _data_record.ch_week_data_all.begin();
it1 != _data_record.ch_week_data_all.end(); ++it1) {
::baidu::infinite::map_rnn::Line* line = week_data->add_line();
line->set_value(it1->data(), it1->size());
if (++i == batch_size) {
break;
}
}
i = 0;
::baidu::infinite::map_rnn::Lines* minute_data = ins->mutable_minute_data();
for (std::vector<std::vector<char>>::iterator it1 = _data_record.ch_minute_data_all.begin();
it1 != _data_record.ch_minute_data_all.end(); ++it1) {
::baidu::infinite::map_rnn::Line* line = minute_data->add_line();
line->set_value(it1->data(), it1->size());
if (++i == batch_size) {
break;
}
}
/*for (int fi = 0; fi < _data.size(); ++fi) {
Tensor* tensor = ins->add_tensors();
tensor->set_name(tensor_names[fi]);
int len = 1;
for (int si = 0; si < shapes[fi].size(); ++si) {
len *= shapes[fi][si];
}
for (int si = 0; si < shapes[fi].size(); ++si) {
tensor->add_shape(shapes[fi][si]);
}
tensor->set_features(&(_data[fi][0]), len * sizeof(float));
}*/
return 0;
}
private:
DataRecord _data_record;
std::vector<std::vector<float> > _data;
std::vector<Request*> _req_list;
bthread_mutex_t _mutex;
long _waitingtm;
long _interval;
timeval _lasttm;
int _current;
};
void print_res(
const Request* req,
const Response& res,
std::string route_tag,
uint64_t elapse_ms) {
uint32_t sample_size = res.predictions_size();
LOG(INFO)
<< "Succ call predictor[wasq], res sample size: "
<< sample_size << ", the tag is: "
<< route_tag << ", elapse_ms: " << elapse_ms;
}
struct Arg {
PredictorApi* api;
InputData* input;
};
void* work(void* p) {
Arg* arg = (Arg*) p;
InputData* input = arg->input;
if (PredictorApi::instance().thrd_initialize() != 0) {
LOG(ERROR) << "Failed create bthread local predictor";
return NULL;
}
Response res;
LOG(WARNING) << "Thread entry!";
while (true) {
if (PredictorApi::instance().thrd_clear() != 0) {
LOG(ERROR) << "Failed thrd clear predictor";
return NULL;
}
Predictor* predictor = PredictorApi::instance().fetch_predictor("wasq");
if (!predictor) {
LOG(ERROR) << "Failed fetch predictor: wasq";
return NULL;
}
Request* req = input->next_req();
res.Clear();
timeval start;
gettimeofday(&start, NULL);
if (predictor->inference(req, &res) != 0) {
LOG(ERROR) << "failed call predictor with req:"
<< req->ShortDebugString();
return NULL;
}
timeval end;
gettimeofday(&end, NULL);
uint64_t elapse_ms = (end.tv_sec * 1000 + end.tv_usec / 1000)
- (start.tv_sec * 1000 + start.tv_usec / 1000);
print_res(req, res, predictor->tag(), elapse_ms);
if (PredictorApi::instance().free_predictor(predictor) != 0) {
printf("failed free predictor\n");
}
//break;
//printf("done\n");
}
if (PredictorApi::instance().thrd_finalize() != 0) {
LOG(ERROR) << "Failed thrd finalize predictor api";
}
LOG(WARNING) << "Thread exit!";
return NULL;
}
int main(int argc, char** argv) {
if (argc != 5) {
printf("Usage: demo req_buf_size batch_size threads qps\n");
return -1;
}
int req_buffer = atoi(argv[1]);
int batch_size = atoi(argv[2]);
int thread_num = atoi(argv[3]);
int qps = atoi(argv[4]);
if (PredictorApi::instance().create("./conf", "predictors.conf") != 0) {
LOG(ERROR) << "Failed create predictors api!";
return -1;
}
InputData data;
if (data.create(
"./data/test_features_sys", req_buffer, batch_size, qps) != 0) {
LOG(ERROR) << "Failed create inputdata!";
return -1;
}
Arg arg = {NULL, &data};
bthread_t* threads = new bthread_t[thread_num];
if (!threads) {
LOG(ERROR) << "Failed create threads, num:" << thread_num;
return -1;
}
for (int i = 0; i < thread_num; ++i) {
bthread_start_background(threads + i, NULL, work, &arg);
}
for (int i = 0; i < thread_num; ++i) {
bthread_join(threads[i], NULL);
}
delete[] threads;
data.destroy();
return 0;
}
/* vim: set expandtab ts=4 sw=4 sts=4 tw=100: */
/***************************************************************************
*
* Copyright (c) 2018 Baidu.com, Inc. All Rights Reserved
*
**************************************************************************/
/**
* @file demo.cpp
* @author wanlijin01(wanlijin01@baidu.com)
* @date 2018/07/09 20:12:44
* @brief
*
**/
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include "common.h"
#include <fstream>
#include "predictor_sdk.h"
#include "sparse_service.pb.h"
#include "builtin_format.pb.h"
using baidu::paddle_serving::sdk_cpp::Predictor;
using baidu::paddle_serving::sdk_cpp::PredictorApi;
using baidu::paddle_serving::predictor::sparse_service::Request;
using baidu::paddle_serving::predictor::sparse_service::Response;
using baidu::paddle_serving::predictor::format::SparsePrediction;
using baidu::paddle_serving::predictor::format::SparseInstance;
int create_req(Request& req) {
SparseInstance *ins = req.mutable_instances()->Add();
ins->add_keys(26);
ins->add_keys(182);
ins->add_keys(232);
ins->add_shape(2000);
ins->add_values(1);
ins->add_values(1);
ins->add_values(1);
ins = req.mutable_instances()->Add();
ins->add_keys(0);
ins->add_keys(182);
ins->add_keys(232);
ins->add_keys(299);
ins->add_shape(2000);
ins->add_values(13);
ins->add_values(1);
ins->add_values(1);
ins->add_values(1);
return 0;
}
void print_res(
const Request& req,
const Response& res,
std::string route_tag,
uint64_t elapse_ms) {
for (uint32_t i = 0; i < res.predictions_size(); ++i) {
const SparsePrediction &prediction = res.predictions(i);
std::ostringstream oss;
for (uint32_t j = 0; j < prediction.categories_size(); ++j) {
oss << prediction.categories(j) << " ";
}
LOG(INFO) << "Receive result " << oss.str();
}
LOG(INFO)
<< "Succ call predictor[sparse_format], the tag is: "
<< route_tag << ", elapse_ms: " << elapse_ms;
}
int main(int argc, char** argv) {
PredictorApi api;
// initialize logger instance
struct stat st_buf;
int ret = 0;
if ((ret = stat("./log", &st_buf)) != 0) {
mkdir("./log", 0777);
ret = stat("./log", &st_buf);
if (ret != 0) {
LOG(WARNING) << "Log path ./log not exist, and create fail";
return -1;
}
}
FLAGS_log_dir = "./log";
google::InitGoogleLogging(strdup(argv[0]));
if (api.create("./conf", "predictors.prototxt") != 0) {
LOG(ERROR) << "Failed create predictors api!";
return -1;
}
Request req;
Response res;
api.thrd_initialize();
while (true) {
timeval start;
gettimeofday(&start, NULL);
api.thrd_clear();
Predictor* predictor = api.fetch_predictor("sparse_service");
if (!predictor) {
LOG(ERROR) << "Failed fetch predictor: sparse_service";
return -1;
}
req.Clear();
res.Clear();
if (create_req(req) != 0) {
return -1;
}
butil::IOBufBuilder debug_os;
if (predictor->debug(&req, &res, &debug_os) != 0) {
LOG(ERROR) << "failed call predictor with req:"
<< req.ShortDebugString();
return -1;
}
butil::IOBuf debug_buf;
debug_os.move_to(debug_buf);
LOG(INFO) << "Debug string: " << debug_buf;
timeval end;
gettimeofday(&end, NULL);
uint64_t elapse_ms = (end.tv_sec * 1000 + end.tv_usec / 1000)
- (start.tv_sec * 1000 + start.tv_usec / 1000);
print_res(req, res, predictor->tag(), elapse_ms);
res.Clear();
usleep(50);
} // while (true)
api.thrd_finalize();
api.destroy();
google::ShutdownGoogleLogging();
return 0;
}
/* vim: set expandtab ts=4 sw=4 sts=4 tw=100: */
...@@ -11,6 +11,9 @@ ...@@ -11,6 +11,9 @@
* @brief * @brief
* *
**/ **/
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include "common.h" #include "common.h"
#include <fstream> #include <fstream>
...@@ -117,7 +120,21 @@ void print_res( ...@@ -117,7 +120,21 @@ void print_res(
int main(int argc, char** argv) { int main(int argc, char** argv) {
PredictorApi api; PredictorApi api;
// initialize logger instance
struct stat st_buf;
int ret = 0;
if ((ret = stat("./log", &st_buf)) != 0) {
mkdir("./log", 0777);
ret = stat("./log", &st_buf);
if (ret != 0) {
LOG(WARNING) << "Log path ./log not exist, and create fail";
return -1;
}
}
FLAGS_log_dir = "./log";
google::InitGoogleLogging(strdup(argv[0]));
if (api.create("./conf", "predictors.prototxt") != 0) { if (api.create("./conf", "predictors.prototxt") != 0) {
LOG(ERROR) << "Failed create predictors api!"; LOG(ERROR) << "Failed create predictors api!";
return -1; return -1;
......
...@@ -30,7 +30,7 @@ do { \ ...@@ -30,7 +30,7 @@ do { \
if (factory == NULL \ if (factory == NULL \
|| FactoryPool<B>::instance().register_factory(\ || FactoryPool<B>::instance().register_factory(\
#D, factory) != 0) { \ #D, factory) != 0) { \
RAW_LOG_FATAL("Failed regist factory: %s->%s in macro!", #D, #B); \ RAW_LOG_ERROR("Failed regist factory: %s->%s in macro!", #D, #B); \
return E; \ return E; \
} \ } \
} while (0) } while (0)
...@@ -42,7 +42,7 @@ do { \ ...@@ -42,7 +42,7 @@ do { \
if (factory == NULL \ if (factory == NULL \
|| FactoryPool<B>::instance().register_factory(\ || FactoryPool<B>::instance().register_factory(\
tag, factory) != 0) { \ tag, factory) != 0) { \
RAW_LOG_FATAL("Failed regist factory: %s in macro!", #D);\ RAW_LOG_ERROR("Failed regist factory: %s in macro!", #D);\
return -1; \ return -1; \
} \ } \
return 0; \ return 0; \
...@@ -65,7 +65,7 @@ __attribute__((constructor)) static void PDS_STR_CAT(GlobalRegistObject, __LINE_ ...@@ -65,7 +65,7 @@ __attribute__((constructor)) static void PDS_STR_CAT(GlobalRegistObject, __LINE_
if (factory == NULL \ if (factory == NULL \
|| ::baidu::paddle_serving::sdk_cpp::FactoryPool<B>::instance().register_factory(\ || ::baidu::paddle_serving::sdk_cpp::FactoryPool<B>::instance().register_factory(\
#D, factory) != 0) { \ #D, factory) != 0) { \
RAW_LOG_FATAL("Failed regist factory: %s->%s in macro!", #D, #B); \ RAW_LOG_ERROR("Failed regist factory: %s->%s in macro!", #D, #B); \
return ; \ return ; \
} \ } \
return ; \ return ; \
...@@ -79,7 +79,7 @@ __attribute__((constructor)) static void PDS_STR_CAT(GlobalRegistObject, __LINE_ ...@@ -79,7 +79,7 @@ __attribute__((constructor)) static void PDS_STR_CAT(GlobalRegistObject, __LINE_
if (factory == NULL \ if (factory == NULL \
|| ::baidu::paddle_serving::sdk_cpp::FactoryPool<B>::instance().register_factory(\ || ::baidu::paddle_serving::sdk_cpp::FactoryPool<B>::instance().register_factory(\
T, factory) != 0) { \ T, factory) != 0) { \
RAW_LOG_FATAL("Failed regist factory: %s->%s, tag %s in macro!", #D, #B, T); \ RAW_LOG_ERROR("Failed regist factory: %s->%s, tag %s in macro!", #D, #B, T); \
return ; \ return ; \
} \ } \
return ; \ return ; \
...@@ -99,6 +99,7 @@ __attribute__((constructor)) static void PDS_STR_CAT(GlobalRegistObject, __LINE_ ...@@ -99,6 +99,7 @@ __attribute__((constructor)) static void PDS_STR_CAT(GlobalRegistObject, __LINE_
#define REGIST_STUB_OBJECT_WITH_TAG(D, C, R, I, O, T) \ #define REGIST_STUB_OBJECT_WITH_TAG(D, C, R, I, O, T) \
__attribute__((constructor)) static void PDS_STR_CAT(GlobalRegistObject, __LINE__)(void) \ __attribute__((constructor)) static void PDS_STR_CAT(GlobalRegistObject, __LINE__)(void) \
{ \ { \
RAW_LOG_INFO("REGIST_STUB_OBJECT_WITH_TAG"); \
::baidu::paddle_serving::sdk_cpp::Factory< \ ::baidu::paddle_serving::sdk_cpp::Factory< \
::baidu::paddle_serving::sdk_cpp::StubImpl<D, C, R, I, O>,\ ::baidu::paddle_serving::sdk_cpp::StubImpl<D, C, R, I, O>,\
::baidu::paddle_serving::sdk_cpp::Stub>* factory = \ ::baidu::paddle_serving::sdk_cpp::Stub>* factory = \
...@@ -109,7 +110,7 @@ __attribute__((constructor)) static void PDS_STR_CAT(GlobalRegistObject, __LINE_ ...@@ -109,7 +110,7 @@ __attribute__((constructor)) static void PDS_STR_CAT(GlobalRegistObject, __LINE_
|| ::baidu::paddle_serving::sdk_cpp::FactoryPool< \ || ::baidu::paddle_serving::sdk_cpp::FactoryPool< \
::baidu::paddle_serving::sdk_cpp::Stub>::instance().register_factory(\ ::baidu::paddle_serving::sdk_cpp::Stub>::instance().register_factory(\
T, factory) != 0) { \ T, factory) != 0) { \
RAW_LOG_FATAL("Failed regist factory: %s->Stub, tag: %s in macro!", #D, T); \ RAW_LOG_ERROR("Failed regist factory: %s->Stub, tag: %s in macro!", #D, T); \
return ; \ return ; \
} \ } \
return ; \ return ; \
...@@ -151,7 +152,7 @@ public: ...@@ -151,7 +152,7 @@ public:
typename std::map<std::string, FactoryBase<B>*>::iterator it typename std::map<std::string, FactoryBase<B>*>::iterator it
= _pool.find(tag); = _pool.find(tag);
if (it != _pool.end()) { if (it != _pool.end()) {
RAW_LOG_FATAL("Insert duplicate with tag: %s", tag.c_str()); RAW_LOG_ERROR("Insert duplicate with tag: %s", tag.c_str());
return -1; return -1;
} }
...@@ -159,7 +160,7 @@ public: ...@@ -159,7 +160,7 @@ public:
typename std::map<std::string, FactoryBase<B>*>::iterator, typename std::map<std::string, FactoryBase<B>*>::iterator,
bool> r = _pool.insert(std::make_pair(tag, factory)); bool> r = _pool.insert(std::make_pair(tag, factory));
if (!r.second) { if (!r.second) {
RAW_LOG_FATAL("Failed insert new factory with: %s", tag.c_str()); RAW_LOG_ERROR("Failed insert new factory with: %s", tag.c_str());
return -1; return -1;
} }
...@@ -172,7 +173,7 @@ public: ...@@ -172,7 +173,7 @@ public:
typename std::map<std::string, FactoryBase<B>*>::iterator it typename std::map<std::string, FactoryBase<B>*>::iterator it
= _pool.find(tag); = _pool.find(tag);
if (it == _pool.end() || it->second == NULL) { if (it == _pool.end() || it->second == NULL) {
RAW_LOG_FATAL("Not found factory pool, tag: %s, pool size: %u", tag.c_str(), _pool.size()); RAW_LOG_ERROR("Not found factory pool, tag: %s, pool size: %u", tag.c_str(), _pool.size());
return NULL; return NULL;
} }
......
syntax="proto2";
import "pds_option.proto";
import "builtin_format.proto";
package baidu.paddle_serving.predictor.dense_service;
option cc_generic_services = true;
message Request {
repeated baidu.paddle_serving.predictor.format.DenseInstance instances = 1;
};
message Response {
repeated baidu.paddle_serving.predictor.format.DensePrediction predictions = 1;
};
service BuiltinDenseFormatService {
rpc inference(Request) returns (Response);
rpc debug(Request) returns (Response);
option (pds.options).generate_stub = true;
};
syntax="proto2";
import "pds_option.proto";
package baidu.paddle_serving.predictor.echo_service;
option cc_generic_services = true;
message RequestAndResponse {
required int32 a = 1;
required float b = 2;
};
service BuiltinTestEchoService {
rpc inference(RequestAndResponse) returns (RequestAndResponse);
rpc debug(RequestAndResponse) returns (RequestAndResponse);
option (pds.options).generate_stub = true;
};
syntax="proto2";
import "pds_option.proto";
import "builtin_format.proto";
package baidu.paddle_serving.predictor.int64tensor_service;
option cc_generic_services = true;
message Request {
repeated baidu.paddle_serving.predictor.format.Int64TensorInstance
instances = 1;
};
message Response {
repeated baidu.paddle_serving.predictor.format.Float32TensorPredictor
predictions = 1;
};
service BuiltinFluidService {
rpc inference(Request) returns (Response);
rpc debug(Request) returns (Response);
option (pds.options).generate_stub = true;
};
syntax="proto2";
import "pds_option.proto";
package baidu.infinite.map_model;
option cc_generic_services = true;
message Tensor {
required string name = 1;
repeated uint32 shape = 2;
required bytes features = 3;
};
message SparseTensor {
required string name = 1;
repeated uint32 keys = 2;
repeated uint32 shape = 3;
required bytes features = 4;
};
message DenseInstance {
repeated Tensor tensors = 1;
};
message SparseInstance {
repeated SparseTensor tensors = 1;
};
message DenseRequest {
repeated DenseInstance instances = 1;
};
message Request {
repeated SparseInstance instances = 1;
};
message DensePrediction {
repeated float categories = 1;
};
message Response {
repeated DensePrediction predictions = 1;
};
service MapCnnService {
rpc inference(Request) returns (Response);
rpc debug(Request) returns (Response);
option (pds.options).generate_stub = true;
};
service MapDenseCnnService {
rpc inference(DenseRequest) returns (Response);
rpc debug(DenseRequest) returns (Response);
option (pds.options).generate_stub = true;
};
syntax="proto2";
import "pds_option.proto";
package baidu.infinite.map_rnn;
//package baidu.paddle_serving.predictor.map_rnn;
option cc_generic_services = true;
/*message Tensor {
required string name = 1;
repeated uint32 shape = 2;
required bytes features = 3;
};*/
message Line {
required bytes value = 1;
};
message Lines {
repeated Line line = 1;
};
message DenseInstance {
repeated Lines step_data = 1;
required Lines week_data = 2;
required Lines minute_data = 3;
required uint32 batch_size = 4;
};
message Request {
repeated DenseInstance instances = 1;
};
message DensePrediction {
repeated float categories = 1;
};
message Response {
repeated DensePrediction predictions = 1;
};
service MapRnnService {
rpc inference(Request) returns (Response);
rpc debug(Request) returns (Response);
//option (pds.options).generate_impl = true;
option (pds.options).generate_stub = true;
};
syntax="proto2";
import "pds_option.proto";
import "builtin_format.proto";
package baidu.paddle_serving.predictor.sparse_service;
option cc_generic_services = true;
message Request {
repeated baidu.paddle_serving.predictor.format.SparseInstance instances = 1;
};
message Response {
repeated baidu.paddle_serving.predictor.format.SparsePrediction predictions = 1;
};
service BuiltinSparseFormatService {
rpc inference(Request) returns (Response);
rpc debug(Request) returns (Response);
option (pds.options).generate_stub = true;
};
find_library(MKLML_LIBS NAMES libmklml_intel.so libiomp5.so) find_library(MKLML_LIBS NAMES libmklml_intel.so libiomp5.so)
include(op/CMakeLists.txt) include(op/CMakeLists.txt)
include(proto/CMakeLists.txt) include(proto/CMakeLists.txt)
add_executable(image_class ${serving_srcs}) add_executable(serving ${serving_srcs})
add_dependencies(image_class pdcodegen fluid_cpu_engine pdserving paddle_fluid add_dependencies(serving pdcodegen fluid_cpu_engine pdserving paddle_fluid
opencv_imgcodecs) opencv_imgcodecs)
target_include_directories(image_class PUBLIC target_include_directories(serving PUBLIC
${CMAKE_CURRENT_LIST_DIR}/ ${CMAKE_CURRENT_LIST_DIR}/
${CMAKE_CURRENT_BINARY_DIR}/ ${CMAKE_CURRENT_BINARY_DIR}/
) )
target_link_libraries(image_class opencv_imgcodecs target_link_libraries(serving opencv_imgcodecs
${opencv_depend_libs} -Wl,--whole-archive fluid_cpu_engine ${opencv_depend_libs} -Wl,--whole-archive fluid_cpu_engine
-Wl,--no-whole-archive pdserving paddle_fluid ${paddle_depend_libs} -Wl,--no-whole-archive pdserving paddle_fluid ${paddle_depend_libs}
${MKLML_LIB} ${MKLML_IOMP_LIB} -lpthread -lcrypto -lm -lrt -lssl -ldl -lz) ${MKLML_LIB} ${MKLML_IOMP_LIB} -lpthread -lcrypto -lm -lrt -lssl -ldl -lz)
install(TARGETS image_class install(TARGETS serving
RUNTIME DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/demo/serving/bin) RUNTIME DESTINATION
${PADDLE_SERVING_INSTALL_DIR}/demo/serving/bin)
install(DIRECTORY ${CMAKE_CURRENT_LIST_DIR}/conf DESTINATION install(DIRECTORY ${CMAKE_CURRENT_LIST_DIR}/conf DESTINATION
${PADDLE_SERVING_INSTALL_DIR}/demo/serving/) ${PADDLE_SERVING_INSTALL_DIR}/demo/serving/)
install(DIRECTORY ${CMAKE_CURRENT_LIST_DIR}/data DESTINATION install(DIRECTORY ${CMAKE_CURRENT_LIST_DIR}/data DESTINATION
......
services { services {
name: "ImageClassifyService" name: "BuiltinDenseFormatService"
workflows: "workflow1" workflows: "workflow1"
} }
services { services {
name: "BuiltinDenseFormatService" name: "BuiltinSparseFormatService"
workflows: "workflow2" workflows: "workflow2"
} }
services {
name: "BuiltinTestEchoService"
workflows: "workflow3"
}
services {
name: "ImageClassifyService"
workflows: "workflow4"
}
services {
name: "BuiltinFluidService"
workflows: "workflow5"
}
workflows { workflows {
name: "workflow1" name: "workflow1"
workflow_type: "Sequence" workflow_type: "Sequence"
nodes {
name: "dense_echo_op"
type: "DenseEchoOp"
}
}
workflows {
name: "workflow2"
workflow_type: "Sequence"
nodes {
name: "sparse_echo_op"
type: "SparseEchoOp"
dependencies {
name: "startup_op"
mode: "RO"
}
}
}
workflows {
name: "workflow3"
workflow_type: "Sequence"
nodes {
name: "echo_op"
type: "CommonEchoOp"
}
}
workflows {
name: "workflow4"
workflow_type: "Sequence"
nodes { nodes {
name: "image_reader_op" name: "image_reader_op"
type: "ReaderOp" type: "ReaderOp"
...@@ -22,11 +51,13 @@ workflows { ...@@ -22,11 +51,13 @@ workflows {
} }
} }
} }
workflows { workflows {
name: "workflow2" name: "workflow5"
workflow_type: "Sequence" workflow_type: "Sequence"
nodes { nodes {
name: "dense_op" name: "int64tensor_echo_op"
type: "DenseOp" type: "Int64TensorEchoOp"
} }
} }
// #include "base/iobuf.h"
#include "op/dense_op.h"
#include "framework/memory.h"
#include "framework/infer.h"
namespace baidu {
namespace paddle_serving {
namespace serving {
using baidu::paddle_serving::predictor::native_tensor::DenseTensor;
using baidu::paddle_serving::predictor::native_tensor::DenseRequest;
using baidu::paddle_serving::predictor::native_tensor::DenseResponse;
using baidu::paddle_serving::predictor::native_tensor::TensorType;
using baidu::paddle_serving::predictor::InferManager;
bool dt2pt(TensorType in, paddle::PaddleDType& out, size_t& sz) {
switch (in) {
case TensorType::FLOAT:
out = paddle::PaddleDType::FLOAT32;
sz = sizeof(float);
return true;
case TensorType::DOUBLE:
return false;
case TensorType::INT32:
return false;
case TensorType::INT64:
out = paddle::PaddleDType::INT64;
sz = sizeof(int64_t);
return true;
case TensorType::UINT32:
return false;
case TensorType::UINT64:
return false;
default:
return false;
}
}
bool pt2dt(paddle::PaddleDType in, DenseTensor* out, size_t& sz) {
switch (in) {
case paddle::PaddleDType::FLOAT32:
out->set_type(TensorType::FLOAT);
sz = sizeof(float);
return true;
case paddle::PaddleDType::INT64:
out->set_type(TensorType::INT64);
sz = sizeof(int64_t);
return true;
default:
return false;
}
}
int DenseOp::inference() {
_in.clear();
_out.clear();
const DenseRequest* req =
dynamic_cast<const DenseRequest*>(get_request_message());
if (!req) {
LOG(ERROR) << "Failed get dense request message";
return -1;
}
DenseResponse* res = mutable_data<DenseResponse>();
if (!res) {
LOG(ERROR) << "Failed get tls output object failed";
return -1;
}
uint32_t tensor_size = req->tensors_size();
if (tensor_size <= 0) {
LOG(INFO) << "No samples need to to predicted";
return -1;
}
for (uint32_t ti = 0; ti < tensor_size; ++ti) {
paddle::PaddleTensor pt;
const DenseTensor& dt = req->tensors(ti);
size_t data_size = 1;
if (!dt2pt(dt.type(), pt.dtype, data_size)) {
LOG(ERROR) << "Invalid data type: " << dt.type();
return -1;
}
pt.name = dt.name();
size_t dim_size = 1;
for (uint32_t si = 0; si < dt.shape_size(); ++si) {
pt.shape.push_back(dt.shape(si));
dim_size *= dt.shape(si);
}
data_size *= dim_size;
void* data =
baidu::paddle_serving::predictor::MempoolWrapper::instance().malloc(data_size);
if (!data) {
LOG(ERROR) << "Failed malloc buffer: " << data_size;
return -1;
}
for (uint32_t fi = 0; fi < dim_size; ++fi) {
AppendHelerWrapper::append1(dt.type(), dt, data, fi);
}
pt.data = paddle::PaddleBuf(data, data_size);
_in.push_back(pt);
}
// call paddle fluid model for inferencing
if (InferManager::instance().infer(
BUILTIN_DENSE_FORMAT_MODEL_NAME, &_in, &_out)) {
LOG(ERROR) << "Failed do infer in fluid model: "
<< BUILTIN_DENSE_FORMAT_MODEL_NAME;
return -1;
}
// copy output tensor into response
for (uint32_t si = 0; si < _out.size(); si++) {
const paddle::PaddleTensor& pt = _out[si];
DenseTensor* dt = res->add_tensors();
if (!dt) {
LOG(ERROR) << "Failed append new out tensor";
return -1;
}
size_t data_size = 1;
if (!pt2dt(pt.dtype, dt, data_size)) {
LOG(ERROR) << "Invalid data type: " << pt.dtype;
return -1;
}
dt->set_name(pt.name);
uint32_t shape_size = pt.shape.size();
size_t dim_size = 1;
for (uint32_t si = 0; si < shape_size; ++si) {
dt->add_shape(pt.shape[si]);
dim_size *= pt.shape[si];
}
// assign output data
const void* data = pt.data.data();
for (uint32_t di = 0; di < dim_size; ++di) {
AppendHelerWrapper::append2(dt->type(), data, dt, di);
}
}
LOG(INFO) << "Response in builtin dense format:"
<< "length:" << res->ByteSize() << ","
<< "data:" << res->ShortDebugString() << ","
<< "in: " << _in.size() << ","
<< "out: " << _out.size();
// release out tensor object resource
size_t in_size = _in.size();
for (size_t ii = 0; ii < in_size; ++ii) {
_in[ii].shape.clear();
}
_in.clear();
size_t out_size = _out.size();
for (size_t oi = 0; oi < out_size; ++oi) {
_out[oi].shape.clear();
}
_out.clear();
return 0;
}
DEFINE_OP(DenseOp);
} // serving
} // paddle_serving
} // baidu
#pragma once
#include "native_tensor.pb.h"
#include "common/inner_common.h"
#include "op/op.h"
#include "framework/channel.h"
#include "framework/op_repository.h"
#include "paddle/fluid/inference/paddle_inference_api.h"
namespace baidu {
namespace paddle_serving {
namespace serving {
static const char* BUILTIN_DENSE_FORMAT_MODEL_NAME
= "image_classification_resnet";
class DenseOp : public baidu::paddle_serving::predictor::OpWithChannel<
baidu::paddle_serving::predictor::native_tensor::DenseResponse> {
public:
typedef std::vector<paddle::PaddleTensor> TensorVector;
DECLARE_OP(DenseOp);
int inference();
private:
TensorVector _in;
TensorVector _out;
};
template<baidu::paddle_serving::predictor::native_tensor::TensorType type>
class AppendHelper {
public:
typedef baidu::paddle_serving::predictor::native_tensor::DenseTensor DenseTensor;
static void append1(const DenseTensor& src, void* dst, uint32_t index) {
LOG(ERROR) << "Not implement append1 for type: " << type;
}
static void append2(const void* src, DenseTensor* dst, uint32_t index) {
LOG(ERROR) << "Not implement append2 for type:" << type;
}
};
template<>
class AppendHelper<baidu::paddle_serving::predictor::native_tensor::TensorType::FLOAT> {
public:
typedef baidu::paddle_serving::predictor::native_tensor::DenseTensor DenseTensor;
static void append1(const DenseTensor& src, void* dst, uint32_t index) {
((float*)dst)[index] = src.float_data(index);
}
static void append2(const void* src, DenseTensor* dst, uint32_t index) {
dst->add_float_data(((float*)src)[index]);
}
};
template<>
class AppendHelper<baidu::paddle_serving::predictor::native_tensor::TensorType::DOUBLE> {
public:
typedef baidu::paddle_serving::predictor::native_tensor::DenseTensor DenseTensor;
static void append1(const DenseTensor& src, void* dst, uint32_t index) {
((double*)dst)[index] = src.double_data(index);
}
static void append2(const void* src, DenseTensor* dst, uint32_t index) {
dst->add_double_data(((double*)dst)[index]);
}
};
template<>
class AppendHelper<baidu::paddle_serving::predictor::native_tensor::TensorType::UINT32> {
public:
typedef baidu::paddle_serving::predictor::native_tensor::DenseTensor DenseTensor;
static void append1(const DenseTensor& src, void* dst, uint32_t index) {
((uint32_t*)dst)[index] = src.uint32_data(index);
}
static void append2(const void* src, DenseTensor* dst, uint32_t index) {
dst->add_uint32_data(((uint32_t*)dst)[index]);
}
};
template<>
class AppendHelper<baidu::paddle_serving::predictor::native_tensor::TensorType::UINT64> {
public:
typedef baidu::paddle_serving::predictor::native_tensor::DenseTensor DenseTensor;
static void append1(const DenseTensor& src, void* dst, uint32_t index) {
((uint64_t*)dst)[index] = src.uint64_data(index);
}
static void append2(const void* src, DenseTensor* dst, uint32_t index) {
dst->add_uint64_data(((uint64_t*)dst)[index]);
}
};
template<>
class AppendHelper<baidu::paddle_serving::predictor::native_tensor::TensorType::INT32> {
public:
typedef baidu::paddle_serving::predictor::native_tensor::DenseTensor DenseTensor;
static void append1(const DenseTensor& src, void* dst, uint32_t index) {
((int32_t*)dst)[index] = src.int32_data(index);
}
static void append2(const void* src, DenseTensor* dst, uint32_t index) {
dst->add_int32_data(((int32_t*)dst)[index]);
}
};
template<>
class AppendHelper<baidu::paddle_serving::predictor::native_tensor::TensorType::INT64> {
public:
typedef baidu::paddle_serving::predictor::native_tensor::DenseTensor DenseTensor;
static void append1(const DenseTensor& src, void* dst, uint32_t index) {
((int64_t*)dst)[index] = src.int64_data(index);
}
static void append2(const void* src, DenseTensor* dst, uint32_t index) {
dst->add_int64_data(((int64_t*)dst)[index]);
}
};
class AppendHelerWrapper {
public:
typedef baidu::paddle_serving::predictor::native_tensor::DenseTensor DenseTensor;
static void append1(
baidu::paddle_serving::predictor::native_tensor::TensorType type,
const DenseTensor& src, void* dst, uint32_t index) {
switch (type) {
case baidu::paddle_serving::predictor::native_tensor::TensorType::FLOAT:
AppendHelper<
baidu::paddle_serving::predictor::native_tensor::TensorType::FLOAT
>::append1(src, dst, index);
break;
case baidu::paddle_serving::predictor::native_tensor::TensorType::DOUBLE:
AppendHelper<
baidu::paddle_serving::predictor::native_tensor::TensorType::DOUBLE
>::append1(src, dst, index);
break;
case baidu::paddle_serving::predictor::native_tensor::TensorType::UINT32:
AppendHelper<
baidu::paddle_serving::predictor::native_tensor::TensorType::UINT32
>::append1(src, dst, index);
break;
case baidu::paddle_serving::predictor::native_tensor::TensorType::UINT64:
AppendHelper<
baidu::paddle_serving::predictor::native_tensor::TensorType::UINT64
>::append1(src, dst, index);
break;
case baidu::paddle_serving::predictor::native_tensor::TensorType::INT32:
AppendHelper<
baidu::paddle_serving::predictor::native_tensor::TensorType::INT32
>::append1(src, dst, index);
break;
case baidu::paddle_serving::predictor::native_tensor::TensorType::INT64:
AppendHelper<
baidu::paddle_serving::predictor::native_tensor::TensorType::INT64
>::append1(src, dst, index);
break;
default:
;
}
}
static void append2(
baidu::paddle_serving::predictor::native_tensor::TensorType type,
const void* src, DenseTensor* dst, uint32_t index) {
switch (type) {
case baidu::paddle_serving::predictor::native_tensor::TensorType::FLOAT:
AppendHelper<
baidu::paddle_serving::predictor::native_tensor::TensorType::FLOAT
>::append2(src, dst, index);
break;
case baidu::paddle_serving::predictor::native_tensor::TensorType::DOUBLE:
AppendHelper<
baidu::paddle_serving::predictor::native_tensor::TensorType::DOUBLE
>::append2(src, dst, index);
break;
case baidu::paddle_serving::predictor::native_tensor::TensorType::UINT32:
AppendHelper<
baidu::paddle_serving::predictor::native_tensor::TensorType::UINT32
>::append2(src, dst, index);
break;
case baidu::paddle_serving::predictor::native_tensor::TensorType::UINT64:
AppendHelper<
baidu::paddle_serving::predictor::native_tensor::TensorType::UINT64
>::append2(src, dst, index);
break;
case baidu::paddle_serving::predictor::native_tensor::TensorType::INT32:
AppendHelper<
baidu::paddle_serving::predictor::native_tensor::TensorType::INT32
>::append2(src, dst, index);
break;
case baidu::paddle_serving::predictor::native_tensor::TensorType::INT64:
AppendHelper<
baidu::paddle_serving::predictor::native_tensor::TensorType::INT64
>::append2(src, dst, index);
break;
default:
;
}
}
};
} // serving
} // paddle_serving
} // baidu
#include "op/int64tensor_echo_op.h"
namespace baidu {
namespace paddle_serving {
namespace predictor {
using baidu::paddle_serving::predictor::format::Float32TensorPredictor;
using baidu::paddle_serving::predictor::int64tensor_service::Request;
using baidu::paddle_serving::predictor::int64tensor_service::Response;
int Int64TensorEchoOp::inference() {
const Request* req =
dynamic_cast<const Request*>(get_request_message());
Response* res = mutable_data<Response>();
LOG(INFO) << "Receive request in dense service:"
<< req->ShortDebugString();
uint32_t sample_size = req->instances_size();
for (uint32_t si = 0; si < sample_size; si++) {
Float32TensorPredictor* float32_tensor_res =
res->mutable_predictions()->Add();
float32_tensor_res->add_data(1.0);
float32_tensor_res->add_data(2.0);
float32_tensor_res->add_shape(2);
float32_tensor_res->add_shape(1);
}
return 0;
}
DEFINE_OP(Int64TensorEchoOp);
} // predictor
} // paddle_serving
} // baidu
#ifndef BAIDU_PADDLE_SSERVER_PREDICTOR_OP_DENSE_ECHO_OP_H
#define BAIDU_PADDLE_SSERVER_PREDICTOR_OP_DENSE_ECHO_OP_H
#include "int64tensor_service.pb.h"
#include "common/inner_common.h"
#include "op/op.h"
#include "framework/channel.h"
#include "framework/op_repository.h"
namespace baidu {
namespace paddle_serving {
namespace predictor {
class Int64TensorEchoOp : public OpWithChannel<
baidu::paddle_serving::predictor::int64tensor_service::Response> {
public:
DECLARE_OP(Int64TensorEchoOp);
int inference();
};
} // predictor
} // paddle_serving
} // baidu
#endif
LIST(APPEND protofiles LIST(APPEND protofiles
${CMAKE_CURRENT_LIST_DIR}/image_class.proto ${CMAKE_CURRENT_LIST_DIR}/image_class.proto
${CMAKE_CURRENT_LIST_DIR}/native_tensor.proto ${CMAKE_CURRENT_LIST_DIR}/dense_service.proto
${CMAKE_CURRENT_LIST_DIR}/sparse_service.proto
${CMAKE_CURRENT_LIST_DIR}/echo_service.proto
${CMAKE_CURRENT_LIST_DIR}/int64tensor_service.proto
) )
PROTOBUF_GENERATE_SERVING_CPP(PROTO_SRCS PROTO_HDRS ${protofiles}) PROTOBUF_GENERATE_SERVING_CPP(PROTO_SRCS PROTO_HDRS ${protofiles})
......
...@@ -12,7 +12,7 @@ message Request { ...@@ -12,7 +12,7 @@ message Request {
message Response { message Response {
repeated baidu.paddle_serving.predictor.format.Float32TensorPredictor repeated baidu.paddle_serving.predictor.format.Float32TensorPredictor
predictions = 2; predictions = 1;
}; };
service BuiltinFluidService { service BuiltinFluidService {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册