Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
milvus
milvus
提交
8b652a00
M
milvus
项目概览
milvus
/
milvus
12 个月 前同步成功
通知
261
Star
22476
Fork
2472
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
M
milvus
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
8b652a00
编写于
11月 26, 2019
作者:
G
groot
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
#530 BuildIndex stop when do build index and search simultaneously
上级
11d1320b
变更
12
显示空白变更内容
内联
并排
Showing
12 changed file
with
157 addition
and
19 deletion
+157
-19
CHANGELOG.md
CHANGELOG.md
+1
-0
core/src/cache/Cache.inl
core/src/cache/Cache.inl
+2
-1
core/src/db/DBImpl.cpp
core/src/db/DBImpl.cpp
+12
-5
core/src/db/engine/ExecutionEngineImpl.cpp
core/src/db/engine/ExecutionEngineImpl.cpp
+5
-0
core/src/db/meta/Meta.h
core/src/db/meta/Meta.h
+6
-2
core/src/db/meta/MySQLMetaImpl.cpp
core/src/db/meta/MySQLMetaImpl.cpp
+46
-3
core/src/db/meta/MySQLMetaImpl.h
core/src/db/meta/MySQLMetaImpl.h
+5
-2
core/src/db/meta/SqliteMetaImpl.cpp
core/src/db/meta/SqliteMetaImpl.cpp
+48
-3
core/src/db/meta/SqliteMetaImpl.h
core/src/db/meta/SqliteMetaImpl.h
+5
-2
core/src/utils/CommonUtil.cpp
core/src/utils/CommonUtil.cpp
+23
-0
core/src/utils/CommonUtil.h
core/src/utils/CommonUtil.h
+3
-0
core/unittest/db/test_meta.cpp
core/unittest/db/test_meta.cpp
+1
-1
未找到文件。
CHANGELOG.md
浏览文件 @
8b652a00
...
...
@@ -24,6 +24,7 @@ Please mark all change in change log and use the ticket from JIRA.
-
\#
486 - gpu no usage during index building
-
\#
509 - IVF_PQ index build trapped into dead loop caused by invalid params
-
\#
513 - Unittest DELETE_BY_RANGE sometimes failed
-
\#
523 - Erase file data from cache once the file is marked as deleted
-
\#
527 - faiss benchmark not compatible with faiss 1.6.0
-
\#
530 - BuildIndex stop when do build index and search simultaneously
...
...
core/src/cache/Cache.inl
浏览文件 @
8b652a00
...
...
@@ -115,7 +115,8 @@ Cache<ItemObj>::erase(const std::string& key) {
const ItemObj& old_item = lru_.get(key);
usage_ -= old_item->Size();
SERVER_LOG_DEBUG << "Erase " << key << " size: " << old_item->Size();
SERVER_LOG_DEBUG << "Erase " << key << " size: " << old_item->Size() << " bytes from cache, usage: " << usage_
<< " bytes";
lru_.erase(key);
}
...
...
core/src/db/DBImpl.cpp
浏览文件 @
8b652a00
...
...
@@ -112,7 +112,7 @@ DBImpl::Stop() {
bg_timer_thread_
.
join
();
if
(
options_
.
mode_
!=
DBOptions
::
MODE
::
CLUSTER_READONLY
)
{
meta_ptr_
->
CleanUp
();
meta_ptr_
->
CleanUp
ShadowFiles
();
}
// ENGINE_LOG_TRACE << "DB service stop";
...
...
@@ -777,11 +777,18 @@ DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
meta_ptr_
->
Archive
();
int
ttl
=
5
*
meta
::
M_SEC
;
// default: file will be deleted after 5 minutes
{
uint64_t
ttl
=
10
*
meta
::
SECOND
;
// default: file data will be erase from cache after few seconds
meta_ptr_
->
CleanUpCacheWithTTL
(
ttl
);
}
{
uint64_t
ttl
=
5
*
meta
::
M_SEC
;
// default: file will be deleted after few minutes
if
(
options_
.
mode_
==
DBOptions
::
MODE
::
CLUSTER_WRITABLE
)
{
ttl
=
meta
::
D_SEC
;
}
meta_ptr_
->
CleanUpFilesWithTTL
(
ttl
);
}
// ENGINE_LOG_TRACE << " Background compaction thread exit";
}
...
...
core/src/db/engine/ExecutionEngineImpl.cpp
浏览文件 @
8b652a00
...
...
@@ -257,6 +257,11 @@ ExecutionEngineImpl::PhysicalSize() const {
Status
ExecutionEngineImpl
::
Serialize
()
{
auto
status
=
write_index
(
index_
,
location_
);
// here we reset index size by file size,
// since some index type(such as SQ8) data size become smaller after serialized
index_
->
set_size
(
PhysicalSize
());
return
status
;
}
...
...
core/src/db/meta/Meta.h
浏览文件 @
8b652a00
...
...
@@ -118,9 +118,13 @@ class Meta {
Archive
()
=
0
;
virtual
Status
CleanUp
()
=
0
;
CleanUp
ShadowFiles
()
=
0
;
virtual
Status
CleanUpFilesWithTTL
(
uint16_t
)
=
0
;
virtual
Status
CleanUpCacheWithTTL
(
uint64_t
seconds
)
=
0
;
virtual
Status
CleanUpFilesWithTTL
(
uint64_t
seconds
)
=
0
;
virtual
Status
DropAll
()
=
0
;
...
...
core/src/db/meta/MySQLMetaImpl.cpp
浏览文件 @
8b652a00
...
...
@@ -20,6 +20,7 @@
#include "db/IDGenerator.h"
#include "db/Utils.h"
#include "metrics/Metrics.h"
#include "utils/CommonUtil.h"
#include "utils/Exception.h"
#include "utils/Log.h"
#include "utils/StringHelpFunctions.h"
...
...
@@ -292,7 +293,7 @@ MySQLMetaImpl::Initialize() {
// step 5: create meta tables
try
{
if
(
mode_
!=
DBOptions
::
MODE
::
CLUSTER_READONLY
)
{
CleanUp
();
CleanUp
ShadowFiles
();
}
{
...
...
@@ -1710,7 +1711,7 @@ MySQLMetaImpl::Size(uint64_t& result) {
}
Status
MySQLMetaImpl
::
CleanUp
()
{
MySQLMetaImpl
::
CleanUp
ShadowFiles
()
{
try
{
mysqlpp
::
ScopedConnection
connectionPtr
(
*
mysql_connection_pool_
,
safe_grab_
);
...
...
@@ -1752,7 +1753,49 @@ MySQLMetaImpl::CleanUp() {
}
Status
MySQLMetaImpl
::
CleanUpFilesWithTTL
(
uint16_t
seconds
)
{
MySQLMetaImpl
::
CleanUpCacheWithTTL
(
uint64_t
seconds
)
{
auto
now
=
utils
::
GetMicroSecTimeStamp
();
// erase deleted/backup files from cache
try
{
server
::
MetricCollector
metric
;
mysqlpp
::
ScopedConnection
connectionPtr
(
*
mysql_connection_pool_
,
safe_grab_
);
if
(
connectionPtr
==
nullptr
)
{
return
Status
(
DB_ERROR
,
"Failed to connect to meta server(mysql)"
);
}
mysqlpp
::
Query
cleanUpFilesWithTTLQuery
=
connectionPtr
->
query
();
cleanUpFilesWithTTLQuery
<<
"SELECT id, table_id, file_id, date"
<<
" FROM "
<<
META_TABLEFILES
<<
" WHERE file_type IN ("
<<
std
::
to_string
(
TableFileSchema
::
TO_DELETE
)
<<
","
<<
std
::
to_string
(
TableFileSchema
::
BACKUP
)
<<
")"
<<
" AND updated_time < "
<<
std
::
to_string
(
now
-
seconds
*
US_PS
)
<<
";"
;
mysqlpp
::
StoreQueryResult
res
=
cleanUpFilesWithTTLQuery
.
store
();
TableFileSchema
table_file
;
std
::
vector
<
std
::
string
>
idsToDelete
;
for
(
auto
&
resRow
:
res
)
{
table_file
.
id_
=
resRow
[
"id"
];
// implicit conversion
resRow
[
"table_id"
].
to_string
(
table_file
.
table_id_
);
resRow
[
"file_id"
].
to_string
(
table_file
.
file_id_
);
table_file
.
date_
=
resRow
[
"date"
];
utils
::
GetTableFilePath
(
options_
,
table_file
);
server
::
CommonUtil
::
EraseFromCache
(
table_file
.
location_
);
}
}
catch
(
std
::
exception
&
e
)
{
return
HandleException
(
"GENERAL ERROR WHEN CLEANING UP FILES WITH TTL"
,
e
.
what
());
}
return
Status
::
OK
();
}
Status
MySQLMetaImpl
::
CleanUpFilesWithTTL
(
uint64_t
seconds
)
{
auto
now
=
utils
::
GetMicroSecTimeStamp
();
std
::
set
<
std
::
string
>
table_ids
;
...
...
core/src/db/meta/MySQLMetaImpl.h
浏览文件 @
8b652a00
...
...
@@ -117,10 +117,13 @@ class MySQLMetaImpl : public Meta {
Size
(
uint64_t
&
result
)
override
;
Status
CleanUp
()
override
;
CleanUp
ShadowFiles
()
override
;
Status
CleanUpFilesWithTTL
(
uint16_t
seconds
)
override
;
CleanUpCacheWithTTL
(
uint64_t
seconds
)
override
;
Status
CleanUpFilesWithTTL
(
uint64_t
seconds
)
override
;
Status
DropAll
()
override
;
...
...
core/src/db/meta/SqliteMetaImpl.cpp
浏览文件 @
8b652a00
...
...
@@ -20,6 +20,7 @@
#include "db/IDGenerator.h"
#include "db/Utils.h"
#include "metrics/Metrics.h"
#include "utils/CommonUtil.h"
#include "utils/Exception.h"
#include "utils/Log.h"
#include "utils/StringHelpFunctions.h"
...
...
@@ -154,7 +155,7 @@ SqliteMetaImpl::Initialize() {
ConnectorPtr
->
open_forever
();
// thread safe option
ConnectorPtr
->
pragma
.
journal_mode
(
journal_mode
::
WAL
);
// WAL => write ahead log
CleanUp
();
CleanUp
ShadowFiles
();
return
Status
::
OK
();
}
...
...
@@ -1231,7 +1232,7 @@ SqliteMetaImpl::Size(uint64_t& result) {
}
Status
SqliteMetaImpl
::
CleanUp
()
{
SqliteMetaImpl
::
CleanUp
ShadowFiles
()
{
try
{
server
::
MetricCollector
metric
;
...
...
@@ -1269,7 +1270,51 @@ SqliteMetaImpl::CleanUp() {
}
Status
SqliteMetaImpl
::
CleanUpFilesWithTTL
(
uint16_t
seconds
)
{
SqliteMetaImpl
::
CleanUpCacheWithTTL
(
uint64_t
seconds
)
{
auto
now
=
utils
::
GetMicroSecTimeStamp
();
// erase deleted/backup files from cache
try
{
server
::
MetricCollector
metric
;
// multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std
::
lock_guard
<
std
::
mutex
>
meta_lock
(
meta_mutex_
);
std
::
vector
<
int
>
file_types
=
{
(
int
)
TableFileSchema
::
TO_DELETE
,
(
int
)
TableFileSchema
::
BACKUP
,
};
auto
files
=
ConnectorPtr
->
select
(
columns
(
&
TableFileSchema
::
id_
,
&
TableFileSchema
::
table_id_
,
&
TableFileSchema
::
file_id_
,
&
TableFileSchema
::
date_
),
where
(
in
(
&
TableFileSchema
::
file_type_
,
file_types
)
and
c
(
&
TableFileSchema
::
updated_time_
)
<
now
-
seconds
*
US_PS
));
for
(
auto
&
file
:
files
)
{
TableFileSchema
table_file
;
table_file
.
id_
=
std
::
get
<
0
>
(
file
);
table_file
.
table_id_
=
std
::
get
<
1
>
(
file
);
table_file
.
file_id_
=
std
::
get
<
2
>
(
file
);
table_file
.
date_
=
std
::
get
<
3
>
(
file
);
utils
::
GetTableFilePath
(
options_
,
table_file
);
server
::
CommonUtil
::
EraseFromCache
(
table_file
.
location_
);
}
}
catch
(
std
::
exception
&
e
)
{
return
HandleException
(
"Encounter exception when clean cache"
,
e
.
what
());
}
return
Status
::
OK
();
}
Status
SqliteMetaImpl
::
CleanUpFilesWithTTL
(
uint64_t
seconds
)
{
auto
now
=
utils
::
GetMicroSecTimeStamp
();
std
::
set
<
std
::
string
>
table_ids
;
...
...
core/src/db/meta/SqliteMetaImpl.h
浏览文件 @
8b652a00
...
...
@@ -117,10 +117,13 @@ class SqliteMetaImpl : public Meta {
Archive
()
override
;
Status
CleanUp
()
override
;
CleanUp
ShadowFiles
()
override
;
Status
CleanUpFilesWithTTL
(
uint16_t
seconds
)
override
;
CleanUpCacheWithTTL
(
uint64_t
seconds
)
override
;
Status
CleanUpFilesWithTTL
(
uint64_t
seconds
)
override
;
Status
DropAll
()
override
;
...
...
core/src/utils/CommonUtil.cpp
浏览文件 @
8b652a00
...
...
@@ -16,6 +16,9 @@
// under the License.
#include "utils/CommonUtil.h"
#include "cache/CpuCacheMgr.h"
#include "cache/GpuCacheMgr.h"
#include "server/Config.h"
#include "utils/Log.h"
#include <dirent.h>
...
...
@@ -27,6 +30,7 @@
#include <unistd.h>
#include <iostream>
#include <thread>
#include <vector>
#include "boost/filesystem.hpp"
...
...
@@ -222,5 +226,24 @@ CommonUtil::ConvertTime(tm time_struct, time_t& time_integer) {
time_integer
=
mktime
(
&
time_struct
);
}
void
CommonUtil
::
EraseFromCache
(
const
std
::
string
&
item_key
)
{
if
(
item_key
.
empty
())
{
SERVER_LOG_ERROR
<<
"Empty key cannot be erased from cache"
;
return
;
}
cache
::
CpuCacheMgr
::
GetInstance
()
->
EraseItem
(
item_key
);
#ifdef MILVUS_GPU_VERSION
server
::
Config
&
config
=
server
::
Config
::
GetInstance
();
std
::
vector
<
int64_t
>
gpus
;
Status
s
=
config
.
GetGpuResourceConfigSearchResources
(
gpus
);
for
(
auto
&
gpu
:
gpus
)
{
cache
::
GpuCacheMgr
::
GetInstance
(
gpu
)
->
EraseItem
(
item_key
);
}
#endif
}
}
// namespace server
}
// namespace milvus
core/src/utils/CommonUtil.h
浏览文件 @
8b652a00
...
...
@@ -56,6 +56,9 @@ class CommonUtil {
ConvertTime
(
time_t
time_integer
,
tm
&
time_struct
);
static
void
ConvertTime
(
tm
time_struct
,
time_t
&
time_integer
);
static
void
EraseFromCache
(
const
std
::
string
&
item_key
);
};
}
// namespace server
...
...
core/unittest/db/test_meta.cpp
浏览文件 @
8b652a00
...
...
@@ -329,7 +329,7 @@ TEST_F(MetaTest, TABLE_FILES_TEST) {
status
=
impl_
->
CreateTableFile
(
table_file
);
table_file
.
file_type_
=
milvus
::
engine
::
meta
::
TableFileSchema
::
NEW
;
status
=
impl_
->
UpdateTableFile
(
table_file
);
status
=
impl_
->
CleanUp
();
status
=
impl_
->
CleanUp
ShadowFiles
();
ASSERT_TRUE
(
status
.
ok
());
status
=
impl_
->
DropTable
(
table_id
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录