Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
milvus
milvus
提交
d1f31c7b
M
milvus
项目概览
milvus
/
milvus
11 个月 前同步成功
通知
260
Star
22476
Fork
2472
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
M
milvus
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
提交
d1f31c7b
编写于
11月 24, 2020
作者:
Q
quicksilver
提交者:
yefu.chen
11月 24, 2020
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Cache Docker volumes in code-checker.yaml
Signed-off-by:
N
quicksilver
<
zhifeng.zhang@zilliz.com
>
上级
41fb2d29
变更
5
显示空白变更内容
内联
并排
Showing
5 changed file
with
25 addition
and
688 deletion
+25
-688
.github/workflows/code-checker.yaml
.github/workflows/code-checker.yaml
+6
-0
internal/reader/collection_replica.go
internal/reader/collection_replica.go
+0
-110
internal/reader/collection_replica_test.go
internal/reader/collection_replica_test.go
+11
-500
internal/reader/meta_service.go
internal/reader/meta_service.go
+0
-12
internal/reader/meta_service_test.go
internal/reader/meta_service_test.go
+8
-66
未找到文件。
.github/workflows/code-checker.yaml
浏览文件 @
d1f31c7b
...
...
@@ -42,6 +42,12 @@ jobs:
steps
:
-
name
:
Checkout
uses
:
actions/checkout@v2
-
name
:
Cache Docker Volumes
uses
:
actions/cache@v1
with
:
path
:
.docker
key
:
ubuntu${{ matrix.ubuntu }}-${{ hashFiles('internal/core/**') }}
restore-keys
:
ubuntu${{ matrix.ubuntu }}-
-
name
:
Dockerfile Lint
uses
:
reviewdog/action-hadolint@v1
with
:
...
...
internal/reader/collection_replica.go
浏览文件 @
d1f31c7b
...
...
@@ -13,7 +13,6 @@ package reader
import
"C"
import
(
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
"log"
"strconv"
"sync"
...
...
@@ -38,18 +37,13 @@ type collectionReplica interface {
removeCollection
(
collectionID
UniqueID
)
error
getCollectionByID
(
collectionID
UniqueID
)
(
*
Collection
,
error
)
getCollectionByName
(
collectionName
string
)
(
*
Collection
,
error
)
hasCollection
(
collectionID
UniqueID
)
bool
// partition
// Partition tags in different collections are not unique,
// so partition api should specify the target collection.
getPartitionNum
(
collectionID
UniqueID
)
(
int
,
error
)
addPartition
(
collectionID
UniqueID
,
partitionTag
string
)
error
removePartition
(
collectionID
UniqueID
,
partitionTag
string
)
error
addPartitionsByCollectionMeta
(
colMeta
*
etcdpb
.
CollectionMeta
)
error
removePartitionsByCollectionMeta
(
colMeta
*
etcdpb
.
CollectionMeta
)
error
getPartitionByTag
(
collectionID
UniqueID
,
partitionTag
string
)
(
*
Partition
,
error
)
hasPartition
(
collectionID
UniqueID
,
partitionTag
string
)
bool
// segment
getSegmentNum
()
int
...
...
@@ -148,31 +142,7 @@ func (colReplica *collectionReplicaImpl) getCollectionByName(collectionName stri
return
nil
,
errors
.
New
(
"Cannot found collection: "
+
collectionName
)
}
func
(
colReplica
*
collectionReplicaImpl
)
hasCollection
(
collectionID
UniqueID
)
bool
{
colReplica
.
mu
.
RLock
()
defer
colReplica
.
mu
.
RUnlock
()
for
_
,
col
:=
range
colReplica
.
collections
{
if
col
.
ID
()
==
collectionID
{
return
true
}
}
return
false
}
//----------------------------------------------------------------------------------------------------- partition
func
(
colReplica
*
collectionReplicaImpl
)
getPartitionNum
(
collectionID
UniqueID
)
(
int
,
error
)
{
collection
,
err
:=
colReplica
.
getCollectionByID
(
collectionID
)
if
err
!=
nil
{
return
-
1
,
err
}
colReplica
.
mu
.
RLock
()
defer
colReplica
.
mu
.
RUnlock
()
return
len
(
collection
.
partitions
),
nil
}
func
(
colReplica
*
collectionReplicaImpl
)
addPartition
(
collectionID
UniqueID
,
partitionTag
string
)
error
{
collection
,
err
:=
colReplica
.
getCollectionByID
(
collectionID
)
if
err
!=
nil
{
...
...
@@ -212,61 +182,6 @@ func (colReplica *collectionReplicaImpl) removePartition(collectionID UniqueID,
return
nil
}
func
(
colReplica
*
collectionReplicaImpl
)
addPartitionsByCollectionMeta
(
colMeta
*
etcdpb
.
CollectionMeta
)
error
{
if
!
colReplica
.
hasCollection
(
colMeta
.
ID
)
{
err
:=
errors
.
New
(
"Cannot find collection, id = "
+
strconv
.
FormatInt
(
colMeta
.
ID
,
10
))
return
err
}
pToAdd
:=
make
([]
string
,
0
)
for
_
,
partitionTag
:=
range
colMeta
.
PartitionTags
{
if
!
colReplica
.
hasPartition
(
colMeta
.
ID
,
partitionTag
)
{
pToAdd
=
append
(
pToAdd
,
partitionTag
)
}
}
for
_
,
tag
:=
range
pToAdd
{
err
:=
colReplica
.
addPartition
(
colMeta
.
ID
,
tag
)
if
err
!=
nil
{
log
.
Println
(
err
)
}
}
return
nil
}
func
(
colReplica
*
collectionReplicaImpl
)
removePartitionsByCollectionMeta
(
colMeta
*
etcdpb
.
CollectionMeta
)
error
{
col
,
err
:=
colReplica
.
getCollectionByID
(
colMeta
.
ID
)
if
err
!=
nil
{
return
err
}
colReplica
.
mu
.
Lock
()
pToDel
:=
make
([]
string
,
0
)
for
_
,
partition
:=
range
col
.
partitions
{
hasPartition
:=
false
for
_
,
tag
:=
range
colMeta
.
PartitionTags
{
if
partition
.
partitionTag
==
tag
{
hasPartition
=
true
}
}
if
!
hasPartition
{
pToDel
=
append
(
pToDel
,
partition
.
partitionTag
)
}
}
colReplica
.
mu
.
Unlock
()
for
_
,
tag
:=
range
pToDel
{
err
:=
colReplica
.
removePartition
(
col
.
ID
(),
tag
)
if
err
!=
nil
{
log
.
Println
(
err
)
}
}
return
nil
}
func
(
colReplica
*
collectionReplicaImpl
)
getPartitionByTag
(
collectionID
UniqueID
,
partitionTag
string
)
(
*
Partition
,
error
)
{
collection
,
err
:=
colReplica
.
getCollectionByID
(
collectionID
)
if
err
!=
nil
{
...
...
@@ -285,25 +200,6 @@ func (colReplica *collectionReplicaImpl) getPartitionByTag(collectionID UniqueID
return
nil
,
errors
.
New
(
"cannot find partition, tag = "
+
partitionTag
)
}
func
(
colReplica
*
collectionReplicaImpl
)
hasPartition
(
collectionID
UniqueID
,
partitionTag
string
)
bool
{
collection
,
err
:=
colReplica
.
getCollectionByID
(
collectionID
)
if
err
!=
nil
{
log
.
Println
(
err
)
return
false
}
colReplica
.
mu
.
RLock
()
defer
colReplica
.
mu
.
RUnlock
()
for
_
,
p
:=
range
*
collection
.
Partitions
()
{
if
p
.
Tag
()
==
partitionTag
{
return
true
}
}
return
false
}
//----------------------------------------------------------------------------------------------------- segment
func
(
colReplica
*
collectionReplicaImpl
)
getSegmentNum
()
int
{
colReplica
.
mu
.
RLock
()
...
...
@@ -313,9 +209,6 @@ func (colReplica *collectionReplicaImpl) getSegmentNum() int {
}
func
(
colReplica
*
collectionReplicaImpl
)
getSegmentStatistics
()
*
internalpb
.
QueryNodeSegStats
{
colReplica
.
mu
.
RLock
()
defer
colReplica
.
mu
.
RUnlock
()
var
statisticData
=
make
([]
*
internalpb
.
SegmentStats
,
0
)
for
segmentID
,
segment
:=
range
colReplica
.
segments
{
...
...
@@ -413,9 +306,6 @@ func (colReplica *collectionReplicaImpl) hasSegment(segmentID UniqueID) bool {
//-----------------------------------------------------------------------------------------------------
func
(
colReplica
*
collectionReplicaImpl
)
freeAll
()
{
colReplica
.
mu
.
Lock
()
defer
colReplica
.
mu
.
Unlock
()
for
_
,
seg
:=
range
colReplica
.
segments
{
deleteSegment
(
seg
)
}
...
...
internal/reader/collection_replica_test.go
浏览文件 @
d1f31c7b
...
...
@@ -13,60 +13,7 @@ import (
)
//----------------------------------------------------------------------------------------------------- collection
func
TestCollectionReplica_getCollectionNum
(
t
*
testing
.
T
)
{
ctx
:=
context
.
Background
()
node
:=
NewQueryNode
(
ctx
,
0
)
collectionName
:=
"collection0"
fieldVec
:=
schemapb
.
FieldSchema
{
Name
:
"vec"
,
DataType
:
schemapb
.
DataType_VECTOR_FLOAT
,
TypeParams
:
[]
*
commonpb
.
KeyValuePair
{
{
Key
:
"dim"
,
Value
:
"16"
,
},
},
}
fieldInt
:=
schemapb
.
FieldSchema
{
Name
:
"age"
,
DataType
:
schemapb
.
DataType_INT32
,
TypeParams
:
[]
*
commonpb
.
KeyValuePair
{
{
Key
:
"dim"
,
Value
:
"1"
,
},
},
}
schema
:=
schemapb
.
CollectionSchema
{
Name
:
collectionName
,
Fields
:
[]
*
schemapb
.
FieldSchema
{
&
fieldVec
,
&
fieldInt
,
},
}
collectionMeta
:=
etcdpb
.
CollectionMeta
{
ID
:
UniqueID
(
0
),
Schema
:
&
schema
,
CreateTime
:
Timestamp
(
0
),
SegmentIDs
:
[]
UniqueID
{
0
},
PartitionTags
:
[]
string
{
"default"
},
}
collectionMetaBlob
:=
proto
.
MarshalTextString
(
&
collectionMeta
)
assert
.
NotEqual
(
t
,
""
,
collectionMetaBlob
)
var
err
=
(
*
node
.
replica
)
.
addCollection
(
&
collectionMeta
,
collectionMetaBlob
)
assert
.
NoError
(
t
,
err
)
assert
.
Equal
(
t
,
(
*
node
.
replica
)
.
getCollectionNum
(),
1
)
(
*
node
.
replica
)
.
freeAll
()
}
func
TestCollectionReplica_addCollection
(
t
*
testing
.
T
)
{
func
TestColSegContainer_addCollection
(
t
*
testing
.
T
)
{
ctx
:=
context
.
Background
()
node
:=
NewQueryNode
(
ctx
,
0
)
...
...
@@ -119,11 +66,9 @@ func TestCollectionReplica_addCollection(t *testing.T) {
assert
.
Equal
(
t
,
collection
.
meta
.
Schema
.
Name
,
collectionName
)
assert
.
Equal
(
t
,
collection
.
meta
.
ID
,
UniqueID
(
0
))
assert
.
Equal
(
t
,
(
*
node
.
replica
)
.
getCollectionNum
(),
1
)
(
*
node
.
replica
)
.
freeAll
()
}
func
TestCol
lectionReplica
_removeCollection
(
t
*
testing
.
T
)
{
func
TestCol
SegContainer
_removeCollection
(
t
*
testing
.
T
)
{
ctx
:=
context
.
Background
()
node
:=
NewQueryNode
(
ctx
,
0
)
...
...
@@ -182,11 +127,9 @@ func TestCollectionReplica_removeCollection(t *testing.T) {
err
=
(
*
node
.
replica
)
.
removeCollection
(
collectionID
)
assert
.
NoError
(
t
,
err
)
assert
.
Equal
(
t
,
(
*
node
.
replica
)
.
getCollectionNum
(),
0
)
(
*
node
.
replica
)
.
freeAll
()
}
func
TestCol
lectionReplica
_getCollectionByID
(
t
*
testing
.
T
)
{
func
TestCol
SegContainer
_getCollectionByID
(
t
*
testing
.
T
)
{
ctx
:=
context
.
Background
()
node
:=
NewQueryNode
(
ctx
,
0
)
...
...
@@ -246,11 +189,9 @@ func TestCollectionReplica_getCollectionByID(t *testing.T) {
assert
.
NotNil
(
t
,
targetCollection
)
assert
.
Equal
(
t
,
targetCollection
.
meta
.
Schema
.
Name
,
"collection0"
)
assert
.
Equal
(
t
,
targetCollection
.
meta
.
ID
,
UniqueID
(
0
))
(
*
node
.
replica
)
.
freeAll
()
}
func
TestCol
lectionReplica
_getCollectionByName
(
t
*
testing
.
T
)
{
func
TestCol
SegContainer
_getCollectionByName
(
t
*
testing
.
T
)
{
ctx
:=
context
.
Background
()
node
:=
NewQueryNode
(
ctx
,
0
)
...
...
@@ -310,68 +251,10 @@ func TestCollectionReplica_getCollectionByName(t *testing.T) {
assert
.
NotNil
(
t
,
targetCollection
)
assert
.
Equal
(
t
,
targetCollection
.
meta
.
Schema
.
Name
,
"collection0"
)
assert
.
Equal
(
t
,
targetCollection
.
meta
.
ID
,
UniqueID
(
0
))
(
*
node
.
replica
)
.
freeAll
()
}
func
TestCollectionReplica_hasCollection
(
t
*
testing
.
T
)
{
ctx
:=
context
.
Background
()
node
:=
NewQueryNode
(
ctx
,
0
)
collectionName
:=
"collection0"
fieldVec
:=
schemapb
.
FieldSchema
{
Name
:
"vec"
,
DataType
:
schemapb
.
DataType_VECTOR_FLOAT
,
TypeParams
:
[]
*
commonpb
.
KeyValuePair
{
{
Key
:
"dim"
,
Value
:
"16"
,
},
},
}
fieldInt
:=
schemapb
.
FieldSchema
{
Name
:
"age"
,
DataType
:
schemapb
.
DataType_INT32
,
TypeParams
:
[]
*
commonpb
.
KeyValuePair
{
{
Key
:
"dim"
,
Value
:
"1"
,
},
},
}
schema
:=
schemapb
.
CollectionSchema
{
Name
:
collectionName
,
Fields
:
[]
*
schemapb
.
FieldSchema
{
&
fieldVec
,
&
fieldInt
,
},
}
collectionMeta
:=
etcdpb
.
CollectionMeta
{
ID
:
UniqueID
(
0
),
Schema
:
&
schema
,
CreateTime
:
Timestamp
(
0
),
SegmentIDs
:
[]
UniqueID
{
0
},
PartitionTags
:
[]
string
{
"default"
},
}
collectionMetaBlob
:=
proto
.
MarshalTextString
(
&
collectionMeta
)
assert
.
NotEqual
(
t
,
""
,
collectionMetaBlob
)
var
err
=
(
*
node
.
replica
)
.
addCollection
(
&
collectionMeta
,
collectionMetaBlob
)
assert
.
NoError
(
t
,
err
)
hasCollection
:=
(
*
node
.
replica
)
.
hasCollection
(
UniqueID
(
0
))
assert
.
Equal
(
t
,
hasCollection
,
true
)
hasCollection
=
(
*
node
.
replica
)
.
hasCollection
(
UniqueID
(
1
))
assert
.
Equal
(
t
,
hasCollection
,
false
)
(
*
node
.
replica
)
.
freeAll
()
}
//----------------------------------------------------------------------------------------------------- partition
func
TestCol
lectionReplica_getPartitionNum
(
t
*
testing
.
T
)
{
func
TestCol
SegContainer_addPartition
(
t
*
testing
.
T
)
{
ctx
:=
context
.
Background
()
node
:=
NewQueryNode
(
ctx
,
0
)
...
...
@@ -434,82 +317,9 @@ func TestCollectionReplica_getPartitionNum(t *testing.T) {
assert
.
NoError
(
t
,
err
)
assert
.
Equal
(
t
,
partition
.
partitionTag
,
"default"
)
}
partitionNum
,
err
:=
(
*
node
.
replica
)
.
getPartitionNum
(
UniqueID
(
0
))
assert
.
NoError
(
t
,
err
)
assert
.
Equal
(
t
,
partitionNum
,
1
)
(
*
node
.
replica
)
.
freeAll
()
}
func
TestCollectionReplica_addPartition
(
t
*
testing
.
T
)
{
ctx
:=
context
.
Background
()
node
:=
NewQueryNode
(
ctx
,
0
)
collectionName
:=
"collection0"
collectionID
:=
UniqueID
(
0
)
fieldVec
:=
schemapb
.
FieldSchema
{
Name
:
"vec"
,
DataType
:
schemapb
.
DataType_VECTOR_FLOAT
,
TypeParams
:
[]
*
commonpb
.
KeyValuePair
{
{
Key
:
"dim"
,
Value
:
"16"
,
},
},
}
fieldInt
:=
schemapb
.
FieldSchema
{
Name
:
"age"
,
DataType
:
schemapb
.
DataType_INT32
,
TypeParams
:
[]
*
commonpb
.
KeyValuePair
{
{
Key
:
"dim"
,
Value
:
"1"
,
},
},
}
schema
:=
schemapb
.
CollectionSchema
{
Name
:
"collection0"
,
Fields
:
[]
*
schemapb
.
FieldSchema
{
&
fieldVec
,
&
fieldInt
,
},
}
collectionMeta
:=
etcdpb
.
CollectionMeta
{
ID
:
collectionID
,
Schema
:
&
schema
,
CreateTime
:
Timestamp
(
0
),
SegmentIDs
:
[]
UniqueID
{
0
},
PartitionTags
:
[]
string
{
"default"
},
}
collectionMetaBlob
:=
proto
.
MarshalTextString
(
&
collectionMeta
)
assert
.
NotEqual
(
t
,
""
,
collectionMetaBlob
)
var
err
=
(
*
node
.
replica
)
.
addCollection
(
&
collectionMeta
,
collectionMetaBlob
)
assert
.
NoError
(
t
,
err
)
collection
,
err
:=
(
*
node
.
replica
)
.
getCollectionByName
(
collectionName
)
assert
.
NoError
(
t
,
err
)
assert
.
Equal
(
t
,
collection
.
meta
.
Schema
.
Name
,
collectionName
)
assert
.
Equal
(
t
,
collection
.
meta
.
ID
,
collectionID
)
assert
.
Equal
(
t
,
(
*
node
.
replica
)
.
getCollectionNum
(),
1
)
for
_
,
tag
:=
range
collectionMeta
.
PartitionTags
{
err
:=
(
*
node
.
replica
)
.
addPartition
(
collectionID
,
tag
)
assert
.
NoError
(
t
,
err
)
partition
,
err
:=
(
*
node
.
replica
)
.
getPartitionByTag
(
collectionID
,
tag
)
assert
.
NoError
(
t
,
err
)
assert
.
Equal
(
t
,
partition
.
partitionTag
,
"default"
)
}
(
*
node
.
replica
)
.
freeAll
()
}
func
TestCollectionReplica_removePartition
(
t
*
testing
.
T
)
{
func
TestColSegContainer_removePartition
(
t
*
testing
.
T
)
{
ctx
:=
context
.
Background
()
node
:=
NewQueryNode
(
ctx
,
0
)
...
...
@@ -575,157 +385,9 @@ func TestCollectionReplica_removePartition(t *testing.T) {
err
=
(
*
node
.
replica
)
.
removePartition
(
collectionID
,
partitionTag
)
assert
.
NoError
(
t
,
err
)
}
(
*
node
.
replica
)
.
freeAll
()
}
func
TestCollectionReplica_addPartitionsByCollectionMeta
(
t
*
testing
.
T
)
{
ctx
:=
context
.
Background
()
node
:=
NewQueryNode
(
ctx
,
0
)
collectionName
:=
"collection0"
collectionID
:=
UniqueID
(
0
)
fieldVec
:=
schemapb
.
FieldSchema
{
Name
:
"vec"
,
DataType
:
schemapb
.
DataType_VECTOR_FLOAT
,
TypeParams
:
[]
*
commonpb
.
KeyValuePair
{
{
Key
:
"dim"
,
Value
:
"16"
,
},
},
}
fieldInt
:=
schemapb
.
FieldSchema
{
Name
:
"age"
,
DataType
:
schemapb
.
DataType_INT32
,
TypeParams
:
[]
*
commonpb
.
KeyValuePair
{
{
Key
:
"dim"
,
Value
:
"1"
,
},
},
}
schema
:=
schemapb
.
CollectionSchema
{
Name
:
"collection0"
,
Fields
:
[]
*
schemapb
.
FieldSchema
{
&
fieldVec
,
&
fieldInt
,
},
}
collectionMeta
:=
etcdpb
.
CollectionMeta
{
ID
:
collectionID
,
Schema
:
&
schema
,
CreateTime
:
Timestamp
(
0
),
SegmentIDs
:
[]
UniqueID
{
0
},
PartitionTags
:
[]
string
{
"p0"
},
}
collectionMetaBlob
:=
proto
.
MarshalTextString
(
&
collectionMeta
)
assert
.
NotEqual
(
t
,
""
,
collectionMetaBlob
)
var
err
=
(
*
node
.
replica
)
.
addCollection
(
&
collectionMeta
,
collectionMetaBlob
)
assert
.
NoError
(
t
,
err
)
collection
,
err
:=
(
*
node
.
replica
)
.
getCollectionByName
(
collectionName
)
assert
.
NoError
(
t
,
err
)
assert
.
Equal
(
t
,
collection
.
meta
.
Schema
.
Name
,
collectionName
)
assert
.
Equal
(
t
,
collection
.
meta
.
ID
,
collectionID
)
assert
.
Equal
(
t
,
(
*
node
.
replica
)
.
getCollectionNum
(),
1
)
collectionMeta
.
PartitionTags
=
[]
string
{
"p0"
,
"p1"
,
"p2"
}
err
=
(
*
node
.
replica
)
.
addPartitionsByCollectionMeta
(
&
collectionMeta
)
assert
.
NoError
(
t
,
err
)
partitionNum
,
err
:=
(
*
node
.
replica
)
.
getPartitionNum
(
UniqueID
(
0
))
assert
.
NoError
(
t
,
err
)
assert
.
Equal
(
t
,
partitionNum
,
3
)
hasPartition
:=
(
*
node
.
replica
)
.
hasPartition
(
UniqueID
(
0
),
"p0"
)
assert
.
Equal
(
t
,
hasPartition
,
true
)
hasPartition
=
(
*
node
.
replica
)
.
hasPartition
(
UniqueID
(
0
),
"p1"
)
assert
.
Equal
(
t
,
hasPartition
,
true
)
hasPartition
=
(
*
node
.
replica
)
.
hasPartition
(
UniqueID
(
0
),
"p2"
)
assert
.
Equal
(
t
,
hasPartition
,
true
)
(
*
node
.
replica
)
.
freeAll
()
}
func
TestCollectionReplica_removePartitionsByCollectionMeta
(
t
*
testing
.
T
)
{
ctx
:=
context
.
Background
()
node
:=
NewQueryNode
(
ctx
,
0
)
collectionName
:=
"collection0"
collectionID
:=
UniqueID
(
0
)
fieldVec
:=
schemapb
.
FieldSchema
{
Name
:
"vec"
,
DataType
:
schemapb
.
DataType_VECTOR_FLOAT
,
TypeParams
:
[]
*
commonpb
.
KeyValuePair
{
{
Key
:
"dim"
,
Value
:
"16"
,
},
},
}
fieldInt
:=
schemapb
.
FieldSchema
{
Name
:
"age"
,
DataType
:
schemapb
.
DataType_INT32
,
TypeParams
:
[]
*
commonpb
.
KeyValuePair
{
{
Key
:
"dim"
,
Value
:
"1"
,
},
},
}
schema
:=
schemapb
.
CollectionSchema
{
Name
:
"collection0"
,
Fields
:
[]
*
schemapb
.
FieldSchema
{
&
fieldVec
,
&
fieldInt
,
},
}
collectionMeta
:=
etcdpb
.
CollectionMeta
{
ID
:
collectionID
,
Schema
:
&
schema
,
CreateTime
:
Timestamp
(
0
),
SegmentIDs
:
[]
UniqueID
{
0
},
PartitionTags
:
[]
string
{
"p0"
,
"p1"
,
"p2"
},
}
collectionMetaBlob
:=
proto
.
MarshalTextString
(
&
collectionMeta
)
assert
.
NotEqual
(
t
,
""
,
collectionMetaBlob
)
var
err
=
(
*
node
.
replica
)
.
addCollection
(
&
collectionMeta
,
collectionMetaBlob
)
assert
.
NoError
(
t
,
err
)
collection
,
err
:=
(
*
node
.
replica
)
.
getCollectionByName
(
collectionName
)
assert
.
NoError
(
t
,
err
)
assert
.
Equal
(
t
,
collection
.
meta
.
Schema
.
Name
,
collectionName
)
assert
.
Equal
(
t
,
collection
.
meta
.
ID
,
collectionID
)
assert
.
Equal
(
t
,
(
*
node
.
replica
)
.
getCollectionNum
(),
1
)
collectionMeta
.
PartitionTags
=
[]
string
{
"p0"
}
err
=
(
*
node
.
replica
)
.
addPartitionsByCollectionMeta
(
&
collectionMeta
)
assert
.
NoError
(
t
,
err
)
partitionNum
,
err
:=
(
*
node
.
replica
)
.
getPartitionNum
(
UniqueID
(
0
))
assert
.
NoError
(
t
,
err
)
assert
.
Equal
(
t
,
partitionNum
,
1
)
hasPartition
:=
(
*
node
.
replica
)
.
hasPartition
(
UniqueID
(
0
),
"p0"
)
assert
.
Equal
(
t
,
hasPartition
,
true
)
hasPartition
=
(
*
node
.
replica
)
.
hasPartition
(
UniqueID
(
0
),
"p1"
)
assert
.
Equal
(
t
,
hasPartition
,
false
)
hasPartition
=
(
*
node
.
replica
)
.
hasPartition
(
UniqueID
(
0
),
"p2"
)
assert
.
Equal
(
t
,
hasPartition
,
false
)
(
*
node
.
replica
)
.
freeAll
()
}
func
TestCol
lectionReplica
_getPartitionByTag
(
t
*
testing
.
T
)
{
func
TestCol
SegContainer
_getPartitionByTag
(
t
*
testing
.
T
)
{
ctx
:=
context
.
Background
()
node
:=
NewQueryNode
(
ctx
,
0
)
...
...
@@ -789,78 +451,10 @@ func TestCollectionReplica_getPartitionByTag(t *testing.T) {
assert
.
Equal
(
t
,
partition
.
partitionTag
,
"default"
)
assert
.
NotNil
(
t
,
partition
)
}
(
*
node
.
replica
)
.
freeAll
()
}
func
TestCollectionReplica_hasPartition
(
t
*
testing
.
T
)
{
ctx
:=
context
.
Background
()
node
:=
NewQueryNode
(
ctx
,
0
)
collectionName
:=
"collection0"
collectionID
:=
UniqueID
(
0
)
fieldVec
:=
schemapb
.
FieldSchema
{
Name
:
"vec"
,
DataType
:
schemapb
.
DataType_VECTOR_FLOAT
,
TypeParams
:
[]
*
commonpb
.
KeyValuePair
{
{
Key
:
"dim"
,
Value
:
"16"
,
},
},
}
fieldInt
:=
schemapb
.
FieldSchema
{
Name
:
"age"
,
DataType
:
schemapb
.
DataType_INT32
,
TypeParams
:
[]
*
commonpb
.
KeyValuePair
{
{
Key
:
"dim"
,
Value
:
"1"
,
},
},
}
schema
:=
schemapb
.
CollectionSchema
{
Name
:
"collection0"
,
Fields
:
[]
*
schemapb
.
FieldSchema
{
&
fieldVec
,
&
fieldInt
,
},
}
collectionMeta
:=
etcdpb
.
CollectionMeta
{
ID
:
collectionID
,
Schema
:
&
schema
,
CreateTime
:
Timestamp
(
0
),
SegmentIDs
:
[]
UniqueID
{
0
},
PartitionTags
:
[]
string
{
"default"
},
}
collectionMetaBlob
:=
proto
.
MarshalTextString
(
&
collectionMeta
)
assert
.
NotEqual
(
t
,
""
,
collectionMetaBlob
)
var
err
=
(
*
node
.
replica
)
.
addCollection
(
&
collectionMeta
,
collectionMetaBlob
)
assert
.
NoError
(
t
,
err
)
collection
,
err
:=
(
*
node
.
replica
)
.
getCollectionByName
(
collectionName
)
assert
.
NoError
(
t
,
err
)
assert
.
Equal
(
t
,
collection
.
meta
.
Schema
.
Name
,
collectionName
)
assert
.
Equal
(
t
,
collection
.
meta
.
ID
,
collectionID
)
assert
.
Equal
(
t
,
(
*
node
.
replica
)
.
getCollectionNum
(),
1
)
err
=
(
*
node
.
replica
)
.
addPartition
(
collectionID
,
collectionMeta
.
PartitionTags
[
0
])
assert
.
NoError
(
t
,
err
)
hasPartition
:=
(
*
node
.
replica
)
.
hasPartition
(
UniqueID
(
0
),
"default"
)
assert
.
Equal
(
t
,
hasPartition
,
true
)
hasPartition
=
(
*
node
.
replica
)
.
hasPartition
(
UniqueID
(
0
),
"default1"
)
assert
.
Equal
(
t
,
hasPartition
,
false
)
(
*
node
.
replica
)
.
freeAll
()
}
//----------------------------------------------------------------------------------------------------- segment
func
TestCol
lectionReplica
_addSegment
(
t
*
testing
.
T
)
{
func
TestCol
SegContainer
_addSegment
(
t
*
testing
.
T
)
{
ctx
:=
context
.
Background
()
node
:=
NewQueryNode
(
ctx
,
0
)
...
...
@@ -927,11 +521,9 @@ func TestCollectionReplica_addSegment(t *testing.T) {
assert
.
NoError
(
t
,
err
)
assert
.
Equal
(
t
,
targetSeg
.
segmentID
,
UniqueID
(
i
))
}
(
*
node
.
replica
)
.
freeAll
()
}
func
TestCol
lectionReplica
_removeSegment
(
t
*
testing
.
T
)
{
func
TestCol
SegContainer
_removeSegment
(
t
*
testing
.
T
)
{
ctx
:=
context
.
Background
()
node
:=
NewQueryNode
(
ctx
,
0
)
...
...
@@ -1000,11 +592,9 @@ func TestCollectionReplica_removeSegment(t *testing.T) {
err
=
(
*
node
.
replica
)
.
removeSegment
(
UniqueID
(
i
))
assert
.
NoError
(
t
,
err
)
}
(
*
node
.
replica
)
.
freeAll
()
}
func
TestCol
lectionReplica
_getSegmentByID
(
t
*
testing
.
T
)
{
func
TestCol
SegContainer
_getSegmentByID
(
t
*
testing
.
T
)
{
ctx
:=
context
.
Background
()
node
:=
NewQueryNode
(
ctx
,
0
)
...
...
@@ -1071,11 +661,9 @@ func TestCollectionReplica_getSegmentByID(t *testing.T) {
assert
.
NoError
(
t
,
err
)
assert
.
Equal
(
t
,
targetSeg
.
segmentID
,
UniqueID
(
i
))
}
(
*
node
.
replica
)
.
freeAll
()
}
func
TestCol
lectionReplica
_hasSegment
(
t
*
testing
.
T
)
{
func
TestCol
SegContainer
_hasSegment
(
t
*
testing
.
T
)
{
ctx
:=
context
.
Background
()
node
:=
NewQueryNode
(
ctx
,
0
)
...
...
@@ -1146,81 +734,4 @@ func TestCollectionReplica_hasSegment(t *testing.T) {
hasSeg
=
(
*
node
.
replica
)
.
hasSegment
(
UniqueID
(
i
+
100
))
assert
.
Equal
(
t
,
hasSeg
,
false
)
}
(
*
node
.
replica
)
.
freeAll
()
}
func
TestCollectionReplica_freeAll
(
t
*
testing
.
T
)
{
ctx
:=
context
.
Background
()
node
:=
NewQueryNode
(
ctx
,
0
)
collectionName
:=
"collection0"
collectionID
:=
UniqueID
(
0
)
fieldVec
:=
schemapb
.
FieldSchema
{
Name
:
"vec"
,
DataType
:
schemapb
.
DataType_VECTOR_FLOAT
,
TypeParams
:
[]
*
commonpb
.
KeyValuePair
{
{
Key
:
"dim"
,
Value
:
"16"
,
},
},
}
fieldInt
:=
schemapb
.
FieldSchema
{
Name
:
"age"
,
DataType
:
schemapb
.
DataType_INT32
,
TypeParams
:
[]
*
commonpb
.
KeyValuePair
{
{
Key
:
"dim"
,
Value
:
"1"
,
},
},
}
schema
:=
schemapb
.
CollectionSchema
{
Name
:
"collection0"
,
Fields
:
[]
*
schemapb
.
FieldSchema
{
&
fieldVec
,
&
fieldInt
,
},
}
collectionMeta
:=
etcdpb
.
CollectionMeta
{
ID
:
collectionID
,
Schema
:
&
schema
,
CreateTime
:
Timestamp
(
0
),
SegmentIDs
:
[]
UniqueID
{
0
},
PartitionTags
:
[]
string
{
"default"
},
}
collectionMetaBlob
:=
proto
.
MarshalTextString
(
&
collectionMeta
)
assert
.
NotEqual
(
t
,
""
,
collectionMetaBlob
)
var
err
=
(
*
node
.
replica
)
.
addCollection
(
&
collectionMeta
,
collectionMetaBlob
)
assert
.
NoError
(
t
,
err
)
collection
,
err
:=
(
*
node
.
replica
)
.
getCollectionByName
(
collectionName
)
assert
.
NoError
(
t
,
err
)
assert
.
Equal
(
t
,
collection
.
meta
.
Schema
.
Name
,
collectionName
)
assert
.
Equal
(
t
,
collection
.
meta
.
ID
,
UniqueID
(
0
))
assert
.
Equal
(
t
,
(
*
node
.
replica
)
.
getCollectionNum
(),
1
)
err
=
(
*
node
.
replica
)
.
addPartition
(
collectionID
,
collectionMeta
.
PartitionTags
[
0
])
assert
.
NoError
(
t
,
err
)
const
segmentNum
=
3
for
i
:=
0
;
i
<
segmentNum
;
i
++
{
err
:=
(
*
node
.
replica
)
.
addSegment
(
UniqueID
(
i
),
collectionMeta
.
PartitionTags
[
0
],
collectionID
)
assert
.
NoError
(
t
,
err
)
targetSeg
,
err
:=
(
*
node
.
replica
)
.
getSegmentByID
(
UniqueID
(
i
))
assert
.
NoError
(
t
,
err
)
assert
.
Equal
(
t
,
targetSeg
.
segmentID
,
UniqueID
(
i
))
hasSeg
:=
(
*
node
.
replica
)
.
hasSegment
(
UniqueID
(
i
))
assert
.
Equal
(
t
,
hasSeg
,
true
)
hasSeg
=
(
*
node
.
replica
)
.
hasSegment
(
UniqueID
(
i
+
100
))
assert
.
Equal
(
t
,
hasSeg
,
false
)
}
(
*
node
.
replica
)
.
freeAll
()
}
internal/reader/meta_service.go
浏览文件 @
d1f31c7b
...
...
@@ -214,18 +214,6 @@ func (mService *metaService) processSegmentModify(id string, value string) {
func
(
mService
*
metaService
)
processCollectionModify
(
id
string
,
value
string
)
{
println
(
"Modify Collection: "
,
id
)
col
:=
mService
.
collectionUnmarshal
(
value
)
if
col
!=
nil
{
err
:=
(
*
mService
.
replica
)
.
addPartitionsByCollectionMeta
(
col
)
if
err
!=
nil
{
log
.
Println
(
err
)
}
err
=
(
*
mService
.
replica
)
.
removePartitionsByCollectionMeta
(
col
)
if
err
!=
nil
{
log
.
Println
(
err
)
}
}
}
func
(
mService
*
metaService
)
processModify
(
key
string
,
msg
string
)
{
...
...
internal/reader/meta_service_test.go
浏览文件 @
d1f31c7b
...
...
@@ -452,9 +452,7 @@ func TestMetaService_processCollectionModify(t *testing.T) {
>
>
segmentIDs: 0
partition_tags: "p0"
partition_tags: "p1"
partition_tags: "p2"
partition_tags: "default"
`
(
*
node
.
metaService
)
.
processCollectionCreate
(
id
,
value
)
...
...
@@ -465,19 +463,7 @@ func TestMetaService_processCollectionModify(t *testing.T) {
assert
.
NoError
(
t
,
err
)
assert
.
Equal
(
t
,
collection
.
ID
(),
UniqueID
(
0
))
partitionNum
,
err
:=
(
*
node
.
replica
)
.
getPartitionNum
(
UniqueID
(
0
))
assert
.
NoError
(
t
,
err
)
assert
.
Equal
(
t
,
partitionNum
,
3
)
hasPartition
:=
(
*
node
.
replica
)
.
hasPartition
(
UniqueID
(
0
),
"p0"
)
assert
.
Equal
(
t
,
hasPartition
,
true
)
hasPartition
=
(
*
node
.
replica
)
.
hasPartition
(
UniqueID
(
0
),
"p1"
)
assert
.
Equal
(
t
,
hasPartition
,
true
)
hasPartition
=
(
*
node
.
replica
)
.
hasPartition
(
UniqueID
(
0
),
"p2"
)
assert
.
Equal
(
t
,
hasPartition
,
true
)
hasPartition
=
(
*
node
.
replica
)
.
hasPartition
(
UniqueID
(
0
),
"p3"
)
assert
.
Equal
(
t
,
hasPartition
,
false
)
// TODO: use different index for testing processCollectionModify
newValue
:=
`schema: <
name: "test"
fields: <
...
...
@@ -498,28 +484,13 @@ func TestMetaService_processCollectionModify(t *testing.T) {
>
>
segmentIDs: 0
partition_tags: "p1"
partition_tags: "p2"
partition_tags: "p3"
partition_tags: "default"
`
(
*
node
.
metaService
)
.
processCollectionModify
(
id
,
newValue
)
collection
,
err
=
(
*
node
.
replica
)
.
getCollectionByName
(
"test"
)
assert
.
NoError
(
t
,
err
)
assert
.
Equal
(
t
,
collection
.
ID
(),
UniqueID
(
0
))
partitionNum
,
err
=
(
*
node
.
replica
)
.
getPartitionNum
(
UniqueID
(
0
))
assert
.
NoError
(
t
,
err
)
assert
.
Equal
(
t
,
partitionNum
,
3
)
hasPartition
=
(
*
node
.
replica
)
.
hasPartition
(
UniqueID
(
0
),
"p0"
)
assert
.
Equal
(
t
,
hasPartition
,
false
)
hasPartition
=
(
*
node
.
replica
)
.
hasPartition
(
UniqueID
(
0
),
"p1"
)
assert
.
Equal
(
t
,
hasPartition
,
true
)
hasPartition
=
(
*
node
.
replica
)
.
hasPartition
(
UniqueID
(
0
),
"p2"
)
assert
.
Equal
(
t
,
hasPartition
,
true
)
hasPartition
=
(
*
node
.
replica
)
.
hasPartition
(
UniqueID
(
0
),
"p3"
)
assert
.
Equal
(
t
,
hasPartition
,
true
)
}
func
TestMetaService_processModify
(
t
*
testing
.
T
)
{
...
...
@@ -552,9 +523,7 @@ func TestMetaService_processModify(t *testing.T) {
>
>
segmentIDs: 0
partition_tags: "p0"
partition_tags: "p1"
partition_tags: "p2"
partition_tags: "default"
`
(
*
node
.
metaService
)
.
processCreate
(
key1
,
msg1
)
...
...
@@ -565,21 +534,8 @@ func TestMetaService_processModify(t *testing.T) {
assert
.
NoError
(
t
,
err
)
assert
.
Equal
(
t
,
collection
.
ID
(),
UniqueID
(
0
))
partitionNum
,
err
:=
(
*
node
.
replica
)
.
getPartitionNum
(
UniqueID
(
0
))
assert
.
NoError
(
t
,
err
)
assert
.
Equal
(
t
,
partitionNum
,
3
)
hasPartition
:=
(
*
node
.
replica
)
.
hasPartition
(
UniqueID
(
0
),
"p0"
)
assert
.
Equal
(
t
,
hasPartition
,
true
)
hasPartition
=
(
*
node
.
replica
)
.
hasPartition
(
UniqueID
(
0
),
"p1"
)
assert
.
Equal
(
t
,
hasPartition
,
true
)
hasPartition
=
(
*
node
.
replica
)
.
hasPartition
(
UniqueID
(
0
),
"p2"
)
assert
.
Equal
(
t
,
hasPartition
,
true
)
hasPartition
=
(
*
node
.
replica
)
.
hasPartition
(
UniqueID
(
0
),
"p3"
)
assert
.
Equal
(
t
,
hasPartition
,
false
)
key2
:=
"by-dev/segment/0"
msg2
:=
`partition_tag: "
p1
"
msg2
:=
`partition_tag: "
default
"
channel_start: 0
channel_end: 128
close_time: 18446744073709551615
...
...
@@ -612,9 +568,7 @@ func TestMetaService_processModify(t *testing.T) {
>
>
segmentIDs: 0
partition_tags: "p1"
partition_tags: "p2"
partition_tags: "p3"
partition_tags: "default"
`
(
*
node
.
metaService
)
.
processModify
(
key1
,
msg3
)
...
...
@@ -622,25 +576,13 @@ func TestMetaService_processModify(t *testing.T) {
assert
.
NoError
(
t
,
err
)
assert
.
Equal
(
t
,
collection
.
ID
(),
UniqueID
(
0
))
partitionNum
,
err
=
(
*
node
.
replica
)
.
getPartitionNum
(
UniqueID
(
0
))
assert
.
NoError
(
t
,
err
)
assert
.
Equal
(
t
,
partitionNum
,
3
)
hasPartition
=
(
*
node
.
replica
)
.
hasPartition
(
UniqueID
(
0
),
"p0"
)
assert
.
Equal
(
t
,
hasPartition
,
false
)
hasPartition
=
(
*
node
.
replica
)
.
hasPartition
(
UniqueID
(
0
),
"p1"
)
assert
.
Equal
(
t
,
hasPartition
,
true
)
hasPartition
=
(
*
node
.
replica
)
.
hasPartition
(
UniqueID
(
0
),
"p2"
)
assert
.
Equal
(
t
,
hasPartition
,
true
)
hasPartition
=
(
*
node
.
replica
)
.
hasPartition
(
UniqueID
(
0
),
"p3"
)
assert
.
Equal
(
t
,
hasPartition
,
true
)
msg4
:=
`partition_tag: "p1"
msg4
:=
`partition_tag: "default"
channel_start: 0
channel_end: 128
close_time: 18446744073709551615
`
// TODO: modify segment for testing processCollectionModify
(
*
node
.
metaService
)
.
processModify
(
key2
,
msg4
)
seg
,
err
:=
(
*
node
.
replica
)
.
getSegmentByID
(
UniqueID
(
0
))
assert
.
NoError
(
t
,
err
)
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录