Commit a8985b1b authored by xulongteng

Merge branch 'develop' into bert

......@@ -75,7 +75,6 @@ include(generic)
include(flags)
if (NOT CLIENT_ONLY)
include(external/mklml)
include(paddlepaddle)
include(external/opencv)
endif()
......
# Copyright (c) 2017 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
IF(NOT ${WITH_MKLML})
return()
ENDIF(NOT ${WITH_MKLML})
IF(APPLE)
MESSAGE(WARNING "Mac is not supported with MKLML in Paddle yet. Force WITH_MKLML=OFF.")
SET(WITH_MKLML OFF CACHE STRING "Disable MKLML package in MacOS" FORCE)
return()
ENDIF()
INCLUDE(ExternalProject)
SET(MKLML_DST_DIR "mklml")
SET(MKLML_INSTALL_ROOT "${THIRD_PARTY_PATH}/install")
SET(MKLML_INSTALL_DIR ${MKLML_INSTALL_ROOT}/${MKLML_DST_DIR})
SET(MKLML_ROOT ${MKLML_INSTALL_DIR})
SET(MKLML_INC_DIR ${MKLML_ROOT}/include)
SET(MKLML_LIB_DIR ${MKLML_ROOT}/lib)
SET(CMAKE_INSTALL_RPATH "${CMAKE_INSTALL_RPATH}" "${MKLML_ROOT}/lib")
SET(TIME_VERSION "2019.0.1.20181227")
IF(WIN32)
SET(MKLML_VER "mklml_win_${TIME_VERSION}" CACHE STRING "" FORCE)
SET(MKLML_URL "https://paddlepaddledeps.cdn.bcebos.com/${MKLML_VER}.zip" CACHE STRING "" FORCE)
SET(MKLML_LIB ${MKLML_LIB_DIR}/mklml.lib)
SET(MKLML_IOMP_LIB ${MKLML_LIB_DIR}/libiomp5md.lib)
SET(MKLML_SHARED_LIB ${MKLML_LIB_DIR}/mklml.dll)
SET(MKLML_SHARED_IOMP_LIB ${MKLML_LIB_DIR}/libiomp5md.dll)
ELSE()
SET(MKLML_VER "mklml_lnx_${TIME_VERSION}" CACHE STRING "" FORCE)
SET(MKLML_URL "http://paddlepaddledeps.cdn.bcebos.com/${MKLML_VER}.tgz" CACHE STRING "" FORCE)
SET(MKLML_LIB ${MKLML_LIB_DIR}/libmklml_intel.so)
SET(MKLML_IOMP_LIB ${MKLML_LIB_DIR}/libiomp5.so)
SET(MKLML_SHARED_LIB ${MKLML_LIB_DIR}/libmklml_intel.so)
SET(MKLML_SHARED_IOMP_LIB ${MKLML_LIB_DIR}/libiomp5.so)
ENDIF()
SET(MKLML_PROJECT "extern_mklml")
MESSAGE(STATUS "MKLML_VER: ${MKLML_VER}, MKLML_URL: ${MKLML_URL}")
SET(MKLML_SOURCE_DIR "${THIRD_PARTY_PATH}/mklml")
SET(MKLML_DOWNLOAD_DIR "${MKLML_SOURCE_DIR}/src/${MKLML_PROJECT}")
ExternalProject_Add(
${MKLML_PROJECT}
${EXTERNAL_PROJECT_LOG_ARGS}
PREFIX ${MKLML_SOURCE_DIR}
URL ${MKLML_URL}
DOWNLOAD_DIR ${MKLML_DOWNLOAD_DIR}
DOWNLOAD_NO_PROGRESS 1
CONFIGURE_COMMAND ""
BUILD_COMMAND ""
UPDATE_COMMAND ""
INSTALL_COMMAND
${CMAKE_COMMAND} -E copy_directory ${MKLML_DOWNLOAD_DIR}/include ${MKLML_INC_DIR} &&
${CMAKE_COMMAND} -E copy_directory ${MKLML_DOWNLOAD_DIR}/lib ${MKLML_LIB_DIR}
)
INCLUDE_DIRECTORIES(${MKLML_INC_DIR})
ADD_LIBRARY(mklml SHARED IMPORTED GLOBAL)
SET_PROPERTY(TARGET mklml PROPERTY IMPORTED_LOCATION ${MKLML_LIB})
ADD_DEPENDENCIES(mklml ${MKLML_PROJECT})
LIST(APPEND external_project_dependencies mklml)
......@@ -40,6 +40,9 @@ message EngineDesc {
}
optional SparseParamServiceType sparse_param_service_type = 11;
optional string sparse_param_service_table_name = 12;
optional bool enable_memory_optimization = 13;
optional bool static_optimization = 14;
optional bool force_update_static_cache = 15;
};
// model_toolkit conf
......@@ -49,8 +52,7 @@ message ModelToolkitConf { repeated EngineDesc engines = 1; };
message ResourceConf {
required string model_toolkit_path = 1;
required string model_toolkit_file = 2;
optional string cube_config_path = 3;
optional string cube_config_file = 4;
optional string cube_config_file = 3;
};
// DAG node dependency info
......
......@@ -68,6 +68,9 @@ int test_write_conf() {
engine->set_enable_batch_align(0);
engine->set_sparse_param_service_type(EngineDesc::LOCAL);
engine->set_sparse_param_service_table_name("local_kv");
engine->set_enable_memory_optimization(true);
engine->set_static_optimization(false);
engine->set_force_update_static_cache(false);
int ret = baidu::paddle_serving::configure::write_proto_conf(
&model_toolkit_conf, output_dir, model_toolkit_conf_file);
......@@ -79,6 +82,7 @@ int test_write_conf() {
ResourceConf resource_conf;
resource_conf.set_model_toolkit_path(output_dir);
resource_conf.set_model_toolkit_file("model_toolkit.prototxt");
resource_conf.set_cube_config_file("./conf/cube.conf");
ret = baidu::paddle_serving::configure::write_proto_conf(
&resource_conf, output_dir, resource_conf_file);
if (ret != 0) {
......
......@@ -17,9 +17,6 @@ package agent
import (
"errors"
_ "github.com/Badangel/logex"
"github.com/Badangel/pipeline"
"os/exec"
"strconv"
"strings"
"sync"
)
......@@ -89,26 +86,3 @@ func GetMaster(master string) (host, port string, err error) {
return MasterHost[0], MasterPort[0], nil
}
}
func init() {
dfCmd := "df -h | grep -E '/home|/ssd'"
stdout, _, err := pipeline.Run(exec.Command("/bin/bash", "-c", dfCmd))
if err == nil && stdout.String() != "" {
t := strings.TrimSpace(stdout.String())
diskLi := strings.Split(t, "\n")
for _, diskStr := range diskLi {
disk := strings.Fields(diskStr)
usedPercent, _ := strconv.Atoi(strings.TrimRight(disk[4], "%"))
if usedPercent <= 40 {
disks = append(disks, disk[5])
}
}
}
if len(disks) == 0 {
disks = append(disks, "/home")
}
//logex.Debugf("available disks found: (%+v)", disks)
}
......@@ -132,7 +132,7 @@ func getHostname(ip string) (hostname string, err error) {
//logex.Warningf("cannot find the hostname of ip (%s), error (%v)", ip, err)
} else {
if len(hostnames) > 0 {
hostname = hostnames[0][:strings.LastIndex(hostnames[0], ".baidu.com.")]
hostname = hostnames[0]
} else {
hostname = ip
}
......
......@@ -725,7 +725,7 @@ func DoDownloadIndividual(source, downloadDir string, isService bool, timeOut in
func checkSources(source string) ([]string, error) {
sources := strings.Split(source, ";")
for i := 0; i < len(sources); i++ {
if sources[i] == "" || !strings.HasPrefix(sources[i], "ftp://") {
if sources[i] == "" || (!strings.HasPrefix(sources[i], "ftp://") && !strings.HasPrefix(sources[i], "http://")) {
return sources, errors.New("Invalid sources")
}
}
......
--port=8027
--port=8000
--dict_split=1
--in_mem=true
--log_dir=./log/
\ No newline at end of file
--log_dir=./log/
......@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#include <signal.h>
#include <sys/stat.h>
#include <brpc/server.h>
......
[default]
dict_name: test_dict
mode: base_only
storage_place: LOCAL
download_mode: http
wget_port: 8009
buildtool_local: /home/work/Serving/build/output/bin/cube-builder
donefile_address: http://127.0.0.1/home/work/dangyifei/donefile
output_address: /home/work/dangyifei/test-transfer/test_data/output
tmp_address: /home/work/dangyifei/test-transfer/test_data/tmp
shard_num: 1
copy_num: 2
shard_num: 2
copy_num: 1
deploy_path: /home/work/test_dict
transfer_address: 127.0.0.1
[cube_agent]
agent0_0: 127.0.0.1:8001
cube0_0: 127.0.0.1:8000:/ssd2/cube_open
agent0_1: 127.0.0.1:8001
cube0_1: 127.0.0.1:8000:/home/disk1/cube_open
agent1_0: 127.0.0.1:8001
cube1_0: 127.0.0.1:8000:/home/disk1/cube_open
......@@ -16,8 +16,8 @@ package main
import (
"fmt"
"github.com/docopt/docopt-go"
"github.com/Badangel/logex"
"github.com/docopt/docopt-go"
"os"
"path/filepath"
"runtime"
......@@ -64,6 +64,7 @@ Log options:
// settings:
if opts["-p"] == nil {
logex.Fatal("ERROR: -p PORT must be set!")
fmt.Fprintln(os.Stderr, "ERROR: -p PORT must be set!")
fmt.Fprintln(os.Stderr, usage)
os.Exit(1)
......@@ -72,6 +73,7 @@ Log options:
logex.Notice(">>> port:", transfer.Port)
if opts["--config"] == nil {
logex.Fatal("ERROR: --config config_file must be set!")
fmt.Fprintln(os.Stderr, "ERROR: --config config_file must be set!")
fmt.Fprintln(os.Stderr, usage)
os.Exit(1)
......@@ -81,6 +83,7 @@ Log options:
configMgr.Init(opts["--config"].(string))
transfer.Dict.DictName = configMgr.Read("default", "dict_name")
if transfer.Dict.DictName == "" {
logex.Fatal("ERROR: nead [default] DictName in config_file!")
fmt.Fprintln(os.Stderr, "ERROR: nead [default] DictName in config_file!")
fmt.Fprintln(os.Stderr, usage)
os.Exit(1)
......@@ -89,22 +92,40 @@ Log options:
transfer.Dict.DictMode = configMgr.Read("default", "mode")
if transfer.Dict.DictMode == "" {
logex.Fatal("ERROR: nead [default] DictMode in config_file!")
fmt.Fprintln(os.Stderr, "ERROR: nead [default] DictMode in config_file!")
fmt.Fprintln(os.Stderr, usage)
os.Exit(1)
}
logex.Notice(">>> Mode:", transfer.Dict.DictMode)
transfer.Dict.StoragePlace = configMgr.Read("default", "storage_place")
if transfer.Dict.StoragePlace == "" || transfer.Dict.StoragePlace != "LOCAL" {
fmt.Fprintln(os.Stderr, "ERROR: nead [default] StoragePlace in config_file! only support Local")
transfer.Dict.DownloadMode = configMgr.Read("default", "download_mode")
if transfer.Dict.DownloadMode != "http" && transfer.Dict.DownloadMode != "ftp" {
logex.Fatal("ERROR: nead [default] download_mode in config_file! only support ftp or http")
fmt.Fprintln(os.Stderr, "ERROR: nead [default] download_mode in config_file! only support ftp or http")
fmt.Fprintln(os.Stderr, usage)
os.Exit(1)
}
logex.Notice(">>> StoragePlace:", transfer.Dict.StoragePlace)
logex.Notice(">>> DownloadMode:", transfer.Dict.DownloadMode)
transfer.Dict.WgetPort = configMgr.Read("default", "wget_port")
if transfer.Dict.WgetPort == "" {
logex.Fatal("ERROR: nead [default] wget_port in config_file!")
fmt.Fprintln(os.Stderr, "ERROR: nead [default] wget_port in config_file!")
fmt.Fprintln(os.Stderr, usage)
os.Exit(1)
}
var wget_port int
wget_port, err = strconv.Atoi(transfer.Dict.WgetPort)
if err != nil {
logex.Fatal("wget_port form is not right need int")
os.Exit(1)
}
logex.Notice(">>> WgetPort:", wget_port)
transfer.BuildToolLocal = configMgr.Read("default", "buildtool_local")
if transfer.BuildToolLocal == "" {
logex.Fatal("ERROR: nead [default] BuildToolLocal in config_file!")
fmt.Fprintln(os.Stderr, "ERROR: nead [default] BuildToolLocal in config_file!")
fmt.Fprintln(os.Stderr, usage)
os.Exit(1)
......@@ -113,6 +134,7 @@ Log options:
transfer.Dict.DonefileAddress = configMgr.Read("default", "donefile_address")
if transfer.Dict.DonefileAddress == "" {
logex.Fatal("ERROR: nead [default] DonefileAddress in config_file!")
fmt.Fprintln(os.Stderr, "ERROR: nead [default] DonefileAddress in config_file!")
fmt.Fprintln(os.Stderr, usage)
os.Exit(1)
......@@ -121,6 +143,7 @@ Log options:
transfer.Dict.OutputAddress = configMgr.Read("default", "output_address")
if transfer.Dict.OutputAddress == "" {
logex.Fatal("ERROR: nead [default] OutputAddress in config_file!")
fmt.Fprintln(os.Stderr, "ERROR: nead [default] OutputAddress in config_file!")
fmt.Fprintln(os.Stderr, usage)
os.Exit(1)
......@@ -129,6 +152,7 @@ Log options:
transfer.Dict.TmpAddress = configMgr.Read("default", "tmp_address")
if transfer.Dict.TmpAddress == "" {
logex.Fatal("ERROR: nead [default] TmpAddress in config_file!")
fmt.Fprintln(os.Stderr, "ERROR: nead [default] TmpAddress in config_file!")
fmt.Fprintln(os.Stderr, usage)
os.Exit(1)
......@@ -137,6 +161,7 @@ Log options:
ShardNumStr := configMgr.Read("default", "shard_num")
if ShardNumStr == "" {
logex.Fatal("ERROR: nead [default] ShardNum in config_file!")
fmt.Fprintln(os.Stderr, "ERROR: nead [default] ShardNum in config_file!")
fmt.Fprintln(os.Stderr, usage)
os.Exit(1)
......@@ -150,6 +175,7 @@ Log options:
CopyNumStr := configMgr.Read("default", "copy_num")
if CopyNumStr == "" {
logex.Fatal("ERROR: nead [default] CopyNum in config_file!")
fmt.Fprintln(os.Stderr, "ERROR: nead [default] CopyNum in config_file!")
fmt.Fprintln(os.Stderr, usage)
os.Exit(1)
......@@ -165,6 +191,7 @@ Log options:
transfer.Dict.DeployPath = configMgr.Read("default", "deploy_path")
if transfer.Dict.DeployPath == "" {
logex.Fatal("ERROR: nead [default] DeployPath in config_file!")
fmt.Fprintln(os.Stderr, "ERROR: nead [default] DeployPath in config_file!")
fmt.Fprintln(os.Stderr, usage)
os.Exit(1)
......@@ -173,6 +200,7 @@ Log options:
transfer.TransferAddr = configMgr.Read("default", "transfer_address")
if transfer.TransferAddr == "" {
logex.Fatal("ERROR: nead [default] TransferAddr in config_file!")
fmt.Fprintln(os.Stderr, "ERROR: nead [default] TransferAddr in config_file!")
fmt.Fprintln(os.Stderr, usage)
os.Exit(1)
......@@ -185,9 +213,17 @@ Log options:
agentName := fmt.Sprintf("agent%d_%d", i, j)
agentInfo := configMgr.Read("cube_agent", agentName)
agentInfoSlice := strings.Split(agentInfo, ":")
if len(agentInfoSlice) != 2 {
logex.Fatal("agent conf format not right! sample: ip:port")
os.Exit(1)
}
cubeName := fmt.Sprintf("cube%d_%d", i, j)
cubeInfo := configMgr.Read("cube_agent", cubeName)
cubeInfoSlice := strings.Split(cubeInfo, ":")
if len(cubeInfoSlice) != 3 {
logex.Fatal("cube conf format not right! sample: ip:port:deploy_path")
os.Exit(1)
}
instance.DictName = transfer.Dict.DictName
instance.AgentIp = agentInfoSlice[0]
instance.AgentPort, _ = strconv.Atoi(agentInfoSlice[1])
......
......@@ -72,7 +72,7 @@ func CmdInstsDownload() {
json_params.Mode = Dict.WaitVersionInfo.Mode
json_params.ShardSeq = inst.Shard
json_params.Port = strconv.Itoa(inst.Port)
json_params.Source = dict.GetFileHead(Dict.StoragePlace, TransferAddr) + Dict.WaitVersionInfo.Output + "/" + json_params.DictName + "_part" + strconv.Itoa(inst.Shard) + ".tar"
json_params.Source = dict.GetFileHead(Dict.DownloadMode, TransferAddr, Dict.WgetPort) + Dict.WaitVersionInfo.Output + "/" + json_params.DictName + "_part" + strconv.Itoa(inst.Shard) + ".tar"
var address = fmt.Sprintf("http://%v:%v/agent/cmd", inst.AgentIp, inst.AgentPort)
logex.Noticef("[download cmd]%v:%v", address, json_params)
go nonBlockSendJsonReq("POST2", address, 120, &json_params, &keyAndRespSlice[i], chs[i])
......@@ -121,7 +121,7 @@ func CmdInstsReload() {
json_params.Mode = Dict.WaitVersionInfo.Mode
json_params.ShardSeq = inst.Shard
json_params.Port = strconv.Itoa(inst.Port)
json_params.Source = dict.GetFileHead(Dict.StoragePlace, TransferAddr) + Dict.WaitVersionInfo.Output + "/" + json_params.DictName + "_part" + strconv.Itoa(inst.Shard) + ".tar"
json_params.Source = dict.GetFileHead(Dict.DownloadMode, TransferAddr, Dict.WgetPort) + Dict.WaitVersionInfo.Output + "/" + json_params.DictName + "_part" + strconv.Itoa(inst.Shard) + ".tar"
var address = fmt.Sprintf("http://%v:%v/agent/cmd", inst.AgentIp, inst.AgentPort)
logex.Noticef("[reload cmd]%v:%v", address, json_params)
......@@ -170,7 +170,7 @@ func CmdInstsEnable() {
json_params.Mode = Dict.WaitVersionInfo.Mode
json_params.ShardSeq = inst.Shard
json_params.Port = strconv.Itoa(inst.Port)
json_params.Source = dict.GetFileHead(Dict.StoragePlace, TransferAddr) + Dict.WaitVersionInfo.Output + "/" + json_params.DictName + "_part" + strconv.Itoa(inst.Shard) + ".tar"
json_params.Source = dict.GetFileHead(Dict.DownloadMode, TransferAddr, Dict.WgetPort) + Dict.WaitVersionInfo.Output + "/" + json_params.DictName + "_part" + strconv.Itoa(inst.Shard) + ".tar"
var address = fmt.Sprintf("http://%v:%v/agent/cmd", inst.AgentIp, inst.AgentPort)
logex.Noticef("[enable cmd]%v:%v", address, json_params)
......
......@@ -24,7 +24,8 @@ type DictInfo struct {
DonefileAddress string `json:"donefile_addr"`
OutputAddress string `json:"output_addr"`
TmpAddress string `json:"tmp_addr"`
StoragePlace string `json:"storage_place"`
DownloadMode string `json:"download_mode"`
WgetPort string `json:"wget_port"`
DownloadSuccInsts int `json:"download_inst"`
ReloadSuccInsts int `json:"reload_insts"`
EnableSuccInsts int `json:"enable_insts"`
......
......@@ -36,7 +36,7 @@ type DictShardInfo struct {
IsActive bool `json:"is_active,omitempty"`
}
func GetDictShardScaler(shard int, dictVersionInfo DictVersionInfo, storagePlace string, transferaddr string)(info DictShardInfo){
func GetDictShardScaler(shard int, dictVersionInfo DictVersionInfo, downloadMode string, transferAddr string, wgetPort string)(info DictShardInfo){
info.Name = dictVersionInfo.DictName
info.Version = strconv.Itoa(dictVersionInfo.Version)
info.Depend = strconv.Itoa(dictVersionInfo.Depend)
......@@ -44,16 +44,17 @@ func GetDictShardScaler(shard int, dictVersionInfo DictVersionInfo, storagePlace
info.Key = strconv.Itoa(dictVersionInfo.Key)
info.Mode = dictVersionInfo.Mode
info.Shard = shard
info.Source = GetFileHead(storagePlace, transferaddr) + dictVersionInfo.Output+ "/" + info.Version + "/" + info.Name + "_part" + strconv.Itoa(shard) + ".tar"
info.Source = GetFileHead(downloadMode, transferAddr, wgetPort) + dictVersionInfo.Output+ "/" + info.Version + "/" + info.Name + "_part" + strconv.Itoa(shard) + ".tar"
return
}
func GetFileHead(storagePlace string, transferaddr string) string {
if storagePlace == "LOCAL"{
return "ftp://" + transferaddr
func GetFileHead(downloadMode string, transferAddr string, wgetPort string) string {
if downloadMode == "http" {
return HTTP_HEADER + transferAddr + ":" + wgetPort
} else if downloadMode == "ftp" {
return FTP_HEADER + transferAddr + ":" + wgetPort
} else {
return ""
}
}
\ No newline at end of file
......@@ -66,11 +66,11 @@ func GetDictScaler(subpath string, m map[string]string) (string, string, int, er
}
for _, version := range Dict.CurrentVersionInfo {
info := dict.GetDictShardScaler(shard, version, Dict.StoragePlace, TransferAddr)
info := dict.GetDictShardScaler(shard, version, Dict.DownloadMode, TransferAddr, Dict.WgetPort)
infos = append(infos, info)
}
if Dict.WaitVersionInfo.Status > dict.Dict_Status_Deploying {
info := dict.GetDictShardScaler(shard, Dict.WaitVersionInfo, Dict.StoragePlace, TransferAddr)
info := dict.GetDictShardScaler(shard, Dict.WaitVersionInfo, Dict.DownloadMode, TransferAddr, Dict.WgetPort)
infos = append(infos, info)
}
......
......@@ -52,7 +52,16 @@ func GetDoneFileInfo(addr string) (version dict.DictVersionInfo, err error) {
} else {
contentss := string(contents)
lines := strings.Split(contentss, "\n")
index := len(lines) - 2
index := len(lines) - 1
// a line shorter than 3 characters is probably blank or just a line break
for index >= 0 && len(lines[index]) < 3 {
index--
}
if index < 0 {
logex.Noticef("[trigrer]get base donfile info error")
err = fmt.Errorf("[trigrer]get base donfile info error")
return
}
var donefileInfo dict.DonefileInfo
fmt.Printf("line %v: %v\n", index, lines[index])
if err = json.Unmarshal([]byte(lines[index]), &donefileInfo); err != nil {
......@@ -83,7 +92,13 @@ func GetDoneFileInfo(addr string) (version dict.DictVersionInfo, err error) {
} else {
contentss := string(contents)
lines := strings.Split(contentss, "\n")
for index := 0; index < len(lines)-1; index++ {
if len(lines[index]) < 3 {
logex.Noticef("[trigrer]get patch donfile info error")
err = fmt.Errorf("[trigrer]get patch donfile info error")
return
}
var donefileInfo dict.DonefileInfo
if err = json.Unmarshal([]byte(lines[index]), &donefileInfo); err != nil {
return
......
# Cube Server Community Edition Performance Report
## Machine configuration
Intel(R) Xeon(R) CPU E5-2650 v4 @ 2.20GHz
## Test data
1,000,000 sample kv records. Keys are of type uint64, and each value is 40 bytes long (in typical real-world scenarios this corresponds to a 10-dimensional feature vector).
## Single-key, high-QPS access
In real prediction-serving production environments, single-key access is relatively rare, but single-machine QPS is still an important performance metric, so the table below gives single-machine Cube server metrics at different QPS levels.
| qps | 100K | 500K | 1M |
|---|---|---|---|
|kps|100K|500K|1M|
|cpu(top)| 6.5% | 38.3% | 71.4% |
|client-side latency| avg 196 us<br>50% 160 us<br>70% 188 us<br>90% 292 us<br>95% 419 us<br>97% 547 us<br>99% 835 us<br>99.9% 1556 us<br>99.99% 1779 us| avg 563 us<br>50% 342 us<br>70% 502 us<br>90% 1063 us<br>95% 1703 us<br>97% 2399 us<br> 99% 4036 us<br>99.9% 7195 us<br>99.99% 7340 us| avg 4234 us<br>50% 3120 us<br>70% 5459 us<br>90% 10657 us<br>95% 14074 us<br>97% 16215 us<br> 99% 19434 us<br>99.9% 29398 us<br>99.99% 33921 us|
## High-KPS access
In production, prediction services mostly access the cube server in batches; in that case KPS (keys per second) reflects service performance more accurately than QPS. Taking 100 keys per request as an example, the table below gives cube server metrics at different KPS levels.
| qps | 20K | 100K | 200K |
|---|---|---|---|
|kps|2M|10M|20M|
|cpu(top)| 3.5% | 17.8% | 37.6% |
|client-side latency| avg 375 us<br>50% 346 us<br>70% 390 us<br>90% 498 us<br>95% 582 us<br>97% 648 us<br>99% 908 us<br>99.9% 1623 us<br>99.99% 3312 us| avg 1141 us<br>50% 900 us<br>70% 1237 us<br>90% 2028 us<br>95% 2662 us<br>97% 3173 us<br>99% 4614 us<br> 99.9% 6786 us<br>99.99% 6273 us| avg 1940 us<br>50% 1299 us<br>70% 1876 us<br>90% 3629 us<br>95% 5535 us<br>97% 7365 us<br>99% 10529 us<br>99.9% 14060 us<br>99.99% 16028 us|
## Client-side latency in a distributed setting
For batch queries, per-batch latency is a sensitive metric that callers (especially online prediction services) care about a great deal. In a distributed deployment, a single batch query usually has to query all downstream shard servers concurrently, so the batch latency is determined by the slowest shard request. Below we give batch-query latencies for a typical distributed setup.
Test conditions:
Number of cube shards: 10
Ping from the client machine to the downstream server machines: about 0.06 ms
| batch size | 100 | 500 | 1000 |
|---|---|---|---|
|qps|100|100|100|
|kps|10K| 50K| 100K|
|client-side latency|avg 324 us<br>80% 346 us<br>99% 1190 us<br>99.9% 3662 us<br>99.99% 9392 us|avg 466 us<br>80% 488 us<br>99% 1492 us<br>99.9% 4122 us<br>99.99% 10612 us|avg 675 us<br>80% 691 us<br>99% 2101 us<br>99.9% 5169 us<br>99.99% 14867 us|
## Conclusion
The tables above mainly describe single-machine cube server performance. In production, higher KPS (QPS) loads can easily be absorbed by adding replicas to increase cluster capacity. When the model size exceeds the memory of a single machine, we can also serve it with a distributed cluster deployed across multiple data shards.
Overall, cube's performance meets the kv-access needs of the various prediction services in the industry today.
## PS
All numbers in this document were measured with the original cube code. With appropriate tuning for the actual situation, such as linking in tcmalloc or jemalloc, NUMA binding, or enabling compiler optimization flags, cube server performance can likely be improved further.
......@@ -83,6 +83,10 @@ install(TARGETS echo
RUNTIME DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/demo/client/echo/bin)
install(DIRECTORY ${CMAKE_CURRENT_LIST_DIR}/conf DESTINATION
${PADDLE_SERVING_INSTALL_DIR}/demo/client/echo/)
install(FILES ${CMAKE_CURRENT_LIST_DIR}/python/echo.py
DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/demo/client/echo/python)
install(FILES ${CMAKE_CURRENT_LIST_DIR}/php/echo.php
DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/demo/client/echo/php)
install(TARGETS echo_kvdb
RUNTIME DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/demo/client/echo_kvdb/bin)
......@@ -114,6 +118,14 @@ install(DIRECTORY ${CMAKE_CURRENT_LIST_DIR}/conf DESTINATION
${PADDLE_SERVING_INSTALL_DIR}/demo/client/text_classification/)
install(DIRECTORY ${CMAKE_CURRENT_LIST_DIR}/data/text_classification DESTINATION
${PADDLE_SERVING_INSTALL_DIR}/demo/client/text_classification/data)
install(FILES ${CMAKE_CURRENT_LIST_DIR}/python/text_classification.py
DESTINATION
${PADDLE_SERVING_INSTALL_DIR}/demo/client/text_classification/python)
install(FILES ${CMAKE_CURRENT_LIST_DIR}/php/text_classification.php
DESTINATION
${PADDLE_SERVING_INSTALL_DIR}/demo/client/text_classification/php)
install(TARGETS ctr_prediction
RUNTIME DESTINATION
......
<?php
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
function http_post($url, $data) {
// array to json string
$data_string = json_encode($data);
$ch = curl_init($url);
curl_setopt($ch, CURLOPT_CUSTOMREQUEST, "POST");
// attach the POST data
curl_setopt($ch, CURLOPT_POSTFIELDS, $data_string);
// return the response as a string instead of printing it directly
curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
// force curl not to send Expect: 100-continue
curl_setopt($ch, CURLOPT_HTTPHEADER, array('Expect:'));
// set header
curl_setopt($ch, CURLOPT_HTTPHEADER, array(
'Content-Type: application/json',
'Content-Length: ' . strlen($data_string))
);
// execute the request
$result = curl_exec($ch);
curl_close($ch);
return $result;
}
// key/value array; for multiple entries, separate them with commas: key => value, key1 => value1, ...
echo http_post('http://127.0.0.1:8010/BuiltinTestEchoService/inference', array("a" => 1, "b" => 0.5));
?>
<?php
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
define("BATCH_SIZE", 10);
function read_data($data_file, &$samples, &$labels) {
$handle = fopen($data_file, "r");
$search = array("(", ")", "[", "]");
$count = 0;
while (($buffer = fgets($handle)) !== false) {
$count++;
$buffer = str_ireplace($search, "", $buffer);
$x = explode(",", $buffer);
$ids = array();
for ($i = 0; $i < count($x); ++$i) {
$ids[] = (int)($x[$i]);
}
$label = array_slice($ids, count($ids) - 1);
$sample = array_slice($ids, 0, count($ids) - 1);
$samples[] = array("ids" => $sample);
$labels[] = $label;
unset($x);
unset($buffer);
unset($ids);
unset($sample);
unset($label);
}
if (!feof($handle)) {
echo "Unexpected fgets() fail";
return -1;
}
fclose($handle);
}
function &http_connect($url) {
$ch = curl_init($url);
curl_setopt($ch, CURLOPT_CUSTOMREQUEST, "POST");
// return the response as a string instead of printing it directly
curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
// force curl not to send Expect: 100-continue
curl_setopt($ch, CURLOPT_HTTPHEADER, array('Expect:'));
// set header
curl_setopt($ch,
CURLOPT_HTTPHEADER,
array(
'Content-Type: application/json'
)
);
return $ch;
}
function http_post(&$ch, $data) {
// array to json string
$data_string = json_encode($data);
// attach the POST data
curl_setopt($ch, CURLOPT_POSTFIELDS, $data_string);
// set header
curl_setopt($ch,
CURLOPT_HTTPHEADER,
array(
'Content-Length: ' . strlen($data_string)
)
);
// execute the request
$result = curl_exec($ch);
return $result;
}
if ($argc != 2) {
echo "Usage: php text_classification.php DATA_SET_FILE\n";
return -1;
}
ini_set('memory_limit', '-1');
$samples = array();
$labels = array();
read_data($argv[1], $samples, $labels);
echo count($samples) . "\n";
$ch = &http_connect('http://127.0.0.1:8010/TextClassificationService/inference');
$count = 0;
for ($i = 0; $i < count($samples) - BATCH_SIZE; $i += BATCH_SIZE) {
$instances = array_slice($samples, $i, BATCH_SIZE);
echo http_post($ch, array("instances" => $instances)) . "\n";
}
curl_close($ch);
?>
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import urllib2
data = {"a": 1, "b": 0.5}
request_json = json.dumps(data)
req = urllib2.Request("http://127.0.0.1:8010/BuiltinTestEchoService/inference")
try:
response = urllib2.urlopen(req, request_json, 1)
print response.read()
except urllib2.HTTPError as e:
print e.reason
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import httplib
import sys
import os
BATCH_SIZE = 10
def data_reader(data_file, samples, labels):
if not os.path.exists(data_file):
print "Path %s not exist" % data_file
return -1
with open(data_file, "r") as f:
for line in f:
line = line.replace('(', ' ')
line = line.replace(')', ' ')
line = line.replace('[', ' ')
line = line.replace(']', ' ')
ids = line.split(',')
ids = [int(x) for x in ids]
label = ids[-1]
ids = ids[0:-1]
samples.append(ids)
labels.append(label)
if __name__ == "__main__":
""" main
"""
if len(sys.argv) != 2:
print "Usage: python text_classification.py DATA_FILE"
sys.exit(-1)
samples = []
labels = []
ret = data_reader(sys.argv[1], samples, labels)
conn = httplib.HTTPConnection("127.0.0.1", 8010)
for i in range(0, len(samples) - BATCH_SIZE, BATCH_SIZE):
batch = samples[i:i + BATCH_SIZE]
ids = []
for x in batch:
ids.append({"ids": x})
ids = {"instances": ids}
request_json = json.dumps(ids)
try:
conn.request('POST', "/TextClassificationService/inference",
request_json, {"Content-Type": "application/json"})
response = conn.getresponse()
print response.read()
except httplib.HTTPException as e:
print e
......@@ -25,7 +25,12 @@ if (NOT EXISTS
endif()
include_directories(SYSTEM ${CMAKE_CURRENT_LIST_DIR}/../kvdb/include)
find_library(MKLML_LIBS NAMES libmklml_intel.so libiomp5.so)
find_library(MKLML_LIB NAMES libmklml_intel.so PATHS
${CMAKE_BINARY_DIR}/Paddle/third_party/install/mklml/lib/)
find_library(MKLML_IOMP_LIB NAMES libiomp5.so PATHS
${CMAKE_BINARY_DIR}/Paddle/third_party/install/mklml/lib)
include(op/CMakeLists.txt)
include(proto/CMakeLists.txt)
add_executable(serving ${serving_srcs})
......@@ -75,8 +80,7 @@ install(FILES ${inc}
DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/include/serving)
if (${WITH_MKL})
install(FILES ${THIRD_PARTY_PATH}/install/mklml/lib/libmklml_intel.so
${THIRD_PARTY_PATH}/install/mklml/lib/libmklml_gnu.so
${THIRD_PARTY_PATH}/install/mklml/lib/libiomp5.so DESTINATION
install(FILES ${CMAKE_BINARY_DIR}/Paddle/third_party/install/mklml/lib/libmklml_intel.so
${CMAKE_BINARY_DIR}/Paddle/third_party/install/mklml/lib/libiomp5.so DESTINATION
${PADDLE_SERVING_INSTALL_DIR}/demo/serving/bin)
endif()
engines {
name: "image_classification_resnet"
type: "FLUID_CPU_NATIVE_DIR"
type: "FLUID_CPU_ANALYSIS_DIR"
reloadable_meta: "./data/model/paddle/fluid_time_file"
reloadable_type: "timestamp_ne"
model_data_path: "./data/model/paddle/fluid/SE_ResNeXt50_32x4d"
runtime_thread_num: 0
batch_infer_size: 0
enable_batch_align: 0
enable_memory_optimization: true
static_optimization: false
force_update_static_cache: false
}
engines {
name: "text_classification_bow"
type: "FLUID_CPU_ANALYSIS_DIR"
......
# HTTP Interface
Every Paddle Serving service can be accessed through an HTTP interface: the client only needs to build a JSON string following the Request message format defined by the Service. The client sends the JSON data to the serving side in an HTTP POST request, and the serving side **automatically** converts the JSON into a protobuf message according to the Protobuf message format defined by the Service.
This document describes how to access Serving's HTTP service interface from Python and PHP.
## 1. Service address
The HTTP service of a Serving node shares the same port with the C++ service (for example 8010). The URL scheme is:
```
http://127.0.0.1:8010/ServiceName/inference
http://127.0.0.1:8010/ServiceName/debug
```
Here, ServiceName must match what is configured in Serving's configuration file `conf/services.prototxt`. For example, given the following two services:
```protobuf
services {
name: "BuiltinTestEchoService"
workflows: "workflow3"
}
services {
name: "TextClassificationService"
workflows: "workflow6"
}
```
the HTTP URLs for the two Serving services above are:
```
http://127.0.0.1:8010/BuiltinTestEchoService/inference
http://127.0.0.1:8010/BuiltinTestEchoService/debug
http://127.0.0.1:8010/TextClassificationService/inference
http://127.0.0.1:8010/TextClassificationService/debug
```
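For a quick manual test, these endpoints can also be exercised with a generic HTTP client such as curl; the following is a minimal sketch that reuses the `{"a": 1, "b": 0.5}` request body from the BuiltinTestEchoService examples later in this document:
```shell
# POST a JSON request body to the inference endpoint of BuiltinTestEchoService
curl -H "Content-Type: application/json" \
     -d '{"a": 1, "b": 0.5}' \
     http://127.0.0.1:8010/BuiltinTestEchoService/inference
```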
## 2. Accessing HTTP Serving from Python
The key to accessing HTTP Serving from Python is building the JSON request data, which can be done in the following steps:
1) Build a Python object following the Request message format defined by the Service
2) Convert the Python object into a JSON string with `json.dump()` / `json.dumps()`
Taking TextClassificationService as an example, the key code is as follows:
```python
# Connect to server
conn = httplib.HTTPConnection("127.0.0.1", 8010)
# samples is a list; each element is a list of ids, e.g.:
# samples[0] = [190, 1, 70, 382, 914, 5146, 190...]
for i in range(0, len(samples) - BATCH_SIZE, BATCH_SIZE):
# build a batch of prediction data
batch = samples[i: i + BATCH_SIZE]
ids = []
for x in batch:
ids.append({"ids" : x})
ids = {"instances": ids}
# convert the python object to json
request_json = json.dumps(ids)
# call the HTTP service and print the response
try:
conn.request('POST', "/TextClassificationService/inference", request_json, {"Content-Type": "application/json"})
response = conn.getresponse()
print response.read()
except httplib.HTTPException as e:
print e
```
For the complete example, see [text_classification.py](../demo-client/python/text_classification.py)
## 3. Accessing HTTP Serving from PHP
In PHP, the JSON string is built in the following steps:
1) Build a PHP array following the Request message format defined by the Service
2) Convert the PHP array into a JSON string with `json_encode()`
Taking TextClassificationService as an example, the key code is as follows:
```PHP
function http_post(&$ch, $data) {
// array to json string
$data_string = json_encode($data);
// attach the POST data
curl_setopt($ch, CURLOPT_POSTFIELDS, $data_string);
// set header
curl_setopt($ch,
CURLOPT_HTTPHEADER,
array(
'Content-Length: ' . strlen($data_string)
)
);
// execute the request
$result = curl_exec($ch);
return $result;
}
$ch = &http_connect('http://127.0.0.1:8010/TextClassificationService/inference');
$count = 0;
# $samples is a two-level array; each element is an array like the following:
# $samples[0] = array(
# "ids" => array(
# [0] => int(190),
# [1] => int(1),
# [2] => int(70),
# [3] => int(382),
# [4] => int(914),
# [5] => int(5146),
# [6] => int(190)...)
# )
for ($i = 0; $i < count($samples) - BATCH_SIZE; $i += BATCH_SIZE) {
$instances = array_slice($samples, $i, BATCH_SIZE);
echo http_post($ch, array("instances" => $instances)) . "\n";
}
curl_close($ch);
```
For the complete code, see [text_classification.php](../demo-client/php/text_classification.php)
......@@ -18,9 +18,8 @@ services {
Where:
port: this field specifies the listening port on which the local serving instance starts. Defaults to 8010. It can also be set with the --port=8010 command-line argument.
services: multiple services can be configured. Paddle Serving is designed so that a single Serving instance can host multiple prediction services at the same time, distinguished by service name. For example, the following configures two prediction services:
- port: this field specifies the listening port on which the local serving instance starts. Defaults to 8010. It can also be set with the --port=8010 command-line argument.
- services: multiple services can be configured. Paddle Serving is designed so that a single Serving instance can host multiple prediction services at the same time, distinguished by service name. For example, the following configures two prediction services:
```JSON
port: 8010
services {
......@@ -33,7 +32,7 @@ services {
}
```
service.name: fill in the service name from the serving/proto/xx.proto file. For example, in serving/proto/image_class.proto the service name is declared as follows:
- service.name: fill in the service name from the serving/proto/xx.proto file. For example, in serving/proto/image_class.proto the service name is declared as follows:
```JSON
service ImageClassifyService {
rpc inference(Request) returns (Response);
......@@ -43,11 +42,11 @@ service ImageClassifyService {
```
Then the service name is `ImageClassifyService`.
service.workflows: the list of workflows configured for this service. Multiple workflows can be configured. In this example, one workflow, `workflow1`, is configured for `ImageClassifyService`; the definition of `workflow1` is in workflow.prototxt.
- service.workflows: the list of workflows configured for this service. Multiple workflows can be configured. In this example, one workflow, `workflow1`, is configured for `ImageClassifyService`; the definition of `workflow1` is in workflow.prototxt.
## 2. workflow.prototxt
workflow.prototxt describes each concrete workflow. Its protobuf format is the `Workflow` type in `configure/server_configure.protobuf`. The on-disk file path can be specified with --workflow_path and --workflow_file. An example:
workflow.prototxt describes each concrete workflow. Its protobuf format is the `Workflow` type in `configure/server_configure.protobuf`. The on-disk file path can be specified with `--workflow_path` and `--workflow_file`. An example:
```JSON
workflows {
......@@ -86,32 +85,32 @@ workflows {
```
The sample above configures two workflows: `workflow1` and `workflow2`. Taking `workflow1` as an example, its fields are described below (a minimal sketch of a complete workflow file follows the list):
name: the workflow name, used to look up the concrete workflow from service.prototxt
workflow_type: "Sequence" or "Parallel", indicating whether the OPs represented by the nodes of this workflow may run in parallel. **Currently only the Sequence type is supported; if Parallel is configured, the workflow will not be executed.**
nodes: all the nodes chained together to form the workflow; multiple nodes can be configured. Nodes are chained by configuring dependencies.
node.name: arbitrary; a name that reflects the OP class executed by the node is recommended
node.type: the class name of the OP executed by this node, matching the name of a concrete OP class under serving/op/
node.dependencies: the list of upstream nodes this node depends on
node.dependencies.name: must match the name of a node within the workflow
node.dependencies.mode: RO (Read Only), RW (Read Write)
- name: the workflow name, used to look up the concrete workflow from service.prototxt
- workflow_type: "Sequence" or "Parallel", indicating whether the OPs represented by the nodes of this workflow may run in parallel. **Currently only the Sequence type is supported; if Parallel is configured, the workflow will not be executed.**
- nodes: all the nodes chained together to form the workflow; multiple nodes can be configured. Nodes are chained by configuring dependencies.
- node.name: arbitrary; a name that reflects the OP class executed by the node is recommended
- node.type: the class name of the OP executed by this node, matching the name of a concrete OP class under serving/op/
- node.dependencies: the list of upstream nodes this node depends on
- node.dependencies.name: must match the name of a node within the workflow
- node.dependencies.mode: RO (Read Only), RW (Read Write)
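The following is a minimal sketch of a complete workflow.prototxt using these fields; the node names and OP class names are placeholders, not OP classes taken from this repository:
```JSON
# the node and OP class names below are placeholders
workflows {
  name: "workflow1"
  workflow_type: "Sequence"
  nodes {
    name: "example_reader_op"
    type: "ExampleReaderOp"
  }
  nodes {
    name: "example_infer_op"
    type: "ExampleInferOp"
    dependencies {
      name: "example_reader_op"
      mode: "RO"
    }
  }
}
```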
## 3. resource.prototxt
The entry point for resource configuration on the Serving side is resource.prototxt, which configures model information. Its protobuf format is ResourceConf in `configure/proto/server_configure.proto`. The on-disk file path can be specified with --resource_path and --resource_file. An example:
The entry point for resource configuration on the Serving side is resource.prototxt, which configures model information. Its protobuf format is ResourceConf in `configure/proto/server_configure.proto`. The on-disk file path can be specified with `--resource_path` and `--resource_file`. An example:
```JSON
model_manager_path: ./conf
model_manager_file: model_toolkit.prototxt
model_toolkit_path: "./conf"
model_toolkit_file: "model_toolkit.prototxt"
cube_config_file: "./conf/cube.conf"
```
This mainly specifies the path of model_toolkit.prototxt.
Where:
- model_toolkit_path: the directory containing model_toolkit.prototxt
- model_toolkit_file: the file name of model_toolkit.prototxt
- cube_config_file: the path and file name of the cube configuration file
Cube is the component in Paddle Serving used for large-scale sparse parameters.
## 4. model_toolkit.prototxt
......@@ -127,14 +126,18 @@ engines {
runtime_thread_num: 0
batch_infer_size: 0
enable_batch_align: 0
sparse_param_service_type: LOCAL
sparse_param_service_table_name: "local_kv"
enable_memory_optimization: true
static_optimization: false
force_update_static_cache: false
}
```
Where:
name: the model name. InferManager uses this name to locate the model and inference engine to use. See the arguments of the InferManager::instance().infer() method in serving/op/classify_op.h and serving/op/classify_op.cpp for details.
type: the type of inference engine. The list of currently registered inference engines can be found in inferencer-fluid-cpu/src/fluid_cpu_engine.cpp.
- name: the model name. InferManager uses this name to locate the model and inference engine to use. See the arguments of the InferManager::instance().infer() method in serving/op/classify_op.h and serving/op/classify_op.cpp for details.
- type: the type of inference engine. The list of currently registered inference engines can be found in inferencer-fluid-cpu/src/fluid_cpu_engine.cpp.
|Inference engine|Meaning|
|--------|----|
......@@ -152,9 +155,8 @@ type: the type of inference engine. The list of registered engines is in inferencer-fluid-cpu/src/fluid_cpu_engine.cp
During model loading, the Analysis API applies a number of optimizations to the model's computation logic, including but not limited to zero-copy tensors and fusing adjacent OPs. **However, these optimizations do not necessarily speed up every model and can sometimes even hurt performance; rely on actual measurements.**
reloadable_meta: the file content is currently meaningless; the file's mtime is used to decide whether the reload time threshold has been exceeded
reloadable_type: how the reload condition is checked: timestamp_ne/timestamp_gt/md5sum/revision/none
- reloadable_meta: the file content is currently meaningless; the file's mtime is used to decide whether the reload time threshold has been exceeded
- reloadable_type: how the reload condition is checked: timestamp_ne/timestamp_gt/md5sum/revision/none
|reloadable_type|Meaning|
|---------------|----|
......@@ -163,13 +165,22 @@ reloadable_type: how the reload condition is checked: timestamp_ne/timestamp_gt/md5sum/revision/
|md5sum|currently unused; if configured, the model is never reloaded|
|revision|currently unused; if configured, the model is never reloaded|
model_data_path: the model file path
runtime_thread_num: if greater than 0, the bsf multi-threaded scheduling framework is enabled and multi-threaded inference runs inside each prediction bthread worker. Note that when in-worker multi-threaded inference is enabled, OPs in the workflow must use the Serving framework's BatchTensor class for the inference input and output (predictor/framework/infer_data.h, `class BatchTensor`).
batch_infer_size: when bsf multi-threaded inference is enabled, the batch size of each inference thread
enable_batch_align:
- model_data_path: the model file path
- runtime_thread_num: if greater than 0, the bsf multi-threaded scheduling framework is enabled and multi-threaded inference runs inside each prediction bthread worker. Note that when in-worker multi-threaded inference is enabled, OPs in the workflow must use the Serving framework's BatchTensor class for the inference input and output (predictor/framework/infer_data.h, `class BatchTensor`).
- batch_infer_size: when bsf multi-threaded inference is enabled, the batch size of each inference thread
- enable_batch_align:
- sparse_param_service_type: enum, optional; the type of large-scale sparse parameter service
|sparse_param_service_type|Meaning|
|-------------------------|--|
|NONE|no large-scale sparse parameter service is used|
|LOCAL|single-machine local large-scale sparse parameter service, with rocksdb as the engine|
|REMOTE|distributed large-scale sparse parameter service, with Cube as the engine|
- sparse_param_service_table_name: optional; the name of the table in the large-scale sparse parameter service that holds the parameters used by this model.
- enable_memory_optimization: bool, optional; whether to enable memory optimization. Only meaningful when the fluid Analysis inference API is used. Note that during GPU inference, GPU memory optimization is performed.
- static_optimization: bool; whether to perform static optimization. Only meaningful when memory optimization is enabled.
- force_update_static_cache: bool; whether to force an update of the static-optimization cache. Only meaningful when memory optimization is enabled.
## 5. Command-line configuration parameters
......
......@@ -93,7 +93,7 @@ class FluidFamilyCore {
return true;
}
virtual int create(const std::string& data_path) = 0;
virtual int create(const predictor::InferEngineCreationParams& params) = 0;
virtual int clone(void* origin_core) {
if (origin_core == NULL) {
......@@ -119,7 +119,8 @@ class FluidFamilyCore {
// infer interface
class FluidCpuAnalysisCore : public FluidFamilyCore {
public:
int create(const std::string& data_path) {
int create(const predictor::InferEngineCreationParams& params) {
std::string data_path = params.get_path();
if (access(data_path.c_str(), F_OK) == -1) {
LOG(ERROR) << "create paddle predictor failed, path not exits: "
<< data_path;
......@@ -131,6 +132,12 @@ class FluidCpuAnalysisCore : public FluidFamilyCore {
analysis_config.SetProgFile(data_path + "/__model__");
analysis_config.DisableGpu();
analysis_config.SetCpuMathLibraryNumThreads(1);
if (params.enable_memory_optimization()) {
analysis_config.EnableMemoryOptim(params.static_optimization(),
params.force_update_static_cache());
}
analysis_config.SwitchSpecifyInputNames(true);
AutoLock lock(GlobalPaddleCreateMutex::instance());
_core =
......@@ -147,7 +154,8 @@ class FluidCpuAnalysisCore : public FluidFamilyCore {
class FluidCpuNativeCore : public FluidFamilyCore {
public:
int create(const std::string& data_path) {
int create(const predictor::InferEngineCreationParams& params) {
std::string data_path = params.get_path();
if (access(data_path.c_str(), F_OK) == -1) {
LOG(ERROR) << "create paddle predictor failed, path not exits: "
<< data_path;
......@@ -177,7 +185,8 @@ class FluidCpuNativeCore : public FluidFamilyCore {
class FluidCpuAnalysisDirCore : public FluidFamilyCore {
public:
int create(const std::string& data_path) {
int create(const predictor::InferEngineCreationParams& params) {
std::string data_path = params.get_path();
if (access(data_path.c_str(), F_OK) == -1) {
LOG(ERROR) << "create paddle predictor failed, path not exits: "
<< data_path;
......@@ -189,6 +198,12 @@ class FluidCpuAnalysisDirCore : public FluidFamilyCore {
analysis_config.DisableGpu();
analysis_config.SwitchSpecifyInputNames(true);
analysis_config.SetCpuMathLibraryNumThreads(1);
if (params.enable_memory_optimization()) {
analysis_config.EnableMemoryOptim(params.static_optimization(),
params.force_update_static_cache());
}
AutoLock lock(GlobalPaddleCreateMutex::instance());
_core =
paddle::CreatePaddlePredictor<paddle::AnalysisConfig>(analysis_config);
......@@ -204,7 +219,8 @@ class FluidCpuAnalysisDirCore : public FluidFamilyCore {
class FluidCpuNativeDirCore : public FluidFamilyCore {
public:
int create(const std::string& data_path) {
int create(const predictor::InferEngineCreationParams& params) {
std::string data_path = params.get_path();
if (access(data_path.c_str(), F_OK) == -1) {
LOG(ERROR) << "create paddle predictor failed, path not exits: "
<< data_path;
......@@ -380,7 +396,8 @@ class FluidCpuWithSigmoidCore : public FluidFamilyCore {
virtual ~FluidCpuWithSigmoidCore() {}
public:
int create(const std::string& model_path) {
int create(const predictor::InferEngineCreationParams& params) {
std::string model_path = params.get_path();
size_t pos = model_path.find_last_of("/\\");
std::string conf_path = model_path.substr(0, pos);
std::string conf_file = model_path.substr(pos);
......@@ -393,7 +410,9 @@ class FluidCpuWithSigmoidCore : public FluidFamilyCore {
_core.reset(new SigmoidFluidModel);
std::string fluid_model_data_path = conf.dnn_model_path();
int ret = load_fluid_model(fluid_model_data_path);
predictor::InferEngineCreationParams new_params(params);
new_params.set_path(fluid_model_data_path);
int ret = load_fluid_model(new_params);
if (ret < 0) {
LOG(ERROR) << "fail to load fluid model.";
return -1;
......@@ -442,7 +461,8 @@ class FluidCpuWithSigmoidCore : public FluidFamilyCore {
virtual SigmoidFluidModel* get() { return _core.get(); }
virtual int load_fluid_model(const std::string& data_path) = 0;
virtual int load_fluid_model(
const predictor::InferEngineCreationParams& params) = 0;
int softmax(float x, double& o) { // NOLINT
return _core->_sigmoid_core->softmax(x, o);
......@@ -454,7 +474,8 @@ class FluidCpuWithSigmoidCore : public FluidFamilyCore {
class FluidCpuNativeDirWithSigmoidCore : public FluidCpuWithSigmoidCore {
public:
int load_fluid_model(const std::string& data_path) {
int load_fluid_model(const predictor::InferEngineCreationParams& params) {
std::string data_path = params.get_path();
if (access(data_path.c_str(), F_OK) == -1) {
LOG(ERROR) << "create paddle predictor failed, path not exits: "
<< data_path;
......@@ -483,7 +504,8 @@ class FluidCpuNativeDirWithSigmoidCore : public FluidCpuWithSigmoidCore {
class FluidCpuAnalysisDirWithSigmoidCore : public FluidCpuWithSigmoidCore {
public:
int load_fluid_model(const std::string& data_path) {
int load_fluid_model(const predictor::InferEngineCreationParams& params) {
std::string data_path = params.get_path();
if (access(data_path.c_str(), F_OK) == -1) {
LOG(ERROR) << "create paddle predictor failed, path not exits: "
<< data_path;
......@@ -495,6 +517,12 @@ class FluidCpuAnalysisDirWithSigmoidCore : public FluidCpuWithSigmoidCore {
analysis_config.DisableGpu();
analysis_config.SwitchSpecifyInputNames(true);
analysis_config.SetCpuMathLibraryNumThreads(1);
if (params.enable_memory_optimization()) {
analysis_config.EnableMemoryOptim(params.static_optimization(),
params.force_update_static_cache());
}
AutoLock lock(GlobalPaddleCreateMutex::instance());
_core->_fluid_core =
paddle::CreatePaddlePredictor<paddle::AnalysisConfig>(analysis_config);
......
......@@ -95,7 +95,7 @@ class FluidFamilyCore {
return true;
}
virtual int create(const std::string& data_path) = 0;
virtual int create(const predictor::InferEngineCreationParams& params) = 0;
virtual int clone(void* origin_core) {
if (origin_core == NULL) {
......@@ -121,7 +121,8 @@ class FluidFamilyCore {
// infer interface
class FluidGpuAnalysisCore : public FluidFamilyCore {
public:
int create(const std::string& data_path) {
int create(const predictor::InferEngineCreationParams& params) {
std::string data_path = params.get_path();
if (access(data_path.c_str(), F_OK) == -1) {
LOG(ERROR) << "create paddle predictor failed, path not exits: "
<< data_path;
......@@ -133,7 +134,12 @@ class FluidGpuAnalysisCore : public FluidFamilyCore {
analysis_config.SetProgFile(data_path + "/__model__");
analysis_config.EnableUseGpu(100, FLAGS_gpuid);
analysis_config.SetCpuMathLibraryNumThreads(1);
analysis_config.EnableMemoryOptim(false, false);
if (params.enable_memory_optimization()) {
analysis_config.EnableMemoryOptim(params.static_optimization(),
params.force_update_static_cache());
}
analysis_config.SwitchSpecifyInputNames(true);
AutoLock lock(GlobalPaddleCreateMutex::instance());
......@@ -151,7 +157,8 @@ class FluidGpuAnalysisCore : public FluidFamilyCore {
class FluidGpuNativeCore : public FluidFamilyCore {
public:
int create(const std::string& data_path) {
int create(const predictor::InferEngineCreationParams& params) {
std::string data_path = params.get_path();
if (access(data_path.c_str(), F_OK) == -1) {
LOG(ERROR) << "create paddle predictor failed, path not exits: "
<< data_path;
......@@ -180,7 +187,8 @@ class FluidGpuNativeCore : public FluidFamilyCore {
class FluidGpuAnalysisDirCore : public FluidFamilyCore {
public:
int create(const std::string& data_path) {
int create(const predictor::InferEngineCreationParams& params) {
std::string data_path = params.get_path();
if (access(data_path.c_str(), F_OK) == -1) {
LOG(ERROR) << "create paddle predictor failed, path not exits: "
<< data_path;
......@@ -192,7 +200,11 @@ class FluidGpuAnalysisDirCore : public FluidFamilyCore {
analysis_config.EnableUseGpu(100, FLAGS_gpuid);
analysis_config.SwitchSpecifyInputNames(true);
analysis_config.SetCpuMathLibraryNumThreads(1);
analysis_config.EnableMemoryOptim(false, false);
if (params.enable_memory_optimization()) {
analysis_config.EnableMemoryOptim(params.static_optimization(),
params.force_update_static_cache());
}
AutoLock lock(GlobalPaddleCreateMutex::instance());
_core =
......@@ -209,7 +221,8 @@ class FluidGpuAnalysisDirCore : public FluidFamilyCore {
class FluidGpuNativeDirCore : public FluidFamilyCore {
public:
int create(const std::string& data_path) {
int create(const predictor::InferEngineCreationParams& params) {
std::string data_path = params.get_path();
if (access(data_path.c_str(), F_OK) == -1) {
LOG(ERROR) << "create paddle predictor failed, path not exits: "
<< data_path;
......@@ -385,7 +398,8 @@ class FluidGpuWithSigmoidCore : public FluidFamilyCore {
virtual ~FluidGpuWithSigmoidCore() {}
public:
int create(const std::string& model_path) {
int create(const predictor::InferEngineCreationParams& params) {
std::string model_path = params.get_path();
size_t pos = model_path.find_last_of("/\\");
std::string conf_path = model_path.substr(0, pos);
std::string conf_file = model_path.substr(pos);
......@@ -398,7 +412,9 @@ class FluidGpuWithSigmoidCore : public FluidFamilyCore {
_core.reset(new SigmoidFluidModel);
std::string fluid_model_data_path = conf.dnn_model_path();
int ret = load_fluid_model(fluid_model_data_path);
predictor::InferEngineCreationParams new_params(params);
new_params.set_path(fluid_model_data_path);
int ret = load_fluid_model(new_params);
if (ret < 0) {
LOG(ERROR) << "fail to load fluid model.";
return -1;
......@@ -447,7 +463,8 @@ class FluidGpuWithSigmoidCore : public FluidFamilyCore {
virtual SigmoidFluidModel* get() { return _core.get(); }
virtual int load_fluid_model(const std::string& data_path) = 0;
virtual int load_fluid_model(
const predictor::InferEngineCreationParams& params) = 0;
int softmax(float x, double& o) { // NOLINT
return _core->_sigmoid_core->softmax(x, o);
......@@ -459,7 +476,8 @@ class FluidGpuWithSigmoidCore : public FluidFamilyCore {
class FluidGpuNativeDirWithSigmoidCore : public FluidGpuWithSigmoidCore {
public:
int load_fluid_model(const std::string& data_path) {
int load_fluid_model(const predictor::InferEngineCreationParams& params) {
std::string data_path = params.get_path();
if (access(data_path.c_str(), F_OK) == -1) {
LOG(ERROR) << "create paddle predictor failed, path not exits: "
<< data_path;
......@@ -488,7 +506,8 @@ class FluidGpuNativeDirWithSigmoidCore : public FluidGpuWithSigmoidCore {
class FluidGpuAnalysisDirWithSigmoidCore : public FluidGpuWithSigmoidCore {
public:
int load_fluid_model(const std::string& data_path) {
int load_fluid_model(const predictor::InferEngineCreationParams& params) {
std::string data_path = params.get_path();
if (access(data_path.c_str(), F_OK) == -1) {
LOG(ERROR) << "create paddle predictor failed, path not exits: "
<< data_path;
......@@ -500,7 +519,12 @@ class FluidGpuAnalysisDirWithSigmoidCore : public FluidGpuWithSigmoidCore {
analysis_config.EnableUseGpu(100, FLAGS_gpuid);
analysis_config.SwitchSpecifyInputNames(true);
analysis_config.SetCpuMathLibraryNumThreads(1);
analysis_config.EnableMemoryOptim(false, false);
if (params.enable_memory_optimization()) {
analysis_config.EnableMemoryOptim(params.static_optimization(),
params.force_update_static_cache());
}
AutoLock lock(GlobalPaddleCreateMutex::instance());
_core->_fluid_core =
paddle::CreatePaddlePredictor<paddle::AnalysisConfig>(analysis_config);
......
......@@ -29,6 +29,55 @@ namespace predictor {
using configure::ModelToolkitConf;
class InferEngineCreationParams {
public:
InferEngineCreationParams() {
_path = "";
_enable_memory_optimization = false;
_static_optimization = false;
_force_update_static_cache = false;
}
void set_path(const std::string& path) { _path = path; }
void set_enable_memory_optimization(bool enable_memory_optimization) {
_enable_memory_optimization = enable_memory_optimization;
}
bool enable_memory_optimization() const {
return _enable_memory_optimization;
}
void set_static_optimization(bool static_optimization = false) {
_static_optimization = static_optimization;
}
void set_force_update_static_cache(bool force_update_static_cache = false) {
_force_update_static_cache = force_update_static_cache;
}
bool static_optimization() const { return _static_optimization; }
bool force_update_static_cache() const { return _force_update_static_cache; }
std::string get_path() const { return _path; }
void dump() const {
LOG(INFO) << "InferEngineCreationParams: "
<< "model_path = " << _path << ", "
<< "enable_memory_optimization = " << _enable_memory_optimization
<< ", "
<< "static_optimization = " << _static_optimization << ", "
<< "force_update_static_cache = " << _force_update_static_cache;
}
private:
std::string _path;
bool _enable_memory_optimization;
bool _static_optimization;
bool _force_update_static_cache;
};
class InferEngine {
public:
virtual ~InferEngine() {}
......@@ -75,7 +124,7 @@ class ReloadableInferEngine : public InferEngine {
typedef im::bsf::Task<Tensor, Tensor> TaskT;
virtual int load(const std::string& data_path) = 0;
virtual int load(const InferEngineCreationParams& params) = 0;
int proc_initialize_impl(const configure::EngineDesc& conf, bool version) {
_reload_tag_file = conf.reloadable_meta();
......@@ -84,7 +133,31 @@ class ReloadableInferEngine : public InferEngine {
_infer_thread_num = conf.runtime_thread_num();
_infer_batch_size = conf.batch_infer_size();
_infer_batch_align = conf.enable_batch_align();
if (!check_need_reload() || load(_model_data_path) != 0) {
bool enable_memory_optimization = false;
if (conf.has_enable_memory_optimization()) {
enable_memory_optimization = conf.enable_memory_optimization();
}
bool static_optimization = false;
if (conf.has_static_optimization()) {
static_optimization = conf.static_optimization();
}
bool force_update_static_cache = false;
if (conf.has_force_update_static_cache()) {
force_update_static_cache = conf.force_update_static_cache();
}
_infer_engine_params.set_path(_model_data_path);
if (enable_memory_optimization) {
_infer_engine_params.set_enable_memory_optimization(true);
_infer_engine_params.set_static_optimization(static_optimization);
_infer_engine_params.set_force_update_static_cache(
force_update_static_cache);
}
if (!check_need_reload() || load(_infer_engine_params) != 0) {
LOG(ERROR) << "Failed load model_data_path" << _model_data_path;
return -1;
}
......@@ -175,7 +248,7 @@ class ReloadableInferEngine : public InferEngine {
int reload() {
if (check_need_reload()) {
LOG(WARNING) << "begin reload model[" << _model_data_path << "].";
return load(_model_data_path);
return load(_infer_engine_params);
}
return 0;
}
......@@ -243,6 +316,7 @@ class ReloadableInferEngine : public InferEngine {
protected:
std::string _model_data_path;
InferEngineCreationParams _infer_engine_params;
private:
std::string _reload_tag_file;
......@@ -281,32 +355,35 @@ class DBReloadableInferEngine : public ReloadableInferEngine {
return ReloadableInferEngine::proc_initialize(conf, version);
}
virtual int load(const std::string& model_data_dir) {
virtual int load(const InferEngineCreationParams& params) {
if (_reload_vec.empty()) {
return 0;
}
for (uint32_t ti = 0; ti < _reload_vec.size(); ++ti) {
if (load_data(_reload_vec[ti], model_data_dir) != 0) {
if (load_data(_reload_vec[ti], params) != 0) {
LOG(ERROR) << "Failed reload engine model: " << ti;
return -1;
}
}
LOG(WARNING) << "Succ load engine, path: " << model_data_dir;
LOG(WARNING) << "Succ load engine, path: " << params.get_path();
return 0;
}
int load_data(ModelData<EngineCore>* md, const std::string& data_path) {
int load_data(ModelData<EngineCore>* md,
const InferEngineCreationParams& params) {
uint32_t next_idx = (md->current_idx + 1) % 2;
if (md->cores[next_idx]) {
delete md->cores[next_idx];
}
md->cores[next_idx] = new (std::nothrow) EngineCore;
if (!md->cores[next_idx] || md->cores[next_idx]->create(data_path) != 0) {
LOG(ERROR) << "Failed create model, path: " << data_path;
params.dump();
if (!md->cores[next_idx] || md->cores[next_idx]->create(params) != 0) {
LOG(ERROR) << "Failed create model, path: " << params.get_path();
return -1;
}
md->current_idx = next_idx;
......@@ -321,8 +398,9 @@ class DBReloadableInferEngine : public ReloadableInferEngine {
}
ModelData<EngineCore>* md = new (std::nothrow) ModelData<EngineCore>;
if (!md || load_data(md, _model_data_path) != 0) {
LOG(ERROR) << "Failed create thread data from " << _model_data_path;
if (!md || load_data(md, _infer_engine_params) != 0) {
LOG(ERROR) << "Failed create thread data from "
<< _infer_engine_params.get_path();
return -1;
}
......@@ -383,17 +461,16 @@ class CloneDBReloadableInferEngine
return DBReloadableInferEngine<EngineCore>::proc_initialize(conf, version);
}
virtual int load(const std::string& model_data_dir) {
virtual int load(const InferEngineCreationParams& params) {
// load the process-level model data
if (!_pd ||
DBReloadableInferEngine<EngineCore>::load_data(_pd, model_data_dir) !=
0) {
LOG(ERROR) << "Failed to create common model from [" << model_data_dir
DBReloadableInferEngine<EngineCore>::load_data(_pd, params) != 0) {
LOG(ERROR) << "Failed to create common model from [" << params.get_path()
<< "].";
return -1;
}
LOG(WARNING) << "Succ load common model[" << _pd->cores[_pd->current_idx]
<< "], path[" << model_data_dir << "].";
<< "], path[" << params.get_path() << "].";
if (DBReloadableInferEngine<EngineCore>::_reload_vec.empty()) {
return 0;
......@@ -409,7 +486,7 @@ class CloneDBReloadableInferEngine
}
}
LOG(WARNING) << "Succ load clone model, path[" << model_data_dir << "]";
LOG(WARNING) << "Succ load clone model, path[" << params.get_path() << "]";
return 0;
}
......
......@@ -110,12 +110,6 @@ int Resource::cube_initialize(const std::string& path,
}
int err = 0;
std::string cube_config_path = resource_conf.cube_config_path();
if (err != 0) {
LOG(ERROR) << "reade cube_config_path failed, path[" << path << "], file["
<< cube_config_path << "]";
return -1;
}
std::string cube_config_file = resource_conf.cube_config_file();
if (err != 0) {
LOG(ERROR) << "reade cube_config_file failed, path[" << path << "], file["
......@@ -124,8 +118,8 @@ int Resource::cube_initialize(const std::string& path,
}
err = CubeAPI::instance()->init(cube_config_file.c_str());
if (err != 0) {
LOG(ERROR) << "failed initialize cube, config: " << cube_config_path << "/"
<< cube_config_file << " error code : " << err;
LOG(ERROR) << "failed initialize cube, config: " << cube_config_file
<< " error code : " << err;
return -1;
}
......