Commit e771bda9 (unverified)
Authored on Aug 14, 2021 by Cai Yudong; committed by GitHub on Aug 14, 2021

optimize retrieve output vector code structure (#7102)

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

Parent: 382fa6f2

Showing 8 changed files with 228 additions and 192 deletions (+228 −192)
internal/core/src/segcore/SegmentInterface.cpp    +1   -0
internal/proxy/task.go                            +9   -0
internal/querynode/historical.go                  +65  -19
internal/querynode/query_collection.go            +17  -162
internal/querynode/segment.go                     +80  -2
internal/querynode/streaming.go                   +43  -0
internal/storage/vector_chunk_manager.go          +6   -2
internal/storage/vector_chunk_manager_test.go     +7   -7
internal/core/src/segcore/SegmentInterface.cpp

@@ -143,6 +143,7 @@ CreateDataArrayFrom(const void* data_raw, int64_t count, const FieldMeta& field_
     auto data_type = field_meta.get_data_type();
     auto data_array = std::make_unique<DataArray>();
     data_array->set_field_id(field_meta.get_id().get());
+    data_array->set_type(milvus::proto::schema::DataType(field_meta.get_data_type()));
     if (!datatype_is_vector(data_type)) {
         auto scalar_array = CreateScalarArrayFrom(data_raw, count, data_type);
internal/proxy/task.go

@@ -1262,6 +1262,15 @@ func (dct *DropCollectionTask) PostExecute(ctx context.Context) error {
 	return nil
 }
 
+// Support wildcard in output fields:
+//   "*" - all scalar fields
+//   "%" - all vector fields
+// For example, A and B are scalar fields, C and D are vector fields, duplicated fields will automatically be removed.
+//   output_fields=["*"]     ==> [A,B]
+//   output_fields=["%"]     ==> [C,D]
+//   output_fields=["*","%"] ==> [A,B,C,D]
+//   output_fields=["*",A]   ==> [A,B]
+//   output_fields=["*",C]   ==> [A,B,C]
 func translateOutputFields(outputFields []string, schema *schemapb.CollectionSchema, addPrimary bool) ([]string, error) {
 	var primaryFieldName string
 	scalarFieldNameMap := make(map[string]bool)
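The wildcard rules documented in the new comment can be illustrated with a small, self-contained sketch. This is not the proxy's actual translateOutputFields (which also validates field names against the collection schema and can add the primary key); expandOutputFields and the sample field lists below are hypothetical and only demonstrate the expansion and de-duplication behaviour described above.

package main

import "fmt"

// expandOutputFields is a hypothetical helper mirroring the documented rules:
// "*" expands to all scalar fields, "%" to all vector fields, and duplicates
// are dropped while preserving first-seen order.
func expandOutputFields(requested, scalarFields, vectorFields []string) []string {
	seen := make(map[string]bool)
	var out []string
	add := func(names ...string) {
		for _, n := range names {
			if !seen[n] {
				seen[n] = true
				out = append(out, n)
			}
		}
	}
	for _, f := range requested {
		switch f {
		case "*":
			add(scalarFields...)
		case "%":
			add(vectorFields...)
		default:
			add(f)
		}
	}
	return out
}

func main() {
	scalars := []string{"A", "B"} // hypothetical scalar fields
	vectors := []string{"C", "D"} // hypothetical vector fields
	fmt.Println(expandOutputFields([]string{"*"}, scalars, vectors))      // [A B]
	fmt.Println(expandOutputFields([]string{"*", "%"}, scalars, vectors)) // [A B C D]
	fmt.Println(expandOutputFields([]string{"*", "C"}, scalars, vectors)) // [A B C]
}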
internal/querynode/historical.go

@@ -27,6 +27,8 @@ import (
 	"github.com/milvus-io/milvus/internal/log"
 	"github.com/milvus-io/milvus/internal/msgstream"
 	"github.com/milvus-io/milvus/internal/proto/querypb"
+	"github.com/milvus-io/milvus/internal/proto/segcorepb"
+	"github.com/milvus-io/milvus/internal/storage"
 	"github.com/milvus-io/milvus/internal/types"
 )

@@ -177,25 +179,71 @@ func (h *historical) removeGlobalSegmentIDsByPartitionIds(partitionIDs []UniqueI
 	}
 }
 
+func (h *historical) retrieve(collID UniqueID, partIDs []UniqueID, vcm *storage.VectorChunkManager,
+	plan *RetrievePlan) ([]*segcorepb.RetrieveResults, []UniqueID, error) {
+	retrieveResults := make([]*segcorepb.RetrieveResults, 0)
+	retrieveSegmentIDs := make([]UniqueID, 0)
+
+	// get historical partition ids
+	var retrievePartIDs []UniqueID
+	if len(partIDs) == 0 {
+		hisPartIDs, err := h.replica.getPartitionIDs(collID)
+		if err != nil {
+			return retrieveResults, retrieveSegmentIDs, err
+		}
+		retrievePartIDs = hisPartIDs
+	} else {
+		for _, id := range partIDs {
+			_, err := h.replica.getPartitionByID(id)
+			if err == nil {
+				retrievePartIDs = append(retrievePartIDs, id)
+			}
+		}
+	}
+
+	col, err := h.replica.getCollectionByID(collID)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	for _, partID := range retrievePartIDs {
+		segIDs, err := h.replica.getSegmentIDs(partID)
+		if err != nil {
+			return retrieveResults, retrieveSegmentIDs, err
+		}
+		for _, segID := range segIDs {
+			seg, err := h.replica.getSegmentByID(segID)
+			if err != nil {
+				return retrieveResults, retrieveSegmentIDs, err
+			}
+			result, err := seg.getEntityByIds(plan)
+			if err != nil {
+				return retrieveResults, retrieveSegmentIDs, err
+			}
+			if err = seg.fillVectorFieldsData(collID, col.schema, vcm, result); err != nil {
+				return retrieveResults, retrieveSegmentIDs, err
+			}
+			retrieveResults = append(retrieveResults, result)
+			retrieveSegmentIDs = append(retrieveSegmentIDs, segID)
+		}
+	}
+	return retrieveResults, retrieveSegmentIDs, nil
+}
+
-func (h *historical) search(searchReqs []*searchRequest, collID UniqueID, partIDs []UniqueID,
-	plan *SearchPlan, searchTs Timestamp) ([]*SearchResult, []*Segment, error) {
+func (h *historical) search(searchReqs []*searchRequest, collID UniqueID, partIDs []UniqueID,
+	plan *SearchPlan, searchTs Timestamp) ([]*SearchResult, []UniqueID, error) {
 	searchResults := make([]*SearchResult, 0)
-	segmentResults := make([]*Segment, 0)
+	searchSegmentIDs := make([]UniqueID, 0)
 
 	// get historical partition ids
 	var searchPartIDs []UniqueID
 	if len(partIDs) == 0 {
 		hisPartIDs, err := h.replica.getPartitionIDs(collID)
 		if len(hisPartIDs) == 0 {
 			// no partitions in collection, do empty search
 			return nil, nil, nil
 		}
 		if err != nil {
-			return searchResults, segmentResults, err
+			return searchResults, searchSegmentIDs, err
 		}
 		log.Debug("no partition specified, search all partitions",
 			zap.Any("collectionID", collID),

@@ -223,9 +271,7 @@ func (h *historical) search(searchReqs []*searchRequest,
 	// all partitions have been released
 	if len(searchPartIDs) == 0 && col.getLoadType() == loadTypePartition {
 		return nil, nil, errors.New("partitions have been released , collectionID = " +
-			fmt.Sprintln(collID) +
-			"target partitionIDs = " + fmt.Sprintln(partIDs))
+			fmt.Sprintln(collID) + "target partitionIDs = " + fmt.Sprintln(partIDs))
 	}
 
 	if len(searchPartIDs) == 0 && col.getLoadType() == loadTypeCollection {

@@ -244,24 +290,24 @@ func (h *historical) search(searchReqs []*searchRequest,
 	for _, partID := range searchPartIDs {
 		segIDs, err := h.replica.getSegmentIDs(partID)
 		if err != nil {
-			return searchResults, segmentResults, err
+			return searchResults, searchSegmentIDs, err
 		}
 		for _, segID := range segIDs {
 			seg, err := h.replica.getSegmentByID(segID)
 			if err != nil {
-				return searchResults, segmentResults, err
+				return searchResults, searchSegmentIDs, err
 			}
 			if !seg.getOnService() {
 				continue
 			}
 			searchResult, err := seg.search(plan, searchReqs, []Timestamp{searchTs})
 			if err != nil {
-				return searchResults, segmentResults, err
+				return searchResults, searchSegmentIDs, err
 			}
 			searchResults = append(searchResults, searchResult)
-			segmentResults = append(segmentResults, seg)
+			searchSegmentIDs = append(searchSegmentIDs, seg.segmentID)
 		}
 	}
-	return searchResults, segmentResults, nil
+	return searchResults, searchSegmentIDs, nil
 }
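Both new retrieve methods (historical above, streaming later in this commit) start by resolving which partitions to read: an empty partition list in the request means every partition of the collection, otherwise only the requested partitions that actually exist on this node are kept. A minimal, self-contained sketch of that selection rule follows; selectPartitions and the sample IDs are invented for illustration and are not part of the commit.

package main

import "fmt"

// selectPartitions sketches the partition-selection rule used by the new
// retrieve methods: no requested partitions means "all partitions", otherwise
// keep only the requested partitions that are loaded on this node.
func selectPartitions(requested []int64, loaded map[int64]bool, all []int64) []int64 {
	if len(requested) == 0 {
		return all
	}
	var out []int64
	for _, id := range requested {
		if loaded[id] {
			out = append(out, id)
		}
	}
	return out
}

func main() {
	loaded := map[int64]bool{10: true, 11: true} // hypothetical loaded partitions
	all := []int64{10, 11}
	fmt.Println(selectPartitions(nil, loaded, all))             // [10 11]
	fmt.Println(selectPartitions([]int64{11, 99}, loaded, all)) // [11]
}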
internal/querynode/query_collection.go

@@ -12,7 +12,6 @@
 package querynode
 
 import (
-	"bytes"
 	"context"
 	"encoding/binary"
 	"fmt"

@@ -28,7 +27,6 @@ import (
 	"github.com/milvus-io/milvus/internal/log"
 	"github.com/milvus-io/milvus/internal/msgstream"
 	"github.com/milvus-io/milvus/internal/proto/commonpb"
-	"github.com/milvus-io/milvus/internal/proto/etcdpb"
 	"github.com/milvus-io/milvus/internal/proto/internalpb"
 	"github.com/milvus-io/milvus/internal/proto/milvuspb"
 	"github.com/milvus-io/milvus/internal/proto/schemapb"

@@ -843,18 +841,14 @@ func (q *queryCollection) search(msg queryMsg) error {
 	}
 
 	searchResults := make([]*SearchResult, 0)
-	sealedSegmentSearched := make([]UniqueID, 0)
 
 	// historical search
-	hisSearchResults, hisSegmentResults, err1 := q.historical.search(searchRequests, collectionID, searchMsg.PartitionIDs, plan, travelTimestamp)
+	hisSearchResults, sealedSegmentSearched, err1 := q.historical.search(searchRequests, collectionID, searchMsg.PartitionIDs, plan, travelTimestamp)
 	if err1 != nil {
 		log.Warn(err1.Error())
 		return err1
 	}
 	searchResults = append(searchResults, hisSearchResults...)
-	for _, seg := range hisSegmentResults {
-		sealedSegmentSearched = append(sealedSegmentSearched, seg.segmentID)
-	}
 	tr.Record("historical search done")
 
 	// streaming search

@@ -1045,98 +1039,6 @@ func (q *queryCollection) search(msg queryMsg) error {
 	return nil
 }
 
-func (q *queryCollection) fillVectorFieldsData(segment *Segment, result *segcorepb.RetrieveResults) error {
-	// If segment is growing, vector data is in memory
-	if segment.segmentType == segmentTypeGrowing {
-		log.Debug("Segment is growing, all vector data is in memory")
-		log.Debug("FillVectorFieldData", zap.Any("segmentType", segment.segmentType))
-		return nil
-	}
-
-	collection, _ := q.streaming.replica.getCollectionByID(q.collectionID)
-	schema := &etcdpb.CollectionMeta{
-		ID:     q.collectionID,
-		Schema: collection.schema}
-
-	schemaHelper, err := typeutil.CreateSchemaHelper(collection.schema)
-	if err != nil {
-		return err
-	}
-
-	for _, resultFieldData := range result.FieldsData {
-		log.Debug("FillVectorFieldData for fieldID", zap.Any("fieldID", resultFieldData.FieldId))
-		// If the vector field doesn't have index. Vector data is in memory for
-		// brute force search. No need to download data from remote.
-		_, ok := segment.indexInfos[resultFieldData.FieldId]
-		if !ok {
-			log.Debug("FillVectorFieldData fielD doesn't have index", zap.Any("fielD", resultFieldData.FieldId))
-			continue
-		}
-
-		vecFieldInfo, err := segment.getVectorFieldInfo(resultFieldData.FieldId)
-		if err != nil {
-			continue
-		}
-
-		dim := resultFieldData.GetVectors().GetDim()
-		log.Debug("FillVectorFieldData", zap.Any("dim", dim))
-
-		fieldSchema, err := schemaHelper.GetFieldFromID(resultFieldData.FieldId)
-		if err != nil {
-			return err
-		}
-		dataType := fieldSchema.DataType
-		log.Debug("FillVectorFieldData", zap.Any("datatype", dataType))
-		data := resultFieldData.GetVectors().GetData()
-
-		for i, offset := range result.Offset {
-			var vecPath string
-			for index, idBinlogRowSize := range segment.idBinlogRowSizes {
-				if offset < idBinlogRowSize {
-					vecPath = vecFieldInfo.fieldBinlog.Binlogs[index]
-					break
-				} else {
-					offset -= idBinlogRowSize
-				}
-			}
-			log.Debug("FillVectorFieldData", zap.Any("path", vecPath))
-			err := q.vcm.DownloadVectorFile(vecPath, schema)
-			if err != nil {
-				return err
-			}
-
-			switch dataType {
-			case schemapb.DataType_BinaryVector:
-				rowBytes := dim / 8
-				content := make([]byte, rowBytes)
-				_, err := q.vcm.ReadAt(vecPath, content, offset*rowBytes)
-				if err != nil {
-					return err
-				}
-				log.Debug("FillVectorFieldData", zap.Any("binaryVectorResult", content))
-				resultLen := dim / 8
-				copy(data.(*schemapb.VectorField_BinaryVector).BinaryVector[i*int(resultLen):(i+1)*int(resultLen)], content)
-			case schemapb.DataType_FloatVector:
-				rowBytes := dim * 4
-				content := make([]byte, rowBytes)
-				_, err := q.vcm.ReadAt(vecPath, content, offset*rowBytes)
-				if err != nil {
-					return err
-				}
-				floatResult := make([]float32, dim)
-				buf := bytes.NewReader(content)
-				err = binary.Read(buf, binary.LittleEndian, &floatResult)
-				if err != nil {
-					return err
-				}
-				log.Debug("FillVectorFieldData", zap.Any("floatVectorResult", floatResult))
-				resultLen := dim
-				copy(data.(*schemapb.VectorField_FloatVector).FloatVector.Data[i*int(resultLen):(i+1)*int(resultLen)], floatResult)
-			}
-		}
-	}
-	return nil
-}
-
 func (q *queryCollection) retrieve(msg queryMsg) error {
 	// TODO(yukun)
 	// step 1: get retrieve object and defer destruction

@@ -1170,77 +1072,30 @@ func (q *queryCollection) retrieve(msg queryMsg) error {
 	tr := timerecord.NewTimeRecorder(fmt.Sprintf("retrieve %d", retrieveMsg.CollectionID))
 
 	var globalSealedSegments []UniqueID
-	var partitionIDsInHistorical []UniqueID
-	var partitionIDsInStreaming []UniqueID
-	partitionIDsInQuery := retrieveMsg.PartitionIDs
-	if len(partitionIDsInQuery) == 0 {
-		globalSealedSegments = q.historical.getGlobalSegmentIDsByCollectionID(collectionID)
-		partitionIDsInHistoricalCol, err1 := q.historical.replica.getPartitionIDs(collectionID)
-		partitionIDsInStreamingCol, err2 := q.streaming.replica.getPartitionIDs(collectionID)
-		if err1 != nil && err2 != nil {
-			return err2
-		}
-		partitionIDsInHistorical = partitionIDsInHistoricalCol
-		partitionIDsInStreaming = partitionIDsInStreamingCol
+	if len(retrieveMsg.PartitionIDs) > 0 {
+		globalSealedSegments = q.historical.getGlobalSegmentIDsByPartitionIds(retrieveMsg.PartitionIDs)
 	} else {
-		globalSealedSegments = q.historical.getGlobalSegmentIDsByPartitionIds(partitionIDsInQuery)
-		for _, id := range partitionIDsInQuery {
-			_, err1 := q.historical.replica.getPartitionByID(id)
-			if err1 == nil {
-				partitionIDsInHistorical = append(partitionIDsInHistorical, id)
-			}
-			_, err2 := q.streaming.replica.getPartitionByID(id)
-			if err2 == nil {
-				partitionIDsInStreaming = append(partitionIDsInStreaming, id)
-			}
-			if err1 != nil && err2 != nil {
-				return err2
-			}
-		}
+		globalSealedSegments = q.historical.getGlobalSegmentIDsByCollectionID(collectionID)
 	}
 
-	sealedSegmentRetrieved := make([]UniqueID, 0)
 	var mergeList []*segcorepb.RetrieveResults
-	for _, partitionID := range partitionIDsInHistorical {
-		segmentIDs, err := q.historical.replica.getSegmentIDs(partitionID)
-		if err != nil {
-			return err
-		}
-		for _, segmentID := range segmentIDs {
-			segment, err := q.historical.replica.getSegmentByID(segmentID)
-			if err != nil {
-				return err
-			}
-			result, err := segment.getEntityByIds(plan)
-			if err != nil {
-				return err
-			}
-			if err = q.fillVectorFieldsData(segment, result); err != nil {
-				return err
-			}
-			mergeList = append(mergeList, result)
-			sealedSegmentRetrieved = append(sealedSegmentRetrieved, segmentID)
-		}
-	}
+
+	// historical retrieve
+	hisRetrieveResults, sealedSegmentRetrieved, err1 := q.historical.retrieve(collectionID, retrieveMsg.PartitionIDs, q.vcm, plan)
+	if err1 != nil {
+		log.Warn(err1.Error())
+		return err1
+	}
+	mergeList = append(mergeList, hisRetrieveResults...)
 	tr.Record("historical retrieve done")
 
-	for _, partitionID := range partitionIDsInStreaming {
-		segmentIDs, err := q.streaming.replica.getSegmentIDs(partitionID)
-		if err != nil {
-			return err
-		}
-		for _, segmentID := range segmentIDs {
-			segment, err := q.streaming.replica.getSegmentByID(segmentID)
-			if err != nil {
-				return err
-			}
-			result, err := segment.getEntityByIds(plan)
-			if err != nil {
-				return err
-			}
-			mergeList = append(mergeList, result)
-		}
-	}
+	// streaming retrieve
+	strRetrieveResults, _, err2 := q.streaming.retrieve(collectionID, retrieveMsg.PartitionIDs, plan)
+	if err2 != nil {
+		log.Warn(err2.Error())
+		return err2
+	}
+	mergeList = append(mergeList, strRetrieveResults...)
 	tr.Record("streaming retrieve done")
 
 	result, err := mergeRetrieveResults(mergeList)
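The vector-filling code removed from this file (and re-added on Segment in the next file) rebuilds each float vector row from raw binlog bytes: dim*4 bytes are read at offset*rowBytes and decoded as little-endian float32 values before being copied into the result. The following self-contained sketch shows only the decoding step; decodeFloatVector and the synthetic byte buffer are illustrative, while the real code obtains the bytes through vcm.ReadAt.

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// decodeFloatVector decodes one row of a float vector from its raw binlog
// bytes: dim little-endian float32 values, exactly as the retrieve path does
// after reading dim*4 bytes for the row.
func decodeFloatVector(raw []byte, dim int64) ([]float32, error) {
	vec := make([]float32, dim)
	if err := binary.Read(bytes.NewReader(raw), binary.LittleEndian, &vec); err != nil {
		return nil, err
	}
	return vec, nil
}

func main() {
	// Encode a 4-dimensional vector the way it would sit in a binlog chunk.
	var buf bytes.Buffer
	_ = binary.Write(&buf, binary.LittleEndian, []float32{0.1, 0.2, 0.3, 0.4})

	vec, err := decodeFloatVector(buf.Bytes(), 4)
	fmt.Println(vec, err) // [0.1 0.2 0.3 0.4] <nil>
}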
internal/querynode/segment.go

@@ -12,9 +12,7 @@
 package querynode
 
 /*
 #cgo CFLAGS: -I${SRCDIR}/../core/output/include
 #cgo LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_segcore -Wl,-rpath=${SRCDIR}/../core/output/lib
 
 #include "segcore/collection_c.h"

@@ -23,6 +21,8 @@ package querynode
 */
 import "C"
 
 import (
+	"bytes"
+	"encoding/binary"
 	"errors"
 	"fmt"
 	"strconv"

@@ -35,7 +35,9 @@ import (
 	"github.com/milvus-io/milvus/internal/log"
 	"github.com/milvus-io/milvus/internal/proto/commonpb"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
+	"github.com/milvus-io/milvus/internal/proto/schemapb"
 	"github.com/milvus-io/milvus/internal/proto/segcorepb"
+	"github.com/milvus-io/milvus/internal/storage"
 )
 
 type segmentType int32

@@ -316,6 +318,82 @@ func (s *Segment) getEntityByIds(plan *RetrievePlan) (*segcorepb.RetrieveResults
 	return result, nil
 }
 
+func (s *Segment) fillVectorFieldsData(collectionID UniqueID, schema *schemapb.CollectionSchema,
+	vcm *storage.VectorChunkManager, result *segcorepb.RetrieveResults) error {
+	for _, fieldData := range result.FieldsData {
+		log.Debug("FillVectorFieldData for fieldID", zap.Any("fieldID", fieldData.FieldId))
+		// If the vector field doesn't have index. Vector data is in memory for
+		// brute force search. No need to download data from remote.
+		_, ok := s.indexInfos[fieldData.FieldId]
+		if !ok {
+			log.Debug("FillVectorFieldData field doesn't have index", zap.Any("field", fieldData.FieldId))
+			continue
+		}
+
+		vecFieldInfo, err := s.getVectorFieldInfo(fieldData.FieldId)
+		if err != nil {
+			continue
+		}
+		log.Debug("FillVectorFieldData", zap.Any("fieldId", fieldData.FieldId))
+
+		dim := fieldData.GetVectors().GetDim()
+		log.Debug("FillVectorFieldData", zap.Int64("dim", dim), zap.Any("datatype", fieldData.Type))
+
+		for i, offset := range result.Offset {
+			var vecPath string
+			for index, idBinlogRowSize := range s.idBinlogRowSizes {
+				if offset < idBinlogRowSize {
+					vecPath = vecFieldInfo.fieldBinlog.Binlogs[index]
+					break
+				} else {
+					offset -= idBinlogRowSize
+				}
+			}
+			log.Debug("FillVectorFieldData", zap.Any("path", vecPath))
+			err := vcm.DownloadVectorFile(vecPath, collectionID, schema)
+			if err != nil {
+				return err
+			}
+
+			switch fieldData.Type {
+			case schemapb.DataType_BinaryVector:
+				rowBytes := dim / 8
+				x := fieldData.GetVectors().GetData().(*schemapb.VectorField_BinaryVector)
+				content := make([]byte, rowBytes)
+				_, err := vcm.ReadAt(vecPath, content, offset*rowBytes)
+				if err != nil {
+					return err
+				}
+				log.Debug("FillVectorFieldData", zap.Any("binaryVectorResult", content))
+				resultLen := dim / 8
+				copy(x.BinaryVector[i*int(resultLen):(i+1)*int(resultLen)], content)
+			case schemapb.DataType_FloatVector:
+				x := fieldData.GetVectors().GetData().(*schemapb.VectorField_FloatVector)
+				rowBytes := dim * 4
+				content := make([]byte, rowBytes)
+				_, err := vcm.ReadAt(vecPath, content, offset*rowBytes)
+				if err != nil {
+					return err
+				}
+				floatResult := make([]float32, dim)
+				buf := bytes.NewReader(content)
+				err = binary.Read(buf, binary.LittleEndian, &floatResult)
+				if err != nil {
+					return err
+				}
+				log.Debug("FillVectorFieldData", zap.Any("floatVectorResult", floatResult))
+				resultLen := dim
+				copy(x.FloatVector.Data[i*int(resultLen):(i+1)*int(resultLen)], floatResult)
+			}
+		}
+	}
+	return nil
+}
+
 //-------------------------------------------------------------------------------------- index info interface
 func (s *Segment) setIndexName(fieldID int64, name string) error {
 	s.paramMutex.Lock()
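Segment.fillVectorFieldsData maps a segment-wide row offset onto a concrete binlog file by walking the per-binlog row counts in idBinlogRowSizes and subtracting until the offset falls inside one file. The standalone sketch below shows only that mapping; locateRow, the row counts, and the paths are made up for illustration.

package main

import "fmt"

// locateRow walks the per-binlog row counts to find which binlog file a
// segment-wide row offset falls into, and the row's offset inside that file,
// mirroring the inner loop of fillVectorFieldsData.
func locateRow(offset int64, binlogRowCounts []int64, binlogPaths []string) (string, int64, bool) {
	for i, rows := range binlogRowCounts {
		if offset < rows {
			return binlogPaths[i], offset, true
		}
		offset -= rows
	}
	return "", 0, false // offset lies beyond the segment
}

func main() {
	rowCounts := []int64{1000, 1000, 500}                 // hypothetical rows per binlog file
	paths := []string{"binlog/0", "binlog/1", "binlog/2"} // hypothetical binlog paths
	path, local, ok := locateRow(1700, rowCounts, paths)
	fmt.Println(path, local, ok) // binlog/1 700 true
}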
internal/querynode/streaming.go

@@ -21,6 +21,7 @@ import (
 	etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
 	"github.com/milvus-io/milvus/internal/log"
 	"github.com/milvus-io/milvus/internal/msgstream"
+	"github.com/milvus-io/milvus/internal/proto/segcorepb"
 	"github.com/milvus-io/milvus/internal/util/tsoutil"
 )

@@ -61,6 +62,48 @@ func (s *streaming) close() {
 	s.replica.freeAll()
 }
 
+func (s *streaming) retrieve(collID UniqueID, partIDs []UniqueID, plan *RetrievePlan) ([]*segcorepb.RetrieveResults, []UniqueID, error) {
+	retrieveResults := make([]*segcorepb.RetrieveResults, 0)
+	retrieveSegmentIDs := make([]UniqueID, 0)
+
+	var retrievePartIDs []UniqueID
+	if len(partIDs) == 0 {
+		strPartIDs, err := s.replica.getPartitionIDs(collID)
+		if err != nil {
+			return retrieveResults, retrieveSegmentIDs, err
+		}
+		retrievePartIDs = strPartIDs
+	} else {
+		for _, id := range partIDs {
+			_, err := s.replica.getPartitionByID(id)
+			if err == nil {
+				retrievePartIDs = append(retrievePartIDs, id)
+			}
+		}
+	}
+
+	for _, partID := range retrievePartIDs {
+		segIDs, err := s.replica.getSegmentIDs(partID)
+		if err != nil {
+			return retrieveResults, retrieveSegmentIDs, err
+		}
+		for _, segID := range segIDs {
+			seg, err := s.replica.getSegmentByID(segID)
+			if err != nil {
+				return retrieveResults, retrieveSegmentIDs, err
+			}
+			result, err := seg.getEntityByIds(plan)
+			if err != nil {
+				return retrieveResults, retrieveSegmentIDs, err
+			}
+			retrieveResults = append(retrieveResults, result)
+			retrieveSegmentIDs = append(retrieveSegmentIDs, segID)
+		}
+	}
+	return retrieveResults, retrieveSegmentIDs, nil
+}
+
 func (s *streaming) search(searchReqs []*searchRequest, collID UniqueID, partIDs []UniqueID,
 	vChannel Channel, plan *SearchPlan, searchTs Timestamp) ([]*SearchResult, error) {
internal/storage/vector_chunk_manager.go

@@ -17,6 +17,7 @@ import (
 	"errors"
 
 	"github.com/milvus-io/milvus/internal/proto/etcdpb"
+	"github.com/milvus-io/milvus/internal/proto/schemapb"
 )
 
 type VectorChunkManager struct {

@@ -31,11 +32,14 @@ func NewVectorChunkManager(localChunkManager ChunkManager, remoteChunkManager Ch
 	}
 }
 
-func (vcm *VectorChunkManager) DownloadVectorFile(key string, schema *etcdpb.CollectionMeta) error {
+func (vcm *VectorChunkManager) DownloadVectorFile(key string, collectionID UniqueID, schema *schemapb.CollectionSchema) error {
 	if vcm.localChunkManager.Exist(key) {
 		return nil
 	}
-	insertCodec := NewInsertCodec(schema)
+	insertCodec := NewInsertCodec(&etcdpb.CollectionMeta{
+		ID:     collectionID,
+		Schema: schema,
+	})
 	content, err := vcm.remoteChunkManager.Read(key)
 	if err != nil {
 		return err
internal/storage/vector_chunk_manager_test.go

@@ -44,19 +44,19 @@ func TestVectorChunkManager(t *testing.T) {
 	lcm := NewLocalChunkManager(localPath)
 
-	schema := initSchema()
+	meta := initMeta()
 	vcm := NewVectorChunkManager(lcm, rcm)
 	assert.NotNil(t, vcm)
 
-	binlogs := initBinlogFile(schema)
+	binlogs := initBinlogFile(meta)
 	assert.NotNil(t, binlogs)
 	for _, binlog := range binlogs {
 		rcm.Write(binlog.Key, binlog.Value)
 	}
-	err = vcm.DownloadVectorFile("108", schema)
+	err = vcm.DownloadVectorFile("108", meta.ID, meta.Schema)
 	assert.Nil(t, err)
-	err = vcm.DownloadVectorFile("109", schema)
+	err = vcm.DownloadVectorFile("109", meta.ID, meta.Schema)
 	assert.Nil(t, err)
 
 	content, err := vcm.Read("108")

@@ -107,8 +107,8 @@ func newMinIOKVClient(ctx context.Context, bucketName string) (*miniokv.MinIOKV,
 	return client, err
 }
 
-func initSchema() *etcdpb.CollectionMeta {
-	schema := &etcdpb.CollectionMeta{
+func initMeta() *etcdpb.CollectionMeta {
+	meta := &etcdpb.CollectionMeta{
 		ID:         1,
 		CreateTime: 1,
 		SegmentIDs: []int64{0, 1},

@@ -156,7 +156,7 @@ func initSchema() *etcdpb.CollectionMeta {
 		},
 	}
-	return schema
+	return meta
 }
 
 func initBinlogFile(schema *etcdpb.CollectionMeta) []*Blob {