未验证 提交 c796b46f 编写于 作者: J Jiawei Wang 提交者: GitHub

Merge pull request #1337 from bjjwwang/cube_062

Cube Server Transfer Update
...@@ -212,7 +212,7 @@ class slim_hash_map { ...@@ -212,7 +212,7 @@ class slim_hash_map {
int copy_data_from(const slim_hash_map& rhs) { int copy_data_from(const slim_hash_map& rhs) {
destroy(); destroy();
LOG(INFO) << "start copy data, rhs info, mHashSize: " << rhs.m_nHashSize;
if (rhs.m_nHashSize > 0) { if (rhs.m_nHashSize > 0) {
m_hashTable = new (std::nothrow) uint32_t[rhs.m_nHashSize]; m_hashTable = new (std::nothrow) uint32_t[rhs.m_nHashSize];
if (!m_hashTable) { if (!m_hashTable) {
...@@ -231,7 +231,7 @@ class slim_hash_map { ...@@ -231,7 +231,7 @@ class slim_hash_map {
<< sizeof(hash_node_t) * BLOCK_SIZE; << sizeof(hash_node_t) * BLOCK_SIZE;
return -1; return -1;
} }
LOG(INFO) << "copy data, m_nBlockNum: " << m_nBlockNum << " , copy size:" << sizeof(hash_node_t) * BLOCK_SIZE;
memcpy(m_blockAddr[m_nBlockNum], memcpy(m_blockAddr[m_nBlockNum],
rhs.m_blockAddr[m_nBlockNum], rhs.m_blockAddr[m_nBlockNum],
sizeof(hash_node_t) * BLOCK_SIZE); sizeof(hash_node_t) * BLOCK_SIZE);
...@@ -265,11 +265,13 @@ class slim_hash_map { ...@@ -265,11 +265,13 @@ class slim_hash_map {
} }
size_type index = key % m_nHashSize; size_type index = key % m_nHashSize;
hash_node_t* node = get_node(m_hashTable[index]); hash_node_t* node = get_node(m_hashTable[index]);
int node_cnt = 0;
while (node != NULL && node->data.first != key) { while (node != NULL && node->data.first != key) {
LOG(INFO) << "node link get:" << node->data.first;
node_cnt++;
node = get_node(node->next); node = get_node(node->next);
} }
LOG(INFO) << "key: " << key << " , found count: " << node_cnt;
if (node == NULL) { if (node == NULL) {
return end(); return end();
} }
...@@ -390,7 +392,6 @@ class slim_hash_map { ...@@ -390,7 +392,6 @@ class slim_hash_map {
if (node != NULL) { if (node != NULL) {
return node->data.second; return node->data.second;
} }
return add_node(index, key)->data.second; return add_node(index, key)->data.second;
} }
void clear() { void clear() {
...@@ -399,16 +400,16 @@ class slim_hash_map { ...@@ -399,16 +400,16 @@ class slim_hash_map {
m_nFreeEntries = 0; m_nFreeEntries = 0;
m_nSize = 0; m_nSize = 0;
} }
bool load(const char* file) { bool load(const char* file, uint32_t block_id) {
// clear(); // clear();
// bias = 0 means base mode, bias = K means patch mode, and base dict has size K
int size = sizeof(key_t) + sizeof(value_t); int size = sizeof(key_t) + sizeof(value_t);
FILE* fp = fopen(file, "rb"); FILE* fp = fopen(file, "rb");
char* buf = reinterpret_cast<char*>(malloc(size * 100000)); char* buf = reinterpret_cast<char*>(malloc(size * 100000));
LOG(INFO) << "current block id: " << block_id;
if (fp == NULL || buf == NULL) { if (fp == NULL || buf == NULL) {
return false; return false;
} }
size_t read_count; size_t read_count;
bool err = false; bool err = false;
key_t key; key_t key;
...@@ -423,6 +424,8 @@ class slim_hash_map { ...@@ -423,6 +424,8 @@ class slim_hash_map {
for (int i = 0; i < static_cast<int>(read_count); ++i) { for (int i = 0; i < static_cast<int>(read_count); ++i) {
key = *(reinterpret_cast<key_t*>(buf + i * size)); key = *(reinterpret_cast<key_t*>(buf + i * size));
value = *(reinterpret_cast<value_t*>(buf + i * size + sizeof(key_t))); value = *(reinterpret_cast<value_t*>(buf + i * size + sizeof(key_t)));
value = ((uint64_t)block_id << 32) | value;
LOG(INFO) << "slim map key: " << key << " , value: " << value;
(*this)[key] = value; (*this)[key] = value;
} }
} }
...@@ -557,7 +560,6 @@ class slim_hash_map { ...@@ -557,7 +560,6 @@ class slim_hash_map {
} }
hash_node_t* add_node(uint32_t index, const key_type& key) { hash_node_t* add_node(uint32_t index, const key_type& key) {
++m_nSize; ++m_nSize;
if (m_nFreeEntries) { if (m_nFreeEntries) {
uint32_t addr = m_nFreeEntries; uint32_t addr = m_nFreeEntries;
hash_node_t* node = get_node(addr); hash_node_t* node = get_node(addr);
...@@ -569,7 +571,7 @@ class slim_hash_map { ...@@ -569,7 +571,7 @@ class slim_hash_map {
} }
uint32_t block = ((m_nNextEntry & 0xFF800000) >> 23); uint32_t block = ((m_nNextEntry & 0xFF800000) >> 23);
//LOG(INFO) << "key: " << key << " here. index: " << index << " , m_nNextEntry: "<< m_nNextEntry << " , block:" << block<< ", m_nBlockNum:" << m_nBlockNum;
if (block >= m_nBlockNum) { if (block >= m_nBlockNum) {
try { try {
m_blockAddr[m_nBlockNum++] = new hash_node_t[BLOCK_SIZE]; m_blockAddr[m_nBlockNum++] = new hash_node_t[BLOCK_SIZE];
...@@ -581,7 +583,6 @@ class slim_hash_map { ...@@ -581,7 +583,6 @@ class slim_hash_map {
return NULL; return NULL;
} }
} }
uint32_t addr = m_nNextEntry; uint32_t addr = m_nNextEntry;
++m_nNextEntry; ++m_nNextEntry;
hash_node_t* node = get_node(addr); hash_node_t* node = get_node(addr);
......
...@@ -51,13 +51,12 @@ int Dict::load(const std::string& dict_path, ...@@ -51,13 +51,12 @@ int Dict::load(const std::string& dict_path,
bool in_mem, bool in_mem,
const std::string& v_path) { const std::string& v_path) {
TIME_FLAG(load_start); TIME_FLAG(load_start);
int ret = load_index(dict_path, v_path); int ret = load_index(dict_path, v_path);
if (ret != E_OK) { if (ret != E_OK) {
LOG(WARNING) << "load index failed"; LOG(WARNING) << "load index failed";
return ret; return ret;
} }
LOG(INFO) << "load index in mem mode: " << in_mem ;
if (in_mem) { if (in_mem) {
ret = load_data(dict_path, v_path); ret = load_data(dict_path, v_path);
if (ret != E_OK) { if (ret != E_OK) {
...@@ -81,8 +80,11 @@ int Dict::load_index(const std::string& dict_path, const std::string& v_path) { ...@@ -81,8 +80,11 @@ int Dict::load_index(const std::string& dict_path, const std::string& v_path) {
std::string index_n_path(dict_path); std::string index_n_path(dict_path);
index_n_path.append(v_path); index_n_path.append(v_path);
index_n_path.append("/index.n"); index_n_path.append("/index.n");
uint32_t cur_block_id = 0;
if (_base_dict) cur_block_id = _base_dict->_block_set.size();
LOG(INFO) << "index file path: " << index_n_path; LOG(INFO) << "index file path: " << index_n_path;
//ERR HERE
std::unique_ptr<FILE, decltype(&fclose)> pf(fopen(index_n_path.c_str(), "rb"), std::unique_ptr<FILE, decltype(&fclose)> pf(fopen(index_n_path.c_str(), "rb"),
&fclose); &fclose);
if (pf.get() == NULL) { if (pf.get() == NULL) {
...@@ -150,12 +152,16 @@ int Dict::load_index(const std::string& dict_path, const std::string& v_path) { ...@@ -150,12 +152,16 @@ int Dict::load_index(const std::string& dict_path, const std::string& v_path) {
return E_DATA_ERROR; return E_DATA_ERROR;
} }
} else { } else {
if (_slim_table.copy_data_from(_base_dict->_slim_table) != 0) {
LOG(ERROR) << "copy data from old index failed in patch mode";
return E_DATA_ERROR;
}
file_idx = 0; file_idx = 0;
LOG(INFO) LOG(INFO)
<< "index check file len failed in patch mode, set file_idx to 0"; << "index check fail, direct copy";
} }
} }
LOG(INFO) << "resize slim table, new count: " << count/2;
_slim_table.resize(count / 2); _slim_table.resize(count / 2);
char file[1024]; char file[1024];
...@@ -167,6 +173,7 @@ int Dict::load_index(const std::string& dict_path, const std::string& v_path) { ...@@ -167,6 +173,7 @@ int Dict::load_index(const std::string& dict_path, const std::string& v_path) {
dict_path.c_str(), dict_path.c_str(),
v_path.c_str(), v_path.c_str(),
file_idx); file_idx);
LOG(INFO) << "load file str: " << file;
if (stat(file, &fstat) < 0) { if (stat(file, &fstat) < 0) {
if (errno == ENOENT) { if (errno == ENOENT) {
LOG(WARNING) << "index." << file_idx << " not exist"; LOG(WARNING) << "index." << file_idx << " not exist";
...@@ -181,8 +188,8 @@ int Dict::load_index(const std::string& dict_path, const std::string& v_path) { ...@@ -181,8 +188,8 @@ int Dict::load_index(const std::string& dict_path, const std::string& v_path) {
<< (uint64_t)fstat.st_size; << (uint64_t)fstat.st_size;
return E_DATA_ERROR; return E_DATA_ERROR;
} }
LOG(INFO) << "loading from index." << file_idx; LOG(INFO) << "loading from index." << file_idx << " . table size: " << _slim_table.size();
if (!_slim_table.load(file) || _slim_table.size() > count) { if (!_slim_table.load(file, cur_block_id)) {
return E_DATA_ERROR; return E_DATA_ERROR;
} }
...@@ -193,8 +200,15 @@ int Dict::load_index(const std::string& dict_path, const std::string& v_path) { ...@@ -193,8 +200,15 @@ int Dict::load_index(const std::string& dict_path, const std::string& v_path) {
} }
int Dict::load_data(const std::string& dict_path, const std::string& v_path) { int Dict::load_data(const std::string& dict_path, const std::string& v_path) {
std::vector<uint32_t> block_size;
uint64_t total_data_size = 0;
if (_base_dict) { if (_base_dict) {
_block_set = _base_dict->_block_set; _block_set = _base_dict->_block_set;
LOG(INFO)<< "load data base dict block set size: " << _block_set[0].size;
for (size_t i = 0; i < _block_set.size(); ++i) {
block_size.push_back(_block_set[i].size);
total_data_size += _block_set[i].size;
}
} }
std::string data_n_path(dict_path); std::string data_n_path(dict_path);
...@@ -212,8 +226,6 @@ int Dict::load_data(const std::string& dict_path, const std::string& v_path) { ...@@ -212,8 +226,6 @@ int Dict::load_data(const std::string& dict_path, const std::string& v_path) {
return E_DATA_ERROR; return E_DATA_ERROR;
} }
std::vector<uint32_t> block_size;
uint64_t total_data_size = 0;
for (uint32_t i = 0; i < count; ++i) { for (uint32_t i = 0; i < count; ++i) {
uint32_t size = 0; uint32_t size = 0;
if (fread(reinterpret_cast<void*>(&size), sizeof(uint32_t), 1, pf) != 1) { if (fread(reinterpret_cast<void*>(&size), sizeof(uint32_t), 1, pf) != 1) {
...@@ -222,6 +234,7 @@ int Dict::load_data(const std::string& dict_path, const std::string& v_path) { ...@@ -222,6 +234,7 @@ int Dict::load_data(const std::string& dict_path, const std::string& v_path) {
return E_DATA_ERROR; return E_DATA_ERROR;
} }
block_size.push_back(size); block_size.push_back(size);
LOG(INFO) << "new block size: " << size;
total_data_size += size; total_data_size += size;
} }
g_data_size << (total_data_size / 1024 / 1024); g_data_size << (total_data_size / 1024 / 1024);
...@@ -229,36 +242,35 @@ int Dict::load_data(const std::string& dict_path, const std::string& v_path) { ...@@ -229,36 +242,35 @@ int Dict::load_data(const std::string& dict_path, const std::string& v_path) {
pf = NULL; pf = NULL;
uint32_t old_size = _block_set.size(); uint32_t old_size = _block_set.size();
LOG(INFO) << "load data old size: " << old_size;
for (size_t i = 0; i < old_size; ++i) { for (size_t i = 0; i < old_size; ++i) {
if (_block_set[i].size != block_size[i]) { if (_block_set[i].size != block_size[i]) {
old_size = 0; old_size = 0;
break; break;
} }
} }
_block_set.resize(count); LOG(INFO) << "load data block set count: " << count << " , old size: " << old_size;
_block_set.resize(count + old_size);
for (size_t i = old_size; i < _block_set.size(); ++i) { for (size_t i = old_size; i < _block_set.size(); ++i) {
char data_path[1024]; char data_path[1024];
LOG(INFO) << "load from data." << i; LOG(INFO) << "load from data." << i;
snprintf( //snprintf(
data_path, 1024, "%s%s/data.%lu", dict_path.c_str(), v_path.c_str(), i); // data_path, 1024, "%s%s/data.%lu", dict_path.c_str(), v_path.c_str(), i);
snprintf(data_path, 1024, "%s%s/data.%lu", dict_path.c_str(), v_path.c_str(), i - old_size);
FILE* data_file = fopen(data_path, "rb"); FILE* data_file = fopen(data_path, "rb");
if (data_file == NULL) { if (data_file == NULL) {
LOG(WARNING) << "open data file [" << data_path << " failed"; LOG(WARNING) << "open data file [" << data_path << " ]failed";
_block_set[i].s_data.reset(); _block_set[i].s_data.reset();
_block_set[i].size = 0; _block_set[i].size = 0;
continue; continue;
} }
_block_set[i].s_data.reset(reinterpret_cast<char*>(malloc(block_size[i] * sizeof(char))));
_block_set[i].s_data.reset(
reinterpret_cast<char*>(malloc(block_size[i] * sizeof(char))));
if (_block_set[i].s_data.get() == NULL) { if (_block_set[i].s_data.get() == NULL) {
LOG(ERROR) << "malloc data failed"; LOG(ERROR) << "malloc data failed";
fclose(data_file); fclose(data_file);
return E_OOM; return E_OOM;
} }
_block_set[i].size = block_size[i]; _block_set[i].size = block_size[i];
if (fread(reinterpret_cast<void*>(_block_set[i].s_data.get()), if (fread(reinterpret_cast<void*>(_block_set[i].s_data.get()),
sizeof(char), sizeof(char),
_block_set[i].size, _block_set[i].size,
...@@ -267,7 +279,10 @@ int Dict::load_data(const std::string& dict_path, const std::string& v_path) { ...@@ -267,7 +279,10 @@ int Dict::load_data(const std::string& dict_path, const std::string& v_path) {
fclose(data_file); fclose(data_file);
return E_DATA_ERROR; return E_DATA_ERROR;
} }
LOG(INFO) << "load new data to BlockSet succ";
for (size_t ii = 0; ii < 20; ++ii) {
LOG(INFO) << "data ptr: " << (int)(_block_set[i].s_data.get()[ii]);
}
fclose(data_file); fclose(data_file);
} }
...@@ -386,12 +401,11 @@ bool Dict::seek(uint64_t key, char* buff, uint64_t* buff_size) { ...@@ -386,12 +401,11 @@ bool Dict::seek(uint64_t key, char* buff, uint64_t* buff_size) {
uint64_t flag = it->second; uint64_t flag = it->second;
uint32_t id = (uint32_t)(flag >> 32); uint32_t id = (uint32_t)(flag >> 32);
uint64_t addr = (uint32_t)(flag); uint64_t addr = (uint32_t)(flag);
LOG(INFO) << "search key: " << id << " , addr: " << addr;
if (_block_set.size() > id) { if (_block_set.size() > id) {
uint32_t block_size = _block_set[id].size; uint32_t block_size = _block_set[id].size;
char* block_data = NULL; char* block_data = NULL;
block_data = _block_set[id].s_data.get(); block_data = _block_set[id].s_data.get();
if (block_data && addr + sizeof(uint32_t) <= block_size) { if (block_data && addr + sizeof(uint32_t) <= block_size) {
uint32_t len = *(reinterpret_cast<uint32_t*>(block_data + addr)); uint32_t len = *(reinterpret_cast<uint32_t*>(block_data + addr));
if (addr + len <= block_size && len >= sizeof(uint32_t)) { if (addr + len <= block_size && len >= sizeof(uint32_t)) {
...@@ -405,6 +419,7 @@ bool Dict::seek(uint64_t key, char* buff, uint64_t* buff_size) { ...@@ -405,6 +419,7 @@ bool Dict::seek(uint64_t key, char* buff, uint64_t* buff_size) {
<< default_buffer_size; << default_buffer_size;
return false; return false;
} }
LOG(INFO) << "seek key: " << key << " , addr: " << addr;
memcpy(buff, memcpy(buff,
(block_data + addr + sizeof(uint32_t)), (block_data + addr + sizeof(uint32_t)),
len - sizeof(uint32_t)); len - sizeof(uint32_t));
......
...@@ -17,68 +17,56 @@ package transfer ...@@ -17,68 +17,56 @@ package transfer
import ( import (
"fmt" "fmt"
"github.com/Badangel/logex" "github.com/Badangel/logex"
"os"
"time"
"transfer/dict" "transfer/dict"
) )
func Start() { func Start() {
go BackupTransfer() BackupTransfer()
logex.Notice(">>> starting server...")
addr := ":" + Port
err := startHttp(addr)
if err != nil {
logex.Fatalf("start http(addr=%v) failed: %v", addr, err)
os.Exit(255)
}
logex.Notice(">>> start server succ")
} }
func BackupTransfer() { func BackupTransfer() {
for { //trigger
//trigger version, err := TriggerStart(Dict.DonefileAddress)
version, err := TriggerStart(Dict.DonefileAddress) if err != nil {
if err != nil { logex.Fatalf("[trigger err]trigger err:%v ", err)
logex.Fatalf("[trigger err]trigger err:%v ", err) fmt.Printf("[error]trigger err:%v \n", err)
fmt.Printf("[error]trigger err:%v \n", err) fmt.Print("transfer over!")
break logex.Noticef("[transfer]status machine exit!")
} return
logex.Noticef("[trigger] get version:%v \n", version) }
if version.Id == 0 { logex.Noticef("[trigger] get version:%v \n", version)
logex.Noticef("[sleep]no new version, sleep 5 min")
fmt.Printf("[sleep]no new version, wait 5 min\n")
time.Sleep(5 * time.Minute)
continue
}
Dict.WaitVersionInfo = version Dict.WaitVersionInfo = version
logex.Noticef("[trigger finish] WaitVersionInfo version:%v \n", Dict.WaitVersionInfo) logex.Noticef("[trigger finish] WaitVersionInfo version:%v \n", Dict.WaitVersionInfo)
WriteWaitVersionInfoToFile() WriteWaitVersionInfoToFile()
//builder //builder
Dict.WaitVersionInfo.Status = dict.Dict_Status_Building Dict.WaitVersionInfo.Status = dict.Dict_Status_Building
Dict.WaitVersionInfo.MetaInfos = make(map[int]string) Dict.WaitVersionInfo.MetaInfos = make(map[int]string)
WriteWaitVersionInfoToFile() WriteWaitVersionInfoToFile()
if err = BuilderStart(Dict.WaitVersionInfo); err != nil { if err = BuilderStart(Dict.WaitVersionInfo); err != nil {
logex.Fatalf("builder err:%v \n", err) logex.Fatalf("builder err:%v \n", err)
} }
if Dict.WaitVersionInfo.Mode == dict.BASE { if Dict.WaitVersionInfo.Mode == dict.BASE {
var newCurrentVersion []dict.DictVersionInfo var newCurrentVersion []dict.DictVersionInfo
Dict.CurrentVersionInfo = newCurrentVersion Dict.CurrentVersionInfo = newCurrentVersion
WriteCurrentVersionInfoToFile() WriteCurrentVersionInfoToFile()
} }
logex.Noticef("[builder finish] WaitVersionInfo version:%v \n", Dict.WaitVersionInfo) if Dict.WaitVersionInfo.Mode == dict.DELTA {
var newCurrentVersion []dict.DictVersionInfo
Dict.CurrentVersionInfo = newCurrentVersion
WriteCurrentVersionInfoToFile()
}
logex.Noticef("[builder finish] WaitVersionInfo version:%v \n", Dict.WaitVersionInfo)
//deployer //deployer
Dict.WaitVersionInfo.Status = dict.Dict_Status_Deploying Dict.WaitVersionInfo.Status = dict.Dict_Status_Deploying
WriteWaitVersionInfoToFile() WriteWaitVersionInfoToFile()
if err = DeployStart(Dict.WaitVersionInfo); err != nil { if err = DeployStart(Dict.WaitVersionInfo); err != nil {
logex.Fatalf("deploy err:%v \n", err) logex.Fatalf("deploy err:%v \n", err)
}
logex.Noticef("[deploy finish]current version: %v\n",Dict.CurrentVersionInfo)
} }
logex.Noticef("[deploy finish]current version: %v\n",Dict.CurrentVersionInfo)
fmt.Print("transfer over!") fmt.Print("transfer over!")
logex.Noticef("[transfer]status machine exit!") logex.Noticef("[transfer]status machine exit!")
} }
...@@ -38,18 +38,19 @@ func GetDoneFileInfo(addr string) (version dict.DictVersionInfo, err error) { ...@@ -38,18 +38,19 @@ func GetDoneFileInfo(addr string) (version dict.DictVersionInfo, err error) {
Wget(addr, donefileAddr) Wget(addr, donefileAddr)
addr = donefileAddr addr = donefileAddr
} }
baseDonefile := addr + "/base.txt"
fmt.Printf("[trigrer]donefile path:%v \n", baseDonefile)
logex.Noticef("[trigrer]base donefile path:%v", baseDonefile)
contents, err := ioutil.ReadFile(baseDonefile)
VersionLen := len(Dict.CurrentVersionInfo) VersionLen := len(Dict.CurrentVersionInfo)
version.DictName = Dict.DictName version.DictName = Dict.DictName
if err != nil { fmt.Printf("get into mode check here\n")
fmt.Printf("[trigrer]read files err:%v \n", err) if Dict.DictMode == dict.BASE_ONLY {
logex.Fatalf("[trigrer]read files err:%v ", err) baseDonefile := addr + "/base.txt"
fmt.Printf("[trigrer]donefile path:%v \n", baseDonefile)
logex.Noticef("[trigrer]base donefile path:%v", baseDonefile)
contents, err_0 := ioutil.ReadFile(baseDonefile)
if err_0 != nil {
fmt.Printf("[trigrer]read files err:%v \n", err_0)
logex.Fatalf("[trigrer]read files err:%v ", err_0)
return return
} else { } else {
contentss := string(contents) contentss := string(contents)
lines := strings.Split(contentss, "\n") lines := strings.Split(contentss, "\n")
index := len(lines) - 1 index := len(lines) - 1
...@@ -80,19 +81,21 @@ func GetDoneFileInfo(addr string) (version dict.DictVersionInfo, err error) { ...@@ -80,19 +81,21 @@ func GetDoneFileInfo(addr string) (version dict.DictVersionInfo, err error) {
version.Mode = dict.BASE version.Mode = dict.BASE
return return
} }
} }
if Dict.DictMode == dict.BASR_DELTA && VersionLen > 0 { }
if Dict.DictMode == dict.BASR_DELTA {
patchDonefile := addr + "/patch.txt" patchDonefile := addr + "/patch.txt"
fmt.Printf("[trigrer]patchDonefile path:%v \n", patchDonefile) fmt.Printf("[trigrer]patchDonefile path:%v \n", patchDonefile)
logex.Noticef("[trigrer]patch donefile path:%v", patchDonefile) logex.Noticef("[trigrer]patch donefile path:%v", patchDonefile)
contents, err = ioutil.ReadFile(patchDonefile) contents, err_0 := ioutil.ReadFile(patchDonefile)
if err != nil { if err_0 != nil {
fmt.Printf("read files err:%v \n", err) fmt.Printf("[trigrer]read files err:%v \n", err_0)
logex.Fatalf("[trigrer]read files err:%v ", err_0)
return return
} else { } else {
contentss := string(contents) contentss := string(contents)
lines := strings.Split(contentss, "\n") lines := strings.Split(contentss, "\n")
fmt.Printf("[trigger]get patch lines here\n")
for index := 0; index < len(lines)-1; index++ { for index := 0; index < len(lines)-1; index++ {
if len(lines[index]) < 3 { if len(lines[index]) < 3 {
logex.Noticef("[trigrer]get patch donfile info error") logex.Noticef("[trigrer]get patch donfile info error")
...@@ -106,14 +109,15 @@ func GetDoneFileInfo(addr string) (version dict.DictVersionInfo, err error) { ...@@ -106,14 +109,15 @@ func GetDoneFileInfo(addr string) (version dict.DictVersionInfo, err error) {
logex.Noticef("[trigrer]donfile info:%v", donefileInfo) logex.Noticef("[trigrer]donfile info:%v", donefileInfo)
newId, _ := strconv.Atoi(donefileInfo.Id) newId, _ := strconv.Atoi(donefileInfo.Id)
newKey, _ := strconv.Atoi(donefileInfo.Key) newKey, _ := strconv.Atoi(donefileInfo.Key)
if newId > Dict.CurrentVersionInfo[VersionLen-1].Id && newKey == Dict.CurrentVersionInfo[VersionLen-1].Key { fmt.Printf("[trigger]read patch id: %d, key: %d\n", newId, newKey)
if VersionLen == 0 || newId > Dict.CurrentVersionInfo[VersionLen-1].Id {
version.Id = newId version.Id = newId
version.Key, _ = strconv.Atoi(donefileInfo.Key) version.Key, _ = strconv.Atoi(donefileInfo.Key)
version.Input = donefileInfo.Input version.Input = donefileInfo.Input
deployVersion := int(time.Now().Unix()) deployVersion := int(time.Now().Unix())
version.CreateTime = deployVersion version.CreateTime = deployVersion
version.Version = deployVersion version.Version = deployVersion
version.Depend = Dict.CurrentVersionInfo[VersionLen-1].Depend version.Depend = deployVersion
version.Mode = dict.DELTA version.Mode = dict.DELTA
return return
} }
......
...@@ -96,7 +96,8 @@ func ExeCommad(files string, params []string) (err error) { ...@@ -96,7 +96,8 @@ func ExeCommad(files string, params []string) (err error) {
func Wget(ftpPath string, downPath string) { func Wget(ftpPath string, downPath string) {
var params []string var params []string
params = append(params, "-P") params = append(params, "--limit-rate=100m")
params = append(params, "-P")
params = append(params, downPath) params = append(params, downPath)
params = append(params, "-r") params = append(params, "-r")
params = append(params, "-N") params = append(params, "-N")
...@@ -110,4 +111,4 @@ func Wget(ftpPath string, downPath string) { ...@@ -110,4 +111,4 @@ func Wget(ftpPath string, downPath string) {
if err != nil { if err != nil {
fmt.Printf("wget exe: %v\n", err) fmt.Printf("wget exe: %v\n", err)
} }
} }
\ No newline at end of file
## 如果获得稀疏参数索引Cube所需的模型输入 ## 如果获得稀疏参数索引Cube所需的模型输入
#### 背景知识 ### 背景知识
推荐系统需要大规模稀疏参数索引来帮助分布式部署,可在`python/example/criteo_ctr_with_cube`或是[PaddleRec](https://github.com/paddlepaddle/paddlerec)了解推荐模型。 推荐系统需要大规模稀疏参数索引来帮助分布式部署,可在`python/example/criteo_ctr_with_cube`或是[PaddleRec](https://github.com/paddlepaddle/paddlerec)了解推荐模型。
...@@ -11,11 +11,11 @@ ...@@ -11,11 +11,11 @@
用户在调试Cube服务功能时,可以自定义KV对生成SequenceFile格式文件来进行调试。 用户在调试Cube服务功能时,可以自定义KV对生成SequenceFile格式文件来进行调试。
用户在验证Cube的配送正确性时,可以转换SequenceFile格式文件至可读文字来进行比对验证。 用户在验证Cube的配送正确性时,可以转换SequenceFile格式文件至可读文字来进行比对验证。
#### 预备知识 ### 预备知识
- 需要会编译Paddle Serving,参见[编译文档](./COMPILE.md) - 需要会编译Paddle Serving,参见[编译文档](./COMPILE.md)
#### 用法 ### 用法
在编译结束后的安装文件,可以得到 seq_reader 和 kv_to_seqfile.py。 在编译结束后的安装文件,可以得到 seq_reader 和 kv_to_seqfile.py。
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册