Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
BaiXuePrincess
milvus
提交
28a1b252
milvus
项目概览
BaiXuePrincess
/
milvus
与 Fork 源项目一致
从无法访问的项目Fork
通知
7
Star
4
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
milvus
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
28a1b252
编写于
8月 29, 2019
作者:
S
starlord
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
MS-430 Search no result if index created with FLAT
Former-commit-id: cd4c3674863f084e25cb87fcb278674c94a6876e
上级
4a612047
变更
12
隐藏空白更改
内联
并排
Showing
12 changed file
with
168 addition
and
313 deletion
+168
-313
cpp/CHANGELOG.md
cpp/CHANGELOG.md
+1
-0
cpp/src/db/DBImpl.cpp
cpp/src/db/DBImpl.cpp
+52
-32
cpp/src/db/DBImpl.h
cpp/src/db/DBImpl.h
+4
-2
cpp/src/db/meta/Meta.h
cpp/src/db/meta/Meta.h
+7
-5
cpp/src/db/meta/MySQLMetaImpl.cpp
cpp/src/db/meta/MySQLMetaImpl.cpp
+64
-132
cpp/src/db/meta/MySQLMetaImpl.h
cpp/src/db/meta/MySQLMetaImpl.h
+3
-5
cpp/src/db/meta/SqliteMetaImpl.cpp
cpp/src/db/meta/SqliteMetaImpl.cpp
+21
-121
cpp/src/db/meta/SqliteMetaImpl.h
cpp/src/db/meta/SqliteMetaImpl.h
+3
-5
cpp/src/server/grpc_impl/GrpcRequestTask.cpp
cpp/src/server/grpc_impl/GrpcRequestTask.cpp
+1
-0
cpp/unittest/db/db_tests.cpp
cpp/unittest/db/db_tests.cpp
+0
-2
cpp/unittest/db/meta_tests.cpp
cpp/unittest/db/meta_tests.cpp
+3
-3
cpp/unittest/db/mysql_meta_test.cpp
cpp/unittest/db/mysql_meta_test.cpp
+9
-6
未找到文件。
cpp/CHANGELOG.md
浏览文件 @
28a1b252
...
...
@@ -14,6 +14,7 @@ Please mark all change in change log and use the ticket from JIRA.
-
MS-432 - Search vectors params nprobe need to check max number
-
MS-431 - Search vectors params nprobe: 0/-1, expected result: raise exception
-
MS-331 - Crate Table : when table exists, error code is META_FAILED(code=15) rather than ILLEGAL TABLE NAME(code=9))
-
MS-430 - Search no result if index created with FLAT
## Improvement
-
MS-327 - Clean code for milvus
...
...
cpp/src/db/DBImpl.cpp
浏览文件 @
28a1b252
...
...
@@ -98,7 +98,8 @@ Status DBImpl::PreloadTable(const std::string &table_id) {
meta
::
DatePartionedTableFilesSchema
files
;
meta
::
DatesT
dates
;
auto
status
=
meta_ptr_
->
FilesToSearch
(
table_id
,
dates
,
files
);
std
::
vector
<
size_t
>
ids
;
auto
status
=
meta_ptr_
->
FilesToSearch
(
table_id
,
ids
,
dates
,
files
);
if
(
!
status
.
ok
())
{
return
status
;
}
...
...
@@ -173,7 +174,8 @@ Status DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq, uint6
//get all table files from table
meta
::
DatePartionedTableFilesSchema
files
;
auto
status
=
meta_ptr_
->
FilesToSearch
(
table_id
,
dates
,
files
);
std
::
vector
<
size_t
>
ids
;
auto
status
=
meta_ptr_
->
FilesToSearch
(
table_id
,
ids
,
dates
,
files
);
if
(
!
status
.
ok
())
{
return
status
;
}
meta
::
TableFilesSchema
file_id_array
;
...
...
@@ -334,14 +336,8 @@ void DBImpl::StartMetricTask() {
ENGINE_LOG_TRACE
<<
"Metric task finished"
;
}
void
DBImpl
::
StartCompactionTask
()
{
static
uint64_t
compact_clock_tick
=
0
;
compact_clock_tick
++
;
if
(
compact_clock_tick
%
COMPACT_ACTION_INTERVAL
!=
0
)
{
return
;
}
//serialize memory data
Status
DBImpl
::
MemSerialize
()
{
std
::
lock_guard
<
std
::
mutex
>
lck
(
mem_serialize_mutex_
);
std
::
set
<
std
::
string
>
temp_table_ids
;
mem_mgr_
->
Serialize
(
temp_table_ids
);
for
(
auto
&
id
:
temp_table_ids
)
{
...
...
@@ -352,6 +348,19 @@ void DBImpl::StartCompactionTask() {
SERVER_LOG_DEBUG
<<
"Insert cache serialized"
;
}
return
Status
::
OK
();
}
void
DBImpl
::
StartCompactionTask
()
{
static
uint64_t
compact_clock_tick
=
0
;
compact_clock_tick
++
;
if
(
compact_clock_tick
%
COMPACT_ACTION_INTERVAL
!=
0
)
{
return
;
}
//serialize memory data
MemSerialize
();
//compactiong has been finished?
if
(
!
compact_thread_results_
.
empty
())
{
std
::
chrono
::
milliseconds
span
(
10
);
...
...
@@ -535,39 +544,49 @@ Status DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index)
return
status
;
}
if
(
utils
::
IsSameIndex
(
old_index
,
index
))
{
ENGINE_LOG_DEBUG
<<
"Same index setting, no need to create index again"
;
return
Status
::
OK
();
}
//step 2: update index info
if
(
!
utils
::
IsSameIndex
(
old_index
,
index
))
{
DropIndex
(
table_id
);
//step 2: drop old index files
DropIndex
(
table_id
);
//step 3: update index info
status
=
meta_ptr_
->
UpdateTableIndexParam
(
table_id
,
index
);
if
(
!
status
.
ok
())
{
ENGINE_LOG_ERROR
<<
"Failed to update table index info"
;
return
status
;
status
=
meta_ptr_
->
UpdateTableIndexParam
(
table_id
,
index
);
if
(
!
status
.
ok
())
{
ENGINE_LOG_ERROR
<<
"Failed to update table index info"
;
return
status
;
}
}
}
if
(
index
.
engine_type_
==
(
int
)
EngineType
::
FAISS_IDMAP
)
{
ENGINE_LOG_DEBUG
<<
"index type = IDMAP, no need to build index"
;
return
Status
::
OK
();
}
//step 3: wait and build index
//for IDMAP type, only wait all NEW file converted to RAW file
//for other type, wait NEW/RAW/NEW_MERGE/NEW_INDEX/TO_INDEX files converted to INDEX files
std
::
vector
<
int
>
file_types
;
if
(
index
.
engine_type_
==
(
int
)
EngineType
::
FAISS_IDMAP
)
{
file_types
=
{
(
int
)
meta
::
TableFileSchema
::
NEW
,
(
int
)
meta
::
TableFileSchema
::
NEW_MERGE
,
};
}
else
{
file_types
=
{
(
int
)
meta
::
TableFileSchema
::
RAW
,
(
int
)
meta
::
TableFileSchema
::
NEW
,
(
int
)
meta
::
TableFileSchema
::
NEW_MERGE
,
(
int
)
meta
::
TableFileSchema
::
NEW_INDEX
,
(
int
)
meta
::
TableFileSchema
::
TO_INDEX
,
};
}
bool
has
=
false
;
auto
status
=
meta_ptr_
->
HasNonIndexFiles
(
table_id
,
ha
s
);
std
::
vector
<
std
::
string
>
file_ids
;
auto
status
=
meta_ptr_
->
FilesByType
(
table_id
,
file_types
,
file_id
s
);
int
times
=
1
;
while
(
has
)
{
while
(
!
file_ids
.
empty
()
)
{
ENGINE_LOG_DEBUG
<<
"Non index files detected! Will build index "
<<
times
;
status
=
meta_ptr_
->
UpdateTableFilesToIndex
(
table_id
);
/* StartBuildIndexTask(true); */
std
::
this_thread
::
sleep_for
(
std
::
chrono
::
milliseconds
(
std
::
min
(
10
*
1000
,
times
*
100
)));
status
=
meta_ptr_
->
HasNonIndexFiles
(
table_id
,
ha
s
);
status
=
meta_ptr_
->
FilesByType
(
table_id
,
file_types
,
file_id
s
);
times
++
;
}
return
Status
::
OK
();
}
...
...
@@ -576,6 +595,7 @@ Status DBImpl::DescribeIndex(const std::string& table_id, TableIndex& index) {
}
Status
DBImpl
::
DropIndex
(
const
std
::
string
&
table_id
)
{
ENGINE_LOG_DEBUG
<<
"drop index for table: "
<<
table_id
;
return
meta_ptr_
->
DropTableIndex
(
table_id
);
}
...
...
cpp/src/db/DBImpl.h
浏览文件 @
28a1b252
...
...
@@ -117,8 +117,9 @@ class DBImpl : public DB {
void
StartBuildIndexTask
(
bool
force
=
false
);
void
BackgroundBuildIndex
();
Status
BuildIndex
(
const
meta
::
TableFileSchema
&
);
Status
BuildIndex
(
const
meta
::
TableFileSchema
&
);
Status
MemSerialize
();
private:
const
Options
options_
;
...
...
@@ -129,6 +130,7 @@ class DBImpl : public DB {
MetaPtr
meta_ptr_
;
MemManagerAbstractPtr
mem_mgr_
;
std
::
mutex
mem_serialize_mutex_
;
server
::
ThreadPool
compact_thread_pool_
;
std
::
list
<
std
::
future
<
void
>>
compact_thread_results_
;
...
...
cpp/src/db/meta/Meta.h
浏览文件 @
28a1b252
...
...
@@ -70,10 +70,10 @@ class Meta {
UpdateTableFiles
(
TableFilesSchema
&
files
)
=
0
;
virtual
Status
FilesToSearch
(
const
std
::
string
&
table_id
,
const
DatesT
&
partition
,
DatePartionedTableFilesSchema
&
files
)
=
0
;
virtual
Status
FilesToSearch
(
const
std
::
string
&
table_id
,
const
std
::
vector
<
size_t
>
&
ids
,
const
DatesT
&
partition
,
DatePartionedTableFilesSchema
&
files
)
=
0
;
FilesToSearch
(
const
std
::
string
&
table_id
,
const
std
::
vector
<
size_t
>
&
ids
,
const
DatesT
&
partition
,
DatePartionedTableFilesSchema
&
files
)
=
0
;
virtual
Status
FilesToMerge
(
const
std
::
string
&
table_id
,
DatePartionedTableFilesSchema
&
files
)
=
0
;
...
...
@@ -88,7 +88,9 @@ class Meta {
FilesToIndex
(
TableFilesSchema
&
)
=
0
;
virtual
Status
HasNonIndexFiles
(
const
std
::
string
&
table_id
,
bool
&
has
)
=
0
;
FilesByType
(
const
std
::
string
&
table_id
,
const
std
::
vector
<
int
>
&
file_types
,
std
::
vector
<
std
::
string
>&
file_ids
)
=
0
;
virtual
Status
DescribeTableIndex
(
const
std
::
string
&
table_id
,
TableIndex
&
index
)
=
0
;
...
...
cpp/src/db/meta/MySQLMetaImpl.cpp
浏览文件 @
28a1b252
...
...
@@ -326,10 +326,16 @@ Status MySQLMetaImpl::CreateTable(TableSchema &table_schema) {
}
}
Status
MySQLMetaImpl
::
HasNonIndexFiles
(
const
std
::
string
&
table_id
,
bool
&
has
)
{
has
=
false
;
Status
MySQLMetaImpl
::
FilesByType
(
const
std
::
string
&
table_id
,
const
std
::
vector
<
int
>
&
file_types
,
std
::
vector
<
std
::
string
>
&
file_ids
)
{
if
(
file_types
.
empty
())
{
return
Status
::
Error
(
"file types array is empty"
);
}
try
{
file_ids
.
clear
();
StoreQueryResult
res
;
{
ScopedConnection
connectionPtr
(
*
mysql_connection_pool_
,
safe_grab
);
...
...
@@ -338,34 +344,75 @@ Status MySQLMetaImpl::HasNonIndexFiles(const std::string &table_id, bool &has) {
return
Status
::
Error
(
"Failed to connect to database server"
);
}
std
::
string
types
;
for
(
auto
type
:
file_types
)
{
if
(
!
types
.
empty
())
{
types
+=
","
;
}
types
+=
std
::
to_string
(
type
);
}
Query
hasNonIndexFilesQuery
=
connectionPtr
->
query
();
//since table_id is a unique column we just need to check whether it exists or not
hasNonIndexFilesQuery
<<
"SELECT EXISTS "
<<
"(SELECT 1 FROM TableFiles "
<<
hasNonIndexFilesQuery
<<
"SELECT file_id, file_type FROM TableFiles "
<<
"WHERE table_id = "
<<
quote
<<
table_id
<<
" AND "
<<
"(file_type = "
<<
std
::
to_string
(
TableFileSchema
::
RAW
)
<<
" OR "
<<
"file_type = "
<<
std
::
to_string
(
TableFileSchema
::
NEW
)
<<
" OR "
<<
"file_type = "
<<
std
::
to_string
(
TableFileSchema
::
NEW_MERGE
)
<<
" OR "
<<
"file_type = "
<<
std
::
to_string
(
TableFileSchema
::
NEW_INDEX
)
<<
" OR "
<<
"file_type = "
<<
std
::
to_string
(
TableFileSchema
::
TO_INDEX
)
<<
")) "
<<
"AS "
<<
quote
<<
"check"
<<
";"
;
"file_type in ("
<<
types
<<
");"
;
ENGINE_LOG_DEBUG
<<
"MySQLMetaImpl::
HasNonIndexFiles
: "
<<
hasNonIndexFilesQuery
.
str
();
ENGINE_LOG_DEBUG
<<
"MySQLMetaImpl::
FilesByType
: "
<<
hasNonIndexFilesQuery
.
str
();
res
=
hasNonIndexFilesQuery
.
store
();
}
//Scoped Connection
int
check
=
res
[
0
][
"check"
];
has
=
(
check
==
1
);
if
(
res
.
num_rows
()
>
0
)
{
int
raw_count
=
0
,
new_count
=
0
,
new_merge_count
=
0
,
new_index_count
=
0
;
int
to_index_count
=
0
,
index_count
=
0
,
backup_count
=
0
;
for
(
auto
&
resRow
:
res
)
{
std
::
string
file_id
;
resRow
[
"file_id"
].
to_string
(
file_id
);
file_ids
.
push_back
(
file_id
);
int32_t
file_type
=
resRow
[
"file_type"
];
switch
(
file_type
)
{
case
(
int
)
TableFileSchema
::
RAW
:
raw_count
++
;
break
;
case
(
int
)
TableFileSchema
::
NEW
:
new_count
++
;
break
;
case
(
int
)
TableFileSchema
::
NEW_MERGE
:
new_merge_count
++
;
break
;
case
(
int
)
TableFileSchema
::
NEW_INDEX
:
new_index_count
++
;
break
;
case
(
int
)
TableFileSchema
::
TO_INDEX
:
to_index_count
++
;
break
;
case
(
int
)
TableFileSchema
::
INDEX
:
index_count
++
;
break
;
case
(
int
)
TableFileSchema
::
BACKUP
:
backup_count
++
;
break
;
default:
break
;
}
}
ENGINE_LOG_DEBUG
<<
"Table "
<<
table_id
<<
" currently has raw files:"
<<
raw_count
<<
" new files:"
<<
new_count
<<
" new_merge files:"
<<
new_merge_count
<<
" new_index files:"
<<
new_index_count
<<
" to_index files:"
<<
to_index_count
<<
" index files:"
<<
index_count
<<
" backup files:"
<<
backup_count
;
}
}
catch
(
const
BadQuery
&
er
)
{
// Handle any query errors
ENGINE_LOG_ERROR
<<
"QUERY ERROR WHEN
CHECKING IF NON INDEX FILES EXISTS
"
<<
": "
<<
er
.
what
();
return
Status
::
DBTransactionError
(
"QUERY ERROR WHEN
CHECKING IF NON INDEX FILES EXISTS
"
,
er
.
what
());
ENGINE_LOG_ERROR
<<
"QUERY ERROR WHEN
GET FILE BY TYPE
"
<<
": "
<<
er
.
what
();
return
Status
::
DBTransactionError
(
"QUERY ERROR WHEN
GET FILE BY TYPE
"
,
er
.
what
());
}
catch
(
const
Exception
&
er
)
{
// Catch-all for any other MySQL++ exceptions
ENGINE_LOG_ERROR
<<
"GENERAL ERROR WHEN
CHECKING IF NON INDEX FILES EXISTS
"
<<
": "
<<
er
.
what
();
return
Status
::
DBTransactionError
(
"GENERAL ERROR WHEN
CHECKING IF NON INDEX FILES EXISTS
"
,
er
.
what
());
ENGINE_LOG_ERROR
<<
"GENERAL ERROR WHEN
GET FILE BY TYPE
"
<<
": "
<<
er
.
what
();
return
Status
::
DBTransactionError
(
"GENERAL ERROR WHEN
GET FILE BY TYPE
"
,
er
.
what
());
}
return
Status
::
OK
();
...
...
@@ -987,121 +1034,6 @@ Status MySQLMetaImpl::FilesToIndex(TableFilesSchema &files) {
return
Status
::
OK
();
}
Status
MySQLMetaImpl
::
FilesToSearch
(
const
std
::
string
&
table_id
,
const
DatesT
&
partition
,
DatePartionedTableFilesSchema
&
files
)
{
files
.
clear
();
try
{
server
::
MetricCollector
metric
;
StoreQueryResult
res
;
{
ScopedConnection
connectionPtr
(
*
mysql_connection_pool_
,
safe_grab
);
if
(
connectionPtr
==
nullptr
)
{
return
Status
::
Error
(
"Failed to connect to database server"
);
}
if
(
partition
.
empty
())
{
Query
filesToSearchQuery
=
connectionPtr
->
query
();
filesToSearchQuery
<<
"SELECT id, table_id, engine_type, file_id, file_type, file_size, row_count, date "
<<
"FROM TableFiles "
<<
"WHERE table_id = "
<<
quote
<<
table_id
<<
" AND "
<<
"(file_type = "
<<
std
::
to_string
(
TableFileSchema
::
RAW
)
<<
" OR "
<<
"file_type = "
<<
std
::
to_string
(
TableFileSchema
::
TO_INDEX
)
<<
" OR "
<<
"file_type = "
<<
std
::
to_string
(
TableFileSchema
::
INDEX
)
<<
");"
;
ENGINE_LOG_DEBUG
<<
"MySQLMetaImpl::FilesToSearch: "
<<
filesToSearchQuery
.
str
();
res
=
filesToSearchQuery
.
store
();
}
else
{
Query
filesToSearchQuery
=
connectionPtr
->
query
();
std
::
stringstream
partitionListSS
;
for
(
auto
&
date
:
partition
)
{
partitionListSS
<<
std
::
to_string
(
date
)
<<
", "
;
}
std
::
string
partitionListStr
=
partitionListSS
.
str
();
partitionListStr
=
partitionListStr
.
substr
(
0
,
partitionListStr
.
size
()
-
2
);
//remove the last ", "
filesToSearchQuery
<<
"SELECT id, table_id, engine_type, file_id, file_type, file_size, row_count, date "
<<
"FROM TableFiles "
<<
"WHERE table_id = "
<<
quote
<<
table_id
<<
" AND "
<<
"date IN ("
<<
partitionListStr
<<
") AND "
<<
"(file_type = "
<<
std
::
to_string
(
TableFileSchema
::
RAW
)
<<
" OR "
<<
"file_type = "
<<
std
::
to_string
(
TableFileSchema
::
TO_INDEX
)
<<
" OR "
<<
"file_type = "
<<
std
::
to_string
(
TableFileSchema
::
INDEX
)
<<
");"
;
ENGINE_LOG_DEBUG
<<
"MySQLMetaImpl::FilesToSearch: "
<<
filesToSearchQuery
.
str
();
res
=
filesToSearchQuery
.
store
();
}
}
//Scoped Connection
TableSchema
table_schema
;
table_schema
.
table_id_
=
table_id
;
auto
status
=
DescribeTable
(
table_schema
);
if
(
!
status
.
ok
())
{
return
status
;
}
TableFileSchema
table_file
;
for
(
auto
&
resRow
:
res
)
{
table_file
.
id_
=
resRow
[
"id"
];
//implicit conversion
std
::
string
table_id_str
;
resRow
[
"table_id"
].
to_string
(
table_id_str
);
table_file
.
table_id_
=
table_id_str
;
table_file
.
index_file_size_
=
table_schema
.
index_file_size_
;
table_file
.
engine_type_
=
resRow
[
"engine_type"
];
table_file
.
nlist_
=
table_schema
.
nlist_
;
table_file
.
metric_type_
=
table_schema
.
metric_type_
;
std
::
string
file_id
;
resRow
[
"file_id"
].
to_string
(
file_id
);
table_file
.
file_id_
=
file_id
;
table_file
.
file_type_
=
resRow
[
"file_type"
];
table_file
.
file_size_
=
resRow
[
"file_size"
];
table_file
.
row_count_
=
resRow
[
"row_count"
];
table_file
.
date_
=
resRow
[
"date"
];
table_file
.
dimension_
=
table_schema
.
dimension_
;
utils
::
GetTableFilePath
(
options_
,
table_file
);
auto
dateItr
=
files
.
find
(
table_file
.
date_
);
if
(
dateItr
==
files
.
end
())
{
files
[
table_file
.
date_
]
=
TableFilesSchema
();
}
files
[
table_file
.
date_
].
push_back
(
table_file
);
}
}
catch
(
const
BadQuery
&
er
)
{
// Handle any query errors
ENGINE_LOG_ERROR
<<
"QUERY ERROR WHEN FINDING TABLE FILES TO SEARCH"
<<
": "
<<
er
.
what
();
return
Status
::
DBTransactionError
(
"QUERY ERROR WHEN FINDING TABLE FILES TO SEARCH"
,
er
.
what
());
}
catch
(
const
Exception
&
er
)
{
// Catch-all for any other MySQL++ exceptions
ENGINE_LOG_ERROR
<<
"GENERAL ERROR WHEN FINDING TABLE FILES TO SEARCH"
<<
": "
<<
er
.
what
();
return
Status
::
DBTransactionError
(
"GENERAL ERROR WHEN FINDING TABLE FILES TO SEARCH"
,
er
.
what
());
}
return
Status
::
OK
();
}
Status
MySQLMetaImpl
::
FilesToSearch
(
const
std
::
string
&
table_id
,
const
std
::
vector
<
size_t
>
&
ids
,
const
DatesT
&
partition
,
...
...
cpp/src/db/meta/MySQLMetaImpl.h
浏览文件 @
28a1b252
...
...
@@ -46,7 +46,9 @@ class MySQLMetaImpl : public Meta {
const
std
::
vector
<
size_t
>
&
ids
,
TableFilesSchema
&
table_files
)
override
;
Status
HasNonIndexFiles
(
const
std
::
string
&
table_id
,
bool
&
has
)
override
;
Status
FilesByType
(
const
std
::
string
&
table_id
,
const
std
::
vector
<
int
>
&
file_types
,
std
::
vector
<
std
::
string
>
&
file_ids
)
override
;
Status
UpdateTableIndexParam
(
const
std
::
string
&
table_id
,
const
TableIndex
&
index
)
override
;
...
...
@@ -62,10 +64,6 @@ class MySQLMetaImpl : public Meta {
Status
UpdateTableFiles
(
TableFilesSchema
&
files
)
override
;
Status
FilesToSearch
(
const
std
::
string
&
table_id
,
const
DatesT
&
partition
,
DatePartionedTableFilesSchema
&
files
)
override
;
Status
FilesToSearch
(
const
std
::
string
&
table_id
,
const
std
::
vector
<
size_t
>
&
ids
,
const
DatesT
&
partition
,
...
...
cpp/src/db/meta/SqliteMetaImpl.cpp
浏览文件 @
28a1b252
...
...
@@ -279,28 +279,26 @@ Status SqliteMetaImpl::DescribeTable(TableSchema &table_schema) {
return
Status
::
OK
();
}
Status
SqliteMetaImpl
::
HasNonIndexFiles
(
const
std
::
string
&
table_id
,
bool
&
has
)
{
has
=
false
;
Status
SqliteMetaImpl
::
FilesByType
(
const
std
::
string
&
table_id
,
const
std
::
vector
<
int
>&
file_types
,
std
::
vector
<
std
::
string
>&
file_ids
)
{
if
(
file_types
.
empty
())
{
return
Status
::
Error
(
"file types array is empty"
);
}
try
{
std
::
vector
<
int
>
file_types
=
{
(
int
)
TableFileSchema
::
RAW
,
(
int
)
TableFileSchema
::
NEW
,
(
int
)
TableFileSchema
::
NEW_MERGE
,
(
int
)
TableFileSchema
::
NEW_INDEX
,
(
int
)
TableFileSchema
::
TO_INDEX
,
};
auto
selected
=
ConnectorPtr
->
select
(
columns
(
&
TableFileSchema
::
id_
,
file_ids
.
clear
();
auto
selected
=
ConnectorPtr
->
select
(
columns
(
&
TableFileSchema
::
file_id_
,
&
TableFileSchema
::
file_type_
),
where
(
in
(
&
TableFileSchema
::
file_type_
,
file_types
)
and
c
(
&
TableFileSchema
::
table_id_
)
==
table_id
));
if
(
selected
.
size
()
>=
1
)
{
has
=
true
;
int
raw_count
=
0
,
new_count
=
0
,
new_merge_count
=
0
,
new_index_count
=
0
,
to_index_count
=
0
;
std
::
vector
<
std
::
string
>
file_ids
;
int
raw_count
=
0
,
new_count
=
0
,
new_merge_count
=
0
,
new_index_count
=
0
;
int
to_index_count
=
0
,
index_count
=
0
,
backup_count
=
0
;
for
(
auto
&
file
:
selected
)
{
file_ids
.
push_back
(
std
::
get
<
0
>
(
file
));
switch
(
std
::
get
<
1
>
(
file
))
{
case
(
int
)
TableFileSchema
::
RAW
:
raw_count
++
;
...
...
@@ -317,14 +315,21 @@ Status SqliteMetaImpl::HasNonIndexFiles(const std::string& table_id, bool& has)
case
(
int
)
TableFileSchema
::
TO_INDEX
:
to_index_count
++
;
break
;
case
(
int
)
TableFileSchema
::
INDEX
:
index_count
++
;
break
;
case
(
int
)
TableFileSchema
::
BACKUP
:
backup_count
++
;
break
;
default:
break
;
}
}
ENGINE_LOG_DEBUG
<<
"Table "
<<
table_id
<<
" currently has raw files:"
<<
raw_count
<<
" new files:"
<<
new_count
<<
" new_merge files:"
<<
new_merge_count
<<
" new_index files:"
<<
new_index_count
<<
" to_index files:"
<<
to_index_count
;
<<
" new files:"
<<
new_count
<<
" new_merge files:"
<<
new_merge_count
<<
" new_index files:"
<<
new_index_count
<<
" to_index files:"
<<
to_index_count
<<
" index files:"
<<
index_count
<<
" backup files:"
<<
backup_count
;
}
}
catch
(
std
::
exception
&
e
)
{
...
...
@@ -633,111 +638,6 @@ Status SqliteMetaImpl::FilesToIndex(TableFilesSchema &files) {
return
Status
::
OK
();
}
Status
SqliteMetaImpl
::
FilesToSearch
(
const
std
::
string
&
table_id
,
const
DatesT
&
partition
,
DatePartionedTableFilesSchema
&
files
)
{
files
.
clear
();
try
{
server
::
MetricCollector
metric
;
if
(
partition
.
empty
())
{
std
::
vector
<
int
>
file_type
=
{(
int
)
TableFileSchema
::
RAW
,
(
int
)
TableFileSchema
::
TO_INDEX
,
(
int
)
TableFileSchema
::
INDEX
};
auto
selected
=
ConnectorPtr
->
select
(
columns
(
&
TableFileSchema
::
id_
,
&
TableFileSchema
::
table_id_
,
&
TableFileSchema
::
file_id_
,
&
TableFileSchema
::
file_type_
,
&
TableFileSchema
::
file_size_
,
&
TableFileSchema
::
row_count_
,
&
TableFileSchema
::
date_
,
&
TableFileSchema
::
engine_type_
),
where
(
c
(
&
TableFileSchema
::
table_id_
)
==
table_id
and
in
(
&
TableFileSchema
::
file_type_
,
file_type
)));
TableSchema
table_schema
;
table_schema
.
table_id_
=
table_id
;
auto
status
=
DescribeTable
(
table_schema
);
if
(
!
status
.
ok
())
{
return
status
;
}
TableFileSchema
table_file
;
for
(
auto
&
file
:
selected
)
{
table_file
.
id_
=
std
::
get
<
0
>
(
file
);
table_file
.
table_id_
=
std
::
get
<
1
>
(
file
);
table_file
.
file_id_
=
std
::
get
<
2
>
(
file
);
table_file
.
file_type_
=
std
::
get
<
3
>
(
file
);
table_file
.
file_size_
=
std
::
get
<
4
>
(
file
);
table_file
.
row_count_
=
std
::
get
<
5
>
(
file
);
table_file
.
date_
=
std
::
get
<
6
>
(
file
);
table_file
.
engine_type_
=
std
::
get
<
7
>
(
file
);
table_file
.
dimension_
=
table_schema
.
dimension_
;
table_file
.
index_file_size_
=
table_schema
.
index_file_size_
;
table_file
.
nlist_
=
table_schema
.
nlist_
;
table_file
.
metric_type_
=
table_schema
.
metric_type_
;
utils
::
GetTableFilePath
(
options_
,
table_file
);
auto
dateItr
=
files
.
find
(
table_file
.
date_
);
if
(
dateItr
==
files
.
end
())
{
files
[
table_file
.
date_
]
=
TableFilesSchema
();
}
files
[
table_file
.
date_
].
push_back
(
table_file
);
}
}
else
{
std
::
vector
<
int
>
file_type
=
{(
int
)
TableFileSchema
::
RAW
,
(
int
)
TableFileSchema
::
TO_INDEX
,
(
int
)
TableFileSchema
::
INDEX
};
auto
selected
=
ConnectorPtr
->
select
(
columns
(
&
TableFileSchema
::
id_
,
&
TableFileSchema
::
table_id_
,
&
TableFileSchema
::
file_id_
,
&
TableFileSchema
::
file_type_
,
&
TableFileSchema
::
file_size_
,
&
TableFileSchema
::
row_count_
,
&
TableFileSchema
::
date_
,
&
TableFileSchema
::
engine_type_
),
where
(
c
(
&
TableFileSchema
::
table_id_
)
==
table_id
and
in
(
&
TableFileSchema
::
date_
,
partition
)
and
in
(
&
TableFileSchema
::
file_type_
,
file_type
)));
TableSchema
table_schema
;
table_schema
.
table_id_
=
table_id
;
auto
status
=
DescribeTable
(
table_schema
);
if
(
!
status
.
ok
())
{
return
status
;
}
TableFileSchema
table_file
;
for
(
auto
&
file
:
selected
)
{
table_file
.
id_
=
std
::
get
<
0
>
(
file
);
table_file
.
table_id_
=
std
::
get
<
1
>
(
file
);
table_file
.
file_id_
=
std
::
get
<
2
>
(
file
);
table_file
.
file_type_
=
std
::
get
<
3
>
(
file
);
table_file
.
file_size_
=
std
::
get
<
4
>
(
file
);
table_file
.
row_count_
=
std
::
get
<
5
>
(
file
);
table_file
.
date_
=
std
::
get
<
6
>
(
file
);
table_file
.
engine_type_
=
std
::
get
<
7
>
(
file
);
table_file
.
dimension_
=
table_schema
.
dimension_
;
table_file
.
index_file_size_
=
table_schema
.
index_file_size_
;
table_file
.
nlist_
=
table_schema
.
nlist_
;
table_file
.
metric_type_
=
table_schema
.
metric_type_
;
utils
::
GetTableFilePath
(
options_
,
table_file
);
auto
dateItr
=
files
.
find
(
table_file
.
date_
);
if
(
dateItr
==
files
.
end
())
{
files
[
table_file
.
date_
]
=
TableFilesSchema
();
}
files
[
table_file
.
date_
].
push_back
(
table_file
);
}
}
}
catch
(
std
::
exception
&
e
)
{
return
HandleException
(
"Encounter exception when iterate index files"
,
e
);
}
return
Status
::
OK
();
}
Status
SqliteMetaImpl
::
FilesToSearch
(
const
std
::
string
&
table_id
,
const
std
::
vector
<
size_t
>
&
ids
,
const
DatesT
&
partition
,
...
...
cpp/src/db/meta/SqliteMetaImpl.h
浏览文件 @
28a1b252
...
...
@@ -41,7 +41,9 @@ class SqliteMetaImpl : public Meta {
const
std
::
vector
<
size_t
>
&
ids
,
TableFilesSchema
&
table_files
)
override
;
Status
HasNonIndexFiles
(
const
std
::
string
&
table_id
,
bool
&
has
)
override
;
Status
FilesByType
(
const
std
::
string
&
table_id
,
const
std
::
vector
<
int
>
&
file_types
,
std
::
vector
<
std
::
string
>
&
file_ids
)
override
;
Status
UpdateTableIndexParam
(
const
std
::
string
&
table_id
,
const
TableIndex
&
index
)
override
;
...
...
@@ -57,10 +59,6 @@ class SqliteMetaImpl : public Meta {
Status
UpdateTableFiles
(
TableFilesSchema
&
files
)
override
;
Status
FilesToSearch
(
const
std
::
string
&
table_id
,
const
DatesT
&
partition
,
DatePartionedTableFilesSchema
&
files
)
override
;
Status
FilesToSearch
(
const
std
::
string
&
table_id
,
const
std
::
vector
<
size_t
>
&
ids
,
const
DatesT
&
partition
,
...
...
cpp/src/server/grpc_impl/GrpcRequestTask.cpp
浏览文件 @
28a1b252
...
...
@@ -201,6 +201,7 @@ DescribeTableTask::OnExecute() {
schema_
->
mutable_table_name
()
->
set_table_name
(
table_info
.
table_id_
);
schema_
->
set_dimension
(
table_info
.
dimension_
);
schema_
->
set_index_file_size
(
table_info
.
index_file_size_
);
}
catch
(
std
::
exception
&
ex
)
{
return
SetError
(
SERVER_UNEXPECTED_ERROR
,
ex
.
what
());
...
...
cpp/unittest/db/db_tests.cpp
浏览文件 @
28a1b252
...
...
@@ -307,8 +307,6 @@ TEST_F(DBTest, PRELOADTABLE_TEST) {
ASSERT_EQ
(
target_ids
.
size
(),
nb
);
}
sleep
(
2
);
engine
::
TableIndex
index
;
index
.
engine_type_
=
(
int
)
engine
::
EngineType
::
FAISS_IDMAP
;
db_
->
CreateIndex
(
TABLE_NAME
,
index
);
// wait until build index finish
...
...
cpp/unittest/db/meta_tests.cpp
浏览文件 @
28a1b252
...
...
@@ -260,17 +260,17 @@ TEST_F(MetaTest, TABLE_FILES_TEST) {
ASSERT_EQ
(
files
.
size
(),
to_index_files_cnt
);
meta
::
DatesT
dates
=
{
table_file
.
date_
};
status
=
impl_
->
FilesToSearch
(
table_id
,
dates
,
dated_files
);
std
::
vector
<
size_t
>
ids
;
status
=
impl_
->
FilesToSearch
(
table_id
,
ids
,
dates
,
dated_files
);
ASSERT_TRUE
(
status
.
ok
());
ASSERT_EQ
(
dated_files
[
table_file
.
date_
].
size
(),
to_index_files_cnt
+
raw_files_cnt
+
index_files_cnt
);
status
=
impl_
->
FilesToSearch
(
table_id
,
meta
::
DatesT
(),
dated_files
);
status
=
impl_
->
FilesToSearch
(
table_id
,
ids
,
meta
::
DatesT
(),
dated_files
);
ASSERT_TRUE
(
status
.
ok
());
ASSERT_EQ
(
dated_files
[
table_file
.
date_
].
size
(),
to_index_files_cnt
+
raw_files_cnt
+
index_files_cnt
);
std
::
vector
<
size_t
>
ids
;
status
=
impl_
->
FilesToSearch
(
table_id
,
ids
,
meta
::
DatesT
(),
dated_files
);
ASSERT_TRUE
(
status
.
ok
());
ASSERT_EQ
(
dated_files
[
table_file
.
date_
].
size
(),
...
...
cpp/unittest/db/mysql_meta_test.cpp
浏览文件 @
28a1b252
...
...
@@ -197,9 +197,12 @@ TEST_F(DISABLED_MySQLTest, ARCHIVE_TEST_DAYS) {
i
++
;
}
bool
has
;
status
=
impl
.
HasNonIndexFiles
(
table_id
,
has
);
ASSERT_TRUE
(
status
.
ok
());
std
::
vector
<
int
>
file_types
=
{
(
int
)
meta
::
TableFileSchema
::
NEW
,
};
std
::
vector
<
std
::
string
>
file_ids
;
status
=
impl
.
FilesByType
(
table_id
,
file_types
,
file_ids
);
ASSERT_FALSE
(
file_ids
.
empty
());
status
=
impl
.
UpdateTableFilesToIndex
(
table_id
);
ASSERT_TRUE
(
status
.
ok
());
...
...
@@ -332,17 +335,17 @@ TEST_F(DISABLED_MySQLTest, TABLE_FILES_TEST) {
ASSERT_EQ
(
files
.
size
(),
to_index_files_cnt
);
meta
::
DatesT
dates
=
{
table_file
.
date_
};
status
=
impl
.
FilesToSearch
(
table_id
,
dates
,
dated_files
);
std
::
vector
<
size_t
>
ids
;
status
=
impl
.
FilesToSearch
(
table_id
,
ids
,
dates
,
dated_files
);
ASSERT_TRUE
(
status
.
ok
());
ASSERT_EQ
(
dated_files
[
table_file
.
date_
].
size
(),
to_index_files_cnt
+
raw_files_cnt
+
index_files_cnt
);
status
=
impl
.
FilesToSearch
(
table_id
,
meta
::
DatesT
(),
dated_files
);
status
=
impl
.
FilesToSearch
(
table_id
,
ids
,
meta
::
DatesT
(),
dated_files
);
ASSERT_TRUE
(
status
.
ok
());
ASSERT_EQ
(
dated_files
[
table_file
.
date_
].
size
(),
to_index_files_cnt
+
raw_files_cnt
+
index_files_cnt
);
std
::
vector
<
size_t
>
ids
;
status
=
impl
.
FilesToSearch
(
table_id
,
ids
,
meta
::
DatesT
(),
dated_files
);
ASSERT_TRUE
(
status
.
ok
());
ASSERT_EQ
(
dated_files
[
table_file
.
date_
].
size
(),
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录