Commit 958a6db9 authored by wangguibao

predictor

Parent 6b107eca
#coding:gbk
COMPILER('gcc482')
#Working directory.
WORKROOT('../../../')
# version info
## module name
repo_module = REPO_PATH()
## git branch name (master/rb)
repo_name = REPO_BRANCH()
## last changed version
repo_version = REPO_LAST_CHANGED_REV()
version = repo_module + '_' + repo_name.split('/')[-1] + ',' + repo_version
build_time = os.popen('date +%Y-%m-%d_%H:%M:%S').read().strip()
#Preprocessor flags.
CPPFLAGS(r'-DPDSERVING_VERSION=\"%s\"' % (version))
CPPFLAGS(r'-DPDSERVING_BUILDTIME=\"%s\"' % (build_time))
#Copy using hard links.
#CopyUsingHardLink(True)
#C++ compile flags.
#CXXFLAGS('-fsanitize=address -g -DNDEBUG -O2 -pipe -W -Wall -fPIC -fno-omit-frame-pointer -Wno-deprecated \
# -Wno-unused-parameter -Wno-unused-variable -Wno-unused-local-typedefs -Wno-sign-compare \
# -std=c++11')
CXXFLAGS('-g -O2 -pipe -W -Wall -fPIC -fno-omit-frame-pointer -Wno-deprecated \
-Wno-unused-parameter -Wno-unused-variable -Wno-unused-local-typedefs -Wno-sign-compare \
-std=c++11')
#for profiler
#CPPFLAGS('-D__const__= -Dtypeof=__typeof__ -DUSE_PTHREAD -DUSE_XBOX -DBAIDU_RPC_ENABLE_CPU_PROFILER -DBAIDU_RPC_ENABLE_HEAP_PROFILER')
#Compiling idl-generated code with C++11 fails with "error: 'typeof' was not declared in this scope", so -Dtypeof=__typeof__ is required
CPPFLAGS('-D__const__= -Dtypeof=__typeof__ -DUSE_PTHREAD')
#Linker flags.
LDFLAGS('-lpthread -lcrypto -lrt -lssl -ldl -lz')
#Dependency modules
CONFIGS('baidu/base/baidu-rpc@ci-base')
CONFIGS('baidu/im-common/mempool@mempool_1-0-1_BRANCH@git_branch')
CONFIGS('baidu/third-party/opencv@master@git_branch')
CONFIGS('public/configure@configure_1-2-17-0_PD_BL')
#CONFIGS('lib2-64/ullib@ullib_3-1-135-21782_PD_BL')
CONFIGS('third-64/boost@boost_1-63-0-101_PD_BL')
CONFIGS('public/bthread@ci-base')
CONFIGS('third-64/protobuf@protobuf_2-4-1-1100_PD_BL')
#CONFIGS('third-64/protobuf@protobuf_3-1-0-6209_PD_BL')
# for ut
CONFIGS('third-64/gtest@1.7.2.0')
# for profiler
#CONFIGS('thirdsrc/tcmalloc@2.5.0.5977', Libraries('libtcmalloc_and_profiler.a'))
# McCache
CONFIGS('baidu/base/cache@cache_3-1-7-21784_PD_BL')
INCPATHS('$OUT/include/')
PROTOFLAGS('--proto_path=.')
#ServiceGenerator
HEADERS(GLOB_GEN_SRCS('./proto/*.h'), '$INC/')
HEADERS(GLOB('./plugin/*.h'), '$INC/plugin')
Application('pdcodegen', Sources(GLOB('plugin/*.cc'), 'proto/pds_option.proto',
'src/pdcodegen.cpp', IncludePaths('. ./proto/ $OUT/include')))
HEADERS(GLOB_GEN_SRCS('./proto/*.h'), '$INC/')
HEADERS(GLOB('./proto/*.proto'), '$INC/proto')
HEADERS(GLOB('./common/*.h'), '$INC/common')
HEADERS(GLOB('./op/*.h'), '$INC/op')
HEADERS(GLOB('./framework/*.h'), '$INC/framework')
cpp_source_dirs = []
cpp_source_dirs.append('common/*.cpp')
cpp_source_dirs.append('op/*.cpp')
cpp_source_dirs.append('framework/*.cpp')
cpp_source_dirs.append('proto/*.proto')
#Allow .proto files as source files
PROTOFLAGS('--plugin=protoc-gen-pdcodegen=plugin/pdcodegen --pdcodegen_out proto --proto_path=.')
#StaticLib
StaticLibrary('pdserving',
Sources(GLOB(' '.join(cpp_source_dirs)), 'src/pdserving.cpp'),
LinkFlags('-lpthread -lcrypto -lm -lrt -lssl -ldl -lz'))
#Executables
Application('pdserving',
Sources(GLOB(' '.join(cpp_source_dirs)), 'src/pdserving.cpp'),
LinkFlags('-lpthread -lcrypto -lm -lrt -lssl -ldl -lz '))
Application('pdclient',
Sources(GLOB(' '.join(cpp_source_dirs)), 'src/pdclient.cpp'),
LinkFlags('-lpthread -lcrypto -lm -lrt -lssl -ldl -lz'))
#Unit tests
ut_include='./include ./unittest $OUT/include'
ut_sources=GLOB('./unittest/*.cpp')
ut_cppflag='-Dprivate=public -Dprotected=public -DUNIT_TEST -std=c++11 -usercode_in_pthread -DUSE_PTHREAD'
ut_ldflags='-lpthread -lssl -lcrypto -lrt -ldl -lz -std=c++11 -usercode_in_pthread '
ut_gdbflag='-O0 -g -fpermissive -std=c++11 -usercode_in_pthread'
UTApplication(
'test_pdserving',
Sources(
ut_sources,
IncludePaths(ut_include),
CppFlags(ut_cppflag),
CxxFlags(ut_gdbflag)
),
Libraries('$OUT/lib/libpdserving.a'),
LinkFlags(ut_ldflags),
UTOnServer(True))
OUTPUT('./conf', '$OUT/')
OUTPUT('./data', '$OUT/')
OUTPUT('./scripts/images', '$OUT/data/')
OUTPUT('./scripts/start.sh', '$OUT/bin/')
FROM registry.baidu.com/public/centos6u3-online:gcc482
MAINTAINER predictor@baidu.com
LABEL Description="paddle serving docker image"
USER root
RUN echo "Enjoy your paddle serving journey!"
ADD conf /home/work/paddle-serving/conf
ADD data /home/work/paddle-serving/data
ADD bin /home/work/paddle-serving/bin
RUN wget ftp://st01-rdqa-dev055-wanlijin01.epc.baidu.com/home/users/wanlijin01/workspace/baidu/paddle-serving/predictor/data.tar.gz -O /tmp/data.tar.gz \
&& tar -C /home/work/paddle-serving -xvzf /tmp/data.tar.gz \
&& rm /tmp/data.tar.gz \
&& cd /home/work/paddle-serving/ \
&& chmod a+x bin/pdserving \
&& chmod a+x bin/start.sh \
&& sed -i 's/\.\/conf/\/home\/work\/paddle-serving\/conf/g' conf/workflow.conf \
&& sed -i 's/\.\/conf/\/home\/work\/paddle-serving\/conf/g' conf/resource.conf \
&& sed -i 's/\.\/log/\/home\/work\/paddle-serving\/log/g' conf/log.conf \
&& sed -i 's/\.\/data/\/home\/work\/paddle-serving\/data/g' conf/model_toolkit.conf \
&& mkdir -p /home/work/paddle-serving/log
CMD sh /home/work/paddle-serving/bin/start.sh -c "trap : TERM INT; sleep infinity & wait"
FROM registry.baidu.com/paddlecloud/paddlecloud-runenv-centos6u3-bce:paddlecloud-fluid-gcc482-cuda8.0_cudnn5_bce
MAINTAINER predictor@baidu.com
LABEL Description="paddle serving docker image"
USER root
RUN echo "Enjoy your paddle serving journey!"
ADD conf /home/work/paddle-serving/conf
ADD data /home/work/paddle-serving/data
ADD bin /home/work/paddle-serving/bin
RUN wget ftp://st01-rdqa-dev055-wanlijin01.epc.baidu.com/home/users/wanlijin01/workspace/baidu/paddle-serving/predictor/data.tar.gz -O /tmp/data.tar.gz \
&& tar -C /home/work/paddle-serving -xvzf /tmp/data.tar.gz \
&& rm /tmp/data.tar.gz \
&& cd /home/work/paddle-serving/ \
&& chmod a+x bin/pdserving \
&& chmod a+x bin/start.sh \
&& sed -i 's/\.\/conf/\/home\/work\/paddle-serving\/conf/g' conf/workflow.conf \
&& sed -i 's/\.\/conf/\/home\/work\/paddle-serving\/conf/g' conf/resource.conf \
&& sed -i 's/\.\/log/\/home\/work\/paddle-serving\/log/g' conf/log.conf \
&& sed -i 's/\.\/data/\/home\/work\/paddle-serving\/data/g' conf/model_toolkit.conf \
&& mkdir -p /home/work/paddle-serving/log
CMD sh /home/work/paddle-serving/bin/start.sh -c "trap : TERM INT; sleep infinity & wait"
#
# bcloud default makefile
# more details please refer to ./.bcloud.cache/makefile.*
#
.SILENT:
#
#makefile-self location, must be always on the top!!!
#
MAKEFILE_PATH_DEFAULT := $(word $(words $(MAKEFILE_LIST)), $(MAKEFILE_LIST))
MAKEFILE_DIR_DEFAULT := $(shell cd $(dir $(MAKEFILE_PATH_DEFAULT)) && pwd)
MAKEFILE_DEAULT := 'imported'
#
#Global Configs
#
WORK_ROOT := $(shell cd $(MAKEFILE_DIR_DEFAULT)/../../.. && pwd)
GCC_PATH := /opt/compiler/gcc-4.8.2/bin
#
#import global configs
#
ifneq ($(MAKEFILE_CONFIG), 'imported')
include $(MAKEFILE_DIR_DEFAULT)/.bcloud.cache/makefile.config
endif
.PHONY: all clean distclean cleanall help
all: main
clean:
echo removing $(WORK_ROOT)/bc_out/baidu/paddle-serving/predictor/\*.o ...
find $(WORK_ROOT)/bc_out/baidu/paddle-serving/predictor -name "*.o" | xargs rm -rf
echo removing $(WORK_ROOT)/bc_out/baidu/paddle-serving/predictor/output/bin ...
rm -rf $(WORK_ROOT)/bc_out/baidu/paddle-serving/predictor/output/bin
echo removing $(WORK_ROOT)/bc_out/baidu/paddle-serving/predictor/output/lib ...
rm -rf $(WORK_ROOT)/bc_out/baidu/paddle-serving/predictor/output/lib
echo removing $(WORK_ROOT)/bc_out/baidu/paddle-serving/predictor/output/so ...
rm -rf $(WORK_ROOT)/bc_out/baidu/paddle-serving/predictor/output/so
echo removing $(WORK_ROOT)/bc_out/baidu/paddle-serving/predictor/output/test ...
rm -rf $(WORK_ROOT)/bc_out/baidu/paddle-serving/predictor/output/test
echo removing $(WORK_ROOT)/baidu/paddle-serving/predictor/output ...
rm -rf $(WORK_ROOT)/baidu/paddle-serving/predictor/output
distclean:
echo removing $(WORK_ROOT)/baidu/paddle-serving/predictor/Makefile ...
rm -rf $(WORK_ROOT)/baidu/paddle-serving/predictor/Makefile
echo removing $(WORK_ROOT)/baidu/paddle-serving/predictor/.bcloud.cache/makefile.baidu_paddle-serving_predictor ...
rm -rf $(WORK_ROOT)/baidu/paddle-serving/predictor/.bcloud.cache/makefile.baidu_paddle-serving_predictor
echo removing $(WORK_ROOT)/bc_out/baidu/paddle-serving/predictor ...
rm -rf $(WORK_ROOT)/bc_out/baidu/paddle-serving/predictor
echo removing $(WORK_ROOT)/baidu/paddle-serving/predictor/output ...
rm -rf $(WORK_ROOT)/baidu/paddle-serving/predictor/output
echo -e '\033[32m[NOTICE]\033[0m'
echo -e \\tplease run \'bcloud local -M\' before next \'make\'!!!
cleanall:
echo removing $(WORK_ROOT)/bc_out ...
rm -rf $(WORK_ROOT)/bc_out
echo removing $(WORK_ROOT)/baidu/paddle-serving/predictor/output ...
rm -rf $(WORK_ROOT)/baidu/paddle-serving/predictor/output
echo removing $(WORK_ROOT)/baidu/paddle-serving/predictor/Makefile ...
rm -rf $(WORK_ROOT)/baidu/paddle-serving/predictor/Makefile
echo removing $(WORK_ROOT)/baidu/paddle-serving/predictor/.bcloud.cache/makefile\* ...
rm -rf $(WORK_ROOT)/baidu/paddle-serving/predictor/.bcloud.cache/makefile*
echo -e '\033[32m[NOTICE]\033[0m'
echo -e \\tplease run \'bcloud local or bcloud local -M ALL\' before next \'make\'!!!
app/ecom/elib/ecommon-lib: app/ecom/elib/ecommon-lib_deps
baidu/base/protobuf-json: baidu/base/protobuf-json_deps
baidu/base/common: baidu/base/common_deps
third-64/gtest: third-64/gtest_deps
baidu/base/bvar: baidu/base/bvar_deps
public/protobuf-json: public/protobuf-json_deps
baidu/base/cache: baidu/base/cache_deps
baidu/base/spreg: baidu/base/spreg_deps
baidu/bns/naming-lib: baidu/bns/naming-lib_deps
lib2-64/bsl: lib2-64/bsl_deps
third-64/gflags: third-64/gflags_deps
third-64/pcre: third-64/pcre_deps
lib2-64/cache: lib2-64/cache_deps
baidu/base/bthread: baidu/base/bthread_deps
third-64/leveldb: third-64/leveldb_deps
lib2-64/ullib: lib2-64/ullib_deps
public/common: public/common_deps
public/bvar: public/bvar_deps
baidu/base/bsl: baidu/base/bsl_deps
baidu/base/configure: baidu/base/configure_deps
public/spreg: public/spreg_deps
public/configure: public/configure_deps
baidu/base/ullib: baidu/base/ullib_deps
baidu/base/baidu-rpc: baidu/base/baidu-rpc_deps
third-64/libevent: third-64/libevent_deps
baidu/third-party/opencv: baidu/third-party/opencv_deps
baidu/base/dict: baidu/base/dict_deps
op/oped/noah/webfoot/naming-lib: op/oped/noah/webfoot/naming-lib_deps
baidu/elib/ecommon-lib: baidu/elib/ecommon-lib_deps
public/bthread: public/bthread_deps
public/noah/giano-lib/release/baas-lib-c: public/noah/giano-lib/release/baas-lib-c_deps
app/ecom/im/mempool: app/ecom/im/mempool_deps
baidu/base/mcpack2pb: baidu/base/mcpack2pb_deps
third-64/boost: third-64/boost_deps
public/baidu-rpc: public/baidu-rpc_deps
public/mcpack2pb: public/mcpack2pb_deps
baidu/base/iobuf: baidu/base/iobuf_deps
public/iobuf: public/iobuf_deps
baidu/im-common/mempool: baidu/im-common/mempool_deps
lib2-64/dict: lib2-64/dict_deps
help:
echo -e 'all available targets of make($(MAKE_VERSION)):\n'
echo ' all : will compile all targets and run release.bcloud'
echo ' no-release.bcloud: will compile all targets and not run release.bcloud'
echo ' no-ut : will compile all targets without ut and run release.bcloud'
echo ' test : will compile all targets, run ut and run release.bcloud'
echo ' clean     : will only clean up intermediate files (such as .o, .so, .a, ut and bin) of the main module:'
echo ' $(WORK_ROOT)/bc_out/baidu/paddle-serving/predictor/output/bin'
echo ' $(WORK_ROOT)/bc_out/baidu/paddle-serving/predictor/output/lib'
echo ' $(WORK_ROOT)/bc_out/baidu/paddle-serving/predictor/output/so'
echo ' $(WORK_ROOT)/bc_out/baidu/paddle-serving/predictor/\*.o'
echo ' $(WORK_ROOT)/bc_out/baidu/paddle-serving/predictor/output/test'
echo ' $(WORK_ROOT)/baidu/paddle-serving/predictor/output'
echo ' distclean : will clean up the makefile, intermediate files (such as .o, .so, .a, ut and bin) and pb.cc/pb.h idl.h/idl.cpp of the main module:'
echo ' $(WORK_ROOT)/baidu/paddle-serving/predictor/Makefile'
echo ' $(WORK_ROOT)/baidu/paddle-serving/predictor/.bcloud.cache/makefile.baidu_paddle-serving_predictor'
echo ' $(WORK_ROOT)/bc_out/baidu/paddle-serving/predictor'
echo ' $(WORK_ROOT)/baidu/paddle-serving/predictor/output'
echo ' cleanall  : will clean up makefiles, intermediate files (such as .o, .so, .a, ut and bin) and pb.cc/pb.h idl.h/idl.cpp of all modules:'
echo ' $(WORK_ROOT)/bc_out'
echo ' $(WORK_ROOT)/baidu/paddle-serving/predictor/output'
echo ' $(WORK_ROOT)/baidu/paddle-serving/predictor/Makefile'
echo ' $(WORK_ROOT)/baidu/paddle-serving/predictor/.bcloud.cache/makefile*'
echo ' help : list all available make targets'
echo -e '\ntargets for the compile of dependency module:'
echo ' app/ecom/elib/ecommon-lib'
echo ' app/ecom/im/mempool'
echo ' baidu/base/baidu-rpc'
echo ' baidu/base/bsl'
echo ' baidu/base/bthread'
echo ' baidu/base/bvar'
echo ' baidu/base/cache'
echo ' baidu/base/common'
echo ' baidu/base/configure'
echo ' baidu/base/dict'
echo ' baidu/base/iobuf'
echo ' baidu/base/mcpack2pb'
echo ' baidu/base/protobuf-json'
echo ' baidu/base/spreg'
echo ' baidu/base/ullib'
echo ' baidu/bns/naming-lib'
echo ' baidu/elib/ecommon-lib'
echo ' baidu/im-common/mempool'
echo ' baidu/third-party/opencv'
echo ' lib2-64/bsl'
echo ' lib2-64/cache'
echo ' lib2-64/dict'
echo ' lib2-64/ullib'
echo ' op/oped/noah/webfoot/naming-lib'
echo ' public/baidu-rpc'
echo ' public/bthread'
echo ' public/bvar'
echo ' public/common'
echo ' public/configure'
echo ' public/iobuf'
echo ' public/mcpack2pb'
echo ' public/noah/giano-lib/release/baas-lib-c'
echo ' public/protobuf-json'
echo ' public/spreg'
echo ' third-64/boost'
echo ' third-64/gflags'
echo ' third-64/gtest'
echo ' third-64/leveldb'
echo ' third-64/libevent'
echo ' third-64/pcre'
#
#import dependency modules
#
ifneq ($(MAKEFILE_BAIDU_PADDLE-SERVING_PREDICTOR), 'imported')
include $(MAKEFILE_DIR_DEFAULT)/.bcloud.cache/makefile.baidu_paddle-serving_predictor
endif
[TOC]
# Overview
PaddlePaddle is Baidu's open-source machine learning framework, with broad support for customized development of deep learning models.
Paddle Cloud is a complete cloud platform built on the PaddlePaddle framework; externally it offers an end-to-end AI development platform, and internally it hosts the machine learning cloud services of the company's product lines.
Paddle Serving is the online prediction part of Paddle Cloud. It integrates seamlessly with the Paddle Cloud model training pipeline, providing a public machine learning prediction cloud service externally, and a unified model prediction development framework and cloud service for business lines inside the company.
# Getting Started
## Running the demo
Note: the demo serves an ImageNet image classification model and defaults to CPU mode (for GPU mode, modify the BCLOUD config items and build the runtime environment with the Dockerfile; [see the Wiki for Docker deployment](http://agroup.baidu.com/share/md/044f552e866f4078900be503784e2468)).
Step 1: start the server:
```shell
git clone ssh://icode.baidu.com:8235/baidu/paddle-serving/serving ~/my_paddle_serving/baidu/paddle-serving/serving && cd ~/my_paddle_serving/baidu/paddle-serving/serving && bcloud build && ./output/bin/image_class &
```
Step 2: start the client:
```shell
git clone ssh://icode.baidu.com:8235/baidu/paddle-serving/sdk-cpp ~/my_paddle_serving/baidu/paddle-serving/sdk-cpp && cd ~/my_paddle_serving/baidu/paddle-serving/sdk-cpp && bcloud build && ./output/bin/ximage && pkill image_class
```
## Demo walkthrough
### Prediction interface definition
```c++
syntax="proto2";
package baidu.paddle_serving.predictor.image_class;
option cc_generic_services = true;
// x-image request messages (batch interface)
message XImageReqInstance {
required bytes image_binary = 1;
required uint32 image_length = 2;
};
message Request {
repeated XImageReqInstance instances = 1;
};
// x-image response messages (batch interface)
message DensePrediction {
repeated float categories = 1;
};
message ClassResponse {
repeated DensePrediction predictions = 1;
};
message XImageResInstance {
required string response_json = 1;
};
message Response {
// Each json string is serialized from ClassResponse
repeated XImageResInstance predictions = 1;
};
// Service/method definitions
service ImageClassifyService {
rpc inference(Request) returns (Response);
rpc debug(Request) returns (Response);
};
```
### Server-side implementation
Users only need to customize or configure the following three kinds of information to quickly build a complete Paddle Serving prediction module.
#### Interface changes ([proto directory](http://icode.baidu.com/repos/baidu/paddle-serving/serving/tree/master:proto/))
On the server side, only the following changes to the prediction interface are required:
```c++
// Change 1: import the paddle-serving option proto file
import "pds_option.proto";
...
service ClassService {
rpc inference(Request) returns (Response);
rpc debug(Request) returns (Response);
// Change 2: turn on the generate_impl switch (to enable config-driven implementation)
option (pds.options).generate_impl = true;
};
```
#### Example configuration ([conf directory](http://icode.baidu.com/repos/baidu/paddle-serving/serving/tree/master:conf/))
- gflags options
| name | default | meaning |
|------|--------|------|
| workflow_path | ./conf | workflow config directory |
| workflow_file | workflow.conf | workflow config file |
| inferservice_path | ./conf | service config directory |
| inferservice_file | service.conf | service config file |
| logger_path | ./conf | logging config directory |
| logger_file | log.conf | logging config file |
| resource_path | ./conf | resource manager config directory |
| resource_file | resource.conf | resource manager config file |
| reload_interval_s | 10 | reload thread interval (s) |
- Example config files (image classification demo)
```shell
# >>> service.conf
[@Service]
name: ImageClassifyService
@workflow: workflow_image_classification
# >>> workflow.conf
[@Workflow]
name: workflow_image_classification
path: ./conf
file: imagec_dag.conf
# >>> imagec_dag.conf
workflow_type: Sequence
[@Node]
name: image_reader_op
type: ImageReaderOp
[@Node]
name: image_classify_op
type: ImageClassifyOp
[.@Depend]
name: image_reader_op
mode: RO
[@Node]
name: write_json_op
type: WriteJsonOp
[.@Depend]
name: image_classify_op
mode: RO
# >>> resource.conf
model_manager_path: ./conf
model_manager_file: model_toolkit.conf
```
#### Custom Op operators ([op directory](http://icode.baidu.com/repos/baidu/paddle-serving/serving/tree/master:op/))
- Preprocessing operator (ImageReaderOp): reads the image byte stream from the Request, decodes it with OpenCV, fills a tensor object, and writes it to its channel;
- Prediction operator (ImageClassifyOp): takes the input tensor from ImageReaderOp's channel, allocates a temporary output tensor, calls ModelToolkit to run inference, and writes the output tensor to its channel;
- Postprocessing operator (WriteJsonOp): takes the output tensor from ImageClassifyOp's channel, serializes it into a JSON string, and writes it out as the RPC output; a hedged sketch of a custom Op follows below.
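A minimal sketch of what such a custom operator can look like, assuming an `OpWithChannel`-style base class (see the glossary below); the template parameter, the `JsonChannel` type, and the `inference()` hook name are illustrative assumptions here, not the confirmed framework API:
```c++
// Hypothetical sketch -- base-class shape, channel type and hook name are assumptions.
class MyWriteJsonOp : public OpWithChannel<JsonChannel /* assumed channel type */> {
public:
    int inference() {
        // 1) read the upstream op's output through the dependency declared in the DAG conf
        // 2) transform it, e.g. serialize the output tensor into a JSON string
        // 3) write the result into this op's own channel, feeding the next Node / the Response
        return 0;  // 0 on success; a negative framework error code on failure
    }
};
```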
### Client-side implementation
Users only need to customize or configure the following three kinds of information to easily issue prediction requests and configure multiple service connections locally:
#### Interface changes ([proto directory](http://icode.baidu.com/repos/baidu/paddle-serving/sdk-cpp/tree/master:proto))
On the client side, only the following changes to the prediction interface are required:
```c++
// Change 1: import the paddle-serving option proto file
import "pds_option.proto";
...
service ImageClassifyService {
rpc inference(Request) returns (Response);
rpc debug(Request) returns (Response);
// Change 2: turn on the generate_stub switch (to enable config-driven stubs)
option (pds.options).generate_stub = true;
};
```
#### Connection configuration ([conf directory](http://icode.baidu.com/repos/baidu/paddle-serving/sdk-cpp/tree/master:conf))
```shell
# predictions.conf
## defaults shared across all variants
[DefaultVariantInfo]
Tag : default
[.Connection]
ConnectTimeoutMicroSec : 200
ReadTimeoutMicroSec : 2000
WriteTimeoutMicroSec : 500
ConnectRetryCount : 2
MaxConnectionPerHost : 100
HedgeRequestTimeoutMicroSec : -1
HedgeFetchRetryCount : 2
BnsReloadIntervalSeconds : 10
ConnectionType : pooled
[.NamingInfo]
ClusterFilterStrategy : Default
LoadBalanceStrategy : la
[.RpcParameter]
# 0-NONE, 1-SNAPPY, 2-GZIP, 3-ZLIB, 4-LZ4
CompressType : 0
Protocol : baidu_std
MaxChannelPerRequest : 3
[@Predictor]
name : ximage
service_name : baidu.paddle_serving.predictor.image_class.ImageClassifyService
endpoint_router : WeightedRandomRender
[.WeightedRandomRender]
VariantWeightList : 30|70 # 30% vs 70% pvs
[.@VariantInfo]
Tag : var1 # variant tag, so upstream can tell versions apart
[..NamingInfo]
Cluster : list://127.0.0.1:8010
[.@VariantInfo]
Tag : var2
[..NamingInfo]
Cluster : list://127.0.0.1:8011
```
#### Request logic ([demo/ximage.cpp](http://icode.baidu.com/repos/baidu/paddle-serving/sdk-cpp/blob/master:demo/ximage.cpp))
```c++
// process-level initialization
assert(PredictorAPI::instance().create("./conf/predictions.conf") == 0);
// per-thread prediction call:
Request req;
// fill request
// ...
Response res;
Predictor* ximage = PredictorAPI::instance().fetch_predictor("ximage");
assert(ximage != NULL);
ximage->inference(req, res);
// parse response
// ...
assert(PredictorAPI::instance().free_predictor(ximage) == 0);
// process-level teardown
assert(PredictorAPI::instance().destroy() == 0);
```
## Fengchao protocol compatibility
Paddle Serving evolved from the Fengchao Guanxing framework, whose wire protocol was nshead+compack+idl. To keep old and new interfaces compatible, both the Paddle Serving server and client support backward compatibility:
- Old API calling a new server: to accept the old Guanxing client packet format, the new server uses mcpack2pb to generate pb objects that can parse the idl format; see the [wtitleq server implementation](http://icode.baidu.com/repos/baidu/paddle-serving/lr-model/tree/master).
- New SDK calling an old server: to reach old Guanxing servers, the SDK uses the mcpack2pb plugin to generate idl-format serialization logic; see the [wtitleq api implementation](http://icode.baidu.com/repos/baidu/infinite-inference/as-wtitleq-demo/tree/master).
Fengchao ad request splitting: the Paddle Serving C++ SDK provides an easy-to-use request-splitting feature, enabled by editing the proto/conf files:
```c++
// interface.proto
message PredictorRequest {
message AdvRequest {
// ad-level fields
repeated uint32 ideaid = 1;
repeated string title = 2;
}
// query-level fields
required uint64 sid = 1;
required string query = 2;
// ...
// ad-level fields
repeated AdvRequest advs = 3 [(pds.pack_on)=true]; // Change 1: split the request on the advs field
}
// ...
service WtitleqService {
rpc ...
rpc ...
option (pds.options).package_size = 10; // Change 2: cap the size of a single package
}
```
[Example proto from the wtitleq sdk](http://icode.baidu.com/repos/baidu/infinite-inference/as-wtitleq-demo/blob/master:proto/predictor_api.proto)
```bash
# predictions.conf
[@Predictor]
# ...
[.@VariantInfo]
#...
[..RpcParameter]
Protocol : itp # Change 3: switch the rpc protocol to itp
```
[Example conf from the wtitleq sdk](http://icode.baidu.com/repos/baidu/infinite-inference/as-wtitleq-demo/blob/master:conf/predictors.conf)
# Framework overview
![image](http://agroup-bos.cdn.bcebos.com/63a5076471e96a08124b89101e12c1a0ec7b642a)
- Base framework: hides all the plumbing an RPC service needs, so users focus only on developing their business operators;
- Business framework: request interfaces customized via Protobuf, business logic customized as a finite DAG with parallelized scheduling;
- Model framework: heterogeneous hardware (CPU/FPGA/GPU), asynchronous priority scheduling across multiple models, flexible extension with new engines, config-driven;
- User interface: building a service = define the proto file + implement/reuse Ops + write configs; supports sdk/http requests.
## Glossary
- Prediction engine: a wrapper around inference libs such as PaddlePaddle/Abacus/Tensorflow that hides the details of dynamic model reloading and exposes a uniform prediction interface upward;
- Prediction model: a data file or directory produced by the offline training framework and loaded by the online prediction engine; for a PaddleFluid model this usually includes a topology file and a parameter file;
- Op (operator): Paddle Serving's smallest-granularity encapsulation of online business logic (preprocessing/postprocessing etc.); the framework provides the two common Op base classes OpWithChannel and OpWithChannelAndConf, plus default general-purpose Ops;
- Node: an Op instance created from an Op class plus its parameter configuration, and the unit of execution inside a Workflow;
- DAG/Workflow: composed of several interdependent Nodes; every Node can obtain the Request object through a specific interface, each Node's Op reads its predecessors' outputs through the dependency relations, and the output of the last Node is by default the Response object;
- Service: the encapsulation of one request (pv); it can be configured with several Workflows that share the current request's Request object, run in parallel/series, and finally write the Response into the designated output slot; one Paddle Serving process can expose multiple Service interfaces, and upstream selects a Service by its ServiceName.
![image](http://agroup-bos.cdn.bcebos.com/2e5e3cdcc9426d16e2090e64e7d33098ae5ad826)
## Key features
The Paddle Serving framework offers strategy engineers functional extensions on the following three levels:
### Model
- Prediction engines: integrates the inference libs of common machine learning frameworks such as PaddlePaddle, Abacus, TensorRT, Anakin, and Tensorflow;
- Model formats: supports common model formats such as PaddlePaddle (V1, V2, Fluid), TensorRT UFF, Anakin, Tensorflow, and Caffe;
- User interface: config-driven model loading and reloading; the prediction interface is identical across model types;
- Model scheduling: multi-model prediction scheduling on an asynchronous thread model, with priority scheduling of heterogeneous resources;
### Business
- Prediction flow: a finite DAG describes the business flow of one prediction from Request to Response; each Node is the smallest logical unit, an OP;
- Prediction logic: the framework ships common preprocessing, inference, and postprocessing OPs; users implement specialized logic as custom OP operators;
### Service
- RPC: network interaction is wrapped with baidu-rpc underneath; the server can launch multiple independent Services by configuration, and the framework collects detailed per-Service business metrics and feeds them through the BVar interface to monitoring platforms such as Noah;
- SDK: wraps the baidu-rpc client, providing multi-downstream connection management, extensible routing strategies, customizable parameter experiments, automatic request splitting, and more; supports synchronous, semi-synchronous, and fully asynchronous interaction modes as well as multiple compatibility protocols, with every connection strategy driven by configuration.
# Platform overview
![image](http://agroup-bos.cdn.bcebos.com/42a0e34a7c6b36976e3932639209fd823d8f25e0)
- [Operations API](http://agroup.baidu.com/share/md/e582f543fb574e9b92445286955a976d)
- [Prediction API](http://agroup.baidu.com/share/md/eb91a51739514319844ceccdb331564c)
## Glossary
- User: a registered user of the cloud platform, who can add, delete, query, and modify the endpoint information under the account via the platform Dashboard;
- Endpoint: the logical abstraction of one prediction need; it usually contains one or more service variants, to ease multi-version model management;
- Variant: a homogeneous Paddle Serving cluster service, each instance running one Paddle Serving process;
- A/B Test: two modes are supported, variant experiments and parameterized experiments; variant experiments randomly sample traffic according to the percentage assigned to each variant of the Endpoint, while parameterized experiments bind experiment parameters to each request, which the Paddle Serving process parses in order to take different code branches.
## Key features
The public cloud deployment is the Infinite (Tianyan) cloud platform, which mainly provides strategy engineers with end-to-end hosting in three areas:
- Unified access proxy: a proxy service that syncs metadata with the cloud platform in real time via zk, supports multi-version model management and A/B-test routing strategies, and offers a single entry point with a standard prediction API;
- Automated deployment: integrates with common PaaS platforms such as K8S/Opera; supports one-click deployment, rollback, and decommissioning, and manages resources at the endpoint/variant/model levels;
- Operations visualization: integrates with front-end tools and pages such as console, notebook, and dashboard to meet visualized operations needs.
# Design documents
- [Overall design](http://agroup.baidu.com/paddleserving/view/office/895070)
- [Framework detailed design](http://agroup.baidu.com:8964/static/a3/e40876e464ba08ae5de14aa7710cf326456751.pdf?filename=PaddleServing%E6%9C%8D%E5%8A%A1%E6%A1%86%E6%9E%B6%E8%AF%A6%E7%BB%86%E8%AE%BE%E8%AE%A1%E6%96%87%E6%A1%A3v0_1.pdf)
- [Platform detailed design](http://agroup.baidu.com/share/office/042a0941579e49adb8c255c8b5e92d51)
# FAQ
1. How do I change the port?
- A service built with this framework needs to claim a port, and the port number is resolved as follows:
- if port: xxx is specified in the inferservice_file, that port is claimed;
- otherwise, if --port=xxx is specified in gflags.conf, that port is claimed;
- otherwise, the program's built-in default port, 8010, is used (a hedged sketch of this lookup order follows below).
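A hedged sketch of that lookup order (`resolve_listen_port` and `InferServiceConf` are hypothetical names, not framework code; `FLAGS_port` is the real gflag defined in common/constant.cpp):
```c++
// Hypothetical helper illustrating the precedence above -- not actual framework code.
int resolve_listen_port(const InferServiceConf& conf /* parsed inferservice_file */) {
    if (conf.has_port()) {   // 1) port: xxx present in the inferservice_file
        return conf.port();
    }
    return FLAGS_port;       // 2) --port=xxx from gflags.conf, if given, else
                             // 3) the DEFINE_int32(port, 8010, "") default
}
```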
2. How do I configure a dynamic port at deployment time?
- With the FCCI deployment protocol (the deployment protocol used inside Fengchao retrieval), you need to (1) specify the port via inferservice_file, and (2) edit the dynamic_port_config in [Rakefile.opera](http://wiki.baidu.com/pages/viewpage.action?pageId=399979183#id-%E4%BB%8E%E9%9B%B6%E5%BC%80%E5%A7%8B%E5%86%99production-%E7%BC%96%E5%86%99Rakefile)
- `@dynamic_port_config is the dynamic port configuration: it requests a dynamic port named :name from Opera, and the assigned port number is written to the :target key of the :conf file.` Example:
```
@dynamic_port_config = [
{:name => 'main', :conf => 'framework/service.conf', :target => 'port'}, // Opera assigns a port automatically at deploy time; the service will listen on it
{:name => 'main', :conf => 'predictor_valid.conf', :target => 'port'}, // the valid tool sends test requests to this port to confirm the service started correctly
]
```
This diff is collapsed.
#!/bin/bash
function install_pdserving_lib(){
ret=1
local pdserving_lib_mode=$1
case $pdserving_lib_mode in
local)
local pdserving_local_path=$2
if [ ! -d $pdserving_local_path ]; then
echo "[WARN failed to find local path]"
return $ret
fi
lib_name=`basename $pdserving_local_path`
if [ -d ${CITOOLS}/$lib_name ]; then
rm -rf ${CITOOLS}/$lib_name
fi
cp -rf $pdserving_local_path ${CITOOLS}/
source ${CITOOLS}/$lib_name/predictor_build_lib.sh
;;
ftp)
local wgetOptions="--tries=3 --retry-connrefused -r -l0 -nv --limit-rate=50m -nH --cut-dirs=5"
pdserving_lib_ftp_path="ftp://tc-orp-app2.tc.baidu.com:/home/heqing/scmbak/common_lib/pdserving_cts/pdserving_lib"
lib_name=`basename $pdserving_lib_ftp_path`
if [ -d ${CITOOLS}/$lib_name ]; then
rm -rf ${CITOOLS}/$lib_name
fi
echo "wget cmd is :$wgetOptions $pdserving_lib_ftp_path"
echo "lib_name is :${lib_name}"
wget $wgetOptions $pdserving_lib_ftp_path
mv ${lib_name} ${CITOOLS}/
source ${CITOOLS}/${lib_name}/predictor_build_lib.sh
;;
*)
ret=0
echo "todo"
;;
esac
return $ret
}
CUR_PATH=$(pwd)
WORK_PATH=$(pwd)
WORK_ROOT=${WORK_PATH%%/baidu/*}
#co citools
CITOOLS="${WORK_ROOT}/baidu/fengchao-qa/citools"
if [ -d ${CITOOLS} ];then
rm -rf ${CITOOLS}
fi
git clone --depth 1 ssh://git@icode.baidu.com:8235/baidu/fengchao-qa/citools $CITOOLS >/dev/null
[[ $? != 0 ]] && exit 1
source $CITOOLS/lib/localbuild_lib.sh
#the path may change after source, so reassign it
CITOOLS="${WORK_ROOT}/baidu/fengchao-qa/citools"
#install_pdserving_lib
pdserving_lib_mode="ftp"
install_pdserving_lib ${pdserving_lib_mode} #two modes: for "local", the local path of pdserving_lib must also be given
#source ${CITOOLS}/pdserving_lib/predictor_build_lib.sh
COVMODULEID=8652
TYPE=framework
#initialize the build of this module
predictor_build_init
WORKROOT=$WORK_ROOT
#run the build command
predictor_build_do $@
exit 0
Global:
tool: bcloud
Default:
profile: [change]
Profiles:
- profile:
name: change
command: bcloud ut
release: true
#include "common/constant.h"
namespace baidu {
namespace paddle_serving {
namespace predictor {
DEFINE_bool(use_parallel_infer_service, false, "");
DEFINE_int32(el_log_level, 16, "");
DEFINE_int32(idle_timeout_s, 16, "");
DEFINE_int32(port, 8010, "");
DEFINE_string(workflow_path, "./conf", "");
DEFINE_string(workflow_file, "workflow.conf", "");
DEFINE_string(inferservice_path, "./conf", "");
DEFINE_string(inferservice_file, "service.conf", "");
DEFINE_string(logger_path, "./conf", "");
DEFINE_string(logger_file, "log.conf", "");
DEFINE_string(resource_path, "./conf", "");
DEFINE_string(resource_file, "resource.conf", "");
DEFINE_bool(enable_yacl, false, "enable yacl");
DEFINE_string(yacl_module_name, "predictor", "yacl module name");
DEFINE_string(yacl_param_dump_file, "./data/yacl_param_list.txt", "yacl param dump file path");
DEFINE_bool(enable_mc_cache, false, "enable mc cache");
DEFINE_bool(enable_nshead_protocol, false, "enable nshead protocol in server side");
DEFINE_string(nshead_protocol, "itp", "type of nshead protocol, support itp, nova_pbrpc, public_pbrpc, nshead_mcpack");
DEFINE_int32(max_concurrency, 0, "Limit of request processing in parallel, 0: unlimited");
DEFINE_int32(num_threads, 0, "Number of pthreads that server runs on, not change if this value <= 0");
DEFINE_int32(reload_interval_s, 10, "");
DEFINE_bool(enable_model_toolkit, false, "enable model toolkit");
DEFINE_string(enable_protocol_list, "baidu_std nshead", "set protocol list");
} // predictor
} // paddle_serving
} // baidu
// Baidurpc
BAIDU_REGISTER_ERRNO(baidu::paddle_serving::predictor::ERR_INTERNAL_FAILURE,
"Paddle Serving Framework Internal Error.");
BAIDU_REGISTER_ERRNO(baidu::paddle_serving::predictor::ERR_MEM_ALLOC_FAILURE,
"Paddle Serving Memory Alloc Error.");
BAIDU_REGISTER_ERRNO(baidu::paddle_serving::predictor::ERR_OVERFLOW_FAILURE,
"Paddle Serving Array Overflow Error.");
BAIDU_REGISTER_ERRNO(baidu::paddle_serving::predictor::ERR_OP_INFER_FAILURE,
"Paddle Serving Op Inference Error.");
#ifndef BAIDU_PADDLE_SERVING_PREDICTOR_CONSTANT_H
#define BAIDU_PADDLE_SERVING_PREDICTOR_CONSTANT_H
#include "common/inner_common.h"
namespace baidu {
namespace paddle_serving {
namespace predictor {
// GFLAGS Variables
DECLARE_bool(use_parallel_infer_service);
DECLARE_int32(el_log_level);
DECLARE_int32(idle_timeout_s);
DECLARE_int32(port);
DECLARE_string(workflow_path);
DECLARE_string(workflow_file);
DECLARE_string(inferservice_path);
DECLARE_string(inferservice_file);
DECLARE_string(logger_path);
DECLARE_string(logger_file);
DECLARE_string(resource_path);
DECLARE_string(resource_file);
DECLARE_bool(enable_mc_cache);
DECLARE_bool(enable_nshead_protocol);
DECLARE_string(nshead_protocol);
DECLARE_int32(max_concurrency);
DECLARE_int32(num_threads);
DECLARE_int32(reload_interval_s);
DECLARE_bool(enable_model_toolkit);
DECLARE_string(enable_protocol_list);
// STATIC Variables
static const char* START_OP_NAME = "startup_op";
// ERRORCODE
enum {
// internal error
ERR_INTERNAL_FAILURE = -5000,
ERR_MEM_ALLOC_FAILURE = -5001,
ERR_OVERFLOW_FAILURE = -5002,
// op error
ERR_OP_INFER_FAILURE = -5100,
// no error
ERR_OK = 0,
// internal ignore
ERR_IGNORE_FAILURE = 5000,
// op ignore
ERR_OP_IGNORE_FAILURE = 5100,
};
static const size_t MAX_WORKFLOW_NUM_IN_ONE_SERVICE = 20;
static const uint32_t DEFAULT_CACHE_CAPACITY = 10000;
static const uint32_t DEFAULT_CACHE_UNITSIZE = 8192;
} // predictor
} // paddle_serving
} // baidu
#endif
#ifndef BAIDU_PADDLE_SERVING_PREDICTOR_INNER_COMMON_H
#define BAIDU_PADDLE_SERVING_PREDICTOR_INNER_COMMON_H
#include <sys/types.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdint.h>
#include <pthread.h>
#include <strings.h>
#include <getopt.h>
#include <google/protobuf/text_format.h>
#include <boost/unordered_map.hpp>
#include <boost/function.hpp>
#include <boost/algorithm/string.hpp> // for boost::split&trim
#include <baas-lib-c/baas.h>
#include <baas-lib-c/giano_mock_helper.h>
#include <gflags/gflags.h>
#include <base/logging.h>
#include <base/time.h>
#include <base/object_pool.h>
#include <baidu/rpc/channel.h>
#include <baidu/rpc/server.h>
#include <baidu/rpc/policy/giano_authenticator.h>
#include <bthread.h>
#include <error.h>
#include "Configure.h"
#include <comlog/comlog.h>
#include "common/utils.h"
#include "common/types.h"
#include "common/constant.h"
#endif
#ifndef BAIDU_PADDLE_SERVING_PREDICTOR_MACROS_H
#define BAIDU_PADDLE_SERVING_PREDICTOR_MACROS_H
#include "common/inner_common.h"
namespace baidu {
namespace paddle_serving {
namespace predictor {
#ifndef CATCH_ANY_AND_RET
#define CATCH_ANY_AND_RET(errno) \
catch (...) { \
LOG(FATAL) << "exception catched"; \
return errno; \
}
#endif
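// The THREAD_* aliases below select a threading backend at compile time: with
// -DUSE_PTHREAD they map to POSIX pthread primitives, otherwise to the
// bthread (user-level thread) equivalents.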
#ifdef USE_PTHREAD
#define THREAD_T pthread_t
#define THREAD_KEY_T pthread_key_t
#define THREAD_MUTEX_T pthread_mutex_t
#define THREAD_KEY_CREATE pthread_key_create
#define THREAD_SETSPECIFIC pthread_setspecific
#define THREAD_GETSPECIFIC pthread_getspecific
#define THREAD_CREATE pthread_create
#define THREAD_CANCEL pthread_cancel
#define THREAD_JOIN pthread_join
#define THREAD_KEY_DELETE pthread_key_delete
#define THREAD_MUTEX_INIT pthread_mutex_init
#define THREAD_MUTEX_LOCK pthread_mutex_lock
#define THREAD_MUTEX_UNLOCK pthread_mutex_unlock
#define THREAD_MUTEX_DESTROY pthread_mutex_destroy
#define THREAD_COND_T pthread_cond_t
#define THREAD_COND_INIT pthread_cond_init
#define THREAD_COND_SIGNAL pthread_cond_signal
#define THREAD_COND_WAIT pthread_cond_wait
#define THREAD_COND_DESTROY pthread_cond_destroy
#else
#define THREAD_T bthread_t
#define THREAD_KEY_T bthread_key_t
#define THREAD_MUTEX_T bthread_mutex_t
#define THREAD_KEY_CREATE bthread_key_create
#define THREAD_SETSPECIFIC bthread_setspecific
#define THREAD_GETSPECIFIC bthread_getspecific
#define THREAD_CREATE bthread_start_background
#define THREAD_CANCEL bthread_stop
#define THREAD_JOIN bthread_join
#define THREAD_KEY_DELETE bthread_key_delete
#define THREAD_MUTEX_INIT bthread_mutex_init
#define THREAD_MUTEX_LOCK bthread_mutex_lock
#define THREAD_MUTEX_UNLOCK bthread_mutex_unlock
#define THREAD_MUTEX_DESTROY bthread_mutex_destroy
#define THREAD_COND_T bthread_cond_t
#define THREAD_COND_INIT bthread_cond_init
#define THREAD_COND_SIGNAL bthread_cond_signal
#define THREAD_COND_WAIT bthread_cond_wait
#define THREAD_COND_DESTROY bthread_cond_destroy
#endif
} // predictor
} // paddle_serving
} // baidu
#endif
#ifndef BAIDU_PADDLE_SERVING_PREDICTOR_TYPES_H
#define BAIDU_PADDLE_SERVING_PREDICTOR_TYPES_H
namespace baidu {
namespace paddle_serving {
namespace predictor {
typedef char* Byte;
typedef size_t Size;
typedef const char* ConstByte;
struct Sequence {
Byte data;
Size size;
};
} // predictor
} // paddle_serving
} // baidu
#endif // BAIDU_PADDLE_SERVING_PREDICTOR_TYPES_H
#ifndef BAIDU_PADDLE_SERVING_PREDICTOR_UTILS_H
#define BAIDU_PADDLE_SERVING_PREDICTOR_UTILS_H
#include "common/macros.h"
#include "common/inner_common.h"
namespace baidu {
namespace paddle_serving {
namespace predictor {
class TimerFlow {
public:
static const int MAX_SIZE = 1024;
TimerFlow() {
init();
}
void init() {
_csize = 0;
_name = NULL;
_started = false;
_auto = false;
}
TimerFlow(const char* name) : _csize(0), _name(name) {
_last = _start = base::cpuwide_time_us();
_auto = true;
_started = true;
}
void set_name(const char* name) {
_name = name;
}
void start() {
_last = _start = base::cpuwide_time_us();
_started = true;
}
bool check(const char* tag) {
if (!_started) {
LOG(WARNING) << "Timer not started yet!";
return false;
}
uint64_t now = base::cpuwide_time_us();
if (!appendf("%s:%lu|", tag, now - _last)) {
LOG(WARNING)
<< "Failed check timer: " << _name
<< ", value = [" << tag << ":"
<< (now - _last) << "]!" << noflush;
return false;
}
_last = now;
return true;
}
std::string info() {
return std::string(_buf);
}
void end() {
uint64_t now = base::cpuwide_time_us();
if (!appendf("total:%lu", now - _start)) {
LOG(WARNING) << "Failed dump time_info[" << _name << "]";
}
_started = false;
}
~TimerFlow() {
if (!_auto) {
return;
}
uint64_t now = base::cpuwide_time_us();
if (appendf("total:%lu,%s", now - _start, _name)) {
LOG(INFO)
<< " " << _name << "_tc=[" << _buf << "]";
} else {
LOG(WARNING) << "Failed dump time_info[" << _name << "]";
}
}
private:
bool appendf(const char* fmt, ...) {
va_list ap;
va_start(ap, fmt);
try {
int bytes = vsnprintf(_buf + _csize, MAX_SIZE - _csize, fmt, ap);
if (bytes >= MAX_SIZE - _csize || bytes < 0) {
LOG(WARNING) << "Overflow when appendf!" << noflush;
return false;
}
_csize += bytes;
} CATCH_ANY_AND_RET(false);
va_end(ap);
return true;
}
private:
char _buf[1024];
int _csize;
uint64_t _start;
uint64_t _last;
const char* _name;
bool _started;
bool _auto;
};
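// Usage sketch (illustrative comment, not part of the original header):
//   TimerFlow tf("my_op");  // named ctor starts timing; dtor appends the total and logs
//   ... stage one ...
//   tf.check("stage1");     // appends "stage1:<elapsed_us>|"
//   ... stage two ...
//   tf.check("stage2");
// With the default ctor, call set_name()/start()/check().../end() and read info().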
template<bool flag>
struct derived_from_message {};
template<typename T, typename TBase>
class TIsDerivedFromB {
private:
static uint8_t check(TBase*) {
return 1;
}
static uint32_t check(void*) {
return 0;
}
public:
enum {
// function call cannot appear in a constant-expression
RESULT = (sizeof(uint8_t) == sizeof(check((T*)(NULL)))),
};
};
template<typename TBase>
class IsDerivedFrom {
private:
static bool check(TBase*) {
return true;
}
static bool check(void*) {
return false;
}
public:
template<typename T>
static bool yes(T* x) {
return check(x);
}
};
} // predictor
} // paddle_serving
} // baidu
#endif
workflow_type: Sequence
[@Node]
name: dense_echo_op
type: DenseEchoOp
workflow_type: Sequence
[@Node]
name: echo_op
type: CommonEchoOp
workflow_type: Sequence
[@Node]
name: fluid_dense_op
type: FluidDenseOp
--port=8010
--noenable_rd_dict
workflow_type: Sequence
[@Node]
name: image_reader_op
type: ImageReaderOp
[@Node]
name: image_classify_op
type: ImageClassifyOp
[.@Depend]
name: image_reader_op
mode: RO
[@Node]
name: write_json_op
type: WriteJsonOp
[.@Depend]
name: image_classify_op
mode: RO
COMLOG_LEVEL : 16
COMLOG_DEVICE_NUM : 2
COMLOG_DEVICE0 : TRACE
COMLOG_DEVICE1 : WARNING
TRACE_OPEN : 1
TRACE_TYPE : FILE
TRACE_PATH : ./log
TRACE_NAME : pdserving.log
TRACE_SYSLEVEL : 0
TRACE_SELFLEVEL : NOTICE,TRACE,DEBUG
TRACE_SIZE : 4000
TRACE_SPLITE_TYPE : DATECUT
TRACE_DATA_CUTTIME : 60
TRACE_DATA_CRONOCUT : 1
TRACE_RESERVED1 : %Y%m%d%H
TRACE_LAYOUT : %L: %A %T %R
TRACE_QUOTA_DAY : 2
WARNING_OPEN : 1
WARNING_TYPE : FILE
WARNING_PATH : ./log
WARNING_NAME : pdserving.log.wf
WARNING_SYSLEVEL : 0
WARNING_SELFLEVEL : WARNING,FATAL
WARNING_SIZE : 4000
WARNING_SPLITE_TYPE : DATECUT
WARNING_DATA_CUTTIME : 60
WARNING_DATA_CRONOCUT : 1
WARNING_RESERVED1 : %Y%m%d%H
WARNING_LAYOUT : %L: %A %T %R
WARNING_QUOTA_DAY : 2
#[@MODEL]
#MODEL_TYPE : PADDLE_FLUID
#MODEL_NAME : fluid_model_test
#TRAINER_CONFIG: ./data/model/paddle/fluid/word2vec.config
#TRAINER_PARAMETER: ./data/model/paddle/fluid/word2vec.dict
#RELOAD_CHECK_FILE : ./data/model/paddle/fluid_reload_flag
#MODEL_TIME_FILE : ./data/model/paddle/fluid_time_file
[@MODEL]
MODEL_TYPE : PADDLE_FLUID
MODEL_NAME : image_classification_resnet
#TRAINER_PARAMETER: ./data/model/paddle/fluid/image_classification_resnet
#TRAINER_PARAMETER: ./data/model/paddle/fluid/se_resnext50
#TRAINER_PARAMETER: ./data/model/paddle/fluid/resnet_50
TRAINER_PARAMETER: ./data/model/paddle/fluid/SE_ResNeXt50_32x4d
RELOAD_CHECK_FILE : ./data/model/paddle/fluid_reload_flag
MODEL_TIME_FILE : ./data/model/paddle/fluid_time_file
[@rd_dict]
NODE_TYPE:SINGLE
DICT_NAME: demo
DICT_FILE: ./rd_dict.dict
[@rd_dict]
NODE_TYPE:SINGLE
DICT_NAME: demo1
DICT_FILE: ./rd_dict.dict
# model toolkit conf
model_manager_path: ./conf
model_manager_file: model_toolkit.conf
rd_dict_conf_path: ./conf
rd_dict_conf_file: rd_dict.conf
[@Service]
name: BuiltinDenseFormatService
@workflow: workflow1
# When enabled, the framework maps each request to the matching workflow based on the value of the request's request_field_key field
# enable_map_request_to_workflow: 1
# request_field_key: cmd
# If the request's request_field_key value equals request_field_value, that workflow is executed
# request_field_value values must not repeat
# @workflow: workflow1
# @request_field_value: /titleq/wise/ctr
# @workflow: workflow2
# @request_field_value: /titleq/pc/ctr
# @workflow: workflow3
# @request_field_value: /titleq/xxx/ctr
[@Service]
name: BuiltinSparseFormatService
@workflow: workflow2
[@Service]
name: BuiltinTestEchoService
@workflow: workflow3
workflow_type: Sequence
[@Node]
name: sparse_echo_op
type: SparseEchoOp
[.@Depend]
name: startup_op
mode: RO
[@Engine]
Name : FCR_WISE_NONLINEAR_DNN_MODEL
[.@Version]
Type : ABACUS_DNN
VersionFile: ./data/abacus/version
VersionType: abacus_version
ReloadableMeta: ./data/abacus/join_model_nonlinear
ReloadableType: timestamp_ne
ModelDataPath: ./conf/cvm_model/dnn_nonlinear_model.conf
RuntimeThreadNum: 0
BatchInferSize: 0
EnableBatchAlign: 0
[.@Version]
Type : ABACUS_DNN
VersionFile: ./data/abacus/version
VersionType: abacus_version
ReloadableMeta: ./data/abacus/join_model_nonlinear
ReloadableType: timestamp_ne
ModelDataPath: ./conf/cvm_model/ubm_nonlinear_dnn_model.conf
RuntimeThreadNum: 0
BatchInferSize: 0
EnableBatchAlign: 0
[@Engine]
Name : FCR_NONLINEAR_DNN_MT_MODEL
[.@Version]
Type : ABACUS_DNN
VersionFile: ./data/abacus/version
VersionType: abacus_version
ReloadableMeta: ./data/abacus/join_model_nonlinear
ReloadableType: timestamp_ne
ModelDataPath: ./conf/cvm_model/ubm_mt_nonlinear_dnn_model.conf
RuntimeThreadNum: 0
BatchInferSize: 0
EnableBatchAlign: 0
[@Engine]
Name : FCR_MT_MODEL_NO_FPGA
Type : ABACUS_DNN
ReloadableMeta: ./data/abacus/join_model_nonlinear
ReloadableType: timestamp_ne
ModelDataPath: ./conf/cvm_model/ubm_mt_no_fpga_dnn_model.conf
RuntimeThreadNum: 0
BatchInferSize: 0
EnableBatchAlign: 0
[@Engine]
Name : FCR_NONLINEAR_DNN_AD_MODEL
Type : ABACUS_DNN
ReloadableMeta: ./data/abacus/join_model_nonlinear
ReloadableType: timestamp_ne
ModelDataPath: ./conf/cvm_model/ubm_ad_nonlinear_dnn_model.conf
RuntimeThreadNum: 0
BatchInferSize: 0
EnableBatchAlign: 0
[@Workflow]
name: workflow1
path: ./conf
file: dense_dag.conf
[@Workflow]
name: workflow2
path: ./conf
file: sparse_dag.conf
[@Workflow]
name: workflow3
path: ./conf
file: echo_dag.conf
#!/bin/bash
# launch path
start_path="$(pwd)"
sh build.sh stop
# change into the cts directory
cd "$(dirname "$0")"/
if [[ "x"$@ = x*--module_name=* ]]
then
all_arg=$@
tmp=${all_arg##*--module_name=}
mod_name=${tmp%% *}
sed -i "/^run_mod=/s/run_mod.*/run_mod=$mod_name/" install-all.conf
else
sed -i "/^run_mod=/s/run_mod.*/run_mod=lr_engine/" install-all.conf
fi
env_num=`grep env_num install-all.conf | awk -F '=' '{print $2}'`
# set environment variables
export PATH="$(pwd)"/frame/tools/python27/bin:$PATH
export PYTHONPATH="$(pwd)"
alias | grep "alias cp=" >/dev/null
if [ $? -eq 0 ];then
unalias cp
fi
# return to the launch path and run main.py
cd "$start_path"
mem_free=`free -m | awk '{print $4}'| head -3 | awk 'END{print}'`
let thread_max=$mem_free/5000
if [ $thread_max -eq 0 ];then
echo "系统内存不足, 不能运行任何case"
exit 1
fi
if [ $thread_max -lt $env_num ];then
env_num=$thread_max
echo "目前系统内存最多支持运行$env_num个线程"
fi
temp_args="--paral=$env_num"
python "$(dirname "$0")"/control/main.py $temp_args $@
ret=$?
sh build.sh stop
if [ $ret -ne 0 ]
then
exit 1
fi
#!/bin/bash
function cfont()
{
while (($#!=0))
do
case $1 in
-b)
echo -ne " ";
;;
-t)
echo -ne "\t";
;;
-n)
echo -ne "\n";
;;
-black)
echo -ne "\033[30m";
;;
-red)
echo -ne "\033[31m";
echo -ne "\033[1m";
;;
-green)
echo -ne "\033[32m";
echo -ne "\033[1m";
;;
-yellow)
echo -ne "\033[33m";
;;
-blue)
echo -ne "\033[34m";
echo -ne "\033[1m";
;;
-purple)
echo -ne "\033[35m";
;;
-cyan)
echo -ne "\033[36m";
echo -ne "\033[1m";
;;
-white|-gray)
echo -ne "\033[37m";
;;
-reset)
echo -ne "\033[0m";
;;
-h|-help|--help)
echo "Usage: cfont -color1 message1 -color2 message2 ...";
echo "eg: cfont -red [ -blue message1 message2 -red ]";
;;
*)
echo -ne "$1"
;;
esac
shift
done
echo -ne "\033[0m";
}
cur_path=`pwd`
work_root=${cur_path%%/baidu/*}
CITOOLS="${work_root}/baidu/fengchao-qa/citools"
if [ ! -e ${CITOOLS}/lib/localbuild_lib.sh ];then
cfont -blue "=============== localbuild_lib.sh is not exist, downloading ...================" -n
git clone ssh://git@icode.baidu.com:8235/baidu/fengchao-qa/citools $CITOOLS >/dev/null
fi
source ${CITOOLS}/lib/localbuild_lib.sh
function get_framework_baseenv()
{
onlineFtp="ftp://tc-orp-app2.tc.baidu.com/home/heqing"
wgetOptions="--tries=3 --retry-connrefused -r -l0 -nv --limit-rate=50m -nH"
cfont -blue "##################################################" -n ;
cfont -blue "### build pdserving_framework xts base env ###" -n ;
cfont -blue "##################################################" -n ;
cfont -reset;
run_path="$(grep "run_path" "./install-all.conf" | cut -d "=" -f 2)"
cd $run_path
wget $wgetOptions --cut-dirs=4 "$onlineFtp"/scmbak/pdserving/framework_tester -o wget.log
ret=$?
retry=0
while [[ $retry -lt 3 ]]; do
if [[ $ret -eq 0 ]];then
break;
fi
wget $wgetOptions --cut-dirs=4 "$onlineFtp"/scmbak/pdserving/framework_tester -o wget.log
ret=$?
((retry++))
done
[[ $ret -ne 0 ]] && return 1
cfont -blue "[XTS] " -green "[ finish download: pdserving-framework ]" -n
cd -
return 0
}
# set up the cts environment
function build_ctsenv()
{
# set up the cts environment
if [ -z $1 ]; then
ENV_NUM=0
else
ENV_NUM=$1
fi
#update the installation settings
hostname=$(uname -n)
username="$(echo "`whoami`" | awk '{print $1}')"
LIBPATH=${PWD}/lib
echo "libpath is : $LIBPATH"
# generate install-all.conf
{
echo "[config]"
echo "host=$hostname"
echo "user=$username"
echo "passwd=CAPHI2008"
echo "env_file=${PWD}/envfile"
echo "lib_path=$LIBPATH"
echo "run_path=${PWD}/run_env"
echo "env_num=$ENV_NUM"
} > ./install-all.conf
# install the cts environment
{
cfont -blue "============= predictor env install =============" -n
rm -rf run_env && mkdir -p run_env
echo "current path is :${cur_path}"
#get_framework_baseenv
#if [ $? -ne 0 ]; then
# echo "pdserving-framework is not ready!!!"
# exit 1
#fi
mkdir -p run_env/predictor/bin
mkdir -p run_env/predictor/conf
# copy pdserving into the environment
[[ -e ../output/bin/pdserving ]] && cp -rf ../output/bin/pdserving run_env/predictor/bin/predictor
[[ -e ../output/lib ]] && cp -rf ../output/lib/ run_env/predictor/
[[ -e ../conf ]] && cp -rf ../conf/* run_env/predictor/conf/
#set up parallel environments
if [ $ENV_NUM -ne 0 ]; then
cfont -blue "=============== build multi env ===============" -n
mkdir -p ${PWD}/run_env/1
mv -f ${PWD}/run_env/framework_tester ${PWD}/run_env/1/framework_tester
mv -f ${PWD}/run_env/model ${PWD}/run_env/1/model
mv -f ${PWD}/run_env/dict ${PWD}/run_env/1/dict
for ((i=2; i<=$ENV_NUM; i=i+1))
do
cp -rf ${PWD}/run_env/1 ${PWD}/run_env/$i
done
fi
}
#install the XTS environment
{
echo "now pwd is :`pwd`"
cfont -blue "=============== XTS(cts) install ================" -n
svn co https://svn.baidu.com/general-test/trunk/xts/frame frame> /dev/null
svn co https://svn.baidu.com/general-test/trunk/xts/im/core/control control>/dev/null
echo "now dir list is :`ls`"
cd lib
svn co https://svn.baidu.com/general-test/trunk/xts/im/core/lib/commonlib commonlib>/dev/null
cd -
}
cfont -blue "[XTS] " -green "[ finish XTS(cts) install ]" -n
onlineFtp="ftp://tc-orp-app2.tc.baidu.com/home/heqing"
wgetOptions="--tries=3 --retry-connrefused -r -l0 -nv --limit-rate=50m -nH"
#install bidinfo and the base protolib
{
cd lib
[[ -e bidinfo ]] && rm -rf bidinfo
[[ -e protolib ]] && rm -rf protolib
[[ -e pluginlib ]] && rm -rf pluginlib
wget $wgetOptions --cut-dirs=5 "$onlineFtp"/scmbak/common_lib/pdserving_cts/bidinfo -o wget.log
wget $wgetOptions --cut-dirs=5 "$onlineFtp"/scmbak/common_lib/pdserving_cts/protolib -o wget.log
wget $wgetOptions --cut-dirs=6 "$onlineFtp"/scmbak/common_lib/pdserving_cts/framework/pluginlib -o wget.log
cd -
}
#install protolib
{
cfont -blue "============== protoc install ==================" -n
[[ -e protoc_tools ]] && rm -rf protoc_tools
wget $wgetOptions --cut-dirs=5 "$onlineFtp"/scmbak/common_lib/pdserving_cts/protoc_tools -o wget.log
[[ -e ../proto ]] && cp -rf ../proto/* ./protoc_tools/proto/
cd protoc_tools
chmod +x ./protobuf-2.4.1/bin/protoc
chmod +x ./protobuf-2.4.1/lib/*
[[ -e protolib ]] && rm -rf protolib
mkdir ./protolib
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:`pwd`/protobuf-2.4.1/lib
./protobuf-2.4.1/bin/protoc -I=./proto --python_out=./protolib/ ./proto/*.proto
cd -
cp ./protoc_tools/protolib/*.py ./lib/protolib/
}
cfont -reset
return 0
}
function get_pid
{
local prog=$1
local user=$2
local prog_path=$3
local ret=-1
local trash_path="/home/$(echo "`whoami`" | awk '{print $1}')/.__trash/"
pids=`pgrep $prog -u $user`
for pid in $pids
do
tmp_path=`ls -l /proc/$pid/exe 2>/dev/null | awk '{print $NF}'`
if [ "$tmp_path" == "$prog_path" ] || [ ! -e $tmp_path ] || [ 0 == `echo $tmp_path | grep -qs $trash_path;echo $?` ]
then
echo $pid
ret=0
fi
done
return $ret
}
function kill_prog()
{
name=$1
username=$2
prog_path=$3
pids=`get_pid $name $username $prog_path`
echo $pids>/dev/null
if [ $? -eq 0 ] ;then
for pid in $pids
do
#echo "$name,$pid"
kill -9 $pid
done
fi
}
function kill_predictor_prog()
{
username="$(echo "`whoami`" | awk '{print $1}')"
if [ -f install-all.conf ]
then
env_num=`grep env_num= install-all.conf|awk -F '=' '{print $2}'`
else
env_num=0
fi
for ((i=0; i<=$env_num; i=i+1))
do
if [ $i -eq 0 ]
then
run_path="${PWD}/run_env"
else
run_path="${PWD}/run_env/$i"
fi
kill_prog predictor $username $run_path/framework_tester/bin/predictor
done
}
function clean_ctsenv()
{
rm -rf install-all.conf ccover
rm -rf run_env fail_env output log frame control lib/commonlib lib/protolib
return 0
}
if [ $# -eq 1 ] && [ $1 == "clean" ]
then
clean_ctsenv
exit 0
fi
if [ $# -eq 1 ] && [ $1 == "stop" ]
then
kill_predictor_prog
exit 0
fi
clean_ctsenv
build_ctsenv "$1"
exit $?
#!/usr/bin/env python
# -*- coding:gbk -*-
"""
case created by template
"""
import sys
sys.path.append(r'./lib/protolib')
print("sys path is : %s " % str(sys.path))
import os
import json
import commands
from lib.protolib.dense_service_pb2 import Request
from lib.protolib.dense_service_pb2 import Response
from lib.pluginlib.plugin_util import Util as ut
from lib.pluginlib.plugin_case import PluginCase
from lib.pluginlib.plugin_module import PluginModule
from lib.pluginlib.plugin_apistub import ApiStub
class TestDenseService(PluginCase):
"""test wtitleq case class"""
OWNER="zhangwenbo03"
quick=['ALL']
low=[]
daily=[]
ignorelist=[]
RESTART=True
def setUp(self):
"""setup something before run case"""
pass
def tearDown(self):
"""tear down after run case"""
self.t.stop()
print "stop finished"
pass
def testDemoCase(self):
"""demo case"""
req = Request()
denseIns = req.instances.add()
denseIns.features.append(10)
denseIns.features.append(13)
denseIns.features.append(200)
service = "BuiltinDenseFormatService"
type = "debug"
ut_obj = ut()
dict_val = ut_obj.pb2dict(req)
json_val = ut_obj.dict2json(dict_val)
self.t.restart()
self.t.tester.sendJsonData(json_val, service, type)
print "execute demo case"
"""plugin register """
from lib.plugin_tester import *
#!/usr/bin/env python
# -*- coding:gbk -*-
"""
Registration classes: RegxxxConfData, RegxxxReq, RegxxxXbox, RegxxxAd, where xxx is the component name
"""
from lib.pluginlib.plugin_common import ConfData
from lib.pluginlib.plugin_common import TreeConfData
from lib.pluginlib.plugin_common import CommonIndex
class RegpredictorConfData(object):
"""
Register the conf and data files of the wtitleq component
"""
def __init__(self, path):
self.path = path
self.conf = {}
self.data = {}
self.conf['ub'] = ConfData(path=self.path + "/conf/ub.conf", connect_flag=":")
self.data['lr_model'] = CommonIndex(path=self.path + \
'/data/lr-model/wtitleq_model_file.sign',
col_list=['key', 'value'],
format='B')
class RegpredictorReq(object):
"""
Register the default request of the wtitleq component
"""
def __init__(self):
self.plugin_term = {}
cmd_tag = 'cmd_tag0'
query_schema_list = []
query_value_list = []
pair_schema_list = ['query',
'wadptid',
'wbwsid',
'omit_buf',
'title',
'desc',
'cmatch',
'bidword',
'dynamic_new_title']
pair_value_list = ['鲜花',
'0',
'3',
'鲜花',
'鲜花%2C本地实体鲜花店100%25保证%21',
'鲜花品质100%25%2C主城最快2小时送到%2C全天24时在线订花%21市区内免费送花上门%21鲜%2E%2E',
'223',
'鲜花',
'美丽鲜花']
cmd_str = '/titleq/wise/ctr'
req_term = {"query_schema": query_schema_list,
"pair_schema": pair_schema_list,
"query_value": query_value_list,
"pair_value": pair_value_list,
"cmd": cmd_str}
self.plugin_term.update({cmd_tag: req_term})
self.plugin_list = self.plugin_term.keys()
class RegpredictorNewXbox(object):
"""
Register the xbox of the wtitleq component
"""
def __init__(self):
self.need_xbox = True
self.stub_conf = 'xboxstub.conf'
self.stub_name = 'xboxstub'
self.conf_list = ['xbox-wtitleq_pegasus.conf']
class RegpredictorAd(object):
"""
Register whether the wtitleq component needs an ad stub
"""
def __init__(self):
self.need_adstub = False
#pragma once
#include <errno.h>
#include <vector>
#include <deque>
#include <base/atomicops.h>
#include <comlog/comlog.h>
#include "common/inner_common.h"
#include "framework/infer_data.h"
#include "framework/memory.h"
#include <boost/function.hpp>
namespace im {
namespace bsf {
template<>
struct Task<baidu::paddle_serving::predictor::Tensor,
baidu::paddle_serving::predictor::Tensor> {
typedef Task<baidu::paddle_serving::predictor::Tensor,
baidu::paddle_serving::predictor::Tensor> TaskT;
typedef baidu::paddle_serving::predictor::Tensor Tensor;
typedef baidu::paddle_serving::predictor::Tensor InType;
typedef baidu::paddle_serving::predictor::Tensor OutType;
typedef baidu::paddle_serving::predictor::BatchTensor BatchTensor;
typedef baidu::paddle_serving::predictor::BatchTensor InArrayT;
typedef baidu::paddle_serving::predictor::BatchTensor OutArrayT;
struct Segment {
Segment(void* p, size_t b, size_t s)
: ptr(p), begin(b), size(s) {}
void* ptr;
size_t begin;
size_t size;
};
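// read_fd/write_fd form a completion pipe: once all instances of this task are
// processed, a worker writes one byte to write_fd (see notify_tasks), waking
// whoever blocks reading read_fd.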
int read_fd;
int write_fd;
pid_t owner_tid;
const InArrayT* in;
OutArrayT* out;
size_t rem;
size_t size;
base::atomic<size_t> index;
const BatchTensor* get(bool is_in) const {
if (is_in) {
return in;
} else {
return out;
}
}
BatchTensor* get(bool is_in) {
if (is_in) {
return const_cast<BatchTensor*>(in);
} else {
return out;
}
}
Task() {
read_fd = -1;
write_fd = -1;
owner_tid = -1;
in = NULL;
out = NULL;
rem = -1;
size = -1;
index.store(0, base::memory_order_relaxed);
}
};
template<>
class BatchTasks<Task<
baidu::paddle_serving::predictor::Tensor,
baidu::paddle_serving::predictor::Tensor> > {
public:
typedef baidu::paddle_serving::predictor::Tensor Tensor;
typedef baidu::paddle_serving::predictor::Tensor InType;
typedef baidu::paddle_serving::predictor::Tensor OutType;
typedef baidu::paddle_serving::predictor::DataBuf DataBuf;
typedef baidu::paddle_serving::predictor::MempoolWrapper MempoolWrapper;
typedef Task<baidu::paddle_serving::predictor::Tensor,
baidu::paddle_serving::predictor::Tensor> TaskT;
typedef TaskMeta<TaskT> TaskMetaT;
typedef TaskT::InArrayT InArrayT;
typedef TaskT::OutArrayT OutArrayT;
BatchTasks(size_t batch_size, bool batch_align = false)
: _batch_size(batch_size)
, _rem_size(batch_size)
, _batch_align(batch_align) {
_batch_in.clear();
_batch_out.clear();
_tasks.clear();
}
~BatchTasks() {
_batch_in.clear();
_batch_out.clear();
_tasks.clear();
}
static bool check_valid(
const InArrayT& in, OutArrayT& out, bool align) {
if (align) {
if (out.count() <= 0 || out.size() <= 0) {
CFATAL_LOG("Out tensor is empty, when aligned");
return false;
}
if (out.size() != in.size()) {
CFATAL_LOG("In/Out tensor size not eq: %ld!=%ld",
out.size(), in.size());
return false;
}
for (size_t fi = 0, shape0 = 0; fi < out.count(); ++fi) {
if (!out[fi].valid()) {
CFATAL_LOG("Out[%ld] tensor not valid", fi);
return false;
}
if (out.size() != out[fi].shape0()) {
CFATAL_LOG("Shape0 not consistency, %ld!=%ld, %ld",
out.size(), out[fi].shape0(), fi);
return false;
}
}
}
return true;
}
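// append_task takes at most _rem_size instances from the task when batch
// alignment is enabled; otherwise it takes the task's whole remainder at once.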
size_t append_task(TaskT* task) {
size_t add = std::min(task->rem, _rem_size);
if (!_batch_align) {
add = task->rem;
}
TaskMetaT tm(task, task->in->size() - task->rem, add);
_tasks.push_back(tm);
task->rem -= add;
_rem_size -= add;
return _rem_size;
}
void merge_tasks() {
merge_input();
merge_output();
}
void merge_input() {
if (_tasks.size() <= 0 || _tasks[0].task->in->count() <= 0) {
return ;
}
if (_tasks.size() == 1 && !_batch_align) {
TaskMetaT& tm = _tasks[0];
_batch_in = *(tm.task->in);
return ;
}
merge_tensor(true);
}
void merge_output() {
if (_batch_align) {
if (_tasks.size() <= 0 || _tasks[0].task->out->count() <= 0) {
return ;
}
}
if (_tasks.size() <= 0 || _tasks[0].task->out->count() <= 0) {
return ;
}
TaskMetaT& tm = _tasks[0];
if (_tasks.size() == 1 && !_batch_align) {
_batch_out = *(tm.task->out);
return ;
}
if (tm.task->out->size() <= 0) {
// shape is empty
_batch_out = *(tm.task->out);
return ;
}
if ((*tm.task->out)[0].data.data() == 0
|| (*tm.task->out)[0].data.size() == 0) {
_batch_out = *(tm.task->out);
return ;
}
merge_tensor(false);
}
void merge_tensor(bool is_in) {
// accumulate batch size from fetched tasks
size_t batch_size = 0;
for (size_t ti = 0; ti < _tasks.size(); ++ti) {
TaskMetaT& tm = _tasks[ti];
size_t add = tm.end - tm.begin;
batch_size += add;
}
// merge all instances in each tensor data
size_t tensor_count = _tasks[0].task->get(is_in)->count();
for (size_t fi = 0; fi < tensor_count; ++fi) {
const Tensor& head = (*(_tasks[0].task->get(is_in)))[fi];
Tensor batch_tensor;
batch_tensor.name = head.name;
batch_tensor.type = head.type;
batch_tensor.shape.push_back(batch_size);
size_t ins_ele_count = 1;
for (size_t si = 1; si < head.shape.size(); ++si) {
batch_tensor.shape.push_back(head.shape[si]);
ins_ele_count *= head.shape[si];
}
size_t tensor_ele_count = ins_ele_count * batch_size;
size_t ins_byte = ins_ele_count * head.ele_byte();
size_t tensor_byte = tensor_ele_count * head.ele_byte();
void* data_buf
= MempoolWrapper::instance().malloc(tensor_byte);
if (!data_buf) {
CFATAL_LOG("Malloc failed, size: %ld", tensor_byte);
return ;
}
size_t data_byte = 0;
for (size_t ti = 0; ti < _tasks.size(); ++ti) {
TaskMetaT& tm = _tasks[ti];
size_t acc_byte = ins_byte * (tm.end - tm.begin);
if (data_byte + acc_byte > tensor_byte) {
CFATAL_LOG("Invalid bytes: %ld + %ld >= %ld",
data_byte, acc_byte, tensor_byte);
return ;
}
const Tensor& tensor = (*(tm.task->get(is_in)))[fi];
memcpy(data_buf + data_byte,
tensor.data.data() + tm.begin * ins_byte,
acc_byte);
data_byte += acc_byte;
}
if (data_byte != tensor_byte) {
CFATAL_LOG("Invalid tensor byte: %ld != %ld",
data_byte, tensor_byte);
return ;
}
batch_tensor.data = DataBuf(data_buf, tensor_byte);
if (is_in) {
_batch_in.push_back(batch_tensor);
} else {
_batch_out.push_back(batch_tensor);
}
}
LOG(TRACE) << "merge input(" << is_in << ") samples: "
<< batch_size << " from " << _tasks.size() << " pvs";
}
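// notify_tasks scatters _batch_out back into each task's out array and, once a
// task's last instance is done, signals its owner by writing a byte to write_fd.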
void notify_tasks() {
if (_batch_out.size() != _batch_in.size()) {
CFATAL_LOG("batch size not consistency: %ld != %ld",
_batch_out.size(), _batch_in.size());
return ;
}
size_t tensor_count = _batch_out.count();
size_t batch_size = _batch_out.size();
for (size_t fi = 0; fi < tensor_count; ++fi) {
const Tensor& tensor = _batch_out[fi];
size_t ins_byte = tensor.ele_byte();
for (size_t si = 1; si < tensor.shape.size(); ++si) {
ins_byte *= tensor.shape[si];
}
for (size_t ti = 0, bi = 0, add = 0;
ti < _tasks.size(); ++ti, bi += add) {
OutArrayT* dst = _tasks[ti].task->out;
add = _tasks[ti].end - _tasks[ti].begin;
size_t offset_src = ins_byte * bi;
size_t add_byte = add * ins_byte;
if (_batch_align) { // merge all batches
size_t offset_dst = ins_byte * _tasks[ti].begin;
void* ptr = const_cast<void*>((*dst)[fi].data.data());
memcpy(ptr + offset_dst,
_batch_out[fi].data.data() + offset_src, add_byte);
} else { // overwrite
if (dst->count() <= 0) {
dst->push_back(_batch_out[fi]);
} else {
(*dst)[fi] = _batch_out[fi];
}
(*dst)[fi].shape[0] = add;
(*dst)[fi].data = DataBuf(
(char*)_batch_out[fi].data.data() + offset_src, add_byte);
}
}
}
for (size_t ti = 0; ti < _tasks.size(); ++ti) {
TaskT* task = _tasks[ti].task;
size_t begin = _tasks[ti].begin;
size_t end = _tasks[ti].end;
size_t add = end - begin;
size_t index = task->index.fetch_add(add);
if ((index + add) >= task->in->size()) {
char c = 0;
while (write(task->write_fd, &c, 1) != 1 && errno == EINTR) {
;
}
base::return_object(task);
}
}
}
const typename TaskT::InArrayT& in() const {
return _batch_in;
}
typename TaskT::OutArrayT& out() {
return _batch_out;
}
size_t task_size() {
return _tasks.size();
}
private:
std::vector<TaskMetaT> _tasks;
InArrayT _batch_in;
OutArrayT _batch_out;
size_t _rem_size;
size_t _batch_size;
bool _batch_align;
};
} // namespace bsf
} // namespace im
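// Illustrative sketch (an assumption, not part of this commit): the worker
// callback registered via set_thread_callback_fn() receives the merged batch
// built by merge_tasks() and must fill the batched output that notify_tasks()
// scatters back to callers. Assuming the tensor-specialized Task typedefs
// InArrayT/OutArrayT to BatchTensor, a callback could look like:
//
// void batch_infer(const BatchTensor& in, BatchTensor& out) {
//     for (size_t fi = 0; fi < in.count(); ++fi) {
//         // in[fi].shape[0] is the total batch size merged across tasks;
//         // a real callback would run the model here and push one output
//         // Tensor per feature with the same leading dimension
//         out.push_back(in[fi]);
//     }
// }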
#pragma once
#include <boost/bind.hpp>
#include <base/atomicops.h>
#include <comlog/comlog.h>
#include "common/inner_common.h"
#include <sys/syscall.h>
namespace im {
namespace bsf {
template<typename TaskT>
void* TaskExecutor<TaskT>::thread_entry(void* args) {
ComlogGuard logging_guard;
ThreadContext<TaskT>* context = static_cast<ThreadContext<TaskT>*>(args);
TaskExecutor<TaskT>* executor = static_cast<TaskExecutor<TaskT>*>(context->executor);
executor->work(context);
return NULL;
}
template<typename TaskT>
int TaskExecutor<TaskT>::start(uint32_t thread_num, uint32_t init_timeout_sec) {
_stop = false;
if (!_thread_contexts.empty()) {
CWARNING_LOG("BSF has started");
return 0;
}
if (thread_num == 0) {
CFATAL_LOG("cannot init BSF with zero thread");
return -1;
}
ThreadContext<TaskT>* contexts = new ThreadContext<TaskT>[thread_num];
for (uint32_t i = 0; i < thread_num; ++i) {
contexts[i].executor = this;
if (_user_thread_contexts != NULL) {
contexts[i].user_thread_context = _user_thread_contexts[i];
}
int rc = THREAD_CREATE(
&contexts[i].tid, NULL, &TaskExecutor::thread_entry, &contexts[i]);
if (rc != 0) {
CFATAL_LOG("failed to create BSF worker thread: index=%u, rc=%d, errno=%d:%m",
i, rc, errno);
return -1;
}
_thread_contexts.push_back(&contexts[i]);
}
int init_timeout = init_timeout_sec * 1000 * 1000; // microseconds
bool has_error = false;
bool has_timeout = true;
if (init_timeout == 0) {
has_timeout = false;
}
while (!has_timeout || init_timeout > 0) {
bool done = true;
for (size_t i = 0; i < _thread_contexts.size(); ++i) {
if (_thread_contexts[i]->init_status < 0) {
has_error = true;
break;
}
if (_thread_contexts[i]->init_status == 0) {
done = false;
}
}
if (has_error) {
CFATAL_LOG("BSF thread init error");
return -1;
}
if (done) {
CDEBUG_LOG("BSF thread init done");
return 0;
}
// 100ms
const int sleep_interval = 100 * 1000;
usleep(sleep_interval);
init_timeout -= sleep_interval;
}
CFATAL_LOG("BSF thread init timed out");
return -1;
}
template<typename TaskT>
void TaskExecutor<TaskT>::stop() {
_stop = true;
for (size_t i = 0; i < _thread_contexts.size(); ++i) {
THREAD_CANCEL(_thread_contexts[i]->tid);
}
for (size_t i = 0; i < _thread_contexts.size(); ++i) {
THREAD_JOIN(_thread_contexts[i]->tid, NULL);
}
_thread_contexts.clear();
}
template<typename TaskT>
TaskHandler<TaskT> TaskExecutor<TaskT>::schedule(
const InArrayT& in, OutArrayT& out) {
TaskT* task = base::get_object<TaskT>();
if (!task) {
LOG(FATAL) << "Failed get TaskT from object pool";
return TaskHandler<TaskT>::valid_handle();
}
if (!BatchTasks<TaskT>::check_valid(in, out, _batch_align)) {
LOG(FATAL) << "Invalid input & output";
base::return_object(task);
return TaskHandler<TaskT>::valid_handle();
}
int fds[2];
int rc = pipe(fds);
if (rc != 0) {
CFATAL_LOG("call pipe() failed, errno=%d:%m", errno);
base::return_object(task);
return TaskHandler<TaskT>::valid_handle();
}
task->read_fd = fds[0];
task->write_fd = fds[1];
task->owner_tid = ::syscall(SYS_gettid);
task->in = &in;
task->out = &out;
task->rem = in.size();
task->size = in.size();
task->index.store(0, base::memory_order_relaxed);
AutoMutex lock(_mut);
_task_queue.push_back(task);
THREAD_COND_SIGNAL(&_cond);
return TaskHandler<TaskT>(*task);
}
template<typename TaskT>
bool TaskExecutor<TaskT>::fetch_batch(BatchTasks<TaskT>& batch) {
AutoMutex lock(_mut);
while (_task_queue.empty()) {
THREAD_COND_WAIT(&_cond, &_mut);
}
if (_task_queue.empty()) {
CFATAL_LOG("invalid task queue!");
return false;
}
while (!_task_queue.empty()) {
TaskT* task = _task_queue.front();
size_t rem = batch.append_task(task);
if (task->rem <= 0) {
_task_queue.pop_front();
}
if (rem <= 0) break;
}
return true;
}
template<typename TaskT>
int TaskExecutor<TaskT>::work(ThreadContext<TaskT>* context) {
if (_thread_init_fn != NULL) {
if (_thread_init_fn(context->user_thread_context) != 0) {
CFATAL_LOG("execute thread init thunk failed, BSF thread will exit");
context->init_status = -1;
return -1;
} else {
CDEBUG_LOG("execute thread init thunk succeed");
}
}
context->init_status = 1;
while (!_stop) {
if (_thread_reset_fn != NULL) {
if (_thread_reset_fn(context->user_thread_context) != 0) {
CFATAL_LOG("execute user thread reset failed");
}
}
BatchTasks<TaskT> batch(_batch_size, _batch_align);
if (fetch_batch(batch)) {
batch.merge_tasks();
_fn(batch.in(), batch.out());
batch.notify_tasks();
}
}
return 0;
}
template<typename InItemT, typename OutItemT>
bool TaskManager<InItemT, OutItemT>::schedule(const InArrayT& in,
OutArrayT& out) {
TaskHandler<TaskT> handler = _executor.schedule(in, out);
if (handler.valid()) {
_task_owned = handler;
return true;
} else {
CFATAL_LOG("failed to schedule task");
return false;
}
}
template<typename InItemT, typename OutItemT>
void TaskManager<InItemT, OutItemT>::wait() {
char buffer[128];
while (read(_task_owned.read_fd, buffer, sizeof(buffer)) < 0
&& errno == EINTR) {
;
}
close(_task_owned.read_fd);
close(_task_owned.write_fd);
_task_owned.read_fd = -1;
_task_owned.write_fd = -1;
return;
}
} // namespace bsf
} // namespace im
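// Usage sketch (illustrative, not part of this commit): a client thread hands
// its input array to a TaskManager, which forwards it to the singleton
// TaskExecutor and blocks in wait() until a worker signals completion through
// the task's pipe. InItem/OutItem are hypothetical item types.
//
// typedef im::bsf::TaskManager<InItem, OutItem> TaskManagerT;
// std::vector<InItem> in;   // filled by the caller
// std::vector<OutItem> out;
// TaskManagerT manager;
// if (manager.schedule(in, out)) {
//     manager.wait(); // returns after notify_tasks() writes to write_fd
// }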
#ifndef BAIDU_PADDLE_SERVING_PREDICTOR_BSF_H
#define BAIDU_PADDLE_SERVING_PREDICTOR_BSF_H
#include <errno.h>
#include <vector>
#include <deque>
#include <base/atomicops.h>
#include <comlog/comlog.h>
#include "common/inner_common.h"
#include <boost/function.hpp>
namespace im {
namespace bsf {
static const size_t DEFAULT_BATCH_SIZE = 100;
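// A Task wraps one client request: `in`/`out` point at the caller's arrays,
// `rem` counts the items not yet claimed by any batch, and `index` counts the
// items already finished. When the last item completes, the worker writes one
// byte to write_fd so the caller blocked in TaskManager::wait() wakes up.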
template<typename InItemT, typename OutItemT>
struct Task {
typedef std::vector<InItemT> InArrayT;
typedef std::vector<OutItemT> OutArrayT;
typedef InItemT InType;
typedef OutItemT OutType;
typedef Task<InItemT, OutItemT> TaskT;
int read_fd;
int write_fd;
pid_t owner_tid;
const InArrayT* in;
OutArrayT* out;
size_t rem;
size_t size;
size_t batch_size() {
return in->size();
}
base::atomic<size_t> index;
Task() {
read_fd = -1;
write_fd = -1;
owner_tid = -1;
in = NULL;
out = NULL;
rem = -1; // (size_t)-1 marks "unset"; schedule() assigns the real value
size = -1;
index.store(0, base::memory_order_relaxed);
}
};
template<typename TaskT>
struct TaskMeta {
TaskMeta(TaskT* ptr, size_t start, size_t add)
: task(ptr)
, begin(start)
, end(start + add) {}
TaskT* task;
size_t begin;
size_t end;
};
template<typename TaskT>
class BatchTasks {
public:
typedef typename TaskT::InType InType;
typedef typename TaskT::OutType OutType;
typedef TaskMeta<TaskT> TaskMetaT;
BatchTasks(size_t batch_size, bool batch_align = true)
: _rem_size(batch_size)
, _batch_size(batch_size)
, _batch_align(batch_align) {
_batch_in.clear();
_batch_out.clear();
_tasks.clear();
}
~BatchTasks() {
_batch_in.clear();
_batch_out.clear();
_tasks.clear();
}
// synchronized operation
size_t append_task(TaskT* task) {
size_t add = std::min(task->rem, _rem_size);
if (!_batch_align) {
add = task->rem;
}
TaskMetaT tm(task, task->in->size() - task->rem, add);
_tasks.push_back(tm);
task->rem -= add;
_rem_size -= add;
return _rem_size;
}
static bool check_valid(
const typename TaskT::InArrayT& in,
typename TaskT::OutArrayT& out, bool align) {
// validation is a no-op for generic item types
(void)in;
(void)out;
(void)align;
return true;
}
void merge_tasks() {
for (size_t ti = 0; ti < _tasks.size(); ++ti) {
TaskMetaT& tm = _tasks[ti];
for (size_t vi = tm.begin; vi < tm.end; ++vi) {
_batch_in.push_back((*tm.task->in)[vi]);
_batch_out.push_back((*tm.task->out)[vi]);
}
}
}
void notify_tasks() {
if (_batch_out.size() != _batch_in.size()) {
CFATAL_LOG("batch size not consistency: %ld != %ld",
_batch_out.size(), _batch_in.size());
return ;
}
for (size_t ti = 0, bi = 0; ti < _tasks.size(); ++ti) {
TaskT* task = _tasks[ti].task;
size_t begin = _tasks[ti].begin;
size_t end = _tasks[ti].end;
size_t add = end - begin;
for (size_t oi = begin; oi < end; ++oi, ++bi) {
if (bi >= _batch_in.size()) {
CFATAL_LOG("batch index overflow: %d > %d",
bi, _batch_in.size());
return ;
}
(*task->out)[oi] = _batch_out[bi];
}
size_t index = task->index.fetch_add(add);
if ((index + add) >= task->in->size()) {
char c = 0;
while (write(task->write_fd, &c, 1) != 1 && errno == EINTR) {
;
}
base::return_object(task);
}
}
}
const typename TaskT::InArrayT& in() const {
return _batch_in;
}
typename TaskT::OutArrayT& out() {
return _batch_out;
}
size_t task_size() {
return _tasks.size();
}
private:
std::vector<TaskMetaT> _tasks;
typename TaskT::InArrayT _batch_in;
typename TaskT::OutArrayT _batch_out;
size_t _rem_size;
size_t _batch_size;
bool _batch_align;
};
// BSF task handle, used to identify a task list when waiting
template<typename TaskT>
struct TaskHandler {
int read_fd;
int write_fd;
TaskHandler()
: read_fd(-1), write_fd(-1) {
// do nothing
}
TaskHandler(TaskT const& task)
: read_fd(task.read_fd)
, write_fd(task.write_fd) {
// do nothing
}
inline bool valid() const {
return read_fd >= 0 && write_fd >= 0;
}
// returns the shared default-constructed handler (fds == -1), i.e. an
// invalid handle; used as the error return of TaskExecutor::schedule()
static TaskHandler<TaskT>& valid_handle() {
static TaskHandler<TaskT> vhandle;
return vhandle;
}
};
template<typename TaskT>
class TaskExecutor;
template<typename InItemT, typename OutItemT>
class TaskManager;
template<typename TaskT>
struct ThreadContext {
TaskExecutor<TaskT>* executor;
void* user_thread_context;
THREAD_T tid;
int init_status;
ThreadContext()
: executor(NULL)
, user_thread_context(NULL)
, tid(-1), init_status(0) {
// do nothing
}
~ThreadContext() {
tid = -1;
executor = NULL;
user_thread_context = NULL;
init_status = 0;
}
};
template<typename TaskT>
class TaskExecutor {
public:
typedef typename TaskT::InType InType;
typedef typename TaskT::OutType OutType;
typedef typename TaskT::InArrayT InArrayT;
typedef typename TaskT::OutArrayT OutArrayT;
typedef std::vector<TaskT> TaskArrayT;
TaskExecutor()
: _stop(false)
, _thread_init_fn(NULL)
, _thread_reset_fn(NULL)
, _user_thread_contexts(NULL)
, _batch_size(DEFAULT_BATCH_SIZE)
, _batch_align(false)
, _fn(NULL) {
THREAD_MUTEX_INIT(&_mut, NULL);
THREAD_COND_INIT(&_cond, NULL);
_task_queue.clear();
}
~TaskExecutor() {
THREAD_MUTEX_DESTROY(&_mut);
THREAD_COND_DESTROY(&_cond);
}
static TaskExecutor<TaskT>* instance() {
static TaskExecutor<TaskT> singleton;
return &singleton;
}
void set_batch_size(size_t batch_size) {
_batch_size = batch_size;
}
void set_batch_align(bool batch_align) {
_batch_align = batch_align;
}
void set_thread_init_fn(boost::function<int(void*)> init_fn, void** contexts = NULL) {
_thread_init_fn = init_fn;
_user_thread_contexts = contexts;
}
void set_thread_reset_fn(boost::function<int(void*)> reset_fn) {
_thread_reset_fn = reset_fn;
}
void set_thread_callback_fn(boost::function<void(const InArrayT&, OutArrayT&)> cb) {
_fn = cb;
}
int start(uint32_t thread_num, uint32_t init_timeout_sec = 0);
void stop();
static void* thread_entry(void* args);
private:
TaskExecutor(TaskExecutor<TaskT> const& other);
TaskExecutor* operator=(TaskExecutor<TaskT> const& other);
int work(ThreadContext<TaskT>* context);
TaskHandler<TaskT> schedule(const InArrayT&, OutArrayT&);
bool fetch_batch(BatchTasks<TaskT>& batch);
bool _stop;
// can't use boost::mutex because of a conflicting macro
THREAD_MUTEX_T _mut;
THREAD_COND_T _cond;
std::deque<TaskT*> _task_queue;
boost::function<int(void*)> _thread_init_fn;
boost::function<int(void*)> _thread_reset_fn;
void** _user_thread_contexts;
std::vector<ThreadContext<TaskT>*> _thread_contexts;
friend class TaskManager<InType, OutType>;
boost::function<void(const InArrayT&, OutArrayT&)> _fn;
size_t _batch_size;
bool _batch_align;
};
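// Typical wiring (illustrative, not part of this commit): configure the
// per-task-type singleton once at startup, then start the worker threads.
// The names InItem, OutItem and batch_infer are assumptions.
//
// typedef im::bsf::Task<InItem, OutItem> TaskT;
// im::bsf::TaskExecutor<TaskT>* exec = im::bsf::TaskExecutor<TaskT>::instance();
// exec->set_batch_size(64);
// exec->set_batch_align(true);
// exec->set_thread_callback_fn(boost::bind(&batch_infer, _1, _2));
// if (exec->start(4 /*threads*/, 10 /*init timeout, seconds*/) != 0) {
//     // handle startup failure
// }
// ...
// exec->stop();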
template<typename InItemT, typename OutItemT>
class TaskManager {
public:
typedef Task<InItemT, OutItemT> TaskT;
typedef typename TaskT::InArrayT InArrayT;
typedef typename TaskT::OutArrayT OutArrayT;
explicit TaskManager(TaskExecutor<TaskT>& exe, size_t batch_size) : _executor(exe) {
(void)batch_size; // batch size is configured on the executor itself
}
TaskManager()
: _executor(*TaskExecutor<TaskT>::instance()) {
}
~TaskManager() {
wait();
}
bool schedule(const InArrayT& in, OutArrayT& out);
void wait();
inline void clear() {
wait();
}
private:
TaskExecutor<TaskT>& _executor;
TaskHandler<TaskT> _task_owned;
}; // class TaskManager
struct ComlogGuard {
ComlogGuard() {
com_openlog_r();
}
~ComlogGuard() {
com_closelog_r();
}
};
class AutoMutex {
public:
AutoMutex(THREAD_MUTEX_T& mut)
: _mut(mut) {
THREAD_MUTEX_LOCK(&_mut);
}
~AutoMutex() {
THREAD_MUTEX_UNLOCK(&_mut);
}
private:
THREAD_MUTEX_T& _mut;
};
} // namespace bsf
} // namespace im
#include "bsf-inl.h"
#include "bsf-inl-tensor.h"
#endif //BAIDU_PADDLE_SERVING_PREDICTOR_BSF_H
#ifndef BAIDU_PADDLE_SERVING_PREDICTOR_CHANNEL_H
#define BAIDU_PADDLE_SERVING_PREDICTOR_CHANNEL_H
#include "common/inner_common.h"
namespace baidu {
namespace paddle_serving {
namespace predictor {
class Channel;
class Bus {
public:
Bus() {
clear();
}
int regist(const std::string& op, Channel* channel) {
std::pair<boost::unordered_map<std::string, Channel*>::iterator, bool> r
= _op_channels.insert(std::make_pair(op, channel));
if (!r.second) {
LOG(ERROR) << "Failed insert op&channel into bus:" << op;
return -1;
}
return 0;
}
Channel* channel_by_name(const std::string& op_name) {
boost::unordered_map<std::string, Channel*>::iterator it
= _op_channels.find(op_name);
if (it == _op_channels.end()) {
LOG(WARNING)
<< "Not found channel in bus, op_name:"
<< op_name << ".";
return NULL;
}
return it->second;
}
void clear() {
_op_channels.clear();
}
size_t size() const {
return _op_channels.size();
}
private:
boost::unordered_map<std::string, Channel*> _op_channels;
};
class Channel {
public:
Channel() {}
void init(uint32_t id, const char* op) {
_id = id;
_op = std::string(op);
clear_data();
}
void deinit() {
clear_data();
}
uint32_t id() const {
return _id;
}
const std::string& op() {
return _op;
}
int share_to_bus(Bus* bus) {
if (bus->regist(_op, this) != 0) {
LOG(FATAL)
<< "Failed regist channel[" << _op
<< "] to bus!" << noflush;
return -1;
}
return 0;
}
virtual void clear_data() = 0;
virtual void* param() = 0;
virtual const void* param() const = 0;
virtual google::protobuf::Message* message() = 0;
virtual const google::protobuf::Message* message() const = 0;
virtual Channel& operator=(const Channel& channel) = 0;
virtual std::string debug_string() const = 0;
private:
uint32_t _id;
std::string _op;
};
template<typename T>
class OpChannel : public Channel {
public:
OpChannel() {
}
void clear_data() {
_data.Clear();
}
void* param() {
return &_data;
}
const void* param() const {
return &_data;
}
google::protobuf::Message* message() {
return message_impl(derived_from_message<
TIsDerivedFromB<T, google::protobuf::Message>::RESULT>());
}
google::protobuf::Message* message_impl(derived_from_message<true>) {
return dynamic_cast<google::protobuf::Message*>(&_data);
}
google::protobuf::Message* message_impl(derived_from_message<false>) {
LOG(FATAL) << "Current type: " << typeid(T).name()
<< " is not derived from protobuf.";
return NULL;
}
const google::protobuf::Message* message() const {
return message_impl(derived_from_message<
TIsDerivedFromB<T, google::protobuf::Message>::RESULT>());
}
const google::protobuf::Message* message_impl(derived_from_message<true>) const {
return dynamic_cast<const google::protobuf::Message*>(&_data);
}
const google::protobuf::Message* message_impl(derived_from_message<false>) const {
LOG(FATAL) << "Current type: " << typeid(T).name()
<< " is not derived from protobuf.";
return NULL;
}
Channel& operator=(const Channel& channel) {
_data = *(dynamic_cast<const OpChannel<T>&>(channel)).data();
return *this;
}
std::string debug_string() const {
return _data.ShortDebugString();
}
// functions of derived class
T* data() {
return &_data;
}
const T* data() const {
return &_data;
}
Channel& operator=(const T& obj) {
_data = obj;
return *this;
}
private:
T _data;
};
template<>
class OpChannel<google::protobuf::Message> : public Channel {
public:
OpChannel<google::protobuf::Message>() : _data(NULL) {
}
virtual ~OpChannel<google::protobuf::Message>() {
_data = NULL;
}
void clear_data() {
_data = NULL;
}
void* param() {
return const_cast<void*>((const void*)_data);
}
const void* param() const {
return _data;
}
google::protobuf::Message* message() {
return const_cast<google::protobuf::Message*>(_data);
}
const google::protobuf::Message* message() const {
return _data;
}
Channel& operator=(const Channel& channel) {
_data = channel.message();
return *this;
}
std::string debug_string() const {
if (_data) {
return _data->ShortDebugString();
} else {
return "{\"Error\": \"Null Message Ptr\"}";
}
}
// derived function implementations
google::protobuf::Message* data() {
return const_cast<google::protobuf::Message*>(_data);
}
const google::protobuf::Message* data() const {
return _data;
}
OpChannel<google::protobuf::Message>& operator=(
google::protobuf::Message* message) {
_data = message;
return *this;
}
OpChannel<google::protobuf::Message>& operator=(
const google::protobuf::Message* message) {
_data = message;
return *this;
}
private:
const google::protobuf::Message* _data;
};
} // predictor
} // paddle_serving
} // baidu
#endif
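// Usage sketch (illustrative, not part of this commit): an op publishes its
// output through an OpChannel<T>, where T is a protobuf message or a plain
// struct. MyOutput is a hypothetical protobuf type.
//
// using baidu::paddle_serving::predictor::OpChannel;
// OpChannel<MyOutput> channel;
// channel.init(/*id=*/1, /*op=*/"my_op");
// channel.data()->set_score(0.5f);                          // typed access
// const google::protobuf::Message* msg = channel.message(); // generic access
// channel.share_to_bus(bus); // downstream ops look it up by op name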
#include "common/inner_common.h"
#include "framework/dag.h"
#include "op/op.h"
#include "framework/predictor_metric.h" // PredictorMetric
namespace baidu {
namespace paddle_serving {
namespace predictor {
Dag::Dag() {
_index_nodes.clear();
_name_nodes.clear();
_stages.clear();
}
Dag::~Dag() {
deinit();
}
int Dag::deinit() {
for (std::vector<DagStage*>::iterator iter = _stages.begin(); iter != _stages.end(); ++iter) {
if (*iter != NULL) {
delete *iter;
}
}
_stages.clear();
for (std::vector<DagNode*>::iterator iter = _index_nodes.begin();
iter != _index_nodes.end();
++iter) {
DagNode* node = *iter;
if (node != NULL) {
void* conf = node->conf;
if (conf != NULL) {
Op* op = OpRepository::instance().get_op(node->type);
if (op == NULL) {
LOG(FATAL) << "Failed to get_op, op type[" << node->type << "]";
return -1;
}
op->delete_config(conf);
OpRepository::instance().return_op(node->type, op);
}
delete node;
}
}
_index_nodes.clear();
_name_nodes.clear();
return 0;
}
EdgeMode Dag::parse_mode(std::string& mode) {
if (mode == "RO") {
return RO;
} else if (mode == "RW") {
return RW;
} else {
return UNKNOWN;
}
}
// [@Node]
// name: preprocess
// type: ProcessorOp
// [.@Depend]
// name: StartupOp
// mode: RO
// [@Node]
// name: discret_extractor
// type: DiscretExtractOp
// [.@Depend]
// name: StartupOp
// mode: RO
// [.@Depend]
// name: preprocess
// mode: RW
// [@Node]
// name: dnn_inference
// type: PaddleV2InferenceOp
// [.@Depend]
// name: discret_extractor
// mode: RO
// [@Node]
// name: postprocess
// type: PostProcessOp
// [.@Depend]
// name: dnn_inference
// mode: RO
int Dag::init(const char* path, const char* file, const std::string& name) {
comcfg::Configure conf;
if (conf.load(path, file) != 0) {
LOG(FATAL) << "Failed load conf from"
<< path << "/" << file << " in dag: "
<< name;
return ERR_INTERNAL_FAILURE;
}
return init(conf, name);
}
int Dag::init(const comcfg::Configure& conf, const std::string& name) {
_dag_name = name;
_index_nodes.clear();
_name_nodes.clear();
for (uint32_t i = 0; i < conf["Node"].size(); i++) {
DagNode* node = new (std::nothrow) DagNode();
if (node == NULL) {
LOG(ERROR) << "Failed create new dag node";
return ERR_MEM_ALLOC_FAILURE;
}
node->id = i + 1; // 0 is reserved for the starting op
node->name = conf["Node"][i]["name"].to_cstr();
node->type = conf["Node"][i]["type"].to_cstr();
uint32_t depend_size = conf["Node"][i]["Depend"].size();
for (uint32_t j = 0; j < depend_size; j++) {
const comcfg::ConfigUnit& depend =
conf["Node"][i]["Depend"][j];
std::string name = depend["name"].to_cstr();
std::string mode = depend["mode"].to_cstr();
node->depends.insert(
std::make_pair(name, parse_mode(mode)));
}
Op* op = OpRepository::instance().get_op(node->type);
if (op == NULL) {
LOG(FATAL) << "Failed to get_op, op type[" << node->type << "]";
return ERR_INTERNAL_FAILURE;
}
// node->conf could be NULL
node->conf = op->create_config(conf["Node"][i]);
OpRepository::instance().return_op(node->type, op);
_name_nodes.insert(std::make_pair(node->name, node));
_index_nodes.push_back(node);
}
if (topo_sort() != 0) {
LOG(FATAL) << "Topo sort dag[" << _dag_name << "] failed!";
return ERR_INTERNAL_FAILURE;
}
if (FLAGS_el_log_level == 16) {
LOG(DEBUG) << "DAG: " << _dag_name << noflush;
LOG(DEBUG) << ", Op Num: " << _index_nodes.size();
for (uint32_t nid = 0; nid < _index_nodes.size(); nid++) {
DagNode* node = _index_nodes[nid];
LOG(DEBUG)
<< ", OP-" << node->id << "-" << node->name << "-"
<< node->type << noflush;
LOG(DEBUG) << " depends: " << node->depends.size() << noflush;
boost::unordered_map<std::string, EdgeMode>::iterator it;
for (it = node->depends.begin(); it != node->depends.end(); it++) {
LOG(DEBUG) << " " << it->first << " " << it->second << noflush;
}
}
LOG(DEBUG) << "";
}
return ERR_OK;
}
uint32_t Dag::nodes_size() {
return _index_nodes.size();
}
const DagNode* Dag::node_by_id(uint32_t id) {
return _index_nodes[id];
}
const DagNode* Dag::node_by_id(uint32_t id) const {
return _index_nodes[id];
}
const DagNode* Dag::node_by_name(std::string& name) {
// delegate to the const overload to avoid operator[], which would
// silently insert a NULL entry on a miss
return static_cast<const Dag*>(this)->node_by_name(name);
}
const DagNode* Dag::node_by_name(const std::string& name) const {
boost::unordered_map<std::string, DagNode*>::const_iterator it;
it = _name_nodes.find(name);
if (it == _name_nodes.end()) {
LOG(WARNING) << "Not found op by name:" << name;
return NULL;
}
return it->second;
}
uint32_t Dag::stage_size() {
return _stages.size();
}
const DagStage* Dag::stage_by_index(uint32_t index) {
return _stages[index];
}
int Dag::topo_sort() {
// TODO: replace with a real topological sort; for now each node is
// placed in its own stage, in configuration order
std::stringstream ss;
for (uint32_t nid = 0; nid < _index_nodes.size(); nid++) {
DagStage* stage = new (std::nothrow) DagStage();
if (stage == NULL) {
LOG(ERROR) << "Invalid stage!";
return ERR_MEM_ALLOC_FAILURE;
}
stage->nodes.push_back(_index_nodes[nid]);
ss.str("");
ss << _stages.size();
stage->name = ss.str();
stage->full_name = full_name() + NAME_DELIMITER + stage->name;
_stages.push_back(stage);
// assign stage number after stage created
_index_nodes[nid]->stage = nid;
// assign dag node full name after stage created
_index_nodes[nid]->full_name = stage->full_name + NAME_DELIMITER + _index_nodes[nid]->name;
}
return ERR_OK;
}
void Dag::regist_metric(const std::string& service_name) {
for (size_t stage_idx = 0; stage_idx < _stages.size(); ++stage_idx) {
DagStage* stage = _stages[stage_idx];
PredictorMetric::GetInstance()->regist_latency_metric(
STAGE_METRIC_PREFIX + service_name + NAME_DELIMITER + stage->full_name);
for (size_t node_idx = 0; node_idx < stage->nodes.size(); ++node_idx) {
DagNode* node = stage->nodes[node_idx];
PredictorMetric::GetInstance()->regist_latency_metric(
OP_METRIC_PREFIX + service_name + NAME_DELIMITER + node->full_name);
Op* op = OpRepository::instance().get_op(node->type);
if (op == NULL) {
LOG(FATAL) << "Failed to get_op, op type[" << node->type << "]";
return;
}
op->set_full_name(service_name + NAME_DELIMITER + node->full_name);
op->set_config(node->conf);
op->regist_metric();
OpRepository::instance().return_op(node->type, op);
}
}
}
} // predictor
} // paddle_serving
} // baidu
#ifndef BAIDU_PADDLE_SERVING_PREDICTOR_DAG_H
#define BAIDU_PADDLE_SERVING_PREDICTOR_DAG_H
#include "common/inner_common.h"
namespace baidu {
namespace paddle_serving {
namespace predictor {
enum EdgeMode {
RO = 0,
RW = 1,
UNKNOWN
};
struct DagNode {
uint32_t id;
uint32_t stage;
std::string name; // opname
std::string full_name; // workflow_stageindex_opname
std::string type;
void* conf;
boost::unordered_map<std::string, EdgeMode> depends;
};
struct DagStage {
std::vector<DagNode*> nodes;
std::string name; // stageindex
std::string full_name; // workflow_stageindex
};
class Dag {
public:
Dag();
virtual ~Dag();
EdgeMode parse_mode(std::string& mode);
int init(const char* path, const char* file, const std::string& name);
int init(const comcfg::Configure& conf, const std::string& name);
int deinit();
uint32_t nodes_size();
const DagNode* node_by_id(uint32_t id);
const DagNode* node_by_id(uint32_t id) const;
const DagNode* node_by_name(std::string& name);
const DagNode* node_by_name(const std::string& name) const;
uint32_t stage_size();
const DagStage* stage_by_index(uint32_t index);
const std::string& name() const {
return _dag_name;
}
const std::string& full_name() const {
return _dag_name;
}
void regist_metric(const std::string& service_name);
private:
int topo_sort();
private:
std::string _dag_name;
boost::unordered_map<std::string, DagNode*> _name_nodes;
std::vector<DagNode*> _index_nodes;
std::vector<DagStage*> _stages;
};
} // predictor
} // paddle_serving
} // baidu
#endif // BAIDU_PADDLE_SERVING_PREDICTOR_DAG_H
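// Usage sketch (illustrative, not part of this commit): build a Dag from a
// comcfg conf file laid out like the [@Node]/[.@Depend] example in dag.cpp.
// The path, file and dag names below are assumptions.
//
// baidu::paddle_serving::predictor::Dag dag;
// if (dag.init("./conf", "workflow.conf", "dag_demo") != ERR_OK) {
//     // configuration error
// }
// for (uint32_t si = 0; si < dag.stage_size(); ++si) {
//     const DagStage* stage = dag.stage_by_index(si); // one node per stage
// }
// dag.deinit();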
#include "framework/dag_view.h"
#include <baidu/rpc/traceprintf.h> // TRACEPRINTF
#include "common/inner_common.h"
#include "framework/op_repository.h"
namespace baidu {
namespace paddle_serving {
namespace predictor {
int DagView::init(Dag* dag, const std::string& service_name) {
_name = dag->name();
_full_name = service_name + NAME_DELIMITER + dag->name();
_bus = base::get_object<Bus>();
_bus->clear();
uint32_t stage_size = dag->stage_size();
// create tls stage view
for (uint32_t si = 0; si < stage_size; si++) {
const DagStage* stage = dag->stage_by_index(si);
if (stage == NULL) {
LOG(FATAL) << "Failed get stage by index:" << si;
return ERR_INTERNAL_FAILURE;
}
ViewStage* vstage = base::get_object<ViewStage>();
if (vstage == NULL) {
LOG(FATAL)
<< "Failed get vstage from object pool "
<< "at:" << si;
return ERR_MEM_ALLOC_FAILURE;
}
vstage->full_name = service_name + NAME_DELIMITER + stage->full_name;
uint32_t node_size = stage->nodes.size();
// create tls view node
for (uint32_t ni = 0; ni < node_size; ni++) {
DagNode* node = stage->nodes[ni];
ViewNode* vnode = base::get_object<ViewNode>();
if (vnode == NULL) {
LOG(FATAL) << "Failed get vnode at:" << ni;
return ERR_MEM_ALLOC_FAILURE;
}
// factory type
Op* op = OpRepository::instance().get_op(node->type);
if (op == NULL) {
LOG(FATAL) << "Failed get op with type:"
<< node->type;
return ERR_INTERNAL_FAILURE;
}
// initialize a TLS op object
if (op->init(_bus, dag, node->id, node->name, node->type, node->conf) != 0) {
LOG(WARNING) << "Failed init op, type:" << node->type;
return ERR_INTERNAL_FAILURE;
}
op->set_full_name(service_name + NAME_DELIMITER + node->full_name);
vnode->conf = node;
vnode->op = op;
vstage->nodes.push_back(vnode);
}
_view.push_back(vstage);
}
return ERR_OK;
}
int DagView::deinit() {
uint32_t stage_size = _view.size();
for (uint32_t si = 0; si < stage_size; si++) {
ViewStage* vstage = _view[si];
uint32_t node_size = vstage->nodes.size();
for (uint32_t ni = 0; ni < node_size; ni++) {
ViewNode* vnode = vstage->nodes[ni];
vnode->op->deinit();
OpRepository::instance().return_op(vnode->op);
vnode->reset();
// clear item
base::return_object(vnode);
}
// clear vector
vstage->nodes.clear();
base::return_object(vstage);
}
_view.clear();
_bus->clear();
base::return_object(_bus);
return ERR_OK;
}
int DagView::execute(base::IOBufBuilder* debug_os) {
uint32_t stage_size = _view.size();
for (uint32_t si = 0; si < stage_size; si++) {
TRACEPRINTF("start to execute stage[%u]", si);
int errcode = execute_one_stage(_view[si], debug_os);
TRACEPRINTF("finish to execute stage[%u]", si);
if (errcode < 0) {
LOG(FATAL)
<< "Failed to execute stage[" << si << "]: "
<< _view[si]->debug();
return errcode;
}
}
return ERR_OK;
}
// The default execution strategy runs the ops of a stage sequentially.
// You can derive a subclass to override this func;
// ParallelDagView may be the one you want.
int DagView::execute_one_stage(ViewStage* vstage,
base::IOBufBuilder* debug_os) {
base::Timer stage_time(base::Timer::STARTED);
uint32_t node_size = vstage->nodes.size();
for (uint32_t ni = 0; ni < node_size; ni++) {
ViewNode* vnode = vstage->nodes[ni];
DagNode* conf = vnode->conf;
Op* op = vnode->op;
TRACEPRINTF("start to execute op[%s]", op->name());
int errcode = op->process(debug_os != NULL);
TRACEPRINTF("finish to execute op[%s]", op->name());
if (errcode < 0) {
LOG(FATAL)
<< "Execute failed, Op:" << op->debug_string();
return errcode;
}
if (errcode > 0) {
LOG(TRACE)
<< "Execute ignore, Op:" << op->debug_string();
continue;
}
if (debug_os) {
(*debug_os)
<< "{\"op_name\": \"" << op->name()
<< "\", \"debug_str:\": \""
<< op->debug_string()
<< "\", \"time_info\": \"" << op->time_info() << "\"}";
}
//LOG(DEBUG) << "Execute succ, Op:" << op->debug_string();
}
stage_time.stop();
PredictorMetric::GetInstance()->update_latency_metric(
STAGE_METRIC_PREFIX + vstage->full_name, stage_time.u_elapsed());
return ERR_OK;
}
int DagView::set_request_channel(Channel& request) {
// Each workflow should get the very beginning
// request (channel), and commit it to bus, for
// the first stage ops consuming.
request.share_to_bus(_bus);
return ERR_OK;
}
const Channel* DagView::get_response_channel() const {
// Caller obtains response channel from bus, and
// writes it to rpc response (protobuf/json)
if (_view.size() < 1) {
LOG(FATAL) << "invalid empty view stage!" << noflush;
return NULL;
}
ViewStage* last_stage = _view[_view.size() - 1];
if (last_stage->nodes.size() != 1
|| last_stage->nodes[0] == NULL) {
LOG(FATAL) << "Invalid last stage, size["
<< last_stage->nodes.size()
<< "] != 1" << noflush;
return NULL;
}
Op* last_op = last_stage->nodes[0]->op;
if (last_op == NULL) {
LOG(FATAL) << "Last op is NULL";
return NULL;
}
return last_op->mutable_channel();
}
} // predictor
} // paddle_serving
} // baidu
#ifndef BAIDU_PADDLE_SERVING_PREDICTOR_DAG_VIEW_H
#define BAIDU_PADDLE_SERVING_PREDICTOR_DAG_VIEW_H
#include "op/op.h"
#include "common/inner_common.h"
#include "framework/channel.h"
#include "framework/dag.h"
namespace baidu {
namespace paddle_serving {
namespace predictor {
class Op;
struct ViewNode {
Op* op; // op->full_name == service_workflow_stageindex_opname
DagNode* conf;
void reset() {
op = NULL;
conf = NULL;
}
};
struct ViewStage {
std::vector<ViewNode*> nodes;
std::string full_name; // service_workflow_stageindex
std::string debug() {
return "TOBE IMPLEMENTED!";
}
};
class DagView {
public:
DagView() : _bus(NULL) {
_view.clear();
}
~DagView() {}
int init(Dag* dag, const std::string& service_name);
int deinit();
int execute(base::IOBufBuilder* debug_os);
// The default execution strategy runs the ops of a stage sequentially.
// You can derive a subclass to override this func;
// ParallelDagView may be the one you want.
virtual int execute_one_stage(ViewStage* vstage,
base::IOBufBuilder* debug_os);
int set_request_channel(Channel& request);
const Channel* get_response_channel() const;
const std::string& name() const {
return _name;
}
const std::string& full_name() const {
return _full_name;
}
private:
std::string _name;
std::string _full_name;
std::vector<ViewStage*> _view;
Bus* _bus;
};
// The derived DagView supports a parallel execution
// strategy by implementing execute_one_stage().
class ParallelDagView : public DagView {
public:
int execute_one_stage(ViewStage* vstage, base::IOBufBuilder*) {
// TODO: parallel execution across vstage->nodes is not implemented yet
(void)vstage;
return 0;
}
};
} // predictor
} // paddle_serving
} // baidu
#endif // BAIDU_PADDLE_SERVING_PREDICTOR_DAG_VIEW_H
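// Request-time lifecycle sketch (illustrative, not part of this commit): a
// service builds a DagView per request, commits the request channel to the
// bus for the first stage, executes stage by stage, then reads the last op's
// channel as the response. The service name is an assumption.
//
// DagView view;
// view.init(dag, "my_service");
// view.set_request_channel(request_channel);
// view.execute(NULL); // pass an IOBufBuilder* to collect per-op debug info
// const Channel* response = view.get_response_channel();
// view.deinit();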
/***************************************************************************
*
* Copyright (c) 2018 Baidu.com, Inc. All Rights Reserved
*
**************************************************************************/
/**
* @file include/factory.h
* @author wanlijin01(wanlijin01@baidu.com)
* @date 2018/07/10 22:09:57
* @brief
*
**/
#ifndef BAIDU_PADDLE_SERVING_PREDICTOR_FACTORY_H
#define BAIDU_PADDLE_SERVING_PREDICTOR_FACTORY_H
#include "common/inner_common.h"
namespace baidu {
namespace paddle_serving {
namespace predictor {
//////////////// DECLARE INTERFACE ////////////////
#define DECLARE_FACTORY_OBJECT(D, B) \
static int regist(const std::string& tag) { \
FactoryDerive<D, B>* factory = \
new (std::nothrow) FactoryDerive<D, B>();\
if (factory == NULL \
|| FactoryPool<B>::instance().register_factory(\
tag, factory) != 0) { \
LOG(FATAL) << "Failed regist factory:"\
<< #D << " in macro!"; \
return -1; \
} \
return 0; \
}
#define PDS_STR_CAT(a, b) PDS_STR_CAT_I(a, b)
#define PDS_STR_CAT_I(a, b) a ## b
#define DEFINE_FACTORY_OBJECT(D) \
__attribute__((constructor)) static void PDS_STR_CAT(GlobalRegistObject, __LINE__)(void) \
{ \
D::regist(#D); \
}
//////////////// REGISTER INTERFACE ////////////////
#define REGIST_FACTORY_OBJECT_IMPL(D, B) \
__attribute__((constructor)) static void PDS_STR_CAT(GlobalRegistObject, __LINE__)(void) \
{ \
::baidu::paddle_serving::predictor::FactoryDerive<D, B>* factory =\
new (::std::nothrow) ::baidu::paddle_serving::predictor::FactoryDerive<D, B>();\
if (factory == NULL \
|| ::baidu::paddle_serving::predictor::FactoryPool<B>::instance().register_factory(\
#D, factory) != 0) { \
LOG(FATAL) << "Failed regist factory:" \
<< #D << "->" << #B << " in macro!";\
return ; \
} \
return ; \
}
#define REGIST_FACTORY_OBJECT_IMPL_WITH_NAME(D, B, N) \
__attribute__((constructor)) static void PDS_STR_CAT(GlobalRegistObject, __LINE__)(void) \
{ \
::baidu::paddle_serving::predictor::FactoryDerive<D, B>* factory =\
new (::std::nothrow) ::baidu::paddle_serving::predictor::FactoryDerive<D, B>();\
if (factory == NULL \
|| ::baidu::paddle_serving::predictor::FactoryPool<B>::instance().register_factory(\
N, factory) != 0) { \
LOG(FATAL) << "Failed regist factory:" \
<< #D << "->" << #B << ", tag: " \
<< N << " in macro!"; \
return ; \
} \
LOG(WARNING) << "Succ regist factory:" \
<< #D << "->" << #B << ", tag: " \
<< N << " in macro!"; \
return ; \
}
template<typename B>
class FactoryBase {
public:
virtual B* gen() = 0;
virtual void del(B* obj) = 0;
};
template<typename D, typename B>
class FactoryDerive : public FactoryBase<B> {
public:
B* gen() {
return new(std::nothrow) D();
}
void del(B* obj) {
delete dynamic_cast<D*>(obj);
}
};
template<typename B>
class FactoryPool {
public:
static FactoryPool<B>& instance() {
static FactoryPool<B> singleton;
return singleton;
}
int register_factory(const std::string& tag,
FactoryBase<B>* factory) {
typename std::map<std::string, FactoryBase<B>*>::iterator it
= _pool.find(tag);
if (it != _pool.end()) {
LOG(FATAL) << "Insert duplicate with tag: "
<< tag;
return -1;
}
std::pair<
typename std::map<std::string, FactoryBase<B>*>::iterator,
bool> r = _pool.insert(std::make_pair(tag, factory));
if (!r.second) {
LOG(FATAL) << "Failed insert new factory with:"
<< tag;
return -1;
}
LOG(TRACE) << "Succ insert one factory, tag: " << tag
<< ", base type: " << typeid(B).name();
return 0;
}
B* generate_object(const std::string& tag) {
typename std::map<std::string, FactoryBase<B>*>::iterator it
= _pool.find(tag);
if (it == _pool.end() || it->second == NULL) {
LOG(FATAL) << "Not found factory pool, tag:"
<< tag << ", pool size: " << _pool.size();
return NULL;
}
return it->second->gen();
}
template<typename D>
void return_object(B* object) {
FactoryDerive<D, B> factory;
factory.del(object);
}
private:
std::map<std::string, FactoryBase<B>*> _pool;
};
} // predictor
} // paddle_serving
} // baidu
#endif //BAIDU_PADDLE_SERVING_PREDICTOR_FACTORY_H
/* vim: set expandtab ts=4 sw=4 sts=4 tw=100: */
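// Registration sketch (illustrative, not part of this commit): a derived op
// registers itself into FactoryPool<Op> at load time via the constructor
// attribute, and instances are then produced by tag. MyOp is hypothetical.
//
// class MyOp : public Op { /* ... */ };
// REGIST_FACTORY_OBJECT_IMPL(MyOp, Op);
//
// Op* op = baidu::paddle_serving::predictor::FactoryPool<Op>::instance()
//         .generate_object("MyOp");
// // ... use op ...
// baidu::paddle_serving::predictor::FactoryPool<Op>::instance()
//         .return_object<MyOp>(op);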
#ifndef BAIDU_PADDLE_SERVING_PREDICTOR_INFER_DATA_H
#define BAIDU_PADDLE_SERVING_PREDICTOR_INFER_DATA_H
#include "common/inner_common.h"
namespace baidu {
namespace paddle_serving {
namespace predictor {
enum DataType {
FLOAT32,
INT64
};
class DataBuf {
public:
DataBuf() : _data(NULL), _size(0), _owned(true) {}
DataBuf(size_t size)
: _data(new char[size]), _size(size), _owned(true) {}
DataBuf(void* data, size_t size)
: _data(data), _size(size), _owned(false) {}
DataBuf(void* data, size_t size, bool owned)
: _data(data), _size(size), _owned(owned) {}
void* data() const {
return _data;
}
size_t size() const {
return _size;
}
void free() {
_size = 0;
if (_owned) {
delete[] (char*)_data;
}
_data = NULL;
_owned = false;
}
~DataBuf() {
free();
}
private:
void* _data;
size_t _size;
bool _owned;
};
struct Tensor {
Tensor() {
shape.clear();
lod.clear();
}
Tensor(const Tensor& tensor) {
name = tensor.name;
data = tensor.data;
type = tensor.type;
shape.assign(tensor.shape.begin(), tensor.shape.end());
for (size_t li = 0; li < tensor.lod.size(); ++li) {
std::vector<size_t> l;
l.assign(tensor.lod[li].begin(), tensor.lod[li].end());
lod.push_back(l);
}
}
~Tensor() {
shape.clear();
}
size_t ele_byte() const {
if (type == INT64) {
return sizeof(int64_t);
} else {
return sizeof(float);
}
}
bool valid() const {
if (shape.empty()) {
if (data.data() || data.size()) {
CFATAL_LOG("data should be empty");
return false;
}
return true;
}
if (!data.data() || !data.size()) {
CFATAL_LOG("data cannot empty");
return false;
}
size_t ele_count = 1;
for (size_t si = 0; si < shape.size(); ++si) {
ele_count *= shape[si];
}
if (ele_count * ele_byte() != data.size()) {
CFATAL_LOG("wrong data size: %ld vs. %ld",
ele_count * ele_byte(), data.size());
return false;
}
return true;
}
size_t shape0() {
if (shape.empty()) {
return 0;
}
return shape[0];
}
std::string name;
std::vector<int> shape;
DataBuf data;
DataType type;
std::vector<std::vector<size_t> > lod;
};
class BatchTensor {
public:
BatchTensor() {}
~BatchTensor() {
_features.clear();
}
BatchTensor(const BatchTensor& tv) {
_features.assign(
tv.features().begin(), tv.features().end());
}
Tensor& operator[](int index) {
return _features[index];
}
const Tensor& operator[](int index) const {
return _features[index];
}
void push_back(const Tensor& tensor) {
_features.push_back(tensor);
}
size_t count() const {
return _features.size();
}
size_t size() const {
// shape0 indicates batch_size
if (count() <= 0 || _features[0].shape.size() <= 0) {
return 0;
}
return _features[0].shape[0];
}
const std::vector<Tensor>& features() const {
return _features;
}
void clear() {
_features.clear();
}
private:
std::vector<Tensor> _features;
};
} // predictor
} // paddle_serving
} // baidu
#endif // BAIDU_PADDLE_SERVING_PREDICTOR_INFER_DATA_H
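// Construction sketch (illustrative, not part of this commit): a FLOAT32
// feature of shape [2, 3] wrapping a caller-owned buffer; the two-argument
// DataBuf constructor sets owned=false, so the Tensor will not free it.
// The feature name is an assumption.
//
// float buf[6] = {0};
// baidu::paddle_serving::predictor::Tensor t;
// t.name = "embedding";
// t.type = baidu::paddle_serving::predictor::FLOAT32;
// t.shape.push_back(2);
// t.shape.push_back(3);
// t.data = baidu::paddle_serving::predictor::DataBuf(buf, sizeof(buf));
//
// baidu::paddle_serving::predictor::BatchTensor batch;
// batch.push_back(t); // batch.size() == t.shape[0] == 2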
#include "predictor_metric.h"
#include "base/memory/singleton.h"
namespace baidu {
namespace paddle_serving {
namespace predictor {
PredictorMetric* PredictorMetric::GetInstance() {
return Singleton<PredictorMetric>::get();
}
} // namespace predictor
} // namespace paddle_serving
} // namespace baidu