Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
milvus
milvus
提交
a4860bea
M
milvus
项目概览
milvus
/
milvus
11 个月 前同步成功
通知
260
Star
22476
Fork
2472
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
M
milvus
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
提交
a4860bea
编写于
1月 07, 2021
作者:
D
dragondriver
提交者:
yefu.chen
1月 07, 2021
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Parse type params and index params when buildindex is invoked
Signed-off-by:
N
dragondriver
<
jiquan.long@zilliz.com
>
上级
6ffe8739
变更
9
隐藏空白更改
内联
并排
Showing
9 changed file
with
71 addition
and
488 deletion
+71
-488
.devcontainer.json
.devcontainer.json
+1
-1
.gitignore
.gitignore
+1
-1
internal/indexbuilder/client/client.go
internal/indexbuilder/client/client.go
+53
-10
internal/master/segment_manager_test.go
internal/master/segment_manager_test.go
+1
-120
internal/querynode/load_index_service.go
internal/querynode/load_index_service.go
+0
-8
internal/querynode/load_index_service_test.go
internal/querynode/load_index_service_test.go
+9
-322
internal/querynode/query_node.go
internal/querynode/query_node.go
+0
-3
internal/querynode/query_node_test.go
internal/querynode/query_node_test.go
+3
-7
scripts/init_devcontainer.sh
scripts/init_devcontainer.sh
+3
-16
未找到文件。
.devcontainer.json
浏览文件 @
a4860bea
...
...
@@ -2,7 +2,7 @@
"name"
:
"Milvus Distributed Dev Container Definition"
,
"dockerComposeFile"
:
[
"./docker-compose-vscode.yml"
],
"service"
:
"ubuntu"
,
"initializeCommand"
:
"scripts/init_devcontainer.sh && docker-compose -f docker-compose-vscode.yml down
|| true
"
,
"initializeCommand"
:
"scripts/init_devcontainer.sh && docker-compose -f docker-compose-vscode.yml down
all -v || true && docker-compose -f docker-compose-vscode.yml pull --ignore-pull-failures ubuntu
"
,
"workspaceFolder"
:
"/go/src/github.com/zilliztech/milvus-distributed"
,
"shutdownAction"
:
"stopCompose"
,
"extensions"
:
[
...
...
.gitignore
浏览文件 @
a4860bea
...
...
@@ -11,7 +11,7 @@ pulsar/client-cpp/build/*
# vscode generated files
.vscode
docker-compose-vscode.yml
docker-compose-vscode.yml.
tmp
docker-compose-vscode.yml.
bak
cmake-build-debug
cmake-build-release
...
...
internal/indexbuilder/client/client.go
浏览文件 @
a4860bea
...
...
@@ -2,6 +2,10 @@ package indexbuilderclient
import
(
"context"
"encoding/json"
"fmt"
"github.com/zilliztech/milvus-distributed/internal/errors"
"log"
"time"
"google.golang.org/grpc"
...
...
@@ -54,20 +58,59 @@ func (c *Client) BuildIndexWithoutID(columnDataPaths []string, typeParams map[st
if
c
.
tryConnect
()
!=
nil
{
panic
(
"BuildIndexWithoutID: failed to connect index builder"
)
}
parseMap
:=
func
(
mStr
string
)
(
map
[
string
]
string
,
error
)
{
buffer
:=
make
(
map
[
string
]
interface
{})
err
:=
json
.
Unmarshal
([]
byte
(
mStr
),
&
buffer
)
if
err
!=
nil
{
return
nil
,
errors
.
New
(
"Unmarshal params failed"
)
}
ret
:=
make
(
map
[
string
]
string
)
for
key
,
value
:=
range
buffer
{
valueStr
:=
fmt
.
Sprintf
(
"%v"
,
value
)
ret
[
key
]
=
valueStr
}
return
ret
,
nil
}
var
typeParamsKV
[]
*
commonpb
.
KeyValuePair
for
typeParam
:=
range
typeParams
{
typeParamsKV
=
append
(
typeParamsKV
,
&
commonpb
.
KeyValuePair
{
Key
:
typeParam
,
Value
:
typeParams
[
typeParam
],
})
for
key
:=
range
typeParams
{
if
key
==
"params"
{
mapParams
,
err
:=
parseMap
(
typeParams
[
key
])
if
err
!=
nil
{
log
.
Println
(
"parse params error: "
,
err
)
}
for
pk
,
pv
:=
range
mapParams
{
typeParamsKV
=
append
(
typeParamsKV
,
&
commonpb
.
KeyValuePair
{
Key
:
pk
,
Value
:
pv
,
})
}
}
else
{
typeParamsKV
=
append
(
typeParamsKV
,
&
commonpb
.
KeyValuePair
{
Key
:
key
,
Value
:
typeParams
[
key
],
})
}
}
var
indexParamsKV
[]
*
commonpb
.
KeyValuePair
for
indexParam
:=
range
indexParams
{
indexParamsKV
=
append
(
indexParamsKV
,
&
commonpb
.
KeyValuePair
{
Key
:
indexParam
,
Value
:
indexParams
[
indexParam
],
})
for
key
:=
range
indexParams
{
if
key
==
"params"
{
mapParams
,
err
:=
parseMap
(
indexParams
[
key
])
if
err
!=
nil
{
log
.
Println
(
"parse params error: "
,
err
)
}
for
pk
,
pv
:=
range
mapParams
{
indexParamsKV
=
append
(
indexParamsKV
,
&
commonpb
.
KeyValuePair
{
Key
:
pk
,
Value
:
pv
,
})
}
}
else
{
indexParamsKV
=
append
(
indexParamsKV
,
&
commonpb
.
KeyValuePair
{
Key
:
key
,
Value
:
indexParams
[
key
],
})
}
}
ctx
:=
context
.
TODO
()
...
...
internal/master/segment_manager_test.go
浏览文件 @
a4860bea
...
...
@@ -126,7 +126,7 @@ func TestSegmentManager_AssignSegment(t *testing.T) {
}
}
time
.
Sleep
(
time
.
Duration
(
Params
.
SegIDAssignExpiration
)
*
time
.
Millisecond
)
time
.
Sleep
(
time
.
Duration
(
Params
.
SegIDAssignExpiration
))
timestamp
,
err
:=
globalTsoAllocator
()
assert
.
Nil
(
t
,
err
)
err
=
mt
.
UpdateSegment
(
&
pb
.
SegmentMeta
{
...
...
@@ -156,122 +156,3 @@ func TestSegmentManager_AssignSegment(t *testing.T) {
assert
.
Nil
(
t
,
err
)
assert
.
NotEqualValues
(
t
,
0
,
segMeta
.
CloseTime
)
}
func
TestSegmentManager_SycnWritenode
(
t
*
testing
.
T
)
{
ctx
,
cancelFunc
:=
context
.
WithCancel
(
context
.
TODO
())
defer
cancelFunc
()
Init
()
Params
.
TopicNum
=
5
Params
.
QueryNodeNum
=
3
Params
.
SegmentSize
=
536870912
/
1024
/
1024
Params
.
SegmentSizeFactor
=
0.75
Params
.
DefaultRecordSize
=
1024
Params
.
MinSegIDAssignCnt
=
1048576
/
1024
Params
.
SegIDAssignExpiration
=
2000
etcdAddress
:=
Params
.
EtcdAddress
cli
,
err
:=
clientv3
.
New
(
clientv3
.
Config
{
Endpoints
:
[]
string
{
etcdAddress
}})
assert
.
Nil
(
t
,
err
)
rootPath
:=
"/test/root"
_
,
err
=
cli
.
Delete
(
ctx
,
rootPath
,
clientv3
.
WithPrefix
())
assert
.
Nil
(
t
,
err
)
kvBase
:=
etcdkv
.
NewEtcdKV
(
cli
,
rootPath
)
defer
kvBase
.
Close
()
mt
,
err
:=
NewMetaTable
(
kvBase
)
assert
.
Nil
(
t
,
err
)
collName
:=
"segmgr_test_coll"
var
collID
int64
=
1001
partitionTag
:=
"test_part"
schema
:=
&
schemapb
.
CollectionSchema
{
Name
:
collName
,
Fields
:
[]
*
schemapb
.
FieldSchema
{
{
FieldID
:
1
,
Name
:
"f1"
,
IsPrimaryKey
:
false
,
DataType
:
schemapb
.
DataType_INT32
},
{
FieldID
:
2
,
Name
:
"f2"
,
IsPrimaryKey
:
false
,
DataType
:
schemapb
.
DataType_VECTOR_FLOAT
,
TypeParams
:
[]
*
commonpb
.
KeyValuePair
{
{
Key
:
"dim"
,
Value
:
"128"
},
}},
},
}
err
=
mt
.
AddCollection
(
&
pb
.
CollectionMeta
{
ID
:
collID
,
Schema
:
schema
,
CreateTime
:
0
,
SegmentIDs
:
[]
UniqueID
{},
PartitionTags
:
[]
string
{},
})
assert
.
Nil
(
t
,
err
)
err
=
mt
.
AddPartition
(
collID
,
partitionTag
)
assert
.
Nil
(
t
,
err
)
var
cnt
int64
globalIDAllocator
:=
func
()
(
UniqueID
,
error
)
{
val
:=
atomic
.
AddInt64
(
&
cnt
,
1
)
return
val
,
nil
}
globalTsoAllocator
:=
func
()
(
Timestamp
,
error
)
{
val
:=
atomic
.
AddInt64
(
&
cnt
,
1
)
phy
:=
time
.
Now
()
.
UnixNano
()
/
int64
(
time
.
Millisecond
)
ts
:=
tsoutil
.
ComposeTS
(
phy
,
val
)
return
ts
,
nil
}
syncWriteChan
:=
make
(
chan
*
msgstream
.
TimeTickMsg
)
syncProxyChan
:=
make
(
chan
*
msgstream
.
TimeTickMsg
)
segAssigner
:=
NewSegmentAssigner
(
ctx
,
mt
,
globalTsoAllocator
,
syncProxyChan
)
mockScheduler
:=
&
MockFlushScheduler
{}
segManager
,
err
:=
NewSegmentManager
(
ctx
,
mt
,
globalIDAllocator
,
globalTsoAllocator
,
syncWriteChan
,
mockScheduler
,
segAssigner
)
assert
.
Nil
(
t
,
err
)
segManager
.
Start
()
defer
segManager
.
Close
()
sizePerRecord
,
err
:=
typeutil
.
EstimateSizePerRecord
(
schema
)
assert
.
Nil
(
t
,
err
)
maxCount
:=
uint32
(
Params
.
SegmentSize
*
1024
*
1024
/
float64
(
sizePerRecord
))
req
:=
[]
*
internalpb
.
SegIDRequest
{
{
Count
:
maxCount
,
ChannelID
:
1
,
CollName
:
collName
,
PartitionTag
:
partitionTag
},
{
Count
:
maxCount
,
ChannelID
:
2
,
CollName
:
collName
,
PartitionTag
:
partitionTag
},
{
Count
:
maxCount
,
ChannelID
:
3
,
CollName
:
collName
,
PartitionTag
:
partitionTag
},
}
assignSegment
,
err
:=
segManager
.
AssignSegment
(
req
)
assert
.
Nil
(
t
,
err
)
timestamp
,
err
:=
globalTsoAllocator
()
assert
.
Nil
(
t
,
err
)
for
i
:=
0
;
i
<
len
(
assignSegment
);
i
++
{
assert
.
EqualValues
(
t
,
maxCount
,
assignSegment
[
i
]
.
Count
)
assert
.
EqualValues
(
t
,
i
+
1
,
assignSegment
[
i
]
.
ChannelID
)
err
=
mt
.
UpdateSegment
(
&
pb
.
SegmentMeta
{
SegmentID
:
assignSegment
[
i
]
.
SegID
,
CollectionID
:
collID
,
PartitionTag
:
partitionTag
,
ChannelStart
:
0
,
ChannelEnd
:
1
,
CloseTime
:
timestamp
,
NumRows
:
int64
(
maxCount
),
MemSize
:
500000
,
})
assert
.
Nil
(
t
,
err
)
}
time
.
Sleep
(
time
.
Duration
(
Params
.
SegIDAssignExpiration
)
*
time
.
Millisecond
)
timestamp
,
err
=
globalTsoAllocator
()
assert
.
Nil
(
t
,
err
)
tsMsg
:=
&
msgstream
.
TimeTickMsg
{
BaseMsg
:
msgstream
.
BaseMsg
{
BeginTimestamp
:
timestamp
,
EndTimestamp
:
timestamp
,
HashValues
:
[]
uint32
{},
},
TimeTickMsg
:
internalpb
.
TimeTickMsg
{
MsgType
:
internalpb
.
MsgType_kTimeTick
,
PeerID
:
1
,
Timestamp
:
timestamp
,
},
}
syncWriteChan
<-
tsMsg
time
.
Sleep
(
300
*
time
.
Millisecond
)
status
:=
segManager
.
collStatus
[
collID
]
assert
.
Empty
(
t
,
status
.
segments
)
}
internal/querynode/load_index_service.go
浏览文件 @
a4860bea
...
...
@@ -100,7 +100,6 @@ func (lis *loadIndexService) start() {
continue
}
// 1. use msg's index paths to get index bytes
fmt
.
Println
(
"start load index"
)
var
indexBuffer
[][]
byte
var
err
error
fn
:=
func
()
error
{
...
...
@@ -139,13 +138,6 @@ func (lis *loadIndexService) start() {
}
}
func
(
lis
*
loadIndexService
)
close
()
{
if
lis
.
loadIndexMsgStream
!=
nil
{
lis
.
loadIndexMsgStream
.
Close
()
}
lis
.
cancel
()
}
func
(
lis
*
loadIndexService
)
printIndexParams
(
index
[]
*
commonpb
.
KeyValuePair
)
{
fmt
.
Println
(
"================================================="
)
for
i
:=
0
;
i
<
len
(
index
);
i
++
{
...
...
internal/querynode/load_index_service_test.go
浏览文件 @
a4860bea
...
...
@@ -22,29 +22,26 @@ import (
"github.com/zilliztech/milvus-distributed/internal/querynode/client"
)
func
TestLoadIndexService
_FloatVector
(
t
*
testing
.
T
)
{
func
TestLoadIndexService
(
t
*
testing
.
T
)
{
node
:=
newQueryNode
()
collectionID
:=
rand
.
Int63n
(
1000000
)
segmentID
:=
rand
.
Int63n
(
1000000
)
initTestMeta
(
t
,
node
,
"collection0"
,
collectionID
,
segmentID
)
// loadIndexService and statsService
suffix
:=
"-test-search"
+
strconv
.
FormatInt
(
rand
.
Int63n
(
1000000
),
10
)
oldSearchChannelNames
:=
Params
.
SearchChannelNames
newSearchChannelNames
:=
makeNewChannelNames
(
oldSearchChannelNames
,
suffix
)
var
newSearchChannelNames
[]
string
for
_
,
channel
:=
range
oldSearchChannelNames
{
newSearchChannelNames
=
append
(
newSearchChannelNames
,
channel
+
"new"
)
}
Params
.
SearchChannelNames
=
newSearchChannelNames
oldSearchResultChannelNames
:=
Params
.
SearchChannelNames
newSearchResultChannelNames
:=
makeNewChannelNames
(
oldSearchResultChannelNames
,
suffix
)
var
newSearchResultChannelNames
[]
string
for
_
,
channel
:=
range
oldSearchResultChannelNames
{
newSearchResultChannelNames
=
append
(
newSearchResultChannelNames
,
channel
+
"new"
)
}
Params
.
SearchResultChannelNames
=
newSearchResultChannelNames
oldLoadIndexChannelNames
:=
Params
.
LoadIndexChannelNames
newLoadIndexChannelNames
:=
makeNewChannelNames
(
oldLoadIndexChannelNames
,
suffix
)
Params
.
LoadIndexChannelNames
=
newLoadIndexChannelNames
oldStatsChannelName
:=
Params
.
StatsChannelName
newStatsChannelNames
:=
makeNewChannelNames
([]
string
{
oldStatsChannelName
},
suffix
)
Params
.
StatsChannelName
=
newStatsChannelNames
[
0
]
go
node
.
Start
()
//generate insert data
...
...
@@ -331,319 +328,9 @@ func TestLoadIndexService_FloatVector(t *testing.T) {
}
Params
.
SearchChannelNames
=
oldSearchChannelNames
Params
.
SearchResultChannelNames
=
oldSearchResultChannelNames
Params
.
LoadIndexChannelNames
=
oldLoadIndexChannelNames
Params
.
StatsChannelName
=
oldStatsChannelName
fmt
.
Println
(
"loadIndex floatVector test Done!"
)
defer
assert
.
Equal
(
t
,
findFiledStats
,
true
)
<-
node
.
queryNodeLoopCtx
.
Done
()
node
.
Close
()
}
func
TestLoadIndexService_BinaryVector
(
t
*
testing
.
T
)
{
node
:=
newQueryNode
()
collectionID
:=
rand
.
Int63n
(
1000000
)
segmentID
:=
rand
.
Int63n
(
1000000
)
initTestMeta
(
t
,
node
,
"collection0"
,
collectionID
,
segmentID
,
true
)
// loadIndexService and statsService
suffix
:=
"-test-search-binary"
+
strconv
.
FormatInt
(
rand
.
Int63n
(
1000000
),
10
)
oldSearchChannelNames
:=
Params
.
SearchChannelNames
newSearchChannelNames
:=
makeNewChannelNames
(
oldSearchChannelNames
,
suffix
)
Params
.
SearchChannelNames
=
newSearchChannelNames
oldSearchResultChannelNames
:=
Params
.
SearchChannelNames
newSearchResultChannelNames
:=
makeNewChannelNames
(
oldSearchResultChannelNames
,
suffix
)
Params
.
SearchResultChannelNames
=
newSearchResultChannelNames
oldLoadIndexChannelNames
:=
Params
.
LoadIndexChannelNames
newLoadIndexChannelNames
:=
makeNewChannelNames
(
oldLoadIndexChannelNames
,
suffix
)
Params
.
LoadIndexChannelNames
=
newLoadIndexChannelNames
oldStatsChannelName
:=
Params
.
StatsChannelName
newStatsChannelNames
:=
makeNewChannelNames
([]
string
{
oldStatsChannelName
},
suffix
)
Params
.
StatsChannelName
=
newStatsChannelNames
[
0
]
go
node
.
Start
()
const
msgLength
=
1000
const
receiveBufSize
=
1024
const
DIM
=
128
// generator index data
var
indexRowData
[]
byte
for
n
:=
0
;
n
<
msgLength
;
n
++
{
for
i
:=
0
;
i
<
DIM
/
8
;
i
++
{
indexRowData
=
append
(
indexRowData
,
byte
(
rand
.
Intn
(
8
)))
}
}
//generator insert data
var
insertRowBlob
[]
*
commonpb
.
Blob
var
timestamps
[]
uint64
var
rowIDs
[]
int64
var
hashValues
[]
uint32
offset
:=
0
for
n
:=
0
;
n
<
msgLength
;
n
++
{
rowData
:=
make
([]
byte
,
0
)
rowData
=
append
(
rowData
,
indexRowData
[
offset
:
offset
+
(
DIM
/
8
)]
...
)
offset
+=
DIM
/
8
age
:=
make
([]
byte
,
4
)
binary
.
LittleEndian
.
PutUint32
(
age
,
1
)
rowData
=
append
(
rowData
,
age
...
)
blob
:=
&
commonpb
.
Blob
{
Value
:
rowData
,
}
insertRowBlob
=
append
(
insertRowBlob
,
blob
)
timestamps
=
append
(
timestamps
,
uint64
(
n
))
rowIDs
=
append
(
rowIDs
,
int64
(
n
))
hashValues
=
append
(
hashValues
,
uint32
(
n
))
}
var
insertMsg
msgstream
.
TsMsg
=
&
msgstream
.
InsertMsg
{
BaseMsg
:
msgstream
.
BaseMsg
{
HashValues
:
hashValues
,
},
InsertRequest
:
internalpb
.
InsertRequest
{
MsgType
:
internalpb
.
MsgType_kInsert
,
ReqID
:
0
,
CollectionName
:
"collection0"
,
PartitionTag
:
"default"
,
SegmentID
:
segmentID
,
ChannelID
:
int64
(
0
),
ProxyID
:
int64
(
0
),
Timestamps
:
timestamps
,
RowIDs
:
rowIDs
,
RowData
:
insertRowBlob
,
},
}
insertMsgPack
:=
msgstream
.
MsgPack
{
BeginTs
:
0
,
EndTs
:
math
.
MaxUint64
,
Msgs
:
[]
msgstream
.
TsMsg
{
insertMsg
},
}
// generate timeTick
timeTickMsg
:=
&
msgstream
.
TimeTickMsg
{
BaseMsg
:
msgstream
.
BaseMsg
{
BeginTimestamp
:
0
,
EndTimestamp
:
0
,
HashValues
:
[]
uint32
{
0
},
},
TimeTickMsg
:
internalpb
.
TimeTickMsg
{
MsgType
:
internalpb
.
MsgType_kTimeTick
,
PeerID
:
UniqueID
(
0
),
Timestamp
:
math
.
MaxUint64
,
},
}
timeTickMsgPack
:=
&
msgstream
.
MsgPack
{
Msgs
:
[]
msgstream
.
TsMsg
{
timeTickMsg
},
}
// pulsar produce
insertChannels
:=
Params
.
InsertChannelNames
ddChannels
:=
Params
.
DDChannelNames
insertStream
:=
msgstream
.
NewPulsarMsgStream
(
node
.
queryNodeLoopCtx
,
receiveBufSize
)
insertStream
.
SetPulsarClient
(
Params
.
PulsarAddress
)
insertStream
.
CreatePulsarProducers
(
insertChannels
)
ddStream
:=
msgstream
.
NewPulsarMsgStream
(
node
.
queryNodeLoopCtx
,
receiveBufSize
)
ddStream
.
SetPulsarClient
(
Params
.
PulsarAddress
)
ddStream
.
CreatePulsarProducers
(
ddChannels
)
var
insertMsgStream
msgstream
.
MsgStream
=
insertStream
insertMsgStream
.
Start
()
var
ddMsgStream
msgstream
.
MsgStream
=
ddStream
ddMsgStream
.
Start
()
err
:=
insertMsgStream
.
Produce
(
&
insertMsgPack
)
assert
.
NoError
(
t
,
err
)
err
=
insertMsgStream
.
Broadcast
(
timeTickMsgPack
)
assert
.
NoError
(
t
,
err
)
err
=
ddMsgStream
.
Broadcast
(
timeTickMsgPack
)
assert
.
NoError
(
t
,
err
)
//generate search data and send search msg
searchRowData
:=
indexRowData
[
42
*
(
DIM
/
8
)
:
43
*
(
DIM
/
8
)]
dslString
:=
"{
\"
bool
\"
: {
\n\"
vector
\"
: {
\n
\"
vec
\"
: {
\n
\"
metric_type
\"
:
\"
JACCARD
\"
,
\n
\"
params
\"
: {
\n
\"
nprobe
\"
: 10
\n
},
\n
\"
query
\"
:
\"
$0
\"
,
\"
topk
\"
: 10
\n
}
\n
}
\n
}
\n
}"
placeholderValue
:=
servicepb
.
PlaceholderValue
{
Tag
:
"$0"
,
Type
:
servicepb
.
PlaceholderType_VECTOR_BINARY
,
Values
:
[][]
byte
{
searchRowData
},
}
placeholderGroup
:=
servicepb
.
PlaceholderGroup
{
Placeholders
:
[]
*
servicepb
.
PlaceholderValue
{
&
placeholderValue
},
}
placeGroupByte
,
err
:=
proto
.
Marshal
(
&
placeholderGroup
)
if
err
!=
nil
{
log
.
Print
(
"marshal placeholderGroup failed"
)
}
query
:=
servicepb
.
Query
{
CollectionName
:
"collection0"
,
PartitionTags
:
[]
string
{
"default"
},
Dsl
:
dslString
,
PlaceholderGroup
:
placeGroupByte
,
}
queryByte
,
err
:=
proto
.
Marshal
(
&
query
)
if
err
!=
nil
{
log
.
Print
(
"marshal query failed"
)
}
blob
:=
commonpb
.
Blob
{
Value
:
queryByte
,
}
fn
:=
func
(
n
int64
)
*
msgstream
.
MsgPack
{
searchMsg
:=
&
msgstream
.
SearchMsg
{
BaseMsg
:
msgstream
.
BaseMsg
{
HashValues
:
[]
uint32
{
0
},
},
SearchRequest
:
internalpb
.
SearchRequest
{
MsgType
:
internalpb
.
MsgType_kSearch
,
ReqID
:
n
,
ProxyID
:
int64
(
1
),
Timestamp
:
uint64
(
msgLength
),
ResultChannelID
:
int64
(
0
),
Query
:
&
blob
,
},
}
return
&
msgstream
.
MsgPack
{
Msgs
:
[]
msgstream
.
TsMsg
{
searchMsg
},
}
}
searchStream
:=
msgstream
.
NewPulsarMsgStream
(
node
.
queryNodeLoopCtx
,
receiveBufSize
)
searchStream
.
SetPulsarClient
(
Params
.
PulsarAddress
)
searchStream
.
CreatePulsarProducers
(
newSearchChannelNames
)
searchStream
.
Start
()
err
=
searchStream
.
Produce
(
fn
(
1
))
assert
.
NoError
(
t
,
err
)
//get search result
searchResultStream
:=
msgstream
.
NewPulsarMsgStream
(
node
.
queryNodeLoopCtx
,
receiveBufSize
)
searchResultStream
.
SetPulsarClient
(
Params
.
PulsarAddress
)
unmarshalDispatcher
:=
msgstream
.
NewUnmarshalDispatcher
()
searchResultStream
.
CreatePulsarConsumers
(
newSearchResultChannelNames
,
"loadIndexTestSubSearchResult2"
,
unmarshalDispatcher
,
receiveBufSize
)
searchResultStream
.
Start
()
searchResult
:=
searchResultStream
.
Consume
()
assert
.
NotNil
(
t
,
searchResult
)
unMarshaledHit
:=
servicepb
.
Hits
{}
err
=
proto
.
Unmarshal
(
searchResult
.
Msgs
[
0
]
.
(
*
msgstream
.
SearchResultMsg
)
.
Hits
[
0
],
&
unMarshaledHit
)
assert
.
Nil
(
t
,
err
)
// gen load index message pack
indexParams
:=
make
(
map
[
string
]
string
)
indexParams
[
"index_type"
]
=
"BIN_IVF_FLAT"
indexParams
[
"index_mode"
]
=
"cpu"
indexParams
[
"dim"
]
=
"128"
indexParams
[
"k"
]
=
"10"
indexParams
[
"nlist"
]
=
"100"
indexParams
[
"nprobe"
]
=
"10"
indexParams
[
"m"
]
=
"4"
indexParams
[
"nbits"
]
=
"8"
indexParams
[
"metric_type"
]
=
"JACCARD"
indexParams
[
"SLICE_SIZE"
]
=
"4"
var
indexParamsKV
[]
*
commonpb
.
KeyValuePair
for
key
,
value
:=
range
indexParams
{
indexParamsKV
=
append
(
indexParamsKV
,
&
commonpb
.
KeyValuePair
{
Key
:
key
,
Value
:
value
,
})
}
// generator index
typeParams
:=
make
(
map
[
string
]
string
)
typeParams
[
"dim"
]
=
"128"
index
,
err
:=
indexbuilder
.
NewCIndex
(
typeParams
,
indexParams
)
assert
.
Nil
(
t
,
err
)
err
=
index
.
BuildBinaryVecIndexWithoutIds
(
indexRowData
)
assert
.
Equal
(
t
,
err
,
nil
)
option
:=
&
minioKV
.
Option
{
Address
:
Params
.
MinioEndPoint
,
AccessKeyID
:
Params
.
MinioAccessKeyID
,
SecretAccessKeyID
:
Params
.
MinioSecretAccessKey
,
UseSSL
:
Params
.
MinioUseSSLStr
,
BucketName
:
Params
.
MinioBucketName
,
CreateBucket
:
true
,
}
minioKV
,
err
:=
minioKV
.
NewMinIOKV
(
node
.
queryNodeLoopCtx
,
option
)
assert
.
Equal
(
t
,
err
,
nil
)
//save index to minio
binarySet
,
err
:=
index
.
Serialize
()
assert
.
Equal
(
t
,
err
,
nil
)
indexPaths
:=
make
([]
string
,
0
)
for
_
,
index
:=
range
binarySet
{
path
:=
strconv
.
Itoa
(
int
(
segmentID
))
+
"/"
+
index
.
Key
indexPaths
=
append
(
indexPaths
,
path
)
minioKV
.
Save
(
path
,
string
(
index
.
Value
))
}
//test index search result
indexResult
,
err
:=
index
.
QueryOnBinaryVecIndexWithParam
(
searchRowData
,
indexParams
)
assert
.
Equal
(
t
,
err
,
nil
)
// create loadIndexClient
fieldID
:=
UniqueID
(
100
)
loadIndexChannelNames
:=
Params
.
LoadIndexChannelNames
client
:=
client
.
NewLoadIndexClient
(
node
.
queryNodeLoopCtx
,
Params
.
PulsarAddress
,
loadIndexChannelNames
)
client
.
LoadIndex
(
indexPaths
,
segmentID
,
fieldID
,
"vec"
,
indexParams
)
// init message stream consumer and do checks
statsMs
:=
msgstream
.
NewPulsarMsgStream
(
node
.
queryNodeLoopCtx
,
Params
.
StatsReceiveBufSize
)
statsMs
.
SetPulsarClient
(
Params
.
PulsarAddress
)
statsMs
.
CreatePulsarConsumers
([]
string
{
Params
.
StatsChannelName
},
Params
.
MsgChannelSubName
,
msgstream
.
NewUnmarshalDispatcher
(),
Params
.
StatsReceiveBufSize
)
statsMs
.
Start
()
findFiledStats
:=
false
for
{
receiveMsg
:=
msgstream
.
MsgStream
(
statsMs
)
.
Consume
()
assert
.
NotNil
(
t
,
receiveMsg
)
assert
.
NotEqual
(
t
,
len
(
receiveMsg
.
Msgs
),
0
)
for
_
,
msg
:=
range
receiveMsg
.
Msgs
{
statsMsg
,
ok
:=
msg
.
(
*
msgstream
.
QueryNodeStatsMsg
)
if
statsMsg
.
FieldStats
==
nil
||
len
(
statsMsg
.
FieldStats
)
==
0
{
continue
}
findFiledStats
=
true
assert
.
Equal
(
t
,
ok
,
true
)
assert
.
Equal
(
t
,
len
(
statsMsg
.
FieldStats
),
1
)
fieldStats0
:=
statsMsg
.
FieldStats
[
0
]
assert
.
Equal
(
t
,
fieldStats0
.
FieldID
,
fieldID
)
assert
.
Equal
(
t
,
fieldStats0
.
CollectionID
,
collectionID
)
assert
.
Equal
(
t
,
len
(
fieldStats0
.
IndexStats
),
1
)
indexStats0
:=
fieldStats0
.
IndexStats
[
0
]
params
:=
indexStats0
.
IndexParams
// sort index params by key
sort
.
Slice
(
indexParamsKV
,
func
(
i
,
j
int
)
bool
{
return
indexParamsKV
[
i
]
.
Key
<
indexParamsKV
[
j
]
.
Key
})
indexEqual
:=
node
.
loadIndexService
.
indexParamsEqual
(
params
,
indexParamsKV
)
assert
.
Equal
(
t
,
indexEqual
,
true
)
}
if
findFiledStats
{
break
}
}
err
=
searchStream
.
Produce
(
fn
(
2
))
assert
.
NoError
(
t
,
err
)
searchResult
=
searchResultStream
.
Consume
()
assert
.
NotNil
(
t
,
searchResult
)
err
=
proto
.
Unmarshal
(
searchResult
.
Msgs
[
0
]
.
(
*
msgstream
.
SearchResultMsg
)
.
Hits
[
0
],
&
unMarshaledHit
)
assert
.
Nil
(
t
,
err
)
idsIndex
:=
indexResult
.
IDs
()
idsSegment
:=
unMarshaledHit
.
IDs
assert
.
Equal
(
t
,
len
(
idsIndex
),
len
(
idsSegment
))
for
i
:=
0
;
i
<
len
(
idsIndex
);
i
++
{
assert
.
Equal
(
t
,
idsIndex
[
i
],
idsSegment
[
i
])
}
Params
.
SearchChannelNames
=
oldSearchChannelNames
Params
.
SearchResultChannelNames
=
oldSearchResultChannelNames
Params
.
LoadIndexChannelNames
=
oldLoadIndexChannelNames
Params
.
StatsChannelName
=
oldStatsChannelName
fmt
.
Println
(
"loadIndex binaryVector test Done!"
)
defer
assert
.
Equal
(
t
,
findFiledStats
,
true
)
<-
node
.
queryNodeLoopCtx
.
Done
()
node
.
Close
()
}
internal/querynode/query_node.go
浏览文件 @
a4860bea
...
...
@@ -97,9 +97,6 @@ func (node *QueryNode) Close() {
if
node
.
searchService
!=
nil
{
node
.
searchService
.
close
()
}
if
node
.
loadIndexService
!=
nil
{
node
.
loadIndexService
.
close
()
}
if
node
.
statsService
!=
nil
{
node
.
statsService
.
close
()
}
...
...
internal/querynode/query_node_test.go
浏览文件 @
a4860bea
...
...
@@ -35,7 +35,7 @@ func genTestCollectionMeta(collectionName string, collectionID UniqueID, isBinar
TypeParams
:
[]
*
commonpb
.
KeyValuePair
{
{
Key
:
"dim"
,
Value
:
"1
28
"
,
Value
:
"1
6
"
,
},
},
IndexParams
:
[]
*
commonpb
.
KeyValuePair
{
...
...
@@ -92,12 +92,8 @@ func genTestCollectionMeta(collectionName string, collectionID UniqueID, isBinar
return
&
collectionMeta
}
func
initTestMeta
(
t
*
testing
.
T
,
node
*
QueryNode
,
collectionName
string
,
collectionID
UniqueID
,
segmentID
UniqueID
,
optional
...
bool
)
{
isBinary
:=
false
if
len
(
optional
)
>
0
{
isBinary
=
optional
[
0
]
}
collectionMeta
:=
genTestCollectionMeta
(
collectionName
,
collectionID
,
isBinary
)
func
initTestMeta
(
t
*
testing
.
T
,
node
*
QueryNode
,
collectionName
string
,
collectionID
UniqueID
,
segmentID
UniqueID
)
{
collectionMeta
:=
genTestCollectionMeta
(
collectionName
,
collectionID
,
false
)
schemaBlob
:=
proto
.
MarshalTextString
(
collectionMeta
.
Schema
)
assert
.
NotEqual
(
t
,
""
,
schemaBlob
)
...
...
scripts/init_devcontainer.sh
浏览文件 @
a4860bea
...
...
@@ -8,15 +8,6 @@ while [ -h "$SOURCE" ]; do # resolve $SOURCE until the file is no longer a symli
done
ROOT_DIR
=
"
$(
cd
-P
"
$(
dirname
"
$SOURCE
"
)
/.."
&&
pwd
)
"
unameOut
=
"
$(
uname
-s
)
"
case
"
${
unameOut
}
"
in
Linux
*
)
machine
=
Linux
;;
Darwin
*
)
machine
=
Mac
;;
CYGWIN
*
)
machine
=
Cygwin
;;
MINGW
*
)
machine
=
MinGw
;;
*
)
machine
=
"UNKNOWN:
${
unameOut
}
"
esac
# Attempt to run in the container with the same UID/GID as we have on the host,
# as this results in the correct permissions on files created in the shared
# volumes. This isn't always possible, however, as IDs less than 100 are
...
...
@@ -30,12 +21,8 @@ gid=$(id -g)
[
"
$uid
"
-lt
500
]
&&
uid
=
501
[
"
$gid
"
-lt
500
]
&&
gid
=
$uid
awk
'c&&c--{sub(/^/,"#")} /# Build devcontainer/{c=5} 1'
$ROOT_DIR
/docker-compose.yml
>
$ROOT_DIR
/docker-compose-vscode.yml.
tmp
awk
'c&&c--{sub(/^/,"#")} /# Build devcontainer/{c=5} 1'
$ROOT_DIR
/docker-compose.yml
>
$ROOT_DIR
/docker-compose-vscode.yml.
bak
awk
'c&&c--{sub(/^/,"#")} /# Command/{c=3} 1'
$ROOT_DIR
/docker-compose-vscode.yml.
tmp
>
$ROOT_DIR
/docker-compose-vscode.yml
awk
'c&&c--{sub(/^/,"#")} /# Command/{c=3} 1'
$ROOT_DIR
/docker-compose-vscode.yml.
bak
>
$ROOT_DIR
/docker-compose-vscode.yml
if
[
"
${
machine
}
"
==
"Mac"
]
;
then
sed
-i
''
"s/# user: {{ CURRENT_ID }}/user:
\"
$uid
:
$gid
\"
/g"
$ROOT_DIR
/docker-compose-vscode.yml
else
sed
-i
"s/# user: {{ CURRENT_ID }}/user:
\"
$uid
:
$gid
\"
/g"
$ROOT_DIR
/docker-compose-vscode.yml
fi
\ No newline at end of file
sed
-i
'.bak'
"s/# user: {{ CURRENT_ID }}/user:
\"
$uid
:
$gid
\"
/g"
$ROOT_DIR
/docker-compose-vscode.yml
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录