提交 9a69e3b1 编写于 作者: Y Ybjjwwang

fix transfer

上级 ba84e617
...@@ -212,7 +212,7 @@ class slim_hash_map { ...@@ -212,7 +212,7 @@ class slim_hash_map {
int copy_data_from(const slim_hash_map& rhs) { int copy_data_from(const slim_hash_map& rhs) {
destroy(); destroy();
LOG(INFO) << "start copy data, rhs info, mHashSize: " << rhs.m_nHashSize;
if (rhs.m_nHashSize > 0) { if (rhs.m_nHashSize > 0) {
m_hashTable = new (std::nothrow) uint32_t[rhs.m_nHashSize]; m_hashTable = new (std::nothrow) uint32_t[rhs.m_nHashSize];
if (!m_hashTable) { if (!m_hashTable) {
...@@ -231,7 +231,7 @@ class slim_hash_map { ...@@ -231,7 +231,7 @@ class slim_hash_map {
<< sizeof(hash_node_t) * BLOCK_SIZE; << sizeof(hash_node_t) * BLOCK_SIZE;
return -1; return -1;
} }
LOG(INFO) << "copy data, m_nBlockNum: " << m_nBlockNum << " , copy size:" << sizeof(hash_node_t) * BLOCK_SIZE;
memcpy(m_blockAddr[m_nBlockNum], memcpy(m_blockAddr[m_nBlockNum],
rhs.m_blockAddr[m_nBlockNum], rhs.m_blockAddr[m_nBlockNum],
sizeof(hash_node_t) * BLOCK_SIZE); sizeof(hash_node_t) * BLOCK_SIZE);
...@@ -265,11 +265,13 @@ class slim_hash_map { ...@@ -265,11 +265,13 @@ class slim_hash_map {
} }
size_type index = key % m_nHashSize; size_type index = key % m_nHashSize;
hash_node_t* node = get_node(m_hashTable[index]); hash_node_t* node = get_node(m_hashTable[index]);
int node_cnt = 0;
while (node != NULL && node->data.first != key) { while (node != NULL && node->data.first != key) {
LOG(INFO) << "node link get:" << node->data.first;
node_cnt++;
node = get_node(node->next); node = get_node(node->next);
} }
LOG(INFO) << "key: " << key << " , found count: " << node_cnt;
if (node == NULL) { if (node == NULL) {
return end(); return end();
} }
...@@ -390,7 +392,6 @@ class slim_hash_map { ...@@ -390,7 +392,6 @@ class slim_hash_map {
if (node != NULL) { if (node != NULL) {
return node->data.second; return node->data.second;
} }
return add_node(index, key)->data.second; return add_node(index, key)->data.second;
} }
void clear() { void clear() {
...@@ -399,16 +400,16 @@ class slim_hash_map { ...@@ -399,16 +400,16 @@ class slim_hash_map {
m_nFreeEntries = 0; m_nFreeEntries = 0;
m_nSize = 0; m_nSize = 0;
} }
bool load(const char* file) { bool load(const char* file, uint32_t block_id) {
// clear(); // clear();
// bias = 0 means base mode, bias = K means patch mode, and base dict has size K
int size = sizeof(key_t) + sizeof(value_t); int size = sizeof(key_t) + sizeof(value_t);
FILE* fp = fopen(file, "rb"); FILE* fp = fopen(file, "rb");
char* buf = reinterpret_cast<char*>(malloc(size * 100000)); char* buf = reinterpret_cast<char*>(malloc(size * 100000));
LOG(INFO) << "current block id: " << block_id;
if (fp == NULL || buf == NULL) { if (fp == NULL || buf == NULL) {
return false; return false;
} }
size_t read_count; size_t read_count;
bool err = false; bool err = false;
key_t key; key_t key;
...@@ -423,6 +424,8 @@ class slim_hash_map { ...@@ -423,6 +424,8 @@ class slim_hash_map {
for (int i = 0; i < static_cast<int>(read_count); ++i) { for (int i = 0; i < static_cast<int>(read_count); ++i) {
key = *(reinterpret_cast<key_t*>(buf + i * size)); key = *(reinterpret_cast<key_t*>(buf + i * size));
value = *(reinterpret_cast<value_t*>(buf + i * size + sizeof(key_t))); value = *(reinterpret_cast<value_t*>(buf + i * size + sizeof(key_t)));
value = ((uint64_t)block_id << 32) | value;
LOG(INFO) << "slim map key: " << key << " , value: " << value;
(*this)[key] = value; (*this)[key] = value;
} }
} }
...@@ -557,7 +560,6 @@ class slim_hash_map { ...@@ -557,7 +560,6 @@ class slim_hash_map {
} }
hash_node_t* add_node(uint32_t index, const key_type& key) { hash_node_t* add_node(uint32_t index, const key_type& key) {
++m_nSize; ++m_nSize;
if (m_nFreeEntries) { if (m_nFreeEntries) {
uint32_t addr = m_nFreeEntries; uint32_t addr = m_nFreeEntries;
hash_node_t* node = get_node(addr); hash_node_t* node = get_node(addr);
...@@ -569,7 +571,7 @@ class slim_hash_map { ...@@ -569,7 +571,7 @@ class slim_hash_map {
} }
uint32_t block = ((m_nNextEntry & 0xFF800000) >> 23); uint32_t block = ((m_nNextEntry & 0xFF800000) >> 23);
//LOG(INFO) << "key: " << key << " here. index: " << index << " , m_nNextEntry: "<< m_nNextEntry << " , block:" << block<< ", m_nBlockNum:" << m_nBlockNum;
if (block >= m_nBlockNum) { if (block >= m_nBlockNum) {
try { try {
m_blockAddr[m_nBlockNum++] = new hash_node_t[BLOCK_SIZE]; m_blockAddr[m_nBlockNum++] = new hash_node_t[BLOCK_SIZE];
...@@ -581,7 +583,6 @@ class slim_hash_map { ...@@ -581,7 +583,6 @@ class slim_hash_map {
return NULL; return NULL;
} }
} }
uint32_t addr = m_nNextEntry; uint32_t addr = m_nNextEntry;
++m_nNextEntry; ++m_nNextEntry;
hash_node_t* node = get_node(addr); hash_node_t* node = get_node(addr);
......
...@@ -51,13 +51,12 @@ int Dict::load(const std::string& dict_path, ...@@ -51,13 +51,12 @@ int Dict::load(const std::string& dict_path,
bool in_mem, bool in_mem,
const std::string& v_path) { const std::string& v_path) {
TIME_FLAG(load_start); TIME_FLAG(load_start);
int ret = load_index(dict_path, v_path); int ret = load_index(dict_path, v_path);
if (ret != E_OK) { if (ret != E_OK) {
LOG(WARNING) << "load index failed"; LOG(WARNING) << "load index failed";
return ret; return ret;
} }
LOG(INFO) << "load index in mem mode: " << in_mem ;
if (in_mem) { if (in_mem) {
ret = load_data(dict_path, v_path); ret = load_data(dict_path, v_path);
if (ret != E_OK) { if (ret != E_OK) {
...@@ -81,8 +80,11 @@ int Dict::load_index(const std::string& dict_path, const std::string& v_path) { ...@@ -81,8 +80,11 @@ int Dict::load_index(const std::string& dict_path, const std::string& v_path) {
std::string index_n_path(dict_path); std::string index_n_path(dict_path);
index_n_path.append(v_path); index_n_path.append(v_path);
index_n_path.append("/index.n"); index_n_path.append("/index.n");
uint32_t cur_block_id = 0;
if (_base_dict) cur_block_id = _base_dict->_block_set.size();
LOG(INFO) << "index file path: " << index_n_path; LOG(INFO) << "index file path: " << index_n_path;
//ERR HERE
std::unique_ptr<FILE, decltype(&fclose)> pf(fopen(index_n_path.c_str(), "rb"), std::unique_ptr<FILE, decltype(&fclose)> pf(fopen(index_n_path.c_str(), "rb"),
&fclose); &fclose);
if (pf.get() == NULL) { if (pf.get() == NULL) {
...@@ -150,12 +152,16 @@ int Dict::load_index(const std::string& dict_path, const std::string& v_path) { ...@@ -150,12 +152,16 @@ int Dict::load_index(const std::string& dict_path, const std::string& v_path) {
return E_DATA_ERROR; return E_DATA_ERROR;
} }
} else { } else {
if (_slim_table.copy_data_from(_base_dict->_slim_table) != 0) {
LOG(ERROR) << "copy data from old index failed in patch mode";
return E_DATA_ERROR;
}
file_idx = 0; file_idx = 0;
LOG(INFO) LOG(INFO)
<< "index check file len failed in patch mode, set file_idx to 0"; << "index check fail, direct copy";
} }
} }
LOG(INFO) << "resize slim table, new count: " << count/2;
_slim_table.resize(count / 2); _slim_table.resize(count / 2);
char file[1024]; char file[1024];
...@@ -167,6 +173,7 @@ int Dict::load_index(const std::string& dict_path, const std::string& v_path) { ...@@ -167,6 +173,7 @@ int Dict::load_index(const std::string& dict_path, const std::string& v_path) {
dict_path.c_str(), dict_path.c_str(),
v_path.c_str(), v_path.c_str(),
file_idx); file_idx);
LOG(INFO) << "load file str: " << file;
if (stat(file, &fstat) < 0) { if (stat(file, &fstat) < 0) {
if (errno == ENOENT) { if (errno == ENOENT) {
LOG(WARNING) << "index." << file_idx << " not exist"; LOG(WARNING) << "index." << file_idx << " not exist";
...@@ -181,8 +188,8 @@ int Dict::load_index(const std::string& dict_path, const std::string& v_path) { ...@@ -181,8 +188,8 @@ int Dict::load_index(const std::string& dict_path, const std::string& v_path) {
<< (uint64_t)fstat.st_size; << (uint64_t)fstat.st_size;
return E_DATA_ERROR; return E_DATA_ERROR;
} }
LOG(INFO) << "loading from index." << file_idx; LOG(INFO) << "loading from index." << file_idx << " . table size: " << _slim_table.size();
if (!_slim_table.load(file) || _slim_table.size() > count) { if (!_slim_table.load(file, cur_block_id)) {
return E_DATA_ERROR; return E_DATA_ERROR;
} }
...@@ -193,8 +200,15 @@ int Dict::load_index(const std::string& dict_path, const std::string& v_path) { ...@@ -193,8 +200,15 @@ int Dict::load_index(const std::string& dict_path, const std::string& v_path) {
} }
int Dict::load_data(const std::string& dict_path, const std::string& v_path) { int Dict::load_data(const std::string& dict_path, const std::string& v_path) {
std::vector<uint32_t> block_size;
uint64_t total_data_size = 0;
if (_base_dict) { if (_base_dict) {
_block_set = _base_dict->_block_set; _block_set = _base_dict->_block_set;
LOG(INFO)<< "load data base dict block set size: " << _block_set[0].size;
for (size_t i = 0; i < _block_set.size(); ++i) {
block_size.push_back(_block_set[i].size);
total_data_size += _block_set[i].size;
}
} }
std::string data_n_path(dict_path); std::string data_n_path(dict_path);
...@@ -212,8 +226,6 @@ int Dict::load_data(const std::string& dict_path, const std::string& v_path) { ...@@ -212,8 +226,6 @@ int Dict::load_data(const std::string& dict_path, const std::string& v_path) {
return E_DATA_ERROR; return E_DATA_ERROR;
} }
std::vector<uint32_t> block_size;
uint64_t total_data_size = 0;
for (uint32_t i = 0; i < count; ++i) { for (uint32_t i = 0; i < count; ++i) {
uint32_t size = 0; uint32_t size = 0;
if (fread(reinterpret_cast<void*>(&size), sizeof(uint32_t), 1, pf) != 1) { if (fread(reinterpret_cast<void*>(&size), sizeof(uint32_t), 1, pf) != 1) {
...@@ -222,6 +234,7 @@ int Dict::load_data(const std::string& dict_path, const std::string& v_path) { ...@@ -222,6 +234,7 @@ int Dict::load_data(const std::string& dict_path, const std::string& v_path) {
return E_DATA_ERROR; return E_DATA_ERROR;
} }
block_size.push_back(size); block_size.push_back(size);
LOG(INFO) << "new block size: " << size;
total_data_size += size; total_data_size += size;
} }
g_data_size << (total_data_size / 1024 / 1024); g_data_size << (total_data_size / 1024 / 1024);
...@@ -229,36 +242,35 @@ int Dict::load_data(const std::string& dict_path, const std::string& v_path) { ...@@ -229,36 +242,35 @@ int Dict::load_data(const std::string& dict_path, const std::string& v_path) {
pf = NULL; pf = NULL;
uint32_t old_size = _block_set.size(); uint32_t old_size = _block_set.size();
LOG(INFO) << "load data old size: " << old_size;
for (size_t i = 0; i < old_size; ++i) { for (size_t i = 0; i < old_size; ++i) {
if (_block_set[i].size != block_size[i]) { if (_block_set[i].size != block_size[i]) {
old_size = 0; old_size = 0;
break; break;
} }
} }
_block_set.resize(count); LOG(INFO) << "load data block set count: " << count << " , old size: " << old_size;
_block_set.resize(count + old_size);
for (size_t i = old_size; i < _block_set.size(); ++i) { for (size_t i = old_size; i < _block_set.size(); ++i) {
char data_path[1024]; char data_path[1024];
LOG(INFO) << "load from data." << i; LOG(INFO) << "load from data." << i;
snprintf( //snprintf(
data_path, 1024, "%s%s/data.%lu", dict_path.c_str(), v_path.c_str(), i); // data_path, 1024, "%s%s/data.%lu", dict_path.c_str(), v_path.c_str(), i);
snprintf(data_path, 1024, "%s%s/data.%lu", dict_path.c_str(), v_path.c_str(), i - old_size);
FILE* data_file = fopen(data_path, "rb"); FILE* data_file = fopen(data_path, "rb");
if (data_file == NULL) { if (data_file == NULL) {
LOG(WARNING) << "open data file [" << data_path << " failed"; LOG(WARNING) << "open data file [" << data_path << " ]failed";
_block_set[i].s_data.reset(); _block_set[i].s_data.reset();
_block_set[i].size = 0; _block_set[i].size = 0;
continue; continue;
} }
_block_set[i].s_data.reset(reinterpret_cast<char*>(malloc(block_size[i] * sizeof(char))));
_block_set[i].s_data.reset(
reinterpret_cast<char*>(malloc(block_size[i] * sizeof(char))));
if (_block_set[i].s_data.get() == NULL) { if (_block_set[i].s_data.get() == NULL) {
LOG(ERROR) << "malloc data failed"; LOG(ERROR) << "malloc data failed";
fclose(data_file); fclose(data_file);
return E_OOM; return E_OOM;
} }
_block_set[i].size = block_size[i]; _block_set[i].size = block_size[i];
if (fread(reinterpret_cast<void*>(_block_set[i].s_data.get()), if (fread(reinterpret_cast<void*>(_block_set[i].s_data.get()),
sizeof(char), sizeof(char),
_block_set[i].size, _block_set[i].size,
...@@ -267,7 +279,10 @@ int Dict::load_data(const std::string& dict_path, const std::string& v_path) { ...@@ -267,7 +279,10 @@ int Dict::load_data(const std::string& dict_path, const std::string& v_path) {
fclose(data_file); fclose(data_file);
return E_DATA_ERROR; return E_DATA_ERROR;
} }
LOG(INFO) << "load new data to BlockSet succ";
for (size_t ii = 0; ii < 20; ++ii) {
LOG(INFO) << "data ptr: " << (int)(_block_set[i].s_data.get()[ii]);
}
fclose(data_file); fclose(data_file);
} }
...@@ -386,12 +401,11 @@ bool Dict::seek(uint64_t key, char* buff, uint64_t* buff_size) { ...@@ -386,12 +401,11 @@ bool Dict::seek(uint64_t key, char* buff, uint64_t* buff_size) {
uint64_t flag = it->second; uint64_t flag = it->second;
uint32_t id = (uint32_t)(flag >> 32); uint32_t id = (uint32_t)(flag >> 32);
uint64_t addr = (uint32_t)(flag); uint64_t addr = (uint32_t)(flag);
LOG(INFO) << "search key: " << id << " , addr: " << addr;
if (_block_set.size() > id) { if (_block_set.size() > id) {
uint32_t block_size = _block_set[id].size; uint32_t block_size = _block_set[id].size;
char* block_data = NULL; char* block_data = NULL;
block_data = _block_set[id].s_data.get(); block_data = _block_set[id].s_data.get();
if (block_data && addr + sizeof(uint32_t) <= block_size) { if (block_data && addr + sizeof(uint32_t) <= block_size) {
uint32_t len = *(reinterpret_cast<uint32_t*>(block_data + addr)); uint32_t len = *(reinterpret_cast<uint32_t*>(block_data + addr));
if (addr + len <= block_size && len >= sizeof(uint32_t)) { if (addr + len <= block_size && len >= sizeof(uint32_t)) {
...@@ -405,6 +419,7 @@ bool Dict::seek(uint64_t key, char* buff, uint64_t* buff_size) { ...@@ -405,6 +419,7 @@ bool Dict::seek(uint64_t key, char* buff, uint64_t* buff_size) {
<< default_buffer_size; << default_buffer_size;
return false; return false;
} }
LOG(INFO) << "seek key: " << key << " , addr: " << addr;
memcpy(buff, memcpy(buff,
(block_data + addr + sizeof(uint32_t)), (block_data + addr + sizeof(uint32_t)),
len - sizeof(uint32_t)); len - sizeof(uint32_t));
......
## 如果获得稀疏参数索引Cube所需的模型输入 ## 如果获得稀疏参数索引Cube所需的模型输入
#### 背景知识 ### 背景知识
推荐系统需要大规模稀疏参数索引来帮助分布式部署,可在`python/example/criteo_ctr_with_cube`或是[PaddleRec](https://github.com/paddlepaddle/paddlerec)了解推荐模型。 推荐系统需要大规模稀疏参数索引来帮助分布式部署,可在`python/example/criteo_ctr_with_cube`或是[PaddleRec](https://github.com/paddlepaddle/paddlerec)了解推荐模型。
...@@ -11,11 +11,11 @@ ...@@ -11,11 +11,11 @@
用户在调试Cube服务功能时,可以自定义KV对生成SequenceFile格式文件来进行调试。 用户在调试Cube服务功能时,可以自定义KV对生成SequenceFile格式文件来进行调试。
用户在验证Cube的配送正确性时,可以转换SequenceFile格式文件至可读文字来进行比对验证。 用户在验证Cube的配送正确性时,可以转换SequenceFile格式文件至可读文字来进行比对验证。
#### 预备知识 ### 预备知识
- 需要会编译Paddle Serving,参见[编译文档](./COMPILE.md) - 需要会编译Paddle Serving,参见[编译文档](./COMPILE.md)
#### 用法 ### 用法
在编译结束后的安装文件,可以得到 seq_reader 和 kv_to_seqfile.py。 在编译结束后的安装文件,可以得到 seq_reader 和 kv_to_seqfile.py。
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册