Commit a5cdb95f authored by: T TeslaZhao

add cube cache

Parent 0e58dbfb
......@@ -130,14 +130,19 @@ int GeneralDistKVInferOp::inference() {
// filter cache keys
size_t hit_counts = 0;
int64_t seek_cache_start = timeline.TimeStampUS();
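// Check the in-process cube cache before the remote cube seek; hits are
// stored in key_map and the key slot is zeroed so the remote lookup can
// skip it.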
CubeCache *p_cube_cache =
InferManager::instance().get_cube_cache(engine_name().c_str());
if (p_cube_cache != nullptr) {
for (size_t i = 0; i < unique_keys_count; ++i) {
rec::mcube::CubeValue *hit_val = p_cube_cache->get_data(unique_keys[i]);
if (hit_val) {
LOG(WARNING) << "Hit one cache. key:" << unique_keys[i];
// LOG(WARNING) << "Hit one cache. key:" << unique_keys[i];
key_map[unique_keys[i]] = hit_val;
if (hit_counts % 100 == 0) {
LOG(WARNING) << "hit cache! key:" << unique_keys[i]
<< " value:" << hit_val->buff;
}
unique_keys[i] = 0;
++hit_counts;
}
......@@ -156,8 +161,10 @@ int GeneralDistKVInferOp::inference() {
}
}
}
LOG(WARNING) << "Hit " << hit_counts
<< " keys in cube cache, unique_keys:" << unique_keys.size();
int64_t seek_cache_end = timeline.TimeStampUS();
VLOG(2) << "cache hit " << hit_counts
<< " keys in cube cache, last unique_keys:" << unique_keys.size()
<< " , seek_time:" << seek_cache_end - seek_cache_start;
// seek sparse params
rec::mcube::CubeAPI *cube = rec::mcube::CubeAPI::instance();
......@@ -170,7 +177,8 @@ int GeneralDistKVInferOp::inference() {
int ret = cube->seek(table_names[0], unique_keys, &values);
int64_t seek_end = timeline.TimeStampUS();
VLOG(2) << "(logid=" << log_id << ") cube seek status: " << ret
<< " seek_time: " << seek_end - seek_start;
<< " , unique_key: " << unique_keys.size()
<< " , seek_time: " << seek_end - seek_start;
for (size_t i = 0; i < unique_keys.size(); ++i) {
key_map[unique_keys[i]] = &values[i];
......@@ -179,7 +187,8 @@ int GeneralDistKVInferOp::inference() {
LOG(ERROR) << "cube value return null";
}
// size_t EMBEDDING_SIZE = values[0].buff.size() / sizeof(float);
size_t EMBEDDING_SIZE = (values[0].buff.size() - 10) / sizeof(float);
// size_t EMBEDDING_SIZE = (values[0].buff.size() - 10) / sizeof(float);
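// EMBEDDING_SIZE is hard-coded to 9 floats here; the disabled formula above
// derived it from the cube value length minus the 10-byte pslib header that
// is skipped when copying the embedding below.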
size_t EMBEDDING_SIZE = 9;
TensorVector sparse_out;
sparse_out.resize(sparse_count);
TensorVector dense_out;
......@@ -194,18 +203,12 @@ int GeneralDistKVInferOp::inference() {
resource.get_general_model_config().front();
int cube_key_found = 0;
int cube_key_miss = 0;
for (size_t i = 0; i < in->size(); ++i) {
VLOG(2) << "i: " << i << ", sparse_idx: " << sparse_idx
<< ", dense_idx: " << dense_idx;
VLOG(2) << "i: " << i << ", dtype: " << in->at(i).dtype;
if (in->at(i).dtype != paddle::PaddleDType::INT64) {
dense_out[dense_idx] = in->at(i);
++dense_idx;
continue;
}
VLOG(2) << "in->size: " << in->size() << ", ";
VLOG(2) << "lod.size: " << in->at(i).lod.size();
sparse_out[sparse_idx].lod.resize(in->at(i).lod.size());
for (size_t x = 0; x < sparse_out[sparse_idx].lod.size(); ++x) {
sparse_out[sparse_idx].lod[x].resize(in->at(i).lod[x].size());
......@@ -228,21 +231,17 @@ int GeneralDistKVInferOp::inference() {
for (int x = 0; x < sparse_out[sparse_idx].lod[0].back(); ++x) {
float *data_ptr = dst_ptr + x * EMBEDDING_SIZE;
uint64_t cur_key = keys[cube_val_idx];
VLOG(2) << "(logid=" << log_id << ") x: " << x
<< ", sparse_idx: " << sparse_idx << " cur_key: " << cur_key
<< ", cube_val_idx:" << cube_val_idx;
rec::mcube::CubeValue *cur_val = key_map[cur_key];
if (cur_val->buff.size() == 0) {
memset(data_ptr, (float)0.0, sizeof(float) * EMBEDDING_SIZE);
VLOG(3) << "(logid=" << log_id
<< ") cube key not found: " << keys[cube_val_idx];
++cube_key_miss;
++cube_val_idx;
continue;
}
VLOG(2) << "(logid=" << log_id << ") key: " << keys[cube_val_idx]
<< " , cube value len:" << cur_val->buff.size();
memcpy(data_ptr, cur_val->buff.data(), cur_val->buff.size());
// The data generated by pslib has 10 bytes of information to be filtered
// out
memcpy(data_ptr, cur_val->buff.data() + 10, cur_val->buff.size() - 10);
// VLOG(3) << keys[cube_val_idx] << ":" << data_ptr[0] << ", " <<
// data_ptr[1] << ", " <<data_ptr[2] << ", " <<data_ptr[3] << ", "
// <<data_ptr[4] << ", " <<data_ptr[5] << ", " <<data_ptr[6] << ", "
......@@ -255,8 +254,6 @@ int GeneralDistKVInferOp::inference() {
bool cube_fail = (cube_key_found == 0);
if (cube_fail) {
LOG(WARNING) << "(logid=" << log_id << ") cube seek fail";
// CopyBlobInfo(input_blob, output_blob);
// return 0;
}
VLOG(2) << "(logid=" << log_id << ") cube key found: " << cube_key_found
<< " , cube key miss: " << cube_key_miss;
......
......@@ -12,12 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License
#include "core/predictor/framework/cache.h"
#include <dirent.h>
#include <sys/stat.h>
#include <fstream>
#include <string>
#include <sys/stat.h>
#include <utility>
#include "core/predictor/framework/cache.h"
#include "core/cube/cube-builder/include/cube-builder/seqfile_reader.h"
namespace baidu {
......@@ -30,8 +30,9 @@ int CubeCache::clear() {
delete (it->second);
it->second = nullptr;
}
_map_cache.clear();
}
_map_cache.clear();
return 0;
}
......@@ -44,6 +45,7 @@ rec::mcube::CubeValue* CubeCache::get_data(uint64_t key) {
}
int CubeCache::reload_data(const std::string& cache_path) {
LOG(INFO) << "cube cache is loading data, path: " << cache_path;
DIR* dp = nullptr;
struct dirent* dirp = nullptr;
struct stat st;
......@@ -71,7 +73,7 @@ int CubeCache::reload_data(const std::string& cache_path) {
}
// Match files whose names contain the 'part-' prefix
if (std::string(dirp->d_name).find("part-") != std::string::npos) {
SequenceFileRecordReader reader(dirp->d_name);
SequenceFileRecordReader reader(cache_path + "/" + dirp->d_name);
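// Prepend cache_path so the reader opens the file by its full path rather
// than by the bare directory entry name.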
if (reader.open() != 0) {
LOG(ERROR) << "open file failed! " << dirp->d_name;
......@@ -97,7 +99,7 @@ int CubeCache::reload_data(const std::string& cache_path) {
}
rec::mcube::CubeValue* new_value = new rec::mcube::CubeValue();
new_value->error = 0;
new_value->buff = record.value;
new_value->buff.swap(record.value);
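// swap() moves the record buffer into the cache entry instead of copying
// the value bytes.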
_map_cache.insert(std::make_pair(key, new_value));
}
......
......@@ -15,6 +15,7 @@
#pragma once
#include <pthread.h>
#include <sys/stat.h>
#include <sys/syscall.h>
#include <sys/types.h>
#include <unistd.h>
#include <functional>
......@@ -169,8 +170,10 @@ class ReloadableInferEngine : public InferEngine {
uint32_t _infer_batch_size;
// Need to align batch_size in inferring
bool _infer_batch_align;
bool _infer_overrun;
// Allow splitting a request during inference
bool _allow_split_request;
// model version
uint64_t _version;
};
......@@ -181,19 +184,23 @@ struct ModelData {
ModelData() : current_idx(1) {
cores[0] = nullptr;
cores[1] = nullptr;
caches[0] = nullptr;
caches[1] = nullptr;
}
~ModelData() {
delete cores[0];
delete cores[1];
delete caches[0];
delete caches[1];
}
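// Double-buffered engine cores and cube caches: current_idx selects the
// active slot while the other slot is rebuilt during reload.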
void* get_core() { return cores[current_idx]->get(); }
CubeCache* get_cache() { return &caches[current_idx]; }
CubeCache* get_cache() { return caches[current_idx]; }
EngineCore* cores[2];
CubeCache caches[2];
CubeCache* caches[2];
uint32_t current_idx;
};
......@@ -239,31 +246,46 @@ class DBReloadableInferEngine : public ReloadableInferEngine {
delete md->cores[next_idx];
}
md->cores[next_idx] = new (std::nothrow) EngineCore;
if (nullptr == md->cores[next_idx]) {
LOG(ERROR) << "Allocating memory failed. ";
return -1;
}
size_t gpu_ids_num = conf.gpu_ids_size();
im::bsf::AutoMutex lock(_mutex);
int gpu_id = -1;
if (gpu_ids_num > 0) {
gpu_id = conf.gpu_ids(_gpu_index % gpu_ids_num);
}
LOG(WARNING) << "Loading EngineCore[" << next_idx << "] ...";
if (!md->cores[next_idx] ||
md->cores[next_idx]->create(conf, gpu_id) != 0) {
LOG(ERROR) << "Failed create model, path: " << conf.model_dir();
return -1;
}
_gpu_index++;
LOG(WARNING) << "Reload EngineCore[" << next_idx << "] finish.";
LOG(WARNING) << "Loading EngineCore[" << next_idx << "] done.";
// reload cube cache
if (nullptr == md->caches[next_idx]) {
md->caches[next_idx] = new (std::nothrow) CubeCache;
}
if (nullptr == md->caches[next_idx]) {
LOG(ERROR) << "Allocating memory failed.";
return -1;
}
LOG(WARNING) << "Loading cube cache[" << next_idx << "] ...";
std::string model_path = conf.model_dir();
if (access(model_path.c_str(), F_OK) == 0) {
std::string cube_cache_path = model_path + "cube_cache";
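// Assumes conf.model_dir() ends with a path separator, so the cache is
// looked up at <model_dir>cube_cache.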
int reload_cache_ret = md->caches[next_idx].reload_data(cube_cache_path);
LOG(WARNING) << "Reload cube cache[" << next_idx << "] finish.";
int reload_cache_ret = md->caches[next_idx]->reload_data(cube_cache_path);
LOG(WARNING) << "Loading cube cache[" << next_idx << "] done.";
} else {
LOG(ERROR) << "model_path " << model_path
<< " is not exits. Ignore cube cache!";
}
// switch current_idx
md->current_idx = next_idx;
LOG(WARNING)
<< "Reload model and cube cache done. switching to current_idx["
......@@ -369,12 +391,20 @@ class CloneDBReloadableInferEngine
virtual int load_data(ModelData<EngineCore>* md,
const configure::EngineDesc& conf) {
int tid = syscall(SYS_gettid);
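// The thread id is used only to tag the clone-load log lines below.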
uint32_t next_idx = (md->current_idx + 1) % 2;
if (md->cores[next_idx]) {
delete md->cores[next_idx];
}
md->cores[next_idx] = new (std::nothrow) EngineCore;
if (nullptr == md->caches[next_idx]) {
md->caches[next_idx] = new (std::nothrow) CubeCache;
}
if (nullptr == md->cores[next_idx] || nullptr == md->caches[next_idx]) {
LOG(ERROR) << "Allocating memory fail.";
return -1;
}
// params.dump();
// gpu_ids_num > 0 is always true.
// if use CPU, gpu_ids = [-1].
......@@ -390,20 +420,34 @@ class CloneDBReloadableInferEngine
} else {
gpu_ids_num = 1;
}
// _gpu_index is reset to 0 when load() or proc_initial() is called.
// _gpu_index < gpu_ids_num means some GPU cards do not have a predictor
// yet, so we create one.
// _gpu_index >= gpu_ids_num means every GPU card already has a predictor,
// so we clone from an existing one.
LOG(WARNING) << "tid:" << tid << " Loading clone model ...";
if (DBReloadableInferEngine<EngineCore>::_gpu_index < gpu_ids_num) {
if (!md->cores[next_idx] ||
md->cores[next_idx]->create(conf, gpu_id) != 0) {
// create cores
if (md->cores[next_idx]->create(conf, gpu_id) != 0) {
LOG(ERROR) << "Failed create model, path: " << conf.model_dir();
return -1;
}
// create caches
std::string model_path = conf.model_dir();
if (access(model_path.c_str(), F_OK) == 0) {
std::string cube_cache_path = model_path + "cube_cache";
int reload_cache_ret =
md->caches[next_idx]->reload_data(cube_cache_path);
LOG(WARNING) << "create cube cache[" << next_idx << "] done.";
} else {
LOG(WARNING) << "model_path " << model_path
<< " is not exits. Ignore cube cache!";
}
DBReloadableInferEngine<EngineCore>::_gpu_index++;
md->current_idx = next_idx;
// md->current_idx = next_idx;
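// The create branch no longer switches current_idx here; the switch now
// happens at the end of load_data (see below).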
if (_cloneTemplate.size() <
DBReloadableInferEngine<EngineCore>::_gpu_index) {
_cloneTemplate.push_back(md);
......@@ -414,18 +458,27 @@ class CloneDBReloadableInferEngine
} else {
int template_index = DBReloadableInferEngine<EngineCore>::_gpu_index %
_cloneTemplate.size();
if (!md->cores[next_idx] ||
md->cores[next_idx]->clone(
// clone cores
if (md->cores[next_idx]->clone(
_cloneTemplate[template_index]->get_core()) != 0) {
LOG(ERROR) << "Failed clone model from core";
return -1;
}
// clone caches
md->caches[next_idx] = _cloneTemplate[template_index]->get_cache();
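// Cloned replicas share the template's CubeCache pointer rather than
// reloading the cache data per replica.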
LOG(WARNING) << "tid:" << tid << " clone caches done";
DBReloadableInferEngine<EngineCore>::_gpu_index++;
md->current_idx = next_idx;
LOG(WARNING) << "core clone model succ, cur_idx[" << md->current_idx
<< "].";
}
// switch current_idx
md->current_idx = next_idx;
LOG(WARNING)
<< "[" << tid
<< "] Reload clone model and cube cache done. switching to current_idx["
<< next_idx << "]";
return 0;
}
......@@ -645,6 +698,8 @@ class InferManager {
const char* file,
std::shared_ptr<int> engine_index_ptr);
int set_taskexecutor_num(size_t total_engine_num);
int thrd_initialize();
int thrd_clear();
......