Commit ddf1fd34 authored by HexToString

Merge branch 'develop' of https://github.com/PaddlePaddle/Serving into base

......@@ -11,10 +11,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License
#execute_process(COMMAND go env -w GO111MODULE=off)
add_subdirectory(cube-server)
add_subdirectory(cube-api)
add_subdirectory(cube-builder)
#add_subdirectory(cube-transfer)
#add_subdirectory(cube-agent)
add_subdirectory(cube-transfer)
add_subdirectory(cube-agent)
......@@ -15,7 +15,6 @@
set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_CURRENT_SOURCE_DIR}/cmake")
project(cube-agent Go)
include(cmake/golang.cmake)
ExternalGoProject_Add(agent-docopt-go github.com/docopt/docopt-go)
......
......@@ -22,7 +22,7 @@ include_directories(SYSTEM ${CMAKE_CURRENT_BINARY_DIR}/../)
add_executable(cube-builder src/main.cpp include/cube-builder/util.h src/util.cpp src/builder_job.cpp include/cube-builder/builder_job.h include/cube-builder/define.h src/seqfile_reader.cpp include/cube-builder/seqfile_reader.h include/cube-builder/raw_reader.h include/cube-builder/vtext.h src/crovl_builder_increment.cpp include/cube-builder/crovl_builder_increment.h src/curl_simple.cpp include/cube-builder/curl_simple.h)
add_dependencies(cube-builder jsoncpp boost)
add_dependencies(cube-builder jsoncpp boost brpc)
set(DYNAMIC_LIB
gflags
......@@ -39,4 +39,8 @@ target_link_libraries(cube-builder ${DYNAMIC_LIB})
# install
install(TARGETS cube-builder RUNTIME DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/bin)
install(DIRECTORY ${CMAKE_CURRENT_LIST_DIR}/tool DESTINATION ${PADDLE_SERVING_INSTALL_DIR})
install(FILES ${CMAKE_CURRENT_LIST_DIR}/tool/kvtool.py DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/tool)
install(FILES ${CMAKE_CURRENT_LIST_DIR}/tool/kv_to_seqfile.py DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/tool)
install(DIRECTORY ${CMAKE_CURRENT_LIST_DIR}/tool/source DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/tool)
......@@ -212,7 +212,7 @@ class slim_hash_map {
int copy_data_from(const slim_hash_map& rhs) {
destroy();
LOG(INFO) << "start copy data, rhs info, mHashSize: " << rhs.m_nHashSize;
if (rhs.m_nHashSize > 0) {
m_hashTable = new (std::nothrow) uint32_t[rhs.m_nHashSize];
if (!m_hashTable) {
......@@ -231,7 +231,7 @@ class slim_hash_map {
<< sizeof(hash_node_t) * BLOCK_SIZE;
return -1;
}
LOG(INFO) << "copy data, m_nBlockNum: " << m_nBlockNum << " , copy size:" << sizeof(hash_node_t) * BLOCK_SIZE;
memcpy(m_blockAddr[m_nBlockNum],
rhs.m_blockAddr[m_nBlockNum],
sizeof(hash_node_t) * BLOCK_SIZE);
......@@ -265,11 +265,13 @@ class slim_hash_map {
}
size_type index = key % m_nHashSize;
hash_node_t* node = get_node(m_hashTable[index]);
int node_cnt = 0;
while (node != NULL && node->data.first != key) {
LOG(INFO) << "node link get:" << node->data.first;
node_cnt++;
node = get_node(node->next);
}
LOG(INFO) << "key: " << key << " , found count: " << node_cnt;
if (node == NULL) {
return end();
}
......@@ -390,7 +392,6 @@ class slim_hash_map {
if (node != NULL) {
return node->data.second;
}
return add_node(index, key)->data.second;
}
void clear() {
......@@ -399,16 +400,16 @@ class slim_hash_map {
m_nFreeEntries = 0;
m_nSize = 0;
}
bool load(const char* file) {
bool load(const char* file, uint32_t block_id) {
// clear();
// bias = 0 means base mode, bias = K means patch mode, and base dict has size K
int size = sizeof(key_t) + sizeof(value_t);
FILE* fp = fopen(file, "rb");
char* buf = reinterpret_cast<char*>(malloc(size * 100000));
LOG(INFO) << "current block id: " << block_id;
if (fp == NULL || buf == NULL) {
return false;
}
size_t read_count;
bool err = false;
key_t key;
......@@ -423,6 +424,8 @@ class slim_hash_map {
for (int i = 0; i < static_cast<int>(read_count); ++i) {
key = *(reinterpret_cast<key_t*>(buf + i * size));
value = *(reinterpret_cast<value_t*>(buf + i * size + sizeof(key_t)));
value = ((uint64_t)block_id << 32) | value;
LOG(INFO) << "slim map key: " << key << " , value: " << value;
(*this)[key] = value;
}
}
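The net effect of the `load()` changes above is that every value stored in the slim hash map now carries its data-block id in the high 32 bits, with the byte offset in the low 32 bits; `Dict::seek` later splits the two halves apart. A minimal Python sketch of that packing (function names are illustrative, not from the source):
```python
def pack_value(block_id: int, offset: int) -> int:
    # mirrors `value = ((uint64_t)block_id << 32) | value` in slim_hash_map::load
    return (block_id << 32) | (offset & 0xFFFFFFFF)

def unpack_value(flag: int):
    # mirrors Dict::seek: `id = (uint32_t)(flag >> 32); addr = (uint32_t)flag`
    return flag >> 32, flag & 0xFFFFFFFF

assert unpack_value(pack_value(3, 0x1234)) == (3, 0x1234)
```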
......@@ -557,7 +560,6 @@ class slim_hash_map {
}
hash_node_t* add_node(uint32_t index, const key_type& key) {
++m_nSize;
if (m_nFreeEntries) {
uint32_t addr = m_nFreeEntries;
hash_node_t* node = get_node(addr);
......@@ -569,7 +571,7 @@ class slim_hash_map {
}
uint32_t block = ((m_nNextEntry & 0xFF800000) >> 23);
//LOG(INFO) << "key: " << key << " here. index: " << index << " , m_nNextEntry: "<< m_nNextEntry << " , block:" << block<< ", m_nBlockNum:" << m_nBlockNum;
if (block >= m_nBlockNum) {
try {
m_blockAddr[m_nBlockNum++] = new hash_node_t[BLOCK_SIZE];
......@@ -581,7 +583,6 @@ class slim_hash_map {
return NULL;
}
}
uint32_t addr = m_nNextEntry;
++m_nNextEntry;
hash_node_t* node = get_node(addr);
......
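For context, the mask in `add_node` implies a fixed block geometry: an entry address splits into a block index (high bits) and an offset inside the block (low 23 bits). A sketch under the assumption that `BLOCK_SIZE == 1 << 23`, which is inferred from the `0xFF800000` mask rather than shown in this hunk:
```python
BLOCK_SIZE = 1 << 23  # assumed from the mask; not visible in this hunk

def locate(addr: int):
    block = (addr & 0xFF800000) >> 23   # same mask/shift as add_node
    offset = addr & 0x007FFFFF          # position inside the block
    return block, offset

assert locate((2 << 23) + 5) == (2, 5)
```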
......@@ -51,13 +51,12 @@ int Dict::load(const std::string& dict_path,
bool in_mem,
const std::string& v_path) {
TIME_FLAG(load_start);
int ret = load_index(dict_path, v_path);
if (ret != E_OK) {
LOG(WARNING) << "load index failed";
return ret;
}
LOG(INFO) << "load index in mem mode: " << in_mem ;
if (in_mem) {
ret = load_data(dict_path, v_path);
if (ret != E_OK) {
......@@ -81,8 +80,11 @@ int Dict::load_index(const std::string& dict_path, const std::string& v_path) {
std::string index_n_path(dict_path);
index_n_path.append(v_path);
index_n_path.append("/index.n");
uint32_t cur_block_id = 0;
if (_base_dict) cur_block_id = _base_dict->_block_set.size();
LOG(INFO) << "index file path: " << index_n_path;
//ERR HERE
std::unique_ptr<FILE, decltype(&fclose)> pf(fopen(index_n_path.c_str(), "rb"),
&fclose);
if (pf.get() == NULL) {
......@@ -150,12 +152,16 @@ int Dict::load_index(const std::string& dict_path, const std::string& v_path) {
return E_DATA_ERROR;
}
} else {
if (_slim_table.copy_data_from(_base_dict->_slim_table) != 0) {
LOG(ERROR) << "copy data from old index failed in patch mode";
return E_DATA_ERROR;
}
file_idx = 0;
LOG(INFO)
<< "index check file len failed in patch mode, set file_idx to 0";
<< "index check fail, direct copy";
}
}
LOG(INFO) << "resize slim table, new count: " << count/2;
_slim_table.resize(count / 2);
char file[1024];
......@@ -167,6 +173,7 @@ int Dict::load_index(const std::string& dict_path, const std::string& v_path) {
dict_path.c_str(),
v_path.c_str(),
file_idx);
LOG(INFO) << "load file str: " << file;
if (stat(file, &fstat) < 0) {
if (errno == ENOENT) {
LOG(WARNING) << "index." << file_idx << " not exist";
......@@ -181,8 +188,8 @@ int Dict::load_index(const std::string& dict_path, const std::string& v_path) {
<< (uint64_t)fstat.st_size;
return E_DATA_ERROR;
}
LOG(INFO) << "loading from index." << file_idx;
if (!_slim_table.load(file) || _slim_table.size() > count) {
LOG(INFO) << "loading from index." << file_idx << " . table size: " << _slim_table.size();
if (!_slim_table.load(file, cur_block_id)) {
return E_DATA_ERROR;
}
......@@ -193,8 +200,15 @@ int Dict::load_index(const std::string& dict_path, const std::string& v_path) {
}
int Dict::load_data(const std::string& dict_path, const std::string& v_path) {
std::vector<uint32_t> block_size;
uint64_t total_data_size = 0;
if (_base_dict) {
_block_set = _base_dict->_block_set;
LOG(INFO)<< "load data base dict block set size: " << _block_set[0].size;
for (size_t i = 0; i < _block_set.size(); ++i) {
block_size.push_back(_block_set[i].size);
total_data_size += _block_set[i].size;
}
}
std::string data_n_path(dict_path);
......@@ -212,8 +226,6 @@ int Dict::load_data(const std::string& dict_path, const std::string& v_path) {
return E_DATA_ERROR;
}
std::vector<uint32_t> block_size;
uint64_t total_data_size = 0;
for (uint32_t i = 0; i < count; ++i) {
uint32_t size = 0;
if (fread(reinterpret_cast<void*>(&size), sizeof(uint32_t), 1, pf) != 1) {
......@@ -222,6 +234,7 @@ int Dict::load_data(const std::string& dict_path, const std::string& v_path) {
return E_DATA_ERROR;
}
block_size.push_back(size);
LOG(INFO) << "new block size: " << size;
total_data_size += size;
}
g_data_size << (total_data_size / 1024 / 1024);
......@@ -229,36 +242,35 @@ int Dict::load_data(const std::string& dict_path, const std::string& v_path) {
pf = NULL;
uint32_t old_size = _block_set.size();
LOG(INFO) << "load data old size: " << old_size;
for (size_t i = 0; i < old_size; ++i) {
if (_block_set[i].size != block_size[i]) {
old_size = 0;
break;
}
}
_block_set.resize(count);
LOG(INFO) << "load data block set count: " << count << " , old size: " << old_size;
_block_set.resize(count + old_size);
for (size_t i = old_size; i < _block_set.size(); ++i) {
char data_path[1024];
LOG(INFO) << "load from data." << i;
snprintf(
data_path, 1024, "%s%s/data.%lu", dict_path.c_str(), v_path.c_str(), i);
//snprintf(
// data_path, 1024, "%s%s/data.%lu", dict_path.c_str(), v_path.c_str(), i);
snprintf(data_path, 1024, "%s%s/data.%lu", dict_path.c_str(), v_path.c_str(), i - old_size);
FILE* data_file = fopen(data_path, "rb");
if (data_file == NULL) {
LOG(WARNING) << "open data file [" << data_path << " failed";
LOG(WARNING) << "open data file [" << data_path << " ]failed";
_block_set[i].s_data.reset();
_block_set[i].size = 0;
continue;
}
_block_set[i].s_data.reset(
reinterpret_cast<char*>(malloc(block_size[i] * sizeof(char))));
_block_set[i].s_data.reset(reinterpret_cast<char*>(malloc(block_size[i] * sizeof(char))));
if (_block_set[i].s_data.get() == NULL) {
LOG(ERROR) << "malloc data failed";
fclose(data_file);
return E_OOM;
}
_block_set[i].size = block_size[i];
if (fread(reinterpret_cast<void*>(_block_set[i].s_data.get()),
sizeof(char),
_block_set[i].size,
......@@ -267,7 +279,10 @@ int Dict::load_data(const std::string& dict_path, const std::string& v_path) {
fclose(data_file);
return E_DATA_ERROR;
}
LOG(INFO) << "load new data to BlockSet succ";
for (size_t ii = 0; ii < 20; ++ii) {
LOG(INFO) << "data ptr: " << (int)(_block_set[i].s_data.get()[ii]);
}
fclose(data_file);
}
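In patch mode, the hunk above keeps the base dict's blocks and appends the patch's blocks after them: the loop starts at `old_size`, while the patch's data files are still numbered from zero, hence `data.(i - old_size)`. A small sketch of that indexing (illustrative names, not the actual API):
```python
def patched_block_indices(old_size: int, patch_file_count: int):
    # block index in the merged set -> data file name inside the patch
    return {old_size + k: "data.%d" % k for k in range(patch_file_count)}

assert patched_block_indices(2, 2) == {2: "data.0", 3: "data.1"}
```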
......@@ -386,12 +401,11 @@ bool Dict::seek(uint64_t key, char* buff, uint64_t* buff_size) {
uint64_t flag = it->second;
uint32_t id = (uint32_t)(flag >> 32);
uint64_t addr = (uint32_t)(flag);
LOG(INFO) << "search key: " << id << " , addr: " << addr;
if (_block_set.size() > id) {
uint32_t block_size = _block_set[id].size;
char* block_data = NULL;
block_data = _block_set[id].s_data.get();
if (block_data && addr + sizeof(uint32_t) <= block_size) {
uint32_t len = *(reinterpret_cast<uint32_t*>(block_data + addr));
if (addr + len <= block_size && len >= sizeof(uint32_t)) {
......@@ -405,6 +419,7 @@ bool Dict::seek(uint64_t key, char* buff, uint64_t* buff_size) {
<< default_buffer_size;
return false;
}
LOG(INFO) << "seek key: " << key << " , addr: " << addr;
memcpy(buff,
(block_data + addr + sizeof(uint32_t)),
len - sizeof(uint32_t));
......
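`Dict::seek` then reads a length-prefixed record at the decoded offset: a `uint32` total length (covering itself) followed by the payload. A sketch of that layout, assuming little-endian storage as on x86:
```python
import struct

def read_record(block_data: bytes, addr: int) -> bytes:
    # uint32 total length (including these 4 bytes), then the payload
    (length,) = struct.unpack_from("<I", block_data, addr)
    return block_data[addr + 4 : addr + length]

blob = struct.pack("<I", 8) + b"\x01\x02\x03\x04"
assert read_record(blob, 0) == b"\x01\x02\x03\x04"
```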
......@@ -18,11 +18,9 @@ project(cube-transfer Go)
include(cmake/golang.cmake)
ExternalGoProject_Add(rfw github.com/mipearson/rfw)
ExternalGoProject_Add(docopt-go github.com/docopt/docopt-go)
add_custom_target(logex
COMMAND env GOPATH=${GOPATH} ${CMAKE_Go_COMPILER} get github.com/Badangel/logex
DEPENDS rfw)
ExternalGoProject_Add(transfer-rfw github.com/mipearson/rfw)
ExternalGoProject_Add(transfer-docopt-go github.com/docopt/docopt-go)
ExternalGoProject_Add(transfer-logex github.com/Badangel/logex)
add_subdirectory(src)
install(DIRECTORY ${CMAKE_CURRENT_LIST_DIR}/conf DESTINATION ${PADDLE_SERVING_INSTALL_DIR})
......@@ -14,6 +14,6 @@
set(SOURCE_FILE cube-transfer.go)
add_go_executable(cube-transfer ${SOURCE_FILE})
add_dependencies(cube-transfer docopt-go)
add_dependencies(cube-transfer rfw)
add_dependencies(cube-transfer logex)
add_dependencies(cube-transfer transfer-docopt-go)
add_dependencies(cube-transfer transfer-rfw)
add_dependencies(cube-transfer transfer-logex)
......@@ -17,68 +17,56 @@ package transfer
import (
"fmt"
"github.com/Badangel/logex"
"os"
"time"
"transfer/dict"
)
func Start() {
go BackupTransfer()
logex.Notice(">>> starting server...")
addr := ":" + Port
err := startHttp(addr)
if err != nil {
logex.Fatalf("start http(addr=%v) failed: %v", addr, err)
os.Exit(255)
}
logex.Notice(">>> start server succ")
BackupTransfer()
}
func BackupTransfer() {
for {
//trigger
version, err := TriggerStart(Dict.DonefileAddress)
if err != nil {
logex.Fatalf("[trigger err]trigger err:%v ", err)
fmt.Printf("[error]trigger err:%v \n", err)
break
}
logex.Noticef("[trigger] get version:%v \n", version)
if version.Id == 0 {
logex.Noticef("[sleep]no new version, sleep 5 min")
fmt.Printf("[sleep]no new version, wait 5 min\n")
time.Sleep(5 * time.Minute)
continue
}
//trigger
version, err := TriggerStart(Dict.DonefileAddress)
if err != nil {
logex.Fatalf("[trigger err]trigger err:%v ", err)
fmt.Printf("[error]trigger err:%v \n", err)
fmt.Print("transfer over!")
logex.Noticef("[transfer]status machine exit!")
return
}
logex.Noticef("[trigger] get version:%v \n", version)
Dict.WaitVersionInfo = version
logex.Noticef("[trigger finish] WaitVersionInfo version:%v \n", Dict.WaitVersionInfo)
WriteWaitVersionInfoToFile()
logex.Noticef("[trigger finish] WaitVersionInfo version:%v \n", Dict.WaitVersionInfo)
WriteWaitVersionInfoToFile()
//builder
Dict.WaitVersionInfo.Status = dict.Dict_Status_Building
Dict.WaitVersionInfo.MetaInfos = make(map[int]string)
WriteWaitVersionInfoToFile()
if err = BuilderStart(Dict.WaitVersionInfo); err != nil {
logex.Fatalf("builder err:%v \n", err)
}
//builder
Dict.WaitVersionInfo.Status = dict.Dict_Status_Building
Dict.WaitVersionInfo.MetaInfos = make(map[int]string)
WriteWaitVersionInfoToFile()
if err = BuilderStart(Dict.WaitVersionInfo); err != nil {
logex.Fatalf("builder err:%v \n", err)
}
if Dict.WaitVersionInfo.Mode == dict.BASE {
var newCurrentVersion []dict.DictVersionInfo
Dict.CurrentVersionInfo = newCurrentVersion
WriteCurrentVersionInfoToFile()
}
logex.Noticef("[builder finish] WaitVersionInfo version:%v \n", Dict.WaitVersionInfo)
if Dict.WaitVersionInfo.Mode == dict.BASE {
var newCurrentVersion []dict.DictVersionInfo
Dict.CurrentVersionInfo = newCurrentVersion
WriteCurrentVersionInfoToFile()
}
if Dict.WaitVersionInfo.Mode == dict.DELTA {
var newCurrentVersion []dict.DictVersionInfo
Dict.CurrentVersionInfo = newCurrentVersion
WriteCurrentVersionInfoToFile()
}
logex.Noticef("[builder finish] WaitVersionInfo version:%v \n", Dict.WaitVersionInfo)
//deployer
Dict.WaitVersionInfo.Status = dict.Dict_Status_Deploying
WriteWaitVersionInfoToFile()
if err = DeployStart(Dict.WaitVersionInfo); err != nil {
logex.Fatalf("deploy err:%v \n", err)
}
logex.Noticef("[deploy finish]current version: %v\n",Dict.CurrentVersionInfo)
//deployer
Dict.WaitVersionInfo.Status = dict.Dict_Status_Deploying
WriteWaitVersionInfoToFile()
if err = DeployStart(Dict.WaitVersionInfo); err != nil {
logex.Fatalf("deploy err:%v \n", err)
}
logex.Noticef("[deploy finish]current version: %v\n",Dict.CurrentVersionInfo)
fmt.Print("transfer over!")
logex.Noticef("[transfer]status machine exit!")
}
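To summarize the control-flow change: the old `BackupTransfer` looped forever with a five-minute sleep between polls, while the new version runs the trigger, builder, and deployer stages once and then exits. A condensed Python sketch of the new flow (stage descriptions are assumptions drawn from the surrounding code, not the actual Go API):
```python
def backup_transfer(trigger, build, deploy):
    version = trigger()            # read the donefile; abort on error
    version["status"] = "building"
    build(version)                 # run cube-builder on the new version
    version["status"] = "deploying"
    deploy(version)                # hand the built shards to the deployer
    print("transfer over!")
```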
......@@ -38,18 +38,19 @@ func GetDoneFileInfo(addr string) (version dict.DictVersionInfo, err error) {
Wget(addr, donefileAddr)
addr = donefileAddr
}
baseDonefile := addr + "/base.txt"
fmt.Printf("[trigrer]donefile path:%v \n", baseDonefile)
logex.Noticef("[trigrer]base donefile path:%v", baseDonefile)
contents, err := ioutil.ReadFile(baseDonefile)
VersionLen := len(Dict.CurrentVersionInfo)
version.DictName = Dict.DictName
if err != nil {
fmt.Printf("[trigrer]read files err:%v \n", err)
logex.Fatalf("[trigrer]read files err:%v ", err)
fmt.Printf("get into mode check here\n")
if Dict.DictMode == dict.BASE_ONLY {
baseDonefile := addr + "/base.txt"
fmt.Printf("[trigrer]donefile path:%v \n", baseDonefile)
logex.Noticef("[trigrer]base donefile path:%v", baseDonefile)
contents, err_0 := ioutil.ReadFile(baseDonefile)
if err_0 != nil {
fmt.Printf("[trigrer]read files err:%v \n", err_0)
logex.Fatalf("[trigrer]read files err:%v ", err_0)
return
} else {
} else {
contentss := string(contents)
lines := strings.Split(contentss, "\n")
index := len(lines) - 1
......@@ -80,19 +81,21 @@ func GetDoneFileInfo(addr string) (version dict.DictVersionInfo, err error) {
version.Mode = dict.BASE
return
}
}
if Dict.DictMode == dict.BASR_DELTA && VersionLen > 0 {
}
}
if Dict.DictMode == dict.BASR_DELTA {
patchDonefile := addr + "/patch.txt"
fmt.Printf("[trigrer]patchDonefile path:%v \n", patchDonefile)
logex.Noticef("[trigrer]patch donefile path:%v", patchDonefile)
contents, err = ioutil.ReadFile(patchDonefile)
if err != nil {
fmt.Printf("read files err:%v \n", err)
contents, err_0 := ioutil.ReadFile(patchDonefile)
if err_0 != nil {
fmt.Printf("[trigrer]read files err:%v \n", err_0)
logex.Fatalf("[trigrer]read files err:%v ", err_0)
return
} else {
contentss := string(contents)
lines := strings.Split(contentss, "\n")
fmt.Printf("[trigger]get patch lines here\n")
for index := 0; index < len(lines)-1; index++ {
if len(lines[index]) < 3 {
logex.Noticef("[trigrer]get patch donfile info error")
......@@ -106,14 +109,15 @@ func GetDoneFileInfo(addr string) (version dict.DictVersionInfo, err error) {
logex.Noticef("[trigrer]donfile info:%v", donefileInfo)
newId, _ := strconv.Atoi(donefileInfo.Id)
newKey, _ := strconv.Atoi(donefileInfo.Key)
if newId > Dict.CurrentVersionInfo[VersionLen-1].Id && newKey == Dict.CurrentVersionInfo[VersionLen-1].Key {
fmt.Printf("[trigger]read patch id: %d, key: %d\n", newId, newKey)
if VersionLen == 0 || newId > Dict.CurrentVersionInfo[VersionLen-1].Id {
version.Id = newId
version.Key, _ = strconv.Atoi(donefileInfo.Key)
version.Input = donefileInfo.Input
deployVersion := int(time.Now().Unix())
version.CreateTime = deployVersion
version.Version = deployVersion
version.Depend = Dict.CurrentVersionInfo[VersionLen-1].Depend
version.Depend = deployVersion
version.Mode = dict.DELTA
return
}
......
......@@ -96,7 +96,8 @@ func ExeCommad(files string, params []string) (err error) {
func Wget(ftpPath string, downPath string) {
var params []string
params = append(params, "-P")
params = append(params, "--limit-rate=100m")
params = append(params, "-P")
params = append(params, downPath)
params = append(params, "-r")
params = append(params, "-N")
......@@ -110,4 +111,4 @@ func Wget(ftpPath string, downPath string) {
if err != nil {
fmt.Printf("wget exe: %v\n", err)
}
}
\ No newline at end of file
}
......@@ -37,7 +37,149 @@ using baidu::paddle_serving::predictor::general_model::Request;
using baidu::paddle_serving::predictor::InferManager;
using baidu::paddle_serving::predictor::PaddleGeneralModelConfig;
int GeneralDistKVInferOp::inference() { return 0; }
// DistKV Infer Op: seek cube and then call paddle inference
// op seq: general_reader-> dist_kv_infer -> general_response
int GeneralDistKVInferOp::inference() {
VLOG(2) << "Going to run inference";
const std::vector<std::string> pre_node_names = pre_names();
if (pre_node_names.size() != 1) {
LOG(ERROR) << "This op(" << op_name()
<< ") can only have one predecessor op, but received "
<< pre_node_names.size();
return -1;
}
const std::string pre_name = pre_node_names[0];
const GeneralBlob *input_blob = get_depend_argument<GeneralBlob>(pre_name);
if (!input_blob) {
LOG(ERROR) << "input_blob is nullptr,error";
return -1;
}
uint64_t log_id = input_blob->GetLogId();
VLOG(2) << "(logid=" << log_id << ") Get precedent op name: " << pre_name;
GeneralBlob *output_blob = mutable_data<GeneralBlob>();
if (!output_blob) {
LOG(ERROR) << "(logid=" << log_id << ") output_blob is nullptr,error";
return -1;
}
output_blob->SetLogId(log_id);
if (!input_blob) {
LOG(ERROR) << "(logid=" << log_id
<< ") Failed mutable depended argument, op:" << pre_name;
return -1;
}
const TensorVector *in = &input_blob->tensor_vector;
TensorVector *out = &output_blob->tensor_vector;
std::vector<uint64_t> keys;
std::vector<rec::mcube::CubeValue> values;
int sparse_count = 0;  // number of sparse inputs; these are looked up in cube
int dense_count = 0;  // number of dense inputs; these go directly to paddle inference
std::vector<std::pair<int64_t *, size_t>> dataptr_size_pairs;
size_t key_len = 0;
for (size_t i = 0; i < in->size(); ++i) {
if (in->at(i).dtype != paddle::PaddleDType::INT64) {
++dense_count;
continue;
}
++sparse_count;
size_t elem_num = 1;
for (size_t s = 0; s < in->at(i).shape.size(); ++s) {
elem_num *= in->at(i).shape[s];
}
key_len += elem_num;
int64_t *data_ptr = static_cast<int64_t *>(in->at(i).data.data());
dataptr_size_pairs.push_back(std::make_pair(data_ptr, elem_num));
}
keys.resize(key_len);
VLOG(3) << "(logid=" << log_id << ") cube number of keys to look up: " << key_len;
int key_idx = 0;
for (size_t i = 0; i < dataptr_size_pairs.size(); ++i) {
std::copy(dataptr_size_pairs[i].first,
dataptr_size_pairs[i].first + dataptr_size_pairs[i].second,
keys.begin() + key_idx);
key_idx += dataptr_size_pairs[i].second;
}
rec::mcube::CubeAPI *cube = rec::mcube::CubeAPI::instance();
std::vector<std::string> table_names = cube->get_table_names();
if (table_names.size() == 0) {
LOG(ERROR) << "cube init error or cube config not given.";
return -1;
}
// gather keys and seek cube servers, put results in values
int ret = cube->seek(table_names[0], keys, &values);
VLOG(3) << "(logid=" << log_id << ") cube seek status: " << ret;
if (values.size() != keys.size() || values[0].buff.size() == 0) {
LOG(ERROR) << "cube value return null";
return -1;  // avoid reading values[0] below when the lookup failed
}
// EMBEDDING_SIZE is the length of each sparse embedding vector, inferred here from the size of the first value returned by cube.
size_t EMBEDDING_SIZE = values[0].buff.size() / sizeof(float);
TensorVector sparse_out;
sparse_out.resize(sparse_count);
TensorVector dense_out;
dense_out.resize(dense_count);
int cube_val_idx = 0;
int sparse_idx = 0;
int dense_idx = 0;
std::unordered_map<int, int> in_out_map;
baidu::paddle_serving::predictor::Resource &resource =
baidu::paddle_serving::predictor::Resource::instance();
std::shared_ptr<PaddleGeneralModelConfig> model_config = resource.get_general_model_config().front();
// copy cube lookup results into the output tensors
for (size_t i = 0; i < in->size(); ++i) {
if (in->at(i).dtype != paddle::PaddleDType::INT64) {
dense_out[dense_idx] = in->at(i);
++dense_idx;
continue;
}
sparse_out[sparse_idx].lod.resize(in->at(i).lod.size());
for (size_t x = 0; x < sparse_out[sparse_idx].lod.size(); ++x) {
sparse_out[sparse_idx].lod[x].resize(in->at(i).lod[x].size());
std::copy(in->at(i).lod[x].begin(),
in->at(i).lod[x].end(),
sparse_out[sparse_idx].lod[x].begin());
}
sparse_out[sparse_idx].dtype = paddle::PaddleDType::FLOAT32;
sparse_out[sparse_idx].shape.push_back(sparse_out[sparse_idx].lod[0].back());
sparse_out[sparse_idx].shape.push_back(EMBEDDING_SIZE);
sparse_out[sparse_idx].name = model_config->_feed_name[i];
sparse_out[sparse_idx].data.Resize(sparse_out[sparse_idx].lod[0].back() *
EMBEDDING_SIZE * sizeof(float));
float *dst_ptr = static_cast<float *>(sparse_out[sparse_idx].data.data());
for (int x = 0; x < sparse_out[sparse_idx].lod[0].back(); ++x) {
float *data_ptr = dst_ptr + x * EMBEDDING_SIZE;
memcpy(data_ptr,
values[cube_val_idx].buff.data(),
values[cube_val_idx].buff.size());
cube_val_idx++;
}
++sparse_idx;
}
VLOG(3) << "(logid=" << log_id << ") sparse tensor load success.";
TensorVector infer_in;
infer_in.insert(infer_in.end(), dense_out.begin(), dense_out.end());
infer_in.insert(infer_in.end(), sparse_out.begin(), sparse_out.end());
int batch_size = input_blob->_batch_size;
output_blob->_batch_size = batch_size;
Timer timeline;
int64_t start = timeline.TimeStampUS();
timeline.Start();
// call paddle inference here
if (InferManager::instance().infer(
engine_name().c_str(), &infer_in, out, batch_size)) {
LOG(ERROR) << "(logid=" << log_id << ") Failed do infer in fluid model: " << engine_name();
return -1;
}
int64_t end = timeline.TimeStampUS();
CopyBlobInfo(input_blob, output_blob);
AddBlobInfo(output_blob, start);
AddBlobInfo(output_blob, end);
return 0;
}
DEFINE_OP(GeneralDistKVInferOp);
} // namespace serving
......
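A rough Python analogue of the op's sparse path may make the data movement clearer: every INT64 input tensor is treated as a flat list of keys, all keys are looked up in cube in a single batch, and the returned buffers are laid out as a `[num_keys, EMBEDDING_SIZE]` float32 tensor. This is a sketch with illustrative names, not the serving API:
```python
import numpy as np

def build_sparse_embeddings(id_arrays, cube_seek, embedding_size):
    # flatten all INT64 inputs into one key list, as the op does
    keys = np.concatenate([a.ravel() for a in id_arrays]).astype(np.int64)
    values = cube_seek(keys)  # one raw float32 buffer per key
    out = np.empty((len(keys), embedding_size), dtype=np.float32)
    for i, buf in enumerate(values):
        out[i] = np.frombuffer(buf, dtype=np.float32)
    return out
```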
......@@ -2,3 +2,16 @@ set(seq_gen_src ${CMAKE_CURRENT_LIST_DIR}/seq_generator.cpp ${CMAKE_CURRENT_LIS
LIST(APPEND seq_gen_src ${PROTO_SRCS})
add_executable(seq_generator ${seq_gen_src})
target_link_libraries(seq_generator protobuf -lpthread)
set(seq_reader_src ${CMAKE_CURRENT_LIST_DIR}/seq_reader.cpp ${CMAKE_CURRENT_LIST_DIR}/../../cube/cube-builder/src/seqfile_reader.cpp)
add_executable(seq_reader ${seq_reader_src})
add_dependencies(seq_reader brpc)
install(TARGETS seq_reader
RUNTIME DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/bin
ARCHIVE DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/lib
LIBRARY DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/so
)
install(TARGETS seq_reader RUNTIME DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/tool)
install(TARGETS seq_generator RUNTIME DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/tool)
// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <sys/time.h>
#include <limits.h>
#include <fstream>
#include <iostream>
#include <memory>
#include <thread>
#include "core/cube/cube-builder/include/cube-builder/seqfile_reader.h"
std::string string_to_hex(const std::string& input) {
static const char* const lut = "0123456789ABCDEF";
size_t len = input.length();
std::string output;
output.reserve(2 * len);
for (size_t i = 0; i < len; ++i) {
const unsigned char c = input[i];
output.push_back(lut[c >> 4]);
output.push_back(lut[c & 15]);
}
return output;
}
void printSeq(std::string file, int limit) {
SequenceFileRecordReader reader(file.c_str());
if (reader.open() != 0) {
std::cerr << "open file failed! " << file;
return;
}
if (reader.read_header() != 0) {
std::cerr << "read header error! " << file;
reader.close();
return;
}
Record record(reader.get_header());
int total_count = 0;
while (reader.next(&record) == 0) {
uint64_t key =
*reinterpret_cast<uint64_t *>(const_cast<char *>(record.key.data()));
total_count++;
int64_t value_length = record.record_len - record.key_len;
std::cout << "key: " << key << " , value: " << string_to_hex(record.value.c_str()) << std::endl;
if (total_count >= limit) {
break;
}
}
if (reader.close() != 0) {
std::cerr << "close file failed! " << file;
return;
}
}
int main(int argc, char **argv) {
if (argc != 3 && argc != 2) {
std::cout << "Seq Reader Usage:" << std::endl;
std::cout << "get all keys: ./seq_reader $FILENAME " << std::endl;
std::cout << "get some keys: ./seq_reader $FILENAME $KEY_NUM" << std::endl;
return -1;
}
// argc is guaranteed to be 2 or 3 here; the early return above handled the rest
const char* filename_str = argv[1];
std::string filename = filename_str;
std::cout << "filename is " << filename << std::endl;
if (argc == 3) {
const char* key_num_str = argv[2];
int key_limit = std::stoi(key_num_str);
printSeq(filename, key_limit);
} else {
printSeq(filename, INT_MAX);
}
return 0;
}
......@@ -87,6 +87,7 @@ go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-grpc-gateway@v1.15.2
go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-swagger@v1.15.2
go get -u github.com/golang/protobuf/protoc-gen-go@v1.4.3
go get -u google.golang.org/grpc@v1.33.0
go env -w GO111MODULE=auto
```
......
......@@ -86,6 +86,7 @@ go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-grpc-gateway@v1.15.2
go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-swagger@v1.15.2
go get -u github.com/golang/protobuf/protoc-gen-go@v1.4.3
go get -u google.golang.org/grpc@v1.33.0
go env -w GO111MODULE=auto
```
......
## How to Obtain the Model Input Required by the Sparse Parameter Indexing Service (Cube)
### Background
Recommendation systems need large-scale sparse parameter indexing to support distributed deployment; see `python/example/criteo_ctr_with_cube` or [PaddleRec](https://github.com/paddlepaddle/paddlerec) to learn more about recommendation models.
The model format used by the sparse parameter index is SequenceFile, a key-value file format from the Hadoop ecosystem.
To make debugging easier, we provide a tool that converts a human-readable text file in a specific format into a SequenceFile, as well as a tool that converts a SequenceFile back into readable text.
When debugging the Cube service, users can generate a SequenceFile from custom KV pairs.
When verifying that Cube delivers data correctly, users can convert a SequenceFile back into readable text for comparison.
### Prerequisites
- You need to be able to compile Paddle Serving; see the [compilation guide](./COMPILE.md).
### Usage
After compilation, the install output contains seq_reader and kv_to_seqfile.py.
#### Generating a SequenceFile
Under `output/tool/`, edit `output/tool/source/file.txt`. Each line of the file is one key-value pair, with a colon `:` separating the key from the value.
For example:
```
1676869128226002114:48241 37064 91 -539 114 51 -122 269 229 -134 -282
1657749292782759014:167 40 98 27 117 10 -29 15 74 67 -54
```
Run
```
python kv_to_seqfile.py
```
This generates the `data` folder; its structure looks like this:
```
.
├── 20210805095422
│   └── base
│   └── feature
└── donefile
└── base.txt
```
Here `20210805095422/base/feature` is the SequenceFile, and the donefile is saved at `donefile/base.txt`.
#### Inspecting a SequenceFile
We use the `seq_reader` tool to inspect SequenceFile files.
```
./seq_reader 20210805095422/base/feature 10 # print the first 10 KV pairs
./seq_reader 20210805095422/base/feature # print all KV pairs
```
Output:
```
key: 1676869128226002114 , value: 343832343109333730363409093931092D35333909313134093531092D3132320932363909323239092D313334092D323832
key: 1657749292782759014 , value: 3136370934300909393809323709313137093130092D3239093135093734093637092D3534
```
Currently, values are printed in hexadecimal.
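To recover the original bytes from such a dump, `bytes.fromhex` is enough. A minimal sketch using the second value printed above:
```python
hex_value = ("3136370934300909393809323709313137093130"
             "092D3239093135093734093637092D3534")
raw = bytes.fromhex(hex_value)
print(raw)  # b'167\t40\t\t98\t27\t117\t10\t-29\t15\t74\t67\t-54'
```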
......@@ -81,7 +81,6 @@ if (SERVER)
if(WITH_LITE)
set(VERSION_SUFFIX 2)
endif()
add_custom_command(
OUTPUT ${PADDLE_SERVING_BINARY_DIR}/.timestamp
COMMAND cp -r
......
## Criteo CTR with Sparse Parameter Indexing Service
([简体中文](./README_CN.md)|English)
### Get Sample Dataset
Go to the directory `python/examples/criteo_ctr_with_cube`
```
sh get_data.sh
```
### Download Model and Sparse Parameter Sequence Files
```
wget https://paddle-serving.bj.bcebos.com/unittest/ctr_cube_unittest.tar.gz
tar xf ctr_cube_unittest.tar.gz
mv models/ctr_client_conf ./
mv models/ctr_serving_model_kv ./
mv models/data ./cube/
```
The model will be in `./ctr_serving_model_kv` and the client config in `./ctr_client_conf`.
### Start Sparse Parameter Indexing Service
```
wget https://paddle-serving.bj.bcebos.com/others/cube_app.tar.gz
tar xf cube_app.tar.gz
mv cube_app/cube* ./cube/
sh cube_prepare.sh &
```
Here, the sparse parameters are loaded by Cube, the sparse parameter indexing service.
### Start RPC Predictor (4 serving threads, configurable in test_server.py)
```
python test_server.py ctr_serving_model_kv
```
### Run Prediction
```
python test_client.py ctr_client_conf/serving_client_conf.prototxt ./raw_data
```
### Benchmark
CPU: Intel(R) Xeon(R) CPU 6148 @ 2.40GHz
Model: [Criteo CTR](https://github.com/PaddlePaddle/Serving/blob/develop/python/examples/criteo_ctr_with_cube/network_conf.py)
Server core/thread num: 4/8
Run
```
bash benchmark.sh
```
Each client thread will send 1000 batches.
| client thread num | prepro | client infer | op0 | op1 | op2 | postpro | avg_latency | qps |
| ------------------ | ------ | ------------ | ------ | ----- | ------ | ------- | ----- | ----- |
| 1 | 0.035 | 1.596 | 0.021 | 0.518 | 0.0024 | 0.0025 | 6.774 | 147.7 |
| 2 | 0.034 | 1.780 | 0.027 | 0.463 | 0.0020 | 0.0023 | 6.931 | 288.3 |
| 4 | 0.038 | 2.954 | 0.025 | 0.455 | 0.0019 | 0.0027 | 8.378 | 477.5 |
| 8 | 0.044 | 8.230 | 0.028 | 0.464 | 0.0023 | 0.0034 | 14.191 | 563.8 |
| 16 | 0.048 | 21.037 | 0.028 | 0.455 | 0.0025 | 0.0041 | 27.236 | 587.5 |
Average latency per client thread:
![avg cost](../../../doc/criteo-cube-benchmark-avgcost.png)
QPS:
![qps](../../../doc/criteo-cube-benchmark-qps.png)
## CTR Prediction Service with Sparse Parameter Indexing Service
(简体中文|[English](./README.md))
### Get Sample Dataset
Go to the directory `python/examples/criteo_ctr_with_cube`
```
sh get_data.sh
```
### Download Model and Sparse Parameter Sequence Files
```
wget https://paddle-serving.bj.bcebos.com/unittest/ctr_cube_unittest.tar.gz
tar xf ctr_cube_unittest.tar.gz
mv models/ctr_client_conf ./
mv models/ctr_serving_model_kv ./
mv models/data ./cube/
```
After running the script, the ctr_serving_model_kv and ctr_client_conf folders will be present in the current directory.
### Start Sparse Parameter Indexing Service
```
wget https://paddle-serving.bj.bcebos.com/others/cube_app.tar.gz
tar xf cube_app.tar.gz
mv cube_app/cube* ./cube/
sh cube_prepare.sh &
```
Here, the sparse parameters of the model are stored in Cube, the sparse parameter indexing service.
### Start RPC Prediction Service (4 server threads, configurable in test_server.py)
```
python test_server.py ctr_serving_model_kv
```
### Run Prediction
```
python test_client.py ctr_client_conf/serving_client_conf.prototxt ./raw_data
```
### Benchmark
Device: Intel(R) Xeon(R) CPU 6148 @ 2.40GHz
Model: [Criteo CTR](https://github.com/PaddlePaddle/Serving/blob/develop/python/examples/criteo_ctr_with_cube/network_conf.py)
Server core/thread num: 4/8
Run
```
bash benchmark.sh
```
Each client thread will send 1000 batches.
| client thread num | prepro | client infer | op0 | op1 | op2 | postpro | avg_latency | qps |
| ------------------ | ------ | ------------ | ------ | ----- | ------ | ------- | ----- | ----- |
| 1 | 0.035 | 1.596 | 0.021 | 0.518 | 0.0024 | 0.0025 | 6.774 | 147.7 |
| 2 | 0.034 | 1.780 | 0.027 | 0.463 | 0.0020 | 0.0023 | 6.931 | 288.3 |
| 4 | 0.038 | 2.954 | 0.025 | 0.455 | 0.0019 | 0.0027 | 8.378 | 477.5 |
| 8 | 0.044 | 8.230 | 0.028 | 0.464 | 0.0023 | 0.0034 | 14.191 | 563.8 |
| 16 | 0.048 | 21.037 | 0.028 | 0.455 | 0.0025 | 0.0041 | 27.236 | 587.5 |
Average latency per thread:
![avg cost](../../../doc/criteo-cube-benchmark-avgcost.png)
QPS per thread:
![qps](../../../doc/criteo-cube-benchmark-qps.png)
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=doc-string-missing
import sys
import paddle.fluid.incubate.data_generator as dg
class CriteoDataset(dg.MultiSlotDataGenerator):
def setup(self, sparse_feature_dim):
self.cont_min_ = [0, -3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
self.cont_max_ = [
20, 600, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50
]
self.cont_diff_ = [
20, 603, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50
]
self.hash_dim_ = sparse_feature_dim
# here, training data are lines with line_index < train_idx_
self.train_idx_ = 41256555
self.continuous_range_ = range(1, 14)
self.categorical_range_ = range(14, 40)
def _process_line(self, line):
features = line.rstrip('\n').split('\t')
dense_feature = []
sparse_feature = []
for idx in self.continuous_range_:
if features[idx] == '':
dense_feature.append(0.0)
else:
dense_feature.append((float(features[idx]) - self.cont_min_[idx - 1]) / \
self.cont_diff_[idx - 1])
for idx in self.categorical_range_:
sparse_feature.append(
[hash(str(idx) + features[idx]) % self.hash_dim_])
return dense_feature, sparse_feature, [int(features[0])]
def infer_reader(self, filelist, batch, buf_size):
def local_iter():
for fname in filelist:
with open(fname.strip(), "r") as fin:
for line in fin:
dense_feature, sparse_feature, label = self._process_line(
line)
#yield dense_feature, sparse_feature, label
yield [dense_feature] + sparse_feature + [label]
import paddle
batch_iter = paddle.batch(
paddle.reader.shuffle(
local_iter, buf_size=buf_size),
batch_size=batch)
return batch_iter
def generate_sample(self, line):
def data_iter():
dense_feature, sparse_feature, label = self._process_line(line)
feature_name = ["dense_input"]
for idx in self.categorical_range_:
feature_name.append("C" + str(idx - 13))
feature_name.append("label")
yield zip(feature_name, [dense_feature] + sparse_feature + [label])
return data_iter
if __name__ == "__main__":
criteo_dataset = CriteoDataset()
criteo_dataset.setup(int(sys.argv[1]))
criteo_dataset.run_from_stdin()
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=doc-string-missing
#! /bin/bash
mkdir -p cube_model
mkdir -p cube/data
./cube/cube-builder -dict_name=test_dict -job_mode=base -last_version=0 -cur_version=0 -depend_version=0 -input_path=./cube_model -output_path=${PWD}/cube/data -shard_num=1 -only_build=false
cd cube && ./cube
wget --no-check-certificate https://paddle-serving.bj.bcebos.com/data/ctr_prediction/ctr_data.tar.gz
tar -zxvf ctr_data.tar.gz
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=doc-string-missing
from __future__ import print_function
from args import parse_args
import os
import paddle.fluid as fluid
import paddle
import sys
from network_conf import dnn_model
dense_feature_dim = 13
paddle.enable_static()
def train():
args = parse_args()
sparse_only = args.sparse_only
if not os.path.isdir(args.model_output_dir):
os.mkdir(args.model_output_dir)
dense_input = fluid.layers.data(
name="dense_input", shape=[dense_feature_dim], dtype='float32')
sparse_input_ids = [
fluid.layers.data(
name="C" + str(i), shape=[1], lod_level=1, dtype="int64")
for i in range(1, 27)
]
label = fluid.layers.data(name='label', shape=[1], dtype='int64')
#nn_input = None if sparse_only else dense_input
nn_input = dense_input
predict_y, loss, auc_var, batch_auc_var, infer_vars = dnn_model(
nn_input, sparse_input_ids, label, args.embedding_size,
args.sparse_feature_dim)
optimizer = fluid.optimizer.SGD(learning_rate=1e-4)
optimizer.minimize(loss)
exe = fluid.Executor(fluid.CPUPlace())
exe.run(fluid.default_startup_program())
dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
dataset.set_use_var([dense_input] + sparse_input_ids + [label])
python_executable = "python3.6"
pipe_command = "{} criteo_reader.py {}".format(python_executable,
args.sparse_feature_dim)
dataset.set_pipe_command(pipe_command)
dataset.set_batch_size(128)
thread_num = 10
dataset.set_thread(thread_num)
whole_filelist = [
"raw_data/part-%d" % x for x in range(len(os.listdir("raw_data")))
]
print(whole_filelist)
dataset.set_filelist(whole_filelist[:100])
dataset.load_into_memory()
fluid.layers.Print(auc_var)
epochs = 1
for i in range(epochs):
exe.train_from_dataset(
program=fluid.default_main_program(), dataset=dataset, debug=True)
print("epoch {} finished".format(i))
import paddle_serving_client.io as server_io
feed_var_dict = {}
feed_var_dict['dense_input'] = dense_input
for i, sparse in enumerate(sparse_input_ids):
feed_var_dict["embedding_{}.tmp_0".format(i)] = sparse
fetch_var_dict = {"prob": predict_y}
feed_kv_dict = {}
feed_kv_dict['dense_input'] = dense_input
for i, emb in enumerate(infer_vars):
feed_kv_dict["embedding_{}.tmp_0".format(i)] = emb
fetch_var_dict = {"prob": predict_y}
server_io.save_model("ctr_serving_model", "ctr_client_conf", feed_var_dict,
fetch_var_dict, fluid.default_main_program())
server_io.save_model("ctr_serving_model_kv", "ctr_client_conf_kv",
feed_kv_dict, fetch_var_dict,
fluid.default_main_program())
if __name__ == '__main__':
train()
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=doc-string-missing
import paddle.fluid as fluid
import math
def dnn_model(dense_input, sparse_inputs, label, embedding_size,
sparse_feature_dim):
def embedding_layer(input):
emb = fluid.layers.embedding(
input=input,
is_sparse=True,
is_distributed=False,
size=[sparse_feature_dim, embedding_size],
param_attr=fluid.ParamAttr(
name="SparseFeatFactors",
initializer=fluid.initializer.Uniform()))
x = fluid.layers.sequence_pool(input=emb, pool_type='sum')
return emb, x
def mlp_input_tensor(emb_sums, dense_tensor):
#if isinstance(dense_tensor, fluid.Variable):
# return fluid.layers.concat(emb_sums, axis=1)
#else:
return fluid.layers.concat(emb_sums + [dense_tensor], axis=1)
def mlp(mlp_input):
fc1 = fluid.layers.fc(input=mlp_input,
size=400,
act='relu',
param_attr=fluid.ParamAttr(
initializer=fluid.initializer.Normal(
scale=1 / math.sqrt(mlp_input.shape[1]))))
fc2 = fluid.layers.fc(input=fc1,
size=400,
act='relu',
param_attr=fluid.ParamAttr(
initializer=fluid.initializer.Normal(
scale=1 / math.sqrt(fc1.shape[1]))))
fc3 = fluid.layers.fc(input=fc2,
size=400,
act='relu',
param_attr=fluid.ParamAttr(
initializer=fluid.initializer.Normal(
scale=1 / math.sqrt(fc2.shape[1]))))
pre = fluid.layers.fc(input=fc3,
size=2,
act='softmax',
param_attr=fluid.ParamAttr(
initializer=fluid.initializer.Normal(
scale=1 / math.sqrt(fc3.shape[1]))))
return pre
emb_pair_sums = list(map(embedding_layer, sparse_inputs))
emb_sums = [x[1] for x in emb_pair_sums]
infer_vars = [x[0] for x in emb_pair_sums]
mlp_in = mlp_input_tensor(emb_sums, dense_input)
predict = mlp(mlp_in)
cost = fluid.layers.cross_entropy(input=predict, label=label)
avg_cost = fluid.layers.reduce_sum(cost)
accuracy = fluid.layers.accuracy(input=predict, label=label)
auc_var, batch_auc_var, auc_states = \
fluid.layers.auc(input=predict, label=label, num_thresholds=2 ** 12, slide_steps=20)
return predict, avg_cost, auc_var, batch_auc_var, infer_vars
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=doc-string-missing
from paddle_serving_client import Client
import sys
import os
import criteo as criteo
import time
from paddle_serving_client.metric import auc
import numpy as np
py_version = sys.version_info[0]
client = Client()
client.load_client_config(sys.argv[1])
client.connect(["127.0.0.1:9292"])
batch = 1
buf_size = 100
dataset = criteo.CriteoDataset()
dataset.setup(1000001)
test_filelists = ["{}/part-0".format(sys.argv[2])]
reader = dataset.infer_reader(test_filelists, batch, buf_size)
label_list = []
prob_list = []
start = time.time()
for ei in range(10000):
if py_version == 2:
data = reader().next()
else:
data = reader().__next__()
feed_dict = {}
feed_dict['dense_input'] = data[0][0]
for i in range(1, 27):
feed_dict["embedding_{}.tmp_0".format(i - 1)] = np.array(data[0][i]).reshape(-1)
feed_dict["embedding_{}.tmp_0.lod".format(i - 1)] = [0, len(data[0][i])]
fetch_map = client.predict(feed=feed_dict, fetch=["prob"])
print(fetch_map)
prob_list.append(fetch_map['prob'][0][1])
label_list.append(data[0][-1][0])
print(auc(label_list, prob_list))
end = time.time()
print(end - start)
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=doc-string-missing
import os
import sys
from paddle_serving_server import OpMaker
from paddle_serving_server import OpSeqMaker
from paddle_serving_server import Server
op_maker = OpMaker()
read_op = op_maker.create('general_reader')
general_dist_kv_infer_op = op_maker.create('general_dist_kv_infer')
response_op = op_maker.create('general_response')
op_seq_maker = OpSeqMaker()
op_seq_maker.add_op(read_op)
op_seq_maker.add_op(general_dist_kv_infer_op)
op_seq_maker.add_op(response_op)
server = Server()
server.set_op_sequence(op_seq_maker.get_op_sequence())
server.set_num_threads(4)
server.load_model_config(sys.argv[1])
server.prepare_server(
workdir="work_dir1",
port=9292,
device="cpu",
cube_conf="./cube/conf/cube.conf")
server.run_server()
......@@ -46,7 +46,7 @@ class ImagenetOp(Op):
input_imgs = np.concatenate(imgs, axis=0)
return {"image": input_imgs}, False, None, ""
def postprocess(self, input_dicts, fetch_dict, log_id):
def postprocess(self, input_dicts, fetch_dict, data_id, log_id):
score_list = fetch_dict["prediction"]
result = {"label": [], "prob": []}
for score in score_list:
......
......@@ -49,7 +49,7 @@ class ImagenetOp(Op):
input_imgs = np.concatenate(imgs, axis=0)
return {"image": input_imgs}, False, None, ""
def postprocess(self, input_dicts, fetch_dict, log_id):
def postprocess(self, input_dicts, fetch_dict, data_id, log_id):
score_list = fetch_dict["prediction"]
result = {"label": [], "prob": []}
for score in score_list:
......
......@@ -49,7 +49,7 @@ class ImagenetOp(Op):
input_imgs = np.concatenate(imgs, axis=0)
return {"image": input_imgs}, False, None, ""
def postprocess(self, input_dicts, fetch_dict, log_id):
def postprocess(self, input_dicts, fetch_dict, data_id, log_id):
score_list = fetch_dict["prediction"]
result = {"label": [], "prob": []}
for score in score_list:
......
......@@ -49,7 +49,7 @@ class ImagenetOp(Op):
input_imgs = np.concatenate(imgs, axis=0)
return {"image": input_imgs}, False, None, ""
def postprocess(self, input_dicts, fetch_dict, log_id):
def postprocess(self, input_dicts, fetch_dict, data_id, log_id):
score_list = fetch_dict["prediction"]
result = {"label": [], "prob": []}
for score in score_list:
......
......@@ -49,7 +49,7 @@ class ImagenetOp(Op):
input_imgs = np.concatenate(imgs, axis=0)
return {"image": input_imgs}, False, None, ""
def postprocess(self, input_dicts, fetch_dict, log_id):
def postprocess(self, input_dicts, fetch_dict, data_id, log_id):
score_list = fetch_dict["prediction"]
result = {"label": [], "prob": []}
for score in score_list:
......
......@@ -49,7 +49,7 @@ class ImagenetOp(Op):
input_imgs = np.concatenate(imgs, axis=0)
return {"image": input_imgs}, False, None, ""
def postprocess(self, input_dicts, fetch_dict, log_id):
def postprocess(self, input_dicts, fetch_dict, data_id, log_id):
score_list = fetch_dict["prediction"]
result = {"label": [], "prob": []}
for score in score_list:
......
......@@ -49,7 +49,7 @@ class ImagenetOp(Op):
input_imgs = np.concatenate(imgs, axis=0)
return {"image": input_imgs}, False, None, ""
def postprocess(self, input_dicts, fetch_dict, log_id):
def postprocess(self, input_dicts, fetch_dict, data_id, log_id):
score_list = fetch_dict["prediction"]
result = {"label": [], "prob": []}
for score in score_list:
......
......@@ -49,7 +49,7 @@ class ImagenetOp(Op):
input_imgs = np.concatenate(imgs, axis=0)
return {"image": input_imgs}, False, None, ""
def postprocess(self, input_dicts, fetch_dict, log_id):
def postprocess(self, input_dicts, fetch_dict, data_id, log_id):
score_list = fetch_dict["save_infer_model/scale_0.tmp_1"]
result = {"label": [], "prob": []}
for score in score_list:
......
......@@ -49,7 +49,7 @@ class ImagenetOp(Op):
input_imgs = np.concatenate(imgs, axis=0)
return {"inputs": input_imgs}, False, None, ""
def postprocess(self, input_dicts, fetch_dict, log_id):
def postprocess(self, input_dicts, fetch_dict, data_id, log_id):
score_list = fetch_dict["save_infer_model/scale_0.tmp_0"]
result = {"label": [], "prob": []}
for score in score_list:
......
......@@ -49,7 +49,7 @@ class ImagenetOp(Op):
input_imgs = np.concatenate(imgs, axis=0)
return {"inputs": input_imgs}, False, None, ""
def postprocess(self, input_dicts, fetch_dict, log_id):
def postprocess(self, input_dicts, fetch_dict, data_id, log_id):
score_list = fetch_dict["save_infer_model/scale_0.tmp_1"]
result = {"label": [], "prob": []}
for score in score_list:
......
......@@ -46,7 +46,7 @@ class ImagenetOp(Op):
input_imgs = np.concatenate(imgs, axis=0)
return {"image": input_imgs}, False, None, ""
def postprocess(self, input_dicts, fetch_dict, log_id):
def postprocess(self, input_dicts, fetch_dict, data_id, log_id):
score_list = fetch_dict["score"]
result = {"label": [], "prob": []}
for score in score_list:
......
......@@ -49,7 +49,7 @@ class ImagenetOp(Op):
input_imgs = np.concatenate(imgs, axis=0)
return {"image": input_imgs}, False, None, ""
def postprocess(self, input_dicts, fetch_dict, log_id):
def postprocess(self, input_dicts, fetch_dict, data_id, log_id):
score_list = fetch_dict["prediction"]
result = {"label": [], "prob": []}
for score in score_list:
......
......@@ -19,6 +19,7 @@ import cv2
from paddle_serving_app.reader import *
import base64
class FasterRCNNOp(Op):
def init_op(self):
self.img_preprocess = Sequential([
......@@ -38,22 +39,30 @@ class FasterRCNNOp(Op):
im = cv2.imdecode(data, cv2.IMREAD_COLOR)
im = self.img_preprocess(im)
imgs.append({
"image": im[np.newaxis,:],
"im_shape": np.array(list(im.shape[1:])).reshape(-1)[np.newaxis,:],
"scale_factor": np.array([1.0, 1.0]).reshape(-1)[np.newaxis,:],
"image": im[np.newaxis, :],
"im_shape":
np.array(list(im.shape[1:])).reshape(-1)[np.newaxis, :],
"scale_factor": np.array([1.0, 1.0]).reshape(-1)[np.newaxis, :],
})
feed_dict = {
"image": np.concatenate([x["image"] for x in imgs], axis=0),
"im_shape": np.concatenate([x["im_shape"] for x in imgs], axis=0),
"scale_factor": np.concatenate([x["scale_factor"] for x in imgs], axis=0)
"image": np.concatenate(
[x["image"] for x in imgs], axis=0),
"im_shape": np.concatenate(
[x["im_shape"] for x in imgs], axis=0),
"scale_factor": np.concatenate(
[x["scale_factor"] for x in imgs], axis=0)
}
#for key in feed_dict.keys():
# print(key, feed_dict[key].shape)
return feed_dict, False, None, ""
def postprocess(self, input_dicts, fetch_dict, log_id):
def postprocess(self, input_dicts, fetch_dict, data_id, log_id):
#print(fetch_dict)
res_dict = {"bbox_result": str(self.img_postprocess(fetch_dict, visualize=False))}
res_dict = {
"bbox_result":
str(self.img_postprocess(
fetch_dict, visualize=False))
}
return res_dict, None, ""
......
......@@ -19,6 +19,7 @@ import cv2
from paddle_serving_app.reader import *
import base64
class PPYoloMbvOp(Op):
def init_op(self):
self.img_preprocess = Sequential([
......@@ -38,23 +39,31 @@ class PPYoloMbvOp(Op):
im = cv2.imdecode(data, cv2.IMREAD_COLOR)
im = self.img_preprocess(im)
imgs.append({
"image": im[np.newaxis,:],
"im_shape": np.array(list(im.shape[1:])).reshape(-1)[np.newaxis,:],
"scale_factor": np.array([1.0, 1.0]).reshape(-1)[np.newaxis,:],
"image": im[np.newaxis, :],
"im_shape":
np.array(list(im.shape[1:])).reshape(-1)[np.newaxis, :],
"scale_factor": np.array([1.0, 1.0]).reshape(-1)[np.newaxis, :],
})
feed_dict = {
"image": np.concatenate([x["image"] for x in imgs], axis=0),
"im_shape": np.concatenate([x["im_shape"] for x in imgs], axis=0),
"scale_factor": np.concatenate([x["scale_factor"] for x in imgs], axis=0)
"image": np.concatenate(
[x["image"] for x in imgs], axis=0),
"im_shape": np.concatenate(
[x["im_shape"] for x in imgs], axis=0),
"scale_factor": np.concatenate(
[x["scale_factor"] for x in imgs], axis=0)
}
for key in feed_dict.keys():
print(key, feed_dict[key].shape)
return feed_dict, False, None, ""
def postprocess(self, input_dicts, fetch_dict, log_id):
def postprocess(self, input_dicts, fetch_dict, data_id, log_id):
#print(fetch_dict)
res_dict = {"bbox_result": str(self.img_postprocess(fetch_dict, visualize=False))}
res_dict = {
"bbox_result":
str(self.img_postprocess(
fetch_dict, visualize=False))
}
return res_dict, None, ""
......
......@@ -19,6 +19,7 @@ import cv2
from paddle_serving_app.reader import *
import base64
class Yolov3Op(Op):
def init_op(self):
self.img_preprocess = Sequential([
......@@ -38,22 +39,30 @@ class Yolov3Op(Op):
im = cv2.imdecode(data, cv2.IMREAD_COLOR)
im = self.img_preprocess(im)
imgs.append({
"image": im[np.newaxis,:],
"im_shape": np.array(list(im.shape[1:])).reshape(-1)[np.newaxis,:],
"scale_factor": np.array([1.0, 1.0]).reshape(-1)[np.newaxis,:],
"image": im[np.newaxis, :],
"im_shape":
np.array(list(im.shape[1:])).reshape(-1)[np.newaxis, :],
"scale_factor": np.array([1.0, 1.0]).reshape(-1)[np.newaxis, :],
})
feed_dict = {
"image": np.concatenate([x["image"] for x in imgs], axis=0),
"im_shape": np.concatenate([x["im_shape"] for x in imgs], axis=0),
"scale_factor": np.concatenate([x["scale_factor"] for x in imgs], axis=0)
"image": np.concatenate(
[x["image"] for x in imgs], axis=0),
"im_shape": np.concatenate(
[x["im_shape"] for x in imgs], axis=0),
"scale_factor": np.concatenate(
[x["scale_factor"] for x in imgs], axis=0)
}
#for key in feed_dict.keys():
# print(key, feed_dict[key].shape)
return feed_dict, False, None, ""
def postprocess(self, input_dicts, fetch_dict, log_id):
def postprocess(self, input_dicts, fetch_dict, data_id, log_id):
#print(fetch_dict)
res_dict = {"bbox_result": str(self.img_postprocess(fetch_dict, visualize=False))}
res_dict = {
"bbox_result":
str(self.img_postprocess(
fetch_dict, visualize=False))
}
return res_dict, None, ""
......
......@@ -4,7 +4,7 @@ This document will takes Imagenet service as an example to introduce how to use
## Get model
```
sh get_model.sh
sh get_data.sh
```
## Start server
......
......@@ -4,7 +4,7 @@
## 获取模型
```
sh get_model.sh
sh get_data.sh
```
## 启动服务
......
......@@ -43,9 +43,11 @@ class BertOp(Op):
print(key, feed_dict[key].shape)
return feed_dict, False, None, ""
def postprocess(self, input_dicts, fetch_dict, log_id):
fetch_dict["pooled_output"] = str(fetch_dict["pooled_output"])
return fetch_dict, None, ""
def postprocess(self, input_dicts, fetch_dict, data_id, log_id):
new_dict = {}
new_dict["pooled_output"] = str(fetch_dict["pooled_output"])
new_dict["sequence_output"] = str(fetch_dict["sequence_output"])
return new_dict, None, ""
class BertService(WebService):
......
......@@ -42,7 +42,7 @@ class ImagenetOp(Op):
img = self.seq(im)
return {"image": img[np.newaxis, :].copy()}, False, None, ""
def postprocess(self, input_dicts, fetch_dict, log_id):
def postprocess(self, input_dicts, fetch_dict, data_id, log_id):
print(fetch_dict)
score_list = fetch_dict["score"]
result = {"label": [], "prob": []}
......
......@@ -54,7 +54,7 @@ class DetOp(Op):
imgs.append(det_img[np.newaxis, :].copy())
return {"image": np.concatenate(imgs, axis=0)}, False, None, ""
def postprocess(self, input_dicts, fetch_dict, log_id):
def postprocess(self, input_dicts, fetch_dict, data_id, log_id):
# print(fetch_dict)
det_out = fetch_dict["concat_1.tmp_0"]
ratio_list = [
......@@ -149,7 +149,7 @@ class RecOp(Op):
return feed_list, False, None, ""
def postprocess(self, input_dicts, fetch_data, log_id):
def postprocess(self, input_dicts, fetch_data, data_id, log_id):
res_list = []
if isinstance(fetch_data, dict):
if len(fetch_data) > 0:
......
......@@ -40,9 +40,10 @@ class UciOp(Op):
proc_dict = {}
return input_dict, False, None, ""
def postprocess(self, input_dicts, fetch_dict, log_id):
_LOGGER.info("UciOp::postprocess >>> log_id:{}, fetch_dict:{}".format(
log_id, fetch_dict))
def postprocess(self, input_dicts, fetch_dict, data_id, log_id):
_LOGGER.info(
"UciOp::postprocess >>> data_id:{}, log_id:{}, fetch_dict:{}".
format(data_id, log_id, fetch_dict))
fetch_dict["price"] = str(fetch_dict["price"])
return fetch_dict, None, ""
......
......@@ -41,9 +41,10 @@ class UciOp(Op):
return input_dict, False, None, ""
def postprocess(self, input_dicts, fetch_dict, log_id):
_LOGGER.info("UciOp::postprocess >>> log_id:{}, fetch_dict:{}".format(
log_id, fetch_dict))
def postprocess(self, input_dicts, fetch_dict, data_id, log_id):
_LOGGER.info(
"UciOp::postprocess >>> data_id:{}, log_id:{}, fetch_dict:{}".
format(data_id, log_id, fetch_dict))
fetch_dict["price"] = str(fetch_dict["price"][0][0])
return fetch_dict, None, ""
......
......@@ -127,7 +127,8 @@ class LocalPredictor(object):
for i, var in enumerate(model_conf.fetch_var):
self.fetch_names_to_idx_[var.alias_name] = i
self.fetch_names_to_type_[var.alias_name] = var.fetch_type
self.fetch_types_[var.alias_name] = var.fetch_type
self.fetch_names_to_type_[var.alias_name] = var.shape
# set precision of inference.
precision_type = paddle_infer.PrecisionType.Float32
......@@ -253,8 +254,27 @@ class LocalPredictor(object):
feed[name] = feed[name].astype("float32")
elif self.feed_types_[name] == 2:
feed[name] = feed[name].astype("int32")
elif self.feed_types_[name] == 3:
feed[name] = feed[name].astype("float64")
elif self.feed_types_[name] == 4:
feed[name] = feed[name].astype("int16")
elif self.feed_types_[name] == 5:
feed[name] = feed[name].astype("float16")
elif self.feed_types_[name] == 6:
feed[name] = feed[name].astype("uint16")
elif self.feed_types_[name] == 7:
feed[name] = feed[name].astype("uint8")
elif self.feed_types_[name] == 8:
feed[name] = feed[name].astype("int8")
elif self.feed_types_[name] == 9:
feed[name] = feed[name].astype("bool")
elif self.feed_types_[name] == 10:
feed[name] = feed[name].astype("complex64")
elif self.feed_types_[name] == 11:
feed[name] = feed[name].astype("complex128")
else:
raise ValueError("local predictor receives wrong data type")
input_tensor_handle = self.predictor.get_input_handle(name)
if "{}.lod".format(name) in feed:
input_tensor_handle.set_lod([feed["{}.lod".format(name)]])
......
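The extended elif chain maps the serving feed_type codes onto numpy dtypes before tensors are handed to the predictor. The same table in dict form makes the code-to-dtype pairing explicit (a sketch; the shipped code keeps the chain):

```python
import numpy as np

# feed_type code -> numpy dtype string, as enumerated in the chain above.
# Code 6 is Paddle's bfloat16, which numpy stores as uint16.
FEED_TYPE_TO_DTYPE = {
    0: "int64", 1: "float32", 2: "int32", 3: "float64",
    4: "int16", 5: "float16", 6: "uint16", 7: "uint8",
    8: "int8", 9: "bool", 10: "complex64", 11: "complex128",
}

def cast_feed(array, feed_type):
    dtype = FEED_TYPE_TO_DTYPE.get(feed_type)
    if dtype is None:
        raise ValueError("local predictor receives wrong data type")
    return array.astype(dtype)

print(cast_feed(np.zeros(3), 5).dtype)  # float16
```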
......@@ -337,8 +337,6 @@ class Client(object):
string_shape = []
fetch_names = []
counter = 0
for key in fetch_list:
if key in self.fetch_names_:
fetch_names.append(key)
......
......@@ -31,6 +31,21 @@ import paddle.nn.functional as F
import errno
from paddle.jit import to_static
_PADDLE_DTYPE_2_NUMPY_DTYPE = {
core.VarDesc.VarType.BOOL: 'bool',
core.VarDesc.VarType.FP16: 'float16',
core.VarDesc.VarType.BF16: 'uint16',
core.VarDesc.VarType.FP32: 'float32',
core.VarDesc.VarType.FP64: 'float64',
core.VarDesc.VarType.INT8: 'int8',
core.VarDesc.VarType.INT16: 'int16',
core.VarDesc.VarType.INT32: 'int32',
core.VarDesc.VarType.INT64: 'int64',
core.VarDesc.VarType.UINT8: 'uint8',
core.VarDesc.VarType.COMPLEX64: 'complex64',
core.VarDesc.VarType.COMPLEX128: 'complex128',
}
def save_dygraph_model(serving_model_folder, client_config_folder, model):
paddle.jit.save(model, "serving_tmp")
......@@ -57,13 +72,8 @@ def save_dygraph_model(serving_model_folder, client_config_folder, model):
feed_var = model_conf.FeedVar()
feed_var.alias_name = key
feed_var.name = feed_var_dict[key].name
feed_var.feed_type = var_type_conversion(feed_var_dict[key].dtype)
feed_var.is_lod_tensor = feed_var_dict[key].lod_level >= 1
if feed_var_dict[key].dtype == core.VarDesc.VarType.INT64:
feed_var.feed_type = 0
if feed_var_dict[key].dtype == core.VarDesc.VarType.FP32:
feed_var.feed_type = 1
if feed_var_dict[key].dtype == core.VarDesc.VarType.INT32:
feed_var.feed_type = 2
if feed_var.is_lod_tensor:
feed_var.shape.extend([-1])
else:
......@@ -77,13 +87,8 @@ def save_dygraph_model(serving_model_folder, client_config_folder, model):
fetch_var = model_conf.FetchVar()
fetch_var.alias_name = key
fetch_var.name = fetch_var_dict[key].name
fetch_var.fetch_type = var_type_conversion(fetch_var_dict[key].dtype)
fetch_var.is_lod_tensor = 1
if fetch_var_dict[key].dtype == core.VarDesc.VarType.INT64:
fetch_var.fetch_type = 0
if fetch_var_dict[key].dtype == core.VarDesc.VarType.FP32:
fetch_var.fetch_type = 1
if fetch_var_dict[key].dtype == core.VarDesc.VarType.INT32:
fetch_var.fetch_type = 2
if fetch_var.is_lod_tensor:
fetch_var.shape.extend([-1])
else:
......@@ -119,6 +124,59 @@ def save_dygraph_model(serving_model_folder, client_config_folder, model):
fout.write(config.SerializeToString())
def var_type_conversion(dtype):
"""
Variable type conversion
Args:
dtype: type of core.VarDesc.VarType.xxxxx
(https://github.com/PaddlePaddle/Paddle/blob/release/2.1/python/paddle/framework/dtype.py)
Returns:
(int) type value; -1 means no matching type was found.
int64 => 0;
float32 => 1;
int32 => 2;
float64 => 3;
int16 => 4;
float16 => 5;
bfloat16 => 6;
uint8 => 7;
int8 => 8;
bool => 9;
complex64 => 10;
complex128 => 11;
"""
type_val = -1
if dtype == core.VarDesc.VarType.INT64:
type_val = 0
elif dtype == core.VarDesc.VarType.FP32:
type_val = 1
elif dtype == core.VarDesc.VarType.INT32:
type_val = 2
elif dtype == core.VarDesc.VarType.FP64:
type_val = 3
elif dtype == core.VarDesc.VarType.INT16:
type_val = 4
elif dtype == core.VarDesc.VarType.FP16:
type_val = 5
elif dtype == core.VarDesc.VarType.BF16:
type_val = 6
elif dtype == core.VarDesc.VarType.UINT8:
type_val = 7
elif dtype == core.VarDesc.VarType.INT8:
type_val = 8
elif dtype == core.VarDesc.VarType.BOOL:
type_val = 9
elif dtype == core.VarDesc.VarType.COMPLEX64:
type_val = 10
elif dtype == core.VarDesc.VarType.COMPLEX128:
type_val = 11
else:
type_val = -1
return type_val
def save_model(server_model_folder,
client_config_folder,
feed_var_dict,
......@@ -164,18 +222,13 @@ def save_model(server_model_folder,
config = model_conf.GeneralModelConfig()
#int64 = 0; float32 = 1; int32 = 2;
for key in feed_var_dict:
feed_var = model_conf.FeedVar()
feed_var.alias_name = key
feed_var.name = feed_var_dict[key].name
feed_var.feed_type = var_type_conversion(feed_var_dict[key].dtype)
feed_var.is_lod_tensor = feed_var_dict[key].lod_level >= 1
if feed_var_dict[key].dtype == core.VarDesc.VarType.INT64:
feed_var.feed_type = 0
if feed_var_dict[key].dtype == core.VarDesc.VarType.FP32:
feed_var.feed_type = 1
if feed_var_dict[key].dtype == core.VarDesc.VarType.INT32:
feed_var.feed_type = 2
if feed_var.is_lod_tensor:
feed_var.shape.extend([-1])
else:
......@@ -190,14 +243,10 @@ def save_model(server_model_folder,
fetch_var = model_conf.FetchVar()
fetch_var.alias_name = key
fetch_var.name = fetch_var_dict[key].name
#fetch_var.is_lod_tensor = fetch_var_dict[key].lod_level >= 1
fetch_var.is_lod_tensor = 1
if fetch_var_dict[key].dtype == core.VarDesc.VarType.INT64:
fetch_var.fetch_type = 0
if fetch_var_dict[key].dtype == core.VarDesc.VarType.FP32:
fetch_var.fetch_type = 1
if fetch_var_dict[key].dtype == core.VarDesc.VarType.INT32:
fetch_var.fetch_type = 2
fetch_var.fetch_type = var_type_conversion(fetch_var_dict[key].dtype)
fetch_var.is_lod_tensor = fetch_var_dict[key].lod_level >= 1
#fetch_var.is_lod_tensor = 1
if fetch_var.is_lod_tensor:
fetch_var.shape.extend([-1])
else:
......
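Both save paths now call var_type_conversion instead of repeating three-way if checks, and the fetch side additionally restores the real lod_level instead of hard-coding is_lod_tensor = 1. A few spot checks of the code table (values taken from the docstring above; core is the same paddle.fluid.core module this file already imports, and a paddle install is assumed):

```python
from paddle.fluid import core

# Spot checks against the docstring table: int64 => 0 ... complex128 => 11.
assert var_type_conversion(core.VarDesc.VarType.INT64) == 0
assert var_type_conversion(core.VarDesc.VarType.FP32) == 1
assert var_type_conversion(core.VarDesc.VarType.BF16) == 6
assert var_type_conversion(core.VarDesc.VarType.COMPLEX128) == 11
assert var_type_conversion("not a VarType") == -1  # unmatched -> -1
```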
......@@ -101,7 +101,6 @@ def is_gpu_mode(unformatted_gpus):
for ids in op_gpu_list:
if int(ids) >= 0:
return True
return False
......
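For reference, the rule is_gpu_mode applies after this cleanup: any device id that parses to a non-negative integer puts the server in GPU mode. A self-contained sketch under the assumption that each op contributes a list of id strings (the real helper also parses the unformatted input):

```python
def is_gpu_mode_sketch(op_gpu_lists):
    """True if any configured device id is a GPU id (>= 0)."""
    for op_gpu_list in op_gpu_lists:
        for ids in op_gpu_list:
            if int(ids) >= 0:
                return True
    return False

print(is_gpu_mode_sketch([["-1"]]))         # False: CPU only
print(is_gpu_mode_sketch([["-1"], ["0"]]))  # True: one op on GPU
```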
......@@ -140,7 +140,7 @@ class Server(object):
def set_ir_optimize(self, flag=False):
self.ir_optimization = flag
# Multi-Server does not have this Function.
# Multi-Server does not have this Function.
def set_product_name(self, product_name=None):
if product_name == None:
raise ValueError("product_name can't be None.")
......@@ -437,7 +437,6 @@ class Server(object):
def download_bin(self):
os.chdir(self.module_path)
need_download = False
#acquire lock
version_file = open("{}/version.py".format(self.module_path), "r")
......
......@@ -176,7 +176,7 @@ class DAGExecutor(object):
"in_channel must be Channel type, but get {}".
format(type(in_channel)))
os._exit(-1)
in_channel.add_producer(self.name)
self._in_channel = in_channel
_LOGGER.info("[DAG] set in channel succ, name [{}]".format(self.name))
......@@ -669,14 +669,14 @@ class DAG(object):
out_degree_ops)
dag_views = list(reversed(dag_views))
if not self._build_dag_each_worker:
_LOGGER.debug("================== DAG ====================")
_LOGGER.info("================== DAG ====================")
for idx, view in enumerate(dag_views):
_LOGGER.debug("(VIEW {})".format(idx))
_LOGGER.info("(VIEW {})".format(idx))
for op in view:
_LOGGER.debug(" [{}]".format(op.name))
_LOGGER.info(" [{}]".format(op.name))
for out_op in out_degree_ops[op.name]:
_LOGGER.debug(" - {}".format(out_op.name))
_LOGGER.debug("-------------------------------------------")
_LOGGER.info(" - {}".format(out_op.name))
_LOGGER.info("-------------------------------------------")
# create channels and virtual ops
virtual_op_name_gen = NameGenerator("vir")
......@@ -719,6 +719,7 @@ class DAG(object):
channel = self._gen_channel(channel_name_gen)
channels.append(channel)
op.add_input_channel(channel)
_LOGGER.info("op:{} add input channel.".format(op.name))
pred_ops = pred_op_of_next_view_op[op.name]
if v_idx == 0:
input_channel = channel
......@@ -726,6 +727,8 @@ class DAG(object):
# if pred_op is virtual op, it will use ancestors as producers to channel
for pred_op in pred_ops:
pred_op.add_output_channel(channel)
_LOGGER.info("pred_op:{} add output channel".format(
pred_op.name))
processed_op.add(op.name)
# find same input op to combine channel
for other_op in actual_next_view[o_idx + 1:]:
......@@ -745,6 +748,7 @@ class DAG(object):
output_channel = self._gen_channel(channel_name_gen)
channels.append(output_channel)
last_op.add_output_channel(output_channel)
_LOGGER.info("last op:{} add output channel".format(last_op.name))
pack_func, unpack_func = None, None
pack_func = response_op.pack_response_package
......@@ -752,7 +756,11 @@ class DAG(object):
actual_ops = virtual_ops
for op in used_ops:
if len(op.get_input_ops()) == 0:
# Set special features of the request op:
# 1. set the unpack function;
# 2. set the output channel.
unpack_func = op.unpack_request_package
op.add_output_channel(input_channel)
continue
actual_ops.append(op)
......
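With the topology logging promoted from debug to info, every pipeline now prints its DAG views at startup. A small reproduction of the printing loop, using the format strings from the hunk above with a hypothetical three-op chain (indentation approximated):

```python
# Hypothetical chain read_op -> det -> rec; dag_views groups ops by depth.
dag_views = [["read_op"], ["det"], ["rec"]]
out_degree_ops = {"read_op": ["det"], "det": ["rec"], "rec": []}

print("================== DAG ====================")
for idx, view in enumerate(dag_views):
    print("(VIEW {})".format(idx))
    for op in view:
        print("    [{}]".format(op))
        for out_op in out_degree_ops[op]:
            print("      - {}".format(out_op))
print("-------------------------------------------")
```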
......@@ -58,13 +58,15 @@ class Op(object):
retry=0,
batch_size=None,
auto_batching_timeout=None,
local_service_handler=None):
local_service_handler=None,
jump_to_ops=[]):
# In __init__, all the parameters are just saved and Op is not initialized
if name is None:
name = _op_name_gen.next()
self.name = name # to identify the type of OP, it must be globally unique
self.concurrency = concurrency # amount of concurrency
self.set_input_ops(input_ops)
self.set_jump_to_ops(jump_to_ops)
self._local_service_handler = local_service_handler
self._server_endpoints = server_endpoints
......@@ -99,9 +101,7 @@ class Op(object):
conf: config.yaml
Returns:
None
"""
# init op
if self.concurrency is None:
self.concurrency = conf["concurrency"]
if self._retry is None:
......@@ -372,6 +372,79 @@ class Op(object):
os._exit(-1)
self._input_ops.append(op)
def get_jump_to_ops(self):
return self._jump_to_ops
def set_jump_to_ops(self, ops):
"""
Set the ops to jump to; this op can then send channel data to their output channels.
Args:
ops: list of ops to jump to
Returns:
None.
"""
if not isinstance(ops, list):
ops = [] if ops is None else [ops]
self._jump_to_ops = []
for op in ops:
if not isinstance(op, Op):
_LOGGER.critical(
self._log("Failed to set input_ops: input op "
"must be Op type, not {}".format(type(op))))
os._exit(-1)
self._jump_to_ops.append(op)
def is_jump_op(self):
"""
Whether the op has any _jump_to_ops members.
Args:
None
Returns:
True or False
"""
return len(self._jump_to_ops) > 0
def check_jumping(self, input_data):
"""
Check whether to send data to jump ops. WhileOp needs to override
this interface. This function returns False by default.
Args:
input_data: input data to be preprocessed
Returns:
True: send data to the output channels of the jump ops.
False: send data to this op's own output channel.
"""
return False
def get_output_channels_of_jump_ops(self):
"""
Get output channels of jump ops
Args:
None
Returns:
list of channels
"""
channels = []
if self.is_jump_op() is False:
return channels
for op in self._jump_to_ops:
_LOGGER.info("op:{} extend op._get_output_channels:{}".format(
op.name, op._get_output_channels()))
channels.extend(op._get_output_channels())
_LOGGER.info("get_output_channels_of_jump_ops, channels:{}".format(
channels))
return channels
def add_input_channel(self, channel):
"""
Adding one input channel to the Op. Each op may have many front ops,
......@@ -410,6 +483,7 @@ class Op(object):
os._exit(-1)
channel.add_producer(self.name)
self._outputs.append(channel)
_LOGGER.info("op:{} add output_channel {}".format(self.name, channel))
def clean_output_channels(self):
self._outputs = []
......@@ -424,7 +498,7 @@ class Op(object):
Args:
input_dicts: input data to be preprocessed
data_id: inner unique id, 0 default
data_id: inner unique id, auto-incremented
log_id: global unique id for RTT, 0 default
Return:
......@@ -484,12 +558,13 @@ class Op(object):
'''
return call_result
def postprocess(self, input_data, fetch_data, log_id=0):
def postprocess(self, input_data, fetch_data, data_id=0, log_id=0):
"""
In postprocess stage, assemble data for next op or output.
Args:
input_data: data returned in preprocess stage, dict (for single predict) or list (for batch predict)
fetch_data: data returned in process stage, dict (for single predict) or list (for batch predict)
data_id: inner unique id, auto-incremented
log_id: logid, 0 default
Returns:
......@@ -593,7 +668,8 @@ class Op(object):
self.device_type, self.devices, self.mem_optim,
self.ir_optim, self.precision, self.use_mkldnn,
self.mkldnn_cache_capacity, self.mkldnn_op_list,
self.mkldnn_bf16_op_list))
self.mkldnn_bf16_op_list, self.is_jump_op(),
self.get_output_channels_of_jump_ops()))
p.daemon = True
p.start()
process.append(p)
......@@ -629,7 +705,8 @@ class Op(object):
self.device_type, self.devices, self.mem_optim,
self.ir_optim, self.precision, self.use_mkldnn,
self.mkldnn_cache_capacity, self.mkldnn_op_list,
self.mkldnn_bf16_op_list))
self.mkldnn_bf16_op_list, self.is_jump_op(),
self.get_output_channels_of_jump_ops()))
# When a process exits, it attempts to terminate
# all of its daemonic child processes.
t.daemon = True
......@@ -954,7 +1031,7 @@ class Op(object):
prod_errcode, prod_errinfo = None, None
try:
postped_data, prod_errcode, prod_errinfo = self.postprocess(
parsed_data_dict[data_id], midped_data,
parsed_data_dict[data_id], midped_data, data_id,
logid_dict.get(data_id))
except Exception as e:
error_info = "(data_id={} log_id={}) {} Failed to postprocess: {}".format(
......@@ -1100,7 +1177,8 @@ class Op(object):
def _run(self, concurrency_idx, input_channel, output_channels,
is_thread_op, trace_buffer, model_config, workdir, thread_num,
device_type, devices, mem_optim, ir_optim, precision, use_mkldnn,
mkldnn_cache_capacity, mkldnn_op_list, mkldnn_bf16_op_list):
mkldnn_cache_capacity, mkldnn_op_list, mkldnn_bf16_op_list,
is_jump_op, output_channels_of_jump_ops):
"""
_run() is the entry function of OP process / thread model. When client
type is local_predictor in process mode, the CUDA environment needs to
......@@ -1127,6 +1205,8 @@ class Op(object):
mkldnn_cache_capacity: cache capacity of mkldnn, 0 means no limit.
mkldnn_op_list: OP list optimized by mkldnn, None default.
mkldnn_bf16_op_list: OP list optimized by mkldnn bf16, None default.
is_jump_op: whether the OP has a jump op list, False default.
output_channels_of_jump_ops: all output channels of jump ops.
Returns:
None
......@@ -1267,27 +1347,46 @@ class Op(object):
break
if len(postped_data_dict) == 0:
continue
# push data to channel (if run succ)
start = int(round(_time() * 1000000))
try:
profile_str = profiler.gen_profile_str()
for data_id, postped_data in postped_data_dict.items():
if self._server_use_profile:
sys.stderr.write(profile_str)
self._push_to_output_channels(
data=postped_data,
channels=output_channels,
profile_str=profile_str,
client_need_profile=need_profile_dict[data_id],
profile_set=profile_dict[data_id])
after_outchannel_time = _time()
_LOGGER.debug(
"(data_id={}) PUSH OUTPUT CHANNEL! op:{} push cost:{} ms".
format(data_id, self.name, (after_outchannel_time -
after_postp_time) * 1000))
_LOGGER.debug(
"(data_id={}) PUSH OUTPUT CHANNEL! op:{} push data:{}".
format(data_id, self.name, postped_data.get_all_data()))
if self.is_jump_op() is True and self.check_jumping(
postped_data_dict) is True:
# push data to output channel of ops to be jumped
for data_id, postped_data in postped_data_dict.items():
if self._server_use_profile:
sys.stderr.write(profile_str)
self._push_to_output_channels(
data=postped_data,
channels=output_channels_of_jump_ops,
profile_str=profile_str,
client_need_profile=need_profile_dict[data_id],
profile_set=profile_dict[data_id])
after_outchannel_time = _time()
_LOGGER.debug(
"(data_id={}) PUSH OUTPUT CHANNEL OF JUMP OPs! op:{} push cost:{} ms".
format(data_id, self.name, (after_outchannel_time -
after_postp_time) *
1000))
else:
# push data to output channel.
for data_id, postped_data in postped_data_dict.items():
if self._server_use_profile:
sys.stderr.write(profile_str)
self._push_to_output_channels(
data=postped_data,
channels=output_channels,
profile_str=profile_str,
client_need_profile=need_profile_dict[data_id],
profile_set=profile_dict[data_id])
after_outchannel_time = _time()
_LOGGER.debug(
"(data_id={}) PUSH OUTPUT CHANNEL! op:{} push cost:{} ms".
format(data_id, self.name, (after_outchannel_time -
after_postp_time) *
1000))
except ChannelStopError:
_LOGGER.debug("{} Stop.".format(op_info_prefix))
self._finalize(is_thread_op)
......@@ -1410,7 +1509,7 @@ class RequestOp(Op):
for idx, key in enumerate(request.key):
dict_data[key] = request.value[idx]
log_id = request.logid
_LOGGER.info("RequestOp unpack one request. log_id:{}, clientip:{} \
_LOGGER.debug("RequestOp unpack one request. log_id:{}, clientip:{} \
name:{}, method:{}".format(log_id, request.clientip, request.name,
request.method))
......
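Putting the new jump machinery together: an op built with jump_to_ops can short-circuit the DAG, and check_jumping decides per batch whether postprocessed data goes to the jump targets' output channels or to the op's own. A heavily hedged sketch of a loop-style op (the class name, the done flag, and the wiring are hypothetical; only the jump_to_ops parameter and the check_jumping hook come from the hunks above):

```python
from paddle_serving_server.pipeline import Op

class LoopOp(Op):
    """Sketch: re-enqueue unfinished batches by jumping back upstream."""

    def check_jumping(self, input_data):
        # input_data holds the batch's postprocessed channel data keyed
        # by data_id (treated as plain dicts here for illustration).
        # True  -> push to the jump targets' output channels;
        # False -> push to this op's normal output channel.
        return not all(d.get("done", False) for d in input_data.values())

read_op = Op(name="read")
# Jumping to read_op's output channels feeds data back into loop_op's
# input, forming the loop; finished data falls through downstream.
loop_op = LoopOp(name="loop", input_ops=[read_op], jump_to_ops=[read_op])
```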
......@@ -36,6 +36,7 @@ go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-grpc-gateway@v1.15.2
go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-swagger@v1.15.2
go get -u github.com/golang/protobuf/protoc-gen-go@v1.4.3
go get -u google.golang.org/grpc@v1.33.0
go env -w GO111MODULE=auto
build_whl_list=(build_cpu_server build_gpu_server build_client build_app)
rpc_model_list=(grpc_fit_a_line grpc_yolov4 pipeline_imagenet bert_rpc_gpu bert_rpc_cpu ResNet50_rpc \
......