Unverified commit 70dda301, authored by MRXLT, committed by GitHub

Merge pull request #8 from PaddlePaddle/develop

update from origin
......@@ -40,6 +40,9 @@ message EngineDesc {
}
optional SparseParamServiceType sparse_param_service_type = 11;
optional string sparse_param_service_table_name = 12;
optional bool enable_memory_optimization = 13;
optional bool static_optimization = 14;
optional bool force_update_static_cache = 15;
};
// model_toolkit conf
......@@ -49,8 +52,7 @@ message ModelToolkitConf { repeated EngineDesc engines = 1; };
message ResourceConf {
required string model_toolkit_path = 1;
required string model_toolkit_file = 2;
optional string cube_config_path = 3;
optional string cube_config_file = 4;
optional string cube_config_file = 3;
};
// DAG node dependency info
......
......@@ -68,6 +68,9 @@ int test_write_conf() {
engine->set_enable_batch_align(0);
engine->set_sparse_param_service_type(EngineDesc::LOCAL);
engine->set_sparse_param_service_table_name("local_kv");
engine->set_enable_memory_optimization(true);
engine->set_static_optimization(false);
engine->set_force_update_static_cache(false);
int ret = baidu::paddle_serving::configure::write_proto_conf(
&model_toolkit_conf, output_dir, model_toolkit_conf_file);
......@@ -79,6 +82,7 @@ int test_write_conf() {
ResourceConf resource_conf;
resource_conf.set_model_toolkit_path(output_dir);
resource_conf.set_model_toolkit_file("model_toolkit.prototxt");
resource_conf.set_cube_config_file("./conf/cube.conf");
ret = baidu::paddle_serving::configure::write_proto_conf(
&resource_conf, output_dir, resource_conf_file);
if (ret != 0) {
......
......@@ -17,9 +17,6 @@ package agent
import (
"errors"
_ "github.com/Badangel/logex"
"github.com/Badangel/pipeline"
"os/exec"
"strconv"
"strings"
"sync"
)
......@@ -89,26 +86,3 @@ func GetMaster(master string) (host, port string, err error) {
return MasterHost[0], MasterPort[0], nil
}
}
func init() {
dfCmd := "df -h | grep -E '/home|/ssd'"
stdout, _, err := pipeline.Run(exec.Command("/bin/bash", "-c", dfCmd))
if err == nil && stdout.String() != "" {
t := strings.TrimSpace(stdout.String())
diskLi := strings.Split(t, "\n")
for _, diskStr := range diskLi {
disk := strings.Fields(diskStr)
usedPercent, _ := strconv.Atoi(strings.TrimRight(disk[4], "%"))
if usedPercent <= 40 {
disks = append(disks, disk[5])
}
}
}
if len(disks) == 0 {
disks = append(disks, "/home")
}
//logex.Debugf("available disks found: (%+v)", disks)
}
......@@ -132,7 +132,7 @@ func getHostname(ip string) (hostname string, err error) {
//logex.Warningf("cannot find the hostname of ip (%s), error (%v)", ip, err)
} else {
if len(hostnames) > 0 {
hostname = hostnames[0][:strings.LastIndex(hostnames[0], ".baidu.com.")]
hostname = hostnames[0]
} else {
hostname = ip
}
......
......@@ -725,7 +725,7 @@ func DoDownloadIndividual(source, downloadDir string, isService bool, timeOut in
func checkSources(source string) ([]string, error) {
sources := strings.Split(source, ";")
for i := 0; i < len(sources); i++ {
if sources[i] == "" || !strings.HasPrefix(sources[i], "ftp://") {
if sources[i] == "" || (!strings.HasPrefix(sources[i], "ftp://") && !strings.HasPrefix(sources[i], "http://")) {
return sources, errors.New("Invalid sources")
}
}
......
......@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#include <signal.h>
#include <sys/stat.h>
#include <brpc/server.h>
......
[default]
dict_name: test_dict
mode: base_only
storage_place: LOCAL
download_mode: http
wget_port: 8009
buildtool_local: /home/work/Serving/build/output/bin/cube-builder
donefile_address: http://127.0.0.1/home/work/dangyifei/donefile
output_address: /home/work/dangyifei/test-transfer/test_data/output
tmp_address: /home/work/dangyifei/test-transfer/test_data/tmp
shard_num: 1
copy_num: 2
shard_num: 2
copy_num: 1
deploy_path: /home/work/test_dict
transfer_address: 127.0.0.1
[cube_agent]
agent0_0: 127.0.0.1:8001
cube0_0: 127.0.0.1:8000:/ssd2/cube_open
agent0_1: 127.0.0.1:8001
cube0_1: 127.0.0.1:8000:/home/disk1/cube_open
agent1_0: 127.0.0.1:8001
cube1_0: 127.0.0.1:8000:/home/disk1/cube_open
......@@ -95,13 +95,27 @@ Log options:
}
logex.Notice(">>> Mode:", transfer.Dict.DictMode)
transfer.Dict.StoragePlace = configMgr.Read("default", "storage_place")
if transfer.Dict.StoragePlace == "" || transfer.Dict.StoragePlace != "LOCAL" {
fmt.Fprintln(os.Stderr, "ERROR: nead [default] StoragePlace in config_file! only support Local")
transfer.Dict.DownloadMode = configMgr.Read("default", "download_mode")
if transfer.Dict.DownloadMode != "http" && transfer.Dict.DownloadMode != "ftp" {
fmt.Fprintln(os.Stderr, "ERROR: nead [default] download_mode in config_file! only support ftp or http")
fmt.Fprintln(os.Stderr, usage)
os.Exit(1)
}
logex.Notice(">>> StoragePlace:", transfer.Dict.StoragePlace)
logex.Notice(">>> DownloadMode:", transfer.Dict.DownloadMode)
transfer.Dict.WgetPort = configMgr.Read("default", "wget_port")
if transfer.Dict.WgetPort == "" {
fmt.Fprintln(os.Stderr, "ERROR: nead [default] wget_port in config_file!")
fmt.Fprintln(os.Stderr, usage)
os.Exit(1)
}
var wget_port int
wget_port, err = strconv.Atoi(transfer.Dict.WgetPort)
if err != nil {
logex.Fatal("wget_port form is not right need int")
os.Exit(1)
}
logex.Notice(">>> WgetPort:", wget_port)
transfer.BuildToolLocal = configMgr.Read("default", "buildtool_local")
if transfer.BuildToolLocal == "" {
......
......@@ -72,7 +72,7 @@ func CmdInstsDownload() {
json_params.Mode = Dict.WaitVersionInfo.Mode
json_params.ShardSeq = inst.Shard
json_params.Port = strconv.Itoa(inst.Port)
json_params.Source = dict.GetFileHead(Dict.StoragePlace, TransferAddr) + Dict.WaitVersionInfo.Output + "/" + json_params.DictName + "_part" + strconv.Itoa(inst.Shard) + ".tar"
json_params.Source = dict.GetFileHead(Dict.DownloadMode, TransferAddr, Dict.WgetPort) + Dict.WaitVersionInfo.Output + "/" + json_params.DictName + "_part" + strconv.Itoa(inst.Shard) + ".tar"
var address = fmt.Sprintf("http://%v:%v/agent/cmd", inst.AgentIp, inst.AgentPort)
logex.Noticef("[download cmd]%v:%v", address, json_params)
go nonBlockSendJsonReq("POST2", address, 120, &json_params, &keyAndRespSlice[i], chs[i])
......@@ -121,7 +121,7 @@ func CmdInstsReload() {
json_params.Mode = Dict.WaitVersionInfo.Mode
json_params.ShardSeq = inst.Shard
json_params.Port = strconv.Itoa(inst.Port)
json_params.Source = dict.GetFileHead(Dict.StoragePlace, TransferAddr) + Dict.WaitVersionInfo.Output + "/" + json_params.DictName + "_part" + strconv.Itoa(inst.Shard) + ".tar"
json_params.Source = dict.GetFileHead(Dict.DownloadMode, TransferAddr, Dict.WgetPort) + Dict.WaitVersionInfo.Output + "/" + json_params.DictName + "_part" + strconv.Itoa(inst.Shard) + ".tar"
var address = fmt.Sprintf("http://%v:%v/agent/cmd", inst.AgentIp, inst.AgentPort)
logex.Noticef("[reload cmd]%v:%v", address, json_params)
......@@ -170,7 +170,7 @@ func CmdInstsEnable() {
json_params.Mode = Dict.WaitVersionInfo.Mode
json_params.ShardSeq = inst.Shard
json_params.Port = strconv.Itoa(inst.Port)
json_params.Source = dict.GetFileHead(Dict.StoragePlace, TransferAddr) + Dict.WaitVersionInfo.Output + "/" + json_params.DictName + "_part" + strconv.Itoa(inst.Shard) + ".tar"
json_params.Source = dict.GetFileHead(Dict.DownloadMode, TransferAddr, Dict.WgetPort) + Dict.WaitVersionInfo.Output + "/" + json_params.DictName + "_part" + strconv.Itoa(inst.Shard) + ".tar"
var address = fmt.Sprintf("http://%v:%v/agent/cmd", inst.AgentIp, inst.AgentPort)
logex.Noticef("[enable cmd]%v:%v", address, json_params)
......
......@@ -24,7 +24,8 @@ type DictInfo struct {
DonefileAddress string `json:"donefile_addr"`
OutputAddress string `json:"output_addr"`
TmpAddress string `json:"tmp_addr"`
StoragePlace string `json:"storage_place"`
DownloadMode string `json:"download_mode"`
WgetPort string `json:"wget_port"`
DownloadSuccInsts int `json:"download_inst"`
ReloadSuccInsts int `json:"reload_insts"`
EnableSuccInsts int `json:"enable_insts"`
......
......@@ -36,7 +36,7 @@ type DictShardInfo struct {
IsActive bool `json:"is_active,omitempty"`
}
func GetDictShardScaler(shard int, dictVersionInfo DictVersionInfo, storagePlace string, transferaddr string)(info DictShardInfo){
func GetDictShardScaler(shard int, dictVersionInfo DictVersionInfo, downloadMode string, transferAddr string, wgetPort string)(info DictShardInfo){
info.Name = dictVersionInfo.DictName
info.Version = strconv.Itoa(dictVersionInfo.Version)
info.Depend = strconv.Itoa(dictVersionInfo.Depend)
......@@ -44,16 +44,17 @@ func GetDictShardScaler(shard int, dictVersionInfo DictVersionInfo, storagePlace
info.Key = strconv.Itoa(dictVersionInfo.Key)
info.Mode = dictVersionInfo.Mode
info.Shard = shard
info.Source = GetFileHead(storagePlace, transferaddr) + dictVersionInfo.Output+ "/" + info.Version + "/" + info.Name + "_part" + strconv.Itoa(shard) + ".tar"
info.Source = GetFileHead(downloadMode, transferAddr, wgetPort) + dictVersionInfo.Output+ "/" + info.Version + "/" + info.Name + "_part" + strconv.Itoa(shard) + ".tar"
return
}
func GetFileHead(storagePlace string, transferaddr string) string {
if storagePlace == "LOCAL"{
return "ftp://" + transferaddr
func GetFileHead(downloadMode string, transferAddr string, wgetPort string) string {
if downloadMode == "http" {
return HTTP_HEADER + transferAddr + ":" + wgetPort
} else if downloadMode == "ftp" {
return FTP_HEADER + transferAddr + ":" + wgetPort
} else {
return ""
}
}
\ No newline at end of file
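As a usage sketch of the new GetFileHead, the standalone Go program below mirrors its logic and shows the URL prefix the callers in cmd.go prepend to the tar path. HTTP_HEADER and FTP_HEADER are presumably defined elsewhere in the dict package as "http://" and "ftp://"; that definition is not shown in this diff, so the constants here are assumptions.

```go
package main

import "fmt"

// Assumed values of the dict package's URL-prefix constants (not shown in this diff).
const (
	HTTP_HEADER = "http://"
	FTP_HEADER  = "ftp://"
)

// getFileHead mirrors dict.GetFileHead above: it maps download_mode to a
// scheme prefix and appends the transfer address and wget port.
func getFileHead(downloadMode, transferAddr, wgetPort string) string {
	switch downloadMode {
	case "http":
		return HTTP_HEADER + transferAddr + ":" + wgetPort
	case "ftp":
		return FTP_HEADER + transferAddr + ":" + wgetPort
	default:
		return ""
	}
}

func main() {
	// With download_mode: http and wget_port: 8009 from the sample cube.conf:
	fmt.Println(getFileHead("http", "127.0.0.1", "8009") + "/output/test_dict_part0.tar")
	// Output: http://127.0.0.1:8009/output/test_dict_part0.tar
}
```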
......@@ -66,11 +66,11 @@ func GetDictScaler(subpath string, m map[string]string) (string, string, int, er
}
for _, version := range Dict.CurrentVersionInfo {
info := dict.GetDictShardScaler(shard, version, Dict.StoragePlace, TransferAddr)
info := dict.GetDictShardScaler(shard, version, Dict.DownloadMode, TransferAddr, Dict.WgetPort)
infos = append(infos, info)
}
if Dict.WaitVersionInfo.Status > dict.Dict_Status_Deploying {
info := dict.GetDictShardScaler(shard, Dict.WaitVersionInfo, Dict.StoragePlace, TransferAddr)
info := dict.GetDictShardScaler(shard, Dict.WaitVersionInfo, Dict.DownloadMode, TransferAddr, Dict.WgetPort)
infos = append(infos, info)
}
......
engines {
name: "image_classification_resnet"
type: "FLUID_CPU_NATIVE_DIR"
type: "FLUID_CPU_ANALYSIS_DIR"
reloadable_meta: "./data/model/paddle/fluid_time_file"
reloadable_type: "timestamp_ne"
model_data_path: "./data/model/paddle/fluid/SE_ResNeXt50_32x4d"
runtime_thread_num: 0
batch_infer_size: 0
enable_batch_align: 0
enable_memory_optimization: true
static_optimization: false
force_update_static_cache: false
}
engines {
name: "text_classification_bow"
type: "FLUID_CPU_ANALYSIS_DIR"
......
......@@ -18,9 +18,8 @@ services {
Where:
port: this field specifies the port on which the local serving instance listens. Defaults to 8010. It can also be set via the --port=8010 command-line argument.
services: multiple services can be configured. Paddle Serving is designed so that a single Serving instance can host multiple prediction services at once, distinguished by service name. For example, the following configures 2 prediction services:
- port: this field specifies the port on which the local serving instance listens. Defaults to 8010. It can also be set via the --port=8010 command-line argument.
- services: multiple services can be configured. Paddle Serving is designed so that a single Serving instance can host multiple prediction services at once, distinguished by service name. For example, the following configures 2 prediction services:
```JSON
port: 8010
services {
......@@ -33,7 +32,7 @@ services {
}
```
service.name: fill in the service name from the serving/proto/xx.proto file. For example, in serving/proto/image_class.proto the service name is declared as follows:
- service.name: fill in the service name from the serving/proto/xx.proto file. For example, in serving/proto/image_class.proto the service name is declared as follows:
```JSON
service ImageClassifyService {
rpc inference(Request) returns (Response);
......@@ -43,11 +42,11 @@ service ImageClassifyService {
```
The service name is therefore `ImageClassifyService`.
service.workflows: specifies the list of workflows configured under this service. Multiple workflows can be configured. In this example, one workflow, `workflow1`, is configured for `ImageClassifyService`; the concrete definition of `workflow1` is in workflow.prototxt.
- service.workflows: specifies the list of workflows configured under this service. Multiple workflows can be configured. In this example, one workflow, `workflow1`, is configured for `ImageClassifyService`; the concrete definition of `workflow1` is in workflow.prototxt (see the sketch below).
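For illustration, a minimal service.prototxt entry binding a service to a workflow might look like the sketch below; the elided example above should be similar, and the names here are taken from this document:

```JSON
port: 8010
services {
  name: "ImageClassifyService"
  workflows: "workflow1"
}
```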
## 2. workflow.prototxt
workflow.prototxt describes each concrete workflow. Its protobuf format is the `Workflow` type in `configure/server_configure.protobuf`. The on-disk file path can be specified with --workflow_path and --workflow_file. An example:
workflow.prototxt describes each concrete workflow. Its protobuf format is the `Workflow` type in `configure/server_configure.protobuf`. The on-disk file path can be specified with `--workflow_path` and `--workflow_file`. An example:
```JSON
workflows {
......@@ -86,32 +85,32 @@ workflows {
```
The example above configures 2 workflows: `workflow1` and `workflow2`. Taking `workflow1` as an example:
name: the workflow name, used to look up the concrete workflow from service.prototxt
workflow_type: "Sequence" or "Parallel"; whether the OPs represented by this workflow's nodes may run in parallel. **Currently only Sequence is supported; a workflow configured as Parallel will not be executed**
nodes: all nodes chained into the workflow; multiple nodes can be configured, and nodes are linked to each other via their dependencies
node.name: arbitrary; a name reflecting the OP class the node executes is recommended
node.type: the class name of the OP the node executes, matching the concrete OP classes under serving/op/
node.dependencies: the list of upstream nodes this node depends on
node.dependencies.name: must match the name of a node within the workflow
node.dependencies.mode: RO-Read Only, RW-Read Write
- name: the workflow name, used to look up the concrete workflow from service.prototxt
- workflow_type: "Sequence" or "Parallel"; whether the OPs represented by this workflow's nodes may run in parallel. **Currently only Sequence is supported; a workflow configured as Parallel will not be executed**
- nodes: all nodes chained into the workflow; multiple nodes can be configured, and nodes are linked to each other via their dependencies
- node.name: arbitrary; a name reflecting the OP class the node executes is recommended
- node.type: the class name of the OP the node executes, matching the concrete OP classes under serving/op/
- node.dependencies: the list of upstream nodes this node depends on
- node.dependencies.name: must match the name of a node within the workflow
- node.dependencies.mode: RO-Read Only, RW-Read Write (a combined sketch follows this list)
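To show how these fields fit together, here is a minimal sketch of a Sequence workflow with two chained nodes; the node and OP names are hypothetical and only illustrate the dependencies wiring:

```JSON
workflows {
  name: "workflow1"
  workflow_type: "Sequence"
  nodes {
    name: "reader_node"
    type: "ReaderOp"
  }
  nodes {
    name: "classify_node"
    type: "ClassifyOp"
    dependencies {
      name: "reader_node"
      mode: "RO"
    }
  }
}
```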
# 3. resource.prototxt
The entry point for Serving-side resource configuration is resource.prototxt, used to configure model information. Its protobuf format is ResourceConf in `configure/proto/server_configure.proto`. The on-disk file path can be specified with --resource_path and --resource_file. Example:
The entry point for Serving-side resource configuration is resource.prototxt, used to configure model information. Its protobuf format is ResourceConf in `configure/proto/server_configure.proto`. The on-disk file path can be specified with `--resource_path` and `--resource_file`. Example:
```JSON
model_manager_path: ./conf
model_manager_file: model_toolkit.prototxt
model_toolkit_path: "./conf"
model_toolkit_file: "model_toolkit.prototxt"
cube_config_file: "./conf/cube.conf"
```
This file mainly specifies where model_toolkit.prototxt lives.
Where:
- model_toolkit_path: the directory containing model_toolkit.prototxt
- model_toolkit_file: the file name of model_toolkit.prototxt
- cube_config_file: the path and file name of the cube configuration file
Cube is the component in Paddle Serving that serves large-scale sparse parameters.
# 4. model_toolkit.prototxt
......@@ -127,14 +126,18 @@ engines {
runtime_thread_num: 0
batch_infer_size: 0
enable_batch_align: 0
sparse_param_service_type: LOCAL
sparse_param_service_table_name: "local_kv"
enable_memory_optimization: true
static_optimization: false
force_update_static_cache: false
}
```
Where:
name: the model name. InferManager uses this name to locate the model and inference engine to use; see the parameters of the InferManager::instance().infer() call in serving/op/classify_op.h and serving/op/classify_op.cpp.
type: the type of inference engine. The list of currently registered inference engines can be found in inferencer-fluid-cpu/src/fluid_cpu_engine.cpp.
- name: the model name. InferManager uses this name to locate the model and inference engine to use; see the parameters of the InferManager::instance().infer() call in serving/op/classify_op.h and serving/op/classify_op.cpp.
- type: the type of inference engine. The list of currently registered inference engines can be found in inferencer-fluid-cpu/src/fluid_cpu_engine.cpp.
|Inference engine|Meaning|
|--------|----|
......@@ -152,9 +155,8 @@ type: the type of inference engine. The list can be found in inferencer-fluid-cpu/src/fluid_cpu_engine.cp
During model loading, the Analysis API applies a number of optimizations to the model's computation logic, including but not limited to zero-copy tensors and fusion of adjacent OPs. **These optimizations are not guaranteed to speed up every model and can occasionally backfire; trust measured results.**
reloadable_meta: the file's content is currently meaningless; its mtime is used to decide whether the reload time threshold has been exceeded
reloadable_type: the reload condition check: timestamp_ne/timestamp_gt/md5sum/revision/none
- reloadable_meta: the file's content is currently meaningless; its mtime is used to decide whether the reload time threshold has been exceeded
- reloadable_type: the reload condition check: timestamp_ne/timestamp_gt/md5sum/revision/none
|reloadable_type|Meaning|
|---------------|----|
......@@ -163,13 +165,22 @@ reloadable_type: the reload condition check: timestamp_ne/timestamp_gt/md5sum/revision/
|md5sum|currently unused; if configured, reload never happens|
|revision|currently unused; if configured, reload never happens|
model_data_path: path to the model files
runtime_thread_num: if greater than 0, the bsf multi-threaded scheduling framework is enabled, running multi-threaded inference inside each inference bthread worker. Note that when intra-worker multi-threaded inference is enabled, the OPs in the workflow must use the Serving framework's BatchTensor class for inference input and output (predictor/framework/infer_data.h, `class BatchTensor`).
batch_infer_size: the batch size of each inference thread when bsf multi-threaded inference is enabled
enable_batch_align:
- model_data_path: path to the model files
- runtime_thread_num: if greater than 0, the bsf multi-threaded scheduling framework is enabled, running multi-threaded inference inside each inference bthread worker. Note that when intra-worker multi-threaded inference is enabled, the OPs in the workflow must use the Serving framework's BatchTensor class for inference input and output (predictor/framework/infer_data.h, `class BatchTensor`).
- batch_infer_size: the batch size of each inference thread when bsf multi-threaded inference is enabled
- enable_batch_align:
- sparse_param_service_type: enum, optional; the type of large-scale sparse parameter service
|sparse_param_service_type|Meaning|
|-------------------------|--|
|NONE|no large-scale sparse parameter service is used|
|LOCAL|single-machine local large-scale sparse parameter service, backed by rocksdb|
|REMOTE|distributed large-scale sparse parameter service, backed by Cube|
- sparse_param_service_table_name: optional; the table in the large-scale sparse parameter service that holds this model's parameters.
- enable_memory_optimization: bool, optional; whether to enable memory optimization. Only meaningful when the fluid Analysis inference API is used. Note that during GPU inference, GPU memory optimization is performed.
- static_optimization: bool; whether to perform static optimization. Only meaningful when memory optimization is enabled.
- force_update_static_cache: bool; whether to force an update of the static optimization cache. Only meaningful when memory optimization is enabled.
## 5. Command-line configuration parameters
......
......@@ -93,7 +93,7 @@ class FluidFamilyCore {
return true;
}
virtual int create(const std::string& data_path) = 0;
virtual int create(const predictor::InferEngineCreationParams& params) = 0;
virtual int clone(void* origin_core) {
if (origin_core == NULL) {
......@@ -119,7 +119,8 @@ class FluidFamilyCore {
// infer interface
class FluidCpuAnalysisCore : public FluidFamilyCore {
public:
int create(const std::string& data_path) {
int create(const predictor::InferEngineCreationParams& params) {
std::string data_path = params.get_path();
if (access(data_path.c_str(), F_OK) == -1) {
LOG(ERROR) << "create paddle predictor failed, path not exits: "
<< data_path;
......@@ -131,6 +132,12 @@ class FluidCpuAnalysisCore : public FluidFamilyCore {
analysis_config.SetProgFile(data_path + "/__model__");
analysis_config.DisableGpu();
analysis_config.SetCpuMathLibraryNumThreads(1);
if (params.enable_memory_optimization()) {
analysis_config.EnableMemoryOptim(params.static_optimization(),
params.force_update_static_cache());
}
analysis_config.SwitchSpecifyInputNames(true);
AutoLock lock(GlobalPaddleCreateMutex::instance());
_core =
......@@ -147,7 +154,8 @@ class FluidCpuAnalysisCore : public FluidFamilyCore {
class FluidCpuNativeCore : public FluidFamilyCore {
public:
int create(const std::string& data_path) {
int create(const predictor::InferEngineCreationParams& params) {
std::string data_path = params.get_path();
if (access(data_path.c_str(), F_OK) == -1) {
LOG(ERROR) << "create paddle predictor failed, path not exits: "
<< data_path;
......@@ -177,7 +185,8 @@ class FluidCpuNativeCore : public FluidFamilyCore {
class FluidCpuAnalysisDirCore : public FluidFamilyCore {
public:
int create(const std::string& data_path) {
int create(const predictor::InferEngineCreationParams& params) {
std::string data_path = params.get_path();
if (access(data_path.c_str(), F_OK) == -1) {
LOG(ERROR) << "create paddle predictor failed, path not exits: "
<< data_path;
......@@ -189,6 +198,12 @@ class FluidCpuAnalysisDirCore : public FluidFamilyCore {
analysis_config.DisableGpu();
analysis_config.SwitchSpecifyInputNames(true);
analysis_config.SetCpuMathLibraryNumThreads(1);
if (params.enable_memory_optimization()) {
analysis_config.EnableMemoryOptim(params.static_optimization(),
params.force_update_static_cache());
}
AutoLock lock(GlobalPaddleCreateMutex::instance());
_core =
paddle::CreatePaddlePredictor<paddle::AnalysisConfig>(analysis_config);
......@@ -204,7 +219,8 @@ class FluidCpuAnalysisDirCore : public FluidFamilyCore {
class FluidCpuNativeDirCore : public FluidFamilyCore {
public:
int create(const std::string& data_path) {
int create(const predictor::InferEngineCreationParams& params) {
std::string data_path = params.get_path();
if (access(data_path.c_str(), F_OK) == -1) {
LOG(ERROR) << "create paddle predictor failed, path not exits: "
<< data_path;
......@@ -380,7 +396,8 @@ class FluidCpuWithSigmoidCore : public FluidFamilyCore {
virtual ~FluidCpuWithSigmoidCore() {}
public:
int create(const std::string& model_path) {
int create(const predictor::InferEngineCreationParams& params) {
std::string model_path = params.get_path();
size_t pos = model_path.find_last_of("/\\");
std::string conf_path = model_path.substr(0, pos);
std::string conf_file = model_path.substr(pos);
......@@ -393,7 +410,9 @@ class FluidCpuWithSigmoidCore : public FluidFamilyCore {
_core.reset(new SigmoidFluidModel);
std::string fluid_model_data_path = conf.dnn_model_path();
int ret = load_fluid_model(fluid_model_data_path);
predictor::InferEngineCreationParams new_params(params);
new_params.set_path(fluid_model_data_path);
int ret = load_fluid_model(new_params);
if (ret < 0) {
LOG(ERROR) << "fail to load fluid model.";
return -1;
......@@ -442,7 +461,8 @@ class FluidCpuWithSigmoidCore : public FluidFamilyCore {
virtual SigmoidFluidModel* get() { return _core.get(); }
virtual int load_fluid_model(const std::string& data_path) = 0;
virtual int load_fluid_model(
const predictor::InferEngineCreationParams& params) = 0;
int softmax(float x, double& o) { // NOLINT
return _core->_sigmoid_core->softmax(x, o);
......@@ -454,7 +474,8 @@ class FluidCpuWithSigmoidCore : public FluidFamilyCore {
class FluidCpuNativeDirWithSigmoidCore : public FluidCpuWithSigmoidCore {
public:
int load_fluid_model(const std::string& data_path) {
int load_fluid_model(const predictor::InferEngineCreationParams& params) {
std::string data_path = params.get_path();
if (access(data_path.c_str(), F_OK) == -1) {
LOG(ERROR) << "create paddle predictor failed, path not exits: "
<< data_path;
......@@ -483,7 +504,8 @@ class FluidCpuNativeDirWithSigmoidCore : public FluidCpuWithSigmoidCore {
class FluidCpuAnalysisDirWithSigmoidCore : public FluidCpuWithSigmoidCore {
public:
int load_fluid_model(const std::string& data_path) {
int load_fluid_model(const predictor::InferEngineCreationParams& params) {
std::string data_path = params.get_path();
if (access(data_path.c_str(), F_OK) == -1) {
LOG(ERROR) << "create paddle predictor failed, path not exits: "
<< data_path;
......@@ -495,6 +517,12 @@ class FluidCpuAnalysisDirWithSigmoidCore : public FluidCpuWithSigmoidCore {
analysis_config.DisableGpu();
analysis_config.SwitchSpecifyInputNames(true);
analysis_config.SetCpuMathLibraryNumThreads(1);
if (params.enable_memory_optimization()) {
analysis_config.EnableMemoryOptim(params.static_optimization(),
params.force_update_static_cache());
}
AutoLock lock(GlobalPaddleCreateMutex::instance());
_core->_fluid_core =
paddle::CreatePaddlePredictor<paddle::AnalysisConfig>(analysis_config);
......
......@@ -95,7 +95,7 @@ class FluidFamilyCore {
return true;
}
virtual int create(const std::string& data_path) = 0;
virtual int create(const predictor::InferEngineCreationParams& params) = 0;
virtual int clone(void* origin_core) {
if (origin_core == NULL) {
......@@ -121,7 +121,8 @@ class FluidFamilyCore {
// infer interface
class FluidGpuAnalysisCore : public FluidFamilyCore {
public:
int create(const std::string& data_path) {
int create(const predictor::InferEngineCreationParams& params) {
std::string data_path = params.get_path();
if (access(data_path.c_str(), F_OK) == -1) {
LOG(ERROR) << "create paddle predictor failed, path not exits: "
<< data_path;
......@@ -133,7 +134,12 @@ class FluidGpuAnalysisCore : public FluidFamilyCore {
analysis_config.SetProgFile(data_path + "/__model__");
analysis_config.EnableUseGpu(100, FLAGS_gpuid);
analysis_config.SetCpuMathLibraryNumThreads(1);
analysis_config.EnableMemoryOptim(false, false);
if (params.enable_memory_optimization()) {
analysis_config.EnableMemoryOptim(params.static_optimization(),
params.force_update_static_cache());
}
analysis_config.SwitchSpecifyInputNames(true);
AutoLock lock(GlobalPaddleCreateMutex::instance());
......@@ -151,7 +157,8 @@ class FluidGpuAnalysisCore : public FluidFamilyCore {
class FluidGpuNativeCore : public FluidFamilyCore {
public:
int create(const std::string& data_path) {
int create(const predictor::InferEngineCreationParams& params) {
std::string data_path = params.get_path();
if (access(data_path.c_str(), F_OK) == -1) {
LOG(ERROR) << "create paddle predictor failed, path not exits: "
<< data_path;
......@@ -180,7 +187,8 @@ class FluidGpuNativeCore : public FluidFamilyCore {
class FluidGpuAnalysisDirCore : public FluidFamilyCore {
public:
int create(const std::string& data_path) {
int create(const predictor::InferEngineCreationParams& params) {
std::string data_path = params.get_path();
if (access(data_path.c_str(), F_OK) == -1) {
LOG(ERROR) << "create paddle predictor failed, path not exits: "
<< data_path;
......@@ -192,7 +200,11 @@ class FluidGpuAnalysisDirCore : public FluidFamilyCore {
analysis_config.EnableUseGpu(100, FLAGS_gpuid);
analysis_config.SwitchSpecifyInputNames(true);
analysis_config.SetCpuMathLibraryNumThreads(1);
analysis_config.EnableMemoryOptim(false, false);
if (params.enable_memory_optimization()) {
analysis_config.EnableMemoryOptim(params.static_optimization(),
params.force_update_static_cache());
}
AutoLock lock(GlobalPaddleCreateMutex::instance());
_core =
......@@ -209,7 +221,8 @@ class FluidGpuAnalysisDirCore : public FluidFamilyCore {
class FluidGpuNativeDirCore : public FluidFamilyCore {
public:
int create(const std::string& data_path) {
int create(const predictor::InferEngineCreationParams& params) {
std::string data_path = params.get_path();
if (access(data_path.c_str(), F_OK) == -1) {
LOG(ERROR) << "create paddle predictor failed, path not exits: "
<< data_path;
......@@ -385,7 +398,8 @@ class FluidGpuWithSigmoidCore : public FluidFamilyCore {
virtual ~FluidGpuWithSigmoidCore() {}
public:
int create(const std::string& model_path) {
int create(const predictor::InferEngineCreationParams& params) {
std::string model_path = params.get_path();
size_t pos = model_path.find_last_of("/\\");
std::string conf_path = model_path.substr(0, pos);
std::string conf_file = model_path.substr(pos);
......@@ -398,7 +412,9 @@ class FluidGpuWithSigmoidCore : public FluidFamilyCore {
_core.reset(new SigmoidFluidModel);
std::string fluid_model_data_path = conf.dnn_model_path();
int ret = load_fluid_model(fluid_model_data_path);
predictor::InferEngineCreationParams new_params(params);
new_params.set_path(fluid_model_data_path);
int ret = load_fluid_model(new_params);
if (ret < 0) {
LOG(ERROR) << "fail to load fluid model.";
return -1;
......@@ -447,7 +463,8 @@ class FluidGpuWithSigmoidCore : public FluidFamilyCore {
virtual SigmoidFluidModel* get() { return _core.get(); }
virtual int load_fluid_model(const std::string& data_path) = 0;
virtual int load_fluid_model(
const predictor::InferEngineCreationParams& params) = 0;
int softmax(float x, double& o) { // NOLINT
return _core->_sigmoid_core->softmax(x, o);
......@@ -459,7 +476,8 @@ class FluidGpuWithSigmoidCore : public FluidFamilyCore {
class FluidGpuNativeDirWithSigmoidCore : public FluidGpuWithSigmoidCore {
public:
int load_fluid_model(const std::string& data_path) {
int load_fluid_model(const predictor::InferEngineCreationParams& params) {
std::string data_path = params.get_path();
if (access(data_path.c_str(), F_OK) == -1) {
LOG(ERROR) << "create paddle predictor failed, path not exits: "
<< data_path;
......@@ -488,7 +506,8 @@ class FluidGpuNativeDirWithSigmoidCore : public FluidGpuWithSigmoidCore {
class FluidGpuAnalysisDirWithSigmoidCore : public FluidGpuWithSigmoidCore {
public:
int load_fluid_model(const std::string& data_path) {
int load_fluid_model(const predictor::InferEngineCreationParams& params) {
std::string data_path = params.get_path();
if (access(data_path.c_str(), F_OK) == -1) {
LOG(ERROR) << "create paddle predictor failed, path not exits: "
<< data_path;
......@@ -500,7 +519,12 @@ class FluidGpuAnalysisDirWithSigmoidCore : public FluidGpuWithSigmoidCore {
analysis_config.EnableUseGpu(100, FLAGS_gpuid);
analysis_config.SwitchSpecifyInputNames(true);
analysis_config.SetCpuMathLibraryNumThreads(1);
analysis_config.EnableMemoryOptim(false, false);
if (params.enable_memory_optimization()) {
analysis_config.EnableMemoryOptim(params.static_optimization(),
params.force_update_static_cache());
}
AutoLock lock(GlobalPaddleCreateMutex::instance());
_core->_fluid_core =
paddle::CreatePaddlePredictor<paddle::AnalysisConfig>(analysis_config);
......
......@@ -29,6 +29,55 @@ namespace predictor {
using configure::ModelToolkitConf;
class InferEngineCreationParams {
public:
InferEngineCreationParams() {
_path = "";
_enable_memory_optimization = false;
_static_optimization = false;
_force_update_static_cache = false;
}
void set_path(const std::string& path) { _path = path; }
void set_enable_memory_optimization(bool enable_memory_optimization) {
_enable_memory_optimization = enable_memory_optimization;
}
bool enable_memory_optimization() const {
return _enable_memory_optimization;
}
void set_static_optimization(bool static_optimization = false) {
_static_optimization = static_optimization;
}
void set_force_update_static_cache(bool force_update_static_cache = false) {
_force_update_static_cache = force_update_static_cache;
}
bool static_optimization() const { return _static_optimization; }
bool force_update_static_cache() const { return _force_update_static_cache; }
std::string get_path() const { return _path; }
void dump() const {
LOG(INFO) << "InferEngineCreationParams: "
<< "model_path = " << _path << ", "
<< "enable_memory_optimization = " << _enable_memory_optimization
<< ", "
<< "static_optimization = " << _static_optimization << ", "
<< "force_update_static_cache = " << _force_update_static_cache;
}
private:
std::string _path;
bool _enable_memory_optimization;
bool _static_optimization;
bool _force_update_static_cache;
};
class InferEngine {
public:
virtual ~InferEngine() {}
......@@ -75,7 +124,7 @@ class ReloadableInferEngine : public InferEngine {
typedef im::bsf::Task<Tensor, Tensor> TaskT;
virtual int load(const std::string& data_path) = 0;
virtual int load(const InferEngineCreationParams& params) = 0;
int proc_initialize_impl(const configure::EngineDesc& conf, bool version) {
_reload_tag_file = conf.reloadable_meta();
......@@ -84,7 +133,31 @@ class ReloadableInferEngine : public InferEngine {
_infer_thread_num = conf.runtime_thread_num();
_infer_batch_size = conf.batch_infer_size();
_infer_batch_align = conf.enable_batch_align();
if (!check_need_reload() || load(_model_data_path) != 0) {
bool enable_memory_optimization = false;
if (conf.has_enable_memory_optimization()) {
enable_memory_optimization = conf.enable_memory_optimization();
}
bool static_optimization = false;
if (conf.has_static_optimization()) {
static_optimization = conf.static_optimization();
}
bool force_update_static_cache = false;
if (conf.has_force_update_static_cache()) {
force_update_static_cache = conf.force_update_static_cache();
}
_infer_engine_params.set_path(_model_data_path);
if (enable_memory_optimization) {
_infer_engine_params.set_enable_memory_optimization(true);
_infer_engine_params.set_static_optimization(static_optimization);
_infer_engine_params.set_force_update_static_cache(
force_update_static_cache);
}
if (!check_need_reload() || load(_infer_engine_params) != 0) {
LOG(ERROR) << "Failed load model_data_path" << _model_data_path;
return -1;
}
......@@ -175,7 +248,7 @@ class ReloadableInferEngine : public InferEngine {
int reload() {
if (check_need_reload()) {
LOG(WARNING) << "begin reload model[" << _model_data_path << "].";
return load(_model_data_path);
return load(_infer_engine_params);
}
return 0;
}
......@@ -243,6 +316,7 @@ class ReloadableInferEngine : public InferEngine {
protected:
std::string _model_data_path;
InferEngineCreationParams _infer_engine_params;
private:
std::string _reload_tag_file;
......@@ -281,32 +355,35 @@ class DBReloadableInferEngine : public ReloadableInferEngine {
return ReloadableInferEngine::proc_initialize(conf, version);
}
virtual int load(const std::string& model_data_dir) {
virtual int load(const InferEngineCreationParams& params) {
if (_reload_vec.empty()) {
return 0;
}
for (uint32_t ti = 0; ti < _reload_vec.size(); ++ti) {
if (load_data(_reload_vec[ti], model_data_dir) != 0) {
if (load_data(_reload_vec[ti], params) != 0) {
LOG(ERROR) << "Failed reload engine model: " << ti;
return -1;
}
}
LOG(WARNING) << "Succ load engine, path: " << model_data_dir;
LOG(WARNING) << "Succ load engine, path: " << params.get_path();
return 0;
}
int load_data(ModelData<EngineCore>* md, const std::string& data_path) {
int load_data(ModelData<EngineCore>* md,
const InferEngineCreationParams& params) {
uint32_t next_idx = (md->current_idx + 1) % 2;
if (md->cores[next_idx]) {
delete md->cores[next_idx];
}
md->cores[next_idx] = new (std::nothrow) EngineCore;
if (!md->cores[next_idx] || md->cores[next_idx]->create(data_path) != 0) {
LOG(ERROR) << "Failed create model, path: " << data_path;
params.dump();
if (!md->cores[next_idx] || md->cores[next_idx]->create(params) != 0) {
LOG(ERROR) << "Failed create model, path: " << params.get_path();
return -1;
}
md->current_idx = next_idx;
......@@ -321,8 +398,9 @@ class DBReloadableInferEngine : public ReloadableInferEngine {
}
ModelData<EngineCore>* md = new (std::nothrow) ModelData<EngineCore>;
if (!md || load_data(md, _model_data_path) != 0) {
LOG(ERROR) << "Failed create thread data from " << _model_data_path;
if (!md || load_data(md, _infer_engine_params) != 0) {
LOG(ERROR) << "Failed create thread data from "
<< _infer_engine_params.get_path();
return -1;
}
......@@ -383,17 +461,16 @@ class CloneDBReloadableInferEngine
return DBReloadableInferEngine<EngineCore>::proc_initialize(conf, version);
}
virtual int load(const std::string& model_data_dir) {
virtual int load(const InferEngineCreationParams& params) {
// Load the process-level model data
if (!_pd ||
DBReloadableInferEngine<EngineCore>::load_data(_pd, model_data_dir) !=
0) {
LOG(ERROR) << "Failed to create common model from [" << model_data_dir
DBReloadableInferEngine<EngineCore>::load_data(_pd, params) != 0) {
LOG(ERROR) << "Failed to create common model from [" << params.get_path()
<< "].";
return -1;
}
LOG(WARNING) << "Succ load common model[" << _pd->cores[_pd->current_idx]
<< "], path[" << model_data_dir << "].";
<< "], path[" << params.get_path() << "].";
if (DBReloadableInferEngine<EngineCore>::_reload_vec.empty()) {
return 0;
......@@ -409,7 +486,7 @@ class CloneDBReloadableInferEngine
}
}
LOG(WARNING) << "Succ load clone model, path[" << model_data_dir << "]";
LOG(WARNING) << "Succ load clone model, path[" << params.get_path() << "]";
return 0;
}
......
......@@ -110,12 +110,6 @@ int Resource::cube_initialize(const std::string& path,
}
int err = 0;
std::string cube_config_path = resource_conf.cube_config_path();
if (err != 0) {
LOG(ERROR) << "reade cube_config_path failed, path[" << path << "], file["
<< cube_config_path << "]";
return -1;
}
std::string cube_config_file = resource_conf.cube_config_file();
if (err != 0) {
LOG(ERROR) << "reade cube_config_file failed, path[" << path << "], file["
......@@ -124,8 +118,8 @@ int Resource::cube_initialize(const std::string& path,
}
err = CubeAPI::instance()->init(cube_config_file.c_str());
if (err != 0) {
LOG(ERROR) << "failed initialize cube, config: " << cube_config_path << "/"
<< cube_config_file << " error code : " << err;
LOG(ERROR) << "failed initialize cube, config: " << cube_config_file
<< " error code : " << err;
return -1;
}
......