Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
BaiXuePrincess
milvus
提交
30022e8f
milvus
项目概览
BaiXuePrincess
/
milvus
与 Fork 源项目一致
从无法访问的项目Fork
通知
7
Star
4
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
milvus
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
30022e8f
编写于
7月 19, 2019
作者:
G
groot
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
MS-256 - Add more cache config
Former-commit-id: eba2a015a331acce5296ce9087f74f840daa5093
上级
76b67cc8
变更
14
隐藏空白更改
内联
并排
Showing
14 changed file
with
69 addition
and
56 deletion
+69
-56
cpp/CHANGELOG.md
cpp/CHANGELOG.md
+1
-0
cpp/conf/server_config.template
cpp/conf/server_config.template
+2
-0
cpp/src/cache/Cache.cpp
cpp/src/cache/Cache.cpp
+18
-33
cpp/src/cache/Cache.h
cpp/src/cache/Cache.h
+4
-1
cpp/src/cache/CpuCacheMgr.cpp
cpp/src/cache/CpuCacheMgr.cpp
+9
-0
cpp/src/db/DBImpl.cpp
cpp/src/db/DBImpl.cpp
+9
-5
cpp/src/db/ExecutionEngine.h
cpp/src/db/ExecutionEngine.h
+1
-1
cpp/src/db/FaissExecutionEngine.cpp
cpp/src/db/FaissExecutionEngine.cpp
+3
-5
cpp/src/db/FaissExecutionEngine.h
cpp/src/db/FaissExecutionEngine.h
+1
-1
cpp/src/db/MemManager.cpp
cpp/src/db/MemManager.cpp
+3
-1
cpp/src/db/MemTableFile.cpp
cpp/src/db/MemTableFile.cpp
+3
-1
cpp/src/db/Options.h
cpp/src/db/Options.h
+2
-0
cpp/src/server/DBWrapper.cpp
cpp/src/server/DBWrapper.cpp
+11
-8
cpp/src/server/ServerConfig.h
cpp/src/server/ServerConfig.h
+2
-0
未找到文件。
cpp/CHANGELOG.md
浏览文件 @
30022e8f
...
...
@@ -33,6 +33,7 @@ Please mark all change in change log and use the ticket from JIRA.
-
MS-242 - Clean up cmake and change MAKE_BUILD_ARGS to be user defined variable
-
MS-245 - Improve search result transfer performance
-
MS-248 - Support AddVector/SearchVector profiling
-
MS-256 - Add more cache config
## New Feature
-
MS-180 - Add new mem manager
...
...
cpp/conf/server_config.template
浏览文件 @
30022e8f
...
...
@@ -34,6 +34,8 @@ license_config: # license configure
cache_config: # cache configure
cpu_cache_capacity: 16 # how many memory are used as cache, unit: GB, range: 0 ~ less than total memory
cache_free_percent: 0.85 # how much memory should be free when cache is full, range: greater than zero ~ 1.0
insert_cache_immediately: false # insert data will be load into cache immediately for hot query
engine_config:
nprobe: 10
...
...
cpp/src/cache/Cache.cpp
浏览文件 @
30022e8f
...
...
@@ -13,9 +13,12 @@ namespace zilliz {
namespace
milvus
{
namespace
cache
{
constexpr
double
DEFAULT_THRESHHOLD_PERCENT
=
0.85
;
Cache
::
Cache
(
int64_t
capacity
,
uint64_t
cache_max_count
)
:
usage_
(
0
),
capacity_
(
capacity
),
freemem_percent_
(
DEFAULT_THRESHHOLD_PERCENT
),
lru_
(
cache_max_count
)
{
// AGENT_LOG_DEBUG << "Construct Cache with capacity " << std::to_string(mem_capacity)
}
...
...
@@ -64,15 +67,13 @@ void Cache::insert(const std::string& key, const DataObjPtr& data_ptr) {
usage_
+=
data_ptr
->
size
();
}
// AGENT_LOG_DEBUG << "Insert into LRU(" << (capacity_ > 0 ? std::to_string(usage_ * 100 / capacity_) : "Nan")
// << "%, +" << data_ptr->size() << ", " << usage_ << ", " << lru_.size() << "):"
// << " " << key;
SERVER_LOG_DEBUG
<<
"Insert "
<<
key
<<
" size:"
<<
data_ptr
->
size
()
<<
" into cache, usage: "
<<
usage_
;
}
if
(
usage_
>
capacity_
)
{
// AGENT_LOG_TRACE
<< "Current usage " << usage_
//
<< " exceeds cache capacity " << capacity_
//
<< ", start free memory";
SERVER_LOG_DEBUG
<<
"Current usage "
<<
usage_
<<
" exceeds cache capacity "
<<
capacity_
<<
", start free memory"
;
free_memory
();
}
}
...
...
@@ -86,12 +87,9 @@ void Cache::erase(const std::string& key) {
const
CacheObjPtr
&
obj_ptr
=
lru_
.
get
(
key
);
const
DataObjPtr
&
data_ptr
=
obj_ptr
->
data_
;
usage_
-=
data_ptr
->
size
();
// AGENT_LOG_DEBUG << "Erase from LRU(" << (capacity_ > 0 ? std::to_string(usage_*100/capacity_) : "Nan")
// << "%, -" << data_ptr->size() << ", " << usage_ << ", " << lru_.size() << "): "
// << (data_ptr->flags().get_flag(DataObjAttr::kPinned) ? "Pinned " : "")
// << (data_ptr->flags().get_flag(DataObjAttr::kValid) ? "Valid " : "")
// << "(ref:" << obj_ptr->ref_ << ") "
// << key;
SERVER_LOG_DEBUG
<<
"Erase "
<<
key
<<
" from cache"
;
lru_
.
erase
(
key
);
}
...
...
@@ -99,7 +97,7 @@ void Cache::clear() {
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex_
);
lru_
.
clear
();
usage_
=
0
;
// AGENT_LOG_DEBUG << "Clear LRU
!";
SERVER_LOG_DEBUG
<<
"Clear cache
!"
;
}
#if 0 /* caiyd 20190221, need more testing before enable */
...
...
@@ -162,7 +160,7 @@ void Cache::restore_from_file(const std::string& key, const CacheObjPtr& obj_ptr
void
Cache
::
free_memory
()
{
if
(
usage_
<=
capacity_
)
return
;
int64_t
threshhold
=
capacity_
*
THRESHHOLD_PERCENT
;
int64_t
threshhold
=
capacity_
*
freemem_percent_
;
int64_t
delta_size
=
usage_
-
threshhold
;
std
::
set
<
std
::
string
>
key_array
;
...
...
@@ -183,7 +181,7 @@ void Cache::free_memory() {
}
}
// AGENT
_LOG_DEBUG << "to be released memory size: " << released_size;
SERVER
_LOG_DEBUG
<<
"to be released memory size: "
<<
released_size
;
for
(
auto
&
key
:
key_array
)
{
erase
(
key
);
...
...
@@ -193,28 +191,15 @@ void Cache::free_memory() {
}
void
Cache
::
print
()
{
int64_t
still_pinned_count
=
0
;
int64_t
total_pinned_size
=
0
;
int64_t
total_valid_empty_size
=
0
;
size_t
cache_count
=
0
;
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex_
);
for
(
auto
it
=
lru_
.
begin
();
it
!=
lru_
.
end
();
++
it
)
{
auto
&
obj_ptr
=
it
->
second
;
const
auto
&
data_ptr
=
obj_ptr
->
data_
;
if
(
data_ptr
!=
nullptr
)
{
total_pinned_size
+=
data_ptr
->
size
();
++
still_pinned_count
;
}
else
{
total_valid_empty_size
+=
data_ptr
->
size
();
}
}
cache_count
=
lru_
.
size
();
}
SERVER_LOG_DEBUG
<<
"[Still Pinned count]: "
<<
still_pinned_count
;
SERVER_LOG_DEBUG
<<
"[Pinned Memory total size(byte)]: "
<<
total_pinned_size
;
SERVER_LOG_DEBUG
<<
"[valid_empty total size(byte)]: "
<<
total_valid_empty_size
;
SERVER_LOG_DEBUG
<<
"[free memory size(byte)]: "
<<
capacity_
-
total_pinned_size
-
total_valid_empty_size
;
SERVER_LOG_DEBUG
<<
"[Cache item count]: "
<<
cache_count
;
SERVER_LOG_DEBUG
<<
"[Cache usage(byte)]: "
<<
usage_
;
SERVER_LOG_DEBUG
<<
"[Cache capacity(byte)]: "
<<
capacity_
;
}
}
// cache
...
...
cpp/src/cache/Cache.h
浏览文件 @
30022e8f
...
...
@@ -18,7 +18,6 @@ namespace milvus {
namespace
cache
{
const
std
::
string
SWAP_DIR
=
".CACHE"
;
const
float
THRESHHOLD_PERCENT
=
0.75
;
class
Cache
{
private:
...
...
@@ -45,6 +44,9 @@ public:
int64_t
capacity
()
const
{
return
capacity_
;
}
//unit: BYTE
void
set_capacity
(
int64_t
capacity
);
//unit: BYTE
double
freemem_percent
()
const
{
return
freemem_percent_
;
};
void
set_freemem_percent
(
double
percent
)
{
freemem_percent_
=
percent
;
}
size_t
size
()
const
;
bool
exists
(
const
std
::
string
&
key
);
DataObjPtr
get
(
const
std
::
string
&
key
);
...
...
@@ -57,6 +59,7 @@ public:
private:
int64_t
usage_
;
int64_t
capacity_
;
double
freemem_percent_
;
LRU
<
std
::
string
,
CacheObjPtr
>
lru_
;
mutable
std
::
mutex
mutex_
;
...
...
cpp/src/cache/CpuCacheMgr.cpp
浏览文件 @
30022e8f
...
...
@@ -6,6 +6,7 @@
#include "CpuCacheMgr.h"
#include "server/ServerConfig.h"
#include "utils/Log.h"
namespace
zilliz
{
namespace
milvus
{
...
...
@@ -16,6 +17,14 @@ CpuCacheMgr::CpuCacheMgr() {
int64_t
cap
=
config
.
GetInt64Value
(
server
::
CONFIG_CPU_CACHE_CAPACITY
,
16
);
cap
*=
1024
*
1024
*
1024
;
cache_
=
std
::
make_shared
<
Cache
>
(
cap
,
1UL
<<
32
);
double
free_percent
=
config
.
GetDoubleValue
(
server
::
CACHE_FREE_PERCENT
,
0.85
);
if
(
free_percent
>
0.0
&&
free_percent
<=
1.0
)
{
cache_
->
set_freemem_percent
(
free_percent
);
}
else
{
SERVER_LOG_ERROR
<<
"Invalid cache_free_percent: "
<<
free_percent
<<
", defaultly set to "
<<
cache_
->
freemem_percent
();
}
}
}
...
...
cpp/src/db/DBImpl.cpp
浏览文件 @
30022e8f
...
...
@@ -92,6 +92,8 @@ DBImpl::DBImpl(const Options& options)
ENGINE_LOG_INFO
<<
"StartTimerTasks"
;
StartTimerTasks
();
}
}
Status
DBImpl
::
CreateTable
(
meta
::
TableSchema
&
table_schema
)
{
...
...
@@ -359,8 +361,9 @@ Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date,
ENGINE_LOG_DEBUG
<<
"New merged file "
<<
table_file
.
file_id_
<<
" of size="
<<
index
->
PhysicalSize
()
/
(
1024
*
1024
)
<<
" M"
;
//current disable this line to avoid memory
//index->Cache();
if
(
options_
.
insert_cache_immediately_
)
{
index
->
Cache
();
}
return
status
;
}
...
...
@@ -455,7 +458,7 @@ Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
try
{
//step 1: load index
to_index
->
Load
();
to_index
->
Load
(
options_
.
insert_cache_immediately_
);
//step 2: create table file
meta
::
TableFileSchema
table_file
;
...
...
@@ -499,8 +502,9 @@ Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
<<
index
->
PhysicalSize
()
/
(
1024
*
1024
)
<<
" M"
<<
" from file "
<<
to_remove
.
file_id_
;
//current disable this line to avoid memory
//index->Cache();
if
(
options_
.
insert_cache_immediately_
)
{
index
->
Cache
();
}
}
catch
(
std
::
exception
&
ex
)
{
std
::
string
msg
=
"Build index encounter exception"
+
std
::
string
(
ex
.
what
());
...
...
cpp/src/db/ExecutionEngine.h
浏览文件 @
30022e8f
...
...
@@ -39,7 +39,7 @@ public:
virtual
Status
Serialize
()
=
0
;
virtual
Status
Load
()
=
0
;
virtual
Status
Load
(
bool
to_cache
=
true
)
=
0
;
virtual
Status
Merge
(
const
std
::
string
&
location
)
=
0
;
...
...
cpp/src/db/FaissExecutionEngine.cpp
浏览文件 @
30022e8f
...
...
@@ -79,18 +79,17 @@ Status FaissExecutionEngine::Serialize() {
return
Status
::
OK
();
}
Status
FaissExecutionEngine
::
Load
()
{
Status
FaissExecutionEngine
::
Load
(
bool
to_cache
)
{
auto
index
=
zilliz
::
milvus
::
cache
::
CpuCacheMgr
::
GetInstance
()
->
GetIndex
(
location_
);
bool
to_cache
=
false
;
bool
already_in_cache
=
(
index
!=
nullptr
)
;
auto
start_time
=
METRICS_NOW_TIME
;
if
(
!
index
)
{
index
=
read_index
(
location_
);
to_cache
=
true
;
ENGINE_LOG_DEBUG
<<
"Disk io from: "
<<
location_
;
}
pIndex_
=
index
->
data
();
if
(
to_cache
)
{
if
(
!
already_in_cache
&&
to_cache
)
{
Cache
();
auto
end_time
=
METRICS_NOW_TIME
;
auto
total_time
=
METRICS_MICROSECONDS
(
start_time
,
end_time
);
...
...
@@ -98,7 +97,6 @@ Status FaissExecutionEngine::Load() {
server
::
Metrics
::
GetInstance
().
FaissDiskLoadDurationSecondsHistogramObserve
(
total_time
);
double
total_size
=
(
pIndex_
->
d
)
*
(
pIndex_
->
ntotal
)
*
4
;
server
::
Metrics
::
GetInstance
().
FaissDiskLoadSizeBytesHistogramObserve
(
total_size
);
// server::Metrics::GetInstance().FaissDiskLoadIOSpeedHistogramObserve(total_size/double(total_time));
server
::
Metrics
::
GetInstance
().
FaissDiskLoadIOSpeedGaugeSet
(
total_size
/
double
(
total_time
));
...
...
cpp/src/db/FaissExecutionEngine.h
浏览文件 @
30022e8f
...
...
@@ -44,7 +44,7 @@ public:
Status
Serialize
()
override
;
Status
Load
()
override
;
Status
Load
(
bool
to_cache
)
override
;
Status
Merge
(
const
std
::
string
&
location
)
override
;
...
...
cpp/src/db/MemManager.cpp
浏览文件 @
30022e8f
...
...
@@ -87,7 +87,9 @@ Status MemVectors::Serialize(std::string &table_id) {
<<
" file "
<<
schema_
.
file_id_
<<
" of size "
<<
(
double
)
(
active_engine_
->
Size
())
/
(
double
)
meta
::
M
<<
" M"
;
active_engine_
->
Cache
();
if
(
options_
.
insert_cache_immediately_
)
{
active_engine_
->
Cache
();
}
return
status
;
}
...
...
cpp/src/db/MemTableFile.cpp
浏览文件 @
30022e8f
...
...
@@ -98,7 +98,9 @@ Status MemTableFile::Serialize() {
LOG
(
DEBUG
)
<<
"New "
<<
((
table_file_schema_
.
file_type_
==
meta
::
TableFileSchema
::
RAW
)
?
"raw"
:
"to_index"
)
<<
" file "
<<
table_file_schema_
.
file_id_
<<
" of size "
<<
(
double
)
size
/
(
double
)
M
<<
" M"
;
execution_engine_
->
Cache
();
if
(
options_
.
insert_cache_immediately_
)
{
execution_engine_
->
Cache
();
}
return
status
;
}
...
...
cpp/src/db/Options.h
浏览文件 @
30022e8f
...
...
@@ -63,7 +63,9 @@ struct Options {
size_t
index_trigger_size
=
ONE_GB
;
//unit: byte
DBMetaOptions
meta
;
int
mode
=
MODE
::
SINGLE
;
size_t
insert_buffer_size
=
4
*
ONE_GB
;
bool
insert_cache_immediately_
=
false
;
};
// Options
...
...
cpp/src/server/DBWrapper.cpp
浏览文件 @
30022e8f
...
...
@@ -16,19 +16,19 @@ namespace server {
DBWrapper
::
DBWrapper
()
{
zilliz
::
milvus
::
engine
::
Options
opt
;
ConfigNode
&
config
=
ServerConfig
::
GetInstance
().
GetConfig
(
CONFIG_DB
);
opt
.
meta
.
backend_uri
=
config
.
GetValue
(
CONFIG_DB_URL
);
std
::
string
db_path
=
config
.
GetValue
(
CONFIG_DB_PATH
);
ConfigNode
&
db_
config
=
ServerConfig
::
GetInstance
().
GetConfig
(
CONFIG_DB
);
opt
.
meta
.
backend_uri
=
db_
config
.
GetValue
(
CONFIG_DB_URL
);
std
::
string
db_path
=
db_
config
.
GetValue
(
CONFIG_DB_PATH
);
opt
.
meta
.
path
=
db_path
+
"/db"
;
std
::
string
db_slave_path
=
config
.
GetValue
(
CONFIG_DB_SLAVE_PATH
);
std
::
string
db_slave_path
=
db_
config
.
GetValue
(
CONFIG_DB_SLAVE_PATH
);
StringHelpFunctions
::
SplitStringByDelimeter
(
db_slave_path
,
";"
,
opt
.
meta
.
slave_paths
);
int64_t
index_size
=
config
.
GetInt64Value
(
CONFIG_DB_INDEX_TRIGGER_SIZE
);
int64_t
index_size
=
db_
config
.
GetInt64Value
(
CONFIG_DB_INDEX_TRIGGER_SIZE
);
if
(
index_size
>
0
)
{
//ensure larger than zero, unit is MB
opt
.
index_trigger_size
=
(
size_t
)
index_size
*
engine
::
ONE_MB
;
}
int64_t
insert_buffer_size
=
config
.
GetInt64Value
(
CONFIG_DB_INSERT_BUFFER_SIZE
,
4
);
int64_t
insert_buffer_size
=
db_
config
.
GetInt64Value
(
CONFIG_DB_INSERT_BUFFER_SIZE
,
4
);
if
(
insert_buffer_size
>=
1
)
{
opt
.
insert_buffer_size
=
insert_buffer_size
*
engine
::
ONE_GB
;
}
...
...
@@ -37,6 +37,9 @@ DBWrapper::DBWrapper() {
kill
(
0
,
SIGUSR1
);
}
ConfigNode
&
cache_config
=
ServerConfig
::
GetInstance
().
GetConfig
(
CONFIG_CACHE
);
opt
.
insert_cache_immediately_
=
cache_config
.
GetBoolValue
(
CONFIG_INSERT_CACHE_IMMEDIATELY
,
false
);
ConfigNode
&
serverConfig
=
ServerConfig
::
GetInstance
().
GetConfig
(
CONFIG_SERVER
);
std
::
string
mode
=
serverConfig
.
GetValue
(
CONFIG_CLUSTER_MODE
,
"single"
);
if
(
mode
==
"single"
)
{
...
...
@@ -55,8 +58,8 @@ DBWrapper::DBWrapper() {
//set archive config
engine
::
ArchiveConf
::
CriteriaT
criterial
;
int64_t
disk
=
config
.
GetInt64Value
(
CONFIG_DB_ARCHIVE_DISK
,
0
);
int64_t
days
=
config
.
GetInt64Value
(
CONFIG_DB_ARCHIVE_DAYS
,
0
);
int64_t
disk
=
db_
config
.
GetInt64Value
(
CONFIG_DB_ARCHIVE_DISK
,
0
);
int64_t
days
=
db_
config
.
GetInt64Value
(
CONFIG_DB_ARCHIVE_DAYS
,
0
);
if
(
disk
>
0
)
{
criterial
[
engine
::
ARCHIVE_CONF_DISK
]
=
disk
;
}
...
...
cpp/src/server/ServerConfig.h
浏览文件 @
30022e8f
...
...
@@ -34,6 +34,8 @@ static const std::string CONFIG_LOG = "log_config";
static
const
std
::
string
CONFIG_CACHE
=
"cache_config"
;
static
const
std
::
string
CONFIG_CPU_CACHE_CAPACITY
=
"cpu_cache_capacity"
;
static
const
std
::
string
CONFIG_GPU_CACHE_CAPACITY
=
"gpu_cache_capacity"
;
static
const
std
::
string
CACHE_FREE_PERCENT
=
"cache_free_percent"
;
static
const
std
::
string
CONFIG_INSERT_CACHE_IMMEDIATELY
=
"insert_cache_immediately"
;
static
const
std
::
string
CONFIG_LICENSE
=
"license_config"
;
static
const
std
::
string
CONFIG_LICENSE_PATH
=
"license_path"
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录