diff --git a/CMakeLists.txt b/CMakeLists.txt
old mode 100755
new mode 100644
index 4cb661faf996bc32424f88103f238088efd08520..cad0bb5bc638e08bd05a573fe548c7a81323435c
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -30,7 +30,7 @@ find_package(Threads REQUIRED)
find_package(CUDA QUIET)
include(simd)
-
+# SET(CMAKE_BUILD_TYPE "Debug")
# CMAKE_BUILD_TYPE
if(NOT CMAKE_BUILD_TYPE)
set(CMAKE_BUILD_TYPE "RelWithDebInfo" CACHE STRING
diff --git a/README.md b/README.md
index ab6b1c0148315f2d19838b67a84cc732f175c944..6c6d0924bf44137dc463fb68599713835d4cb0f2 100644
--- a/README.md
+++ b/README.md
@@ -175,9 +175,12 @@ python3 -m paddle_serving_server.serve --model uci_housing_model --thread 10 --p
| Argument | Type | Default | Description |
| ---------------------------------------------- | ---- | ------- | ----------------------------------------------------- |
-| `thread` | int | `4` | Concurrency of current service |
+| `thread` | int | `2` | Number of brpc service threads |
+| `op_num` | int[]| `0` | Number of threads for each model in asynchronous mode |
+| `op_max_batch` | int[]| `32` | Maximum batch size for each model in asynchronous mode |
+| `gpu_ids` | str[]| `"-1"` | GPU card ids for each model |
| `port` | int | `9292` | Exposed port of current service to users |
-| `model` | str | `""` | Path of paddle model directory to be served |
+| `model` | str[]| `""` | Path of paddle model directory to be served |
| `mem_optim_off` | - | - | Disable memory / graphic memory optimization |
| `ir_optim` | bool | False | Enable analysis and optimization of calculation graph |
| `use_mkl` (Only for cpu version) | - | - | Run inference with MKL |
@@ -186,7 +189,24 @@ python3 -m paddle_serving_server.serve --model uci_housing_model --thread 10 --p
| `use_xpu` | - | - | Run PaddleLite inference with Baidu Kunlun XPU |
| `precision` | str | FP32 | Precision Mode, support FP32, FP16, INT8 |
| `use_calib` | bool | False | Only for deployment with TensorRT |
-
+| `gpu_multi_stream` | bool | False | Enable GPU multi-stream mode to achieve higher QPS |
+
+#### Description of asynchronous mode
+ Asynchronous mode is suitable when (1) the number of requests is very large, or (2) multiple models are chained together and you want to specify the concurrency of each model separately.
+ Asynchronous mode helps to improve the throughput (QPS) of the service, but the latency of a single request increases slightly.
+ In asynchronous mode, each model starts the number of threads you specify, and each thread holds one model instance. In other words, each model is equivalent to a thread pool with N threads, and tasks are taken from the thread pool's task queue for execution.
+ In asynchronous mode, each RPC server thread is only responsible for putting the request into the task queue of the model's thread pool. After a task has been executed, the completed task is removed from the task queue.
+ In the table above, the number of RPC server threads is specified by --thread, and the default value is 2.
+ --op_num specifies the number of threads in the thread pool of each model. The default value is 0, which means asynchronous mode is not used.
+ --op_max_batch specifies the maximum batch size of each model. The default value is 32. It takes effect only when --op_num is not 0.
+#### When you want a model to use multiple GPU cards
+python3 -m paddle_serving_server.serve --model uci_housing_model --thread 10 --port 9292 --gpu_ids 0,1,2
+#### When you want to serve 2 models
+python3 -m paddle_serving_server.serve --model uci_housing_model_1 uci_housing_model_2 --thread 10 --port 9292
+#### When you want to serve 2 models, and each of them uses multiple GPU cards
+python3 -m paddle_serving_server.serve --model uci_housing_model_1 uci_housing_model_2 --thread 10 --port 9292 --gpu_ids 0,1 1,2
+#### When a service contains two models, each model uses multiple GPU cards, and asynchronous mode is needed with a different concurrency for each model
+python3 -m paddle_serving_server.serve --model uci_housing_model_1 uci_housing_model_2 --thread 10 --port 9292 --gpu_ids 0,1 1,2 --op_num 4 8
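+For example, to serve two models asynchronously with different thread-pool sizes and different per-model maximum batch sizes (an illustrative combination of the flags documented above, not taken from the original examples):
+python3 -m paddle_serving_server.serve --model uci_housing_model_1 uci_housing_model_2 --thread 10 --port 9292 --op_num 4 8 --op_max_batch 32 16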
```python
diff --git a/README_CN.md b/README_CN.md
index d728071dbd80ae2400a6e95b5ccb06010fd7ef06..a1bb9f9e7c513a3d772cce2d56d0bcd76e3548f9 100644
--- a/README_CN.md
+++ b/README_CN.md
@@ -172,19 +172,40 @@ python3 -m paddle_serving_server.serve --model uci_housing_model --thread 10 --p
```
-| Argument | Type | Default | Description |
-| ---------------------------------------------- | ---- | ------- | ------------------------------------------------------ |
-| `thread` | int | `4` | Concurrency of current service |
-| `port` | int | `9292` | Exposed port of current service to users |
-| `name` | str | `""` | Service name, can be used to generate HTTP request url |
-| `model` | str | `""` | Path of paddle model directory to be served |
-| `mem_optim_off` | - | - | Disable memory optimization |
-| `ir_optim` | bool | False | Enable analysis and optimization of calculation graph |
-| `use_mkl` (Only for cpu version) | - | - | Run inference with MKL |
-| `use_trt` (Only for Cuda>=10.1 version) | - | - | Run inference with TensorRT |
-| `use_lite` (Only for Intel x86 CPU or ARM CPU) | - | - | Run PaddleLite inference |
-| `use_xpu` | - | - | Run PaddleLite inference with Baidu Kunlun XPU |
-| `precision` | str | FP32 | Precision Mode, support FP32, FP16, INT8 |
+| Argument | Type | Default | Description |
+| ---------------------------------------------- | ---- | ------- | ----------------------------------------------------- |
+| `thread` | int | `2` | Number of brpc service threads |
+| `op_num` | int[]| `0` | Number of threads for each model in asynchronous mode |
+| `op_max_batch` | int[]| `32` | Maximum batch size for each model in asynchronous mode |
+| `gpu_ids` | str[]| `"-1"` | GPU card ids for each model |
+| `port` | int | `9292` | Exposed port of current service to users |
+| `model` | str[]| `""` | Path of paddle model directory to be served |
+| `mem_optim_off` | - | - | Disable memory / graphic memory optimization |
+| `ir_optim` | bool | False | Enable analysis and optimization of calculation graph |
+| `use_mkl` (Only for cpu version) | - | - | Run inference with MKL |
+| `use_trt` (Only for trt version) | - | - | Run inference with TensorRT |
+| `use_lite` (Only for Intel x86 CPU or ARM CPU) | - | - | Run PaddleLite inference |
+| `use_xpu` | - | - | Run PaddleLite inference with Baidu Kunlun XPU |
+| `precision` | str | FP32 | Precision Mode, support FP32, FP16, INT8 |
+| `use_calib` | bool | False | Only for deployment with TensorRT |
+| `gpu_multi_stream` | bool | False | Enable GPU multi-stream mode to achieve higher QPS |
+
+#### 异步模型的说明
+ 异步模式适用于1、请求数量非常大的情况,2、多模型串联,想要分别指定每个模型的并发数的情况。
+ 异步模式有助于提高Service服务的吞吐(QPS),但对于单次请求而言,时延会有少量增加。
+ 异步模式中,每个模型会启动您指定个数的N个线程,每个线程中包含一个模型实例,换句话说每个模型相当于包含N个线程的线程池,从线程池的任务队列中取任务来执行。
+ 异步模式中,各个RPC Server的线程只负责将Request请求放入模型线程池的任务队列中,等任务被执行完毕后,再从任务队列中取出已完成的任务。
+ 上表中通过 --thread 10 指定的是RPC Server的线程数量,默认值为2,--op_num 指定的是各个模型的线程池中线程数N,默认值为0,表示不使用异步模式。
+ --op_max_batch 指定的各个模型的batch数量,默认值为32,该参数只有当--op_num不为0时才生效。
+
+#### 当您的某个模型想使用多张GPU卡部署时.
+python3 -m paddle_serving_server.serve --model uci_housing_model --thread 10 --port 9292 --gpu_ids 0,1,2
+#### 当您的一个服务包含两个模型部署时.
+python3 -m paddle_serving_server.serve --model uci_housing_model_1 uci_housing_model_2 --thread 10 --port 9292
+#### 当您的一个服务包含两个模型,且每个模型都需要指定多张GPU卡部署时.
+python3 -m paddle_serving_server.serve --model uci_housing_model_1 uci_housing_model_2 --thread 10 --port 9292 --gpu_ids 0,1 1,2
+#### 当您的一个服务包含两个模型,且每个模型都需要指定多张GPU卡,且需要异步模式每个模型指定不同的并发数时.
+python3 -m paddle_serving_server.serve --model uci_housing_model_1 uci_housing_model_2 --thread 10 --port 9292 --gpu_ids 0,1 1,2 --op_num 4 8
diff --git a/cmake/external/brpc.cmake b/cmake/external/brpc.cmake
old mode 100644
new mode 100755
index 9fe5e89cbc89edd2238653b6cf5aeda41184a8a6..5f217566921adf3a6235ed11d7d126b56f58f506
--- a/cmake/external/brpc.cmake
+++ b/cmake/external/brpc.cmake
@@ -39,11 +39,11 @@ INCLUDE_DIRECTORIES(${BRPC_INCLUDE_DIR})
set(prefix_path "${THIRD_PARTY_PATH}/install/gflags|${THIRD_PARTY_PATH}/install/leveldb|${THIRD_PARTY_PATH}/install/snappy|${THIRD_PARTY_PATH}/install/gtest|${THIRD_PARTY_PATH}/install/protobuf|${THIRD_PARTY_PATH}/install/zlib|${THIRD_PARTY_PATH}/install/glog")
if(WITH_LITE)
- set(BRPC_REPO "https://github.com/zhangjun/incubator-brpc.git")
- set(BRPC_TAG "master")
+ set(BRPC_REPO "https://github.com/apache/incubator-brpc")
+ set(BRPC_TAG "1.0.0-rc01")
else()
- set(BRPC_REPO "https://github.com/wangjiawei04/brpc")
- set(BRPC_TAG "6d79e0b17f25107c35b705ea58d888083f59ff47")
+ set(BRPC_REPO "https://github.com/apache/incubator-brpc")
+ set(BRPC_TAG "1.0.0-rc01")
endif()
# If minimal .a is need, you can set WITH_DEBUG_SYMBOLS=OFF
diff --git a/cmake/external/zlib.cmake b/cmake/external/zlib.cmake
old mode 100755
new mode 100644
diff --git a/core/configure/CMakeLists.txt b/core/configure/CMakeLists.txt
old mode 100644
new mode 100755
index 32534fee141ee5b4b0b7b1eed580e1769deb5cff..846f5e2af66b821f71c3364e08ecce3edb70eaa7
--- a/core/configure/CMakeLists.txt
+++ b/core/configure/CMakeLists.txt
@@ -33,9 +33,9 @@ if (WITH_PYTHON)
add_custom_target(general_model_config_py_proto_init ALL COMMAND ${CMAKE_COMMAND} -E touch __init__.py)
add_dependencies(general_model_config_py_proto general_model_config_py_proto_init)
- py_grpc_proto_compile(multi_lang_general_model_service_py_proto SRCS proto/multi_lang_general_model_service.proto)
- add_custom_target(multi_lang_general_model_service_py_proto_init ALL COMMAND ${CMAKE_COMMAND} -E touch __init__.py)
- add_dependencies(multi_lang_general_model_service_py_proto multi_lang_general_model_service_py_proto_init)
+ py_grpc_proto_compile(general_model_service_py_proto SRCS proto/general_model_service.proto)
+ add_custom_target(general_model_service_py_proto_init ALL COMMAND ${CMAKE_COMMAND} -E touch __init__.py)
+ add_dependencies(general_model_service_py_proto general_model_service_py_proto_init)
if (CLIENT)
py_proto_compile(sdk_configure_py_proto SRCS proto/sdk_configure.proto)
@@ -52,12 +52,13 @@ if (WITH_PYTHON)
COMMAND cp -f *.py ${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_client/proto
COMMENT "Copy generated general_model_config proto file into directory paddle_serving_client/proto."
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
-
- add_custom_command(TARGET multi_lang_general_model_service_py_proto POST_BUILD
+
+ add_custom_command(TARGET general_model_service_py_proto POST_BUILD
COMMAND ${CMAKE_COMMAND} -E make_directory ${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_client/proto
COMMAND cp -f *.py ${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_client/proto
- COMMENT "Copy generated multi_lang_general_model_service proto file into directory paddle_serving_client/proto."
+ COMMENT "Copy generated general_model_service proto file into directory paddle_serving_client/proto."
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
+
endif()
if (APP)
@@ -83,12 +84,13 @@ if (WITH_PYTHON)
COMMAND cp -f *.py ${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server/proto
COMMENT "Copy generated general_model_config proto file into directory paddle_serving_server/proto."
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
-
- add_custom_command(TARGET multi_lang_general_model_service_py_proto POST_BUILD
+
+ add_custom_command(TARGET general_model_service_py_proto POST_BUILD
COMMAND ${CMAKE_COMMAND} -E make_directory ${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server/proto
COMMAND cp -f *.py ${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server/proto
- COMMENT "Copy generated multi_lang_general_model_service proto file into directory paddle_serving_server/proto."
+ COMMENT "Copy generated general_model_service proto file into directory paddle_serving_server/proto."
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
+
endif()
endif()
diff --git a/core/configure/proto/general_model_service.proto b/core/configure/proto/general_model_service.proto
new file mode 100644
index 0000000000000000000000000000000000000000..89ac489f8ae3b90b74c94a3f9f3c82711086cd64
--- /dev/null
+++ b/core/configure/proto/general_model_service.proto
@@ -0,0 +1,52 @@
+// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto2";
+package baidu.paddle_serving.predictor.general_model;
+option java_multiple_files = true;
+
+message Tensor {
+ repeated string data = 1;
+ repeated int32 int_data = 2;
+ repeated int64 int64_data = 3;
+ repeated float float_data = 4;
+ optional int32 elem_type =
+ 5; // 0 means int64, 1 means float32, 2 means int32, 3 means string
+ repeated int32 shape = 6; // shape should include batch
+ repeated int32 lod = 7; // only for fetch tensor currently
+ optional string name = 8; // get from the Model prototxt
+ optional string alias_name = 9; // get from the Model prototxt
+};
+
+message Request {
+ repeated Tensor tensor = 1;
+ repeated string fetch_var_names = 2;
+ optional bool profile_server = 3 [ default = false ];
+ required uint64 log_id = 4 [ default = 0 ];
+};
+
+message Response {
+ repeated ModelOutput outputs = 1;
+ repeated int64 profile_time = 2;
+};
+
+message ModelOutput {
+ repeated Tensor tensor = 1;
+ optional string engine_name = 2;
+}
+
+service GeneralModelService {
+ rpc inference(Request) returns (Response) {}
+ rpc debug(Request) returns (Response) {}
+};
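
A minimal client-side sketch (not part of the patch) of how the Request/Tensor messages above could be filled. The header name `general_model_service.pb.h` and the feed/fetch alias names `x` and `price` are illustrative assumptions; the accessors (add_tensor, set_name, set_alias_name, set_elem_type, add_shape, add_float_data, add_fetch_var_names, set_log_id) are the standard protobuf-generated C++ API that the client code later in this diff also uses:

```cpp
#include <vector>
#include "general_model_service.pb.h"  // assumed name of the protoc-generated header

namespace gm = baidu::paddle_serving::predictor::general_model;

// Build a Request carrying one float32 feed tensor of shape [1, 13].
// Batch is carried inside the Tensor itself; there is no FeedInst wrapper any more.
gm::Request make_request(const std::vector<float>& values) {
  gm::Request req;
  req.set_log_id(0);
  req.add_fetch_var_names("price");       // fetch alias name (illustrative)
  gm::Tensor* tensor = req.add_tensor();
  tensor->set_name("x");                  // feed var name from the model prototxt (illustrative)
  tensor->set_alias_name("x");            // feed var alias name (illustrative)
  tensor->set_elem_type(1);               // 0: int64, 1: float32, 2: int32, 3: string
  tensor->add_shape(1);                   // shape includes the batch dimension
  tensor->add_shape(13);
  for (float v : values) {
    tensor->add_float_data(v);
  }
  return req;
}
```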
diff --git a/core/configure/proto/multi_lang_general_model_service.proto b/core/configure/proto/multi_lang_general_model_service.proto
deleted file mode 100755
index 18fbcf760647e1694e738c0832fe45f4f7d9934f..0000000000000000000000000000000000000000
--- a/core/configure/proto/multi_lang_general_model_service.proto
+++ /dev/null
@@ -1,69 +0,0 @@
-// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-syntax = "proto2";
-
-package baidu.paddle_serving.multi_lang;
-
-option java_multiple_files = true;
-option java_package = "io.paddle.serving.grpc";
-option java_outer_classname = "ServingProto";
-
-message Tensor {
- optional bytes data = 1;
- repeated int32 int_data = 2;
- repeated int64 int64_data = 3;
- repeated float float_data = 4;
- optional int32 elem_type = 5;
- repeated int32 shape = 6;
- repeated int32 lod = 7; // only for fetch tensor currently
-};
-
-message FeedInst { repeated Tensor tensor_array = 1; };
-
-message FetchInst { repeated Tensor tensor_array = 1; };
-
-message InferenceRequest {
- repeated FeedInst insts = 1;
- repeated string feed_var_names = 2;
- repeated string fetch_var_names = 3;
- required bool is_python = 4 [ default = false ];
- required uint64 log_id = 5 [ default = 0 ];
-};
-
-message InferenceResponse {
- repeated ModelOutput outputs = 1;
- optional string tag = 2;
- required int32 err_code = 3;
-};
-
-message ModelOutput {
- repeated FetchInst insts = 1;
- optional string engine_name = 2;
-}
-
-message SetTimeoutRequest { required int32 timeout_ms = 1; }
-
-message SimpleResponse { required int32 err_code = 1; }
-
-message GetClientConfigRequest {}
-
-message GetClientConfigResponse { required string client_config_str = 1; }
-
-service MultiLangGeneralModelService {
- rpc Inference(InferenceRequest) returns (InferenceResponse) {}
- rpc SetTimeout(SetTimeoutRequest) returns (SimpleResponse) {}
- rpc GetClientConfig(GetClientConfigRequest)
- returns (GetClientConfigResponse) {}
-};
diff --git a/core/configure/proto/server_configure.proto b/core/configure/proto/server_configure.proto
old mode 100755
new mode 100644
index 24fb62806476effdcf453cb7b4047122731106ea..5cace06420e29e1590218f63777c85bbcf504b29
--- a/core/configure/proto/server_configure.proto
+++ b/core/configure/proto/server_configure.proto
@@ -21,11 +21,12 @@ message EngineDesc {
required string reloadable_meta = 3;
required string reloadable_type = 4;
required string model_dir = 5;
- required int32 runtime_thread_num = 6;
- required int32 batch_infer_size = 7;
- required int32 enable_batch_align = 8;
- optional string version_file = 9;
- optional string version_type = 10;
+ repeated int32 gpu_ids = 6;
+ required int32 runtime_thread_num = 7;
+ required int32 batch_infer_size = 8;
+ required int32 enable_batch_align = 9;
+ optional string version_file = 10;
+ optional string version_type = 11;
/*
* Sparse Parameter Service type. Valid types are:
@@ -38,16 +39,17 @@ message EngineDesc {
LOCAL = 1;
REMOTE = 2;
}
- optional SparseParamServiceType sparse_param_service_type = 11;
- optional string sparse_param_service_table_name = 12;
- optional bool enable_memory_optimization = 13;
- optional bool enable_ir_optimization = 14;
- optional bool use_trt = 15;
- optional bool use_lite = 16;
- optional bool use_xpu = 17;
- optional bool use_gpu = 18;
- optional bool combined_model = 19;
- optional bool encrypted_model = 20;
+ optional SparseParamServiceType sparse_param_service_type = 12;
+ optional string sparse_param_service_table_name = 13;
+ optional bool enable_memory_optimization = 14;
+ optional bool enable_ir_optimization = 15;
+ optional bool use_trt = 16;
+ optional bool use_lite = 17;
+ optional bool use_xpu = 18;
+ optional bool use_gpu = 19;
+ optional bool combined_model = 20;
+ optional bool encrypted_model = 21;
+ optional bool gpu_multi_stream = 22;
};
// model_toolkit conf
diff --git a/core/cube/CMakeLists.txt b/core/cube/CMakeLists.txt
index f9dc4d2c2508720f450b4aee3aba5dfdd7ccd43b..a61d2df92a92bc26fabd4a3cf87c6db1dc1cc3f0 100644
--- a/core/cube/CMakeLists.txt
+++ b/core/cube/CMakeLists.txt
@@ -11,10 +11,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License
-
-#execute_process(COMMAND go env -w GO111MODULE=off)
add_subdirectory(cube-server)
add_subdirectory(cube-api)
add_subdirectory(cube-builder)
-#add_subdirectory(cube-transfer)
-#add_subdirectory(cube-agent)
+add_subdirectory(cube-transfer)
+add_subdirectory(cube-agent)
diff --git a/core/cube/cube-agent/CMakeLists.txt b/core/cube/cube-agent/CMakeLists.txt
index 30158aa506e53ec8a37d10aef4f29bfcd5a60d06..701f0c8a55e3326e1327f3b1f68458f99c60143b 100644
--- a/core/cube/cube-agent/CMakeLists.txt
+++ b/core/cube/cube-agent/CMakeLists.txt
@@ -15,7 +15,6 @@
set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_CURRENT_SOURCE_DIR}/cmake")
project(cube-agent Go)
-
include(cmake/golang.cmake)
ExternalGoProject_Add(agent-docopt-go github.com/docopt/docopt-go)
diff --git a/core/cube/cube-agent/src/agent/http.go b/core/cube/cube-agent/src/agent/http.go
old mode 100755
new mode 100644
diff --git a/core/cube/cube-agent/src/agent/http_get.go b/core/cube/cube-agent/src/agent/http_get.go
old mode 100755
new mode 100644
diff --git a/core/cube/cube-agent/src/agent/http_post.go b/core/cube/cube-agent/src/agent/http_post.go
old mode 100755
new mode 100644
diff --git a/core/cube/cube-builder/CMakeLists.txt b/core/cube/cube-builder/CMakeLists.txt
old mode 100755
new mode 100644
index 65f77f4eb0ff16299d5ee54f192c2171ac5b956c..00278939b78235ba5f5b3042d347ad905ac3c8fe
--- a/core/cube/cube-builder/CMakeLists.txt
+++ b/core/cube/cube-builder/CMakeLists.txt
@@ -22,7 +22,7 @@ include_directories(SYSTEM ${CMAKE_CURRENT_BINARY_DIR}/../)
add_executable(cube-builder src/main.cpp include/cube-builder/util.h src/util.cpp src/builder_job.cpp include/cube-builder/builder_job.h include/cube-builder/define.h src/seqfile_reader.cpp include/cube-builder/seqfile_reader.h include/cube-builder/raw_reader.h include/cube-builder/vtext.h src/crovl_builder_increment.cpp include/cube-builder/crovl_builder_increment.h src/curl_simple.cpp include/cube-builder/curl_simple.h)
-add_dependencies(cube-builder jsoncpp boost)
+add_dependencies(cube-builder jsoncpp boost brpc)
set(DYNAMIC_LIB
gflags
@@ -39,4 +39,8 @@ target_link_libraries(cube-builder ${DYNAMIC_LIB})
# install
install(TARGETS cube-builder RUNTIME DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/bin)
-install(DIRECTORY ${CMAKE_CURRENT_LIST_DIR}/tool DESTINATION ${PADDLE_SERVING_INSTALL_DIR})
+install(FILES ${CMAKE_CURRENT_LIST_DIR}/tool/kvtool.py DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/tool)
+
+install(FILES ${CMAKE_CURRENT_LIST_DIR}/tool/kv_to_seqfile.py DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/tool)
+
+install(DIRECTORY ${CMAKE_CURRENT_LIST_DIR}/tool/source DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/tool)
diff --git a/core/cube/cube-server/include/cube/slim_hash_map.h b/core/cube/cube-server/include/cube/slim_hash_map.h
index 761ce9214f628a824f257611c07b07dab2503a48..26e9cd8c5702810a3fcaa83eeaeac17cdae97ba1 100644
--- a/core/cube/cube-server/include/cube/slim_hash_map.h
+++ b/core/cube/cube-server/include/cube/slim_hash_map.h
@@ -212,7 +212,7 @@ class slim_hash_map {
int copy_data_from(const slim_hash_map& rhs) {
destroy();
-
+ LOG(INFO) << "start copy data, rhs info, mHashSize: " << rhs.m_nHashSize;
if (rhs.m_nHashSize > 0) {
m_hashTable = new (std::nothrow) uint32_t[rhs.m_nHashSize];
if (!m_hashTable) {
@@ -231,7 +231,7 @@ class slim_hash_map {
<< sizeof(hash_node_t) * BLOCK_SIZE;
return -1;
}
-
+ LOG(INFO) << "copy data, m_nBlockNum: " << m_nBlockNum << " , copy size:" << sizeof(hash_node_t) * BLOCK_SIZE;
memcpy(m_blockAddr[m_nBlockNum],
rhs.m_blockAddr[m_nBlockNum],
sizeof(hash_node_t) * BLOCK_SIZE);
@@ -265,11 +265,13 @@ class slim_hash_map {
}
size_type index = key % m_nHashSize;
hash_node_t* node = get_node(m_hashTable[index]);
-
+ int node_cnt = 0;
while (node != NULL && node->data.first != key) {
+ LOG(INFO) << "node link get:" << node->data.first;
+ node_cnt++;
node = get_node(node->next);
}
-
+ LOG(INFO) << "key: " << key << " , found count: " << node_cnt;
if (node == NULL) {
return end();
}
@@ -390,7 +392,6 @@ class slim_hash_map {
if (node != NULL) {
return node->data.second;
}
-
return add_node(index, key)->data.second;
}
void clear() {
@@ -399,16 +400,16 @@ class slim_hash_map {
m_nFreeEntries = 0;
m_nSize = 0;
}
- bool load(const char* file) {
+ bool load(const char* file, uint32_t block_id) {
// clear();
+ // block_id is a bias: 0 means base mode; K means patch mode, where the base dict already holds K blocks
int size = sizeof(key_t) + sizeof(value_t);
FILE* fp = fopen(file, "rb");
char* buf = reinterpret_cast<char*>(malloc(size * 100000));
-
+ LOG(INFO) << "current block id: " << block_id;
if (fp == NULL || buf == NULL) {
return false;
}
-
size_t read_count;
bool err = false;
key_t key;
@@ -423,6 +424,8 @@ class slim_hash_map {
for (int i = 0; i < static_cast<int>(read_count); ++i) {
key = *(reinterpret_cast<key_t*>(buf + i * size));
value = *(reinterpret_cast<value_t*>(buf + i * size + sizeof(key_t)));
+ value = ((uint64_t)block_id << 32) | value;
+ LOG(INFO) << "slim map key: " << key << " , value: " << value;
(*this)[key] = value;
}
}
@@ -557,7 +560,6 @@ class slim_hash_map {
}
hash_node_t* add_node(uint32_t index, const key_type& key) {
++m_nSize;
-
if (m_nFreeEntries) {
uint32_t addr = m_nFreeEntries;
hash_node_t* node = get_node(addr);
@@ -569,7 +571,7 @@ class slim_hash_map {
}
uint32_t block = ((m_nNextEntry & 0xFF800000) >> 23);
-
+ //LOG(INFO) << "key: " << key << " here. index: " << index << " , m_nNextEntry: "<< m_nNextEntry << " , block:" << block<< ", m_nBlockNum:" << m_nBlockNum;
if (block >= m_nBlockNum) {
try {
m_blockAddr[m_nBlockNum++] = new hash_node_t[BLOCK_SIZE];
@@ -581,7 +583,6 @@ class slim_hash_map {
return NULL;
}
}
-
uint32_t addr = m_nNextEntry;
++m_nNextEntry;
hash_node_t* node = get_node(addr);
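
A small sketch (not part of the patch) of the value encoding that `load(file, block_id)` introduces above: the 64-bit value stored in the slim hash map carries the data-block id in its upper 32 bits and the in-block address in its lower 32 bits, which is exactly what `Dict::seek()` later decodes via `flag >> 32` and `(uint32_t)flag`:

```cpp
#include <cassert>
#include <cstdint>

// Pack a block id and an in-block address into one 64-bit value,
// mirroring `value = ((uint64_t)block_id << 32) | value;` in load().
static inline uint64_t pack(uint32_t block_id, uint32_t addr) {
  return (static_cast<uint64_t>(block_id) << 32) | addr;
}

// Recover the block id and address, mirroring the decoding in Dict::seek().
static inline void unpack(uint64_t flag, uint32_t* block_id, uint32_t* addr) {
  *block_id = static_cast<uint32_t>(flag >> 32);
  *addr = static_cast<uint32_t>(flag);
}

int main() {
  uint32_t id = 0, addr = 0;
  unpack(pack(3, 0x12345678u), &id, &addr);
  assert(id == 3 && addr == 0x12345678u);
  return 0;
}
```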
diff --git a/core/cube/cube-server/src/dict.cpp b/core/cube/cube-server/src/dict.cpp
index 05f401115ab5e95f8b014bf30bda71d8a10a74cb..dd21d518e61bd199108032c2e382d76d3b8b55a7 100644
--- a/core/cube/cube-server/src/dict.cpp
+++ b/core/cube/cube-server/src/dict.cpp
@@ -51,13 +51,12 @@ int Dict::load(const std::string& dict_path,
bool in_mem,
const std::string& v_path) {
TIME_FLAG(load_start);
-
int ret = load_index(dict_path, v_path);
if (ret != E_OK) {
LOG(WARNING) << "load index failed";
return ret;
}
-
+ LOG(INFO) << "load index in mem mode: " << in_mem ;
if (in_mem) {
ret = load_data(dict_path, v_path);
if (ret != E_OK) {
@@ -81,8 +80,11 @@ int Dict::load_index(const std::string& dict_path, const std::string& v_path) {
std::string index_n_path(dict_path);
index_n_path.append(v_path);
index_n_path.append("/index.n");
+
+ uint32_t cur_block_id = 0;
+ if (_base_dict) cur_block_id = _base_dict->_block_set.size();
LOG(INFO) << "index file path: " << index_n_path;
-
+ //ERR HERE
std::unique_ptr<FILE, decltype(&fclose)> pf(fopen(index_n_path.c_str(), "rb"),
&fclose);
if (pf.get() == NULL) {
@@ -150,12 +152,16 @@ int Dict::load_index(const std::string& dict_path, const std::string& v_path) {
return E_DATA_ERROR;
}
} else {
+ if (_slim_table.copy_data_from(_base_dict->_slim_table) != 0) {
+ LOG(ERROR) << "copy data from old index failed in patch mode";
+ return E_DATA_ERROR;
+ }
file_idx = 0;
LOG(INFO)
- << "index check file len failed in patch mode, set file_idx to 0";
+ << "index check fail, direct copy";
}
}
-
+ LOG(INFO) << "resize slim table, new count: " << count/2;
_slim_table.resize(count / 2);
char file[1024];
@@ -167,6 +173,7 @@ int Dict::load_index(const std::string& dict_path, const std::string& v_path) {
dict_path.c_str(),
v_path.c_str(),
file_idx);
+ LOG(INFO) << "load file str: " << file;
if (stat(file, &fstat) < 0) {
if (errno == ENOENT) {
LOG(WARNING) << "index." << file_idx << " not exist";
@@ -181,8 +188,8 @@ int Dict::load_index(const std::string& dict_path, const std::string& v_path) {
<< (uint64_t)fstat.st_size;
return E_DATA_ERROR;
}
- LOG(INFO) << "loading from index." << file_idx;
- if (!_slim_table.load(file) || _slim_table.size() > count) {
+ LOG(INFO) << "loading from index." << file_idx << " . table size: " << _slim_table.size();
+ if (!_slim_table.load(file, cur_block_id)) {
return E_DATA_ERROR;
}
@@ -193,8 +200,15 @@ int Dict::load_index(const std::string& dict_path, const std::string& v_path) {
}
int Dict::load_data(const std::string& dict_path, const std::string& v_path) {
+ std::vector<uint32_t> block_size;
+ uint64_t total_data_size = 0;
if (_base_dict) {
_block_set = _base_dict->_block_set;
+ LOG(INFO)<< "load data base dict block set size: " << _block_set[0].size;
+ for (size_t i = 0; i < _block_set.size(); ++i) {
+ block_size.push_back(_block_set[i].size);
+ total_data_size += _block_set[i].size;
+ }
}
std::string data_n_path(dict_path);
@@ -212,8 +226,6 @@ int Dict::load_data(const std::string& dict_path, const std::string& v_path) {
return E_DATA_ERROR;
}
- std::vector<uint32_t> block_size;
- uint64_t total_data_size = 0;
for (uint32_t i = 0; i < count; ++i) {
uint32_t size = 0;
if (fread(reinterpret_cast<char*>(&size), sizeof(uint32_t), 1, pf) != 1) {
@@ -222,6 +234,7 @@ int Dict::load_data(const std::string& dict_path, const std::string& v_path) {
return E_DATA_ERROR;
}
block_size.push_back(size);
+ LOG(INFO) << "new block size: " << size;
total_data_size += size;
}
g_data_size << (total_data_size / 1024 / 1024);
@@ -229,36 +242,35 @@ int Dict::load_data(const std::string& dict_path, const std::string& v_path) {
pf = NULL;
uint32_t old_size = _block_set.size();
+ LOG(INFO) << "load data old size: " << old_size;
for (size_t i = 0; i < old_size; ++i) {
if (_block_set[i].size != block_size[i]) {
old_size = 0;
break;
}
}
- _block_set.resize(count);
+ LOG(INFO) << "load data block set count: " << count << " , old size: " << old_size;
+ _block_set.resize(count + old_size);
for (size_t i = old_size; i < _block_set.size(); ++i) {
char data_path[1024];
LOG(INFO) << "load from data." << i;
- snprintf(
- data_path, 1024, "%s%s/data.%lu", dict_path.c_str(), v_path.c_str(), i);
-
+ //snprintf(
+ // data_path, 1024, "%s%s/data.%lu", dict_path.c_str(), v_path.c_str(), i);
+ snprintf(data_path, 1024, "%s%s/data.%lu", dict_path.c_str(), v_path.c_str(), i - old_size);
FILE* data_file = fopen(data_path, "rb");
if (data_file == NULL) {
- LOG(WARNING) << "open data file [" << data_path << " failed";
+ LOG(WARNING) << "open data file [" << data_path << " ]failed";
_block_set[i].s_data.reset();
_block_set[i].size = 0;
continue;
}
-
- _block_set[i].s_data.reset(
- reinterpret_cast<char*>(malloc(block_size[i] * sizeof(char))));
+ _block_set[i].s_data.reset(reinterpret_cast<char*>(malloc(block_size[i] * sizeof(char))));
if (_block_set[i].s_data.get() == NULL) {
LOG(ERROR) << "malloc data failed";
fclose(data_file);
return E_OOM;
}
_block_set[i].size = block_size[i];
-
if (fread(reinterpret_cast<char*>(_block_set[i].s_data.get()),
sizeof(char),
_block_set[i].size,
@@ -267,7 +279,10 @@ int Dict::load_data(const std::string& dict_path, const std::string& v_path) {
fclose(data_file);
return E_DATA_ERROR;
}
-
+ LOG(INFO) << "load new data to BlockSet succ";
+ for (size_t ii = 0; ii < 20; ++ii) {
+ LOG(INFO) << "data ptr: " << (int)(_block_set[i].s_data.get()[ii]);
+ }
fclose(data_file);
}
@@ -386,12 +401,11 @@ bool Dict::seek(uint64_t key, char* buff, uint64_t* buff_size) {
uint64_t flag = it->second;
uint32_t id = (uint32_t)(flag >> 32);
uint64_t addr = (uint32_t)(flag);
-
+ LOG(INFO) << "search key: " << id << " , addr: " << addr;
if (_block_set.size() > id) {
uint32_t block_size = _block_set[id].size;
char* block_data = NULL;
block_data = _block_set[id].s_data.get();
-
if (block_data && addr + sizeof(uint32_t) <= block_size) {
uint32_t len = *(reinterpret_cast<uint32_t*>(block_data + addr));
if (addr + len <= block_size && len >= sizeof(uint32_t)) {
@@ -405,6 +419,7 @@ bool Dict::seek(uint64_t key, char* buff, uint64_t* buff_size) {
<< default_buffer_size;
return false;
}
+ LOG(INFO) << "seek key: " << key << " , addr: " << addr;
memcpy(buff,
(block_data + addr + sizeof(uint32_t)),
len - sizeof(uint32_t));
diff --git a/core/cube/cube-transfer/CMakeLists.txt b/core/cube/cube-transfer/CMakeLists.txt
index 78e47c5b840631a3092f3a799e2424d370444a2e..2e9d3dede03c5b27bcd0e24eaa6584df343c09e2 100644
--- a/core/cube/cube-transfer/CMakeLists.txt
+++ b/core/cube/cube-transfer/CMakeLists.txt
@@ -18,11 +18,9 @@ project(cube-transfer Go)
include(cmake/golang.cmake)
-ExternalGoProject_Add(rfw github.com/mipearson/rfw)
-ExternalGoProject_Add(docopt-go github.com/docopt/docopt-go)
-add_custom_target(logex
- COMMAND env GOPATH=${GOPATH} ${CMAKE_Go_COMPILER} get github.com/Badangel/logex
- DEPENDS rfw)
+ExternalGoProject_Add(transfer-rfw github.com/mipearson/rfw)
+ExternalGoProject_Add(transfer-docopt-go github.com/docopt/docopt-go)
+ExternalGoProject_Add(transfer-logex github.com/Badangel/logex)
add_subdirectory(src)
install(DIRECTORY ${CMAKE_CURRENT_LIST_DIR}/conf DESTINATION ${PADDLE_SERVING_INSTALL_DIR})
diff --git a/core/cube/cube-transfer/conf/transfer.conf b/core/cube/cube-transfer/conf/transfer.conf
old mode 100755
new mode 100644
diff --git a/core/cube/cube-transfer/src/CMakeLists.txt b/core/cube/cube-transfer/src/CMakeLists.txt
index 62d3f7ef7759a0d2a09eb4fe32a064694ece5408..b71278537a2ee03468019e7bd7e5ec4d786becf2 100644
--- a/core/cube/cube-transfer/src/CMakeLists.txt
+++ b/core/cube/cube-transfer/src/CMakeLists.txt
@@ -14,6 +14,6 @@
set(SOURCE_FILE cube-transfer.go)
add_go_executable(cube-transfer ${SOURCE_FILE})
-add_dependencies(cube-transfer docopt-go)
-add_dependencies(cube-transfer rfw)
-add_dependencies(cube-transfer logex)
+add_dependencies(cube-transfer transfer-docopt-go)
+add_dependencies(cube-transfer transfer-rfw)
+add_dependencies(cube-transfer transfer-logex)
diff --git a/core/cube/cube-transfer/src/cube-transfer.go b/core/cube/cube-transfer/src/cube-transfer.go
old mode 100755
new mode 100644
diff --git a/core/cube/cube-transfer/src/transfer/builder.go b/core/cube/cube-transfer/src/transfer/builder.go
old mode 100755
new mode 100644
diff --git a/core/cube/cube-transfer/src/transfer/config.go b/core/cube/cube-transfer/src/transfer/config.go
old mode 100755
new mode 100644
diff --git a/core/cube/cube-transfer/src/transfer/deployer.go b/core/cube/cube-transfer/src/transfer/deployer.go
old mode 100755
new mode 100644
diff --git a/core/cube/cube-transfer/src/transfer/dict/cube_agent_server.go b/core/cube/cube-transfer/src/transfer/dict/cube_agent_server.go
old mode 100755
new mode 100644
diff --git a/core/cube/cube-transfer/src/transfer/dict/define.go b/core/cube/cube-transfer/src/transfer/dict/define.go
old mode 100755
new mode 100644
diff --git a/core/cube/cube-transfer/src/transfer/dict/dict_info.go b/core/cube/cube-transfer/src/transfer/dict/dict_info.go
old mode 100755
new mode 100644
diff --git a/core/cube/cube-transfer/src/transfer/dict/dict_instance_status.go b/core/cube/cube-transfer/src/transfer/dict/dict_instance_status.go
old mode 100755
new mode 100644
diff --git a/core/cube/cube-transfer/src/transfer/dict/dict_shard_info.go b/core/cube/cube-transfer/src/transfer/dict/dict_shard_info.go
old mode 100755
new mode 100644
diff --git a/core/cube/cube-transfer/src/transfer/dict/dict_version_info.go b/core/cube/cube-transfer/src/transfer/dict/dict_version_info.go
old mode 100755
new mode 100644
diff --git a/core/cube/cube-transfer/src/transfer/global.go b/core/cube/cube-transfer/src/transfer/global.go
old mode 100755
new mode 100644
diff --git a/core/cube/cube-transfer/src/transfer/http.go b/core/cube/cube-transfer/src/transfer/http.go
old mode 100755
new mode 100644
diff --git a/core/cube/cube-transfer/src/transfer/http_get.go b/core/cube/cube-transfer/src/transfer/http_get.go
old mode 100755
new mode 100644
diff --git a/core/cube/cube-transfer/src/transfer/transfer.go b/core/cube/cube-transfer/src/transfer/transfer.go
old mode 100755
new mode 100644
index 84ab7427333b3a639efd2e48df3dd248209924be..d29c29a496a62930a86ae1dcb44c02a4d32f1552
--- a/core/cube/cube-transfer/src/transfer/transfer.go
+++ b/core/cube/cube-transfer/src/transfer/transfer.go
@@ -17,68 +17,56 @@ package transfer
import (
"fmt"
"github.com/Badangel/logex"
- "os"
- "time"
"transfer/dict"
)
func Start() {
- go BackupTransfer()
- logex.Notice(">>> starting server...")
- addr := ":" + Port
- err := startHttp(addr)
- if err != nil {
- logex.Fatalf("start http(addr=%v) failed: %v", addr, err)
- os.Exit(255)
- }
-
- logex.Notice(">>> start server succ")
+ BackupTransfer()
}
func BackupTransfer() {
- for {
- //trigger
- version, err := TriggerStart(Dict.DonefileAddress)
- if err != nil {
- logex.Fatalf("[trigger err]trigger err:%v ", err)
- fmt.Printf("[error]trigger err:%v \n", err)
- break
- }
- logex.Noticef("[trigger] get version:%v \n", version)
- if version.Id == 0 {
- logex.Noticef("[sleep]no new version, sleep 5 min")
- fmt.Printf("[sleep]no new version, wait 5 min\n")
- time.Sleep(5 * time.Minute)
- continue
- }
+ //trigger
+ version, err := TriggerStart(Dict.DonefileAddress)
+ if err != nil {
+ logex.Fatalf("[trigger err]trigger err:%v ", err)
+ fmt.Printf("[error]trigger err:%v \n", err)
+ fmt.Print("transfer over!")
+ logex.Noticef("[transfer]status machine exit!")
+ return
+ }
+ logex.Noticef("[trigger] get version:%v \n", version)
Dict.WaitVersionInfo = version
- logex.Noticef("[trigger finish] WaitVersionInfo version:%v \n", Dict.WaitVersionInfo)
- WriteWaitVersionInfoToFile()
+ logex.Noticef("[trigger finish] WaitVersionInfo version:%v \n", Dict.WaitVersionInfo)
+ WriteWaitVersionInfoToFile()
- //builder
- Dict.WaitVersionInfo.Status = dict.Dict_Status_Building
- Dict.WaitVersionInfo.MetaInfos = make(map[int]string)
- WriteWaitVersionInfoToFile()
- if err = BuilderStart(Dict.WaitVersionInfo); err != nil {
- logex.Fatalf("builder err:%v \n", err)
- }
+ //builder
+ Dict.WaitVersionInfo.Status = dict.Dict_Status_Building
+ Dict.WaitVersionInfo.MetaInfos = make(map[int]string)
+ WriteWaitVersionInfoToFile()
+ if err = BuilderStart(Dict.WaitVersionInfo); err != nil {
+ logex.Fatalf("builder err:%v \n", err)
+ }
- if Dict.WaitVersionInfo.Mode == dict.BASE {
- var newCurrentVersion []dict.DictVersionInfo
- Dict.CurrentVersionInfo = newCurrentVersion
- WriteCurrentVersionInfoToFile()
- }
- logex.Noticef("[builder finish] WaitVersionInfo version:%v \n", Dict.WaitVersionInfo)
+ if Dict.WaitVersionInfo.Mode == dict.BASE {
+ var newCurrentVersion []dict.DictVersionInfo
+ Dict.CurrentVersionInfo = newCurrentVersion
+ WriteCurrentVersionInfoToFile()
+ }
+ if Dict.WaitVersionInfo.Mode == dict.DELTA {
+ var newCurrentVersion []dict.DictVersionInfo
+ Dict.CurrentVersionInfo = newCurrentVersion
+ WriteCurrentVersionInfoToFile()
+ }
+ logex.Noticef("[builder finish] WaitVersionInfo version:%v \n", Dict.WaitVersionInfo)
- //deployer
- Dict.WaitVersionInfo.Status = dict.Dict_Status_Deploying
- WriteWaitVersionInfoToFile()
- if err = DeployStart(Dict.WaitVersionInfo); err != nil {
- logex.Fatalf("deploy err:%v \n", err)
- }
- logex.Noticef("[deploy finish]current version: %v\n",Dict.CurrentVersionInfo)
+ //deployer
+ Dict.WaitVersionInfo.Status = dict.Dict_Status_Deploying
+ WriteWaitVersionInfoToFile()
+ if err = DeployStart(Dict.WaitVersionInfo); err != nil {
+ logex.Fatalf("deploy err:%v \n", err)
}
+ logex.Noticef("[deploy finish]current version: %v\n",Dict.CurrentVersionInfo)
fmt.Print("transfer over!")
logex.Noticef("[transfer]status machine exit!")
}
diff --git a/core/cube/cube-transfer/src/transfer/trigger.go b/core/cube/cube-transfer/src/transfer/trigger.go
old mode 100755
new mode 100644
index b3696dc58b7ca33de307cbe7ea2d4509d269753c..768f7218c036d7d948c046a6763d11c68ce9a306
--- a/core/cube/cube-transfer/src/transfer/trigger.go
+++ b/core/cube/cube-transfer/src/transfer/trigger.go
@@ -38,18 +38,19 @@ func GetDoneFileInfo(addr string) (version dict.DictVersionInfo, err error) {
Wget(addr, donefileAddr)
addr = donefileAddr
}
-
- baseDonefile := addr + "/base.txt"
- fmt.Printf("[trigrer]donefile path:%v \n", baseDonefile)
- logex.Noticef("[trigrer]base donefile path:%v", baseDonefile)
- contents, err := ioutil.ReadFile(baseDonefile)
VersionLen := len(Dict.CurrentVersionInfo)
version.DictName = Dict.DictName
- if err != nil {
- fmt.Printf("[trigrer]read files err:%v \n", err)
- logex.Fatalf("[trigrer]read files err:%v ", err)
+ fmt.Printf("get into mode check here\n")
+ if Dict.DictMode == dict.BASE_ONLY {
+ baseDonefile := addr + "/base.txt"
+ fmt.Printf("[trigrer]donefile path:%v \n", baseDonefile)
+ logex.Noticef("[trigrer]base donefile path:%v", baseDonefile)
+ contents, err_0 := ioutil.ReadFile(baseDonefile)
+ if err_0 != nil {
+ fmt.Printf("[trigrer]read files err:%v \n", err_0)
+ logex.Fatalf("[trigrer]read files err:%v ", err_0)
return
- } else {
+ } else {
contentss := string(contents)
lines := strings.Split(contentss, "\n")
index := len(lines) - 1
@@ -80,19 +81,21 @@ func GetDoneFileInfo(addr string) (version dict.DictVersionInfo, err error) {
version.Mode = dict.BASE
return
}
- }
- if Dict.DictMode == dict.BASR_DELTA && VersionLen > 0 {
+ }
+ }
+ if Dict.DictMode == dict.BASR_DELTA {
patchDonefile := addr + "/patch.txt"
fmt.Printf("[trigrer]patchDonefile path:%v \n", patchDonefile)
logex.Noticef("[trigrer]patch donefile path:%v", patchDonefile)
- contents, err = ioutil.ReadFile(patchDonefile)
- if err != nil {
- fmt.Printf("read files err:%v \n", err)
+ contents, err_0 := ioutil.ReadFile(patchDonefile)
+ if err_0 != nil {
+ fmt.Printf("[trigrer]read files err:%v \n", err_0)
+ logex.Fatalf("[trigrer]read files err:%v ", err_0)
return
} else {
contentss := string(contents)
lines := strings.Split(contentss, "\n")
-
+ fmt.Printf("[trigger]get patch lines here\n")
for index := 0; index < len(lines)-1; index++ {
if len(lines[index]) < 3 {
logex.Noticef("[trigrer]get patch donfile info error")
@@ -106,14 +109,15 @@ func GetDoneFileInfo(addr string) (version dict.DictVersionInfo, err error) {
logex.Noticef("[trigrer]donfile info:%v", donefileInfo)
newId, _ := strconv.Atoi(donefileInfo.Id)
newKey, _ := strconv.Atoi(donefileInfo.Key)
- if newId > Dict.CurrentVersionInfo[VersionLen-1].Id && newKey == Dict.CurrentVersionInfo[VersionLen-1].Key {
+ fmt.Printf("[trigger]read patch id: %d, key: %d\n", newId, newKey)
+ if VersionLen == 0 || newId > Dict.CurrentVersionInfo[VersionLen-1].Id {
version.Id = newId
version.Key, _ = strconv.Atoi(donefileInfo.Key)
version.Input = donefileInfo.Input
deployVersion := int(time.Now().Unix())
version.CreateTime = deployVersion
version.Version = deployVersion
- version.Depend = Dict.CurrentVersionInfo[VersionLen-1].Depend
+ version.Depend = deployVersion
version.Mode = dict.DELTA
return
}
diff --git a/core/cube/cube-transfer/src/transfer/util.go b/core/cube/cube-transfer/src/transfer/util.go
old mode 100755
new mode 100644
index f3c1834319ab2752a2338cda737855854cf73356..8f9e5c545f35e504f248466e84f8a7d368b80db8
--- a/core/cube/cube-transfer/src/transfer/util.go
+++ b/core/cube/cube-transfer/src/transfer/util.go
@@ -96,7 +96,8 @@ func ExeCommad(files string, params []string) (err error) {
func Wget(ftpPath string, downPath string) {
var params []string
- params = append(params, "-P")
+ params = append(params, "--limit-rate=100m")
+ params = append(params, "-P")
params = append(params, downPath)
params = append(params, "-r")
params = append(params, "-N")
@@ -110,4 +111,4 @@ func Wget(ftpPath string, downPath string) {
if err != nil {
fmt.Printf("wget exe: %v\n", err)
}
-}
\ No newline at end of file
+}
diff --git a/core/general-client/include/general_model.h b/core/general-client/include/general_model.h
old mode 100755
new mode 100644
index b1c4f71f5602bed4eded49822d7afe7caac6e242..88ec7a59f1181eec32e2da800a9a1b71e3cdc084
--- a/core/general-client/include/general_model.h
+++ b/core/general-client/include/general_model.h
@@ -53,6 +53,9 @@ class ModelRes {
res._int32_value_map.end());
_shape_map.insert(res._shape_map.begin(), res._shape_map.end());
_lod_map.insert(res._lod_map.begin(), res._lod_map.end());
+ _tensor_alias_names.insert(_tensor_alias_names.end(),
+ res._tensor_alias_names.begin(),
+ res._tensor_alias_names.end());
}
ModelRes(ModelRes&& res) {
_engine_name = std::move(res._engine_name);
@@ -69,6 +72,10 @@ class ModelRes {
std::make_move_iterator(std::end(res._shape_map)));
_lod_map.insert(std::make_move_iterator(std::begin(res._lod_map)),
std::make_move_iterator(std::end(res._lod_map)));
+ _tensor_alias_names.insert(
+ _tensor_alias_names.end(),
+ std::make_move_iterator(std::begin(res._tensor_alias_names)),
+ std::make_move_iterator(std::end(res._tensor_alias_names)));
}
~ModelRes() {}
const std::vector<int64_t>& get_int64_by_name(const std::string& name) {
@@ -105,6 +112,10 @@ class ModelRes {
_engine_name = engine_name;
}
const std::string& engine_name() { return _engine_name; }
+
+ const std::vector<std::string>& tensor_alias_names() {
+ return _tensor_alias_names;
+ }
ModelRes& operator=(ModelRes&& res) {
if (this != &res) {
_engine_name = std::move(res._engine_name);
@@ -121,6 +132,10 @@ class ModelRes {
std::make_move_iterator(std::end(res._shape_map)));
_lod_map.insert(std::make_move_iterator(std::begin(res._lod_map)),
std::make_move_iterator(std::end(res._lod_map)));
+ _tensor_alias_names.insert(
+ _tensor_alias_names.end(),
+ std::make_move_iterator(std::begin(res._tensor_alias_names)),
+ std::make_move_iterator(std::end(res._tensor_alias_names)));
}
return *this;
}
@@ -132,6 +147,7 @@ class ModelRes {
std::map<std::string, std::vector<int32_t>> _int32_value_map;
std::map<std::string, std::vector<int>> _shape_map;
std::map<std::string, std::vector<int>> _lod_map;
+ std::vector<std::string> _tensor_alias_names;
};
class PredictorRes {
@@ -193,11 +209,16 @@ class PredictorRes {
}
const std::string& variant_tag() { return _variant_tag; }
const std::vector<std::string>& get_engine_names() { return _engine_names; }
+ const std::vector<std::string>& get_tensor_alias_names(const int model_idx) {
+ _tensor_alias_names = _models[model_idx].tensor_alias_names();
+ return _tensor_alias_names;
+ }
private:
std::vector<ModelRes> _models;
std::string _variant_tag;
std::vector<std::string> _engine_names;
+ std::vector<std::string> _tensor_alias_names;
};
class PredictorClient {
@@ -207,7 +228,7 @@ class PredictorClient {
void init_gflags(std::vector<std::string> argv);
- int init(const std::vector<std::string> &client_conf);
+ int init(const std::vector<std::string>& client_conf);
void set_predictor_conf(const std::string& conf_path,
const std::string& conf_file);
@@ -218,23 +239,22 @@ class PredictorClient {
int destroy_predictor();
- int numpy_predict(
- const std::vector<std::vector<py::array_t<float>>>& float_feed_batch,
- const std::vector<std::string>& float_feed_name,
- const std::vector<std::vector<int>>& float_shape,
- const std::vector<std::vector<int>>& float_lod_slot_batch,
- const std::vector<std::vector<py::array_t<int64_t>>>& int_feed_batch,
- const std::vector<std::string>& int_feed_name,
- const std::vector<std::vector<int>>& int_shape,
- const std::vector<std::vector<int>>& int_lod_slot_batch,
- const std::vector<std::vector<std::string>>& string_feed_batch,
- const std::vector<std::string>& string_feed_name,
- const std::vector<std::vector<int>>& string_shape,
- const std::vector<std::vector<int>>& string_lod_slot_batch,
- const std::vector<std::string>& fetch_name,
- PredictorRes& predict_res_batch, // NOLINT
- const int& pid,
- const uint64_t log_id);
+ int numpy_predict(const std::vector<py::array_t<float>>& float_feed,
+ const std::vector<std::string>& float_feed_name,
+ const std::vector<std::vector<int>>& float_shape,
+ const std::vector<std::vector<int>>& float_lod_slot_batch,
+ const std::vector<py::array_t<int64_t>>& int_feed,
+ const std::vector<std::string>& int_feed_name,
+ const std::vector<std::vector<int>>& int_shape,
+ const std::vector<std::vector<int>>& int_lod_slot_batch,
+ const std::vector<std::string>& string_feed,
+ const std::vector<std::string>& string_feed_name,
+ const std::vector<std::vector<int>>& string_shape,
+ const std::vector<std::vector<int>>& string_lod_slot_batch,
+ const std::vector<std::string>& fetch_name,
+ PredictorRes& predict_res_batch, // NOLINT
+ const int& pid,
+ const uint64_t log_id);
private:
PredictorApi _api;
@@ -243,6 +263,7 @@ class PredictorClient {
std::string _predictor_path;
std::string _conf_file;
std::map<std::string, int> _feed_name_to_idx;
+ std::vector<std::string> _feed_name;
std::map<std::string, int> _fetch_name_to_idx;
std::map<std::string, std::string> _fetch_name_to_var_name;
std::map<std::string, int> _fetch_name_to_type;
diff --git a/core/general-client/src/general_model.cpp b/core/general-client/src/general_model.cpp
index 0ade573de6ac2da59156ba82f5ff3e04f1b7f6b2..d04ab89ae31d048e5a38ada7abec5f27d46ab62f 100644
--- a/core/general-client/src/general_model.cpp
+++ b/core/general-client/src/general_model.cpp
@@ -25,8 +25,6 @@ using baidu::paddle_serving::Timer;
using baidu::paddle_serving::predictor::general_model::Request;
using baidu::paddle_serving::predictor::general_model::Response;
using baidu::paddle_serving::predictor::general_model::Tensor;
-using baidu::paddle_serving::predictor::general_model::FeedInst;
-using baidu::paddle_serving::predictor::general_model::FetchInst;
enum ProtoDataType { P_INT64, P_FLOAT32, P_INT32, P_STRING };
std::once_flag gflags_init_flag;
namespace py = pybind11;
@@ -68,9 +66,13 @@ int PredictorClient::init(const std::vector<std::string> &conf_file) {
_fetch_name_to_idx.clear();
_shape.clear();
int feed_var_num = model_config.feed_var_size();
+ _feed_name.clear();
VLOG(2) << "feed var num: " << feed_var_num;
for (int i = 0; i < feed_var_num; ++i) {
_feed_name_to_idx[model_config.feed_var(i).alias_name()] = i;
+ VLOG(2) << "feed [" << i << "]"
+ << " name: " << model_config.feed_var(i).name();
+ _feed_name.push_back(model_config.feed_var(i).name());
VLOG(2) << "feed alias name: " << model_config.feed_var(i).alias_name()
<< " index: " << i;
std::vector<int> tmp_feed_shape;
@@ -146,15 +148,15 @@ int PredictorClient::create_predictor() {
}
int PredictorClient::numpy_predict(
- const std::vector<std::vector<py::array_t<float>>> &float_feed_batch,
+ const std::vector<py::array_t<float>> &float_feed,
const std::vector<std::string> &float_feed_name,
const std::vector<std::vector<int>> &float_shape,
const std::vector<std::vector<int>> &float_lod_slot_batch,
- const std::vector<std::vector<py::array_t<int64_t>>> &int_feed_batch,
+ const std::vector<py::array_t<int64_t>> &int_feed,
const std::vector<std::string> &int_feed_name,
const std::vector<std::vector<int>> &int_shape,
const std::vector<std::vector<int>> &int_lod_slot_batch,
- const std::vector<std::vector<std::string>> &string_feed_batch,
+ const std::vector<std::string> &string_feed,
const std::vector<std::string> &string_feed_name,
const std::vector<std::vector<int>> &string_shape,
const std::vector<std::vector<int>> &string_lod_slot_batch,
@@ -162,16 +164,10 @@ int PredictorClient::numpy_predict(
PredictorRes &predict_res_batch,
const int &pid,
const uint64_t log_id) {
- int batch_size = std::max(float_feed_batch.size(), int_feed_batch.size());
- batch_size = batch_size > string_feed_batch.size() ? batch_size
- : string_feed_batch.size();
- VLOG(2) << "batch size: " << batch_size;
predict_res_batch.clear();
Timer timeline;
int64_t preprocess_start = timeline.TimeStampUS();
- int fetch_name_num = fetch_name.size();
-
_api.thrd_initialize();
std::string variant_tag;
_predictor = _api.fetch_predictor("general_model", &variant_tag);
@@ -188,134 +184,122 @@ int PredictorClient::numpy_predict(
}
int vec_idx = 0;
- for (int bi = 0; bi < batch_size; bi++) {
- VLOG(2) << "prepare batch " << bi;
- std::vector<Tensor *> tensor_vec;
- FeedInst *inst = req.add_insts();
- std::vector<py::array_t<float>> float_feed = float_feed_batch[bi];
- std::vector<py::array_t<int64_t>> int_feed = int_feed_batch[bi];
- std::vector<std::string> string_feed = string_feed_batch[bi];
- for (auto &name : float_feed_name) {
- tensor_vec.push_back(inst->add_tensor_array());
- }
-
- for (auto &name : int_feed_name) {
- tensor_vec.push_back(inst->add_tensor_array());
- }
+ // batch is already in Tensor.
+ std::vector<Tensor *> tensor_vec;
- for (auto &name : string_feed_name) {
- tensor_vec.push_back(inst->add_tensor_array());
- }
+ for (auto &name : float_feed_name) {
+ tensor_vec.push_back(req.add_tensor());
+ }
- VLOG(2) << "batch [" << bi << "] "
- << "prepared";
+ for (auto &name : int_feed_name) {
+ tensor_vec.push_back(req.add_tensor());
+ }
- vec_idx = 0;
- for (auto &name : float_feed_name) {
- int idx = _feed_name_to_idx[name];
- if (idx >= tensor_vec.size()) {
- LOG(ERROR) << "idx > tensor_vec.size()";
- return -1;
- }
- int nbytes = float_feed[vec_idx].nbytes();
- void *rawdata_ptr = (void *)(float_feed[vec_idx].data(0));
- int total_number = float_feed[vec_idx].size();
- Tensor *tensor = tensor_vec[idx];
-
- VLOG(2) << "prepare float feed " << name << " shape size "
- << float_shape[vec_idx].size();
- for (uint32_t j = 0; j < float_shape[vec_idx].size(); ++j) {
- tensor->add_shape(float_shape[vec_idx][j]);
- }
- for (uint32_t j = 0; j < float_lod_slot_batch[vec_idx].size(); ++j) {
- tensor->add_lod(float_lod_slot_batch[vec_idx][j]);
- }
- tensor->set_elem_type(P_FLOAT32);
+ for (auto &name : string_feed_name) {
+ tensor_vec.push_back(req.add_tensor());
+ }
- tensor->mutable_float_data()->Resize(total_number, 0);
- memcpy(tensor->mutable_float_data()->mutable_data(), rawdata_ptr, nbytes);
- vec_idx++;
+ vec_idx = 0;
+ for (auto &name : float_feed_name) {
+ int idx = _feed_name_to_idx[name];
+ if (idx >= tensor_vec.size()) {
+ LOG(ERROR) << "idx > tensor_vec.size()";
+ return -1;
+ }
+ VLOG(2) << "prepare float feed " << name << " idx " << idx;
+ int nbytes = float_feed[vec_idx].nbytes();
+ void *rawdata_ptr = (void *)(float_feed[vec_idx].data(0));
+ int total_number = float_feed[vec_idx].size();
+ Tensor *tensor = tensor_vec[idx];
+
+ VLOG(2) << "prepare float feed " << name << " shape size "
+ << float_shape[vec_idx].size();
+ for (uint32_t j = 0; j < float_shape[vec_idx].size(); ++j) {
+ tensor->add_shape(float_shape[vec_idx][j]);
+ }
+ for (uint32_t j = 0; j < float_lod_slot_batch[vec_idx].size(); ++j) {
+ tensor->add_lod(float_lod_slot_batch[vec_idx][j]);
}
+ tensor->set_elem_type(P_FLOAT32);
- VLOG(2) << "batch [" << bi << "] "
- << "float feed value prepared";
+ tensor->set_name(_feed_name[idx]);
+ tensor->set_alias_name(name);
- vec_idx = 0;
- for (auto &name : int_feed_name) {
- int idx = _feed_name_to_idx[name];
- if (idx >= tensor_vec.size()) {
- LOG(ERROR) << "idx > tensor_vec.size()";
- return -1;
- }
- Tensor *tensor = tensor_vec[idx];
- int nbytes = int_feed[vec_idx].nbytes();
- void *rawdata_ptr = (void *)(int_feed[vec_idx].data(0));
- int total_number = int_feed[vec_idx].size();
+ tensor->mutable_float_data()->Resize(total_number, 0);
+ memcpy(tensor->mutable_float_data()->mutable_data(), rawdata_ptr, nbytes);
+ vec_idx++;
+ }
- for (uint32_t j = 0; j < int_shape[vec_idx].size(); ++j) {
- tensor->add_shape(int_shape[vec_idx][j]);
- }
- for (uint32_t j = 0; j < int_lod_slot_batch[vec_idx].size(); ++j) {
- tensor->add_lod(int_lod_slot_batch[vec_idx][j]);
- }
- tensor->set_elem_type(_type[idx]);
-
- if (_type[idx] == P_INT64) {
- tensor->mutable_int64_data()->Resize(total_number, 0);
- memcpy(
- tensor->mutable_int64_data()->mutable_data(), rawdata_ptr, nbytes);
- } else {
- tensor->mutable_int_data()->Resize(total_number, 0);
- memcpy(tensor->mutable_int_data()->mutable_data(), rawdata_ptr, nbytes);
- }
- vec_idx++;
+ vec_idx = 0;
+ for (auto &name : int_feed_name) {
+ int idx = _feed_name_to_idx[name];
+ if (idx >= tensor_vec.size()) {
+ LOG(ERROR) << "idx > tensor_vec.size()";
+ return -1;
}
+ Tensor *tensor = tensor_vec[idx];
+ int nbytes = int_feed[vec_idx].nbytes();
+ void *rawdata_ptr = (void *)(int_feed[vec_idx].data(0));
+ int total_number = int_feed[vec_idx].size();
- VLOG(2) << "batch [" << bi << "] "
- << "int feed value prepared";
+ for (uint32_t j = 0; j < int_shape[vec_idx].size(); ++j) {
+ tensor->add_shape(int_shape[vec_idx][j]);
+ }
+ for (uint32_t j = 0; j < int_lod_slot_batch[vec_idx].size(); ++j) {
+ tensor->add_lod(int_lod_slot_batch[vec_idx][j]);
+ }
+ tensor->set_elem_type(_type[idx]);
+ tensor->set_name(_feed_name[idx]);
+ tensor->set_alias_name(name);
+
+ if (_type[idx] == P_INT64) {
+ tensor->mutable_int64_data()->Resize(total_number, 0);
+ memcpy(tensor->mutable_int64_data()->mutable_data(), rawdata_ptr, nbytes);
+ } else {
+ tensor->mutable_int_data()->Resize(total_number, 0);
+ memcpy(tensor->mutable_int_data()->mutable_data(), rawdata_ptr, nbytes);
+ }
+ vec_idx++;
+ }
- vec_idx = 0;
- for (auto &name : string_feed_name) {
- int idx = _feed_name_to_idx[name];
- if (idx >= tensor_vec.size()) {
- LOG(ERROR) << "idx > tensor_vec.size()";
- return -1;
- }
- Tensor *tensor = tensor_vec[idx];
+ vec_idx = 0;
+ for (auto &name : string_feed_name) {
+ int idx = _feed_name_to_idx[name];
+ if (idx >= tensor_vec.size()) {
+ LOG(ERROR) << "idx > tensor_vec.size()";
+ return -1;
+ }
+ Tensor *tensor = tensor_vec[idx];
- for (uint32_t j = 0; j < string_shape[vec_idx].size(); ++j) {
- tensor->add_shape(string_shape[vec_idx][j]);
- }
- for (uint32_t j = 0; j < string_lod_slot_batch[vec_idx].size(); ++j) {
- tensor->add_lod(string_lod_slot_batch[vec_idx][j]);
- }
- tensor->set_elem_type(P_STRING);
-
- const int string_shape_size = string_shape[vec_idx].size();
- // string_shape[vec_idx] = [1];cause numpy has no datatype of string.
- // we pass string via vector<vector<string>>.
- if (string_shape_size != 1) {
- LOG(ERROR) << "string_shape_size should be 1-D, but received is : "
- << string_shape_size;
- return -1;
- }
- switch (string_shape_size) {
- case 1: {
- tensor->add_data(string_feed[vec_idx]);
- break;
- }
+ for (uint32_t j = 0; j < string_shape[vec_idx].size(); ++j) {
+ tensor->add_shape(string_shape[vec_idx][j]);
+ }
+ for (uint32_t j = 0; j < string_lod_slot_batch[vec_idx].size(); ++j) {
+ tensor->add_lod(string_lod_slot_batch[vec_idx][j]);
+ }
+ tensor->set_elem_type(P_STRING);
+ tensor->set_name(_feed_name[idx]);
+ tensor->set_alias_name(name);
+
+ const int string_shape_size = string_shape[vec_idx].size();
+ // string_shape[vec_idx] = [1], because numpy has no string dtype;
+ // we pass strings via std::vector<std::string>.
+ if (string_shape_size != 1) {
+ LOG(ERROR) << "string_shape_size should be 1-D, but received is : "
+ << string_shape_size;
+ return -1;
+ }
+ switch (string_shape_size) {
+ case 1: {
+ tensor->add_data(string_feed[vec_idx]);
+ break;
}
- vec_idx++;
}
-
- VLOG(2) << "batch [" << bi << "] "
- << "string feed value prepared";
+ vec_idx++;
}
int64_t preprocess_end = timeline.TimeStampUS();
-
int64_t client_infer_start = timeline.TimeStampUS();
-
Response res;
int64_t client_infer_end = 0;
@@ -343,52 +327,46 @@ int PredictorClient::numpy_predict(
auto output = res.outputs(m_idx);
ModelRes model;
model.set_engine_name(output.engine_name());
-
- int idx = 0;
- for (auto &name : fetch_name) {
+ // The ResponseOp has already arranged the output data according to fetch_name,
+ // so the returned tensors correspond to fetch_name strictly and in order; just process them in order.
+ for (int idx = 0; idx < output.tensor_size(); ++idx) {
// int idx = _fetch_name_to_idx[name];
- int shape_size = output.insts(0).tensor_array(idx).shape_size();
+ const std::string name = output.tensor(idx).alias_name();
+ model._tensor_alias_names.push_back(name);
+ int shape_size = output.tensor(idx).shape_size();
VLOG(2) << "fetch var " << name << " index " << idx << " shape size "
<< shape_size;
model._shape_map[name].resize(shape_size);
for (int i = 0; i < shape_size; ++i) {
- model._shape_map[name][i] =
- output.insts(0).tensor_array(idx).shape(i);
+ model._shape_map[name][i] = output.tensor(idx).shape(i);
}
- int lod_size = output.insts(0).tensor_array(idx).lod_size();
+ int lod_size = output.tensor(idx).lod_size();
if (lod_size > 0) {
model._lod_map[name].resize(lod_size);
for (int i = 0; i < lod_size; ++i) {
- model._lod_map[name][i] = output.insts(0).tensor_array(idx).lod(i);
+ model._lod_map[name][i] = output.tensor(idx).lod(i);
}
}
- idx += 1;
- }
- idx = 0;
-
- for (auto &name : fetch_name) {
- // int idx = _fetch_name_to_idx[name];
if (_fetch_name_to_type[name] == P_INT64) {
VLOG(2) << "ferch var " << name << "type int64";
- int size = output.insts(0).tensor_array(idx).int64_data_size();
+ int size = output.tensor(idx).int64_data_size();
model._int64_value_map[name] = std::vector(
- output.insts(0).tensor_array(idx).int64_data().begin(),
- output.insts(0).tensor_array(idx).int64_data().begin() + size);
+ output.tensor(idx).int64_data().begin(),
+ output.tensor(idx).int64_data().begin() + size);
} else if (_fetch_name_to_type[name] == P_FLOAT32) {
VLOG(2) << "fetch var " << name << "type float";
- int size = output.insts(0).tensor_array(idx).float_data_size();
+ int size = output.tensor(idx).float_data_size();
model._float_value_map[name] = std::vector(
- output.insts(0).tensor_array(idx).float_data().begin(),
- output.insts(0).tensor_array(idx).float_data().begin() + size);
+ output.tensor(idx).float_data().begin(),
+ output.tensor(idx).float_data().begin() + size);
} else if (_fetch_name_to_type[name] == P_INT32) {
VLOG(2) << "fetch var " << name << "type int32";
- int size = output.insts(0).tensor_array(idx).int_data_size();
+ int size = output.tensor(idx).int_data_size();
model._int32_value_map[name] = std::vector(
- output.insts(0).tensor_array(idx).int_data().begin(),
- output.insts(0).tensor_array(idx).int_data().begin() + size);
+ output.tensor(idx).int_data().begin(),
+ output.tensor(idx).int_data().begin() + size);
}
- idx += 1;
}
predict_res_batch.add_model_res(std::move(model));
}
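
The client now writes each feed directly into one flattened `Tensor` message (shape, lod, name, alias_name, then the typed data buffer). Below is a minimal sketch of that packing pattern for a single int64 feed; it assumes the generated protobuf `Tensor` class from `general_model_service.proto`, and the helper name, header path, and arguments are illustrative, not part of this patch.

```cpp
// Sketch only: mirrors the Resize + memcpy packing pattern from numpy_predict
// above. The header path and helper name are assumptions for illustration.
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

#include "general_model_service.pb.h"  // assumed path of the generated header

using baidu::paddle_serving::predictor::general_model::Tensor;

void PackInt64Feed(Tensor* tensor,
                   const std::vector<int64_t>& values,
                   const std::vector<int>& shape,
                   const std::string& name,
                   const std::string& alias_name) {
  for (int dim : shape) tensor->add_shape(dim);
  tensor->set_elem_type(0);  // 0 means int64, see general_model_service.proto
  tensor->set_name(name);
  tensor->set_alias_name(alias_name);
  // Pre-size the repeated field, then copy the raw buffer in one memcpy.
  tensor->mutable_int64_data()->Resize(static_cast<int>(values.size()), 0);
  std::memcpy(tensor->mutable_int64_data()->mutable_data(),
              values.data(),
              values.size() * sizeof(int64_t));
}
```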
diff --git a/core/general-client/src/pybind_general_model.cpp b/core/general-client/src/pybind_general_model.cpp
old mode 100755
new mode 100644
index 499f0856ad8b7ffae5f3f037142036ac486cc035..ad26bb7d3c175f08438ee22a5a42425fd5147117
--- a/core/general-client/src/pybind_general_model.cpp
+++ b/core/general-client/src/pybind_general_model.cpp
@@ -69,7 +69,10 @@ PYBIND11_MODULE(serving_client, m) {
})
.def("variant_tag", [](PredictorRes &self) { return self.variant_tag(); })
.def("get_engine_names",
- [](PredictorRes &self) { return self.get_engine_names(); });
+ [](PredictorRes &self) { return self.get_engine_names(); })
+ .def("get_tensor_alias_names", [](PredictorRes &self, int model_idx) {
+ return self.get_tensor_alias_names(model_idx);
+ });
py::class_(m, "PredictorClient", py::buffer_protocol())
.def(py::init())
@@ -97,33 +100,31 @@ PYBIND11_MODULE(serving_client, m) {
[](PredictorClient &self) { self.destroy_predictor(); })
.def("numpy_predict",
[](PredictorClient &self,
- const std::vector>>
- &float_feed_batch,
+ const std::vector> &float_feed,
const std::vector &float_feed_name,
const std::vector> &float_shape,
const std::vector> &float_lod_slot_batch,
- const std::vector>>
- &int_feed_batch,
+ const std::vector> &int_feed,
const std::vector &int_feed_name,
const std::vector> &int_shape,
const std::vector> &int_lod_slot_batch,
- const std::vector>& string_feed_batch,
- const std::vector& string_feed_name,
- const std::vector>& string_shape,
- const std::vector>& string_lod_slot_batch,
+ const std::vector &string_feed,
+ const std::vector &string_feed_name,
+ const std::vector> &string_shape,
+ const std::vector> &string_lod_slot_batch,
const std::vector &fetch_name,
PredictorRes &predict_res_batch,
const int &pid,
const uint64_t log_id) {
- return self.numpy_predict(float_feed_batch,
+ return self.numpy_predict(float_feed,
float_feed_name,
float_shape,
float_lod_slot_batch,
- int_feed_batch,
+ int_feed,
int_feed_name,
int_shape,
int_lod_slot_batch,
- string_feed_batch,
+ string_feed,
string_feed_name,
string_shape,
string_lod_slot_batch,
diff --git a/core/general-server/CMakeLists.txt b/core/general-server/CMakeLists.txt
old mode 100755
new mode 100644
diff --git a/core/general-server/op/CMakeLists.txt b/core/general-server/op/CMakeLists.txt
old mode 100755
new mode 100644
diff --git a/core/general-server/op/general_copy_op.cpp b/core/general-server/op/general_copy_op.cpp
deleted file mode 100644
index 0391a98bcb7f471c0a0687dd9deb7b404a15a2bf..0000000000000000000000000000000000000000
--- a/core/general-server/op/general_copy_op.cpp
+++ /dev/null
@@ -1,109 +0,0 @@
-// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-#include "core/general-server/op/general_copy_op.h"
-#include
-#include
-#include
-#include
-#include "core/general-server/op/general_infer_helper.h"
-#include "core/predictor/framework/infer.h"
-#include "core/predictor/framework/memory.h"
-#include "core/util/include/timer.h"
-
-namespace baidu {
-namespace paddle_serving {
-namespace serving {
-
-using baidu::paddle_serving::Timer;
-using baidu::paddle_serving::predictor::MempoolWrapper;
-using baidu::paddle_serving::predictor::general_model::Tensor;
-using baidu::paddle_serving::predictor::general_model::Request;
-using baidu::paddle_serving::predictor::general_model::FeedInst;
-using baidu::paddle_serving::predictor::PaddleGeneralModelConfig;
-
-int GeneralCopyOp::inference() {
- // reade request from client
- const std::vector pre_node_names = pre_names();
- if (pre_node_names.size() != 1) {
- LOG(ERROR) << "This op(" << op_name()
- << ") can only have one predecessor op, but received "
- << pre_node_names.size();
- return -1;
- }
- const std::string pre_name = pre_node_names[0];
-
- const GeneralBlob *input_blob = get_depend_argument(pre_name);
- uint64_t log_id = input_blob->GetLogId();
-
- VLOG(2) << "(logid=" << log_id << ") precedent name: " << pre_name;
- const TensorVector *in = &input_blob->tensor_vector;
- VLOG(2) << "(logid=" << log_id << ") input size: " << in->size();
- int batch_size = input_blob->GetBatchSize();
- int input_var_num = 0;
-
- GeneralBlob *res = mutable_data();
- res->SetLogId(log_id);
- TensorVector *out = &res->tensor_vector;
-
- VLOG(2) << "(logid=" << log_id << ") input batch size: " << batch_size;
- res->SetBatchSize(batch_size);
-
- if (!res) {
- LOG(ERROR) << "(logid=" << log_id
- << ") Failed get op tls reader object output";
- }
-
- Timer timeline;
- int64_t start = timeline.TimeStampUS();
-
- VLOG(2) << "(logid=" << log_id << ") Going to init lod tensor";
- for (int i = 0; i < in->size(); ++i) {
- paddle::PaddleTensor lod_tensor;
- CopyLod(&in->at(i), &lod_tensor);
- lod_tensor.dtype = in->at(i).dtype;
- lod_tensor.name = in->at(i).name;
- VLOG(2) << "(logid=" << log_id << ") lod tensor [" << i
- << "].name = " << lod_tensor.name;
- out->push_back(lod_tensor);
- }
-
- VLOG(2) << "(logid=" << log_id << ") pack done.";
-
- for (int i = 0; i < out->size(); ++i) {
- int64_t *src_ptr = static_cast(in->at(i).data.data());
- out->at(i).data.Resize(out->at(i).lod[0].back() * sizeof(int64_t));
- out->at(i).shape = {out->at(i).lod[0].back(), 1};
- int64_t *tgt_ptr = static_cast(out->at(i).data.data());
- for (int j = 0; j < out->at(i).lod[0].back(); ++j) {
- tgt_ptr[j] = src_ptr[j];
- }
- }
-
- VLOG(2) << "(logid=" << log_id << ") output done.";
-
- timeline.Pause();
- int64_t end = timeline.TimeStampUS();
- CopyBlobInfo(input_blob, res);
- AddBlobInfo(res, start);
- AddBlobInfo(res, end);
-
- VLOG(2) << "(logid=" << log_id << ") read data from client success";
- return 0;
-}
-
-DEFINE_OP(GeneralCopyOp);
-} // namespace serving
-} // namespace paddle_serving
-} // namespace baidu
diff --git a/core/general-server/op/general_copy_op.h b/core/general-server/op/general_copy_op.h
deleted file mode 100644
index 9b4caadc6a82f1f1a601ab66394b3f629af703ff..0000000000000000000000000000000000000000
--- a/core/general-server/op/general_copy_op.h
+++ /dev/null
@@ -1,39 +0,0 @@
-// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-#pragma once
-#include
-#include
-#include "core/general-server/general_model_service.pb.h"
-#include "core/general-server/op/general_infer_helper.h"
-#include "core/predictor/framework/resource.h"
-#include "paddle_inference_api.h" // NOLINT
-
-namespace baidu {
-namespace paddle_serving {
-namespace serving {
-
-class GeneralCopyOp
- : public baidu::paddle_serving::predictor::OpWithChannel {
- public:
- typedef std::vector TensorVector;
-
- DECLARE_OP(GeneralCopyOp);
-
- int inference();
-};
-
-} // namespace serving
-} // namespace paddle_serving
-} // namespace baidu
diff --git a/core/general-server/op/general_detection_op.cpp b/core/general-server/op/general_detection_op.cpp
old mode 100755
new mode 100644
index 7c33ec8efa8c6e89a7a778def6342415d19ffa94..46f5ddf1b508681661b69c60a25b6d7d000e6d4e
--- a/core/general-server/op/general_detection_op.cpp
+++ b/core/general-server/op/general_detection_op.cpp
@@ -36,7 +36,6 @@ using baidu::paddle_serving::predictor::MempoolWrapper;
using baidu::paddle_serving::predictor::general_model::Tensor;
using baidu::paddle_serving::predictor::general_model::Response;
using baidu::paddle_serving::predictor::general_model::Request;
-using baidu::paddle_serving::predictor::general_model::FetchInst;
using baidu::paddle_serving::predictor::InferManager;
using baidu::paddle_serving::predictor::PaddleGeneralModelConfig;
diff --git a/core/general-server/op/general_detection_op.h b/core/general-server/op/general_detection_op.h
old mode 100755
new mode 100644
diff --git a/core/general-server/op/general_dist_kv_infer_op.cpp b/core/general-server/op/general_dist_kv_infer_op.cpp
index 13db69e368e775735efef0bad1e335f5d72a915d..8ee5033d976284b149a2a8bde4e64deea636311f 100644
--- a/core/general-server/op/general_dist_kv_infer_op.cpp
+++ b/core/general-server/op/general_dist_kv_infer_op.cpp
@@ -34,10 +34,11 @@ using baidu::paddle_serving::predictor::MempoolWrapper;
using baidu::paddle_serving::predictor::general_model::Tensor;
using baidu::paddle_serving::predictor::general_model::Response;
using baidu::paddle_serving::predictor::general_model::Request;
-using baidu::paddle_serving::predictor::general_model::FetchInst;
using baidu::paddle_serving::predictor::InferManager;
using baidu::paddle_serving::predictor::PaddleGeneralModelConfig;
+// DistKV Infer Op: seek cube and then call paddle inference.
+// op sequence: general_reader -> dist_kv_infer -> general_response
int GeneralDistKVInferOp::inference() {
VLOG(2) << "Going to run inference";
const std::vector pre_node_names = pre_names();
@@ -52,14 +53,14 @@ int GeneralDistKVInferOp::inference() {
const GeneralBlob *input_blob = get_depend_argument(pre_name);
if (!input_blob) {
LOG(ERROR) << "input_blob is nullptr,error";
- return -1;
+ return -1;
}
uint64_t log_id = input_blob->GetLogId();
VLOG(2) << "(logid=" << log_id << ") Get precedent op name: " << pre_name;
GeneralBlob *output_blob = mutable_data();
if (!output_blob) {
- LOG(ERROR) << "output_blob is nullptr,error";
+ LOG(ERROR) << "(logid=" << log_id << ") output_blob is nullptr,error";
return -1;
}
output_blob->SetLogId(log_id);
@@ -77,8 +78,8 @@ int GeneralDistKVInferOp::inference() {
std::vector unique_keys;
std::unordered_map key_map;
std::vector values;
- int sparse_count = 0;
- int dense_count = 0;
+  int sparse_count = 0;  // number of sparse inputs; these are looked up in cube
+  int dense_count = 0;   // number of dense inputs; these go straight to paddle inference
std::vector> dataptr_size_pairs;
size_t key_len = 0;
for (size_t i = 0; i < in->size(); ++i) {
@@ -97,6 +98,7 @@ int GeneralDistKVInferOp::inference() {
}
keys.resize(key_len);
unique_keys.resize(key_len);
+
int key_idx = 0;
for (size_t i = 0; i < dataptr_size_pairs.size(); ++i) {
std::copy(dataptr_size_pairs[i].first,
@@ -120,6 +122,7 @@ int GeneralDistKVInferOp::inference() {
LOG(ERROR) << "cube init error or cube config not given.";
return -1;
}
+
int64_t seek_start = timeline.TimeStampUS();
int ret = cube->seek(table_names[0], unique_keys, &values);
int64_t seek_end = timeline.TimeStampUS();
@@ -131,7 +134,7 @@ int GeneralDistKVInferOp::inference() {
LOG(ERROR) << "cube value return null";
}
//size_t EMBEDDING_SIZE = values[0].buff.size() / sizeof(float);
- size_t EMBEDDING_SIZE = 9;
+ size_t EMBEDDING_SIZE = (values[0].buff.size() - 10) / sizeof(float);
TensorVector sparse_out;
sparse_out.resize(sparse_count);
TensorVector dense_out;
@@ -145,6 +148,7 @@ int GeneralDistKVInferOp::inference() {
std::shared_ptr model_config = resource.get_general_model_config().front();
int cube_key_found = 0;
int cube_key_miss = 0;
+
for (size_t i = 0; i < in->size(); ++i) {
if (in->at(i).dtype != paddle::PaddleDType::INT64) {
dense_out[dense_idx] = in->at(i);
@@ -194,6 +198,7 @@ int GeneralDistKVInferOp::inference() {
VLOG(2) << "(logid=" << log_id << ") sparse tensor load success.";
timeline.Pause();
VLOG(2) << "dist kv, cube and datacopy time: " << timeline.ElapsedUS();
+
TensorVector infer_in;
infer_in.insert(infer_in.end(), dense_out.begin(), dense_out.end());
infer_in.insert(infer_in.end(), sparse_out.begin(), sparse_out.end());
@@ -201,10 +206,10 @@ int GeneralDistKVInferOp::inference() {
output_blob->_batch_size = batch_size;
int64_t start = timeline.TimeStampUS();
timeline.Start();
-
+ // call paddle inference here
if (InferManager::instance().infer(
engine_name().c_str(), &infer_in, out, batch_size)) {
- LOG(ERROR) << "Failed do infer in fluid model: " << engine_name();
+ LOG(ERROR) << "(logid=" << log_id << ") Failed do infer in fluid model: " << engine_name();
return -1;
}
int64_t end = timeline.TimeStampUS();
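
The embedding width is no longer hard-coded to 9; it is now derived from the size of the cube value buffer. A minimal sketch of that arithmetic is below; the 10-byte prefix is taken from the diff and assumed to be cube value metadata preceding the float payload.

```cpp
// Sketch of the embedding-size derivation used above.
#include <cstddef>
#include <string>

inline size_t EmbeddingSizeFromCubeValue(const std::string& buff) {
  // e.g. a 46-byte value -> (46 - 10) / 4 = 9 floats per key
  return (buff.size() - 10) / sizeof(float);
}
```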
diff --git a/core/general-server/op/general_dist_kv_quant_infer_op.cpp b/core/general-server/op/general_dist_kv_quant_infer_op.cpp
old mode 100755
new mode 100644
index 756b83d625d04b9d2c6c6faf1ab42eecf5a19073..77036c35519d9355fa5100e57e99b8b1d2916c44
--- a/core/general-server/op/general_dist_kv_quant_infer_op.cpp
+++ b/core/general-server/op/general_dist_kv_quant_infer_op.cpp
@@ -35,7 +35,6 @@ using baidu::paddle_serving::predictor::MempoolWrapper;
using baidu::paddle_serving::predictor::general_model::Tensor;
using baidu::paddle_serving::predictor::general_model::Response;
using baidu::paddle_serving::predictor::general_model::Request;
-using baidu::paddle_serving::predictor::general_model::FetchInst;
using baidu::paddle_serving::predictor::InferManager;
using baidu::paddle_serving::predictor::PaddleGeneralModelConfig;
@@ -117,9 +116,6 @@ int GeneralDistKVQuantInferOp::inference() {
std::unordered_map in_out_map;
baidu::paddle_serving::predictor::Resource &resource =
baidu::paddle_serving::predictor::Resource::instance();
- //TODO:Temporary addition, specific details to be studied by HexToString
- std::shared_ptr model_config =
- resource.get_general_model_config()[0];
int cube_quant_bits = resource.get_cube_quant_bits();
size_t EMBEDDING_SIZE = 0;
if (cube_quant_bits == 0) {
@@ -146,7 +142,7 @@ int GeneralDistKVQuantInferOp::inference() {
sparse_out[sparse_idx].shape.push_back(
sparse_out[sparse_idx].lod[0].back());
sparse_out[sparse_idx].shape.push_back(EMBEDDING_SIZE);
- sparse_out[sparse_idx].name = model_config->_feed_name[i];
+ sparse_out[sparse_idx].name = in->at(i).name;
sparse_out[sparse_idx].data.Resize(sparse_out[sparse_idx].lod[0].back() *
EMBEDDING_SIZE * sizeof(float));
// END HERE
diff --git a/core/general-server/op/general_infer_op.cpp b/core/general-server/op/general_infer_op.cpp
old mode 100755
new mode 100644
index 46038e1fe20d5659d3061e3d7490af65f6d54092..00c408a0c5fbe6d886fc3a62285b92ff486aa154
--- a/core/general-server/op/general_infer_op.cpp
+++ b/core/general-server/op/general_infer_op.cpp
@@ -31,7 +31,6 @@ using baidu::paddle_serving::predictor::MempoolWrapper;
using baidu::paddle_serving::predictor::general_model::Tensor;
using baidu::paddle_serving::predictor::general_model::Response;
using baidu::paddle_serving::predictor::general_model::Request;
-using baidu::paddle_serving::predictor::general_model::FetchInst;
using baidu::paddle_serving::predictor::InferManager;
using baidu::paddle_serving::predictor::PaddleGeneralModelConfig;
@@ -49,7 +48,7 @@ int GeneralInferOp::inference() {
const GeneralBlob *input_blob = get_depend_argument(pre_name);
if (!input_blob) {
LOG(ERROR) << "input_blob is nullptr,error";
- return -1;
+ return -1;
}
uint64_t log_id = input_blob->GetLogId();
VLOG(2) << "(logid=" << log_id << ") Get precedent op name: " << pre_name;
@@ -57,7 +56,7 @@ int GeneralInferOp::inference() {
GeneralBlob *output_blob = mutable_data();
if (!output_blob) {
LOG(ERROR) << "output_blob is nullptr,error";
- return -1;
+ return -1;
}
output_blob->SetLogId(log_id);
diff --git a/core/general-server/op/general_reader_op.cpp b/core/general-server/op/general_reader_op.cpp
index 3e1091dd844f0afd71c8556586f82aafc42c5097..af77df553837c594789b0e9943790fc37fc01c95 100644
--- a/core/general-server/op/general_reader_op.cpp
+++ b/core/general-server/op/general_reader_op.cpp
@@ -30,42 +30,8 @@ using baidu::paddle_serving::Timer;
using baidu::paddle_serving::predictor::MempoolWrapper;
using baidu::paddle_serving::predictor::general_model::Tensor;
using baidu::paddle_serving::predictor::general_model::Request;
-using baidu::paddle_serving::predictor::general_model::FeedInst;
using baidu::paddle_serving::predictor::PaddleGeneralModelConfig;
enum ProtoDataType { P_INT64, P_FLOAT32, P_INT32, P_STRING };
-int conf_check(const Request *req,
- const std::shared_ptr &model_config) {
- int var_num = req->insts(0).tensor_array_size();
- if (var_num != model_config->_feed_type.size()) {
- LOG(ERROR) << "feed var number not match: model config["
- << model_config->_feed_type.size() << "] vs. actual[" << var_num
- << "]";
- return -1;
- }
-
- VLOG(2) << "fetch var num in reader op: " << req->fetch_var_names_size();
-
- for (int i = 0; i < var_num; ++i) {
- const Tensor &tensor = req->insts(0).tensor_array(i);
- if (model_config->_feed_type[i] != tensor.elem_type()) {
- LOG(ERROR) << "feed type not match.";
- return -1;
- }
- if (model_config->_feed_shape[i].size() == tensor.shape_size()) {
- for (int j = 0; j < model_config->_feed_shape[i].size(); ++j) {
- tensor.shape(j);
- if (model_config->_feed_shape[i][j] != tensor.shape(j)) {
- LOG(ERROR) << "feed shape not match.";
- return -1;
- }
- }
- } else {
- LOG(ERROR) << "feed shape not match.";
- return -1;
- }
- }
- return 0;
-}
int GeneralReaderOp::inference() {
// read request from client
@@ -93,7 +59,8 @@ int GeneralReaderOp::inference() {
res->SetLogId(log_id);
Timer timeline;
int64_t start = timeline.TimeStampUS();
- int var_num = req->insts(0).tensor_array_size();
+  // var_num is the number of feed variables.
+ int var_num = req->tensor_size();
VLOG(2) << "(logid=" << log_id << ") var num: " << var_num
<< ") start to call load general model_conf op";
@@ -102,19 +69,7 @@ int GeneralReaderOp::inference() {
baidu::paddle_serving::predictor::Resource::instance();
VLOG(2) << "(logid=" << log_id << ") get resource pointer done.";
- // get the first InferOP's model_config as ReaderOp's model_config by default.
- std::shared_ptr model_config =
- resource.get_general_model_config().front();
- // TODO(guru4elephant): how to do conditional check?
- /*
- int ret = conf_check(req, model_config);
- if (ret != 0) {
- LOG(ERROR) << "model conf of server:";
- resource.print_general_model_config(model_config);
- return 0;
- }
- */
// package tensor
// prepare basic information for input
// specify the memory needed for output tensor_vector
@@ -125,7 +80,7 @@ int GeneralReaderOp::inference() {
int64_t databuf_size = 0;
for (int i = 0; i < var_num; ++i) {
paddle::PaddleTensor paddleTensor;
- const Tensor &tensor = req->insts(0).tensor_array(i);
+ const Tensor &tensor = req->tensor(i);
data_len = 0;
elem_type = 0;
elem_size = 0;
@@ -172,13 +127,16 @@ int GeneralReaderOp::inference() {
VLOG(2) << "(logid=" << log_id << ") shape for var[" << i << "]: " << dim;
paddleTensor.shape.push_back(dim);
}
- paddleTensor.name = model_config->_feed_name[i];
+ paddleTensor.name = tensor.name();
out->push_back(paddleTensor);
VLOG(2) << "(logid=" << log_id << ") tensor size for var[" << i
<< "]: " << data_len;
databuf_size = data_len * elem_size;
- out->at(i).data.Resize(databuf_size);
+ void *databuf_char = MempoolWrapper::instance().malloc(databuf_size);
+ paddle::PaddleBuf paddleBuf(databuf_char, databuf_size);
+ out->at(i).data = paddleBuf;
+ // out->at(i).data.Resize(databuf_size);
if (out->at(i).lod.size() > 0) {
VLOG(2) << "(logid=" << log_id << ") var[" << i
<< "] has lod_tensor and len=" << out->at(i).lod[0].back();
diff --git a/core/general-server/op/general_response_op.cpp b/core/general-server/op/general_response_op.cpp
old mode 100755
new mode 100644
index d8fece0f7e25a967a6a72f41a9090b0977bf252a..9f6c8aabd72c7e1e9b8ff933c807ee7fcdc0662f
--- a/core/general-server/op/general_response_op.cpp
+++ b/core/general-server/op/general_response_op.cpp
@@ -34,7 +34,6 @@ using baidu::paddle_serving::predictor::MempoolWrapper;
using baidu::paddle_serving::predictor::general_model::Tensor;
using baidu::paddle_serving::predictor::general_model::Response;
using baidu::paddle_serving::predictor::general_model::Request;
-using baidu::paddle_serving::predictor::general_model::FetchInst;
using baidu::paddle_serving::predictor::general_model::ModelOutput;
using baidu::paddle_serving::predictor::InferManager;
using baidu::paddle_serving::predictor::PaddleGeneralModelConfig;
@@ -49,7 +48,6 @@ int GeneralResponseOp::inference() {
get_depend_argument(pre_node_names[0])->GetLogId();
const Request *req = dynamic_cast(get_request_message());
- // response inst with only fetch_var_names
Response *res = mutable_data();
Timer timeline;
@@ -63,7 +61,8 @@ int GeneralResponseOp::inference() {
baidu::paddle_serving::predictor::Resource::instance();
VLOG(2) << "(logid=" << log_id << ") get resource pointer done.";
- //get the last InferOP's model_config as ResponseOp's model_config by default.
+ // get the last InferOP's model_config as ResponseOp's model_config by
+ // default.
std::shared_ptr model_config =
resource.get_general_model_config().back();
@@ -71,10 +70,23 @@ int GeneralResponseOp::inference() {
<< ") max body size : " << brpc::fLU64::FLAGS_max_body_size;
std::vector fetch_index;
- fetch_index.resize(req->fetch_var_names_size());
- for (int i = 0; i < req->fetch_var_names_size(); ++i) {
- fetch_index[i] =
- model_config->_fetch_alias_name_to_index[req->fetch_var_names(i)];
+  // This relies on GetOutPutNames() returning an ordered map, so the order of
+  // the outputs matches the FetchVar order in the prototxt.
+  // Otherwise, an output could only be located through the
+  // Name -- Alias_name mapping.
+ if (req->fetch_var_names_size() > 0) {
+ fetch_index.resize(req->fetch_var_names_size());
+ for (int i = 0; i < req->fetch_var_names_size(); ++i) {
+ fetch_index[i] =
+ model_config->_fetch_alias_name_to_index[req->fetch_var_names(i)];
+ }
+ } else {
+ fetch_index.resize(model_config->_fetch_alias_name.size());
+ for (int i = 0; i < model_config->_fetch_alias_name.size(); ++i) {
+ fetch_index[i] =
+ model_config
+ ->_fetch_alias_name_to_index[model_config->_fetch_alias_name[i]];
+ }
}
for (uint32_t pi = 0; pi < pre_node_names.size(); ++pi) {
@@ -95,40 +107,41 @@ int GeneralResponseOp::inference() {
ModelOutput *output = res->add_outputs();
// To get the order of model return values
output->set_engine_name(pre_name);
- FetchInst *fetch_inst = output->add_insts();
+ var_idx = 0;
+  // idx is the real index into FetchVar, not the index into FetchList.
+  // fetch_index maps each FetchList entry to its index in FetchVar.
+  // For example, with FetchVar = {0:A, 1:B, 2:C} and FetchList = {0:C, 1:A},
+  // fetch_index = [2, 0]: C's index is 2 and A's index is 0.
for (auto &idx : fetch_index) {
- Tensor *tensor = fetch_inst->add_tensor_array();
- //tensor->set_elem_type(1);
- if (model_config->_is_lod_fetch[idx]) {
- VLOG(2) << "(logid=" << log_id << ") out[" << idx << "] "
- << model_config->_fetch_name[idx] << " is lod_tensor";
- for (int k = 0; k < in->at(idx).shape.size(); ++k) {
- VLOG(2) << "(logid=" << log_id << ") shape[" << k
- << "]: " << in->at(idx).shape[k];
- tensor->add_shape(in->at(idx).shape[k]);
- }
- } else {
- VLOG(2) << "(logid=" << log_id << ") out[" << idx << "] "
- << model_config->_fetch_name[idx] << " is tensor";
- for (int k = 0; k < in->at(idx).shape.size(); ++k) {
- VLOG(2) << "(logid=" << log_id << ") shape[" << k
- << "]: " << in->at(idx).shape[k];
- tensor->add_shape(in->at(idx).shape[k]);
+ Tensor *tensor = output->add_tensor();
+ tensor->set_name(in->at(idx).name);
+ tensor->set_alias_name(model_config->_fetch_alias_name[idx]);
+ for (int k = 0; k < in->at(idx).shape.size(); ++k) {
+ VLOG(2) << "(logid=" << log_id << ") shape[" << k
+ << "]: " << in->at(idx).shape[k];
+ tensor->add_shape(in->at(idx).shape[k]);
+ }
+ std::string str_tensor_type = "is tensor";
+ if (model_config->_is_lod_fetch[idx] && in->at(idx).lod.size() > 0) {
+ str_tensor_type = "is lod_tensor";
+ for (int j = 0; j < in->at(idx).lod[0].size(); ++j) {
+ tensor->add_lod(in->at(idx).lod[0][j]);
}
}
- }
+      VLOG(2) << "(logid=" << log_id << ") out[" << idx << "] "
+              << model_config->_fetch_name[idx] << " " << str_tensor_type;
- var_idx = 0;
- for (auto &idx : fetch_index) {
cap = 1;
for (int j = 0; j < in->at(idx).shape.size(); ++j) {
cap *= in->at(idx).shape[j];
}
- FetchInst *fetch_p = output->mutable_insts(0);
auto dtype = in->at(idx).dtype;
if (dtype == paddle::PaddleDType::INT64) {
+ tensor->set_elem_type(0);
VLOG(2) << "(logid=" << log_id << ") Prepare int64 var ["
<< model_config->_fetch_name[idx] << "].";
int64_t *data_ptr = static_cast(in->at(idx).data.data());
@@ -137,35 +150,24 @@ int GeneralResponseOp::inference() {
// `Swap` method is faster than `{}` method.
google::protobuf::RepeatedField tmp_data(data_ptr,
data_ptr + cap);
- fetch_p->mutable_tensor_array(var_idx)->mutable_int64_data()->Swap(
- &tmp_data);
+ output->mutable_tensor(var_idx)->mutable_int64_data()->Swap(&tmp_data);
} else if (dtype == paddle::PaddleDType::FLOAT32) {
+ tensor->set_elem_type(1);
VLOG(2) << "(logid=" << log_id << ") Prepare float var ["
<< model_config->_fetch_name[idx] << "].";
-
+
float *data_ptr = static_cast(in->at(idx).data.data());
google::protobuf::RepeatedField tmp_data(data_ptr,
data_ptr + cap);
- fetch_p->mutable_tensor_array(var_idx)->mutable_float_data()->Swap(
- &tmp_data);
+ output->mutable_tensor(var_idx)->mutable_float_data()->Swap(&tmp_data);
} else if (dtype == paddle::PaddleDType::INT32) {
-
+ tensor->set_elem_type(2);
VLOG(2) << "(logid=" << log_id << ")Prepare int32 var ["
<< model_config->_fetch_name[idx] << "].";
int32_t *data_ptr = static_cast(in->at(idx).data.data());
google::protobuf::RepeatedField tmp_data(data_ptr,
data_ptr + cap);
- fetch_p->mutable_tensor_array(var_idx)->mutable_int_data()->Swap(
- &tmp_data);
- }
-
- if (model_config->_is_lod_fetch[idx]) {
- if (in->at(idx).lod.size() > 0) {
- for (int j = 0; j < in->at(idx).lod[0].size(); ++j) {
- fetch_p->mutable_tensor_array(var_idx)->add_lod(
- in->at(idx).lod[0][j]);
- }
- }
+ output->mutable_tensor(var_idx)->mutable_int_data()->Swap(&tmp_data);
}
VLOG(2) << "(logid=" << log_id << ") fetch var ["
@@ -205,4 +207,4 @@ DEFINE_OP(GeneralResponseOp);
} // namespace serving
} // namespace paddle_serving
-} // namespace baidu
\ No newline at end of file
+} // namespace baidu
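
The fetch_index resolution above maps each requested fetch alias to its real index in the model's FetchVar list, and falls back to all fetch variables when the request names none. A small sketch of that logic with plain standard containers (not the server's `model_config` type) is shown below.

```cpp
// Sketch of the fetch_index resolution described in the comments above.
// Example: FetchVar = {0:A, 1:B, 2:C}, requested = {C, A} -> {2, 0}.
#include <map>
#include <string>
#include <vector>

std::vector<int> ResolveFetchIndex(
    const std::vector<std::string>& requested_aliases,
    const std::vector<std::string>& all_fetch_aliases,
    const std::map<std::string, int>& alias_to_index) {
  const auto& aliases =
      requested_aliases.empty() ? all_fetch_aliases : requested_aliases;
  std::vector<int> fetch_index;
  fetch_index.reserve(aliases.size());
  for (const auto& alias : aliases) {
    fetch_index.push_back(alias_to_index.at(alias));
  }
  return fetch_index;
}
```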
diff --git a/core/general-server/op/general_text_reader_op.cpp b/core/general-server/op/general_text_reader_op.cpp
deleted file mode 100755
index 6c305c18c0cb56bc5dd841c9c6a09807c6dbf518..0000000000000000000000000000000000000000
--- a/core/general-server/op/general_text_reader_op.cpp
+++ /dev/null
@@ -1,179 +0,0 @@
-// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-#include "core/general-server/op/general_text_reader_op.h"
-#include
-#include
-#include
-#include
-#include "core/predictor/framework/infer.h"
-#include "core/predictor/framework/memory.h"
-#include "core/util/include/timer.h"
-
-namespace baidu {
-namespace paddle_serving {
-namespace serving {
-
-using baidu::paddle_serving::Timer;
-using baidu::paddle_serving::predictor::MempoolWrapper;
-using baidu::paddle_serving::predictor::general_model::Tensor;
-using baidu::paddle_serving::predictor::general_model::Request;
-using baidu::paddle_serving::predictor::general_model::FeedInst;
-using baidu::paddle_serving::predictor::PaddleGeneralModelConfig;
-
-int GeneralTextReaderOp::inference() {
- // reade request from client
- const Request *req = dynamic_cast(get_request_message());
- uint64_t log_id = req->log_id();
-
- int batch_size = req->insts_size();
- int input_var_num = 0;
-
- std::vector elem_type;
- std::vector elem_size;
- std::vector capacity;
-
- GeneralBlob *res = mutable_data();
-
- if (!res) {
- LOG(ERROR) << "(logid=" << log_id
- << ") Failed get op tls reader object output";
- }
-
- TensorVector *out = &res->tensor_vector;
- res->SetBatchSize(batch_size);
- res->SetLogId(log_id);
-
- if (batch_size <= 0) {
- LOG(ERROR) << "(logid=" << log_id << ") Batch size < 0";
- return -1;
- }
-
- Timer timeline;
- int64_t start = timeline.TimeStampUS();
-
- int var_num = req->insts(0).tensor_array_size();
- VLOG(2) << "(logid=" << log_id << ") var num: " << var_num;
-
- VLOG(2) << "(logid=" << log_id
- << ") start to call load general model_conf op";
- baidu::paddle_serving::predictor::Resource &resource =
- baidu::paddle_serving::predictor::Resource::instance();
-
- VLOG(2) << "(logid=" << log_id << ") get resource pointer done.";
- std::shared_ptr model_config =
- resource.get_general_model_config()[0];
-
- VLOG(2) << "(logid=" << log_id << ") print general model config done.";
-
- elem_type.resize(var_num);
- elem_size.resize(var_num);
- capacity.resize(var_num);
- for (int i = 0; i < var_num; ++i) {
- paddle::PaddleTensor lod_tensor;
- elem_type[i] = req->insts(0).tensor_array(i).elem_type();
- VLOG(2) << "(logid=" << log_id << ") var[" << i
- << "] has elem type: " << elem_type[i];
- if (elem_type[i] == 0) { // int64
- elem_size[i] = sizeof(int64_t);
- lod_tensor.dtype = paddle::PaddleDType::INT64;
- } else {
- elem_size[i] = sizeof(float);
- lod_tensor.dtype = paddle::PaddleDType::FLOAT32;
- }
-
- if (req->insts(0).tensor_array(i).shape(0) == -1) {
- lod_tensor.lod.resize(1);
- lod_tensor.lod[0].push_back(0);
- VLOG(2) << "(logid=" << log_id << ") var[" << i << "] is lod_tensor";
- } else {
- lod_tensor.shape.push_back(batch_size);
- capacity[i] = 1;
- for (int k = 0; k < req->insts(0).tensor_array(i).shape_size(); ++k) {
- int dim = req->insts(0).tensor_array(i).shape(k);
- VLOG(2) << "(logid=" << log_id << ") shape for var[" << i
- << "]: " << dim;
- capacity[i] *= dim;
- lod_tensor.shape.push_back(dim);
- }
- VLOG(2) << "(logid=" << log_id << ") var[" << i
- << "] is tensor, capacity: " << capacity[i];
- }
- lod_tensor.name = model_config->_feed_name[i];
- out->push_back(lod_tensor);
- }
-
- for (int i = 0; i < var_num; ++i) {
- if (out->at(i).lod.size() == 1) {
- for (int j = 0; j < batch_size; ++j) {
- const Tensor &tensor = req->insts(j).tensor_array(i);
- int data_len = tensor.int_data_size();
- int cur_len = out->at(i).lod[0].back();
- out->at(i).lod[0].push_back(cur_len + data_len);
- }
- out->at(i).data.Resize(out->at(i).lod[0].back() * elem_size[i]);
- out->at(i).shape = {out->at(i).lod[0].back(), 1};
- VLOG(2) << "(logid=" << log_id << ") var[" << i
- << "] is lod_tensor and len=" << out->at(i).lod[0].back();
- } else {
- out->at(i).data.Resize(batch_size * capacity[i] * elem_size[i]);
- VLOG(2) << "(logid=" << log_id << ") var[" << i
- << "] is tensor and capacity=" << batch_size * capacity[i];
- }
- }
-
- for (int i = 0; i < var_num; ++i) {
- if (elem_type[i] == 0) {
- int64_t *dst_ptr = static_cast(out->at(i).data.data());
- int offset = 0;
- for (int j = 0; j < batch_size; ++j) {
- for (int k = 0; k < req->insts(j).tensor_array(i).int_data_size();
- ++k) {
- dst_ptr[offset + k] = req->insts(j).tensor_array(i).int_data(k);
- }
- if (out->at(i).lod.size() == 1) {
- offset = out->at(i).lod[0][j + 1];
- } else {
- offset += capacity[i];
- }
- }
- } else {
- float *dst_ptr = static_cast(out->at(i).data.data());
- int offset = 0;
- for (int j = 0; j < batch_size; ++j) {
- for (int k = 0; k < req->insts(j).tensor_array(i).int_data_size();
- ++k) {
- dst_ptr[offset + k] = req->insts(j).tensor_array(i).int_data(k);
- }
- if (out->at(i).lod.size() == 1) {
- offset = out->at(i).lod[0][j + 1];
- } else {
- offset += capacity[i];
- }
- }
- }
- }
-
- int64_t end = timeline.TimeStampUS();
- res->p_size = 0;
- AddBlobInfo(res, start);
- AddBlobInfo(res, end);
-
- VLOG(2) << "(logid=" << log_id << ") read data from client success";
- return 0;
-}
-DEFINE_OP(GeneralTextReaderOp);
-} // namespace serving
-} // namespace paddle_serving
-} // namespace baidu
diff --git a/core/general-server/op/general_text_reader_op.h b/core/general-server/op/general_text_reader_op.h
deleted file mode 100644
index af822993dc37fae23c1fa584d640cbfe8d9950c8..0000000000000000000000000000000000000000
--- a/core/general-server/op/general_text_reader_op.h
+++ /dev/null
@@ -1,40 +0,0 @@
-// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-#pragma once
-#include
-#include
-#include "core/general-server/general_model_service.pb.h"
-#include "core/general-server/load_general_model_service.pb.h"
-#include "core/general-server/op/general_infer_helper.h"
-#include "core/predictor/framework/resource.h"
-#include "paddle_inference_api.h" // NOLINT
-
-namespace baidu {
-namespace paddle_serving {
-namespace serving {
-
-class GeneralTextReaderOp
- : public baidu::paddle_serving::predictor::OpWithChannel {
- public:
- typedef std::vector TensorVector;
-
- DECLARE_OP(GeneralTextReaderOp);
-
- int inference();
-};
-
-} // namespace serving
-} // namespace paddle_serving
-} // namespace baidu
diff --git a/core/general-server/op/general_text_response_op.cpp b/core/general-server/op/general_text_response_op.cpp
deleted file mode 100755
index 03ab08cd361ea9eb8060c4ba5372d319a34df1f6..0000000000000000000000000000000000000000
--- a/core/general-server/op/general_text_response_op.cpp
+++ /dev/null
@@ -1,168 +0,0 @@
-// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-#include "core/general-server/op/general_text_response_op.h"
-#include
-#include
-#include
-#include
-#include "core/predictor/framework/infer.h"
-#include "core/predictor/framework/memory.h"
-#include "core/predictor/framework/resource.h"
-#include "core/util/include/timer.h"
-
-namespace baidu {
-namespace paddle_serving {
-namespace serving {
-
-using baidu::paddle_serving::Timer;
-using baidu::paddle_serving::predictor::MempoolWrapper;
-using baidu::paddle_serving::predictor::general_model::Tensor;
-using baidu::paddle_serving::predictor::general_model::Response;
-using baidu::paddle_serving::predictor::general_model::Request;
-using baidu::paddle_serving::predictor::general_model::FetchInst;
-using baidu::paddle_serving::predictor::general_model::ModelOutput;
-using baidu::paddle_serving::predictor::InferManager;
-using baidu::paddle_serving::predictor::PaddleGeneralModelConfig;
-
-int GeneralTextResponseOp::inference() {
- VLOG(2) << "Going to run inference";
- const std::vector pre_node_names = pre_names();
- VLOG(2) << "pre node names size: " << pre_node_names.size();
- const GeneralBlob *input_blob;
- uint64_t log_id =
- get_depend_argument(pre_node_names[0])->GetLogId();
-
- const Request *req = dynamic_cast(get_request_message());
- // response inst with only fetch_var_names
- Response *res = mutable_data();
-
- Timer timeline;
- int64_t start = timeline.TimeStampUS();
-
- VLOG(2) << "(logid=" << log_id
- << ") start to call load general model_conf op";
- baidu::paddle_serving::predictor::Resource &resource =
- baidu::paddle_serving::predictor::Resource::instance();
-
- VLOG(2) << "(logid=" << log_id << ") get resource pointer done.";
- std::shared_ptr model_config =
- resource.get_general_model_config().back();
-
- std::vector fetch_index;
- fetch_index.resize(req->fetch_var_names_size());
- for (int i = 0; i < req->fetch_var_names_size(); ++i) {
- fetch_index[i] =
- model_config->_fetch_alias_name_to_index[req->fetch_var_names(i)];
- }
-
- for (uint32_t pi = 0; pi < pre_node_names.size(); ++pi) {
- const std::string &pre_name = pre_node_names[pi];
- VLOG(2) << "(logid=" << log_id << ") pre names[" << pi << "]: " << pre_name
- << " (" << pre_node_names.size() << ")";
- input_blob = get_depend_argument(pre_name);
- if (!input_blob) {
- LOG(ERROR) << "(logid=" << log_id
- << ") Failed mutable depended argument, op: " << pre_name;
- return -1;
- }
-
- const TensorVector *in = &input_blob->tensor_vector;
- int batch_size = input_blob->GetBatchSize();
- VLOG(2) << "(logid=" << log_id << ") input batch size: " << batch_size;
-
- ModelOutput *output = res->add_outputs();
- output->set_engine_name(
- pre_name); // To get the order of model return values
- for (int i = 0; i < batch_size; ++i) {
- FetchInst *fetch_inst = output->add_insts();
- for (auto &idx : fetch_index) {
- Tensor *tensor = fetch_inst->add_tensor_array();
- // currently only response float tensor or lod_tensor
- tensor->set_elem_type(1);
- if (model_config->_is_lod_fetch[idx]) {
- VLOG(2) << "(logid=" << log_id << ") out[" << idx << " is lod_tensor";
- tensor->add_shape(-1);
- } else {
- VLOG(2) << "(logid=" << log_id << ") out[" << idx << "] is tensor";
- for (int k = 1; k < in->at(idx).shape.size(); ++k) {
- VLOG(2) << "(logid=" << log_id << ") shape[" << k - 1
- << "]: " << in->at(idx).shape[k];
- tensor->add_shape(in->at(idx).shape[k]);
- }
- }
- }
- }
-
- int var_idx = 0;
- for (auto &idx : fetch_index) {
- float *data_ptr = static_cast(in->at(idx).data.data());
- int cap = 1;
- for (int j = 1; j < in->at(idx).shape.size(); ++j) {
- cap *= in->at(idx).shape[j];
- }
- if (model_config->_is_lod_fetch[idx]) {
- for (int j = 0; j < batch_size; ++j) {
- for (int k = in->at(idx).lod[0][j]; k < in->at(idx).lod[0][j + 1];
- k++) {
- output->mutable_insts(j)
- ->mutable_tensor_array(var_idx)
- ->add_float_data(data_ptr[k]);
- }
- }
- } else {
- for (int j = 0; j < batch_size; ++j) {
- for (int k = j * cap; k < (j + 1) * cap; ++k) {
- output->mutable_insts(j)
- ->mutable_tensor_array(var_idx)
- ->add_float_data(data_ptr[k]);
- }
- }
- }
- var_idx++;
- }
- }
-
- if (req->profile_server()) {
- int64_t end = timeline.TimeStampUS();
- // TODO(barriery): multi-model profile_time.
- // At present, only the response_op is multi-input, so here we get
- // the profile_time by hard coding. It needs to be replaced with
- // a more elegant way.
- for (uint32_t pi = 0; pi < pre_node_names.size(); ++pi) {
- input_blob = get_depend_argument(pre_node_names[pi]);
- VLOG(2) << "(logid=" << log_id
- << ") p size for input blob: " << input_blob->p_size;
- int profile_time_idx = -1;
- if (pi == 0) {
- profile_time_idx = 0;
- } else {
- profile_time_idx = input_blob->p_size - 2;
- }
- for (; profile_time_idx < input_blob->p_size; ++profile_time_idx) {
- res->add_profile_time(input_blob->time_stamp[profile_time_idx]);
- }
- }
- // TODO(guru4elephant): find more elegant way to do this
- res->add_profile_time(start);
- res->add_profile_time(end);
- }
-
- return 0;
-}
-DEFINE_OP(GeneralTextResponseOp);
-
-} // namespace serving
-} // namespace paddle_serving
-} // namespace baidu
diff --git a/core/general-server/op/general_text_response_op.h b/core/general-server/op/general_text_response_op.h
deleted file mode 100644
index 334d98476e67f745635f7d66d7b8682de62da355..0000000000000000000000000000000000000000
--- a/core/general-server/op/general_text_response_op.h
+++ /dev/null
@@ -1,39 +0,0 @@
-// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-#pragma once
-#include
-#include
-#include "core/general-server/general_model_service.pb.h"
-#include "core/general-server/op/general_infer_helper.h"
-#include "paddle_inference_api.h" // NOLINT
-
-namespace baidu {
-namespace paddle_serving {
-namespace serving {
-
-class GeneralTextResponseOp
- : public baidu::paddle_serving::predictor::OpWithChannel<
- baidu::paddle_serving::predictor::general_model::Response> {
- public:
- typedef std::vector TensorVector;
-
- DECLARE_OP(GeneralTextResponseOp);
-
- int inference();
-};
-
-} // namespace serving
-} // namespace paddle_serving
-} // namespace baidu
diff --git a/core/general-server/proto/general_model_service.proto b/core/general-server/proto/general_model_service.proto
old mode 100644
new mode 100755
index e7dd5fccf54be43db8e65a9ed1112ceaece93700..8fedb60e97ec5b81263687b47ff0794880da8671
--- a/core/general-server/proto/general_model_service.proto
+++ b/core/general-server/proto/general_model_service.proto
@@ -20,21 +20,20 @@ package baidu.paddle_serving.predictor.general_model;
option cc_generic_services = true;
message Tensor {
- repeated bytes data = 1;
+ repeated string data = 1;
repeated int32 int_data = 2;
repeated int64 int64_data = 3;
repeated float float_data = 4;
- optional int32 elem_type = 5;
- repeated int32 shape = 6;
- repeated int32 lod = 7; // only for fetch tensor currently
+ optional int32 elem_type =
+ 5; // 0 means int64, 1 means float32, 2 means int32, 3 means string
+ repeated int32 shape = 6; // shape should include batch
+ repeated int32 lod = 7; // only for fetch tensor currently
+ optional string name = 8; // get from the Model prototxt
+ optional string alias_name = 9; // get from the Model prototxt
};
-message FeedInst { repeated Tensor tensor_array = 1; };
-
-message FetchInst { repeated Tensor tensor_array = 1; };
-
message Request {
- repeated FeedInst insts = 1;
+ repeated Tensor tensor = 1;
repeated string fetch_var_names = 2;
optional bool profile_server = 3 [ default = false ];
required uint64 log_id = 4 [ default = 0 ];
@@ -46,7 +45,7 @@ message Response {
};
message ModelOutput {
- repeated FetchInst insts = 1;
+ repeated Tensor tensor = 1;
optional string engine_name = 2;
}
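
With FeedInst/FetchInst removed, a request now carries a flat list of named tensors. The sketch below builds one such request using the generated C++ protobuf API implied by the message definitions above; the feed variable name, alias, shape, and fetch alias are purely illustrative.

```cpp
// Sketch only: builds a Request against the flattened proto above.
// Assumes the generated C++ classes from general_model_service.proto.
#include "core/general-server/general_model_service.pb.h"

namespace gm = baidu::paddle_serving::predictor::general_model;

gm::Request BuildExampleRequest() {
  gm::Request req;
  req.set_log_id(0);
  gm::Tensor* tensor = req.add_tensor();  // one Tensor per feed var
  tensor->set_name("x");                  // hypothetical feed var name
  tensor->set_alias_name("input");        // hypothetical alias
  tensor->set_elem_type(1);               // 1 means float32
  tensor->add_shape(1);                   // shape includes batch
  tensor->add_shape(13);
  for (int i = 0; i < 13; ++i) tensor->add_float_data(0.0f);
  req.add_fetch_var_names("price");       // hypothetical fetch alias
  return req;
}
```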
diff --git a/core/pdcodegen/src/pdcodegen.cpp b/core/pdcodegen/src/pdcodegen.cpp
index c505ca66385dd363ad0a76470012f07a925bcd17..a99828ee3466a32d45dcabb61a2700f9362539d4 100644
--- a/core/pdcodegen/src/pdcodegen.cpp
+++ b/core/pdcodegen/src/pdcodegen.cpp
@@ -280,6 +280,7 @@ class PdsCodeGenerator : public CodeGenerator {
" baidu::rpc::ClosureGuard done_guard(done);\n"
" baidu::rpc::Controller* cntl = \n"
" static_cast(cntl_base);\n"
+ " cntl->set_response_compress_type(brpc::COMPRESS_TYPE_GZIP);\n"
" uint64_t log_id = request->log_id();\n"
" cntl->set_log_id(log_id);\n"
" ::baidu::paddle_serving::predictor::InferService* svr = \n"
@@ -322,6 +323,7 @@ class PdsCodeGenerator : public CodeGenerator {
" baidu::rpc::ClosureGuard done_guard(done);\n"
" baidu::rpc::Controller* cntl = \n"
" static_cast(cntl_base);\n"
+ " cntl->set_response_compress_type(brpc::COMPRESS_TYPE_GZIP);\n"
" uint64_t log_id = equest->log_id();\n"
" cntl->set_log_id(log_id);\n"
" ::baidu::paddle_serving::predictor::InferService* svr = \n"
@@ -1023,6 +1025,7 @@ class PdsCodeGenerator : public CodeGenerator {
" brpc::ClosureGuard done_guard(done);\n"
" brpc::Controller* cntl = \n"
" static_cast(cntl_base);\n"
+ " cntl->set_response_compress_type(brpc::COMPRESS_TYPE_GZIP);\n"
" uint64_t log_id = request->log_id();\n"
" cntl->set_log_id(log_id);\n"
" ::baidu::paddle_serving::predictor::InferService* svr = \n"
@@ -1067,6 +1070,7 @@ class PdsCodeGenerator : public CodeGenerator {
" brpc::ClosureGuard done_guard(done);\n"
" brpc::Controller* cntl = \n"
" static_cast(cntl_base);\n"
+ " cntl->set_response_compress_type(brpc::COMPRESS_TYPE_GZIP);\n"
" uint64_t log_id = request->log_id();\n"
" cntl->set_log_id(log_id);\n"
" ::baidu::paddle_serving::predictor::InferService* svr = \n"
diff --git a/core/predictor/common/constant.cpp b/core/predictor/common/constant.cpp
index 5fa1277de1a4b0d33d14a9c33d3cb4b280bc3b5c..70f0096ba002ebb8f185cd73f8fe4f8d4d06b83f 100644
--- a/core/predictor/common/constant.cpp
+++ b/core/predictor/common/constant.cpp
@@ -25,7 +25,7 @@ DEFINE_int32(port, 8010, "");
DEFINE_string(workflow_path, "./conf", "");
DEFINE_string(workflow_file, "workflow.prototxt", "");
DEFINE_string(inferservice_path, "./conf", "");
-DEFINE_string(inferservice_file, "service.prototxt", "");
+DEFINE_string(inferservice_file, "infer_service.prototxt", "");
DEFINE_string(logger_path, "./conf", "");
DEFINE_string(logger_file, "log.conf", "");
DEFINE_string(resource_path, "./conf", "");
diff --git a/core/predictor/framework/bsf-inl-tensor.h b/core/predictor/framework/bsf-inl-tensor.h
deleted file mode 100644
index b7c725b443281f355addffb8f2fcb36651b6d9b6..0000000000000000000000000000000000000000
--- a/core/predictor/framework/bsf-inl-tensor.h
+++ /dev/null
@@ -1,373 +0,0 @@
-// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-#pragma once
-
-#ifdef BCLOUD
-#include
-#else
-#include
-#endif
-
-#include
-#include
-#include
-#include
-#include "core/predictor/common/inner_common.h"
-#include "core/predictor/framework/infer_data.h"
-#include "core/predictor/framework/memory.h"
-
-#include
-
-namespace im {
-namespace bsf {
-
-template <>
-struct Task {
- typedef Task
- TaskT;
- typedef baidu::paddle_serving::predictor::Tensor Tensor;
- typedef baidu::paddle_serving::predictor::Tensor InType;
- typedef baidu::paddle_serving::predictor::Tensor OutType;
- typedef baidu::paddle_serving::predictor::BatchTensor BatchTensor;
- typedef baidu::paddle_serving::predictor::BatchTensor InArrayT;
- typedef baidu::paddle_serving::predictor::BatchTensor OutArrayT;
-
- struct Segment {
- Segment(void* p, size_t b, size_t s) : ptr(p), begin(b), size(s) {}
- void* ptr;
- size_t begin;
- size_t size;
- };
-
- int read_fd;
- int write_fd;
-
- pid_t owner_tid;
-
- const InArrayT* in;
- OutArrayT* out;
-
- size_t rem;
- size_t size;
-
- butil::atomic index;
-
- const BatchTensor* get(bool is_in) const {
- if (is_in) {
- return in;
- } else {
- return out;
- }
- }
-
- BatchTensor* get(bool is_in) {
- if (is_in) {
- return const_cast(in);
- } else {
- return out;
- }
- }
-
- Task() {
- read_fd = -1;
- write_fd = -1;
- owner_tid = -1;
- in = NULL;
- out = NULL;
- rem = -1;
- size = -1;
- index.store(0, butil::memory_order_relaxed);
- }
-};
-
-template <>
-class BatchTasks> {
- public:
- typedef baidu::paddle_serving::predictor::Tensor Tensor;
- typedef baidu::paddle_serving::predictor::Tensor InType;
- typedef baidu::paddle_serving::predictor::Tensor OutType;
- typedef baidu::paddle_serving::predictor::DataBuf DataBuf;
- typedef baidu::paddle_serving::predictor::MempoolWrapper MempoolWrapper;
-
- typedef Task
- TaskT;
- typedef TaskMeta TaskMetaT;
- typedef TaskT::InArrayT InArrayT;
- typedef TaskT::OutArrayT OutArrayT;
-
- explicit BatchTasks(size_t batch_size, bool batch_align = false)
- : _batch_size(batch_size),
- _rem_size(batch_size),
- _batch_align(batch_align) {
- _batch_in.clear();
- _batch_out.clear();
- _tasks.clear();
- }
-
- ~BatchTasks() {
- _batch_in.clear();
- _batch_out.clear();
- _tasks.clear();
- }
-
- static bool check_valid(const InArrayT& in,
- OutArrayT& out, // NOLINT
- bool align) { // NOLINT
- if (align) {
- if (out.count() <= 0 || out.size() <= 0) {
- LOG(ERROR) << "Out tensor is empty, when aligned";
- return false;
- }
-
- if (out.size() != in.size()) {
- LOG(ERROR) << "In/Out tensor size not eq: " << out.size()
- << "!=" << in.size();
- return false;
- }
-
- for (size_t fi = 0, shape0 = 0; fi < out.count(); ++fi) {
- if (!out[fi].valid()) {
- LOG(ERROR) << "Out[" << fi << "] tensor not valid";
- return false;
- }
-
- if (out.size() != out[fi].shape0()) {
- LOG(ERROR) << "Shape0 not consistency, " << out.size()
- << "!=" << out[fi].shape0() << ", " << fi;
- return false;
- }
- }
- }
-
- return true;
- }
-
- size_t append_task(TaskT* task) {
- size_t add = std::min(task->rem, _rem_size);
- if (!_batch_align) {
- add = task->rem;
- }
- TaskMetaT tm(task, task->in->size() - task->rem, add);
- _tasks.push_back(tm);
-
- task->rem -= add;
- _rem_size -= add;
- return _rem_size;
- }
-
- void merge_tasks() {
- merge_input();
- merge_output();
- }
-
- void merge_input() {
- if (_tasks.size() <= 0 || _tasks[0].task->in->count() <= 0) {
- return;
- }
-
- if (_tasks.size() == 1 && !_batch_align) {
- TaskMetaT& tm = _tasks[0];
- _batch_in = *(tm.task->in);
- return;
- }
-
- merge_tensor(true);
- }
-
- void merge_output() {
- if (_batch_align) {
- if (_tasks.size() <= 0 || _tasks[0].task->out->count() <= 0) {
- return;
- }
- }
-
- if (_tasks.size() <= 0 || _tasks[0].task->out->count() <= 0) {
- return;
- }
-
- TaskMetaT& tm = _tasks[0];
- if (_tasks.size() == 1 && !_batch_align) {
- _batch_out = *(tm.task->out);
- return;
- }
-
- if (tm.task->out->size() <= 0) {
- // shape is empty
- _batch_out = *(tm.task->out);
- return;
- }
-
- if ((*tm.task->out)[0].data.data() == 0 ||
- (*tm.task->out)[0].data.size() == 0) {
- _batch_out = *(tm.task->out);
- return;
- }
-
- merge_tensor(false);
- }
-
- void merge_tensor(bool is_in) {
- // accumulate batch size from fetched tasks
- size_t batch_size = 0;
- for (size_t ti = 0; ti < _tasks.size(); ++ti) {
- TaskMetaT& tm = _tasks[ti];
- size_t add = tm.end - tm.begin;
- batch_size += add;
- }
-
- // merge all instanses in each tensor data
- size_t tensor_count = _tasks[0].task->get(is_in)->count();
- for (size_t fi = 0; fi < tensor_count; ++fi) {
- const Tensor& head = (*(_tasks[0].task->get(is_in)))[fi];
- Tensor batch_tensor;
- batch_tensor.name = head.name;
- batch_tensor.type = head.type;
- batch_tensor.shape.push_back(batch_size);
-
- size_t ins_ele_count = 1;
- for (size_t si = 1; si < head.shape.size(); ++si) {
- batch_tensor.shape.push_back(head.shape[si]);
- ins_ele_count *= head.shape[si];
- }
-
- size_t tensor_ele_count = ins_ele_count * batch_size;
- size_t ins_byte = ins_ele_count * head.ele_byte();
-
- size_t tensor_byte = tensor_ele_count * head.ele_byte();
- void* data_buf = MempoolWrapper::instance().malloc(tensor_byte);
- if (!data_buf) {
- LOG(ERROR) << "Malloc failed, size: " << tensor_byte;
- return;
- }
-
- size_t data_byte = 0;
- for (size_t ti = 0; ti < _tasks.size(); ++ti) {
- TaskMetaT& tm = _tasks[ti];
- size_t acc_byte = ins_byte * (tm.end - tm.begin);
- if (data_byte + acc_byte > tensor_byte) {
- LOG(ERROR) << "Invalid bytes: " << data_byte << " + " << acc_byte
- << " >= " << tensor_byte;
- return;
- }
-
- const Tensor& tensor = (*(tm.task->get(is_in)))[fi];
- memcpy(
- reinterpret_cast(data_buf) + data_byte,
- reinterpret_cast(tensor.data.data()) + tm.begin * ins_byte,
- acc_byte);
- data_byte += acc_byte;
- }
-
- if (data_byte != tensor_byte) {
- LOG(ERROR) << "Invalid tensor byte: " << data_byte
- << " != " << tensor_byte;
- return;
- }
-
- batch_tensor.data =
- DataBuf(reinterpret_cast(data_buf), tensor_byte);
- if (is_in) {
- _batch_in.push_back(batch_tensor);
- } else {
- _batch_out.push_back(batch_tensor);
- }
- }
-
- LOG(INFO) << "merge input(" << is_in << ") samples: " << batch_size
- << " from " << _tasks.size() << " pvs";
- }
-
- void notify_tasks() {
- if (_batch_out.size() != _batch_in.size()) {
- LOG(ERROR) << "batch size not consistency: " << _batch_out.size()
- << " != " << _batch_in.size();
- return;
- }
-
- size_t tensor_count = _batch_out.count();
- size_t batch_size = _batch_out.size();
- for (size_t fi = 0; fi < tensor_count; ++fi) {
- const Tensor& tensor = _batch_out[fi];
- size_t ins_byte = tensor.ele_byte();
- for (size_t si = 1; si < tensor.shape.size(); ++si) {
- ins_byte *= tensor.shape[si];
- }
-
- for (size_t ti = 0, bi = 0, add = 0; ti < _tasks.size();
- ++ti, bi += add) {
- OutArrayT* dst = _tasks[ti].task->out;
- add = _tasks[ti].end - _tasks[ti].begin;
- size_t offset_src = ins_byte * bi;
- size_t add_byte = add * ins_byte;
-
- if (_batch_align) { // merge all batchs
- size_t offset_dst = ins_byte * _tasks[ti].begin;
- void* ptr = const_cast((*dst)[fi].data.data());
- memcpy(
- reinterpret_cast(ptr) + offset_dst,
- reinterpret_cast(_batch_out[fi].data.data()) + offset_src,
- add_byte);
- } else { // overwrite
- if (dst->count() <= 0) {
- dst->push_back(_batch_out[fi]);
- } else {
- (*dst)[fi] = _batch_out[fi];
- }
-
- (*dst)[fi].shape[0] = add;
- (*dst)[fi].data = DataBuf(
- reinterpret_cast(_batch_out[fi].data.data()) + offset_src,
- add_byte);
- }
- }
- }
-
- for (size_t ti = 0; ti < _tasks.size(); ++ti) {
- TaskT* task = _tasks[ti].task;
- size_t begin = _tasks[ti].begin;
- size_t end = _tasks[ti].end;
- size_t add = end - begin;
-
- size_t index = task->index.fetch_add(add);
- if ((index + add) >= task->in->size()) {
- char c = 0;
- while (write(task->write_fd, &c, 1) != 1 && errno == EINTR) {
- }
- butil::return_object(task);
- }
- }
- }
-
- const typename TaskT::InArrayT& in() const { return _batch_in; }
-
- typename TaskT::OutArrayT& out() { return _batch_out; }
-
- size_t task_size() { return _tasks.size(); }
-
- private:
- std::vector<TaskMetaT> _tasks;
- InArrayT _batch_in;
- OutArrayT _batch_out;
- size_t _batch_size;
- size_t _rem_size;
- bool _batch_align;
-};
-
-} // namespace bsf
-} // namespace im
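For illustration only: the notify_tasks() removed above wakes the waiting caller by writing a single byte to the task's write_fd once every instance of that task has been produced. Below is a minimal, self-contained sketch of that pipe-based completion signal, using a hypothetical MiniTask in place of the repository's TaskT.

#include <unistd.h>
#include <atomic>
#include <cerrno>
#include <cstddef>

// Hypothetical stand-in for TaskT: only the fields needed to show the
// "count finished instances, then write one byte to wake the waiter" idea.
struct MiniTask {
  int write_fd;                  // write end of the pipe created in schedule()
  size_t size;                   // total number of instances in this task
  std::atomic<size_t> index{0};  // instances completed so far
};

// Called by a worker after it has produced `add` instances for `task`.
inline void notify_done(MiniTask* task, size_t add) {
  size_t index = task->index.fetch_add(add);
  if (index + add >= task->size) {
    char c = 0;
    // Retry on EINTR, exactly as the removed code above does.
    while (write(task->write_fd, &c, 1) != 1 && errno == EINTR) {
    }
  }
}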
diff --git a/core/predictor/framework/bsf-inl.h b/core/predictor/framework/bsf-inl.h
index 1193ce4860e595598b738adab738c7af9664cc26..1f5d272d2875ee878f09ac2882364afe9fd899fb 100644
--- a/core/predictor/framework/bsf-inl.h
+++ b/core/predictor/framework/bsf-inl.h
@@ -24,6 +24,7 @@
#include
#include "core/predictor/common/inner_common.h"
+#include "core/predictor/framework/memory.h"
namespace im {
namespace bsf {
@@ -35,7 +36,7 @@ void* TaskExecutor::thread_entry(void* args) {
static_cast<TaskExecutor<TaskT>*>(context->executor);
executor->work(context);
- return NULL;
+ return nullptr;
}
template <typename TaskT>
@@ -70,7 +71,7 @@ int TaskExecutor::start(uint32_t thread_num, uint32_t init_timeout_sec) {
_thread_contexts.push_back(&contexts[i]);
}
- int init_timeout = init_timeout_sec * 1000 * 1000;
+ size_t init_timeout = init_timeout_sec * 1000 * 1000;
bool has_error = false;
bool has_timeout = true;
@@ -102,7 +103,7 @@ int TaskExecutor::start(uint32_t thread_num, uint32_t init_timeout_sec) {
}
// 100ms
- const int sleep_interval = 100 * 1000;
+ const size_t sleep_interval = 100 * 1000;
usleep(sleep_interval);
init_timeout -= sleep_interval;
}
@@ -125,18 +126,21 @@ void TaskExecutor::stop() {
}
template <typename TaskT>
-TaskHandler<TaskT> TaskExecutor<TaskT>::schedule(const InArrayT& in,
- OutArrayT& out) { // NOLINT
+TaskHandler<TaskT> TaskExecutor<TaskT>::schedule(
+ const void* inVectorT_ptr,
+ void* outVectorT_ptr) { // NOLINT
TaskT* task = butil::get_object<TaskT>();
if (!task) {
LOG(ERROR) << "Failed get TaskT from object pool";
return TaskHandler<TaskT>::valid_handle();
}
+ /*
if (!BatchTasks<TaskT>::check_valid(in, out, _batch_align)) {
LOG(ERROR) << "Invalid input & output";
return TaskHandler<TaskT>::valid_handle();
}
+ */
int fds[2];
int rc = pipe(fds);
@@ -150,10 +154,9 @@ TaskHandler TaskExecutor::schedule(const InArrayT& in,
task->write_fd = fds[1];
task->owner_tid = ::syscall(SYS_gettid);
- task->in = &in;
- task->out = &out;
- task->rem = in.size();
- task->size = in.size();
+ task->inVectorT_ptr = (const InVectorT*)inVectorT_ptr;
+ task->outVectorT_ptr = (OutVectorT*)outVectorT_ptr;
+ task->rem = task->batch_size();
task->index.store(0, butil::memory_order_relaxed);
AutoMutex lock(_mut);
@@ -163,8 +166,13 @@ TaskHandler TaskExecutor::schedule(const InArrayT& in,
return TaskHandler<TaskT>(*task);
}
+// This function is accessed by multiple threads, so an AutoMutex is taken
+// first; as a result batch.append_task() is thread safe and no extra lock
+// is needed inside append_task().
template <typename TaskT>
-bool TaskExecutor<TaskT>::fetch_batch(BatchTasks<TaskT>& batch) { // NOLINT
+bool TaskExecutor<TaskT>::move_task_to_batch(
+ BatchTasks<TaskT>& batch) { // NOLINT
AutoMutex lock(_mut);
while (_task_queue.empty()) {
THREAD_COND_WAIT(&_cond, &_mut);
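For illustration only: a minimal sketch of the "take the lock first, then drain the queue into a caller-owned batch" pattern described by the comment above, written with standard C++ primitives rather than the repository's AutoMutex/THREAD_COND_WAIT; all names below are illustrative.

#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <vector>

struct MiniTask {};  // stand-in for a queued task

std::mutex mut;
std::condition_variable cond;
std::deque<MiniTask*> task_queue;

// Drain queued tasks into a local batch. Holding the lock for the whole
// drain is what makes appending to the batch safe even though many worker
// threads call this function concurrently.
bool drain_into_batch(std::vector<MiniTask*>* batch, size_t max_batch) {
  std::unique_lock<std::mutex> lock(mut);
  cond.wait(lock, [] { return !task_queue.empty(); });
  while (!task_queue.empty() && batch->size() < max_batch) {
    batch->push_back(task_queue.front());
    task_queue.pop_front();
  }
  return !batch->empty();
}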
@@ -187,8 +195,30 @@ bool TaskExecutor::fetch_batch(BatchTasks& batch) { // NOLINT
return true;
}
+// This function is accessed by multiple threads;
+// move_task_to_batch() takes the lock inside the function.
+// It packages one TaskT as one or several TaskMeta slices.
+// The TaskT comes from the singleton TaskExecutor's _task_queue.
+// Although each TaskMeta is a local variable, several TaskMeta may point to
+// the same TaskT taken from the singleton TaskExecutor's _task_queue.
+// The TaskMeta slices are put into the local variable BatchTasks batch.
+
+// batch.merge_tasks() and batch.notify_tasks() hold no lock.
+// The BatchTasks batch itself is a local variable, so it is thread safe,
+// but if batch.merge_tasks() or batch.notify_tasks() modifies a TaskMeta,
+// care is required: every thread works on its own TaskMeta (created as a
+// local variable), yet different TaskMeta may point to the same TaskT
+// taken from the singleton TaskExecutor's _task_queue.
+
template <typename TaskT>
int TaskExecutor<TaskT>::work(ThreadContext<TaskT>* context) {
+ if (MempoolWrapper::instance().thread_initialize() != 0) {
+ LOG(ERROR) << "Failed thread initialize mempool";
+ return -1;
+ }
+
if (_thread_init_fn != NULL) {
if (_thread_init_fn(context->user_thread_context) != 0) {
LOG(ERROR) << "execute thread init thunk failed, BSF thread will exit";
@@ -207,10 +237,15 @@ int TaskExecutor::work(ThreadContext* context) {
}
}
+ if (MempoolWrapper::instance().thread_clear() != 0) {
+ LOG(ERROR) << "Failed thread clear mempool";
+ return -1;
+ }
+
BatchTasks<TaskT> batch(_batch_size, _batch_align);
- if (fetch_batch(batch)) {
+ if (move_task_to_batch(batch)) {
batch.merge_tasks();
- _fn(batch.in(), batch.out());
+ _fn(&batch.in(), &batch.out());
batch.notify_tasks();
}
}
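For illustration only: the MempoolWrapper thread_initialize()/thread_clear() calls added above follow a common per-thread arena pattern: set up the arena once when the worker thread starts, then reset it between batches so per-batch buffers never accumulate. Below is a simplified, hypothetical arena showing that lifecycle; it is not the repository's MempoolWrapper.

#include <cstddef>
#include <vector>

// Hypothetical thread-local arena: init once, clear per batch.
class MiniArena {
 public:
  int thread_initialize() {
    buffer_.resize(2 * 1024 * 1024);  // e.g. 2 MB of backing storage
    used_ = 0;
    return 0;
  }
  int thread_clear() {
    used_ = 0;  // drop per-batch allocations, keep the backing storage
    return 0;
  }
  void* malloc(size_t bytes) {
    if (used_ + bytes > buffer_.size()) return nullptr;
    void* p = buffer_.data() + used_;
    used_ += bytes;
    return p;
  }

 private:
  std::vector<char> buffer_;
  size_t used_ = 0;
};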
@@ -219,9 +254,10 @@ int TaskExecutor::work(ThreadContext* context) {
}
template <typename InItemT, typename OutItemT>
-bool TaskManager<InItemT, OutItemT>::schedule(const InArrayT& in,
- OutArrayT& out) { // NOLINT
- TaskHandler<TaskT> handler = _executor.schedule(in, out);
+bool TaskManager<InItemT, OutItemT>::schedule(const void* in,
+ void* out) { // NOLINT
+ TaskHandler<TaskT> handler =
+ TaskExecutorVector<TaskT>::instance()[_model_index].schedule(in, out);
if (handler.valid()) {
_task_owned = handler;
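For illustration only: TaskManager::schedule() now looks up a per-model executor by _model_index instead of using one shared executor, which matches the multi-model serving options documented in the README. A minimal sketch of such an indexed singleton vector follows, with hypothetical names rather than the repository's TaskExecutorVector.

#include <cstddef>
#include <vector>

struct MiniExecutor { /* task queue, worker threads, ... */ };

// One executor per loaded model, reachable from anywhere via instance().
class MiniExecutorVector {
 public:
  static MiniExecutorVector& instance() {
    static MiniExecutorVector vec;  // process-wide, lazily constructed
    return vec;
  }
  void resize(size_t model_num) { executors_.resize(model_num); }
  MiniExecutor& operator[](size_t model_index) { return executors_[model_index]; }

 private:
  std::vector<MiniExecutor> executors_;
};

// A scheduler that knows its model index would then pick its executor with:
//   MiniExecutor& exec = MiniExecutorVector::instance()[model_index];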
diff --git a/core/predictor/framework/bsf.h b/core/predictor/framework/bsf.h
index 36a00c381130c191de713e5024c7247d64cb96e7..7a8629e75b87aec889a1cce98b6392dddad32ce0 100644
--- a/core/predictor/framework/bsf.h
+++ b/core/predictor/framework/bsf.h
@@ -16,7 +16,7 @@
#include
#include
-#include
+#include
#include
#ifdef BCLOUD
@@ -29,46 +29,186 @@
#include "boost/function.hpp"
+#include "core/predictor/framework/memory.h"
+#include "paddle_inference_api.h"
+
namespace im {
namespace bsf {
static const size_t DEFAULT_BATCH_SIZE = 100;
+// InItemT is paddle::PaddleTensor.
+// InVectorT is std::vector<paddle::PaddleTensor>.
+// InVectorT holds the different feed vars, not the batch;
+// the batch dimension already lives inside each paddle::PaddleTensor.
+
+// size_t `rem` records how many batch items have not yet been put into a
+// BatchTasks. `rem` does not need to be atomic because the `put` operation
+// is synchronous; more precisely, a lock is already taken outside `put`.
+
+// size_t `index` records how many batch items have finished processing.
+// `index` must be atomic because the `notify` operation is asynchronous.
template <typename InItemT, typename OutItemT>
struct Task {
- typedef std::vector<InItemT> InArrayT;
- typedef std::vector<OutItemT> OutArrayT;
+ typedef std::vector<InItemT> InVectorT;
+ typedef std::vector