Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
milvus
milvus
提交
947976db
M
milvus
项目概览
milvus
/
milvus
11 个月 前同步成功
通知
260
Star
22476
Fork
2472
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
M
milvus
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
947976db
编写于
1月 27, 2021
作者:
X
xige-16
提交者:
yefu.chen
1月 27, 2021
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Add test for query service’s load function
Signed-off-by:
N
xige-16
<
xi.ge@zilliz.com
>
上级
dbfe6851
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
195 addition
and
36 deletion
+195
-36
cmd/datanode/main.go
cmd/datanode/main.go
+9
-7
internal/datanode/meta_service.go
internal/datanode/meta_service.go
+1
-0
internal/dataservice/param.go
internal/dataservice/param.go
+1
-1
internal/dataservice/server.go
internal/dataservice/server.go
+3
-3
internal/distributed/datanode/service.go
internal/distributed/datanode/service.go
+3
-1
internal/queryservice/queryservice.go
internal/queryservice/queryservice.go
+9
-24
internal/queryservice/queryservice_test.go
internal/queryservice/queryservice_test.go
+169
-0
未找到文件。
cmd/datanode/main.go
浏览文件 @
947976db
...
...
@@ -23,7 +23,6 @@ const interval = 200
func
main
()
{
ctx
,
cancel
:=
context
.
WithCancel
(
context
.
Background
())
defer
cancel
()
svr
,
err
:=
dnc
.
New
(
ctx
)
if
err
!=
nil
{
...
...
@@ -114,6 +113,11 @@ func main() {
panic
(
err
)
}
if
err
:=
svr
.
Start
();
err
!=
nil
{
panic
(
err
)
}
log
.
Println
(
"Data node successfully started ..."
)
sc
:=
make
(
chan
os
.
Signal
,
1
)
signal
.
Notify
(
sc
,
syscall
.
SIGHUP
,
...
...
@@ -127,15 +131,13 @@ func main() {
cancel
()
}()
if
err
:=
svr
.
Start
();
err
!=
nil
{
panic
(
err
)
}
log
.
Println
(
"Data node successfully started ..."
)
<-
ctx
.
Done
()
log
.
Println
(
"Got signal to exit signal:"
,
sig
.
String
())
svr
.
Stop
()
if
err
:=
svr
.
Stop
();
err
!=
nil
{
panic
(
err
)
}
switch
sig
{
case
syscall
.
SIGTERM
:
exit
(
0
)
...
...
internal/datanode/meta_service.go
浏览文件 @
947976db
...
...
@@ -67,6 +67,7 @@ func (mService *metaService) getCollectionNames() ([]string, error) {
}
func
(
mService
*
metaService
)
createCollection
(
name
string
)
error
{
log
.
Println
(
"Describing collections"
)
req
:=
&
milvuspb
.
DescribeCollectionRequest
{
Base
:
&
commonpb
.
MsgBase
{
MsgType
:
commonpb
.
MsgType_kDescribeCollection
,
...
...
internal/dataservice/param.go
浏览文件 @
947976db
...
...
@@ -182,7 +182,7 @@ func (p *ParamTable) initSegmentInfoChannelName() {
func
(
p
*
ParamTable
)
initDataServiceSubscriptionName
()
{
var
err
error
p
.
DataServiceSubscriptionName
,
err
=
p
.
Load
(
"msgChannel.
chan
NamePrefix.dataServiceSubNamePrefix"
)
p
.
DataServiceSubscriptionName
,
err
=
p
.
Load
(
"msgChannel.
sub
NamePrefix.dataServiceSubNamePrefix"
)
if
err
!=
nil
{
panic
(
err
)
}
...
...
internal/dataservice/server.go
浏览文件 @
947976db
...
...
@@ -122,14 +122,14 @@ func (s *Server) Start() error {
}
s
.
ddHandler
=
newDDHandler
(
s
.
meta
,
s
.
segAllocator
)
s
.
initSegmentInfoChannel
()
if
err
=
s
.
initMsgProducer
();
err
!=
nil
{
return
err
}
if
err
=
s
.
loadMetaFromMaster
();
err
!=
nil
{
return
err
}
s
.
startServerLoop
()
s
.
waitDataNodeRegister
()
if
err
=
s
.
initMsgProducer
();
err
!=
nil
{
return
err
}
s
.
state
.
Store
(
internalpb2
.
StateCode_HEALTHY
)
log
.
Println
(
"start success"
)
return
nil
...
...
internal/distributed/datanode/service.go
浏览文件 @
947976db
...
...
@@ -27,7 +27,9 @@ type Server struct {
}
func
New
(
ctx
context
.
Context
)
(
*
Server
,
error
)
{
var
s
=
&
Server
{}
var
s
=
&
Server
{
ctx
:
ctx
,
}
s
.
core
=
dn
.
NewDataNode
(
s
.
ctx
)
s
.
grpcServer
=
grpc
.
NewServer
()
...
...
internal/queryservice/queryservice.go
浏览文件 @
947976db
...
...
@@ -77,26 +77,6 @@ func (qs *QueryService) Stop() error {
return
nil
}
//func (qs *QueryService) SetDataService(d querynode.DataServiceInterface) error {
// for _, v := range qs.queryNodeClient {
// err := v.SetDataService(d)
// if err != nil {
// return err
// }
// }
// return nil
//}
//
//func (qs *QueryService) SetIndexService(i querynode.IndexServiceInterface) error {
// for _, v := range qs.queryNodeClient {
// err := v.SetIndexService(i)
// if err != nil {
// return err
// }
// }
// return nil
//}
func
(
qs
*
QueryService
)
GetComponentStates
()
(
*
internalpb2
.
ComponentStates
,
error
)
{
serviceComponentInfo
:=
&
internalpb2
.
ComponentInfo
{
NodeID
:
Params
.
QueryServiceID
,
...
...
@@ -134,6 +114,7 @@ func (qs *QueryService) GetStatisticsChannel() (string, error) {
// TODO:: do addWatchDmChannel to query node after registerNode
func
(
qs
*
QueryService
)
RegisterNode
(
req
*
querypb
.
RegisterNodeRequest
)
(
*
querypb
.
RegisterNodeResponse
,
error
)
{
fmt
.
Println
(
"register query node ="
,
req
.
Address
)
// TODO:: add mutex
allocatedID
:=
qs
.
numRegisterNode
qs
.
numRegisterNode
++
...
...
@@ -158,6 +139,7 @@ func (qs *QueryService) RegisterNode(req *querypb.RegisterNodeRequest) (*querypb
}
qs
.
queryNodes
=
append
(
qs
.
queryNodes
,
node
)
// TODO:: watch dm channels
return
&
querypb
.
RegisterNodeResponse
{
Status
:
&
commonpb
.
Status
{
ErrorCode
:
commonpb
.
ErrorCode_SUCCESS
,
...
...
@@ -289,7 +271,7 @@ func (qs *QueryService) LoadPartitions(req *querypb.LoadPartitionRequest) (*comm
segmentIDs
:=
showSegmentResponse
.
SegmentIDs
segmentStates
:=
make
(
map
[
UniqueID
]
*
datapb
.
SegmentStatesResponse
)
channel2id
:=
make
(
map
[
string
]
int
)
id2channels
:=
make
(
map
[
int
][]
string
)
//
id2channels := make(map[int][]string)
id2segs
:=
make
(
map
[
int
][]
UniqueID
)
offset
:=
0
...
...
@@ -306,13 +288,16 @@ func (qs *QueryService) LoadPartitions(req *querypb.LoadPartitionRequest) (*comm
for
i
,
str
:=
range
state
.
StartPositions
{
flatChannelName
+=
str
.
ChannelName
channelNames
=
append
(
channelNames
,
str
.
ChannelName
)
if
i
<
len
(
state
.
StartPositions
)
{
if
i
+
1
<
len
(
state
.
StartPositions
)
{
flatChannelName
+=
"/"
}
}
if
flatChannelName
==
""
{
log
.
Fatal
(
"segmentState's channel name is empty"
)
}
if
_
,
ok
:=
channel2id
[
flatChannelName
];
!
ok
{
channel2id
[
flatChannelName
]
=
offset
id2channels
[
offset
]
=
channelNames
//
id2channels[offset] = channelNames
id2segs
[
offset
]
=
make
([]
UniqueID
,
0
)
id2segs
[
offset
]
=
append
(
id2segs
[
offset
],
segmentID
)
offset
++
...
...
@@ -329,7 +314,7 @@ func (qs *QueryService) LoadPartitions(req *querypb.LoadPartitionRequest) (*comm
if
segmentStates
[
v
]
.
State
==
datapb
.
SegmentState_SegmentFlushed
{
selectedSegs
=
append
(
selectedSegs
,
v
)
}
else
{
if
i
>
0
&&
segmentStates
[
v
-
1
]
.
State
!=
datapb
.
SegmentState_SegmentFlushed
{
if
i
>
0
&&
segmentStates
[
selectedSegs
[
i
-
1
]
]
.
State
!=
datapb
.
SegmentState_SegmentFlushed
{
break
}
selectedSegs
=
append
(
selectedSegs
,
v
)
...
...
internal/queryservice/queryservice_test.go
浏览文件 @
947976db
...
...
@@ -2,11 +2,148 @@ package queryservice
import
(
"context"
"strconv"
"testing"
"github.com/stretchr/testify/assert"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
"github.com/zilliztech/milvus-distributed/internal/proto/querypb"
)
type
masterMock
struct
{
collectionIDs
[]
UniqueID
col2partition
map
[
UniqueID
][]
UniqueID
partition2segment
map
[
UniqueID
][]
UniqueID
}
func
newMasterMock
()
*
masterMock
{
collectionIDs
:=
make
([]
UniqueID
,
0
)
collectionIDs
=
append
(
collectionIDs
,
1
)
col2partition
:=
make
(
map
[
UniqueID
][]
UniqueID
)
partitionIDs
:=
make
([]
UniqueID
,
0
)
partitionIDs
=
append
(
partitionIDs
,
1
)
col2partition
[
1
]
=
partitionIDs
partition2segment
:=
make
(
map
[
UniqueID
][]
UniqueID
)
segmentIDs
:=
make
([]
UniqueID
,
0
)
segmentIDs
=
append
(
segmentIDs
,
1
)
segmentIDs
=
append
(
segmentIDs
,
2
)
segmentIDs
=
append
(
segmentIDs
,
3
)
segmentIDs
=
append
(
segmentIDs
,
4
)
segmentIDs
=
append
(
segmentIDs
,
5
)
segmentIDs
=
append
(
segmentIDs
,
6
)
partition2segment
[
1
]
=
segmentIDs
return
&
masterMock
{
collectionIDs
:
collectionIDs
,
col2partition
:
col2partition
,
partition2segment
:
partition2segment
,
}
}
func
(
master
*
masterMock
)
ShowPartitions
(
in
*
milvuspb
.
ShowPartitionRequest
)
(
*
milvuspb
.
ShowPartitionResponse
,
error
)
{
collectionID
:=
in
.
CollectionID
partitionIDs
:=
make
([]
UniqueID
,
0
)
for
_
,
id
:=
range
master
.
collectionIDs
{
if
id
==
collectionID
{
partitions
:=
master
.
col2partition
[
collectionID
]
partitionIDs
=
append
(
partitionIDs
,
partitions
...
)
}
}
response
:=
&
milvuspb
.
ShowPartitionResponse
{
Status
:
&
commonpb
.
Status
{
ErrorCode
:
commonpb
.
ErrorCode_SUCCESS
,
},
PartitionIDs
:
partitionIDs
,
}
return
response
,
nil
}
func
(
master
*
masterMock
)
ShowSegments
(
in
*
milvuspb
.
ShowSegmentRequest
)
(
*
milvuspb
.
ShowSegmentResponse
,
error
)
{
collectionID
:=
in
.
CollectionID
partitionID
:=
in
.
PartitionID
for
_
,
id
:=
range
master
.
collectionIDs
{
if
id
==
collectionID
{
partitions
:=
master
.
col2partition
[
collectionID
]
for
_
,
partition
:=
range
partitions
{
if
partition
==
partitionID
{
return
&
milvuspb
.
ShowSegmentResponse
{
Status
:
&
commonpb
.
Status
{
ErrorCode
:
commonpb
.
ErrorCode_SUCCESS
,
},
SegmentIDs
:
master
.
partition2segment
[
partition
],
},
nil
}
}
}
}
return
nil
,
errors
.
New
(
"collection id or partition id not found"
)
}
type
dataMock
struct
{
segmentIDs
[]
UniqueID
segmentStates
map
[
UniqueID
]
*
datapb
.
SegmentStatesResponse
}
func
newDataMock
()
*
dataMock
{
positions1
:=
make
([]
*
internalpb2
.
MsgPosition
,
0
)
positions2
:=
make
([]
*
internalpb2
.
MsgPosition
,
0
)
positions1
=
append
(
positions1
,
&
internalpb2
.
MsgPosition
{
ChannelName
:
"insertChannel-"
+
strconv
.
FormatInt
(
1
,
10
)})
positions1
=
append
(
positions1
,
&
internalpb2
.
MsgPosition
{
ChannelName
:
"insertChannel-"
+
strconv
.
FormatInt
(
2
,
10
)})
positions2
=
append
(
positions2
,
&
internalpb2
.
MsgPosition
{
ChannelName
:
"insertChannel-"
+
strconv
.
FormatInt
(
3
,
10
)})
positions2
=
append
(
positions2
,
&
internalpb2
.
MsgPosition
{
ChannelName
:
"insertChannel-"
+
strconv
.
FormatInt
(
4
,
10
)})
segmentIDs
:=
make
([]
UniqueID
,
0
)
segmentIDs
=
append
(
segmentIDs
,
1
)
segmentIDs
=
append
(
segmentIDs
,
2
)
segmentIDs
=
append
(
segmentIDs
,
3
)
segmentIDs
=
append
(
segmentIDs
,
4
)
segmentIDs
=
append
(
segmentIDs
,
5
)
segmentIDs
=
append
(
segmentIDs
,
6
)
fillStates
:=
func
(
time
uint64
,
position
[]
*
internalpb2
.
MsgPosition
,
state
datapb
.
SegmentState
)
*
datapb
.
SegmentStatesResponse
{
return
&
datapb
.
SegmentStatesResponse
{
Status
:
&
commonpb
.
Status
{
ErrorCode
:
commonpb
.
ErrorCode_SUCCESS
,
},
State
:
state
,
CreateTime
:
time
,
StartPositions
:
position
,
}
}
segmentStates
:=
make
(
map
[
UniqueID
]
*
datapb
.
SegmentStatesResponse
)
segmentStates
[
1
]
=
fillStates
(
1
,
positions1
,
datapb
.
SegmentState_SegmentFlushed
)
segmentStates
[
2
]
=
fillStates
(
2
,
positions2
,
datapb
.
SegmentState_SegmentFlushed
)
segmentStates
[
3
]
=
fillStates
(
3
,
positions1
,
datapb
.
SegmentState_SegmentFlushed
)
segmentStates
[
4
]
=
fillStates
(
4
,
positions2
,
datapb
.
SegmentState_SegmentFlushed
)
segmentStates
[
5
]
=
fillStates
(
5
,
positions1
,
datapb
.
SegmentState_SegmentGrowing
)
segmentStates
[
6
]
=
fillStates
(
6
,
positions2
,
datapb
.
SegmentState_SegmentGrowing
)
return
&
dataMock
{
segmentIDs
:
segmentIDs
,
segmentStates
:
segmentStates
,
}
}
func
(
data
*
dataMock
)
GetSegmentStates
(
req
*
datapb
.
SegmentStatesRequest
)
(
*
datapb
.
SegmentStatesResponse
,
error
)
{
segmentID
:=
req
.
SegmentID
for
_
,
id
:=
range
data
.
segmentIDs
{
if
segmentID
==
id
{
return
data
.
segmentStates
[
id
],
nil
}
}
return
nil
,
errors
.
New
(
"segment id not found"
)
}
func
TestQueryService_Init
(
t
*
testing
.
T
)
{
service
,
err
:=
NewQueryService
(
context
.
Background
())
assert
.
Nil
(
t
,
err
)
...
...
@@ -34,3 +171,35 @@ func TestQueryService_Init(t *testing.T) {
service
.
Stop
()
}
func
TestQueryService_load
(
t
*
testing
.
T
)
{
service
,
err
:=
NewQueryService
(
context
.
Background
())
assert
.
Nil
(
t
,
err
)
service
.
Init
()
service
.
Start
()
service
.
SetMasterService
(
newMasterMock
())
service
.
SetDataService
(
newDataMock
())
registerNodeRequest
:=
&
querypb
.
RegisterNodeRequest
{
Address
:
&
commonpb
.
Address
{},
}
service
.
RegisterNode
(
registerNodeRequest
)
t
.
Run
(
"Test LoadSegment"
,
func
(
t
*
testing
.
T
)
{
loadCollectionRequest
:=
&
querypb
.
LoadCollectionRequest
{
CollectionID
:
1
,
}
response
,
err
:=
service
.
LoadCollection
(
loadCollectionRequest
)
assert
.
Nil
(
t
,
err
)
assert
.
Equal
(
t
,
response
.
ErrorCode
,
commonpb
.
ErrorCode_SUCCESS
)
})
t
.
Run
(
"Test LoadPartition"
,
func
(
t
*
testing
.
T
)
{
loadPartitionRequest
:=
&
querypb
.
LoadPartitionRequest
{
CollectionID
:
1
,
PartitionIDs
:
[]
UniqueID
{
1
},
}
response
,
err
:=
service
.
LoadPartitions
(
loadPartitionRequest
)
assert
.
Nil
(
t
,
err
)
assert
.
Equal
(
t
,
response
.
ErrorCode
,
commonpb
.
ErrorCode_SUCCESS
)
})
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录