Commit b9c6fd2f authored by jinhai

Merge branch 'branch-0.3.1' into 'branch-0.3.1'

MS-256 - Add more cache config

See merge request megasearch/milvus!251

Former-commit-id: 8f66e64370706c3ca0d9169f17467695377df851
......@@ -33,6 +33,7 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-242 - Clean up cmake and change MAKE_BUILD_ARGS to be user defined variable
- MS-245 - Improve search result transfer performance
- MS-248 - Support AddVector/SearchVector profiling
- MS-256 - Add more cache config
## New Feature
- MS-180 - Add new mem manager
......
......@@ -34,6 +34,8 @@ license_config: # license configure
cache_config: # cache configure
cpu_cache_capacity: 16 # how much memory is used as cache, unit: GB, range: 0 ~ less than total memory
cache_free_percent: 0.85 # how much memory should be free when cache is full, range: greater than zero ~ 1.0
insert_cache_immediately: false # inserted data will be loaded into cache immediately for hot queries
engine_config:
nprobe: 10
......
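
For orientation (editorial addition, not part of the diff): a minimal sketch of how the two cache settings combine, assuming the default 16 GB capacity shown above. The variable names here are illustrative only.

```cpp
#include <cstdint>
#include <iostream>

int main() {
    // Values taken from the config snippet above.
    const int64_t capacity_bytes = 16LL * 1024 * 1024 * 1024;  // cpu_cache_capacity: 16 GB
    const double free_percent = 0.85;                          // cache_free_percent: 0.85

    // When usage exceeds capacity, Cache::free_memory (shown below) evicts
    // entries until usage falls back to capacity * free_percent.
    const auto target = static_cast<int64_t>(capacity_bytes * free_percent);
    std::cout << "evict down to " << target << " bytes (~13.6 GB)" << std::endl;
    return 0;
}
```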
......@@ -13,9 +13,12 @@ namespace zilliz {
namespace milvus {
namespace cache {
constexpr double DEFAULT_THRESHHOLD_PERCENT = 0.85;
Cache::Cache(int64_t capacity, uint64_t cache_max_count)
: usage_(0),
capacity_(capacity),
freemem_percent_(DEFAULT_THRESHHOLD_PERCENT),
lru_(cache_max_count) {
// AGENT_LOG_DEBUG << "Construct Cache with capacity " << std::to_string(mem_capacity)
}
......@@ -64,15 +67,13 @@ void Cache::insert(const std::string& key, const DataObjPtr& data_ptr) {
usage_ += data_ptr->size();
}
// AGENT_LOG_DEBUG << "Insert into LRU(" << (capacity_ > 0 ? std::to_string(usage_ * 100 / capacity_) : "Nan")
// << "%, +" << data_ptr->size() << ", " << usage_ << ", " << lru_.size() << "):"
// << " " << key;
SERVER_LOG_DEBUG << "Insert " << key << " size:" << data_ptr->size() << " into cache, usage: " << usage_;
}
if (usage_ > capacity_) {
// AGENT_LOG_TRACE << "Current usage " << usage_
// << " exceeds cache capacity " << capacity_
// << ", start free memory";
SERVER_LOG_DEBUG << "Current usage " << usage_
<< " exceeds cache capacity " << capacity_
<< ", start free memory";
free_memory();
}
}
......@@ -86,12 +87,9 @@ void Cache::erase(const std::string& key) {
const CacheObjPtr& obj_ptr = lru_.get(key);
const DataObjPtr& data_ptr = obj_ptr->data_;
usage_ -= data_ptr->size();
// AGENT_LOG_DEBUG << "Erase from LRU(" << (capacity_ > 0 ? std::to_string(usage_*100/capacity_) : "Nan")
// << "%, -" << data_ptr->size() << ", " << usage_ << ", " << lru_.size() << "): "
// << (data_ptr->flags().get_flag(DataObjAttr::kPinned) ? "Pinned " : "")
// << (data_ptr->flags().get_flag(DataObjAttr::kValid) ? "Valid " : "")
// << "(ref:" << obj_ptr->ref_ << ") "
// << key;
SERVER_LOG_DEBUG << "Erase " << key << " from cache";
lru_.erase(key);
}
......@@ -99,7 +97,7 @@ void Cache::clear() {
std::lock_guard<std::mutex> lock(mutex_);
lru_.clear();
usage_ = 0;
// AGENT_LOG_DEBUG << "Clear LRU !";
SERVER_LOG_DEBUG << "Clear cache !";
}
#if 0 /* caiyd 20190221, need more testing before enable */
......@@ -162,7 +160,7 @@ void Cache::restore_from_file(const std::string& key, const CacheObjPtr& obj_ptr
void Cache::free_memory() {
if (usage_ <= capacity_) return;
int64_t threshhold = capacity_ * THRESHHOLD_PERCENT;
int64_t threshhold = capacity_ * freemem_percent_;
int64_t delta_size = usage_ - threshhold;
std::set<std::string> key_array;
......@@ -183,7 +181,7 @@ void Cache::free_memory() {
}
}
// AGENT_LOG_DEBUG << "to be released memory size: " << released_size;
SERVER_LOG_DEBUG << "to be released memory size: " << released_size;
for (auto& key : key_array) {
erase(key);
......@@ -193,28 +191,15 @@ void Cache::free_memory() {
}
void Cache::print() {
int64_t still_pinned_count = 0;
int64_t total_pinned_size = 0;
int64_t total_valid_empty_size = 0;
size_t cache_count = 0;
{
std::lock_guard<std::mutex> lock(mutex_);
for (auto it = lru_.begin(); it != lru_.end(); ++it) {
auto& obj_ptr = it->second;
const auto& data_ptr = obj_ptr->data_;
if (data_ptr != nullptr) {
total_pinned_size += data_ptr->size();
++still_pinned_count;
} else {
total_valid_empty_size += data_ptr->size();
}
}
cache_count = lru_.size();
}
SERVER_LOG_DEBUG << "[Still Pinned count]: " << still_pinned_count;
SERVER_LOG_DEBUG << "[Pinned Memory total size(byte)]: " << total_pinned_size;
SERVER_LOG_DEBUG << "[valid_empty total size(byte)]: " << total_valid_empty_size;
SERVER_LOG_DEBUG << "[free memory size(byte)]: " << capacity_ - total_pinned_size - total_valid_empty_size;
SERVER_LOG_DEBUG << "[Cache item count]: " << cache_count;
SERVER_LOG_DEBUG << "[Cache usage(byte)]: " << usage_;
SERVER_LOG_DEBUG << "[Cache capacity(byte)]: " << capacity_;
}
} // cache
......
......@@ -18,7 +18,6 @@ namespace milvus {
namespace cache {
const std::string SWAP_DIR = ".CACHE";
const float THRESHHOLD_PERCENT = 0.75;
class Cache {
private:
......@@ -45,6 +44,9 @@ public:
int64_t capacity() const { return capacity_; } //unit: BYTE
void set_capacity(int64_t capacity); //unit: BYTE
double freemem_percent() const { return freemem_percent_; };
void set_freemem_percent(double percent) { freemem_percent_ = percent; }
size_t size() const;
bool exists(const std::string& key);
DataObjPtr get(const std::string& key);
......@@ -57,6 +59,7 @@ public:
private:
int64_t usage_;
int64_t capacity_;
double freemem_percent_;
LRU<std::string, CacheObjPtr> lru_;
mutable std::mutex mutex_;
......
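
A hedged usage sketch of the new accessor pair, using only the constructor and methods visible in this header; the include path is assumed for illustration.

```cpp
#include "cache/Cache.h"   // assumed include path; adjust to the real source layout

using zilliz::milvus::cache::Cache;

void configure_cache_example() {
    // Constructor signature as shown in the diff: capacity in bytes, max item count.
    Cache cache(16LL * 1024 * 1024 * 1024, 1UL << 32);

    // Once the cache is full, eviction runs until usage drops to capacity * freemem_percent.
    cache.set_freemem_percent(0.85);

    // Getter added in this change; defaults to DEFAULT_THRESHHOLD_PERCENT (0.85).
    double percent = cache.freemem_percent();
    (void)percent;
}
```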
......@@ -6,6 +6,7 @@
#include "CpuCacheMgr.h"
#include "server/ServerConfig.h"
#include "utils/Log.h"
namespace zilliz {
namespace milvus {
......@@ -16,6 +17,14 @@ CpuCacheMgr::CpuCacheMgr() {
int64_t cap = config.GetInt64Value(server::CONFIG_CPU_CACHE_CAPACITY, 16);
cap *= 1024*1024*1024;
cache_ = std::make_shared<Cache>(cap, 1UL<<32);
double free_percent = config.GetDoubleValue(server::CACHE_FREE_PERCENT, 0.85);
if(free_percent > 0.0 && free_percent <= 1.0) {
cache_->set_freemem_percent(free_percent);
} else {
SERVER_LOG_ERROR << "Invalid cache_free_percent: " << free_percent <<
", fall back to default " << cache_->freemem_percent();
}
}
}
......
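
The check above only accepts values in (0, 1]; anything else keeps the cache's built-in default. A standalone sketch of the same validation, with hypothetical names, for callers that read the value elsewhere:

```cpp
#include <iostream>

// Hypothetical helper restating the (0, 1] range check used in CpuCacheMgr above.
double sanitize_free_percent(double requested, double fallback = 0.85) {
    if (requested > 0.0 && requested <= 1.0) {
        return requested;
    }
    std::cerr << "Invalid cache_free_percent: " << requested
              << ", falling back to " << fallback << std::endl;
    return fallback;
}
```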
......@@ -92,6 +92,8 @@ DBImpl::DBImpl(const Options& options)
ENGINE_LOG_INFO << "StartTimerTasks";
StartTimerTasks();
}
}
Status DBImpl::CreateTable(meta::TableSchema& table_schema) {
......@@ -359,8 +361,9 @@ Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date,
ENGINE_LOG_DEBUG << "New merged file " << table_file.file_id_ <<
" of size=" << index->PhysicalSize()/(1024*1024) << " M";
//current disable this line to avoid memory
//index->Cache();
if(options_.insert_cache_immediately_) {
index->Cache();
}
return status;
}
......@@ -455,7 +458,7 @@ Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
try {
//step 1: load index
to_index->Load();
to_index->Load(options_.insert_cache_immediately_);
//step 2: create table file
meta::TableFileSchema table_file;
......@@ -499,8 +502,9 @@ Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
<< index->PhysicalSize()/(1024*1024) << " M"
<< " from file " << to_remove.file_id_;
//current disable this line to avoid memory
//index->Cache();
if(options_.insert_cache_immediately_) {
index->Cache();
}
} catch (std::exception& ex) {
std::string msg = "Build index encounter exception" + std::string(ex.what());
......
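
The same opt-in gate appears again in MemVectors and MemTableFile below. A self-contained sketch with stand-in types illustrating what it replaces (the previously commented-out unconditional index->Cache()):

```cpp
#include <iostream>

// Stand-in types for illustration only; the real Options and index classes live in the engine.
struct Options { bool insert_cache_immediately_ = false; };
struct Index   { void Cache() { std::cout << "index loaded into CPU cache" << std::endl; } };

void after_serialize(const Options& options, Index& index) {
    // Default (false): skip caching so memory stays flat during bulk insert.
    // true: cache the freshly written file so hot queries hit memory immediately.
    if (options.insert_cache_immediately_) {
        index.Cache();
    }
}
```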
......@@ -39,7 +39,7 @@ public:
virtual Status Serialize() = 0;
virtual Status Load() = 0;
virtual Status Load(bool to_cache = true) = 0;
virtual Status Merge(const std::string& location) = 0;
......
......@@ -79,18 +79,17 @@ Status FaissExecutionEngine::Serialize() {
return Status::OK();
}
Status FaissExecutionEngine::Load() {
Status FaissExecutionEngine::Load(bool to_cache) {
auto index = zilliz::milvus::cache::CpuCacheMgr::GetInstance()->GetIndex(location_);
bool to_cache = false;
bool already_in_cache = (index != nullptr);
auto start_time = METRICS_NOW_TIME;
if (!index) {
index = read_index(location_);
to_cache = true;
ENGINE_LOG_DEBUG << "Disk io from: " << location_;
}
pIndex_ = index->data();
if (to_cache) {
if (!already_in_cache && to_cache) {
Cache();
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
......@@ -98,7 +97,6 @@ Status FaissExecutionEngine::Load() {
server::Metrics::GetInstance().FaissDiskLoadDurationSecondsHistogramObserve(total_time);
double total_size = (pIndex_->d) * (pIndex_->ntotal) * 4;
server::Metrics::GetInstance().FaissDiskLoadSizeBytesHistogramObserve(total_size);
// server::Metrics::GetInstance().FaissDiskLoadIOSpeedHistogramObserve(total_size/double(total_time));
server::Metrics::GetInstance().FaissDiskLoadIOSpeedGaugeSet(total_size/double(total_time));
......
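
A minimal, self-contained sketch of the Load(bool to_cache) control flow above, using an unordered_map as a stand-in for CpuCacheMgr; all names other than the flag are hypothetical.

```cpp
#include <memory>
#include <string>
#include <unordered_map>

// Stand-ins for illustration; the real engine uses CpuCacheMgr and faiss indexes.
struct Index { std::string location; };
using IndexPtr = std::shared_ptr<Index>;

std::unordered_map<std::string, IndexPtr> fake_cache;

IndexPtr load(const std::string& location, bool to_cache = true) {
    auto it = fake_cache.find(location);
    const bool already_in_cache = (it != fake_cache.end());

    // Cache hit: reuse the cached index; cache miss: "read it from disk".
    IndexPtr index = already_in_cache ? it->second
                                      : std::make_shared<Index>(Index{location});

    // Only newly loaded indexes are cached, and only if the caller asked for it;
    // BuildIndex passes options_.insert_cache_immediately_ here.
    if (!already_in_cache && to_cache) {
        fake_cache[location] = index;
    }
    return index;
}
```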
......@@ -44,7 +44,7 @@ public:
Status Serialize() override;
Status Load() override;
Status Load(bool to_cache) override;
Status Merge(const std::string& location) override;
......
......@@ -87,7 +87,9 @@ Status MemVectors::Serialize(std::string &table_id) {
<< " file " << schema_.file_id_ << " of size " << (double) (active_engine_->Size()) / (double) meta::M
<< " M";
active_engine_->Cache();
if(options_.insert_cache_immediately_) {
active_engine_->Cache();
}
return status;
}
......
......@@ -98,7 +98,9 @@ Status MemTableFile::Serialize() {
LOG(DEBUG) << "New " << ((table_file_schema_.file_type_ == meta::TableFileSchema::RAW) ? "raw" : "to_index")
<< " file " << table_file_schema_.file_id_ << " of size " << (double) size / (double) M << " M";
execution_engine_->Cache();
if(options_.insert_cache_immediately_) {
execution_engine_->Cache();
}
return status;
}
......
......@@ -63,7 +63,9 @@ struct Options {
size_t index_trigger_size = ONE_GB; //unit: byte
DBMetaOptions meta;
int mode = MODE::SINGLE;
size_t insert_buffer_size = 4 * ONE_GB;
bool insert_cache_immediately_ = false;
}; // Options
......
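
A hedged sketch of enabling the new flag when constructing engine options; the include path and surrounding setup are assumptions, not part of this change.

```cpp
#include "db/Options.h"   // assumed include path for the Options struct above

zilliz::milvus::engine::Options make_options_example() {
    zilliz::milvus::engine::Options opt;
    opt.insert_buffer_size = 4 * zilliz::milvus::engine::ONE_GB;
    // Opt in to caching freshly serialized/merged files (off by default).
    opt.insert_cache_immediately_ = true;
    return opt;
}
```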
......@@ -32,7 +32,7 @@ public:
IndexLoadTaskPtr loader = std::static_pointer_cast<IndexLoadTask>(task);
if(index_files.find(loader->file_->id_) != index_files.end()){
ENGINE_LOG_INFO << "Append SearchContext to exist IndexLoaderContext";
ENGINE_LOG_DEBUG << "Append SearchContext to exist IndexLoaderContext";
index_files.erase(loader->file_->id_);
loader->search_contexts_.push_back(context);
}
......@@ -40,7 +40,7 @@ public:
//index_files still contains some index files, create new loader
for(auto& pair : index_files) {
ENGINE_LOG_INFO << "Create new IndexLoaderContext for: " << pair.second->location_;
ENGINE_LOG_DEBUG << "Create new IndexLoaderContext for: " << pair.second->location_;
IndexLoadTaskPtr new_loader = std::make_shared<IndexLoadTask>();
new_loader->search_contexts_.push_back(context);
new_loader->file_ = pair.second;
......
......@@ -31,7 +31,7 @@ SearchContext::AddIndexFile(TableFileSchemaPtr& index_file) {
return false;
}
SERVER_LOG_INFO << "SearchContext " << identity_ << " add index file: " << index_file->id_;
SERVER_LOG_DEBUG << "SearchContext " << identity_ << " add index file: " << index_file->id_;
map_index_files_[index_file->id_] = index_file;
return true;
......@@ -42,7 +42,7 @@ SearchContext::IndexSearchDone(size_t index_id) {
std::unique_lock <std::mutex> lock(mtx_);
map_index_files_.erase(index_id);
done_cond_.notify_all();
SERVER_LOG_INFO << "SearchContext " << identity_ << " finish index file: " << index_id;
SERVER_LOG_DEBUG << "SearchContext " << identity_ << " finish index file: " << index_id;
}
void
......
......@@ -41,7 +41,7 @@ IndexLoadTask::IndexLoadTask()
}
std::shared_ptr<IScheduleTask> IndexLoadTask::Execute() {
ENGINE_LOG_INFO << "Loading index(" << file_->id_ << ") from location: " << file_->location_;
ENGINE_LOG_DEBUG << "Loading index(" << file_->id_ << ") from location: " << file_->location_;
server::TimeRecorder rc("Load index");
//step 1: load index
......@@ -53,7 +53,7 @@ std::shared_ptr<IScheduleTask> IndexLoadTask::Execute() {
rc.Record("load index file to memory");
size_t file_size = index_ptr->PhysicalSize();
LOG(DEBUG) << "Index file type " << file_->file_type_ << " Of Size: "
ENGINE_LOG_DEBUG << "Index file type " << file_->file_type_ << " Of Size: "
<< file_size/(1024*1024) << " M";
CollectFileMetrics(file_->file_type_, file_size);
......
......@@ -51,7 +51,7 @@ std::shared_ptr<IScheduleTask> SearchTask::Execute() {
return nullptr;
}
SERVER_LOG_INFO << "Searching in index(" << index_id_<< ") with "
SERVER_LOG_DEBUG << "Searching in index(" << index_id_<< ") with "
<< search_contexts_.size() << " tasks";
server::TimeRecorder rc("DoSearch index(" + std::to_string(index_id_) + ")");
......
......@@ -16,19 +16,19 @@ namespace server {
DBWrapper::DBWrapper() {
zilliz::milvus::engine::Options opt;
ConfigNode& config = ServerConfig::GetInstance().GetConfig(CONFIG_DB);
opt.meta.backend_uri = config.GetValue(CONFIG_DB_URL);
std::string db_path = config.GetValue(CONFIG_DB_PATH);
ConfigNode& db_config = ServerConfig::GetInstance().GetConfig(CONFIG_DB);
opt.meta.backend_uri = db_config.GetValue(CONFIG_DB_URL);
std::string db_path = db_config.GetValue(CONFIG_DB_PATH);
opt.meta.path = db_path + "/db";
std::string db_slave_path = config.GetValue(CONFIG_DB_SLAVE_PATH);
std::string db_slave_path = db_config.GetValue(CONFIG_DB_SLAVE_PATH);
StringHelpFunctions::SplitStringByDelimeter(db_slave_path, ";", opt.meta.slave_paths);
int64_t index_size = config.GetInt64Value(CONFIG_DB_INDEX_TRIGGER_SIZE);
int64_t index_size = db_config.GetInt64Value(CONFIG_DB_INDEX_TRIGGER_SIZE);
if(index_size > 0) {//ensure larger than zero, unit is MB
opt.index_trigger_size = (size_t)index_size * engine::ONE_MB;
}
int64_t insert_buffer_size = config.GetInt64Value(CONFIG_DB_INSERT_BUFFER_SIZE, 4);
int64_t insert_buffer_size = db_config.GetInt64Value(CONFIG_DB_INSERT_BUFFER_SIZE, 4);
if (insert_buffer_size >= 1) {
opt.insert_buffer_size = insert_buffer_size * engine::ONE_GB;
}
......@@ -37,6 +37,9 @@ DBWrapper::DBWrapper() {
kill(0, SIGUSR1);
}
ConfigNode& cache_config = ServerConfig::GetInstance().GetConfig(CONFIG_CACHE);
opt.insert_cache_immediately_ = cache_config.GetBoolValue(CONFIG_INSERT_CACHE_IMMEDIATELY, false);
ConfigNode& serverConfig = ServerConfig::GetInstance().GetConfig(CONFIG_SERVER);
std::string mode = serverConfig.GetValue(CONFIG_CLUSTER_MODE, "single");
if (mode == "single") {
......@@ -55,8 +58,8 @@ DBWrapper::DBWrapper() {
//set archive config
engine::ArchiveConf::CriteriaT criterial;
int64_t disk = config.GetInt64Value(CONFIG_DB_ARCHIVE_DISK, 0);
int64_t days = config.GetInt64Value(CONFIG_DB_ARCHIVE_DAYS, 0);
int64_t disk = db_config.GetInt64Value(CONFIG_DB_ARCHIVE_DISK, 0);
int64_t days = db_config.GetInt64Value(CONFIG_DB_ARCHIVE_DAYS, 0);
if(disk > 0) {
criterial[engine::ARCHIVE_CONF_DISK] = disk;
}
......
......@@ -34,6 +34,8 @@ static const std::string CONFIG_LOG = "log_config";
static const std::string CONFIG_CACHE = "cache_config";
static const std::string CONFIG_CPU_CACHE_CAPACITY = "cpu_cache_capacity";
static const std::string CONFIG_GPU_CACHE_CAPACITY = "gpu_cache_capacity";
static const std::string CACHE_FREE_PERCENT = "cache_free_percent";
static const std::string CONFIG_INSERT_CACHE_IMMEDIATELY = "insert_cache_immediately";
static const std::string CONFIG_LICENSE = "license_config";
static const std::string CONFIG_LICENSE_PATH = "license_path";
......