Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
milvus
milvus
提交
da6eeddb
M
milvus
项目概览
milvus
/
milvus
大约 1 年 前同步成功
通知
261
Star
22476
Fork
2472
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
M
milvus
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
da6eeddb
编写于
1月 06, 2021
作者:
S
sunby
提交者:
yefu.chen
1月 06, 2021
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Add flush timestamp
Signed-off-by:
N
sunby
<
bingyi.sun@zilliz.com
>
上级
5a26f6ef
变更
26
显示空白变更内容
内联
并排
Showing
26 changed file
with
202 addition
and
432 deletion
+202
-432
internal/core/src/common/FieldMeta.h
internal/core/src/common/FieldMeta.h
+4
-4
internal/core/src/common/Schema.cpp
internal/core/src/common/Schema.cpp
+1
-1
internal/core/src/common/Types.h
internal/core/src/common/Types.h
+1
-32
internal/core/src/query/BruteForceSearch.cpp
internal/core/src/query/BruteForceSearch.cpp
+54
-22
internal/core/src/query/BruteForceSearch.h
internal/core/src/query/BruteForceSearch.h
+4
-3
internal/core/src/query/CMakeLists.txt
internal/core/src/query/CMakeLists.txt
+1
-2
internal/core/src/query/Plan.cpp
internal/core/src/query/Plan.cpp
+2
-2
internal/core/src/query/Search.cpp
internal/core/src/query/Search.cpp
+26
-27
internal/core/src/query/Search.h
internal/core/src/query/Search.h
+14
-16
internal/core/src/query/SubQueryResult.cpp
internal/core/src/query/SubQueryResult.cpp
+0
-77
internal/core/src/query/SubQueryResult.h
internal/core/src/query/SubQueryResult.h
+0
-98
internal/core/src/query/visitors/ExecPlanNodeVisitor.cpp
internal/core/src/query/visitors/ExecPlanNodeVisitor.cpp
+2
-2
internal/core/src/query/visitors/ShowExprVisitor.cpp
internal/core/src/query/visitors/ShowExprVisitor.cpp
+2
-2
internal/core/src/segcore/SegmentBase.h
internal/core/src/segcore/SegmentBase.h
+1
-1
internal/core/src/segcore/SegmentNaive.cpp
internal/core/src/segcore/SegmentNaive.cpp
+8
-3
internal/core/src/segcore/SegmentSmallIndex.cpp
internal/core/src/segcore/SegmentSmallIndex.cpp
+10
-0
internal/core/src/segcore/reduce_c.cpp
internal/core/src/segcore/reduce_c.cpp
+2
-2
internal/core/src/segcore/segment_c.cpp
internal/core/src/segcore/segment_c.cpp
+1
-1
internal/core/src/utils/Types.h
internal/core/src/utils/Types.h
+40
-0
internal/core/unittest/CMakeLists.txt
internal/core/unittest/CMakeLists.txt
+0
-1
internal/core/unittest/test_indexing.cpp
internal/core/unittest/test_indexing.cpp
+6
-4
internal/core/unittest/test_query.cpp
internal/core/unittest/test_query.cpp
+7
-6
internal/core/unittest/test_reduce.cpp
internal/core/unittest/test_reduce.cpp
+0
-120
internal/master/flush_scheduler.go
internal/master/flush_scheduler.go
+10
-4
internal/master/master.go
internal/master/master.go
+1
-1
internal/master/persistence_scheduler_test.go
internal/master/persistence_scheduler_test.go
+5
-1
未找到文件。
internal/core/src/common/FieldMeta.h
浏览文件 @
da6eeddb
...
...
@@ -18,7 +18,7 @@
namespace
milvus
{
inline
int
datatype
_sizeof
(
DataType
data_type
,
int
dim
=
1
)
{
field
_sizeof
(
DataType
data_type
,
int
dim
=
1
)
{
switch
(
data_type
)
{
case
DataType
::
BOOL
:
return
sizeof
(
bool
);
...
...
@@ -78,7 +78,7 @@ datatype_name(DataType data_type) {
}
inline
bool
datatype
_is_vector
(
DataType
datatype
)
{
field
_is_vector
(
DataType
datatype
)
{
return
datatype
==
DataType
::
VECTOR_BINARY
||
datatype
==
DataType
::
VECTOR_FLOAT
;
}
...
...
@@ -119,9 +119,9 @@ struct FieldMeta {
int
get_sizeof
()
const
{
if
(
is_vector
())
{
return
datatype
_sizeof
(
type_
,
get_dim
());
return
field
_sizeof
(
type_
,
get_dim
());
}
else
{
return
datatype
_sizeof
(
type_
,
1
);
return
field
_sizeof
(
type_
,
1
);
}
}
...
...
internal/core/src/common/Schema.cpp
浏览文件 @
da6eeddb
...
...
@@ -50,7 +50,7 @@ Schema::ParseFrom(const milvus::proto::schema::CollectionSchema& schema_proto) {
schema
->
primary_key_offset_opt_
=
schema
->
size
();
}
if
(
datatype
_is_vector
(
data_type
))
{
if
(
field
_is_vector
(
data_type
))
{
auto
type_map
=
RepeatedKeyValToMap
(
child
.
type_params
());
auto
index_map
=
RepeatedKeyValToMap
(
child
.
index_params
());
if
(
!
index_map
.
count
(
"metric_type"
))
{
...
...
internal/core/src/common/Types.h
浏览文件 @
da6eeddb
...
...
@@ -14,15 +14,13 @@
#include <faiss/MetricType.h>
#include <string>
#include <boost/align/aligned_allocator.hpp>
#include <memory>
#include <vector>
namespace
milvus
{
using
Timestamp
=
uint64_t
;
// TODO: use TiKV-like timestamp
using
engine
::
DataType
;
using
engine
::
FieldElementType
;
using
engine
::
idx_t
;
using
engine
::
QueryResult
;
using
MetricType
=
faiss
::
MetricType
;
MetricType
...
...
@@ -41,33 +39,4 @@ constexpr std::false_type always_false{};
template
<
typename
T
>
using
aligned_vector
=
std
::
vector
<
T
,
boost
::
alignment
::
aligned_allocator
<
T
,
512
>>
;
///////////////////////////////////////////////////////////////////////////////////////////////////
struct
QueryResult
{
QueryResult
()
=
default
;
QueryResult
(
uint64_t
num_queries
,
uint64_t
topK
)
:
topK_
(
topK
),
num_queries_
(
num_queries
)
{
auto
count
=
get_row_count
();
result_distances_
.
resize
(
count
);
internal_seg_offsets_
.
resize
(
count
);
}
[[
nodiscard
]]
uint64_t
get_row_count
()
const
{
return
topK_
*
num_queries_
;
}
public:
uint64_t
num_queries_
;
uint64_t
topK_
;
uint64_t
seg_id_
;
std
::
vector
<
float
>
result_distances_
;
public:
// TODO(gexi): utilize these field
std
::
vector
<
int64_t
>
internal_seg_offsets_
;
std
::
vector
<
int64_t
>
result_offsets_
;
std
::
vector
<
std
::
vector
<
char
>>
row_data_
;
};
using
QueryResultPtr
=
std
::
shared_ptr
<
QueryResult
>
;
}
// namespace milvus
internal/core/src/query/
SearchBruteForce
.cpp
→
internal/core/src/query/
BruteForceSearch
.cpp
浏览文件 @
da6eeddb
...
...
@@ -9,28 +9,68 @@
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#include "
SearchBruteForce
.h"
#include "
BruteForceSearch
.h"
#include <vector>
#include <common/Types.h>
#include <boost/dynamic_bitset.hpp>
#include <queue>
#include "SubQueryResult.h"
namespace
milvus
::
query
{
SubQueryResult
BinarySearchBruteForce
Fast
(
MetricType
metric_type
,
void
BinarySearchBruteForce
Naive
(
MetricType
metric_type
,
int64_t
code_size
,
const
uint8_t
*
binary_chunk
,
int64_t
chunk_size
,
int64_t
topk
,
int64_t
num_queries
,
const
uint8_t
*
query_data
,
const
faiss
::
BitsetView
&
bitset
)
{
SubQueryResult
sub_result
(
num_queries
,
topk
,
metric_type
);
float
*
result_distances
=
sub_result
.
get_values
();
idx_t
*
result_labels
=
sub_result
.
get_labels
();
float
*
result_distances
,
idx_t
*
result_labels
,
faiss
::
ConcurrentBitsetPtr
bitset
)
{
// THIS IS A NAIVE IMPLEMENTATION, ready for optimize
Assert
(
metric_type
==
faiss
::
METRIC_Jaccard
);
Assert
(
code_size
%
4
==
0
);
using
T
=
std
::
tuple
<
float
,
int
>
;
for
(
int64_t
q
=
0
;
q
<
num_queries
;
++
q
)
{
auto
query_ptr
=
query_data
+
code_size
*
q
;
auto
query
=
boost
::
dynamic_bitset
(
query_ptr
,
query_ptr
+
code_size
);
std
::
vector
<
T
>
max_heap
(
topk
+
1
,
std
::
make_tuple
(
std
::
numeric_limits
<
float
>::
max
(),
-
1
));
for
(
int64_t
i
=
0
;
i
<
chunk_size
;
++
i
)
{
auto
element_ptr
=
binary_chunk
+
code_size
*
i
;
auto
element
=
boost
::
dynamic_bitset
(
element_ptr
,
element_ptr
+
code_size
);
auto
the_and
=
(
query
&
element
).
count
();
auto
the_or
=
(
query
|
element
).
count
();
auto
distance
=
the_or
?
(
float
)(
the_or
-
the_and
)
/
the_or
:
0
;
if
(
distance
<
std
::
get
<
0
>
(
max_heap
[
0
]))
{
max_heap
[
topk
]
=
std
::
make_tuple
(
distance
,
i
);
std
::
push_heap
(
max_heap
.
begin
(),
max_heap
.
end
());
std
::
pop_heap
(
max_heap
.
begin
(),
max_heap
.
end
());
}
}
std
::
sort
(
max_heap
.
begin
(),
max_heap
.
end
());
for
(
int
k
=
0
;
k
<
topk
;
++
k
)
{
auto
info
=
max_heap
[
k
];
result_distances
[
k
+
q
*
topk
]
=
std
::
get
<
0
>
(
info
);
result_labels
[
k
+
q
*
topk
]
=
std
::
get
<
1
>
(
info
);
}
}
}
void
BinarySearchBruteForceFast
(
MetricType
metric_type
,
int64_t
code_size
,
const
uint8_t
*
binary_chunk
,
int64_t
chunk_size
,
int64_t
topk
,
int64_t
num_queries
,
const
uint8_t
*
query_data
,
float
*
result_distances
,
idx_t
*
result_labels
,
faiss
::
ConcurrentBitsetPtr
bitset
)
{
const
idx_t
block_size
=
chunk_size
;
bool
use_heap
=
true
;
...
...
@@ -92,26 +132,18 @@ BinarySearchBruteForceFast(MetricType metric_type,
}
else
{
PanicInfo
(
"Unsupported metric type"
);
}
return
sub_result
;
}
void
FloatSearchBruteForceFast
(
MetricType
metric_type
,
const
float
*
chunk_data
,
int64_t
chunk_size
,
float
*
result_distances
,
idx_t
*
result_labels
,
const
faiss
::
BitsetView
&
bitset
)
{
// TODO
}
SubQueryResult
BinarySearchBruteForce
(
const
dataset
::
BinaryQueryDataset
&
query_dataset
,
const
uint8_t
*
binary_chunk
,
int64_t
chunk_size
,
const
faiss
::
BitsetView
&
bitset
)
{
float
*
result_distances
,
idx_t
*
result_labels
,
faiss
::
ConcurrentBitsetPtr
bitset
)
{
// TODO: refactor the internal function
return
BinarySearchBruteForceFast
(
query_dataset
.
metric_type
,
query_dataset
.
code_size
,
binary_chunk
,
chunk_size
,
query_dataset
.
topk
,
query_dataset
.
num_queries
,
query_dataset
.
query_data
,
bitset
);
BinarySearchBruteForceFast
(
query_dataset
.
metric_type
,
query_dataset
.
code_size
,
binary_chunk
,
chunk_size
,
query_dataset
.
topk
,
query_dataset
.
num_queries
,
query_dataset
.
query_data
,
result_distances
,
result_labels
,
bitset
);
}
}
// namespace milvus::query
internal/core/src/query/
SearchBruteForce
.h
→
internal/core/src/query/
BruteForceSearch
.h
浏览文件 @
da6eeddb
...
...
@@ -13,7 +13,6 @@
#include <faiss/utils/BinaryDistance.h>
#include "segcore/ConcurrentVector.h"
#include "common/Schema.h"
#include "query/SubQueryResult.h"
namespace
milvus
::
query
{
using
MetricType
=
faiss
::
MetricType
;
...
...
@@ -29,10 +28,12 @@ struct BinaryQueryDataset {
}
// namespace dataset
SubQueryResult
void
BinarySearchBruteForce
(
const
dataset
::
BinaryQueryDataset
&
query_dataset
,
const
uint8_t
*
binary_chunk
,
int64_t
chunk_size
,
const
faiss
::
BitsetView
&
bitset
=
nullptr
);
float
*
result_distances
,
idx_t
*
result_labels
,
faiss
::
ConcurrentBitsetPtr
bitset
=
nullptr
);
}
// namespace milvus::query
internal/core/src/query/CMakeLists.txt
浏览文件 @
da6eeddb
...
...
@@ -12,8 +12,7 @@ set(MILVUS_QUERY_SRCS
Plan.cpp
Search.cpp
SearchOnSealed.cpp
SearchBruteForce.cpp
SubQueryResult.cpp
BruteForceSearch.cpp
)
add_library
(
milvus_query
${
MILVUS_QUERY_SRCS
}
)
target_link_libraries
(
milvus_query milvus_proto milvus_utils knowhere
)
internal/core/src/query/Plan.cpp
浏览文件 @
da6eeddb
...
...
@@ -107,7 +107,7 @@ Parser::ParseRangeNode(const Json& out_body) {
auto
field_name
=
out_iter
.
key
();
auto
body
=
out_iter
.
value
();
auto
data_type
=
schema
[
field_name
].
get_data_type
();
Assert
(
!
datatype
_is_vector
(
data_type
));
Assert
(
!
field
_is_vector
(
data_type
));
switch
(
data_type
)
{
case
DataType
::
BOOL
:
...
...
@@ -155,7 +155,7 @@ Parser::ParseTermNode(const Json& out_body) {
auto
field_name
=
out_iter
.
key
();
auto
body
=
out_iter
.
value
();
auto
data_type
=
schema
[
field_name
].
get_data_type
();
Assert
(
!
datatype
_is_vector
(
data_type
));
Assert
(
!
field
_is_vector
(
data_type
));
switch
(
data_type
)
{
case
DataType
::
BOOL
:
{
return
ParseTermNodeImpl
<
bool
>
(
field_name
,
body
);
...
...
internal/core/src/query/Search.cpp
浏览文件 @
da6eeddb
...
...
@@ -16,7 +16,7 @@
#include <faiss/utils/distances.h>
#include "utils/tools.h"
#include "query/
SearchBruteForce
.h"
#include "query/
BruteForceSearch
.h"
namespace
milvus
::
query
{
...
...
@@ -34,7 +34,7 @@ create_bitmap_view(std::optional<const BitmapSimple*> bitmaps_opt, int64_t chunk
}
Status
FloatSearch
(
const
segcore
::
SegmentSmallIndex
&
segment
,
QueryBruteForceImpl
(
const
segcore
::
SegmentSmallIndex
&
segment
,
const
query
::
QueryInfo
&
info
,
const
float
*
query_data
,
int64_t
num_queries
,
...
...
@@ -75,7 +75,6 @@ FloatSearch(const segcore::SegmentSmallIndex& segment,
const
auto
&
indexing_entry
=
indexing_record
.
get_vec_entry
(
vecfield_offset
);
auto
search_conf
=
indexing_entry
.
get_search_conf
(
topK
);
// TODO: use sub_qr
for
(
int
chunk_id
=
0
;
chunk_id
<
max_indexed_id
;
++
chunk_id
)
{
auto
indexing
=
indexing_entry
.
get_vec_indexing
(
chunk_id
);
auto
dataset
=
knowhere
::
GenDataset
(
num_queries
,
dim
,
query_data
);
...
...
@@ -100,12 +99,10 @@ FloatSearch(const segcore::SegmentSmallIndex& segment,
Assert
(
vec_chunk_size
==
indexing_entry
.
get_chunk_size
());
auto
max_chunk
=
upper_div
(
ins_barrier
,
vec_chunk_size
);
// TODO: use sub_qr
for
(
int
chunk_id
=
max_indexed_id
;
chunk_id
<
max_chunk
;
++
chunk_id
)
{
std
::
vector
<
int64_t
>
buf_uids
(
total_count
,
-
1
);
std
::
vector
<
float
>
buf_dis
(
total_count
,
std
::
numeric_limits
<
float
>::
max
());
// should be not visitable
faiss
::
float_maxheap_array_t
buf
=
{(
size_t
)
num_queries
,
(
size_t
)
topK
,
buf_uids
.
data
(),
buf_dis
.
data
()};
auto
&
chunk
=
vec_ptr
->
get_chunk
(
chunk_id
);
...
...
@@ -115,7 +112,6 @@ FloatSearch(const segcore::SegmentSmallIndex& segment,
auto
nsize
=
element_end
-
element_begin
;
auto
bitmap_view
=
create_bitmap_view
(
bitmaps_opt
,
chunk_id
);
// TODO: make it wrapped
faiss
::
knn_L2sqr
(
query_data
,
chunk
.
data
(),
dim
,
num_queries
,
nsize
,
&
buf
,
bitmap_view
);
Assert
(
buf_uids
.
size
()
==
total_count
);
...
...
@@ -138,7 +134,7 @@ FloatSearch(const segcore::SegmentSmallIndex& segment,
}
Status
Binary
Search
(
const
segcore
::
SegmentSmallIndex
&
segment
,
Binary
QueryBruteForceImpl
(
const
segcore
::
SegmentSmallIndex
&
segment
,
const
query
::
QueryInfo
&
info
,
const
uint8_t
*
query_data
,
int64_t
num_queries
,
...
...
@@ -173,8 +169,8 @@ BinarySearch(const segcore::SegmentSmallIndex& segment,
auto
total_count
=
topK
*
num_queries
;
// step 3: small indexing search
// TODO: this is too intrusive
// TODO: use QuerySubResult instead
std
::
vector
<
int64_t
>
final_uids
(
total_count
,
-
1
);
std
::
vector
<
float
>
final_dis
(
total_count
,
std
::
numeric_limits
<
float
>::
max
());
query
::
dataset
::
BinaryQueryDataset
query_dataset
{
metric_type
,
num_queries
,
topK
,
code_size
,
query_data
};
using
segcore
::
BinaryVector
;
...
...
@@ -185,27 +181,30 @@ BinarySearch(const segcore::SegmentSmallIndex& segment,
auto
vec_chunk_size
=
vec_ptr
->
get_chunk_size
();
auto
max_chunk
=
upper_div
(
ins_barrier
,
vec_chunk_size
);
SubQueryResult
final_result
(
num_queries
,
topK
,
metric_type
);
for
(
int
chunk_id
=
max_indexed_id
;
chunk_id
<
max_chunk
;
++
chunk_id
)
{
std
::
vector
<
int64_t
>
buf_uids
(
total_count
,
-
1
);
std
::
vector
<
float
>
buf_dis
(
total_count
,
std
::
numeric_limits
<
float
>::
max
());
auto
&
chunk
=
vec_ptr
->
get_chunk
(
chunk_id
);
auto
element_begin
=
chunk_id
*
vec_chunk_size
;
auto
element_end
=
std
::
min
(
ins_barrier
,
(
chunk_id
+
1
)
*
vec_chunk_size
);
auto
nsize
=
element_end
-
element_begin
;
auto
bitmap_view
=
create_bitmap_view
(
bitmaps_opt
,
chunk_id
);
auto
sub_result
=
BinarySearchBruteForce
(
query_dataset
,
chunk
.
data
(),
nsize
,
bitmap_view
);
BinarySearchBruteForce
(
query_dataset
,
chunk
.
data
(),
nsize
,
buf_dis
.
data
(),
buf_uids
.
data
()
,
bitmap_view
);
// convert chunk uid to segment uid
for
(
auto
&
x
:
sub_result
.
mutable_labels
()
)
{
for
(
auto
&
x
:
buf_uids
)
{
if
(
x
!=
-
1
)
{
x
+=
chunk_id
*
vec_chunk_size
;
}
}
final_result
.
merge
(
sub_result
);
segcore
::
merge_into
(
num_queries
,
topK
,
final_dis
.
data
(),
final_uids
.
data
(),
buf_dis
.
data
(),
buf_uids
.
data
());
}
results
.
result_distances_
=
std
::
move
(
final_
result
.
mutable_values
()
);
results
.
internal_seg_offsets_
=
std
::
move
(
final_
result
.
mutable_labels
()
);
results
.
result_distances_
=
std
::
move
(
final_
dis
);
results
.
internal_seg_offsets_
=
std
::
move
(
final_
uids
);
results
.
topK_
=
topK
;
results
.
num_queries_
=
num_queries
;
...
...
internal/core/src/query/Search.h
浏览文件 @
da6eeddb
...
...
@@ -14,16 +14,14 @@
#include "segcore/SegmentSmallIndex.h"
#include <deque>
#include <boost/dynamic_bitset.hpp>
#include "query/SubQueryResult.h"
namespace
milvus
::
query
{
using
BitmapChunk
=
boost
::
dynamic_bitset
<>
;
using
BitmapSimple
=
std
::
deque
<
BitmapChunk
>
;
// TODO: merge these two search into one
// note: c++17 don't support optional ref
Status
FloatSearch
(
const
segcore
::
SegmentSmallIndex
&
segment
,
QueryBruteForceImpl
(
const
segcore
::
SegmentSmallIndex
&
segment
,
const
QueryInfo
&
info
,
const
float
*
query_data
,
int64_t
num_queries
,
...
...
@@ -32,7 +30,7 @@ FloatSearch(const segcore::SegmentSmallIndex& segment,
QueryResult
&
results
);
Status
Binary
Search
(
const
segcore
::
SegmentSmallIndex
&
segment
,
Binary
QueryBruteForceImpl
(
const
segcore
::
SegmentSmallIndex
&
segment
,
const
query
::
QueryInfo
&
info
,
const
uint8_t
*
query_data
,
int64_t
num_queries
,
...
...
internal/core/src/query/SubQueryResult.cpp
已删除
100644 → 0
浏览文件 @
5a26f6ef
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#include "utils/EasyAssert.h"
#include "query/SubQueryResult.h"
#include "segcore/Reduce.h"
namespace
milvus
::
query
{
template
<
bool
is_desc
>
void
SubQueryResult
::
merge_impl
(
const
SubQueryResult
&
right
)
{
Assert
(
num_queries_
==
right
.
num_queries_
);
Assert
(
topk_
==
right
.
topk_
);
Assert
(
metric_type_
==
right
.
metric_type_
);
Assert
(
is_desc
==
is_descending
(
metric_type_
));
for
(
int64_t
qn
=
0
;
qn
<
num_queries_
;
++
qn
)
{
auto
offset
=
qn
*
topk_
;
int64_t
*
__restrict__
left_labels
=
this
->
get_labels
()
+
offset
;
float
*
__restrict__
left_values
=
this
->
get_values
()
+
offset
;
auto
right_labels
=
right
.
get_labels
()
+
offset
;
auto
right_values
=
right
.
get_values
()
+
offset
;
std
::
vector
<
float
>
buf_values
(
topk_
);
std
::
vector
<
int64_t
>
buf_labels
(
topk_
);
auto
lit
=
0
;
// left iter
auto
rit
=
0
;
// right iter
for
(
auto
buf_iter
=
0
;
buf_iter
<
topk_
;
++
buf_iter
)
{
auto
left_v
=
left_values
[
lit
];
auto
right_v
=
right_values
[
rit
];
// optimize out at compiling
if
(
is_desc
?
(
left_v
>=
right_v
)
:
(
left_v
<=
right_v
))
{
buf_values
[
buf_iter
]
=
left_values
[
lit
];
buf_labels
[
buf_iter
]
=
left_labels
[
lit
];
++
lit
;
}
else
{
buf_values
[
buf_iter
]
=
right_values
[
rit
];
buf_labels
[
buf_iter
]
=
right_labels
[
rit
];
++
rit
;
}
}
std
::
copy_n
(
buf_values
.
data
(),
topk_
,
left_values
);
std
::
copy_n
(
buf_labels
.
data
(),
topk_
,
left_labels
);
}
}
void
SubQueryResult
::
merge
(
const
SubQueryResult
&
sub_result
)
{
Assert
(
metric_type_
==
sub_result
.
metric_type_
);
if
(
is_descending
(
metric_type_
))
{
this
->
merge_impl
<
true
>
(
sub_result
);
}
else
{
this
->
merge_impl
<
false
>
(
sub_result
);
}
}
SubQueryResult
SubQueryResult
::
merge
(
const
SubQueryResult
&
left
,
const
SubQueryResult
&
right
)
{
auto
left_copy
=
left
;
left_copy
.
merge
(
right
);
return
left_copy
;
}
}
// namespace milvus::query
internal/core/src/query/SubQueryResult.h
已删除
100644 → 0
浏览文件 @
5a26f6ef
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#pragma once
#include "common/Types.h"
#include <limits>
#include <vector>
namespace
milvus
::
query
{
class
SubQueryResult
{
public:
SubQueryResult
(
int64_t
num_queries
,
int64_t
topk
,
MetricType
metric_type
)
:
metric_type_
(
metric_type
),
num_queries_
(
num_queries
),
topk_
(
topk
),
labels_
(
num_queries
*
topk
,
-
1
),
values_
(
num_queries
*
topk
,
init_value
(
metric_type
))
{
}
public:
static
constexpr
float
init_value
(
MetricType
metric_type
)
{
return
(
is_descending
(
metric_type
)
?
-
1
:
1
)
*
std
::
numeric_limits
<
float
>::
max
();
}
static
constexpr
bool
is_descending
(
MetricType
metric_type
)
{
// TODO
if
(
metric_type
==
MetricType
::
METRIC_INNER_PRODUCT
)
{
return
true
;
}
else
{
return
false
;
}
}
public:
int64_t
get_num_queries
()
const
{
return
num_queries_
;
}
int64_t
get_topk
()
const
{
return
topk_
;
}
const
int64_t
*
get_labels
()
const
{
return
labels_
.
data
();
}
int64_t
*
get_labels
()
{
return
labels_
.
data
();
}
const
float
*
get_values
()
const
{
return
values_
.
data
();
}
float
*
get_values
()
{
return
values_
.
data
();
}
auto
&
mutable_labels
()
{
return
labels_
;
}
auto
&
mutable_values
()
{
return
values_
;
}
static
SubQueryResult
merge
(
const
SubQueryResult
&
left
,
const
SubQueryResult
&
right
);
void
merge
(
const
SubQueryResult
&
sub_result
);
private:
template
<
bool
is_desc
>
void
merge_impl
(
const
SubQueryResult
&
sub_result
);
private:
int64_t
num_queries_
;
int64_t
topk_
;
MetricType
metric_type_
;
std
::
vector
<
int64_t
>
labels_
;
std
::
vector
<
float
>
values_
;
};
}
// namespace milvus::query
internal/core/src/query/visitors/ExecPlanNodeVisitor.cpp
浏览文件 @
da6eeddb
...
...
@@ -79,7 +79,7 @@ ExecPlanNodeVisitor::visit(FloatVectorANNS& node) {
SearchOnSealed
(
segment
->
get_schema
(),
sealed_indexing
,
node
.
query_info_
,
src_data
,
num_queries
,
timestamp_
,
bitset_pack
,
ret
);
}
else
{
FloatSearch
(
*
segment
,
node
.
query_info_
,
src_data
,
num_queries
,
timestamp_
,
bitset_pack
,
ret
);
QueryBruteForceImpl
(
*
segment
,
node
.
query_info_
,
src_data
,
num_queries
,
timestamp_
,
bitset_pack
,
ret
);
}
ret_
=
ret
;
...
...
@@ -104,7 +104,7 @@ ExecPlanNodeVisitor::visit(BinaryVectorANNS& node) {
bitset_pack
=
&
bitmap_holder
;
}
Binary
Search
(
*
segment
,
node
.
query_info_
,
src_data
,
num_queries
,
timestamp_
,
bitset_pack
,
ret
);
Binary
QueryBruteForceImpl
(
*
segment
,
node
.
query_info_
,
src_data
,
num_queries
,
timestamp_
,
bitset_pack
,
ret
);
ret_
=
ret
;
}
...
...
internal/core/src/query/visitors/ShowExprVisitor.cpp
浏览文件 @
da6eeddb
...
...
@@ -112,7 +112,7 @@ TermExtract(const TermExpr& expr_raw) {
void
ShowExprVisitor
::
visit
(
TermExpr
&
expr
)
{
Assert
(
!
ret_
.
has_value
());
Assert
(
datatype
_is_vector
(
expr
.
data_type_
)
==
false
);
Assert
(
field
_is_vector
(
expr
.
data_type_
)
==
false
);
auto
terms
=
[
&
]
{
switch
(
expr
.
data_type_
)
{
case
DataType
::
BOOL
:
...
...
@@ -161,7 +161,7 @@ ConditionExtract(const RangeExpr& expr_raw) {
void
ShowExprVisitor
::
visit
(
RangeExpr
&
expr
)
{
Assert
(
!
ret_
.
has_value
());
Assert
(
datatype
_is_vector
(
expr
.
data_type_
)
==
false
);
Assert
(
field
_is_vector
(
expr
.
data_type_
)
==
false
);
auto
conditions
=
[
&
]
{
switch
(
expr
.
data_type_
)
{
case
DataType
::
BOOL
:
...
...
internal/core/src/segcore/SegmentBase.h
浏览文件 @
da6eeddb
...
...
@@ -13,7 +13,7 @@
#include <vector>
#include "IndexMeta.h"
#include "
common
/Types.h"
#include "
utils
/Types.h"
#include "common/Schema.h"
#include <memory>
...
...
internal/core/src/segcore/SegmentNaive.cpp
浏览文件 @
da6eeddb
...
...
@@ -274,14 +274,19 @@ SegmentNaive::QueryImpl(query::QueryDeprecatedPtr query_info, Timestamp timestam
auto
distances
=
final
->
Get
<
float
*>
(
knowhere
::
meta
::
DISTANCE
);
auto
total_num
=
num_queries
*
topK
;
result
.
result_ids_
.
resize
(
total_num
);
result
.
result_distances_
.
resize
(
total_num
);
result
.
num_queries_
=
num_queries
;
result
.
topK_
=
topK
;
std
::
copy_n
(
ids
,
total_num
,
result
.
internal_seg_offset
s_
.
data
());
std
::
copy_n
(
ids
,
total_num
,
result
.
result_id
s_
.
data
());
std
::
copy_n
(
distances
,
total_num
,
result
.
result_distances_
.
data
());
for
(
auto
&
id
:
result
.
result_ids_
)
{
id
=
record_
.
uids_
[
id
];
}
return
Status
::
OK
();
}
...
...
@@ -342,7 +347,7 @@ SegmentNaive::QuerySlowImpl(query::QueryDeprecatedPtr query_info, Timestamp time
result
.
topK_
=
topK
;
auto
row_num
=
topK
*
num_queries
;
result
.
internal_seg_offset
s_
.
resize
(
row_num
);
result
.
result_id
s_
.
resize
(
row_num
);
result
.
result_distances_
.
resize
(
row_num
);
for
(
int
q_id
=
0
;
q_id
<
num_queries
;
++
q_id
)
{
...
...
@@ -351,7 +356,7 @@ SegmentNaive::QuerySlowImpl(query::QueryDeprecatedPtr query_info, Timestamp time
auto
dst_id
=
topK
-
1
-
i
+
q_id
*
topK
;
auto
[
dis
,
offset
]
=
records
[
q_id
].
top
();
records
[
q_id
].
pop
();
result
.
internal_seg_offsets_
[
dst_id
]
=
offset
;
result
.
result_ids_
[
dst_id
]
=
record_
.
uids_
[
offset
]
;
result
.
result_distances_
[
dst_id
]
=
dis
;
}
}
...
...
internal/core/src/segcore/SegmentSmallIndex.cpp
浏览文件 @
da6eeddb
...
...
@@ -349,12 +349,19 @@ SegmentSmallIndex::FillTargetEntry(const query::Plan* plan, QueryResult& results
Assert
(
results
.
result_offsets_
.
size
()
==
size
);
Assert
(
results
.
row_data_
.
size
()
==
0
);
// TODO: deprecate
results
.
result_ids_
.
clear
();
results
.
result_ids_
.
resize
(
size
);
if
(
plan
->
schema_
.
get_is_auto_id
())
{
auto
&
uids
=
record_
.
uids_
;
for
(
int64_t
i
=
0
;
i
<
size
;
++
i
)
{
auto
seg_offset
=
results
.
internal_seg_offsets_
[
i
];
auto
row_id
=
seg_offset
==
-
1
?
-
1
:
uids
[
seg_offset
];
// TODO: deprecate
results
.
result_ids_
[
i
]
=
row_id
;
std
::
vector
<
char
>
blob
(
sizeof
(
row_id
));
memcpy
(
blob
.
data
(),
&
row_id
,
sizeof
(
row_id
));
results
.
row_data_
.
emplace_back
(
std
::
move
(
blob
));
...
...
@@ -370,6 +377,9 @@ SegmentSmallIndex::FillTargetEntry(const query::Plan* plan, QueryResult& results
auto
seg_offset
=
results
.
internal_seg_offsets_
[
i
];
auto
row_id
=
seg_offset
==
-
1
?
-
1
:
uids
->
operator
[](
seg_offset
);
// TODO: deprecate
results
.
result_ids_
[
i
]
=
row_id
;
std
::
vector
<
char
>
blob
(
sizeof
(
row_id
));
memcpy
(
blob
.
data
(),
&
row_id
,
sizeof
(
row_id
));
results
.
row_data_
.
emplace_back
(
std
::
move
(
blob
));
...
...
internal/core/src/segcore/reduce_c.cpp
浏览文件 @
da6eeddb
...
...
@@ -14,10 +14,10 @@
#include "segcore/reduce_c.h"
#include "segcore/Reduce.h"
#include "
common
/Types.h"
#include "
utils
/Types.h"
#include "pb/service_msg.pb.h"
using
SearchResult
=
milvus
::
QueryResult
;
using
SearchResult
=
milvus
::
engine
::
QueryResult
;
int
MergeInto
(
int64_t
num_queries
,
int64_t
topk
,
float
*
distances
,
int64_t
*
uids
,
float
*
new_distances
,
int64_t
*
new_uids
)
{
...
...
internal/core/src/segcore/segment_c.cpp
浏览文件 @
da6eeddb
...
...
@@ -165,7 +165,7 @@ CStatus
FillTargetEntry
(
CSegmentBase
c_segment
,
CPlan
c_plan
,
CQueryResult
c_result
)
{
auto
segment
=
(
milvus
::
segcore
::
SegmentBase
*
)
c_segment
;
auto
plan
=
(
milvus
::
query
::
Plan
*
)
c_plan
;
auto
result
=
(
milvus
::
QueryResult
*
)
c_result
;
auto
result
=
(
milvus
::
engine
::
QueryResult
*
)
c_result
;
auto
status
=
CStatus
();
try
{
...
...
internal/core/src/utils/Types.h
浏览文件 @
da6eeddb
...
...
@@ -136,5 +136,45 @@ struct AttrsData {
IDNumbers
id_array_
;
};
///////////////////////////////////////////////////////////////////////////////////////////////////
struct
QueryResult
{
QueryResult
()
=
default
;
QueryResult
(
uint64_t
num_queries
,
uint64_t
topK
)
:
topK_
(
topK
),
num_queries_
(
num_queries
)
{
auto
count
=
get_row_count
();
result_distances_
.
resize
(
count
);
internal_seg_offsets_
.
resize
(
count
);
// TODO: deprecated
result_ids_
.
resize
(
count
);
}
[[
nodiscard
]]
uint64_t
get_row_count
()
const
{
return
topK_
*
num_queries_
;
}
uint64_t
num_queries_
;
uint64_t
topK_
;
// uint64_t total_row_count_; // total_row_count_ = topK * num_queries_
// vector<tuple<Score, SegId, Offset>> data_reduced;
// vector<tuple<Score, SegId, Offset, RawData>>
// map<SegId, vector<tuple<DataOffset, ResLoc>>>
uint64_t
seg_id_
;
std
::
vector
<
float
>
result_distances_
;
// TODO(gexi): utilize these field
std
::
vector
<
int64_t
>
internal_seg_offsets_
;
std
::
vector
<
int64_t
>
result_offsets_
;
std
::
vector
<
std
::
vector
<
char
>>
row_data_
;
// TODO: deprecated, use row_data directly
std
::
vector
<
idx_t
>
result_ids_
;
};
using
QueryResultPtr
=
std
::
shared_ptr
<
QueryResult
>
;
}
// namespace engine
}
// namespace milvus
internal/core/unittest/CMakeLists.txt
浏览文件 @
da6eeddb
...
...
@@ -14,7 +14,6 @@ set(MILVUS_TEST_FILES
test_binary.cpp
test_index_wrapper.cpp
test_sealed.cpp
test_reduce.cpp
)
add_executable
(
all_tests
${
MILVUS_TEST_FILES
}
...
...
internal/core/unittest/test_indexing.cpp
浏览文件 @
da6eeddb
...
...
@@ -33,7 +33,7 @@
#include "test_utils/Timer.h"
#include "segcore/Reduce.h"
#include "test_utils/DataGen.h"
#include "query/
SearchBruteForce
.h"
#include "query/
BruteForceSearch
.h"
using
std
::
cin
;
using
std
::
cout
;
...
...
@@ -245,6 +245,8 @@ TEST(Indexing, BinaryBruteForce) {
schema
->
AddField
(
"vecbin"
,
DataType
::
VECTOR_BINARY
,
dim
,
MetricType
::
METRIC_Jaccard
);
schema
->
AddField
(
"age"
,
DataType
::
INT64
);
auto
dataset
=
DataGen
(
schema
,
N
,
10
);
vector
<
float
>
distances
(
result_count
);
vector
<
int64_t
>
ids
(
result_count
);
auto
bin_vec
=
dataset
.
get_col
<
uint8_t
>
(
0
);
auto
line_sizeof
=
schema
->
operator
[](
0
).
get_sizeof
();
auto
query_data
=
1024
*
line_sizeof
+
bin_vec
.
data
();
...
...
@@ -256,13 +258,13 @@ TEST(Indexing, BinaryBruteForce) {
query_data
//
};
auto
sub_result
=
query
::
BinarySearchBruteForce
(
query_dataset
,
bin_vec
.
data
(),
N
);
query
::
BinarySearchBruteForce
(
query_dataset
,
bin_vec
.
data
(),
N
,
distances
.
data
(),
ids
.
data
()
);
QueryResult
qr
;
qr
.
num_queries_
=
num_queries
;
qr
.
topK_
=
topk
;
qr
.
internal_seg_offsets_
=
std
::
move
(
sub_result
.
mutable_labels
())
;
qr
.
result_distances_
=
std
::
move
(
sub_result
.
mutable_values
())
;
qr
.
internal_seg_offsets_
=
ids
;
qr
.
result_distances_
=
distances
;
auto
json
=
QueryResultToJson
(
qr
);
auto
ref
=
json
::
parse
(
R"(
...
...
internal/core/unittest/test_query.cpp
浏览文件 @
da6eeddb
...
...
@@ -402,7 +402,7 @@ TEST(Query, FillSegment) {
pb
::
schema
::
CollectionSchema
proto
;
proto
.
set_name
(
"col"
);
proto
.
set_description
(
"asdfhsalkgfhsadg"
);
proto
.
set_autoid
(
fals
e
);
proto
.
set_autoid
(
tru
e
);
{
auto
field
=
proto
.
add_fields
();
...
...
@@ -425,7 +425,7 @@ TEST(Query, FillSegment) {
field
->
set_fieldid
(
101
);
field
->
set_is_primary_key
(
true
);
field
->
set_description
(
"asdgfsagf"
);
field
->
set_data_type
(
pb
::
schema
::
DataType
::
INT
64
);
field
->
set_data_type
(
pb
::
schema
::
DataType
::
INT
32
);
}
auto
schema
=
Schema
::
ParseFrom
(
proto
);
...
...
@@ -466,17 +466,18 @@ TEST(Query, FillSegment) {
result
.
result_offsets_
.
resize
(
topk
*
num_queries
);
segment
->
FillTargetEntry
(
plan
.
get
(),
result
);
// TODO: deprecated result_ids_
ASSERT_EQ
(
result
.
result_ids_
,
result
.
internal_seg_offsets_
);
auto
ans
=
result
.
row_data_
;
ASSERT_EQ
(
ans
.
size
(),
topk
*
num_queries
);
int64_t
std_index
=
0
;
auto
std_vec
=
dataset
.
get_col
<
int64_t
>
(
1
);
for
(
auto
&
vec
:
ans
)
{
ASSERT_EQ
(
vec
.
size
(),
sizeof
(
int64_t
));
int64_t
val
;
memcpy
(
&
val
,
vec
.
data
(),
sizeof
(
int64_t
));
auto
internal_offset
=
result
.
internal_seg_offsets_
[
std_index
];
auto
std_val
=
std_vec
[
internal_offset
];
ASSERT_EQ
(
val
,
std_val
)
<<
"io:"
<<
internal_offset
;
auto
std_val
=
result
.
result_ids_
[
std_index
];
ASSERT_EQ
(
val
,
std_val
);
++
std_index
;
}
}
...
...
internal/core/unittest/test_reduce.cpp
已删除
100644 → 0
浏览文件 @
5a26f6ef
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#include <gtest/gtest.h>
#include "query/SubQueryResult.h"
#include <vector>
#include <queue>
#include <random>
using
namespace
milvus
;
using
namespace
milvus
::
query
;
// Verifies that SubQueryResult::merge accumulates the per-query top-k
// minima (METRIC_L2: smaller is better) exactly as a reference
// max-heap of fixed size `topk` does.
TEST(Reduce, SubQueryResult) {
    const int64_t num_queries = 512;
    const int64_t topk = 32;
    const int64_t iteration = 50;
    constexpr int64_t limit = 100000000L;
    const auto metric_type = MetricType::METRIC_L2;

    // One max-heap per query; each heap retains the `topk` smallest samples.
    using queue_type = std::priority_queue<int64_t>;
    std::vector<queue_type> ref_results(num_queries);
    for (auto& heap : ref_results) {
        for (int i = 0; i < topk; ++i) {
            heap.push(limit);  // sentinel worse than any generated sample
        }
    }

    std::default_random_engine rng(42);
    SubQueryResult final_result(num_queries, topk, metric_type);
    for (int iter = 0; iter < iteration; ++iter) {
        std::vector<int64_t> labels;
        std::vector<float> values;
        for (int n = 0; n < num_queries; ++n) {
            for (int k = 0; k < topk; ++k) {
                const auto sample = rng() % limit;
                // push+pop keeps the reference heap at exactly `topk` entries
                ref_results[n].push(sample);
                ref_results[n].pop();
                labels.push_back(sample);
                values.push_back(sample);
            }
            // merge() expects each query's candidate slice sorted ascending
            const auto base = n * topk;
            std::sort(labels.begin() + base, labels.begin() + base + topk);
            std::sort(values.begin() + base, values.begin() + base + topk);
        }
        SubQueryResult sub_result(num_queries, topk, metric_type);
        sub_result.mutable_values() = values;
        sub_result.mutable_labels() = labels;
        final_result.merge(sub_result);
    }

    // The heap pops worst-first, so walk each query's merged top-k slice
    // from the back while draining the reference heap.
    for (int n = 0; n < num_queries; ++n) {
        ASSERT_EQ(ref_results[n].size(), topk);
        for (int k = 0; k < topk; ++k) {
            const auto expected = ref_results[n].top();
            ref_results[n].pop();
            const auto index = n * topk + topk - 1 - k;
            ASSERT_EQ(final_result.get_labels()[index], expected);
            ASSERT_EQ(final_result.get_values()[index], expected);
        }
    }
}
// Same merge check as above, but for a descending metric
// (METRIC_INNER_PRODUCT: larger is better), so the reference structure is a
// min-heap that retains the `topk` largest samples per query.
TEST(Reduce, SubQueryResultDesc) {
    const int64_t num_queries = 512;
    const int64_t topk = 32;
    const int64_t iteration = 50;
    constexpr int64_t limit = 100000000L;
    constexpr int64_t init_value = 0;
    const auto metric_type = MetricType::METRIC_INNER_PRODUCT;

    using queue_type = std::priority_queue<int64_t, std::vector<int64_t>, std::greater<int64_t>>;
    std::vector<queue_type> ref_results(num_queries);
    for (auto& heap : ref_results) {
        for (int i = 0; i < topk; ++i) {
            heap.push(init_value);  // sentinel worse than any generated sample
        }
    }

    std::default_random_engine rng(42);
    SubQueryResult final_result(num_queries, topk, metric_type);
    for (int iter = 0; iter < iteration; ++iter) {
        std::vector<int64_t> labels;
        std::vector<float> values;
        for (int n = 0; n < num_queries; ++n) {
            for (int k = 0; k < topk; ++k) {
                const auto sample = rng() % limit;
                // push+pop keeps the reference heap at exactly `topk` entries
                ref_results[n].push(sample);
                ref_results[n].pop();
                labels.push_back(sample);
                values.push_back(sample);
            }
            // merge() expects each query's candidate slice sorted descending
            const auto base = n * topk;
            std::sort(labels.begin() + base, labels.begin() + base + topk, std::greater<int64_t>());
            std::sort(values.begin() + base, values.begin() + base + topk, std::greater<float>());
        }
        SubQueryResult sub_result(num_queries, topk, metric_type);
        sub_result.mutable_values() = values;
        sub_result.mutable_labels() = labels;
        final_result.merge(sub_result);
    }

    // The min-heap pops worst-first (smallest), so walk each query's merged
    // top-k slice from the back while draining the reference heap.
    for (int n = 0; n < num_queries; ++n) {
        ASSERT_EQ(ref_results[n].size(), topk);
        for (int k = 0; k < topk; ++k) {
            const auto expected = ref_results[n].top();
            ref_results[n].pop();
            const auto index = n * topk + topk - 1 - k;
            ASSERT_EQ(final_result.get_labels()[index], expected);
            ASSERT_EQ(final_result.get_values()[index], expected);
        }
    }
}
\ No newline at end of file
internal/master/flush_scheduler.go
浏览文件 @
da6eeddb
...
...
@@ -19,9 +19,10 @@ type FlushScheduler struct {
ctx
context
.
Context
cancel
context
.
CancelFunc
globalTSOAllocator
func
()
(
Timestamp
,
error
)
}
func
NewFlushScheduler
(
ctx
context
.
Context
,
client
WriteNodeClient
,
metaTable
*
metaTable
,
buildScheduler
*
IndexBuildScheduler
)
*
FlushScheduler
{
func
NewFlushScheduler
(
ctx
context
.
Context
,
client
WriteNodeClient
,
metaTable
*
metaTable
,
buildScheduler
*
IndexBuildScheduler
,
globalTSOAllocator
func
()
(
Timestamp
,
error
)
)
*
FlushScheduler
{
ctx2
,
cancel
:=
context
.
WithCancel
(
ctx
)
return
&
FlushScheduler
{
...
...
@@ -32,6 +33,7 @@ func NewFlushScheduler(ctx context.Context, client WriteNodeClient, metaTable *m
segmentDescribeChan
:
make
(
chan
UniqueID
,
100
),
ctx
:
ctx2
,
cancel
:
cancel
,
globalTSOAllocator
:
globalTSOAllocator
,
}
}
...
...
@@ -42,8 +44,12 @@ func (scheduler *FlushScheduler) schedule(id interface{}) error {
return
err
}
ts
,
err
:=
scheduler
.
globalTSOAllocator
()
if
err
!=
nil
{
return
err
}
// TODO: set correct timestamp
err
=
scheduler
.
client
.
FlushSegment
(
segmentID
,
segmentMeta
.
CollectionID
,
segmentMeta
.
PartitionTag
,
Timestamp
(
0
)
)
err
=
scheduler
.
client
.
FlushSegment
(
segmentID
,
segmentMeta
.
CollectionID
,
segmentMeta
.
PartitionTag
,
ts
)
log
.
Printf
(
"flush segment %d"
,
segmentID
)
if
err
!=
nil
{
return
err
...
...
internal/master/master.go
浏览文件 @
da6eeddb
...
...
@@ -193,7 +193,7 @@ func CreateServer(ctx context.Context) (*Master, error) {
m
.
indexLoadSch
=
NewIndexLoadScheduler
(
ctx
,
loadIndexClient
,
m
.
metaTable
)
m
.
indexBuildSch
=
NewIndexBuildScheduler
(
ctx
,
buildIndexClient
,
m
.
metaTable
,
m
.
indexLoadSch
)
m
.
flushSch
=
NewFlushScheduler
(
ctx
,
flushClient
,
m
.
metaTable
,
m
.
indexBuildSch
)
m
.
flushSch
=
NewFlushScheduler
(
ctx
,
flushClient
,
m
.
metaTable
,
m
.
indexBuildSch
,
func
()
(
Timestamp
,
error
)
{
return
m
.
tsoAllocator
.
AllocOne
()
}
)
m
.
segmentAssigner
=
NewSegmentAssigner
(
ctx
,
metakv
,
func
()
(
Timestamp
,
error
)
{
return
m
.
tsoAllocator
.
AllocOne
()
},
...
...
internal/master/persistence_scheduler_test.go
浏览文件 @
da6eeddb
...
...
@@ -59,7 +59,11 @@ func TestPersistenceScheduler(t *testing.T) {
//Init scheduler
indexLoadSch
:=
NewIndexLoadScheduler
(
ctx
,
loadIndexClient
,
meta
)
indexBuildSch
:=
NewIndexBuildScheduler
(
ctx
,
buildIndexClient
,
meta
,
indexLoadSch
)
flushSch
:=
NewFlushScheduler
(
ctx
,
flushClient
,
meta
,
indexBuildSch
)
cnt
:=
0
flushSch
:=
NewFlushScheduler
(
ctx
,
flushClient
,
meta
,
indexBuildSch
,
func
()
(
Timestamp
,
error
)
{
cnt
++
return
Timestamp
(
cnt
),
nil
})
//scheduler start
err
=
indexLoadSch
.
Start
()
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录