未验证 提交 e61515bf 编写于 作者: T TeslaZhao 提交者: GitHub

Merge branch 'PaddlePaddle:develop' into develop

......@@ -39,4 +39,8 @@ target_link_libraries(cube-builder ${DYNAMIC_LIB})
# install
install(TARGETS cube-builder RUNTIME DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/bin)
install(DIRECTORY ${CMAKE_CURRENT_LIST_DIR}/tool DESTINATION ${PADDLE_SERVING_INSTALL_DIR})
install(FILES ${CMAKE_CURRENT_LIST_DIR}/tool/kvtool.py DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/tool)
install(FILES ${CMAKE_CURRENT_LIST_DIR}/tool/kv_to_seqfile.py DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/tool)
install(DIRECTORY ${CMAKE_CURRENT_LIST_DIR}/tool/source DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/tool)
......@@ -212,7 +212,7 @@ class slim_hash_map {
int copy_data_from(const slim_hash_map& rhs) {
destroy();
LOG(INFO) << "start copy data, rhs info, mHashSize: " << rhs.m_nHashSize;
if (rhs.m_nHashSize > 0) {
m_hashTable = new (std::nothrow) uint32_t[rhs.m_nHashSize];
if (!m_hashTable) {
......@@ -231,7 +231,7 @@ class slim_hash_map {
<< sizeof(hash_node_t) * BLOCK_SIZE;
return -1;
}
LOG(INFO) << "copy data, m_nBlockNum: " << m_nBlockNum << " , copy size:" << sizeof(hash_node_t) * BLOCK_SIZE;
memcpy(m_blockAddr[m_nBlockNum],
rhs.m_blockAddr[m_nBlockNum],
sizeof(hash_node_t) * BLOCK_SIZE);
......@@ -265,11 +265,13 @@ class slim_hash_map {
}
size_type index = key % m_nHashSize;
hash_node_t* node = get_node(m_hashTable[index]);
int node_cnt = 0;
while (node != NULL && node->data.first != key) {
LOG(INFO) << "node link get:" << node->data.first;
node_cnt++;
node = get_node(node->next);
}
LOG(INFO) << "key: " << key << " , found count: " << node_cnt;
if (node == NULL) {
return end();
}
......@@ -390,7 +392,6 @@ class slim_hash_map {
if (node != NULL) {
return node->data.second;
}
return add_node(index, key)->data.second;
}
void clear() {
......@@ -399,16 +400,16 @@ class slim_hash_map {
m_nFreeEntries = 0;
m_nSize = 0;
}
bool load(const char* file) {
bool load(const char* file, uint32_t block_id) {
// clear();
// bias = 0 means base mode, bias = K means patch mode, and base dict has size K
int size = sizeof(key_t) + sizeof(value_t);
FILE* fp = fopen(file, "rb");
char* buf = reinterpret_cast<char*>(malloc(size * 100000));
LOG(INFO) << "current block id: " << block_id;
if (fp == NULL || buf == NULL) {
return false;
}
size_t read_count;
bool err = false;
key_t key;
......@@ -423,6 +424,8 @@ class slim_hash_map {
for (int i = 0; i < static_cast<int>(read_count); ++i) {
key = *(reinterpret_cast<key_t*>(buf + i * size));
value = *(reinterpret_cast<value_t*>(buf + i * size + sizeof(key_t)));
value = ((uint64_t)block_id << 32) | value;
LOG(INFO) << "slim map key: " << key << " , value: " << value;
(*this)[key] = value;
}
}
......@@ -557,7 +560,6 @@ class slim_hash_map {
}
hash_node_t* add_node(uint32_t index, const key_type& key) {
++m_nSize;
if (m_nFreeEntries) {
uint32_t addr = m_nFreeEntries;
hash_node_t* node = get_node(addr);
......@@ -569,7 +571,7 @@ class slim_hash_map {
}
uint32_t block = ((m_nNextEntry & 0xFF800000) >> 23);
//LOG(INFO) << "key: " << key << " here. index: " << index << " , m_nNextEntry: "<< m_nNextEntry << " , block:" << block<< ", m_nBlockNum:" << m_nBlockNum;
if (block >= m_nBlockNum) {
try {
m_blockAddr[m_nBlockNum++] = new hash_node_t[BLOCK_SIZE];
......@@ -581,7 +583,6 @@ class slim_hash_map {
return NULL;
}
}
uint32_t addr = m_nNextEntry;
++m_nNextEntry;
hash_node_t* node = get_node(addr);
......
......@@ -51,13 +51,12 @@ int Dict::load(const std::string& dict_path,
bool in_mem,
const std::string& v_path) {
TIME_FLAG(load_start);
int ret = load_index(dict_path, v_path);
if (ret != E_OK) {
LOG(WARNING) << "load index failed";
return ret;
}
LOG(INFO) << "load index in mem mode: " << in_mem ;
if (in_mem) {
ret = load_data(dict_path, v_path);
if (ret != E_OK) {
......@@ -81,8 +80,11 @@ int Dict::load_index(const std::string& dict_path, const std::string& v_path) {
std::string index_n_path(dict_path);
index_n_path.append(v_path);
index_n_path.append("/index.n");
uint32_t cur_block_id = 0;
if (_base_dict) cur_block_id = _base_dict->_block_set.size();
LOG(INFO) << "index file path: " << index_n_path;
//ERR HERE
std::unique_ptr<FILE, decltype(&fclose)> pf(fopen(index_n_path.c_str(), "rb"),
&fclose);
if (pf.get() == NULL) {
......@@ -150,12 +152,16 @@ int Dict::load_index(const std::string& dict_path, const std::string& v_path) {
return E_DATA_ERROR;
}
} else {
if (_slim_table.copy_data_from(_base_dict->_slim_table) != 0) {
LOG(ERROR) << "copy data from old index failed in patch mode";
return E_DATA_ERROR;
}
file_idx = 0;
LOG(INFO)
<< "index check file len failed in patch mode, set file_idx to 0";
<< "index check fail, direct copy";
}
}
LOG(INFO) << "resize slim table, new count: " << count/2;
_slim_table.resize(count / 2);
char file[1024];
......@@ -167,6 +173,7 @@ int Dict::load_index(const std::string& dict_path, const std::string& v_path) {
dict_path.c_str(),
v_path.c_str(),
file_idx);
LOG(INFO) << "load file str: " << file;
if (stat(file, &fstat) < 0) {
if (errno == ENOENT) {
LOG(WARNING) << "index." << file_idx << " not exist";
......@@ -181,8 +188,8 @@ int Dict::load_index(const std::string& dict_path, const std::string& v_path) {
<< (uint64_t)fstat.st_size;
return E_DATA_ERROR;
}
LOG(INFO) << "loading from index." << file_idx;
if (!_slim_table.load(file) || _slim_table.size() > count) {
LOG(INFO) << "loading from index." << file_idx << " . table size: " << _slim_table.size();
if (!_slim_table.load(file, cur_block_id)) {
return E_DATA_ERROR;
}
......@@ -193,8 +200,15 @@ int Dict::load_index(const std::string& dict_path, const std::string& v_path) {
}
int Dict::load_data(const std::string& dict_path, const std::string& v_path) {
std::vector<uint32_t> block_size;
uint64_t total_data_size = 0;
if (_base_dict) {
_block_set = _base_dict->_block_set;
LOG(INFO)<< "load data base dict block set size: " << _block_set[0].size;
for (size_t i = 0; i < _block_set.size(); ++i) {
block_size.push_back(_block_set[i].size);
total_data_size += _block_set[i].size;
}
}
std::string data_n_path(dict_path);
......@@ -212,8 +226,6 @@ int Dict::load_data(const std::string& dict_path, const std::string& v_path) {
return E_DATA_ERROR;
}
std::vector<uint32_t> block_size;
uint64_t total_data_size = 0;
for (uint32_t i = 0; i < count; ++i) {
uint32_t size = 0;
if (fread(reinterpret_cast<void*>(&size), sizeof(uint32_t), 1, pf) != 1) {
......@@ -222,6 +234,7 @@ int Dict::load_data(const std::string& dict_path, const std::string& v_path) {
return E_DATA_ERROR;
}
block_size.push_back(size);
LOG(INFO) << "new block size: " << size;
total_data_size += size;
}
g_data_size << (total_data_size / 1024 / 1024);
......@@ -229,36 +242,35 @@ int Dict::load_data(const std::string& dict_path, const std::string& v_path) {
pf = NULL;
uint32_t old_size = _block_set.size();
LOG(INFO) << "load data old size: " << old_size;
for (size_t i = 0; i < old_size; ++i) {
if (_block_set[i].size != block_size[i]) {
old_size = 0;
break;
}
}
_block_set.resize(count);
LOG(INFO) << "load data block set count: " << count << " , old size: " << old_size;
_block_set.resize(count + old_size);
for (size_t i = old_size; i < _block_set.size(); ++i) {
char data_path[1024];
LOG(INFO) << "load from data." << i;
snprintf(
data_path, 1024, "%s%s/data.%lu", dict_path.c_str(), v_path.c_str(), i);
//snprintf(
// data_path, 1024, "%s%s/data.%lu", dict_path.c_str(), v_path.c_str(), i);
snprintf(data_path, 1024, "%s%s/data.%lu", dict_path.c_str(), v_path.c_str(), i - old_size);
FILE* data_file = fopen(data_path, "rb");
if (data_file == NULL) {
LOG(WARNING) << "open data file [" << data_path << " failed";
LOG(WARNING) << "open data file [" << data_path << " ]failed";
_block_set[i].s_data.reset();
_block_set[i].size = 0;
continue;
}
_block_set[i].s_data.reset(
reinterpret_cast<char*>(malloc(block_size[i] * sizeof(char))));
_block_set[i].s_data.reset(reinterpret_cast<char*>(malloc(block_size[i] * sizeof(char))));
if (_block_set[i].s_data.get() == NULL) {
LOG(ERROR) << "malloc data failed";
fclose(data_file);
return E_OOM;
}
_block_set[i].size = block_size[i];
if (fread(reinterpret_cast<void*>(_block_set[i].s_data.get()),
sizeof(char),
_block_set[i].size,
......@@ -267,7 +279,10 @@ int Dict::load_data(const std::string& dict_path, const std::string& v_path) {
fclose(data_file);
return E_DATA_ERROR;
}
LOG(INFO) << "load new data to BlockSet succ";
for (size_t ii = 0; ii < 20; ++ii) {
LOG(INFO) << "data ptr: " << (int)(_block_set[i].s_data.get()[ii]);
}
fclose(data_file);
}
......@@ -386,12 +401,11 @@ bool Dict::seek(uint64_t key, char* buff, uint64_t* buff_size) {
uint64_t flag = it->second;
uint32_t id = (uint32_t)(flag >> 32);
uint64_t addr = (uint32_t)(flag);
LOG(INFO) << "search key: " << id << " , addr: " << addr;
if (_block_set.size() > id) {
uint32_t block_size = _block_set[id].size;
char* block_data = NULL;
block_data = _block_set[id].s_data.get();
if (block_data && addr + sizeof(uint32_t) <= block_size) {
uint32_t len = *(reinterpret_cast<uint32_t*>(block_data + addr));
if (addr + len <= block_size && len >= sizeof(uint32_t)) {
......@@ -405,6 +419,7 @@ bool Dict::seek(uint64_t key, char* buff, uint64_t* buff_size) {
<< default_buffer_size;
return false;
}
LOG(INFO) << "seek key: " << key << " , addr: " << addr;
memcpy(buff,
(block_data + addr + sizeof(uint32_t)),
len - sizeof(uint32_t));
......
......@@ -17,68 +17,56 @@ package transfer
import (
"fmt"
"github.com/Badangel/logex"
"os"
"time"
"transfer/dict"
)
func Start() {
go BackupTransfer()
logex.Notice(">>> starting server...")
addr := ":" + Port
err := startHttp(addr)
if err != nil {
logex.Fatalf("start http(addr=%v) failed: %v", addr, err)
os.Exit(255)
}
logex.Notice(">>> start server succ")
BackupTransfer()
}
func BackupTransfer() {
for {
//trigger
version, err := TriggerStart(Dict.DonefileAddress)
if err != nil {
logex.Fatalf("[trigger err]trigger err:%v ", err)
fmt.Printf("[error]trigger err:%v \n", err)
break
}
logex.Noticef("[trigger] get version:%v \n", version)
if version.Id == 0 {
logex.Noticef("[sleep]no new version, sleep 5 min")
fmt.Printf("[sleep]no new version, wait 5 min\n")
time.Sleep(5 * time.Minute)
continue
}
//trigger
version, err := TriggerStart(Dict.DonefileAddress)
if err != nil {
logex.Fatalf("[trigger err]trigger err:%v ", err)
fmt.Printf("[error]trigger err:%v \n", err)
fmt.Print("transfer over!")
logex.Noticef("[transfer]status machine exit!")
return
}
logex.Noticef("[trigger] get version:%v \n", version)
Dict.WaitVersionInfo = version
logex.Noticef("[trigger finish] WaitVersionInfo version:%v \n", Dict.WaitVersionInfo)
WriteWaitVersionInfoToFile()
logex.Noticef("[trigger finish] WaitVersionInfo version:%v \n", Dict.WaitVersionInfo)
WriteWaitVersionInfoToFile()
//builder
Dict.WaitVersionInfo.Status = dict.Dict_Status_Building
Dict.WaitVersionInfo.MetaInfos = make(map[int]string)
WriteWaitVersionInfoToFile()
if err = BuilderStart(Dict.WaitVersionInfo); err != nil {
logex.Fatalf("builder err:%v \n", err)
}
//builder
Dict.WaitVersionInfo.Status = dict.Dict_Status_Building
Dict.WaitVersionInfo.MetaInfos = make(map[int]string)
WriteWaitVersionInfoToFile()
if err = BuilderStart(Dict.WaitVersionInfo); err != nil {
logex.Fatalf("builder err:%v \n", err)
}
if Dict.WaitVersionInfo.Mode == dict.BASE {
var newCurrentVersion []dict.DictVersionInfo
Dict.CurrentVersionInfo = newCurrentVersion
WriteCurrentVersionInfoToFile()
}
logex.Noticef("[builder finish] WaitVersionInfo version:%v \n", Dict.WaitVersionInfo)
if Dict.WaitVersionInfo.Mode == dict.BASE {
var newCurrentVersion []dict.DictVersionInfo
Dict.CurrentVersionInfo = newCurrentVersion
WriteCurrentVersionInfoToFile()
}
if Dict.WaitVersionInfo.Mode == dict.DELTA {
var newCurrentVersion []dict.DictVersionInfo
Dict.CurrentVersionInfo = newCurrentVersion
WriteCurrentVersionInfoToFile()
}
logex.Noticef("[builder finish] WaitVersionInfo version:%v \n", Dict.WaitVersionInfo)
//deployer
Dict.WaitVersionInfo.Status = dict.Dict_Status_Deploying
WriteWaitVersionInfoToFile()
if err = DeployStart(Dict.WaitVersionInfo); err != nil {
logex.Fatalf("deploy err:%v \n", err)
}
logex.Noticef("[deploy finish]current version: %v\n",Dict.CurrentVersionInfo)
//deployer
Dict.WaitVersionInfo.Status = dict.Dict_Status_Deploying
WriteWaitVersionInfoToFile()
if err = DeployStart(Dict.WaitVersionInfo); err != nil {
logex.Fatalf("deploy err:%v \n", err)
}
logex.Noticef("[deploy finish]current version: %v\n",Dict.CurrentVersionInfo)
fmt.Print("transfer over!")
logex.Noticef("[transfer]status machine exit!")
}
......@@ -38,18 +38,19 @@ func GetDoneFileInfo(addr string) (version dict.DictVersionInfo, err error) {
Wget(addr, donefileAddr)
addr = donefileAddr
}
baseDonefile := addr + "/base.txt"
fmt.Printf("[trigrer]donefile path:%v \n", baseDonefile)
logex.Noticef("[trigrer]base donefile path:%v", baseDonefile)
contents, err := ioutil.ReadFile(baseDonefile)
VersionLen := len(Dict.CurrentVersionInfo)
version.DictName = Dict.DictName
if err != nil {
fmt.Printf("[trigrer]read files err:%v \n", err)
logex.Fatalf("[trigrer]read files err:%v ", err)
fmt.Printf("get into mode check here\n")
if Dict.DictMode == dict.BASE_ONLY {
baseDonefile := addr + "/base.txt"
fmt.Printf("[trigrer]donefile path:%v \n", baseDonefile)
logex.Noticef("[trigrer]base donefile path:%v", baseDonefile)
contents, err_0 := ioutil.ReadFile(baseDonefile)
if err_0 != nil {
fmt.Printf("[trigrer]read files err:%v \n", err_0)
logex.Fatalf("[trigrer]read files err:%v ", err_0)
return
} else {
} else {
contentss := string(contents)
lines := strings.Split(contentss, "\n")
index := len(lines) - 1
......@@ -80,19 +81,21 @@ func GetDoneFileInfo(addr string) (version dict.DictVersionInfo, err error) {
version.Mode = dict.BASE
return
}
}
if Dict.DictMode == dict.BASR_DELTA && VersionLen > 0 {
}
}
if Dict.DictMode == dict.BASR_DELTA {
patchDonefile := addr + "/patch.txt"
fmt.Printf("[trigrer]patchDonefile path:%v \n", patchDonefile)
logex.Noticef("[trigrer]patch donefile path:%v", patchDonefile)
contents, err = ioutil.ReadFile(patchDonefile)
if err != nil {
fmt.Printf("read files err:%v \n", err)
contents, err_0 := ioutil.ReadFile(patchDonefile)
if err_0 != nil {
fmt.Printf("[trigrer]read files err:%v \n", err_0)
logex.Fatalf("[trigrer]read files err:%v ", err_0)
return
} else {
contentss := string(contents)
lines := strings.Split(contentss, "\n")
fmt.Printf("[trigger]get patch lines here\n")
for index := 0; index < len(lines)-1; index++ {
if len(lines[index]) < 3 {
logex.Noticef("[trigrer]get patch donfile info error")
......@@ -106,14 +109,15 @@ func GetDoneFileInfo(addr string) (version dict.DictVersionInfo, err error) {
logex.Noticef("[trigrer]donfile info:%v", donefileInfo)
newId, _ := strconv.Atoi(donefileInfo.Id)
newKey, _ := strconv.Atoi(donefileInfo.Key)
if newId > Dict.CurrentVersionInfo[VersionLen-1].Id && newKey == Dict.CurrentVersionInfo[VersionLen-1].Key {
fmt.Printf("[trigger]read patch id: %d, key: %d\n", newId, newKey)
if VersionLen == 0 || newId > Dict.CurrentVersionInfo[VersionLen-1].Id {
version.Id = newId
version.Key, _ = strconv.Atoi(donefileInfo.Key)
version.Input = donefileInfo.Input
deployVersion := int(time.Now().Unix())
version.CreateTime = deployVersion
version.Version = deployVersion
version.Depend = Dict.CurrentVersionInfo[VersionLen-1].Depend
version.Depend = deployVersion
version.Mode = dict.DELTA
return
}
......
......@@ -96,7 +96,8 @@ func ExeCommad(files string, params []string) (err error) {
func Wget(ftpPath string, downPath string) {
var params []string
params = append(params, "-P")
params = append(params, "--limit-rate=100m")
params = append(params, "-P")
params = append(params, downPath)
params = append(params, "-r")
params = append(params, "-N")
......@@ -110,4 +111,4 @@ func Wget(ftpPath string, downPath string) {
if err != nil {
fmt.Printf("wget exe: %v\n", err)
}
}
\ No newline at end of file
}
......@@ -169,7 +169,7 @@ int GeneralDistKVInferOp::inference() {
// call paddle inference here
if (InferManager::instance().infer(
engine_name().c_str(), &infer_in, out, batch_size)) {
LOG(ERROR) << << "(logid=" << log_id << ") Failed do infer in fluid model: " << engine_name();
LOG(ERROR) << "(logid=" << log_id << ") Failed do infer in fluid model: " << engine_name();
return -1;
}
int64_t end = timeline.TimeStampUS();
......
......@@ -2,3 +2,16 @@ set(seq_gen_src ${CMAKE_CURRENT_LIST_DIR}/seq_generator.cpp ${CMAKE_CURRENT_LIS
LIST(APPEND seq_gen_src ${PROTO_SRCS})
add_executable(seq_generator ${seq_gen_src})
target_link_libraries(seq_generator protobuf -lpthread)
set(seq_reader_src ${CMAKE_CURRENT_LIST_DIR}/seq_reader.cpp ${CMAKE_CURRENT_LIST_DIR}/../../cube/cube-builder/src/seqfile_reader.cpp)
add_executable(seq_reader ${seq_reader_src})
install(TARGETS seq_reader
RUNTIME DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/bin
ARCHIVE DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/lib
LIBRARY DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/so
)
install(TARGETS seq_reader RUNTIME DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/tool)
install(TARGETS seq_generator RUNTIME DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/tool)
// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <sys/time.h>
#include <limits.h>
#include <fstream>
#include <iostream>
#include <memory>
#include <thread>
#include "core/cube/cube-builder/include/cube-builder/seqfile_reader.h"
std::string string_to_hex(const std::string& input) {
static const char* const lut = "0123456789ABCDEF";
size_t len = input.length();
std::string output;
output.reserve(2 * len);
for (size_t i = 0; i < len; ++i) {
const unsigned char c = input[i];
output.push_back(lut[c >> 4]);
output.push_back(lut[c & 15]);
}
return output;
}
void printSeq(std::string file, int limit) {
SequenceFileRecordReader reader(file.c_str());
if (reader.open() != 0) {
std::cerr << "open file failed! " << file;
return;
}
if (reader.read_header() != 0) {
std::cerr << "read header error! " << file;
reader.close();
return;
}
Record record(reader.get_header());
int total_count = 0;
while (reader.next(&record) == 0) {
uint64_t key =
*reinterpret_cast<uint64_t *>(const_cast<char *>(record.key.data()));
total_count++;
int64_t value_length = record.record_len - record.key_len;
std::cout << "key: " << key << " , value: " << string_to_hex(record.value.c_str()) << std::endl;
if (total_count >= limit) {
break;
}
}
if (reader.close() != 0) {
std::cerr << "close file failed! " << file;
return;
}
}
int main(int argc, char **argv) {
if (argc != 3 && argc != 2) {
std::cout << "Seq Reader Usage:" << std::endl;
std::cout << "get all keys: ./seq_reader $FILENAME " << std::endl;
std::cout << "get some keys: ./seq_reader $FILENAME $KEY_NUM" << std::endl;
return -1;
}
if (argc == 3 || argc == 2) {
const char* filename_str = argv[1];
std::cout << "cstr filename is " << filename_str << std::endl;
std::string filename = filename_str;
std::cout << "filename is " << filename << std::endl;
if (argc == 3) {
const char* key_num_str = argv[2];
int key_limit = std::stoi(key_num_str);
printSeq(filename, key_limit);
} else {
printSeq(filename, INT_MAX);
}
}
return 0;
}
......@@ -81,7 +81,7 @@ export PATH=$PATH:$GOPATH/bin
## Get go packages
```shell
go env -w GO111MODULE=on
go env -w GO111MODULE=auto
go env -w GOPROXY=https://goproxy.cn,direct
go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-grpc-gateway@v1.15.2
go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-swagger@v1.15.2
......
......@@ -80,7 +80,7 @@ export PATH=$PATH:$GOPATH/bin
## 获取 Go packages
```shell
go env -w GO111MODULE=on
go env -w GO111MODULE=auto
go env -w GOPROXY=https://goproxy.cn,direct
go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-grpc-gateway@v1.15.2
go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-swagger@v1.15.2
......
## 如果获得稀疏参数索引Cube所需的模型输入
### 背景知识
推荐系统需要大规模稀疏参数索引来帮助分布式部署,可在`python/example/criteo_ctr_with_cube`或是[PaddleRec](https://github.com/paddlepaddle/paddlerec)了解推荐模型。
稀疏参数索引的模型格式是SequenceFile,源自Hadoop生态的键值对格式文件。
为了方便调试,我们给出了从特定格式的可读文本文件到SequenceFile格式文件的转换工具,以及SequenceFile格式文件与可阅读文字的转换。
用户在调试Cube服务功能时,可以自定义KV对生成SequenceFile格式文件来进行调试。
用户在验证Cube的配送正确性时,可以转换SequenceFile格式文件至可读文字来进行比对验证。
### 预备知识
- 需要会编译Paddle Serving,参见[编译文档](./COMPILE.md)
### 用法
在编译结束后的安装文件,可以得到 seq_reader 和 kv_to_seqfile.py。
#### 生成SequenceFile
`output/tool/`下,修改`output/tool/source/file.txt`,该文件每一行对应一个键值对,用冒号`:`区分key和value部分。
例如:
```
1676869128226002114:48241 37064 91 -539 114 51 -122 269 229 -134 -282
1657749292782759014:167 40 98 27 117 10 -29 15 74 67 -54
```
执行
```
python kv_to_seqfile.py
```
即可生成`data`文件夹,我们看下它的结构
```
.
├── 20210805095422
│   └── base
│   └── feature
└── donefile
└── base.txt
```
其中`20210805095422/base/feature` 就是SequenceFile格式文件,donefile保存在`donefile/base.txt`
#### 查看SequenceFile
我们使用`seq_reader`工具来解读SequenceFile格式文件。
```
./seq_reader 20210805095422/base/feature 10 # 阅读开头的10个KV对
./seq_reader 20210805095422/base/feature # 阅读所有KV对
```
结果
```
key: 1676869128226002114 , value: 343832343109333730363409093931092D35333909313134093531092D3132320932363909323239092D313334092D323832
key: 1657749292782759014 , value: 3136370934300909393809323709313137093130092D3239093135093734093637092D3534
```
其中value 我们目前都以16进制的形式打印。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册