Commit 3f35cbc4 authored by: W wangguibao

sdk-cpp

Change-Id: I5845dc156899b3c9ee6291c0498a52fcb0b3d47f
Parent 83b68099
@@ -75,6 +75,7 @@ include(flags)
include(configure)
include(generic)
include(paddlepaddle)
include(external/opencv)
include_directories("${PADDLE_SERVING_SOURCE_DIR}")
@@ -104,3 +105,4 @@ add_subdirectory(mempool)
add_subdirectory(predictor)
add_subdirectory(inferencer-fluid-cpu)
add_subdirectory(serving)
add_subdirectory(sdk-cpp)
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
include (ExternalProject)
# NOTE: opencv is needed when linking with paddle-serving example
set(OPENCV_SOURCES_DIR ${THIRD_PARTY_PATH}/opencv)
set(OPENCV_INSTALL_DIR ${THIRD_PARTY_PATH}/install/opencv)
set(OPENCV_INCLUDE_DIR "${OPENCV_INSTALL_DIR}/include" CACHE PATH "Opencv include directory." FORCE)
INCLUDE_DIRECTORIES(${OPENCV_INCLUDE_DIR})
ExternalProject_Add(
extern_opencv
GIT_REPOSITORY "https://github.com/opencv/opencv"
GIT_TAG "3.2.0"
PREFIX ${OPENCV_SOURCES_DIR}
UPDATE_COMMAND ""
PATCH_COMMAND cmake -E copy ${CMAKE_SOURCE_DIR}/cmake/patch/opencv_ippicv_downloader.cmake ${OPENCV_SOURCES_DIR}/src/extern_opencv/3rdparty/ippicv/downloader.cmake
CMAKE_ARGS -DCMAKE_CXX_COMPILER=${CMAKE_CXX_COMPILER}
-DCMAKE_C_COMPILER=${CMAKE_C_COMPILER}
-DCMAKE_C_FLAGS=${CMAKE_C_FLAGS}
-DCMAKE_C_FLAGS_DEBUG=${CMAKE_C_FLAGS_DEBUG}
-DCMAKE_C_FLAGS_RELEASE=${CMAKE_C_FLAGS_RELEASE}
-DCMAKE_CXX_FLAGS=${OPENCV_CMAKE_CXX_FLAGS}
-DCMAKE_CXX_FLAGS_RELEASE=${CMAKE_CXX_FLAGS_RELEASE}
-DCMAKE_CXX_FLAGS_DEBUG=${CMAKE_CXX_FLAGS_DEBUG}
-DCMAKE_INSTALL_PREFIX=${OPENCV_INSTALL_DIR}
-DCMAKE_INSTALL_LIBDIR=${OPENCV_INSTALL_DIR}/lib
-DCMAKE_POSITION_INDEPENDENT_CODE=ON
-DBUILD_TESTS=OFF
-DBUILD_PERF_TESTS=OFF
-DCMAKE_BUILD_TYPE=${THIRD_PARTY_BUILD_TYPE}
-DWITH_EIGEN=OFF
-DWITH_JPEG=OFF
-DWITH_PNG=OFF
-DWITH_TIFF=OFF
-DBUILD_SHARED_LIBS=OFF
${EXTERNAL_OPTIONAL_ARGS}
CMAKE_CACHE_ARGS -DCMAKE_INSTALL_PREFIX:PATH=${OPENCV_INSTALL_DIR}
-DCMAKE_INSTALL_LIBDIR:PATH=${OPENCV_INSTALL_DIR}/lib
-DCMAKE_POSITION_INDEPENDENT_CODE:BOOL=ON
-DCMAKE_BUILD_TYPE:STRING=${THIRD_PARTY_BUILD_TYPE}
)
IF(WIN32)
IF(NOT EXISTS "${OPENCV_INSTALL_DIR}/lib/libopencv_core.lib")
add_custom_command(TARGET extern_opencv POST_BUILD
COMMAND cmake -E copy ${OPENCV_INSTALL_DIR}/lib/opencv_core.lib ${OPENCV_INSTALL_DIR}/lib/libopencv_core.lib
)
ENDIF()
IF(NOT EXISTS "${OPENCV_INSTALL_DIR}/lib/libopencv_imgcodecs.lib")
add_custom_command(TARGET extern_opencv POST_BUILD
COMMAND cmake -E copy ${OPENCV_INSTALL_DIR}/lib/opencv_imgcodecs.lib ${OPENCV_INSTALL_DIR}/lib/libopencv_imgcodecs.lib
)
ENDIF()
IF(NOT EXISTS "${OPENCV_INSTALL_DIR}/lib/libopencv_imgproc.lib")
add_custom_command(TARGET extern_opencv POST_BUILD
COMMAND cmake -E copy ${OPENCV_INSTALL_DIR}/lib/opencv_imgproc.lib ${OPENCV_INSTALL_DIR}/lib/libopencv_imgproc.lib
)
ENDIF()
set(OPENCV_CORE_LIB "${OPENCV_INSTALL_DIR}/lib/libopencv_core.lib")
set(OPENCV_IMGCODECS_LIB "${OPENCV_INSTALL_DIR}/lib/libopencv_imgcodecs.lib")
set(OPENCV_IMGPROC_LIB "${OPENCV_INSTALL_DIR}/lib/libopencv_imgproc.lib")
else(WIN32)
set(OPENCV_CORE_LIB "${OPENCV_INSTALL_DIR}/lib/libopencv_core.a")
set(OPENCV_IMGCODECS_LIB "${OPENCV_INSTALL_DIR}/lib/libopencv_imgcodecs.a")
set(OPENCV_IMGPROC_LIB "${OPENCV_INSTALL_DIR}/lib/libopencv_imgproc.a")
endif (WIN32)
add_library(opencv_core STATIC IMPORTED GLOBAL)
set_property(TARGET opencv_core PROPERTY IMPORTED_LOCATION ${OPENCV_CORE_LIB})
add_library(opencv_imgcodecs STATIC IMPORTED GLOBAL)
set_property(TARGET opencv_imgcodecs PROPERTY IMPORTED_LOCATION ${OPENCV_IMGCODECS_LIB})
add_library(opencv_imgproc STATIC IMPORTED GLOBAL)
set_property(TARGET opencv_imgproc PROPERTY IMPORTED_LOCATION ${OPENCV_IMGPROC_LIB})
include_directories(${OPENCV_INCLUDE_DIR})
add_dependencies(opencv_core extern_opencv)
add_dependencies(opencv_imgcodecs extern_opencv)
add_dependencies(opencv_imgproc extern_opencv)
ExternalProject_Get_Property(extern_opencv BINARY_DIR)
ADD_LIBRARY(ippicv STATIC IMPORTED GLOBAL)
SET_PROPERTY(TARGET ippicv PROPERTY IMPORTED_LOCATION ${BINARY_DIR}/3rdparty/ippicv/ippicv_lnx/lib/intel64/libippicv.a)
ADD_LIBRARY(IlmImf STATIC IMPORTED GLOBAL)
SET_PROPERTY(TARGET IlmImf PROPERTY IMPORTED_LOCATION ${BINARY_DIR}/3rdparty/lib/libIlmImf.a)
ADD_LIBRARY(libjasper STATIC IMPORTED GLOBAL)
SET_PROPERTY(TARGET libjasper PROPERTY IMPORTED_LOCATION ${BINARY_DIR}/3rdparty/lib/liblibjasper.a)
ADD_LIBRARY(libwebp STATIC IMPORTED GLOBAL)
SET_PROPERTY(TARGET libwebp PROPERTY IMPORTED_LOCATION ${BINARY_DIR}/3rdparty/lib/liblibwebp.a)
#ADD_LIBRARY(zlib STATIC IMPORTED GLOBAL)
#SET_PROPERTY(TARGET zlib PROPERTY IMPORTED_LOCATION ${BINARY_DIR}/3rdparty/lib/libzlib.a)
LIST(APPEND opencv_depend_libs
opencv_imgproc
opencv_core
ippicv
IlmImf
libjasper
libwebp
zlib)
@@ -114,6 +114,7 @@ SET(CMAKE_EXTRA_INCLUDE_FILES "")
# https://github.com/PaddlePaddle/Paddle/issues/12773
if (NOT WIN32)
set(COMMON_FLAGS
-D__const__=
-fPIC
-fno-omit-frame-pointer
-Wall
@@ -839,7 +839,7 @@ function(PROTOBUF_GENERATE_SERVING_CPP SRCS HDRS)
ARGS --cpp_out=${CMAKE_CURRENT_BINARY_DIR}
--pdcodegen_out=${CMAKE_CURRENT_BINARY_DIR}
--plugin=protoc-gen-pdcodegen=${CMAKE_BINARY_DIR}/predictor/pdcodegen
--proto_path=${CMAKE_SOURCE_DIR}/predictor/proto
# --proto_path=${CMAKE_SOURCE_DIR}/predictor/proto
${_protobuf_include_path} ${ABS_FIL}
DEPENDS ${ABS_FIL} ${Protobuf_PROTOC_EXECUTABLE}
COMMENT "Running Paddle-serving C++ protocol buffer compiler on ${FIL}"
#
# The script downloads ICV package
#
# On return this will define:
# OPENCV_ICV_PATH - path to unpacked downloaded package
#
function(_icv_downloader)
# Commit SHA in the opencv_3rdparty repo
set(IPPICV_BINARIES_COMMIT "81a676001ca8075ada498583e4166079e5744668")
# Define actual ICV versions
if(APPLE)
set(OPENCV_ICV_PACKAGE_NAME "ippicv_macosx_20151201.tgz")
set(OPENCV_ICV_PACKAGE_HASH "4ff1fde9a7cfdfe7250bfcd8334e0f2f")
set(OPENCV_ICV_PLATFORM "macosx")
set(OPENCV_ICV_PACKAGE_SUBDIR "/ippicv_osx")
elseif(UNIX)
if(ANDROID AND NOT (ANDROID_ABI STREQUAL x86 OR ANDROID_ABI STREQUAL x86_64))
return()
endif()
set(OPENCV_ICV_PACKAGE_NAME "ippicv_linux_20151201.tgz")
set(OPENCV_ICV_PACKAGE_HASH "808b791a6eac9ed78d32a7666804320e")
set(OPENCV_ICV_PLATFORM "linux")
set(OPENCV_ICV_PACKAGE_SUBDIR "/ippicv_lnx")
elseif(WIN32 AND NOT ARM)
set(OPENCV_ICV_PACKAGE_NAME "ippicv_windows_20151201.zip")
set(OPENCV_ICV_PACKAGE_HASH "04e81ce5d0e329c3fbc606ae32cad44d")
set(OPENCV_ICV_PLATFORM "windows")
set(OPENCV_ICV_PACKAGE_SUBDIR "/ippicv_win")
else()
return() # Not supported
endif()
set(OPENCV_ICV_UNPACK_PATH "${CMAKE_BINARY_DIR}/3rdparty/ippicv")
set(OPENCV_ICV_PATH "${OPENCV_ICV_UNPACK_PATH}${OPENCV_ICV_PACKAGE_SUBDIR}")
if(DEFINED OPENCV_ICV_PACKAGE_DOWNLOADED
AND OPENCV_ICV_PACKAGE_DOWNLOADED STREQUAL OPENCV_ICV_PACKAGE_HASH
AND EXISTS ${OPENCV_ICV_PATH})
# Package has been downloaded and checked by the previous build
set(OPENCV_ICV_PATH "${OPENCV_ICV_PATH}" PARENT_SCOPE)
return()
else()
if(EXISTS ${OPENCV_ICV_UNPACK_PATH})
message(STATUS "ICV: Removing previous unpacked package: ${OPENCV_ICV_UNPACK_PATH}")
file(REMOVE_RECURSE ${OPENCV_ICV_UNPACK_PATH})
endif()
endif()
unset(OPENCV_ICV_PACKAGE_DOWNLOADED CACHE)
set(OPENCV_ICV_PACKAGE_ARCHIVE "${CMAKE_CURRENT_LIST_DIR}/downloads/${OPENCV_ICV_PLATFORM}-${OPENCV_ICV_PACKAGE_HASH}/${OPENCV_ICV_PACKAGE_NAME}")
get_filename_component(OPENCV_ICV_PACKAGE_ARCHIVE_DIR "${OPENCV_ICV_PACKAGE_ARCHIVE}" PATH)
if(EXISTS "${OPENCV_ICV_PACKAGE_ARCHIVE}")
file(MD5 "${OPENCV_ICV_PACKAGE_ARCHIVE}" archive_md5)
if(NOT archive_md5 STREQUAL OPENCV_ICV_PACKAGE_HASH)
message(WARNING "ICV: Local copy of ICV package has invalid MD5 hash: ${archive_md5} (expected: ${OPENCV_ICV_PACKAGE_HASH})")
file(REMOVE "${OPENCV_ICV_PACKAGE_ARCHIVE}")
file(REMOVE_RECURSE "${OPENCV_ICV_PACKAGE_ARCHIVE_DIR}")
endif()
endif()
if(NOT EXISTS "${OPENCV_ICV_PACKAGE_ARCHIVE}")
if(NOT DEFINED OPENCV_ICV_URL)
if(DEFINED ENV{OPENCV_ICV_URL})
set(OPENCV_ICV_URL $ENV{OPENCV_ICV_URL})
else()
set(OPENCV_ICV_URL "https://raw.githubusercontent.com/opencv/opencv_3rdparty/${IPPICV_BINARIES_COMMIT}/ippicv")
endif()
endif()
file(MAKE_DIRECTORY ${OPENCV_ICV_PACKAGE_ARCHIVE_DIR})
message(STATUS "ICV: Downloading ${OPENCV_ICV_PACKAGE_NAME}...")
execute_process(COMMAND wget --no-check-certificate -O "${OPENCV_ICV_PACKAGE_ARCHIVE}" "${OPENCV_ICV_URL}/${OPENCV_ICV_PACKAGE_NAME}" RESULT_VARIABLE __status)
if(NOT __status EQUAL 0)
message(FATAL_ERROR "ICV: Failed to download ICV package: ${OPENCV_ICV_PACKAGE_NAME}. Status=${__status}")
else()
# Don't remove this code, because EXPECTED_MD5 parameter doesn't fail "file(DOWNLOAD)" step
# on wrong hash
file(MD5 "${OPENCV_ICV_PACKAGE_ARCHIVE}" archive_md5)
if(NOT archive_md5 STREQUAL OPENCV_ICV_PACKAGE_HASH)
message(FATAL_ERROR "ICV: Downloaded copy of ICV package has invalid MD5 hash: ${archive_md5} (expected: ${OPENCV_ICV_PACKAGE_HASH})")
endif()
endif()
endif()
ocv_assert(EXISTS "${OPENCV_ICV_PACKAGE_ARCHIVE}")
ocv_assert(NOT EXISTS "${OPENCV_ICV_UNPACK_PATH}")
file(MAKE_DIRECTORY ${OPENCV_ICV_UNPACK_PATH})
ocv_assert(EXISTS "${OPENCV_ICV_UNPACK_PATH}")
message(STATUS "ICV: Unpacking ${OPENCV_ICV_PACKAGE_NAME} to ${OPENCV_ICV_UNPACK_PATH}...")
execute_process(COMMAND ${CMAKE_COMMAND} -E tar xz "${OPENCV_ICV_PACKAGE_ARCHIVE}"
WORKING_DIRECTORY "${OPENCV_ICV_UNPACK_PATH}"
RESULT_VARIABLE __result)
if(NOT __result EQUAL 0)
message(FATAL_ERROR "ICV: Failed to unpack ICV package from ${OPENCV_ICV_PACKAGE_ARCHIVE} to ${OPENCV_ICV_UNPACK_PATH} with error ${__result}")
endif()
ocv_assert(EXISTS "${OPENCV_ICV_PATH}")
set(OPENCV_ICV_PACKAGE_DOWNLOADED "${OPENCV_ICV_PACKAGE_HASH}" CACHE INTERNAL "ICV package hash")
message(STATUS "ICV: Package successfully downloaded")
set(OPENCV_ICV_PATH "${OPENCV_ICV_PATH}" PARENT_SCOPE)
endfunction()
_icv_downloader()
@@ -9,6 +9,9 @@ namespace predictor {
// __thread bool p_thread_initialized = false;
static void dynamic_resource_deleter(void* d) {
#if 1
LOG(INFO) << "dynamic_resource_delete on " << bthread_self();
#endif
delete static_cast<DynamicResource*>(d);
}
@@ -105,8 +108,12 @@ int Resource::thread_initialize() {
}
}
#if 0
LOG(INFO) << "Successfully thread initialized dynamic resource";
#else
LOG(INFO) << bthread_self() << ": Successfully thread initialized dynamic resource " << p_dynamic_resource;
#endif
return 0;
}
@@ -125,7 +132,11 @@ int Resource::thread_clear() {
DynamicResource* p_dynamic_resource = (DynamicResource*) THREAD_GETSPECIFIC(_tls_bspec_key);
if (p_dynamic_resource == NULL) {
#if 0
LOG(FATAL) << "tls dynamic resource shouldn't be null after thread_initialize";
#else
LOG(FATAL) << bthread_self() << ": tls dynamic resource shouldn't be null after thread_initialize";
#endif
return -1;
}
if (p_dynamic_resource->clear() != 0) {
@@ -133,6 +144,7 @@ int Resource::thread_clear() {
return -1;
}
LOG(INFO) << bthread_self() << ": Resource::thread_clear success";
// ...
return 0;
}
include(src/CMakeLists.txt)
include(proto/CMakeLists.txt)
add_library(sdk-cpp ${sdk_cpp_srcs})
add_dependencies(sdk-cpp configure)
target_include_directories(sdk-cpp PUBLIC
${CMAKE_CURRENT_LIST_DIR}/include
${CMAKE_CURRENT_BINARY_DIR}/
${CMAKE_CURRENT_LIST_DIR}/../configure
${CMAKE_CURRENT_LIST_DIR}/../ullib/include
${CMAKE_CURRENT_BINARY_DIR}/../bsl/include
)
target_link_libraries(sdk-cpp brpc configure protobuf leveldb)
add_executable(ximage ${CMAKE_CURRENT_LIST_DIR}/demo/ximage.cpp)
target_include_directories(ximage PUBLIC
${CMAKE_CURRENT_LIST_DIR}/include
${CMAKE_CURRENT_BINARY_DIR}
${CMAKE_CURRENT_LIST_DIR}/../configure
${CMAKE_CURRENT_LIST_DIR}/../ullib/include
${CMAKE_CURRENT_BINARY_DIR}/../bsl/include)
target_link_libraries(ximage sdk-cpp -lpthread -lcrypto -lm -lrt -lssl -ldl
-lz)
add_executable(mapcnn_dense ${CMAKE_CURRENT_LIST_DIR}/demo/mapcnn_dense.cpp)
target_include_directories(mapcnn_dense PUBLIC
${CMAKE_CURRENT_LIST_DIR}/include
${CMAKE_CURRENT_BINARY_DIR}/
${CMAKE_CURRENT_LIST_DIR}/../configure
${CMAKE_CURRENT_LIST_DIR}/../ullib/include
${CMAKE_CURRENT_BINARY_DIR}/../bsl/include)
target_link_libraries(mapcnn_dense sdk-cpp -lpthread -lcrypto -lm -lrt -lssl
-ldl -lz)
add_executable(mapcnn_sparse ${CMAKE_CURRENT_LIST_DIR}/demo/mapcnn_sparse.cpp)
target_include_directories(mapcnn_sparse PUBLIC
${CMAKE_CURRENT_LIST_DIR}/include
${CMAKE_CURRENT_BINARY_DIR}/
${CMAKE_CURRENT_LIST_DIR}/../configure
${CMAKE_CURRENT_LIST_DIR}/../ullib/include
${CMAKE_CURRENT_BINARY_DIR}/../bsl/include)
target_link_libraries(mapcnn_sparse sdk-cpp -lpthread -lcrypto -lm -lrt -lssl
-ldl -lz)
[DefaultVariantInfo]
Tag : default
[.Connection]
ConnectTimeoutMilliSec: 2000
RpcTimeoutMilliSec: 20000
ConnectRetryCount : 2
MaxConnectionPerHost : 100
HedgeRequestTimeoutMilliSec: -1
HedgeFetchRetryCount : 2
BnsReloadIntervalSeconds : 10
ConnectionType : pooled
[.NamingInfo]
ClusterFilterStrategy : Default
LoadBalanceStrategy : la
[.RpcParameter]
# 0-NONE, 1-SNAPPY, 2-GZIP, 3-ZLIB, 4-LZ4
CompressType : 0
PackageSize : 20
Protocol : baidu_std
MaxChannelPerRequest : 3
[@Predictor]
name : ximage
service_name : baidu.paddle_serving.predictor.image_classification.ImageClassifyService
endpoint_router : WeightedRandomRender
[.WeightedRandomRender]
VariantWeightList : 50
[.@VariantInfo]
Tag : var1
[..NamingInfo]
Cluster : list://127.0.0.1:8010
#Cluster : list://10.88.158.21:8010
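For reference, the client flow this file drives (endpoint `ximage`, single variant `var1` answering at `list://127.0.0.1:8010`) is the one exercised by the demos later in this commit. A minimal sketch, assuming the sdk-cpp `PredictorApi`/`Predictor` interfaces and the image_classification protos (error handling and `free_predictor` elided):

#include "common.h"
#include "predictor_sdk.h"
#include "image_class.pb.h"

using baidu::paddle_serving::sdk_cpp::Predictor;
using baidu::paddle_serving::sdk_cpp::PredictorApi;
using baidu::paddle_serving::predictor::image_classification::Request;
using baidu::paddle_serving::predictor::image_classification::Response;

int call_once() {
    PredictorApi api;
    // Loads ./conf/predictors.conf, i.e. the sections above.
    if (api.create("./conf", "predictors.conf") != 0) {
        return -1;
    }
    api.thrd_initialize();
    Predictor* predictor = api.fetch_predictor("ximage");  // [@Predictor] name
    if (!predictor) {
        return -1;
    }
    Request req;
    Response res;
    // ... fill req.add_instances() with image bytes ...
    int ret = predictor->inference(&req, &res);
    api.thrd_finalize();
    api.destroy();
    return ret;
}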
[DefaultVariantInfo]
Tag : default
[.Connection]
ConnectTimeoutMilliSec: 2000
RpcTimeoutMilliSec: 20000
ConnectRetryCount : 2
MaxConnectionPerHost : 100
HedgeRequestTimeoutMilliSec: -1
HedgeFetchRetryCount : 2
BnsReloadIntervalSeconds : 10
ConnectionType : pooled
[.NamingInfo]
ClusterFilterStrategy : Default
LoadBalanceStrategy : la
[.RpcParameter]
# 0-NONE, 1-SNAPPY, 2-GZIP, 3-ZLIB, 4-LZ4
CompressType : 0
PackageSize : 20
Protocol : baidu_std
MaxChannelPerRequest : 3
[@Predictor]
name : ximage
service_name : baidu.paddle_serving.predictor.image_classification.ImageClassifyService
endpoint_router : WeightedRandomRender
[.WeightedRandomRender]
VariantWeightList : 50
[.@VariantInfo]
Tag : var1
[..NamingInfo]
Cluster : list://127.0.0.1:8010
#Cluster : list://10.88.158.21:8010
[@Predictor]
name : dense_cnn
service_name : baidu.paddle_serving.fluid_engine.DefaultDenseService
endpoint_router : WeightedRandomRender
[.WeightedRandomRender]
VariantWeightList : 25
[.@VariantInfo]
Tag : var1
[..NamingInfo]
Cluster : list://10.194.83.21:8010
#Cluster : bns://opera-ps-mapcnn-000-nj03.MAP.nj03
[..Connection]
[@Predictor]
name : sparse_cnn
service_name : baidu.paddle_serving.fluid_engine.DefaultSparseService
endpoint_router : WeightedRandomRender
[.WeightedRandomRender]
VariantWeightList : 25
[.@VariantInfo]
Tag : var1
[..NamingInfo]
Cluster : list://10.194.83.21:8010
#Cluster : bns://opera-ps-mapcnn-000-nj03.MAP.nj03
[..Connection]
[@Predictor]
name : wasq
service_name : baidu.infinite.map_rnn.MapRnnService
endpoint_router : WeightedRandomRender
[.WeightedRandomRender]
VariantWeightList : 25
[.@VariantInfo]
Tag : var1
[..NamingInfo]
Cluster : list://127.0.0.1:8010
#Cluster : bns://opera-ps-mapcnn-000-nj03.MAP.nj03
[..Connection]
This diff is collapsed.
/***************************************************************************
*
* Copyright (c) 2018 Baidu.com, Inc. All Rights Reserved
*
**************************************************************************/
/**
* @file demo.cpp
* @author root(root@baidu.com)
* @date 2018/07/09 20:12:44
* @brief
*
**/
#include <fstream>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>
#include "common.h"
#include "predictor_sdk.h"
#include "map_cnn.pb.h"
using baidu::paddle_serving::sdk_cpp::Predictor;
using baidu::paddle_serving::sdk_cpp::PredictorApi;
using baidu::infinite::map_model::SparseTensor;
using baidu::infinite::map_model::SparseInstance;
using baidu::infinite::map_model::DensePrediction;
using baidu::infinite::map_model::Request;
using baidu::infinite::map_model::Response;
using baidu::infinite::map_model::MapCnnService;
static const uint32_t SELECT_VALID_UNIT = 1000;
class InputData {
public:
InputData() {}
~InputData() {}
int create(const std::string file_name, size_t buf_size,
size_t batch_size, int qps) {
pthread_mutex_init(&_mutex, NULL);
FILE* fp = fopen(file_name.c_str(), "r");
if (!fp) {
LOG(FATAL) << "Failed open data file: "
<< file_name;
return -1;
}
_data.clear();
char buffer[2048000];
std::vector<std::string> tokens;
while (fgets(buffer, sizeof(buffer), fp) != NULL) {
tokens.clear();
baidu::paddle_serving::sdk_cpp::str_split(
buffer, ",", &tokens);
std::vector<float> feature_one;
for (size_t i = 0; i < tokens.size(); i++){
feature_one.push_back(
strtof(tokens[i].c_str(), NULL));
}
_data.push_back(feature_one);
}
printf("succ load data, size:%ld\n", _data.size());
for (size_t ri = 0; ri < buf_size; ri++) {
Request* req = new Request();
if (generate_one_req(*req, batch_size) != 0) {
LOG(FATAL) << "Failed generate req at: " << ri;
fclose(fp);
return -1;
}
_req_list.push_back(req);
}
fclose(fp);
_current = 0;
_waitingtm = 0;
_lasttm.tv_sec = _lasttm.tv_usec = 0;
if (qps == 0) {
_interval = 0;
} else if (qps < 1) {
_interval = 1000 * 1000;
} else {
_interval = 1000 * 1000 / qps;
}
LOG(INFO) << "Succ create req, size: " << buf_size
<< ", batch_size: " << batch_size;
return 0;
}
void destroy() {
size_t ds = _data.size();
for (size_t di = 0; di < ds; di++) {
_data[di].clear();
}
_data.clear();
size_t rs = _req_list.size();
for (size_t ri = 0; ri < rs; ri++) {
delete _req_list[ri];
}
_req_list.clear();
}
Request* next_req() {
pthread_mutex_lock(&_mutex);
if (_interval != 0)
{
if (_lasttm.tv_sec == 0 && _lasttm.tv_usec == 0)
{
gettimeofday(&_lasttm, NULL);
}
else
{
timeval curtm;
gettimeofday(&curtm, NULL);
long elapse =
((curtm.tv_sec - _lasttm.tv_sec) * 1000*1000 +
(curtm.tv_usec - _lasttm.tv_usec));
_waitingtm += _interval - elapse;
_lasttm = curtm;
if (_waitingtm >= SELECT_VALID_UNIT) // smallest interval select() can honor
{
long tm_unit
= _waitingtm / SELECT_VALID_UNIT * SELECT_VALID_UNIT;
timeval tmp_tm = {tm_unit / 1000000, tm_unit % 1000000};
select(1, NULL, NULL, NULL, &tmp_tm); // sleep to throttle request rate
}
else if (_waitingtm <= SELECT_VALID_UNIT * (-2))
{
_waitingtm = -SELECT_VALID_UNIT;
}
}
}
size_t rs = _req_list.size();
Request* req = _req_list[(_current++) % rs];
pthread_mutex_unlock(&_mutex);
return req;
}
int generate_one_req(Request& req, int batch) {
int batch_size = batch;
std::vector<std::vector<int> > shapes;
shapes.clear();
int p_shape[] = {batch_size, 37, 1, 1};
std::vector<int> shape(p_shape, p_shape + 4);
shapes.push_back(shape);
int p_shape1[] = {batch_size, 1, 50, 12};
std::vector<int> shape1(p_shape1, p_shape1 + 4);
shapes.push_back(shape1);
int p_shape2[] = {batch_size, 1, 50, 19};
std::vector<int> shape2(p_shape2, p_shape2 + 4);
shapes.push_back(shape2);
int p_shape3[] = {batch_size, 1, 50, 1};
std::vector<int> shape3(p_shape3, p_shape3 + 4);
shapes.push_back(shape3);
int p_shape4[] = {batch_size, 4, 50, 1};
std::vector<int> shape4(p_shape4, p_shape4 + 4);
shapes.push_back(shape4);
int p_shape5[] = {batch_size, 1, 50, 1};
std::vector<int> shape5(p_shape5, p_shape5 + 4);
shapes.push_back(shape5);
int p_shape6[] = {batch_size, 5, 50, 1};
std::vector<int> shape6(p_shape6, p_shape6 + 4);
shapes.push_back(shape6);
int p_shape7[] = {batch_size, 7, 50, 1};
std::vector<int> shape7(p_shape7, p_shape7 + 4);
shapes.push_back(shape7);
int p_shape8[] = {batch_size, 3, 50, 1};
std::vector<int> shape8(p_shape8, p_shape8 + 4);
shapes.push_back(shape8);
int p_shape9[] = {batch_size, 32, 50, 1}; // added
std::vector<int> shape9(p_shape9, p_shape9 + 4);
shapes.push_back(shape9);
std::vector<std::string> tensor_names;
tensor_names.push_back("input_0");
tensor_names.push_back("input_1");
tensor_names.push_back("input_2");
tensor_names.push_back("input_3");
tensor_names.push_back("input_4");
tensor_names.push_back("input_5");
tensor_names.push_back("input_6");
tensor_names.push_back("input_7");
tensor_names.push_back("input_8");
tensor_names.push_back("input_9");
SparseInstance* ins = req.add_instances();
for (int fi = 0; fi < _data.size(); ++fi) {
SparseTensor* tensor = ins->add_tensors();
tensor->set_name(tensor_names[fi]);
int len = 1;
for (int si = 0; si < shapes[fi].size(); ++si) {
len *= shapes[fi][si];
}
for (int si = 0; si < shapes[fi].size(); ++si) {
tensor->add_shape(shapes[fi][si]);
}
tensor->set_features(&(_data[fi][0]), len * sizeof(float));
}
return 0;
}
private:
std::vector<std::vector<float> > _data;
std::vector<Request*> _req_list;
pthread_mutex_t _mutex;
long _waitingtm;
long _interval;
timeval _lasttm;
int _current;
};
void print_res(
const Request* req,
const Response& res,
std::string route_tag,
uint64_t elapse_ms) {
uint32_t sample_size = res.predictions_size();
for (int i = 0; i < sample_size; ++i) {
const ::baidu::infinite::map_model::DensePrediction& prediction = res.predictions(i);
int cat_size = prediction.categories_size();
for (int j = 0; j < cat_size; ++j) {
//LOG(INFO) << "categories:" << prediction.categories(j);
}
}
LOG(INFO)
<< "Succ call predictor[wasq], res sample size: "
<< sample_size << ", the tag is: "
<< route_tag << ", elapse_ms: " << elapse_ms;
}
struct Arg {
PredictorApi* api;
InputData* input;
};
void* work(void* p) {
Arg* arg = (Arg*) p;
InputData* input = arg->input;
PredictorApi* api = arg->api;
Response res;
LOG(WARNING) << "Thread entry!";
while (true) {
Predictor* predictor = api->fetch_predictor("mapcnn");
if (!predictor) {
LOG(FATAL) << "Failed fetch predictor: wasq";
return NULL;
}
Request* req = input->next_req();
res.Clear();
timeval start;
gettimeofday(&start, NULL);
if (predictor->inference(req, &res) != 0) {
LOG(FATAL) << "failed call predictor with req:"
<< req->ShortDebugString();
return NULL;
}
timeval end;
gettimeofday(&end, NULL);
uint64_t elapse_ms = (end.tv_sec * 1000 + end.tv_usec / 1000)
- (start.tv_sec * 1000 + start.tv_usec / 1000);
print_res(req, res, predictor->tag(), elapse_ms);
if (api->free_predictor(predictor) != 0) {
printf("failed free predictor\n");
}
}
LOG(WARNING) << "Thread exit!";
return NULL;
}
int main(int argc, char** argv) {
if (argc != 5) {
printf("Usage: demo req_buf_size batch_size threads qps\n");
return -1;
}
int req_buffer = atoi(argv[1]);
int batch_size = atoi(argv[2]);
int thread_num = atoi(argv[3]);
int qps = atoi(argv[4]);
PredictorApi api;
if (api.create("./conf", "predictors.conf") != 0) {
LOG(FATAL) << "Failed create predictors api!";
return -1;
}
InputData data;
if (data.create(
"./data/pure_feature", req_buffer, batch_size, qps) != 0) {
LOG(FATAL) << "Failed create inputdata!";
return -1;
}
Arg arg = {&api, &data};
pthread_t* threads = new pthread_t[thread_num];
if (!threads) {
LOG(FATAL) << "Failed create threads, num:" << thread_num;
return -1;
}
for (int i = 0; i < thread_num; ++i) {
pthread_create(threads + i, NULL, work, &arg);
}
for (int i = 0; i < thread_num; ++i) {
pthread_join(threads[i], NULL);
}
delete[] threads;
data.destroy();
return 0;
}
/* vim: set expandtab ts=4 sw=4 sts=4 tw=100: */
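A note on the QPS throttle in next_req() above: create() sets _interval = 1000 * 1000 / qps microseconds, so qps = 200 gives a 5000 µs budget per request. Each call adds _interval - elapse to _waitingtm; once the accumulated credit reaches SELECT_VALID_UNIT (1000 µs, roughly the finest delay select() resolves), the thread sleeps for the credit floored to a multiple of that unit, and the sleep then shows up in the next call's elapse, draining the credit. A debt below -2 * SELECT_VALID_UNIT is clamped to one unit, so a slow stretch cannot trigger an unbounded burst afterwards.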
/***************************************************************************
*
* Copyright (c) 2018 Baidu.com, Inc. All Rights Reserved
*
**************************************************************************/
/**
* @file demo.cpp
* @author root(root@baidu.com)
* @date 2018/07/09 20:12:44
* @brief
*
**/
#include <fstream>
#include <unistd.h>
#include <stdlib.h>
#include <cmath>  // std::abs(float) used in generate_one_req
#include <bthread/bthread.h>
#include "common.h"
#include "predictor_sdk.h"
#include "default_schema.pb.h"
using baidu::paddle_serving::sdk_cpp::Predictor;
using baidu::paddle_serving::sdk_cpp::PredictorApi;
using baidu::paddle_serving::fluid_engine::SparseTensor;
using baidu::paddle_serving::fluid_engine::SparseInstance;
using baidu::paddle_serving::fluid_engine::Prediction;
using baidu::paddle_serving::fluid_engine::SparseRequest;
using baidu::paddle_serving::fluid_engine::Response;
static const uint32_t SELECT_VALID_UNIT = 1000;
class InputData {
public:
InputData() {}
~InputData() {}
int create(const std::string file_name, size_t buf_size,
size_t batch_size, int qps) {
bthread_mutex_init(&_mutex, NULL);
FILE* fp = fopen(file_name.c_str(), "r");
if (!fp) {
LOG(FATAL) << "Failed open data file: "
<< file_name;
return -1;
}
_data.clear();
char buffer[2048000];
std::vector<std::string> tokens;
while (fgets(buffer, sizeof(buffer), fp) != NULL) {
tokens.clear();
baidu::paddle_serving::sdk_cpp::str_split(
buffer, ",", &tokens);
std::vector<float> feature_one;
for (size_t i = 0; i < tokens.size(); i++){
feature_one.push_back(
strtof(tokens[i].c_str(), NULL));
}
_data.push_back(feature_one);
}
printf("succ load data, size:%ld\n", _data.size());
for (size_t ri = 0; ri < buf_size; ri++) {
SparseRequest* req = new SparseRequest();
if (generate_one_req(*req, batch_size) != 0) {
LOG(FATAL) << "Failed generate req at: " << ri;
fclose(fp);
return -1;
}
_req_list.push_back(req);
}
fclose(fp);
_current = 0;
_waitingtm = 0;
_lasttm.tv_sec = _lasttm.tv_usec = 0;
if (qps == 0) {
_interval = 0;
} else if (qps < 1) {
_interval = 1000 * 1000;
} else {
_interval = 1000 * 1000 / qps;
}
LOG(INFO) << "Succ create req, size: " << buf_size
<< ", batch_size: " << batch_size;
return 0;
}
void destroy() {
size_t ds = _data.size();
for (size_t di = 0; di < ds; di++) {
_data[di].clear();
}
_data.clear();
size_t rs = _req_list.size();
for (size_t ri = 0; ri < rs; ri++) {
delete _req_list[ri];
}
_req_list.clear();
}
SparseRequest* next_req() {
bthread_mutex_lock(&_mutex);
/*
if (_interval != 0)
{
if (_lasttm.tv_sec == 0 && _lasttm.tv_usec == 0)
{
gettimeofday(&_lasttm, NULL);
}
else
{
timeval curtm;
gettimeofday(&curtm, NULL);
long elapse =
((curtm.tv_sec - _lasttm.tv_sec) * 1000*1000 +
(curtm.tv_usec - _lasttm.tv_usec));
_waitingtm += _interval - elapse;
_lasttm = curtm;
if (_waitingtm >= SELECT_VALID_UNIT) // smallest interval select() can honor
{
long tm_unit
= _waitingtm / SELECT_VALID_UNIT * SELECT_VALID_UNIT;
timeval tmp_tm = {tm_unit / 1000000, tm_unit % 1000000};
select(1, NULL, NULL, NULL, &tmp_tm); // sleep to throttle request rate
}
else if (_waitingtm <= SELECT_VALID_UNIT * (-2))
{
_waitingtm = -SELECT_VALID_UNIT;
}
}
}*/
size_t rs = _req_list.size();
SparseRequest* req = _req_list[(_current++) % rs];
bthread_mutex_unlock(&_mutex);
return req;
}
int generate_one_req(SparseRequest& req, int batch) {
int batch_size = batch;
std::vector<std::vector<int> > shapes;
shapes.clear();
int p_shape[] = {batch_size, 37, 1, 1};
std::vector<int> shape(p_shape, p_shape + 4);
shapes.push_back(shape);
int p_shape1[] = {batch_size, 1, 50, 12};
std::vector<int> shape1(p_shape1, p_shape1 + 4);
shapes.push_back(shape1);
int p_shape2[] = {batch_size, 1, 50, 19};
std::vector<int> shape2(p_shape2, p_shape2 + 4);
shapes.push_back(shape2);
int p_shape3[] = {batch_size, 1, 50, 1};
std::vector<int> shape3(p_shape3, p_shape3 + 4);
shapes.push_back(shape3);
int p_shape4[] = {batch_size, 4, 50, 1};
std::vector<int> shape4(p_shape4, p_shape4 + 4);
shapes.push_back(shape4);
int p_shape5[] = {batch_size, 1, 50, 1};
std::vector<int> shape5(p_shape5, p_shape5 + 4);
shapes.push_back(shape5);
int p_shape6[] = {batch_size, 5, 50, 1};
std::vector<int> shape6(p_shape6, p_shape6 + 4);
shapes.push_back(shape6);
int p_shape7[] = {batch_size, 7, 50, 1};
std::vector<int> shape7(p_shape7, p_shape7 + 4);
shapes.push_back(shape7);
int p_shape8[] = {batch_size, 3, 50, 1};
std::vector<int> shape8(p_shape8, p_shape8 + 4);
shapes.push_back(shape8);
int p_shape9[] = {batch_size, 32, 50, 1}; // added
std::vector<int> shape9(p_shape9, p_shape9 + 4);
shapes.push_back(shape9);
std::vector<std::string> tensor_names;
/*
tensor_names.push_back("input_0");
tensor_names.push_back("input_1");
tensor_names.push_back("input_2");
tensor_names.push_back("input_3");
tensor_names.push_back("input_4");
tensor_names.push_back("input_5");
tensor_names.push_back("input_6");
tensor_names.push_back("input_7");
tensor_names.push_back("input_8");
tensor_names.push_back("input_9");
*/
tensor_names.push_back("attr_f");
tensor_names.push_back("realtime_f");
tensor_names.push_back("static_f");
tensor_names.push_back("eta_cost_f");
tensor_names.push_back("lukuang_f");
tensor_names.push_back("length_f");
tensor_names.push_back("path_f");
tensor_names.push_back("speed_f");
tensor_names.push_back("lane_f");
tensor_names.push_back("roadid_f");
std::vector<float> tensor_values;
SparseInstance* ins = req.add_instances();
for (int fi = 0; fi < _data.size(); ++fi) {
SparseTensor* tensor = ins->add_tensors();
tensor->set_name(tensor_names[fi]);
int len = 1;
for (int si = 0; si < shapes[fi].size(); ++si) {
len *= shapes[fi][si];
tensor->add_shape(shapes[fi][si]);
}
tensor_values.clear();
for (int vi = 0; vi < len; ++vi) {
if (std::abs(_data[fi][vi]) > 0.000001f) {
tensor_values.push_back(_data[fi][vi]);
tensor->add_keys(vi);
}
}
tensor->set_features(
&tensor_values[0], tensor_values.size() * sizeof(float));
}
tensor_values.clear();
return 0;
}
private:
std::vector<std::vector<float> > _data;
std::vector<SparseRequest*> _req_list;
bthread_mutex_t _mutex;
long _waitingtm;
long _interval;
timeval _lasttm;
int _current;
};
void print_res(
const SparseRequest* req,
const Response& res,
std::string route_tag,
uint64_t elapse_ms) {
uint32_t feature_size = res.predictions_size();
size_t sample_size = 0;
for (int i = 0; i < feature_size; ++i) {
const ::baidu::paddle_serving::fluid_engine::Prediction& prediction = res.predictions(i);
if (i == 0) {
sample_size = prediction.categories_size();
}
for (int j = 0; j < sample_size; ++j) {
//LOG(TRACE) << "categories:" << prediction.categories(j);
}
}
LOG(INFO)
<< "Succ call predictor[sparse_cnn], res sample size: "
<< sample_size << ", the tag is: "
<< route_tag << ", elapse_ms: " << elapse_ms;
}
struct Arg {
PredictorApi* api;
InputData* input;
};
void* work(void* p) {
Arg* arg = (Arg*) p;
InputData* input = arg->input;
PredictorApi* api = arg->api;
if (api->thrd_initialize() != 0) {
LOG(FATAL) << "Failed init api in thrd:" << bthread_self();
return NULL;
}
Response res;
LOG(WARNING) << "Thread entry!";
while (true) {
api->thrd_clear();
Predictor* predictor = api->fetch_predictor("sparse_cnn");
if (!predictor) {
LOG(FATAL) << "Failed fetch predictor: sparse_cnn";
continue;
}
SparseRequest* req = input->next_req();
res.Clear();
timeval start;
gettimeofday(&start, NULL);
if (predictor->inference(req, &res) != 0) {
LOG(FATAL) << "failed call predictor with req:"
<< req->ShortDebugString();
api->free_predictor(predictor);
continue;
}
timeval end;
gettimeofday(&end, NULL);
uint64_t elapse_ms = (end.tv_sec * 1000 + end.tv_usec / 1000)
- (start.tv_sec * 1000 + start.tv_usec / 1000);
print_res(req, res, predictor->tag(), elapse_ms);
}
api->thrd_finalize();
LOG(WARNING) << "Thread exit!";
return NULL;
}
int main(int argc, char** argv) {
if (argc != 5) {
printf("Usage: demo req_buf_size batch_size threads qps\n");
return -1;
}
int req_buffer = atoi(argv[1]);
int batch_size = atoi(argv[2]);
int thread_num = atoi(argv[3]);
int qps = atoi(argv[4]);
PredictorApi api;
if (api.create("./conf", "predictors.conf") != 0) {
LOG(FATAL) << "Failed create predictors api!";
return -1;
}
InputData data;
if (data.create(
//"./data/feature", req_buffer, batch_size, qps) != 0) {
"./data/pure_feature", req_buffer, batch_size, qps) != 0) {
LOG(FATAL) << "Failed create inputdata!";
return -1;
}
Arg arg = {&api, &data};
bthread_t* threads = new bthread_t[thread_num];
if (!threads) {
LOG(FATAL) << "Failed create threads, num:" << thread_num;
return -1;
}
for (int i = 0; i < thread_num; ++i) {
bthread_start_background(threads + i, NULL, work, &arg);
}
for (int i = 0; i < thread_num; ++i) {
bthread_join(threads[i], NULL);
}
delete[] threads;
data.destroy();
return 0;
}
/* vim: set expandtab ts=4 sw=4 sts=4 tw=100: */
This diff is collapsed.
/***************************************************************************
*
* Copyright (c) 2018 Baidu.com, Inc. All Rights Reserved
*
**************************************************************************/
/**
* @file demo.cpp
* @author wanlijin01(wanlijin01@baidu.com)
* @date 2018/07/09 20:12:44
* @brief
*
**/
#include "common.h"
#include <fstream>
#include "predictor_sdk.h"
#include "image_class.pb.h"
#include "builtin_format.pb.h"
using baidu::paddle_serving::sdk_cpp::Predictor;
using baidu::paddle_serving::sdk_cpp::PredictorApi;
using baidu::paddle_serving::predictor::format::XImageReqInstance;
using baidu::paddle_serving::predictor::format::DensePrediction;
using baidu::paddle_serving::predictor::image_classification::Request;
using baidu::paddle_serving::predictor::image_classification::Response;
int create_req(Request& req) {
static const char* TEST_IMAGE_PATH = "./data/images/what.jpg";
FILE* fp = fopen(TEST_IMAGE_PATH, "rb");
if (!fp) {
LOG(FATAL) << "Failed open image: " << TEST_IMAGE_PATH;
return -1;
}
fseek(fp, 0L, SEEK_END);
size_t isize = ftell(fp);
char* ibuf = new(std::nothrow) char[isize];
if (!ibuf) {
LOG(FATAL) << "Failed malloc image buffer";
fclose(fp);
return -1;
}
fseek(fp, 0, SEEK_SET);
    if (fread(ibuf, sizeof(ibuf[0]), isize, fp) != isize) {
        LOG(FATAL) << "Failed read image: " << TEST_IMAGE_PATH;
        delete[] ibuf;
        fclose(fp);
        return -1;
    }
XImageReqInstance* ins = req.add_instances();
if (!ins) {
LOG(FATAL) << "Failed create req instance";
delete[] ibuf;
fclose(fp);
return -1;
}
ins->set_image_binary(ibuf, isize);
ins->set_image_length(isize);
delete[] ibuf;
fclose(fp);
return 0;
}
void print_res(
const Request& req,
const Response& res,
std::string route_tag,
uint64_t elapse_ms) {
static const char* GT_TEXT_PATH
= "./data/images/groundtruth.txt";
std::vector<std::string> gt_labels;
std::ifstream file(GT_TEXT_PATH);
std::string temp_str;
while (std::getline(file, temp_str)) {
gt_labels.push_back(temp_str);
}
DensePrediction json_msg;
uint32_t sample_size = res.predictions_size();
std::string err_string;
for (uint32_t si = 0; si < sample_size; ++si) {
std::string json = res.predictions(si).response_json();
butil::IOBuf buf;
buf.append(json);
butil::IOBufAsZeroCopyInputStream wrapper(buf);
if (!json2pb::JsonToProtoMessage(&wrapper, &json_msg, &err_string)) {
LOG(FATAL) << "Failed parse json from str:" << json;
return ;
}
uint32_t csize = json_msg.categories_size();
if (csize <= 0) {
LOG(FATAL) << "sample-" << si << "has no"
<< "categories props";
continue;
}
float max_prop = json_msg.categories(0);
uint32_t max_idx = 0;
for (uint32_t ci = 1; ci < csize; ++ci) {
if (json_msg.categories(ci) > max_prop) {
max_prop = json_msg.categories(ci);
max_idx = ci;
}
}
LOG(INFO) << "sample-" << si << "'s classify result: "
<< gt_labels[max_idx] << ", prop: " << max_prop;
}
LOG(INFO)
<< "Succ call predictor[ximage], the tag is: "
<< route_tag << ", elapse_ms: " << elapse_ms;
}
int main(int argc, char** argv) {
PredictorApi api;
if (api.create("./conf", "predictors.conf") != 0) {
LOG(FATAL) << "Failed create predictors api!";
return -1;
}
Request req;
Response res;
api.thrd_initialize();
while (true) {
timeval start;
gettimeofday(&start, NULL);
api.thrd_clear();
Predictor* predictor = api.fetch_predictor("ximage");
if (!predictor) {
LOG(FATAL) << "Failed fetch predictor: wasq";
return -1;
}
req.Clear();
res.Clear();
if (create_req(req) != 0) {
return -1;
}
butil::IOBufBuilder debug_os;
if (predictor->debug(&req, &res, &debug_os) != 0) {
LOG(FATAL) << "failed call predictor with req:"
<< req.ShortDebugString();
return -1;
}
butil::IOBuf debug_buf;
debug_os.move_to(debug_buf);
LOG(INFO) << "Debug string: " << debug_buf;
timeval end;
gettimeofday(&end, NULL);
uint64_t elapse_ms = (end.tv_sec * 1000 + end.tv_usec / 1000)
- (start.tv_sec * 1000 + start.tv_usec / 1000);
print_res(req, res, predictor->tag(), elapse_ms);
res.Clear();
usleep(50);
} // while (true)
api.thrd_finalize();
api.destroy();
return 0;
}
/* vim: set expandtab ts=4 sw=4 sts=4 tw=100: */
/***************************************************************************
*
* Copyright (c) 2018 Baidu.com, Inc. All Rights Reserved
*
**************************************************************************/
/**
* @file abtest.h
* @author wanlijin01(wanlijin01@baidu.com)
* @date 2018/07/06 17:11:38
* @brief
*
**/
#ifndef BAIDU_PADDLE_SERVING_SDK_CPP_ABTEST_H
#define BAIDU_PADDLE_SERVING_SDK_CPP_ABTEST_H
#include "stub.h"
#include "common.h"
#include "factory.h"
namespace baidu {
namespace paddle_serving {
namespace sdk_cpp {
class Stub;
class Variant;
static const std::string WEIGHT_SEPERATOR = "|";
class EndpointRouterBase {
public:
typedef std::vector<Variant*> VariantList;
virtual ~EndpointRouterBase() {}
virtual int initialize(
const comcfg::ConfigUnit& conf) = 0;
virtual Variant* route(const VariantList&) = 0;
virtual Variant* route(
const VariantList&,
const void*) = 0;
};
class WeightedRandomRender : public EndpointRouterBase {
public:
static int register_self() {
INLINE_REGIST_OBJECT(WeightedRandomRender, EndpointRouterBase, -1);
return 0;
}
WeightedRandomRender() : _normalized_sum(0) {}
~WeightedRandomRender() {}
int initialize(
const comcfg::ConfigUnit& conf);
Variant* route(const VariantList&);
Variant* route(
const VariantList&,
const void*);
private:
std::vector<uint32_t> _variant_weight_list;
uint32_t _normalized_sum;
};
class VariantRouterBase {
public:
typedef std::map<std::string, Stub*> StubMap;
virtual ~VariantRouterBase() {}
virtual int initialize(
const comcfg::ConfigUnit& conf) = 0;
virtual Stub* route(const StubMap&) = 0;
virtual Stub* route(
const StubMap&,
const void*) = 0;
};
} // sdk_cpp
} // paddle_serving
} // baidu
#endif //BAIDU_PADDLE_SERVING_SDK_CPP_ABTEST_H
/* vim: set expandtab ts=4 sw=4 sts=4 tw=100: */
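The WeightedRandomRender::route() implementations are not in the hunks shown here; purely as an illustration of how _variant_weight_list and _normalized_sum (fed by VariantWeightList, e.g. `50` in the conf files above) could drive selection, a hypothetical weighted-random pick might look like:

#include <cstdlib>
#include <stdint.h>
#include <vector>

// Pick an index with probability proportional to its weight;
// normalized_sum caches the precomputed total of all weights.
static size_t weighted_pick(const std::vector<uint32_t>& weights,
                            uint32_t normalized_sum) {
    uint32_t sample = static_cast<uint32_t>(rand()) % normalized_sum;
    uint32_t acc = 0;
    for (size_t i = 0; i < weights.size(); ++i) {
        acc += weights[i];
        if (sample < acc) {
            return i;  // variant i wins this draw
        }
    }
    return weights.size() - 1;  // numeric safety net
}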
/***************************************************************************
*
* Copyright (c) 2018 Baidu.com, Inc. All Rights Reserved
*
**************************************************************************/
/**
* @file common.h
* @author wanlijin01(wanlijin01@baidu.com)
* @date 2018/07/09 20:24:19
* @brief
*
**/
#ifndef BAIDU_PADDLE_SERVING_CPP_SDK_COMMON_H
#define BAIDU_PADDLE_SERVING_CPP_SDK_COMMON_H
#include <sys/types.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdint.h>
#include <pthread.h>
#include <strings.h>
#include <getopt.h>
#include <exception>
#include <google/protobuf/message.h>
#include <boost/unordered_map.hpp>
#include <gflags/gflags.h>
#include <bvar/bvar.h>
#include <butil/logging.h>
#include <butil/time.h>
#include <butil/object_pool.h>
#include <brpc/channel.h>
#include <brpc/parallel_channel.h>
#include <brpc/traceprintf.h>
#include <bthread/bthread.h>
#include <error.h>
#include <json2pb/json_to_pb.h>
#include "Configure.h"
#include "utils.h"
#endif //BAIDU_PADDLE_SERVING_CPP_SDK_COMMON_H
/* vim: set expandtab ts=4 sw=4 sts=4 tw=100: */
/***************************************************************************
*
* Copyright (c) 2018 Baidu.com, Inc. All Rights Reserved
*
**************************************************************************/
/**
* @file config_manager.h
* @author wanlijin01(wanlijin01@baidu.com)
* @date 2018/07/09 15:28:43
* @brief
*
**/
#ifndef BAIDU_PADDLE_SERVING_SDK_CPP_CONFIG_MANAGER_H
#define BAIDU_PADDLE_SERVING_SDK_CPP_CONFIG_MANAGER_H
#include "common.h"
#include "endpoint_config.h"
namespace baidu {
namespace paddle_serving {
namespace sdk_cpp {
class EndpointConfigManager {
public:
static EndpointConfigManager& instance() {
static EndpointConfigManager singleton;
return singleton;
}
EndpointConfigManager()
: _last_update_timestamp(0),
_current_endpointmap_id(1) {}
int create(const char* path, const char* file);
int load();
bool need_reload() {
return false;
}
int reload() {
if (!need_reload()) {
LOG(INFO) << "Noneed reload endpoin config";
return 0;
}
return load();
}
const std::map<std::string, EndpointInfo>& config() {
return _ep_map;
}
const std::map<std::string, EndpointInfo>& config() const {
return _ep_map;
}
private:
int init_one_variant(
const comcfg::ConfigUnit& conf,
VariantInfo& var);
int init_one_endpoint(
const comcfg::ConfigUnit& conf,
EndpointInfo& ep,
const VariantInfo& default_var);
int merge_variant(
const VariantInfo& default_var,
const comcfg::ConfigUnit& conf,
VariantInfo& merged_var);
int parse_tag_values(
SplitParameters& split);
private:
std::map<std::string, EndpointInfo> _ep_map;
std::string _endpoint_config_path;
std::string _endpoint_config_file;
uint32_t _last_update_timestamp;
uint32_t _current_endpointmap_id;
};
} // sdk_cpp
} // paddle_serving
} // baidu
#endif //BAIDU_PADDLE_SERVING_SDK_CPP_CONFIG_MANAGER_H
/* vim: set expandtab ts=4 sw=4 sts=4 tw=100: */
/***************************************************************************
*
* Copyright (c) 2018 Baidu.com, Inc. All Rights Reserved
*
**************************************************************************/
/**
* @file include/resource.h
* @author wanlijin(wanlijin01@baidu.com)
* @date 2018/07/06 17:06:25
* @brief
*
**/
#ifndef BAIDU_PADDLE_SERVING_SDK_ENDPOINT_H
#define BAIDU_PADDLE_SERVING_SDK_ENDPOINT_H
#include "common.h"
#include "endpoint_config.h"
#include "stub.h"
#include "variant.h"
#include "predictor.h"
namespace baidu {
namespace paddle_serving {
namespace sdk_cpp {
class Endpoint {
friend class EndpointRouterBase;
public:
virtual ~Endpoint() {}
Endpoint() {
_variant_list.clear();
}
int initialize(const EndpointInfo& ep_info);
int thrd_initialize();
int thrd_clear();
int thrd_finalize();
Predictor* get_predictor(const void* params);
Predictor* get_predictor();
int ret_predictor(Predictor* predictor);
const std::string& endpoint_name() const {
return _endpoint_name;
}
private:
int initialize_variant(
const VariantInfo& var_info,
const std::string& service,
const std::string& ep_name,
std::vector<Stub*>& stubs);
private:
std::string _endpoint_name;
std::vector<Variant*> _variant_list;
};
} // sdk_cpp
} // paddle_serving
} // baidu
#endif //BAIDU_PADDLE_SERVING_SDK_ENDPOINT_H
/* vim: set expandtab ts=4 sw=4 sts=4 tw=100: */
/***************************************************************************
*
* Copyright (c) 2018 Baidu.com, Inc. All Rights Reserved
*
**************************************************************************/
/**
* @file endpoint_config.h
* @author wanlijin01(wanlijin01@baidu.com)
* @date 2018/07/12 15:17:56
* @brief
*
**/
#include "common.h"
#ifndef BAIDU_PADDLE_SERVING_SDK_CPP_ENDPOINT_CONFIG_H
#define BAIDU_PADDLE_SERVING_SDK_CPP_ENDPOINT_CONFIG_H
namespace baidu {
namespace paddle_serving {
namespace sdk_cpp {
#define PARSE_CONF_ITEM(conf, item, name, fail) \
do { \
try { \
item.set(conf[name]); \
} catch (comcfg::NoSuchKeyException& e) { \
LOG(INFO) << "Not found key in configue: " << name;\
} catch (comcfg::ConfigException& e) { \
LOG(FATAL) << "Error config, key: " << name; \
return fail; \
} catch (...) { \
LOG(FATAL) << "Unkown error accurs when load config";\
return fail; \
} \
} while (0)
#define ASSIGN_CONF_ITEM(dest, src, fail) \
do { \
if (!src.init) { \
LOG(FATAL) << "Cannot assign an unintialized item: " \
<< #src << " to dest: " << #dest; \
return fail; \
} \
dest = src.value; \
} while (0)
template<typename T> struct type_traits {
    static type_traits<T> tag;
};
// Out-of-line definition so type_traits<T>::tag can be passed by reference.
template<typename T> type_traits<T> type_traits<T>::tag;
template<typename T> struct ConfigItem {
T value;
bool init;
ConfigItem() : init(false) {}
void set(const comcfg::ConfigUnit& unit) {
set_impl(type_traits<T>::tag, unit);
init = true;
}
void set_impl(type_traits<int16_t>&,
const comcfg::ConfigUnit& unit) {
value = unit.to_int16();
}
void set_impl(type_traits<int32_t>&,
const comcfg::ConfigUnit& unit) {
value = unit.to_int32();
}
void set_impl(type_traits<int64_t>&,
const comcfg::ConfigUnit& unit) {
value = unit.to_int64();
}
void set_impl(type_traits<uint16_t>&,
const comcfg::ConfigUnit& unit) {
value = unit.to_uint16();
}
void set_impl(type_traits<uint32_t>&,
const comcfg::ConfigUnit& unit) {
value = unit.to_uint32();
}
void set_impl(type_traits<uint64_t>&,
const comcfg::ConfigUnit& unit) {
value = unit.to_uint64();
}
void set_impl(type_traits<float>&,
const comcfg::ConfigUnit& unit) {
value = unit.to_float();
}
void set_impl(type_traits<double>&,
const comcfg::ConfigUnit& unit) {
value = unit.to_double();
}
void set_impl(type_traits<std::string>&,
const comcfg::ConfigUnit& unit) {
value = unit.to_cstr();
}
};
struct Connection {
ConfigItem<int32_t> tmo_conn;
ConfigItem<int32_t> tmo_rpc;
ConfigItem<int32_t> tmo_hedge;
ConfigItem<uint32_t> cnt_retry_conn;
ConfigItem<uint32_t> cnt_retry_hedge;
ConfigItem<uint32_t> cnt_maxconn_per_host;
ConfigItem<std::string> type_conn;
};
struct NamingInfo {
ConfigItem<std::string> cluster_naming;
ConfigItem<std::string> load_balancer;
ConfigItem<std::string> cluster_filter;
};
struct RpcParameters {
ConfigItem<std::string> protocol;
ConfigItem<int32_t> compress_type;
ConfigItem<uint32_t> package_size;
ConfigItem<std::string> route_tag;
ConfigItem<uint32_t> max_channel;
};
struct SplitParameters {
ConfigItem<std::string> split_tag;
ConfigItem<std::string> tag_cands_str;
std::vector<std::string> tag_values;
};
struct VariantInfo {
VariantInfo() : ab_test(NULL) {}
Connection connection;
NamingInfo naminginfo;
RpcParameters parameters;
SplitParameters splitinfo;
void* ab_test;
};
struct EndpointInfo {
EndpointInfo() : ab_test(NULL) {}
std::string endpoint_name;
std::string stub_service;
std::vector<VariantInfo> vars;
void* ab_test;
};
} // sdk_cpp
} // paddle_serving
} // baidu
#endif //BAIDU_PADDLE_SERVING_SDK_CPP_ENDPOINT_CONFIG_H
/* vim: set expandtab ts=4 sw=4 sts=4 tw=100: */
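A typical call site for PARSE_CONF_ITEM, sketched against the Connection struct above (hypothetical helper; the real parsing lives in EndpointConfigManager and is not part of this commit's shown hunks). The macro logs and skips a missing key, leaving ConfigItem::init false, but fails the whole load on a malformed value:

int load_connection(const comcfg::ConfigUnit& conf, Connection& conn) {
    PARSE_CONF_ITEM(conf, conn.tmo_conn, "ConnectTimeoutMilliSec", -1);
    PARSE_CONF_ITEM(conf, conn.tmo_rpc, "RpcTimeoutMilliSec", -1);
    PARSE_CONF_ITEM(conf, conn.type_conn, "ConnectionType", -1);
    return 0;
}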
/***************************************************************************
*
* Copyright (c) 2018 Baidu.com, Inc. All Rights Reserved
*
**************************************************************************/
/**
* @file include/factory.h
* @author wanlijin01(wanlijin01@baidu.com)
* @date 2018/07/10 22:09:57
* @brief
*
**/
#ifndef BAIDU_PADDLE_SERVING_SDK_CPP_FACTORY_H
#define BAIDU_PADDLE_SERVING_SDK_CPP_FACTORY_H
#include "common.h"
#include "stub_impl.h"
#include "glog/raw_logging.h"
namespace baidu {
namespace paddle_serving {
namespace sdk_cpp {
#define INLINE_REGIST_OBJECT(D, B, E) \
do { \
Factory<D, B>* factory = \
new (std::nothrow) Factory<D, B>(); \
if (factory == NULL \
|| FactoryPool<B>::instance().register_factory(\
#D, factory) != 0) { \
RAW_LOG_FATAL("Failed regist factory: %s->%s in macro!", #D, #B); \
return E; \
} \
} while (0)
#define DECLARE_FACTORY_OBJECT(D, B) \
static int regist(const std::string& tag) { \
Factory<D, B>* factory = \
new (std::nothrow) Factory<D, B>();\
if (factory == NULL \
|| FactoryPool<B>::instance().register_factory(\
tag, factory) != 0) { \
RAW_LOG_FATAL("Failed regist factory: %s in macro!", #D);\
return -1; \
} \
return 0; \
}
#define PDS_STR_CAT(a, b) PDS_STR_CAT_I(a, b)
#define PDS_STR_CAT_I(a, b) a ## b
#define DEFINE_FACTORY_OBJECT(D) \
__attribute__((constructor)) static void PDS_STR_CAT(GlobalRegistObject, __LINE__)(void) \
{ \
D::regist(#D); \
}
#define REGIST_FACTORY_OBJECT_IMPL(D, B) \
__attribute__((constructor)) static void PDS_STR_CAT(GlobalRegistObject, __LINE__)(void) \
{ \
::baidu::paddle_serving::sdk_cpp::Factory<D, B>* factory =\
new (::std::nothrow) ::baidu::paddle_serving::sdk_cpp::Factory<D, B>();\
if (factory == NULL \
|| ::baidu::paddle_serving::sdk_cpp::FactoryPool<B>::instance().register_factory(\
#D, factory) != 0) { \
RAW_LOG_FATAL("Failed regist factory: %s->%s in macro!", #D, #B); \
return ; \
} \
return ; \
}
#define REGIST_FACTORY_OBJECT_IMPL_WITH_TAG(D, B, T)\
__attribute__((constructor)) static void PDS_STR_CAT(GlobalRegistObject, __LINE__)(void) \
{ \
::baidu::paddle_serving::sdk_cpp::Factory<D, B>* factory =\
new (::std::nothrow) ::baidu::paddle_serving::sdk_cpp::Factory<D, B>();\
if (factory == NULL \
|| ::baidu::paddle_serving::sdk_cpp::FactoryPool<B>::instance().register_factory(\
T, factory) != 0) { \
RAW_LOG_FATAL("Failed regist factory: %s->%s, tag %s in macro!", #D, #B, T); \
return ; \
} \
return ; \
}
#define REGIST_ABTEST_OBJECT(D) \
REGIST_FACTORY_OBJECT_IMPL( \
D, \
::baidu::paddle_serving::sdk_cpp::ABTestRouterBase)
#define REGIST_ABTEST_OBJECT_WITH_TAG(D, T) \
REGIST_FACTORY_OBJECT_IMPL_WITH_TAG( \
D, \
::baidu::paddle_serving::sdk_cpp::ABTestRouterBase,\
T)
#define REGIST_STUB_OBJECT_WITH_TAG(D, C, R, I, O, T) \
__attribute__((constructor)) static void PDS_STR_CAT(GlobalRegistObject, __LINE__)(void) \
{ \
::baidu::paddle_serving::sdk_cpp::Factory< \
::baidu::paddle_serving::sdk_cpp::StubImpl<D, C, R, I, O>,\
::baidu::paddle_serving::sdk_cpp::Stub>* factory = \
new (::std::nothrow) ::baidu::paddle_serving::sdk_cpp::Factory< \
::baidu::paddle_serving::sdk_cpp::StubImpl<D, C, R, I, O>,\
::baidu::paddle_serving::sdk_cpp::Stub>(); \
if (factory == NULL \
|| ::baidu::paddle_serving::sdk_cpp::FactoryPool< \
::baidu::paddle_serving::sdk_cpp::Stub>::instance().register_factory(\
T, factory) != 0) { \
RAW_LOG_FATAL("Failed regist factory: %s->Stub, tag: %s in macro!", #D, T); \
return ; \
} \
return ; \
}
class Stub;
class EndpointRouterBase;
class VariantRouterBase;
template<typename B>
class FactoryBase {
public:
virtual B* gen() = 0;
virtual void del(B* obj) = 0;
};
template<typename D, typename B>
class Factory : public FactoryBase<B> {
public:
B* gen() {
return new(std::nothrow) D();
}
void del(B* obj) {
delete dynamic_cast<D*>(obj);
}
};
template<typename B>
class FactoryPool {
public:
static FactoryPool<B>& instance() {
static FactoryPool<B> singleton;
return singleton;
}
int register_factory(const std::string& tag,
FactoryBase<B>* factory) {
typename std::map<std::string, FactoryBase<B>*>::iterator it
= _pool.find(tag);
if (it != _pool.end()) {
RAW_LOG_FATAL("Insert duplicate with tag: %s", tag.c_str());
return -1;
}
std::pair<
typename std::map<std::string, FactoryBase<B>*>::iterator,
bool> r = _pool.insert(std::make_pair(tag, factory));
if (!r.second) {
RAW_LOG_FATAL("Failed insert new factory with: %s", tag.c_str());
return -1;
}
RAW_LOG_INFO("Succ insert one factory, tag: %s, base type %s", tag.c_str(), typeid(B).name());
return 0;
}
B* generate_object(const std::string& tag) {
typename std::map<std::string, FactoryBase<B>*>::iterator it
= _pool.find(tag);
if (it == _pool.end() || it->second == NULL) {
RAW_LOG_FATAL("Not found factory pool, tag: %s, pool size: %u", tag.c_str(), _pool.size());
return NULL;
}
return it->second->gen();
}
template<typename D>
void return_object(B* object) {
        Factory<D, B> factory;
        factory.del(object);
}
private:
std::map<std::string, FactoryBase<B>*> _pool;
};
typedef FactoryPool<Stub> StubFactory;
typedef FactoryPool<brpc::CallMapper> CallMapperFactory;
typedef FactoryPool<brpc::ResponseMerger> ResponseMergerFactory;
typedef FactoryPool<EndpointRouterBase> EndpointRouterFactory;
typedef FactoryPool<VariantRouterBase> VariantRouterFactory;
} // sdk_cpp
} // paddle_serving
} // baidu
#endif //BAIDU_PADDLE_SERVING_SDK_CPP_FACTORY_H
/* vim: set expandtab ts=4 sw=4 sts=4 tw=100: */
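As a usage note, WeightedRandomRender::register_self() in abtest.h is exactly INLINE_REGIST_OBJECT expanded; the same round trip done by hand (a sketch, assuming abtest.h's types are visible) is:

using baidu::paddle_serving::sdk_cpp::Factory;
using baidu::paddle_serving::sdk_cpp::FactoryPool;

// Register: map a tag to a Factory that news up the derived type.
Factory<WeightedRandomRender, EndpointRouterBase>* factory =
    new (std::nothrow) Factory<WeightedRandomRender, EndpointRouterBase>();
if (factory == NULL
        || FactoryPool<EndpointRouterBase>::instance().register_factory(
            "WeightedRandomRender", factory) != 0) {
    // registration failed: duplicate tag or out of memory
}

// Lookup: the endpoint_router value in predictors.conf is the tag.
EndpointRouterBase* router =
    FactoryPool<EndpointRouterBase>::instance()
        .generate_object("WeightedRandomRender");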
/***************************************************************************
*
* Copyright (c) 2018 Baidu.com, Inc. All Rights Reserved
*
**************************************************************************/
/**
* @file include/predictor.h
* @author wanlijin01(com@baidu.com)
* @date 2018/07/05 16:53:43
* @brief
*
**/
#ifndef BAIDU_PADDLE_SERVING_SDK_CPP_PREDICTOR_H
#define BAIDU_PADDLE_SERVING_SDK_CPP_PREDICTOR_H
#include "stub.h"
#include "common.h"
#include "endpoint_config.h"
namespace baidu {
namespace paddle_serving {
namespace sdk_cpp {
#define GET_OBJECT_FROM_POOL(param, T, err) \
do { \
param = butil::get_object<T>(); \
if (!param) { \
LOG(FATAL) << "Failed get object from pool" \
<< ", arg:" << #param << "type: " \
<< #T; \
return err; \
} \
} while (0)
static const brpc::CompressType compress_types[] = {
brpc::COMPRESS_TYPE_NONE,
brpc::COMPRESS_TYPE_SNAPPY,
brpc::COMPRESS_TYPE_GZIP,
brpc::COMPRESS_TYPE_ZLIB,
brpc::COMPRESS_TYPE_LZ4};
typedef void (*DoneType) (google::protobuf::Message* res,
brpc::Controller* controller);
template<typename Arg1, typename Arg2>
class FunctionClosure : public ::google::protobuf::Closure {
public:
typedef void (*FunctionType)(Arg1* arg1, Arg2* arg2);
FunctionClosure() {}
~FunctionClosure() {}
int init(FunctionType function, bool self_deleting,
bool arg1_deleting, bool arg2_deleting,
Arg1* arg1 = NULL, Arg2* arg2 = NULL);
void Run();
private:
FunctionType _function;
Arg1* _arg1;
Arg2* _arg2;
bool _self_deleting;
bool _arg1_deleting;
bool _arg2_deleting;
};
class InterfaceAdaptor {
public:
typedef google::protobuf::Message RequestMessage;
typedef google::protobuf::Message ResponseMessage;
virtual int partition(RequestMessage& request, std::vector<RequestMessage*>& out) = 0;
virtual int merge(std::vector<ResponseMessage*>& response, ResponseMessage& out) = 0;
};
class EchoAdaptor : public InterfaceAdaptor {
public:
typedef google::protobuf::Message RequestMessage;
typedef google::protobuf::Message ResponseMessage;
int partition(RequestMessage& request, std::vector<RequestMessage*>& out) {
return 0;
}
    int merge(std::vector<ResponseMessage*>& response, ResponseMessage& out) {
return 0;
}
};
class Predictor {
public:
    // synchronous interface
virtual int inference(
google::protobuf::Message* req,
google::protobuf::Message* res) = 0;
    // asynchronous interface
virtual int inference(
google::protobuf::Message* req,
google::protobuf::Message* res,
DoneType done,
brpc::CallId* cid = NULL) = 0;
    // synchronous debug interface
virtual int debug(
google::protobuf::Message* req,
google::protobuf::Message* res,
butil::IOBufBuilder* debug_os) = 0;
    // non-blocking interface
virtual int send_inference(
google::protobuf::Message* req,
google::protobuf::Message* res) = 0;
virtual int recv_inference() = 0;
virtual void cancel_inference() = 0;
virtual const char* tag() = 0;
virtual const google::protobuf::Service* service() = 0;
virtual const brpc::Controller* controller() = 0;
virtual const google::protobuf::RpcChannel* channel() = 0;
virtual const Stub* stub() = 0;
virtual bool is_inited() = 0;
};
template<typename T>
class PredictorImpl : public Predictor {
public:
typedef google::protobuf::MethodDescriptor MethodDescriptor;
PredictorImpl() : _service(NULL), _stub(NULL), _infer(NULL),
_debug(NULL), _channel(NULL), _inited(false) {
// _inferid = 0;
}
~PredictorImpl() {}
int init(
google::protobuf::RpcChannel* chnl,
T* service,
const MethodDescriptor* infer,
const MethodDescriptor* debug,
const RpcParameters& options,
Stub* stub,
const std::string& tag);
int reset(
const RpcParameters& options,
brpc::Controller& cntl);
int deinit();
bool is_inited() {
return _inited;
}
    // synchronous interface
int inference(
google::protobuf::Message* req,
google::protobuf::Message* res);
    // asynchronous interface
int inference(
google::protobuf::Message* req,
google::protobuf::Message* res,
DoneType done,
brpc::CallId* cid = NULL);
    // synchronous debug interface
int debug(
google::protobuf::Message* req,
google::protobuf::Message* res,
butil::IOBufBuilder* debug_os);
    // semi-synchronous (non-blocking) interface
int send_inference(
google::protobuf::Message* req,
google::protobuf::Message* res);
    // semi-synchronous (non-blocking) interface
int recv_inference();
    // semi-synchronous (non-blocking) interface
void cancel_inference();
const char* tag();
const google::protobuf::Service* service() {
return _service;
}
const brpc::Controller* controller() {
return &_cntl;
}
const google::protobuf::RpcChannel* channel() {
return _channel;
}
const Stub* stub() {
return _stub;
}
private:
T* _service;
Stub* _stub;
const MethodDescriptor* _infer;
const MethodDescriptor* _debug;
google::protobuf::RpcChannel* _channel;
brpc::Controller _cntl;
brpc::CallId _inferid;
RpcParameters _options;
std::string _tag;
bool _inited;
};
} // sdk_cpp
} // paddle_serving
} // baidu
#include "predictor.hpp"
#endif //BAIDU_PADDLE_SERVING_SDK_CPP_PREDICTOR_H
/* vim: set expandtab ts=4 sw=4 sts=4 tw=100: */
#ifndef BAIDU_PADDLE_SERVING_SDK_CPP_PREDICTOR_HPP
#define BAIDU_PADDLE_SERVING_SDK_CPP_PREDICTOR_HPP
namespace baidu {
namespace paddle_serving {
namespace sdk_cpp {
class MetricScope;
class Stub;
template<typename T, typename C, typename R, typename I, typename O>
class StubImpl;
template<typename Arg1, typename Arg2>
inline ::google::protobuf::Closure* NewClosure(
void (*function)(Arg1*, Arg2*),
Arg1* arg1 = NULL, Arg2* arg2 = NULL) {
FunctionClosure<Arg1, Arg2>* closure = butil::get_object<
FunctionClosure<Arg1, Arg2> >();
if (closure) {
if (closure->init(function, true, false, true,
arg1, arg2) != 0) {
LOG(FATAL) << "Failed create closure objects";
return NULL;
}
}
return closure;
}
template<typename Arg1, typename Arg2>
int FunctionClosure<Arg1, Arg2>::init(
FunctionType function, bool self_deleting,
bool arg1_deleting, bool arg2_deleting,
Arg1* arg1, Arg2* arg2) {
    _function = function;
    _arg1 = arg1;
    _arg2 = arg2;
    _self_deleting = self_deleting;
    _arg1_deleting = arg1_deleting;
    _arg2_deleting = arg2_deleting;
if (arg2 == NULL) {
GET_OBJECT_FROM_POOL(_arg2, Arg2, -1);
_arg2_deleting = true;
}
return 0;
}
template<typename Arg1, typename Arg2>
void FunctionClosure<Arg1, Arg2>::Run() {
bool self_delete = _self_deleting;
bool arg1_delete = _arg1_deleting;
bool arg2_delete = _arg2_deleting;
_function(_arg1, _arg2);
if (self_delete) {
butil::return_object(this);
}
if (arg2_delete) {
butil::return_object(_arg2);
}
}
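// Note: the asynchronous inference below hands the response and the pooled
// controller to NewClosure, so a user callback is expected to have the
// signature void on_done(google::protobuf::Message* res,
// brpc::Controller* cntl); both objects are returned to their object pools
// automatically after Run() (the name on_done is illustrative).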
template<typename T> int PredictorImpl<T>::init(
google::protobuf::RpcChannel* chnl,
T* service,
const MethodDescriptor* infer,
const MethodDescriptor* debug,
const RpcParameters& options,
Stub* stub,
const std::string& tag) {
MetricScope metric(stub, "rpc_init");
butil::Timer tt(butil::Timer::STARTED);
_service = service;
_channel = chnl;
_infer = infer;
_debug = debug;
_options = options;
_stub = stub;
_tag = tag;
reset(_options, _cntl);
_inited = true;
return 0;
}
template<typename T> int PredictorImpl<T>::reset(
const RpcParameters& options,
brpc::Controller& cntl) {
cntl.Reset();
if (options.compress_type.init) {
cntl.set_request_compress_type(
compress_types[options.compress_type.value]);
}
return 0;
}
template<typename T> int PredictorImpl<T>::deinit() {
// do nothing
_inited = false;
return 0;
}
template<typename T> int PredictorImpl<T>::inference(
google::protobuf::Message* req,
google::protobuf::Message* res) {
MetricScope metric(_stub, "infer_sync");
_service->CallMethod(_infer, &_cntl, req, res, NULL);
if (_cntl.Failed()) {
LOG(WARNING)
<< "inference call failed, message: "
<< _cntl.ErrorText();
_stub->update_average(1, "failure");
return -1;
}
return 0;
}
template<typename T> int PredictorImpl<T>::inference(
google::protobuf::Message* req,
google::protobuf::Message* res,
DoneType done,
brpc::CallId* cid) {
MetricScope metric(_stub, "infer_async");
// The asynchronous interface must not use this predictor's controller
// member; instead it borrows a standalone controller from the object
// pool, which may only be released after the asynchronous callback has
// finished. That release is managed automatically by NewClosure, so
// callers need not care about it.
brpc::Controller* cntl
= butil::get_object<brpc::Controller>();
if (!cntl || reset(_options, *cntl) != 0) {
LOG(FATAL) << "Failed get controller from object pool,"
<< "cntl is null: " << (cntl == NULL);
_stub->update_average(1, "failure");
return -1;
}
if (cid != NULL) { // you can join this rpc with cid
*cid = cntl->call_id();
}
_service->CallMethod(_infer, cntl, req, res, NewClosure(
done, res, cntl));
return 0;
}
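// A sketch of the asynchronous path, assuming an on_done callback as
// described above (names illustrative):
//
//   brpc::CallId cid;
//   predictor->inference(&req, &res, on_done, &cid);
//   ... // overlap other work with the RPC
//   brpc::Join(cid); // optional: block until the callback has run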
template<typename T> int PredictorImpl<T>::debug(
google::protobuf::Message* req,
google::protobuf::Message* res,
butil::IOBufBuilder* debug_os) {
MetricScope metric(_stub, "debug");
_service->CallMethod(_debug, &_cntl, req, res, NULL);
if (_cntl.Failed()) {
LOG(WARNING)
<< "debug call failed, message: "
<< _cntl.ErrorText();
_stub->update_average(1, "failure");
return -1;
}
// copy debug info from response attachment
(*debug_os) << _cntl.response_attachment();
return 0;
}
template<typename T> int PredictorImpl<T>::send_inference(
google::protobuf::Message* req,
google::protobuf::Message* res) {
MetricScope metric(_stub, "infer_send");
_inferid = _cntl.call_id();
_service->CallMethod(
_infer, &_cntl, req, res, brpc::DoNothing());
return 0;
}
template<typename T> int PredictorImpl<T>::recv_inference() {
// waiting for callback done
MetricScope metric(_stub, "infer_recv");
brpc::Join(_inferid);
if (_cntl.Failed()) {
LOG(WARNING) << "Failed recv response from rpc"
<< ", err: " << _cntl.ErrorText();
_stub->update_average(1, "failure");
return -1;
}
return 0;
}
template<typename T> void PredictorImpl<T>::cancel_inference() {
MetricScope metric(_stub, "infer_cancel");
brpc::StartCancel(_inferid);
}
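// A sketch of the semi-synchronous path: send_inference issues the RPC and
// returns immediately, recv_inference joins the in-flight call, and
// cancel_inference aborts it (names illustrative):
//
//   predictor->send_inference(&req, &res);
//   ... // client-side work overlapped with the RPC
//   if (predictor->recv_inference() != 0) {
//       // handle failure; res is valid only on success
//   }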
template<typename T> const char* PredictorImpl<T>::tag() {
return _tag.c_str();
}
} // sdk_cpp
} // paddle_serving
} // baidu
#endif //BAIDU_PADDLE_SERVING_SDK_CPP_PREDICTOR_HPP
/***************************************************************************
*
* Copyright (c) 2018 Baidu.com, Inc. All Rights Reserved
*
**************************************************************************/
/**
* @file include/predictor_api.h
* @author wanlijin01(wanlijin01@baidu.com)
* @date 2018/07/09 17:33:59
* @brief
*
**/
#ifndef BAIDU_PADDLE_SERVING_SDK_CPP_PREDICTOR_SDK_H
#define BAIDU_PADDLE_SERVING_SDK_CPP_PREDICTOR_SDK_H
#include "stub.h"
#include "predictor.h"
#include "endpoint_config.h"
#include "endpoint.h"
#include "config_manager.h"
namespace baidu {
namespace paddle_serving {
namespace sdk_cpp {
class PredictorApi {
public:
PredictorApi() {}
int register_all();
int create(const char* path, const char* file);
int thrd_initialize();
int thrd_clear();
int thrd_finalize();
void destroy();
static PredictorApi& instance() {
static PredictorApi api;
return api;
}
Predictor* fetch_predictor(std::string ep_name) {
std::map<std::string, Endpoint*>::iterator it
= _endpoints.find(ep_name);
if (it == _endpoints.end() || !it->second) {
LOG(FATAL) << "Failed fetch predictor:"
<< ", ep_name: " << ep_name;
return NULL;
}
return it->second->get_predictor();
}
Predictor* fetch_predictor(std::string ep_name,
const void* params) {
std::map<std::string, Endpoint*>::iterator it
= _endpoints.find(ep_name);
if (it == _endpoints.end() || !it->second) {
LOG(FATAL) << "Failed fetch predictor:"
<< ", ep_name: " << ep_name;
return NULL;
}
return it->second->get_predictor(params);
}
int free_predictor(Predictor* predictor) {
const Stub* stub = predictor->stub();
if (!stub || stub->return_predictor(predictor) != 0) {
LOG(FATAL) << "Failed return predictor via stub";
return -1;
}
return 0;
}
private:
EndpointConfigManager _config_manager;
std::map<std::string, Endpoint*> _endpoints;
};
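// A sketch of end-to-end API usage; the conf path, conf file and endpoint
// name below are illustrative, not taken from this commit:
//
//   PredictorApi& api = PredictorApi::instance();
//   api.create("./conf", "predictors.prototxt");
//   api.thrd_initialize(); // once per worker thread
//   Predictor* p = api.fetch_predictor("image_classify");
//   // build request, then: p->inference(&req, &res);
//   api.free_predictor(p);
//   api.thrd_clear(); // after each request is handled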
} // sdk_cpp
} // paddle_serving
} // baidu
#endif //BAIDU_PADDLE_SERVING_SDK_CPP_PREDICTOR_SDK_H
/* vim: set expandtab ts=4 sw=4 sts=4 tw=100: */
/***************************************************************************
*
* Copyright (c) 2018 Baidu.com, Inc. All Rights Reserved
*
**************************************************************************/
/**
* @file include/stub.h
* @author wanlijin(wanlijin01@baidu.com)
* @date 2018/12/04 16:42:29
* @brief
*
**/
#ifndef BAIDU_PADDLE_SERVING_SDK_CPP_STUB_H
#define BAIDU_PADDLE_SERVING_SDK_CPP_STUB_H
#include "common.h"
namespace baidu {
namespace paddle_serving {
namespace sdk_cpp {
class Predictor;
struct VariantInfo;
class Stub {
public:
typedef google::protobuf::Message Message;
virtual ~Stub() {}
virtual int initialize(const VariantInfo& var, const std::string& ep,
const std::string* tag, const std::string* tag_value) = 0;
// predictor
virtual Predictor* fetch_predictor() = 0;
virtual int return_predictor(Predictor* predictor) = 0;
virtual int return_predictor(Predictor* predictor) const = 0;
// request
virtual Message* fetch_request() = 0;
virtual int return_request(Message* request) = 0;
virtual int return_request(Message* request) const = 0;
// response
virtual Message* fetch_response() = 0;
virtual int return_response(Message* response) = 0;
virtual int return_response(Message* response) const = 0;
virtual const std::string& which_endpoint() const = 0;
// control logic for tls
virtual int thrd_initialize() = 0;
virtual int thrd_clear() = 0;
virtual int thrd_finalize() = 0;
virtual void update_average(int64_t acc, const char* name) = 0;
virtual void update_latency(int64_t acc, const char* name) = 0;
};
} // sdk_cpp
} // paddle_serving
} // baidu
#endif //BAIDU_PADDLE_SERVING_SDK_CPP_STUB_H
/* vim: set expandtab ts=4 sw=4 sts=4 tw=100: */
/***************************************************************************
*
* Copyright (c) 2018 Baidu.com, Inc. All Rights Reserved
*
**************************************************************************/
/**
* @file include/stub.h
* @author wanlijin(wanlijin01@baidu.com)
* @date 2018/07/04 16:42:29
* @brief
*
**/
#ifndef BAIDU_PADDLE_SERVING_SDK_CPP_STUB_IMPL_H
#define BAIDU_PADDLE_SERVING_SDK_CPP_STUB_IMPL_H
#include "common.h"
#include "predictor.h"
#include "stub.h"
#include "endpoint_config.h"
namespace baidu {
namespace paddle_serving {
namespace sdk_cpp {
static const std::string AVG_PREFIX = "avg_";
static const std::string LTC_PREFIX = "ltc_";
class Predictor;
template<typename T>
class PredictorImpl;
static const char* INFERENCE_METHOD_NAME = "inference";
static const char* DEBUG_METHOD_NAME = "debug";
class MetricScope {
public:
MetricScope(Stub* stub, const char* routine) :
_stub(stub), _tt(butil::Timer::STARTED), _routine(routine) {
TRACEPRINTF("enter %s", routine);
}
~MetricScope() {
TRACEPRINTF("exit %s", _routine.c_str());
_tt.stop();
_stub->update_latency(_tt.u_elapsed(), _routine.c_str());
}
private:
Stub* _stub;
butil::Timer _tt;
std::string _routine;
};
class TracePackScope {
public:
TracePackScope(const char* routine) :
_routine(routine), _index(-1) {
TRACEPRINTF("start pack: %s", routine);
}
TracePackScope(const char* routine, int index) :
_routine(routine), _index(index) {
TRACEPRINTF("start pack: %s, index: %d", routine, index);
}
~TracePackScope() {
if (_index >= 0) {
TRACEPRINTF("finish pack: %s, index: %d", _routine.c_str(), _index);
} else {
TRACEPRINTF("finish pack: %s", _routine.c_str());
}
}
private:
std::string _routine;
int _index;
};
class TagFilter : public brpc::NamingServiceFilter {
public:
class TagHelper {
public:
TagHelper(const std::string& kv_str) {
if (kv_str.compare("") == 0) {
return;
}
static const char TAG_DELIM = ',';
static const char KV_DELIM = ':';
std::string::size_type start_pos = 0;
std::string::size_type end_pos;
do {
end_pos = kv_str.find(TAG_DELIM, start_pos);
std::string kv_pair_str;
if (end_pos == std::string::npos) {
kv_pair_str = kv_str.substr(start_pos);
} else {
kv_pair_str = kv_str.substr(start_pos, end_pos - start_pos);
start_pos = end_pos + 1;
}
std::string::size_type kv_delim_pos = kv_pair_str.find(KV_DELIM, 0);
if (kv_delim_pos == std::string::npos) {
LOG(FATAL) << "invalid kv pair: " << kv_pair_str.c_str();
continue;
}
std::string key = kv_pair_str.substr(0, kv_delim_pos);
std::string value = kv_pair_str.substr(kv_delim_pos + 1);
_kv_map.insert(std::pair<std::string, std::string>(key, value));
} while (end_pos != std::string::npos);
}
bool contains(const std::string& k, const std::string& v) const {
std::map<std::string, std::string>::const_iterator found
= _kv_map.find(k);
if (found == _kv_map.end()) {
// key not found
return false;
}
if (v.compare(found->second) != 0) {
// value not equals
return false;
}
return true;
}
private:
std::map<std::string, std::string> _kv_map;
};
TagFilter(const std::string& key, const std::string& val) {
_key = key;
_value = val;
}
bool Accept(const brpc::ServerNode& server) const {
TagHelper helper(server.tag);
return helper.contains(_key, _value);
}
private:
std::string _key;
std::string _value;
};
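// Example: a server announcing tag "env:online,idc:bj" is accepted by
// TagFilter("env", "online") and rejected by TagFilter("idc", "sh")
// (tag values here are illustrative).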
class BvarWrapper {
public:
virtual void update_latency(int64_t acc) = 0;
virtual void update_average(int64_t acc) = 0;
};
class LatencyWrapper : public BvarWrapper {
public:
LatencyWrapper(const std::string& name) :
_ltc(name + "_ltc") {
}
void update_latency(int64_t acc) {
_ltc << acc;
}
void update_average(int64_t acc) {
LOG(FATAL) << "Cannot update average to a LatencyRecorder";
}
private:
bvar::LatencyRecorder _ltc;
};
class AverageWrapper : public BvarWrapper {
public:
AverageWrapper(const std::string& name) :
_win(name + "_avg", &_avg, ::bvar::FLAGS_bvar_dump_interval) {
}
void update_latency(int64_t acc) {
LOG(FATAL) << "Cannot update latency to a AverageWrapper";
}
void update_average(int64_t acc) {
_avg << acc;
}
private:
bvar::IntRecorder _avg;
bvar::Window<bvar::IntRecorder> _win;
};
struct StubTLS {
StubTLS() {
predictor_pools.clear();
request_pools.clear();
response_pools.clear();
}
std::vector<Predictor*> predictor_pools;
std::vector<google::protobuf::Message*> request_pools;
std::vector<google::protobuf::Message*> response_pools;
};
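// Template parameters of StubImpl, as used below: T is the generated
// protobuf service stub type (constructed from an RpcChannel); C and R are
// presumably the brpc CallMapper / ResponseMerger types handed to
// ParallelChannel::AddChannel when requests are packed; I and O are the
// concrete request / response message types pooled per thread.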
template<typename T, typename C, typename R, typename I, typename O>
class StubImpl : public Stub {
public:
typedef google::protobuf::Message Message;
StubImpl()
: _channel(NULL), _pchannel(NULL), _gchannel(NULL),
_service_stub(NULL), _infer(NULL), _debug(NULL) {}
~StubImpl() {}
int initialize(const VariantInfo& var, const std::string& ep,
const std::string* tag, const std::string* tag_value);
Predictor* fetch_predictor();
int return_predictor(Predictor* predictor);
int return_predictor(Predictor* predictor) const;
Message* fetch_request();
int return_request(Message* request);
int return_request(Message* request) const;
Message* fetch_response();
int return_response(Message* response);
int return_response(Message* response) const;
int thrd_initialize();
int thrd_clear();
int thrd_finalize();
const std::string& which_endpoint() const {
return _endpoint;
}
private:
google::protobuf::RpcChannel* init_channel(
const VariantInfo& var,
brpc::NamingServiceFilter* filter = NULL);
brpc::ParallelChannel* init_pchannel(
brpc::Channel* sub_channel, uint32_t channel_count,
uint32_t package_size, const brpc::ChannelOptions& options);
StubTLS* get_tls() {
return static_cast<StubTLS*>(bthread_getspecific(_bthread_key));
}
private:
brpc::Channel* _channel;
brpc::ParallelChannel* _pchannel;
google::protobuf::RpcChannel* _gchannel;
T* _service_stub;
const google::protobuf::MethodDescriptor* _infer;
const google::protobuf::MethodDescriptor* _debug;
std::string _endpoint;
RpcParameters _options;
std::string _tag;
uint32_t _max_channel;
uint32_t _package_size;
// tls handlers
bthread_key_t _bthread_key;
// bvar variables
std::map<std::string, BvarWrapper*> _ltc_bvars;
std::map<std::string, BvarWrapper*> _avg_bvars;
mutable butil::Mutex _bvar_mutex;
#ifndef DECLARE_LATENCY
#define DECLARE_LATENCY(item) \
LatencyWrapper* _ltc_##item;
#endif
DECLARE_LATENCY(infer_sync); // synchronous requests
DECLARE_LATENCY(infer_async); // asynchronous requests
DECLARE_LATENCY(infer_send); // semi-synchronous send
DECLARE_LATENCY(infer_recv); // semi-synchronous recv
DECLARE_LATENCY(infer_cancel);// semi-synchronous cancel
DECLARE_LATENCY(debug); // debug requests
DECLARE_LATENCY(rpc_init); // rpc reset
DECLARE_LATENCY(thrd_clear); // thread-local clear
DECLARE_LATENCY(pack_map); // request pack: map
DECLARE_LATENCY(pack_merge); // request pack: merge
#undef DECLARE_LATENCY
#ifndef DECLARE_AVERAGE
#define DECLARE_AVERAGE(item) \
AverageWrapper* _avg_##item;
#endif
DECLARE_AVERAGE(failure); // number of failed requests
DECLARE_AVERAGE(item_size); // items per request
DECLARE_AVERAGE(pack); // sub-packages per request
DECLARE_AVERAGE(pack_fail); // failed sub-packages per request
#undef DECLARE_AVERAGE
public:
void update_average(int64_t acc, const char* name) {
std::map<std::string, BvarWrapper*>::iterator iter =
_avg_bvars.find(AVG_PREFIX + name);
if (iter == _avg_bvars.end()) {
LOG(FATAL) << "Not found average record:avg_" << name;
return ;
}
iter->second->update_average(acc);
}
void update_latency(int64_t acc, const char* name) {
std::map<std::string, BvarWrapper*>::iterator iter =
_ltc_bvars.find(LTC_PREFIX + name);
if (iter == _ltc_bvars.end()) {
LOG(FATAL) << "Not found latency record:ltc_" << name;
return ;
}
iter->second->update_latency(acc);
}
};
} // sdk_cpp
} // paddle_serving
} // baidu
#include "stub_impl.hpp"
#endif //BAIDU_PADDLE_SERVING_SDK_CPP_STUB_IMPL_H
/* vim: set expandtab ts=4 sw=4 sts=4 tw=100: */
#ifndef BAIDU_PADDLE_SERVING_SDK_CPP_STUB_IMPL_HPP
#define BAIDU_PADDLE_SERVING_SDK_CPP_STUB_IMPL_HPP
namespace baidu {
namespace paddle_serving {
namespace sdk_cpp {
template<typename T, typename C, typename R, typename I, typename O>
int StubImpl<T, C, R, I, O>::initialize(
const VariantInfo& var, const std::string& ep,
const std::string* tag, const std::string* tag_value) {
if (tag != NULL && tag_value != NULL) {
TagFilter* filter = new (std::nothrow) TagFilter(
*tag, *tag_value);
if (!filter) {
LOG(FATAL) << "Failed create tag filter, key: " << tag
<< ", value: " << tag_value;
return -1;
}
_gchannel = init_channel(var, filter);
LOG(INFO)
<< "Create stub with tag: " << *tag
<< ", " << *tag_value << ", ep: " << ep;
} else {
_gchannel = init_channel(var, NULL);
LOG(INFO) << "Create stub without tag, ep " << ep;
}
if (!_gchannel) {
LOG(FATAL) << "Failed init channel via var_info";
return -1;
}
_service_stub = new (std::nothrow) T(_gchannel);
if (!_service_stub) {
LOG(FATAL) << "Failed create stub with channel";
return -1;
}
_infer = _service_stub->GetDescriptor()->FindMethodByName(
INFERENCE_METHOD_NAME);
if (!_infer) {
LOG(FATAL) << "Failed get inference method, "
<< "method name: " << INFERENCE_METHOD_NAME;
return -1;
}
_debug = _service_stub->GetDescriptor()->FindMethodByName(
DEBUG_METHOD_NAME);
if (!_debug) {
LOG(FATAL) << "Failed get debug method, "
<< "method name: " << DEBUG_METHOD_NAME;
return -1;
}
_endpoint = ep;
if (bthread_key_create(&_bthread_key, NULL) != 0) {
LOG(FATAL) << "Failed create key for stub tls";
return -1;
}
const std::string name
= _endpoint + "_" + _service_stub->GetDescriptor()->full_name() + "_" + _tag;
_ltc_bvars.clear();
_avg_bvars.clear();
BAIDU_SCOPED_LOCK(_bvar_mutex);
#ifndef DEFINE_LATENCY
#define DEFINE_LATENCY(item) \
do { \
_ltc_##item = new (std::nothrow) LatencyWrapper(name + "_"#item);\
if (!_ltc_##item) { \
LOG(FATAL) << "Failed create latency recorder:" \
<< name + "_"#item; \
return -1; \
} \
_ltc_bvars["ltc_"#item] = _ltc_##item; \
} while(0)
#endif
DEFINE_LATENCY(infer_sync);
DEFINE_LATENCY(infer_async);
DEFINE_LATENCY(infer_send);
DEFINE_LATENCY(infer_recv);
DEFINE_LATENCY(infer_cancel);
DEFINE_LATENCY(debug);
DEFINE_LATENCY(rpc_init);
DEFINE_LATENCY(thrd_clear);
DEFINE_LATENCY(pack_map);
DEFINE_LATENCY(pack_merge);
#undef DEFINE_LATENCY
#ifndef DEFINE_AVERAGE
#define DEFINE_AVERAGE(item) \
do { \
_avg_##item = new(std::nothrow) AverageWrapper(name + "_"#item);\
if (!_avg_##item) { \
LOG(FATAL) << "Failed create average recorder:" \
<< name + "_"#item; \
return -1; \
} \
_avg_bvars["avg_"#item] = _avg_##item; \
} while(0)
#endif
DEFINE_AVERAGE(failure);
DEFINE_AVERAGE(pack);
DEFINE_AVERAGE(item_size);
DEFINE_AVERAGE(pack_fail);
#undef DEFINE_AVERAGE
return 0;
}
template<typename T, typename C, typename R, typename I, typename O>
int StubImpl<T, C, R, I, O>::thrd_initialize() {
if (bthread_getspecific(_bthread_key) != NULL) {
LOG(WARNING) << "Already thread initialized for stub";
return 0;
}
StubTLS* tls = new (std::nothrow) StubTLS();
if (!tls || bthread_setspecific(_bthread_key, tls) != 0) {
LOG(FATAL) << "Failed binding tls data to bthread_key";
return -1;
}
LOG(WARNING) << "Succ thread initialize stub impl!";
return 0;
}
template<typename T, typename C, typename R, typename I, typename O>
int StubImpl<T, C, R, I, O>::thrd_clear() {
MetricScope metric(this, "thrd_clear");
StubTLS* tls = get_tls();
if (!tls) {
LOG(FATAL) << "Failed get tls stub object";
return -1;
}
// clear predictor
size_t ps = tls->predictor_pools.size();
for (size_t pi = 0; pi < ps; ++pi) {
Predictor* p = tls->predictor_pools[pi];
if (p && p->is_inited() && return_predictor(p) != 0) {
LOG(FATAL) << "Failed return predictor: " << pi;
return -1;
}
}
tls->predictor_pools.clear();
// clear request
size_t is = tls->request_pools.size();
for (size_t ii = 0; ii < is; ++ii) {
if (return_request(tls->request_pools[ii])!= 0) {
LOG(FATAL) << "Failed return request: " << ii;
return -1;
}
}
tls->request_pools.clear();
// clear response
size_t os = tls->response_pools.size();
for (size_t oi = 0; oi < os; ++oi) {
if (return_response(tls->response_pools[oi])!= 0) {
LOG(FATAL) << "Failed return response: " << oi;
return -1;
}
}
tls->response_pools.clear();
return 0;
}
template<typename T, typename C, typename R, typename I, typename O>
int StubImpl<T, C, R, I, O>::thrd_finalize() {
StubTLS* tls = get_tls();
if (!tls || thrd_clear() != 0) {
LOG(FATAL) << "Failed clreate tls in thrd finalize";
return -1;
}
delete tls;
return 0;
}
template<typename T, typename C, typename R, typename I, typename O>
Predictor* StubImpl<T, C, R, I, O>::fetch_predictor() {
StubTLS* tls = get_tls();
if (!tls) {
LOG(FATAL) << "Failed get tls data when fetching predictor";
return NULL;
}
PredictorImpl<T>* predictor = butil::get_object<PredictorImpl<T> >();
if (!predictor) {
LOG(FATAL) << "Failed fetch predictor";
return NULL;
}
if (predictor->init(_gchannel, _service_stub, _infer, _debug, _options,
this, _tag) != 0) {
LOG(FATAL) << "Failed init fetched predictor";
return NULL;
}
tls->predictor_pools.push_back(predictor);
return predictor;
}
template<typename T, typename C, typename R, typename I, typename O>
int StubImpl<T, C, R, I, O>::return_predictor(Predictor* predictor) {
if ((dynamic_cast<PredictorImpl<T>*>(predictor))->deinit() != 0) {
LOG(FATAL) << "Failed deinit fetched predictor";
return -1;
}
butil::return_object(dynamic_cast<PredictorImpl<T>*>(predictor));
return 0;
}
template<typename T, typename C, typename R, typename I, typename O>
int StubImpl<T, C, R, I, O>::return_predictor(Predictor* predictor) const {
if ((dynamic_cast<PredictorImpl<T>*>(predictor))->deinit() != 0) {
LOG(FATAL) << "Failed deinit fetched predictor";
return -1;
}
butil::return_object(dynamic_cast<PredictorImpl<T>*>(predictor));
return 0;
}
template<typename T, typename C, typename R, typename I, typename O>
google::protobuf::Message* StubImpl<T, C, R, I, O>::fetch_request() {
StubTLS* tls = get_tls();
if (!tls) {
LOG(FATAL) << "Failed get tls data when fetching request";
return NULL;
}
I* req = butil::get_object<I>();
if (!req) {
LOG(FATAL) << "Failed get tls request item, type: " << typeid(I).name();
return NULL;
}
req->Clear();
tls->request_pools.push_back(req);
return req;
}
template<typename T, typename C, typename R, typename I, typename O>
int StubImpl<T, C, R, I, O>::return_request(
google::protobuf::Message* request) const {
request->Clear();
butil::return_object(dynamic_cast<I*>(request));
return 0;
}
template<typename T, typename C, typename R, typename I, typename O>
int StubImpl<T, C, R, I, O>::return_request(
google::protobuf::Message* request) {
request->Clear();
butil::return_object(dynamic_cast<I*>(request));
return 0;
}
template<typename T, typename C, typename R, typename I, typename O>
google::protobuf::Message* StubImpl<T, C, R, I, O>::fetch_response() {
StubTLS* tls = get_tls();
if (!tls) {
LOG(FATAL) << "Failed get tls data when fetching response";
return NULL;
}
O* res = butil::get_object<O>();
if (!res) {
LOG(FATAL) << "Failed get tls response item, type: " << typeid(O).name();
return NULL;
}
res->Clear();
tls->response_pools.push_back(res);
return res;
}
template<typename T, typename C, typename R, typename I, typename O>
int StubImpl<T, C, R, I, O>::return_response(
google::protobuf::Message* response) const {
response->Clear();
butil::return_object(dynamic_cast<O*>(response));
return 0;
}
template<typename T, typename C, typename R, typename I, typename O>
int StubImpl<T, C, R, I, O>::return_response(
google::protobuf::Message* response) {
response->Clear();
butil::return_object(dynamic_cast<O*>(response));
return 0;
}
template<typename T, typename C, typename R, typename I, typename O>
google::protobuf::RpcChannel* StubImpl<T, C, R, I, O>::init_channel(
const VariantInfo& var, brpc::NamingServiceFilter* filter) {
brpc::ChannelOptions chn_options;
chn_options.ns_filter = filter;
// parameters
ASSIGN_CONF_ITEM(chn_options.protocol, var.parameters.protocol, NULL);
ASSIGN_CONF_ITEM(_tag, var.parameters.route_tag, NULL);
ASSIGN_CONF_ITEM(_max_channel, var.parameters.max_channel, NULL);
ASSIGN_CONF_ITEM(_package_size, var.parameters.package_size, NULL);
if (_max_channel < 1) {
LOG(ERROR) << "Invalid MaxChannelPerRequest: " << _max_channel;
return NULL;
}
// connection
ASSIGN_CONF_ITEM(chn_options.max_retry, var.connection.cnt_retry_conn, NULL);
ASSIGN_CONF_ITEM(chn_options.connect_timeout_ms, var.connection.tmo_conn, NULL);
ASSIGN_CONF_ITEM(chn_options.timeout_ms, var.connection.tmo_rpc, NULL);
ASSIGN_CONF_ITEM(chn_options.backup_request_ms, var.connection.tmo_hedge, NULL);
// connection type
std::string conn_type_str;
ASSIGN_CONF_ITEM(conn_type_str, var.connection.type_conn, NULL);
chn_options.connection_type
= brpc::StringToConnectionType(conn_type_str);
// naminginfo
std::string cluster_naming_info;
std::string cluster_loadbalancer;
ASSIGN_CONF_ITEM(cluster_naming_info, var.naminginfo.cluster_naming, NULL);
ASSIGN_CONF_ITEM(cluster_loadbalancer, var.naminginfo.load_balancer, NULL);
// brpc single channel
_channel = butil::get_object<brpc::Channel>();
if (!_channel) {
LOG(FATAL) << "Failed get channel object from butil::pool";
return NULL;
}
if (_channel->Init(
cluster_naming_info.c_str(),
cluster_loadbalancer.c_str(),
&chn_options) != 0) {
LOG(ERROR)
<< "Failed to initialize channel, path: "
<< cluster_naming_info;
return NULL;
}
// brpc parallel channel
_pchannel = init_pchannel(
_channel, _max_channel, _package_size, chn_options);
if (_pchannel) {
LOG(INFO) << "Succ create parallel channel, count: "
<< _max_channel;
return _pchannel;
}
return _channel;
}
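// Note: when MaxChannelPerRequest > 1, init_pchannel below wraps the single
// brpc::Channel in a brpc::ParallelChannel that registers the same
// sub-channel channel_count times, so one logical call fans out into
// sub-calls mapped and merged by the C / R policy objects.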
template<typename T, typename C, typename R, typename I, typename O>
brpc::ParallelChannel* StubImpl<T, C, R, I, O>::init_pchannel(
brpc::Channel* sub_channel, uint32_t channel_count,
uint32_t package_size, const brpc::ChannelOptions& options) {
if (channel_count <= 1) { // no need for a parallel channel
LOG(INFO) << "channel count <= 1, no need to use pchannel.";
return NULL;
}
_pchannel = butil::get_object<brpc::ParallelChannel>();
if (!_pchannel) {
LOG(FATAL) << "Failed get pchannel from object pool";
return NULL;
}
brpc::ParallelChannelOptions pchan_options;
pchan_options.timeout_ms = options.timeout_ms;
if (_pchannel->Init(&pchan_options) != 0) {
LOG(FATAL) << "Failed init parallel channel with tmo_us: "
<< pchan_options.timeout_ms;
return NULL;
}
for (uint32_t si = 0; si < channel_count; ++si) {
if (_pchannel->AddChannel(
sub_channel,
brpc::DOESNT_OWN_CHANNEL,
new C(package_size, this),
new R(package_size, this)) != 0) {
LOG(FATAL) << "Failed add channel at: " << si
<< ", package_size:" << package_size;
return NULL;
}
}
return _pchannel;
}
} // sdk_cpp
} // paddle_serving
} // baidu
#endif //BAIDU_PADDLE_SERVING_SDK_CPP_STUB_IMPL_HPP
/***************************************************************************
*
* Copyright (c) 2018 Baidu.com, Inc. All Rights Reserved
*
**************************************************************************/
/**
* @file utils.h
* @author root(com@baidu.com)
* @date 2018/07/09 19:43:36
* @brief
*
**/
#ifndef BAIDU_PADDLE_SERVING_SDK_CPP_UTILS_H
#define BAIDU_PADDLE_SERVING_SDK_CPP_UTILS_H
#include "common.h"
namespace baidu {
namespace paddle_serving {
namespace sdk_cpp {
inline int str_split(
const std::string& source,
const std::string& delim,
std::vector<std::string>* vector_spliter) {
int delim_length = delim.length();
int total_length = source.length();
int last = 0;
if (delim_length == 0) {
vector_spliter->push_back(source);
return 0;
}
if (delim_length == 1) {
size_t index = source.find_first_of(delim, last);
while (index != std::string::npos) {
vector_spliter->push_back(source.substr(last,
index - last));
last = index + delim_length;
index = source.find_first_of(delim, last);
}
} else {
size_t index = source.find(delim, last);
while (index != std::string::npos) {
vector_spliter->push_back(source.substr(last,
index - last));
last = index + delim_length;
index = source.find(delim, last);
}
}
if (last < total_length) {
vector_spliter->push_back(source.substr(last,
total_length - last));
}
return 0;
}
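// A usage sketch (values illustrative):
//
//   std::vector<std::string> parts;
//   str_split("10.0.0.1:8010", ":", &parts);
//   // parts == {"10.0.0.1", "8010"}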
} // sdk_cpp
} // paddle_serving
} // baidu
#endif //BAIDU_PADDLE_SERVING_SDK_CPP_UTILS_H
/* vim: set expandtab ts=4 sw=4 sts=4 tw=100: */
/***************************************************************************
*
* Copyright (c) 2018 Baidu.com, Inc. All Rights Reserved
*
**************************************************************************/
/**
* @file include/variant.h
* @author wanlijin01(wanlijin01@baidu.com)
* @date 2018/07/27 17:37:31
* @brief
*
**/
#ifndef BAIDU_PADDLE_SERVING_CPP_SDK_VARIANT_H
#define BAIDU_PADDLE_SERVING_CPP_SDK_VARIANT_H
#include "common.h"
#include "endpoint_config.h"
#include "stub.h"
#include "predictor.h"
namespace baidu {
namespace paddle_serving {
namespace sdk_cpp {
class Variant {
friend class VariantRouterBase;
public:
virtual ~Variant() {}
Variant() : _default_stub(NULL) {
_stub_map.clear();
}
int initialize(
const EndpointInfo& ep_info,
const VariantInfo& var_info);
int thrd_initialize();
int thrd_clear();
int thrd_finalize();
Predictor* get_predictor(
const void* params);
Predictor* get_predictor();
int ret_predictor(Predictor* predictor);
const std::string& variant_tag() const {
return _variant_tag;
}
private:
std::string _endpoint_name;
std::string _stub_service;
std::string _variant_tag;
std::map<std::string, Stub*> _stub_map;
Stub* _default_stub;
};
} // sdk_cpp
} // paddle_serving
} // baidu
#endif //BAIDU_PADDLE_SERVING_CPP_SDK_VARIANT_H
/* vim: set expandtab ts=4 sw=4 sts=4 tw=100: */
FILE(GLOB protos ${CMAKE_CURRENT_LIST_DIR}/*.proto)
PROTOBUF_GENERATE_SERVING_CPP(PROTO_SRCS PROTO_HDRS ${protos})
LIST(APPEND sdk_cpp_srcs ${PROTO_SRCS})
syntax="proto2";
import "pds_option.proto";
import "builtin_format.proto";
package baidu.paddle_serving.predictor.image_classification;
option cc_generic_services = true;
message ClassifyResponse {
repeated baidu.paddle_serving.predictor.format.DensePrediction predictions = 1;
};
message Request {
repeated baidu.paddle_serving.predictor.format.XImageReqInstance instances = 1;
};
message Response {
// Each json string is serialized from ClassifyResponse predictions
repeated baidu.paddle_serving.predictor.format.XImageResInstance predictions = 1;
};
service ImageClassifyService {
rpc inference(Request) returns (Response);
rpc debug(Request) returns (Response);
option (pds.options).generate_stub = true;
};
FILE(GLOB srcs ${CMAKE_CURRENT_LIST_DIR}/*.cpp)
LIST(APPEND sdk_cpp_srcs ${srcs})
COMLOG_LEVEL : 2
COMLOG_LEVEL : 16
COMLOG_DEVICE_NUM : 2
COMLOG_DEVICE0 : TRACE
COMLOG_DEVICE1 : WARNING