diff --git a/core/general-server/op/general_dist_kv_infer_op.cpp b/core/general-server/op/general_dist_kv_infer_op.cpp
index 2794d93b4515fb98894d79f75829deeb4b77095a..870f045d43ccf38a73d53e048e1ee435950f8c36 100644
--- a/core/general-server/op/general_dist_kv_infer_op.cpp
+++ b/core/general-server/op/general_dist_kv_infer_op.cpp
@@ -130,14 +130,19 @@ int GeneralDistKVInferOp::inference() {
 
   // fitler cache keys
   size_t hit_counts = 0;
+  int64_t seek_cache_start = timeline.TimeStampUS();
   CubeCache *p_cube_cache =
       InferManager::instance().get_cube_cache(engine_name().c_str());
   if (p_cube_cache != nullptr) {
     for (size_t i = 0; i < unique_keys_count; ++i) {
       rec::mcube::CubeValue *hit_val = p_cube_cache->get_data(unique_keys[i]);
       if (hit_val) {
-        LOG(WARNING) << "Hit one cache. key:" << unique_keys[i];
+        // LOG(WARNING) << "Hit one cache. key:" << unique_keys[i];
         key_map[unique_keys[i]] = hit_val;
+        if (hit_counts % 100 == 0) {
+          LOG(WARNING) << "hit cache! key:" << unique_keys[i]
+                       << " value:" << hit_val->buff;
+        }
         unique_keys[i] = 0;
         ++hit_counts;
       }
@@ -156,8 +161,10 @@ int GeneralDistKVInferOp::inference() {
       }
     }
   }
-  LOG(WARNING) << "Hit " << hit_counts
-               << " keys in cube cache, unique_keys:" << unique_keys.size();
+  int64_t seek_cache_end = timeline.TimeStampUS();
+  VLOG(2) << "cache hit " << hit_counts
+          << " keys in cube cache, last unique_keys:" << unique_keys.size()
+          << " , seek_time:" << seek_cache_end - seek_cache_start;
 
   // seek sparse params
   rec::mcube::CubeAPI *cube = rec::mcube::CubeAPI::instance();
@@ -170,7 +177,8 @@ int GeneralDistKVInferOp::inference() {
   int ret = cube->seek(table_names[0], unique_keys, &values);
   int64_t seek_end = timeline.TimeStampUS();
   VLOG(2) << "(logid=" << log_id << ") cube seek status: " << ret
-          << " seek_time: " << seek_end - seek_start;
+          << " , unique_key: " << unique_keys.size()
+          << " , seek_time: " << seek_end - seek_start;
 
   for (size_t i = 0; i < unique_keys.size(); ++i) {
     key_map[unique_keys[i]] = &values[i];
@@ -179,7 +187,8 @@
     LOG(ERROR) << "cube value return null";
   }
   // size_t EMBEDDING_SIZE = values[0].buff.size() / sizeof(float);
-  size_t EMBEDDING_SIZE = (values[0].buff.size() - 10) / sizeof(float);
+  // size_t EMBEDDING_SIZE = (values[0].buff.size() - 10) / sizeof(float);
+  size_t EMBEDDING_SIZE = 9;
   TensorVector sparse_out;
   sparse_out.resize(sparse_count);
   TensorVector dense_out;
@@ -194,18 +203,12 @@
       resource.get_general_model_config().front();
   int cube_key_found = 0;
   int cube_key_miss = 0;
-
   for (size_t i = 0; i < in->size(); ++i) {
-    VLOG(2) << "i: " << i << ", sparse_idx: " << sparse_idx
-            << ", dense_idx: " << dense_idx;
-    VLOG(2) << "i: " << i << ", dtype: " << in->at(i).dtype;
     if (in->at(i).dtype != paddle::PaddleDType::INT64) {
       dense_out[dense_idx] = in->at(i);
       ++dense_idx;
       continue;
     }
-    VLOG(2) << "in->size: " << in->size() << ", ";
-    VLOG(2) << "lod.size: " << in->at(i).lod.size();
     sparse_out[sparse_idx].lod.resize(in->at(i).lod.size());
     for (size_t x = 0; x < sparse_out[sparse_idx].lod.size(); ++x) {
       sparse_out[sparse_idx].lod[x].resize(in->at(i).lod[x].size());
@@ -228,21 +231,17 @@
     for (int x = 0; x < sparse_out[sparse_idx].lod[0].back(); ++x) {
       float *data_ptr = dst_ptr + x * EMBEDDING_SIZE;
       uint64_t cur_key = keys[cube_val_idx];
-      VLOG(2) << "(logid=" << log_id << ") x: " << x
-              << ", sparse_idx: " << sparse_idx << " cur_key: " << cur_key
-              << ", cube_val_idx:" << cube_val_idx;
       rec::mcube::CubeValue *cur_val = key_map[cur_key];
       if (cur_val->buff.size() == 0) {
         memset(data_ptr, (float)0.0, sizeof(float) * EMBEDDING_SIZE);
-        VLOG(3) << "(logid=" << log_id
-                << ") cube key not found: " << keys[cube_val_idx];
         ++cube_key_miss;
         ++cube_val_idx;
         continue;
       }
-      VLOG(2) << "(logid=" << log_id << ") key: " << keys[cube_val_idx]
-              << " , cube value len:" << cur_val->buff.size();
-      memcpy(data_ptr, cur_val->buff.data(), cur_val->buff.size());
+
+      // The data generated by pslib has 10 bytes of information to be filtered
+      // out
+      memcpy(data_ptr, cur_val->buff.data() + 10, cur_val->buff.size() - 10);
       // VLOG(3) << keys[cube_val_idx] << ":" << data_ptr[0] << ", " <<
       //   data_ptr[1] << ", " <
diff --git a/core/predictor/framework/cache.cpp b/core/predictor/framework/cache.cpp
--- a/core/predictor/framework/cache.cpp
+++ b/core/predictor/framework/cache.cpp
+#include 
 #include 
 #include 
-#include 
 #include 
-#include "core/predictor/framework/cache.h"
 #include "core/cube/cube-builder/include/cube-builder/seqfile_reader.h"
 
 namespace baidu {
@@ -30,8 +30,9 @@ int CubeCache::clear() {
       delete (it->second);
       it->second = nullptr;
     }
-    _map_cache.clear();
   }
+  _map_cache.clear();
+
   return 0;
 }
 
@@ -44,6 +45,7 @@ rec::mcube::CubeValue* CubeCache::get_data(uint64_t key) {
 }
 
 int CubeCache::reload_data(const std::string& cache_path) {
+  LOG(INFO) << "cube cache is loading data, path: " << cache_path;
   DIR* dp = nullptr;
   struct dirent* dirp = nullptr;
   struct stat st;
@@ -71,7 +73,7 @@ int CubeCache::reload_data(const std::string& cache_path) {
     }
     // Match the file whose name prefix is 'part-'
     if (std::string(dirp->d_name).find("part-") != std::string::npos) {
-      SequenceFileRecordReader reader(dirp->d_name);
+      SequenceFileRecordReader reader(cache_path + "/" + dirp->d_name);
       if (reader.open() != 0) {
         LOG(ERROR) << "open file failed! "
                    << dirp->d_name;
@@ -97,7 +99,7 @@ int CubeCache::reload_data(const std::string& cache_path) {
       }
       rec::mcube::CubeValue* new_value = new rec::mcube::CubeValue();
       new_value->error = 0;
-      new_value->buff = record.value;
+      new_value->buff.swap(record.value);
       _map_cache.insert(std::make_pair(key, new_value));
     }
 
diff --git a/core/predictor/framework/infer.h b/core/predictor/framework/infer.h
index 411e6d94566e49ec21ab9e9abc60f3e30e0f29ad..45014d28d0034ec402bbd9b21eac3e832da7c1f9 100644
--- a/core/predictor/framework/infer.h
+++ b/core/predictor/framework/infer.h
@@ -15,6 +15,7 @@
 #pragma once
 #include 
 #include 
+#include 
 #include 
 #include 
 #include 
@@ -169,8 +170,10 @@ class ReloadableInferEngine : public InferEngine {
   uint32_t _infer_batch_size;
 
   // Need to align batch_size in inferring
-  bool _infer_batch_align;
+  bool _infer_overrun;
 
+  // allow to split request in inferring
+  bool _allow_split_request;
   // model version
   uint64_t _version;
 };
@@ -181,19 +184,23 @@ struct ModelData {
   ModelData() : current_idx(1) {
     cores[0] = nullptr;
     cores[1] = nullptr;
+    caches[0] = nullptr;
+    caches[1] = nullptr;
   }
 
   ~ModelData() {
     delete cores[0];
    delete cores[1];
+    delete caches[0];
+    delete caches[1];
   }
 
   void* get_core() { return cores[current_idx]->get(); }
 
-  CubeCache* get_cache() { return &caches[current_idx]; }
+  CubeCache* get_cache() { return caches[current_idx]; }
 
   EngineCore* cores[2];
-  CubeCache caches[2];
+  CubeCache* caches[2];
   uint32_t current_idx;
 };
 
@@ -239,31 +246,46 @@ class DBReloadableInferEngine : public ReloadableInferEngine {
       delete md->cores[next_idx];
     }
     md->cores[next_idx] = new (std::nothrow) EngineCore;
+    if (nullptr == md->cores[next_idx]) {
+      LOG(ERROR) << "Allocating memory failed. ";
"; + return -1; + } size_t gpu_ids_num = conf.gpu_ids_size(); im::bsf::AutoMutex lock(_mutex); int gpu_id = -1; if (gpu_ids_num > 0) { gpu_id = conf.gpu_ids(_gpu_index % gpu_ids_num); } + LOG(WARNING) << "Loading EngineCore[" << next_idx << "] ..."; if (!md->cores[next_idx] || md->cores[next_idx]->create(conf, gpu_id) != 0) { LOG(ERROR) << "Failed create model, path: " << conf.model_dir(); return -1; } _gpu_index++; - LOG(WARNING) << "Reload EngineCore[" << next_idx << "] finish."; + LOG(WARNING) << "Loading EngineCore[" << next_idx << "] done."; // reload cube cache + if (nullptr == md->caches[next_idx]) { + md->caches[next_idx] = new (std::nothrow) CubeCache; + } + + if (nullptr == md->caches[next_idx]) { + LOG(ERROR) << "Allocating memory failed."; + return -1; + } + LOG(WARNING) << "Loading cube cache[" << next_idx << "] ..."; std::string model_path = conf.model_dir(); if (access(model_path.c_str(), F_OK) == 0) { std::string cube_cache_path = model_path + "cube_cache"; - int reload_cache_ret = md->caches[next_idx].reload_data(cube_cache_path); - LOG(WARNING) << "Reload cube cache[" << next_idx << "] finish."; + int reload_cache_ret = md->caches[next_idx]->reload_data(cube_cache_path); + LOG(WARNING) << "Loading cube cache[" << next_idx << "] done."; } else { LOG(ERROR) << "model_path " << model_path << " is not exits. Ignore cube cache!"; } + // switch current_idx md->current_idx = next_idx; LOG(WARNING) << "Reload model and cube cache done. switching to current_idx[" @@ -369,12 +391,20 @@ class CloneDBReloadableInferEngine virtual int load_data(ModelData* md, const configure::EngineDesc& conf) { + int tid = syscall(SYS_gettid); uint32_t next_idx = (md->current_idx + 1) % 2; if (md->cores[next_idx]) { delete md->cores[next_idx]; } md->cores[next_idx] = new (std::nothrow) EngineCore; + if (nullptr == md->caches[next_idx]) { + md->caches[next_idx] = new (std::nothrow) CubeCache; + } + if (nullptr == md->cores[next_idx] || nullptr == md->caches[next_idx]) { + LOG(ERROR) << "Allocating memory fail."; + return -1; + } // params.dump(); // gpu_ids_num > 0 is always true. // if use CPU, gpu_ids = [-1]. @@ -390,20 +420,34 @@ class CloneDBReloadableInferEngine } else { gpu_ids_num = 1; } + // _gpu_index will be set to be 0, when load() or proc_initial() is called. // _gpu_index < gpu_ids_num, means there are predictors still not create // on some GPU card. // so we need to create the predictor. // _gpu_index >= gpu_ids_num, means each GPU card has already create one. // so we need to clone the predictor. + LOG(WARNING) << "tid:" << tid << " Loading clone model ..."; if (DBReloadableInferEngine::_gpu_index < gpu_ids_num) { - if (!md->cores[next_idx] || - md->cores[next_idx]->create(conf, gpu_id) != 0) { + // create cores + if (md->cores[next_idx]->create(conf, gpu_id) != 0) { LOG(ERROR) << "Failed create model, path: " << conf.model_dir(); return -1; } + // create caches + std::string model_path = conf.model_dir(); + if (access(model_path.c_str(), F_OK) == 0) { + std::string cube_cache_path = model_path + "cube_cache"; + int reload_cache_ret = + md->caches[next_idx]->reload_data(cube_cache_path); + LOG(WARNING) << "create cube cache[" << next_idx << "] done."; + } else { + LOG(WARNING) << "model_path " << model_path + << " is not exits. 
Ignore cube cache!"; + } + DBReloadableInferEngine::_gpu_index++; - md->current_idx = next_idx; + // md->current_idx = next_idx; if (_cloneTemplate.size() < DBReloadableInferEngine::_gpu_index) { _cloneTemplate.push_back(md); @@ -414,18 +458,27 @@ class CloneDBReloadableInferEngine } else { int template_index = DBReloadableInferEngine::_gpu_index % _cloneTemplate.size(); - if (!md->cores[next_idx] || - md->cores[next_idx]->clone( + + // clone cores + if (md->cores[next_idx]->clone( _cloneTemplate[template_index]->get_core()) != 0) { LOG(ERROR) << "Failed clone model from core"; return -1; } + // clone caches + md->caches[next_idx] = _cloneTemplate[template_index]->get_cache(); + LOG(WARNING) << "tid:" << tid << " clone caches done"; + DBReloadableInferEngine::_gpu_index++; - md->current_idx = next_idx; - LOG(WARNING) << "core clone model succ, cur_idx[" << md->current_idx - << "]."; } + // switch current_idx + md->current_idx = next_idx; + LOG(WARNING) + << "[" << tid + << "] Reload clone model and cube cache done. switching to current_idx[" + << next_idx << "]"; + return 0; } @@ -645,6 +698,8 @@ class InferManager { const char* file, std::shared_ptr engine_index_ptr); + int set_taskexecutor_num(size_t total_engine_num); + int thrd_initialize(); int thrd_clear();