未验证 提交 596357dc 编写于 作者: D dragondriver 提交者: GitHub

Add more data definition test cases (#7623)

Signed-off-by: Ndragondriver <jiquan.long@zilliz.com>
上级 2943ba5b
......@@ -22,7 +22,7 @@ import (
func TestChannelsMgrImpl_getChannels(t *testing.T) {
master := newMockGetChannelsService()
query := newMockGetChannelsService()
factory := NewSimpleMsgStreamFactory()
factory := newSimpleMockMsgStreamFactory()
mgr := newChannelsMgrImpl(master.GetChannels, nil, query.GetChannels, nil, factory)
defer mgr.removeAllDMLStream()
......@@ -40,7 +40,7 @@ func TestChannelsMgrImpl_getChannels(t *testing.T) {
func TestChannelsMgrImpl_getVChannels(t *testing.T) {
master := newMockGetChannelsService()
query := newMockGetChannelsService()
factory := NewSimpleMsgStreamFactory()
factory := newSimpleMockMsgStreamFactory()
mgr := newChannelsMgrImpl(master.GetChannels, nil, query.GetChannels, nil, factory)
defer mgr.removeAllDMLStream()
......@@ -58,7 +58,7 @@ func TestChannelsMgrImpl_getVChannels(t *testing.T) {
func TestChannelsMgrImpl_createDMLMsgStream(t *testing.T) {
master := newMockGetChannelsService()
query := newMockGetChannelsService()
factory := NewSimpleMsgStreamFactory()
factory := newSimpleMockMsgStreamFactory()
mgr := newChannelsMgrImpl(master.GetChannels, nil, query.GetChannels, nil, factory)
defer mgr.removeAllDMLStream()
......@@ -80,7 +80,7 @@ func TestChannelsMgrImpl_createDMLMsgStream(t *testing.T) {
func TestChannelsMgrImpl_getDMLMsgStream(t *testing.T) {
master := newMockGetChannelsService()
query := newMockGetChannelsService()
factory := NewSimpleMsgStreamFactory()
factory := newSimpleMockMsgStreamFactory()
mgr := newChannelsMgrImpl(master.GetChannels, nil, query.GetChannels, nil, factory)
defer mgr.removeAllDMLStream()
......@@ -98,7 +98,7 @@ func TestChannelsMgrImpl_getDMLMsgStream(t *testing.T) {
func TestChannelsMgrImpl_removeDMLMsgStream(t *testing.T) {
master := newMockGetChannelsService()
query := newMockGetChannelsService()
factory := NewSimpleMsgStreamFactory()
factory := newSimpleMockMsgStreamFactory()
mgr := newChannelsMgrImpl(master.GetChannels, nil, query.GetChannels, nil, factory)
defer mgr.removeAllDMLStream()
......@@ -125,7 +125,7 @@ func TestChannelsMgrImpl_removeDMLMsgStream(t *testing.T) {
func TestChannelsMgrImpl_removeAllDMLMsgStream(t *testing.T) {
master := newMockGetChannelsService()
query := newMockGetChannelsService()
factory := NewSimpleMsgStreamFactory()
factory := newSimpleMockMsgStreamFactory()
mgr := newChannelsMgrImpl(master.GetChannels, nil, query.GetChannels, nil, factory)
defer mgr.removeAllDMLStream()
......@@ -140,7 +140,7 @@ func TestChannelsMgrImpl_removeAllDMLMsgStream(t *testing.T) {
func TestChannelsMgrImpl_createDQLMsgStream(t *testing.T) {
master := newMockGetChannelsService()
query := newMockGetChannelsService()
factory := NewSimpleMsgStreamFactory()
factory := newSimpleMockMsgStreamFactory()
mgr := newChannelsMgrImpl(master.GetChannels, nil, query.GetChannels, nil, factory)
defer mgr.removeAllDMLStream()
......@@ -153,7 +153,7 @@ func TestChannelsMgrImpl_createDQLMsgStream(t *testing.T) {
func TestChannelsMgrImpl_getDQLMsgStream(t *testing.T) {
master := newMockGetChannelsService()
query := newMockGetChannelsService()
factory := NewSimpleMsgStreamFactory()
factory := newSimpleMockMsgStreamFactory()
mgr := newChannelsMgrImpl(master.GetChannels, nil, query.GetChannels, nil, factory)
defer mgr.removeAllDMLStream()
......@@ -171,7 +171,7 @@ func TestChannelsMgrImpl_getDQLMsgStream(t *testing.T) {
func TestChannelsMgrImpl_removeDQLMsgStream(t *testing.T) {
master := newMockGetChannelsService()
query := newMockGetChannelsService()
factory := NewSimpleMsgStreamFactory()
factory := newSimpleMockMsgStreamFactory()
mgr := newChannelsMgrImpl(master.GetChannels, nil, query.GetChannels, nil, factory)
defer mgr.removeAllDMLStream()
......@@ -198,7 +198,7 @@ func TestChannelsMgrImpl_removeDQLMsgStream(t *testing.T) {
func TestChannelsMgrImpl_removeAllDQLMsgStream(t *testing.T) {
master := newMockGetChannelsService()
query := newMockGetChannelsService()
factory := NewSimpleMsgStreamFactory()
factory := newSimpleMockMsgStreamFactory()
mgr := newChannelsMgrImpl(master.GetChannels, nil, query.GetChannels, nil, factory)
defer mgr.removeAllDMLStream()
......
......@@ -121,7 +121,7 @@ func (node *Proxy) CreateCollection(ctx context.Context, request *milvuspb.Creat
if !node.checkHealthy() {
return unhealthyStatus(), nil
}
cct := &CreateCollectionTask{
cct := &createCollectionTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
CreateCollectionRequest: request,
......@@ -174,7 +174,7 @@ func (node *Proxy) DropCollection(ctx context.Context, request *milvuspb.DropCol
if !node.checkHealthy() {
return unhealthyStatus(), nil
}
dct := &DropCollectionTask{
dct := &dropCollectionTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
DropCollectionRequest: request,
......@@ -228,7 +228,7 @@ func (node *Proxy) HasCollection(ctx context.Context, request *milvuspb.HasColle
Status: unhealthyStatus(),
}, nil
}
hct := &HasCollectionTask{
hct := &hasCollectionTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
HasCollectionRequest: request,
......@@ -282,7 +282,7 @@ func (node *Proxy) LoadCollection(ctx context.Context, request *milvuspb.LoadCol
if !node.checkHealthy() {
return unhealthyStatus(), nil
}
lct := &LoadCollectionTask{
lct := &loadCollectionTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
LoadCollectionRequest: request,
......@@ -332,7 +332,7 @@ func (node *Proxy) ReleaseCollection(ctx context.Context, request *milvuspb.Rele
if !node.checkHealthy() {
return unhealthyStatus(), nil
}
rct := &ReleaseCollectionTask{
rct := &releaseCollectionTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
ReleaseCollectionRequest: request,
......@@ -385,7 +385,7 @@ func (node *Proxy) DescribeCollection(ctx context.Context, request *milvuspb.Des
Status: unhealthyStatus(),
}, nil
}
dct := &DescribeCollectionTask{
dct := &describeCollectionTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
DescribeCollectionRequest: request,
......@@ -441,7 +441,7 @@ func (node *Proxy) GetCollectionStatistics(ctx context.Context, request *milvusp
Status: unhealthyStatus(),
}, nil
}
g := &GetCollectionStatisticsTask{
g := &getCollectionStatisticsTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
GetCollectionStatisticsRequest: request,
......@@ -497,7 +497,7 @@ func (node *Proxy) ShowCollections(ctx context.Context, request *milvuspb.ShowCo
Status: unhealthyStatus(),
}, nil
}
sct := &ShowCollectionsTask{
sct := &showCollectionsTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
ShowCollectionsRequest: request,
......@@ -520,7 +520,10 @@ func (node *Proxy) ShowCollections(ctx context.Context, request *milvuspb.ShowCo
log.Debug("ShowCollections",
zap.String("role", Params.RoleName),
zap.Any("request", request))
zap.String("DbName", sct.ShowCollectionsRequest.DbName),
zap.String("ShowType", sct.ShowCollectionsRequest.Type.String()),
zap.Any("CollectionNames", sct.ShowCollectionsRequest.CollectionNames),
)
defer func() {
log.Debug("ShowCollections Done",
zap.Error(err),
......@@ -546,7 +549,7 @@ func (node *Proxy) CreatePartition(ctx context.Context, request *milvuspb.Create
if !node.checkHealthy() {
return unhealthyStatus(), nil
}
cpt := &CreatePartitionTask{
cpt := &createPartitionTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
CreatePartitionRequest: request,
......@@ -599,7 +602,7 @@ func (node *Proxy) DropPartition(ctx context.Context, request *milvuspb.DropPart
if !node.checkHealthy() {
return unhealthyStatus(), nil
}
dpt := &DropPartitionTask{
dpt := &dropPartitionTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
DropPartitionRequest: request,
......@@ -654,7 +657,7 @@ func (node *Proxy) HasPartition(ctx context.Context, request *milvuspb.HasPartit
Status: unhealthyStatus(),
}, nil
}
hpt := &HasPartitionTask{
hpt := &hasPartitionTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
HasPartitionRequest: request,
......@@ -713,7 +716,7 @@ func (node *Proxy) LoadPartitions(ctx context.Context, request *milvuspb.LoadPar
if !node.checkHealthy() {
return unhealthyStatus(), nil
}
lpt := &LoadPartitionTask{
lpt := &loadPartitionsTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
LoadPartitionsRequest: request,
......@@ -766,7 +769,7 @@ func (node *Proxy) ReleasePartitions(ctx context.Context, request *milvuspb.Rele
if !node.checkHealthy() {
return unhealthyStatus(), nil
}
rpt := &ReleasePartitionTask{
rpt := &releasePartitionsTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
ReleasePartitionsRequest: request,
......@@ -821,7 +824,7 @@ func (node *Proxy) GetPartitionStatistics(ctx context.Context, request *milvuspb
Status: unhealthyStatus(),
}, nil
}
g := &GetPartitionStatisticsTask{
g := &getPartitionStatisticsTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
GetPartitionStatisticsRequest: request,
......@@ -880,7 +883,7 @@ func (node *Proxy) ShowPartitions(ctx context.Context, request *milvuspb.ShowPar
Status: unhealthyStatus(),
}, nil
}
spt := &ShowPartitionsTask{
spt := &showPartitionsTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
ShowPartitionsRequest: request,
......@@ -904,7 +907,10 @@ func (node *Proxy) ShowPartitions(ctx context.Context, request *milvuspb.ShowPar
log.Debug("ShowPartitions",
zap.String("role", Params.RoleName),
zap.Any("request", request))
zap.String("db", spt.ShowPartitionsRequest.DbName),
zap.String("collection", spt.ShowPartitionsRequest.CollectionName),
zap.Any("partitions", spt.ShowPartitionsRequest.PartitionNames),
)
defer func() {
log.Debug("ShowPartitions Done",
zap.Error(err),
......
......@@ -17,6 +17,8 @@ import (
"sync"
"time"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/uniquegenerator"
......@@ -243,3 +245,111 @@ func newMockDqlTask(ctx context.Context) *mockDqlTask {
func newDefaultMockDqlTask() *mockDqlTask {
return newMockDqlTask(context.Background())
}
type simpleMockMsgStream struct {
msgChan chan *msgstream.MsgPack
msgCount int
msgCountMtx sync.RWMutex
}
func (ms *simpleMockMsgStream) Start() {
}
func (ms *simpleMockMsgStream) Close() {
}
func (ms *simpleMockMsgStream) Chan() <-chan *msgstream.MsgPack {
return ms.msgChan
}
func (ms *simpleMockMsgStream) AsProducer(channels []string) {
}
func (ms *simpleMockMsgStream) AsConsumer(channels []string, subName string) {
}
func (ms *simpleMockMsgStream) ComputeProduceChannelIndexes(tsMsgs []msgstream.TsMsg) [][]int32 {
return nil
}
func (ms *simpleMockMsgStream) SetRepackFunc(repackFunc msgstream.RepackFunc) {
}
func (ms *simpleMockMsgStream) getMsgCount() int {
ms.msgCountMtx.RLock()
defer ms.msgCountMtx.RUnlock()
return ms.msgCount
}
func (ms *simpleMockMsgStream) increaseMsgCount(delta int) {
ms.msgCountMtx.Lock()
defer ms.msgCountMtx.Unlock()
ms.msgCount += delta
}
func (ms *simpleMockMsgStream) decreaseMsgCount(delta int) {
ms.increaseMsgCount(-delta)
}
func (ms *simpleMockMsgStream) Produce(pack *msgstream.MsgPack) error {
defer ms.increaseMsgCount(1)
ms.msgChan <- pack
return nil
}
func (ms *simpleMockMsgStream) Broadcast(pack *msgstream.MsgPack) error {
return nil
}
func (ms *simpleMockMsgStream) GetProduceChannels() []string {
return nil
}
func (ms *simpleMockMsgStream) Consume() *msgstream.MsgPack {
if ms.getMsgCount() <= 0 {
return nil
}
defer ms.decreaseMsgCount(1)
return <-ms.msgChan
}
func (ms *simpleMockMsgStream) Seek(offset []*msgstream.MsgPosition) error {
return nil
}
func newSimpleMockMsgStream() *simpleMockMsgStream {
return &simpleMockMsgStream{
msgChan: make(chan *msgstream.MsgPack, 1024),
msgCount: 0,
}
}
type simpleMockMsgStreamFactory struct {
}
func (factory *simpleMockMsgStreamFactory) SetParams(params map[string]interface{}) error {
return nil
}
func (factory *simpleMockMsgStreamFactory) NewMsgStream(ctx context.Context) (msgstream.MsgStream, error) {
return newSimpleMockMsgStream(), nil
}
func (factory *simpleMockMsgStreamFactory) NewTtMsgStream(ctx context.Context) (msgstream.MsgStream, error) {
return newSimpleMockMsgStream(), nil
}
func (factory *simpleMockMsgStreamFactory) NewQueryMsgStream(ctx context.Context) (msgstream.MsgStream, error) {
return newSimpleMockMsgStream(), nil
}
func newSimpleMockMsgStreamFactory() *simpleMockMsgStreamFactory {
return &simpleMockMsgStreamFactory{}
}
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package proxy
import (
"context"
"sync"
"github.com/milvus-io/milvus/internal/msgstream"
)
type SimpleMsgStream struct {
msgChan chan *msgstream.MsgPack
msgCount int
msgCountMtx sync.RWMutex
}
func (ms *SimpleMsgStream) Start() {
}
func (ms *SimpleMsgStream) Close() {
}
func (ms *SimpleMsgStream) Chan() <-chan *msgstream.MsgPack {
return ms.msgChan
}
func (ms *SimpleMsgStream) AsProducer(channels []string) {
}
func (ms *SimpleMsgStream) AsConsumer(channels []string, subName string) {
}
func (ms *SimpleMsgStream) ComputeProduceChannelIndexes(tsMsgs []msgstream.TsMsg) [][]int32 {
return nil
}
func (ms *SimpleMsgStream) SetRepackFunc(repackFunc msgstream.RepackFunc) {
}
func (ms *SimpleMsgStream) getMsgCount() int {
ms.msgCountMtx.RLock()
defer ms.msgCountMtx.RUnlock()
return ms.msgCount
}
func (ms *SimpleMsgStream) increaseMsgCount(delta int) {
ms.msgCountMtx.Lock()
defer ms.msgCountMtx.Unlock()
ms.msgCount += delta
}
func (ms *SimpleMsgStream) decreaseMsgCount(delta int) {
ms.increaseMsgCount(-delta)
}
func (ms *SimpleMsgStream) Produce(pack *msgstream.MsgPack) error {
defer ms.increaseMsgCount(1)
ms.msgChan <- pack
return nil
}
func (ms *SimpleMsgStream) Broadcast(pack *msgstream.MsgPack) error {
return nil
}
func (ms *SimpleMsgStream) GetProduceChannels() []string {
return nil
}
func (ms *SimpleMsgStream) Consume() *msgstream.MsgPack {
if ms.getMsgCount() <= 0 {
return nil
}
defer ms.decreaseMsgCount(1)
return <-ms.msgChan
}
func (ms *SimpleMsgStream) Seek(offset []*msgstream.MsgPosition) error {
return nil
}
func NewSimpleMsgStream() *SimpleMsgStream {
return &SimpleMsgStream{
msgChan: make(chan *msgstream.MsgPack, 1024),
msgCount: 0,
}
}
type SimpleMsgStreamFactory struct {
}
func (factory *SimpleMsgStreamFactory) SetParams(params map[string]interface{}) error {
return nil
}
func (factory *SimpleMsgStreamFactory) NewMsgStream(ctx context.Context) (msgstream.MsgStream, error) {
return NewSimpleMsgStream(), nil
}
func (factory *SimpleMsgStreamFactory) NewTtMsgStream(ctx context.Context) (msgstream.MsgStream, error) {
return NewSimpleMsgStream(), nil
}
func (factory *SimpleMsgStreamFactory) NewQueryMsgStream(ctx context.Context) (msgstream.MsgStream, error) {
return NewSimpleMsgStream(), nil
}
func NewSimpleMsgStreamFactory() *SimpleMsgStreamFactory {
return &SimpleMsgStreamFactory{}
}
......@@ -459,6 +459,9 @@ func TestProxy(t *testing.T) {
prefix := "test_proxy_"
dbName := ""
collectionName := prefix + funcutil.GenRandomStr()
otherCollectionName := collectionName + funcutil.GenRandomStr()
partitionName := prefix + funcutil.GenRandomStr()
otherPartitionName := partitionName + funcutil.GenRandomStr()
shardsNum := int32(2)
int64Field := "int64"
floatVecField := "fVec"
......@@ -501,9 +504,9 @@ func TestProxy(t *testing.T) {
},
}
}
schema := constructCollectionSchema()
constructCreateCollectionRequest := func() *milvuspb.CreateCollectionRequest {
schema := constructCollectionSchema()
bs, err := proto.Marshal(schema)
assert.NoError(t, err)
return &milvuspb.CreateCollectionRequest{
......@@ -514,12 +517,512 @@ func TestProxy(t *testing.T) {
ShardsNum: shardsNum,
}
}
createCollectionReq := constructCreateCollectionRequest()
t.Run("create collection", func(t *testing.T) {
req := constructCreateCollectionRequest()
req := createCollectionReq
resp, err := proxy.CreateCollection(ctx, req)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode)
// recreate -> fail
req2 := constructCreateCollectionRequest()
resp, err = proxy.CreateCollection(ctx, req2)
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode)
})
t.Run("has collection", func(t *testing.T) {
resp, err := proxy.HasCollection(ctx, &milvuspb.HasCollectionRequest{
Base: nil,
DbName: dbName,
CollectionName: collectionName,
TimeStamp: 0,
})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
assert.True(t, resp.Value)
// has other collection: false
resp, err = proxy.HasCollection(ctx, &milvuspb.HasCollectionRequest{
Base: nil,
DbName: dbName,
CollectionName: otherCollectionName,
TimeStamp: 0,
})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
assert.False(t, resp.Value)
})
t.Run("load collection", func(t *testing.T) {
resp, err := proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
Base: nil,
DbName: dbName,
CollectionName: collectionName,
})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode)
// load other collection -> fail
resp, err = proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
Base: nil,
DbName: dbName,
CollectionName: otherCollectionName,
})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode)
})
t.Run("show in-memory collections", func(t *testing.T) {
resp, err := proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{
Base: nil,
DbName: dbName,
TimeStamp: 0,
Type: milvuspb.ShowType_InMemory,
CollectionNames: nil,
})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
assert.Equal(t, 1, len(resp.CollectionNames))
// get in-memory percentage
resp, err = proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{
Base: nil,
DbName: dbName,
TimeStamp: 0,
Type: milvuspb.ShowType_InMemory,
CollectionNames: []string{collectionName},
})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
assert.Equal(t, 1, len(resp.CollectionNames))
assert.Equal(t, 1, len(resp.InMemoryPercentages))
// get in-memory percentage of not loaded collection -> fail
resp, err = proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{
Base: nil,
DbName: dbName,
TimeStamp: 0,
Type: milvuspb.ShowType_InMemory,
CollectionNames: []string{otherCollectionName},
})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
})
t.Run("describe collection", func(t *testing.T) {
collectionID, err := globalMetaCache.GetCollectionID(ctx, collectionName)
assert.NoError(t, err)
resp, err := proxy.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
Base: nil,
DbName: dbName,
CollectionName: collectionName,
CollectionID: collectionID,
TimeStamp: 0,
})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
assert.Equal(t, collectionID, resp.CollectionID)
// TODO(dragondriver): shards num
assert.Equal(t, len(schema.Fields), len(resp.Schema.Fields))
// TODO(dragondriver): compare fields schema, not sure the order of fields
// describe other collection -> fail
resp, err = proxy.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
Base: nil,
DbName: dbName,
CollectionName: otherCollectionName,
CollectionID: collectionID,
TimeStamp: 0,
})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
})
t.Run("get collection statistics", func(t *testing.T) {
resp, err := proxy.GetCollectionStatistics(ctx, &milvuspb.GetCollectionStatisticsRequest{
Base: nil,
DbName: dbName,
CollectionName: collectionName,
})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
// TODO(dragondriver): check num rows
// get statistics of other collection -> fail
resp, err = proxy.GetCollectionStatistics(ctx, &milvuspb.GetCollectionStatisticsRequest{
Base: nil,
DbName: dbName,
CollectionName: otherCollectionName,
})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
})
t.Run("show collections", func(t *testing.T) {
resp, err := proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{
Base: nil,
DbName: dbName,
TimeStamp: 0,
Type: milvuspb.ShowType_All,
CollectionNames: nil,
})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
assert.Equal(t, 1, len(resp.CollectionNames))
})
t.Run("create partition", func(t *testing.T) {
resp, err := proxy.CreatePartition(ctx, &milvuspb.CreatePartitionRequest{
Base: nil,
DbName: dbName,
CollectionName: collectionName,
PartitionName: partitionName,
})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode)
// recreate -> fail
resp, err = proxy.CreatePartition(ctx, &milvuspb.CreatePartitionRequest{
Base: nil,
DbName: dbName,
CollectionName: collectionName,
PartitionName: partitionName,
})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode)
// create partition with non-exist collection -> fail
resp, err = proxy.CreatePartition(ctx, &milvuspb.CreatePartitionRequest{
Base: nil,
DbName: dbName,
CollectionName: otherCollectionName,
PartitionName: partitionName,
})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode)
})
t.Run("has partition", func(t *testing.T) {
resp, err := proxy.HasPartition(ctx, &milvuspb.HasPartitionRequest{
Base: nil,
DbName: dbName,
CollectionName: collectionName,
PartitionName: partitionName,
})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
assert.True(t, resp.Value)
resp, err = proxy.HasPartition(ctx, &milvuspb.HasPartitionRequest{
Base: nil,
DbName: dbName,
CollectionName: collectionName,
PartitionName: otherPartitionName,
})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
assert.False(t, resp.Value)
// non-exist collection -> fail
resp, err = proxy.HasPartition(ctx, &milvuspb.HasPartitionRequest{
Base: nil,
DbName: dbName,
CollectionName: otherCollectionName,
PartitionName: partitionName,
})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
})
t.Run("load partitions", func(t *testing.T) {
resp, err := proxy.LoadPartitions(ctx, &milvuspb.LoadPartitionsRequest{
Base: nil,
DbName: dbName,
CollectionName: collectionName,
PartitionNames: []string{partitionName},
})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode)
// non-exist partition -> fail
resp, err = proxy.LoadPartitions(ctx, &milvuspb.LoadPartitionsRequest{
Base: nil,
DbName: dbName,
CollectionName: collectionName,
PartitionNames: []string{otherPartitionName},
})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode)
// non-exist collection-> fail
resp, err = proxy.LoadPartitions(ctx, &milvuspb.LoadPartitionsRequest{
Base: nil,
DbName: dbName,
CollectionName: otherCollectionName,
PartitionNames: []string{partitionName},
})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode)
})
t.Run("show in-memory partitions", func(t *testing.T) {
collectionID, err := globalMetaCache.GetCollectionID(ctx, collectionName)
assert.NoError(t, err)
resp, err := proxy.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{
Base: nil,
DbName: dbName,
CollectionName: collectionName,
CollectionID: collectionID,
PartitionNames: nil,
Type: milvuspb.ShowType_InMemory,
})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
// default partition?
assert.Equal(t, 1, len(resp.PartitionNames))
// show partition not in-memory -> fail
resp, err = proxy.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{
Base: nil,
DbName: dbName,
CollectionName: collectionName,
CollectionID: collectionID,
PartitionNames: []string{otherPartitionName},
Type: milvuspb.ShowType_InMemory,
})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
// non-exist collection -> fail
resp, err = proxy.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{
Base: nil,
DbName: dbName,
CollectionName: otherCollectionName,
CollectionID: collectionID,
PartitionNames: []string{partitionName},
Type: milvuspb.ShowType_InMemory,
})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
})
t.Run("get partition statistics", func(t *testing.T) {
resp, err := proxy.GetPartitionStatistics(ctx, &milvuspb.GetPartitionStatisticsRequest{
Base: nil,
DbName: dbName,
CollectionName: collectionName,
PartitionName: partitionName,
})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
// non-exist partition -> fail
resp, err = proxy.GetPartitionStatistics(ctx, &milvuspb.GetPartitionStatisticsRequest{
Base: nil,
DbName: dbName,
CollectionName: collectionName,
PartitionName: otherPartitionName,
})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
// non-exist collection -> fail
resp, err = proxy.GetPartitionStatistics(ctx, &milvuspb.GetPartitionStatisticsRequest{
Base: nil,
DbName: dbName,
CollectionName: otherCollectionName,
PartitionName: partitionName,
})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
})
t.Run("show partitions", func(t *testing.T) {
collectionID, err := globalMetaCache.GetCollectionID(ctx, collectionName)
assert.NoError(t, err)
resp, err := proxy.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{
Base: nil,
DbName: dbName,
CollectionName: collectionName,
CollectionID: collectionID,
PartitionNames: nil,
Type: milvuspb.ShowType_All,
})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
// default partition
assert.Equal(t, 2, len(resp.PartitionNames))
// non-exist collection -> fail
resp, err = proxy.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{
Base: nil,
DbName: dbName,
CollectionName: otherCollectionName,
CollectionID: collectionID + 1,
PartitionNames: nil,
Type: milvuspb.ShowType_All,
})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
})
t.Run("release partition", func(t *testing.T) {
resp, err := proxy.ReleasePartitions(ctx, &milvuspb.ReleasePartitionsRequest{
Base: nil,
DbName: dbName,
CollectionName: collectionName,
PartitionNames: []string{partitionName},
})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode)
})
t.Run("show in-memory partitions after release partition", func(t *testing.T) {
collectionID, err := globalMetaCache.GetCollectionID(ctx, collectionName)
assert.NoError(t, err)
resp, err := proxy.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{
Base: nil,
DbName: dbName,
CollectionName: collectionName,
CollectionID: collectionID,
PartitionNames: nil,
Type: milvuspb.ShowType_InMemory,
})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
// default partition
assert.Equal(t, 1, len(resp.PartitionNames))
resp, err = proxy.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{
Base: nil,
DbName: dbName,
CollectionName: collectionName,
CollectionID: collectionID,
PartitionNames: []string{partitionName}, // released
Type: milvuspb.ShowType_InMemory,
})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
})
t.Run("drop partition", func(t *testing.T) {
resp, err := proxy.DropPartition(ctx, &milvuspb.DropPartitionRequest{
Base: nil,
DbName: dbName,
CollectionName: collectionName,
PartitionName: partitionName,
})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode)
// invalidate meta cache
resp, err = proxy.InvalidateCollectionMetaCache(ctx, &proxypb.InvalidateCollMetaCacheRequest{
Base: nil,
DbName: dbName,
CollectionName: collectionName,
})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode)
// drop non-exist partition -> fail
resp, err = proxy.DropPartition(ctx, &milvuspb.DropPartitionRequest{
Base: nil,
DbName: dbName,
CollectionName: collectionName,
PartitionName: partitionName,
})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode)
resp, err = proxy.DropPartition(ctx, &milvuspb.DropPartitionRequest{
Base: nil,
DbName: dbName,
CollectionName: collectionName,
PartitionName: otherCollectionName,
})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode)
resp, err = proxy.DropPartition(ctx, &milvuspb.DropPartitionRequest{
Base: nil,
DbName: dbName,
CollectionName: otherCollectionName,
PartitionName: partitionName,
})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode)
})
t.Run("has partition after drop partition", func(t *testing.T) {
resp, err := proxy.HasPartition(ctx, &milvuspb.HasPartitionRequest{
Base: nil,
DbName: dbName,
CollectionName: collectionName,
PartitionName: partitionName,
})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
assert.False(t, resp.Value)
})
t.Run("show partitions after drop partition", func(t *testing.T) {
collectionID, err := globalMetaCache.GetCollectionID(ctx, collectionName)
assert.NoError(t, err)
resp, err := proxy.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{
Base: nil,
DbName: dbName,
CollectionName: collectionName,
CollectionID: collectionID,
PartitionNames: nil,
Type: milvuspb.ShowType_All,
})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
// default partition
assert.Equal(t, 1, len(resp.PartitionNames))
})
t.Run("release collection", func(t *testing.T) {
collectionID, err := globalMetaCache.GetCollectionID(ctx, collectionName)
assert.NoError(t, err)
resp, err := proxy.ReleaseCollection(ctx, &milvuspb.ReleaseCollectionRequest{
Base: nil,
DbName: dbName,
CollectionName: collectionName,
})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode)
// release dql message stream
resp, err = proxy.ReleaseDQLMessageStream(ctx, &proxypb.ReleaseDQLMessageStreamRequest{
Base: nil,
DbID: 0,
CollectionID: collectionID,
})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode)
})
t.Run("show in-memory collections after release", func(t *testing.T) {
resp, err := proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{
Base: nil,
DbName: dbName,
TimeStamp: 0,
Type: milvuspb.ShowType_InMemory,
CollectionNames: nil,
})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
assert.Equal(t, 0, len(resp.CollectionNames))
})
t.Run("drop collection", func(t *testing.T) {
......@@ -552,5 +1055,30 @@ func TestProxy(t *testing.T) {
assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode)
})
t.Run("has collection after drop collection", func(t *testing.T) {
resp, err := proxy.HasCollection(ctx, &milvuspb.HasCollectionRequest{
Base: nil,
DbName: dbName,
CollectionName: collectionName,
TimeStamp: 0,
})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
assert.False(t, resp.Value)
})
t.Run("show all collections after drop collection", func(t *testing.T) {
resp, err := proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{
Base: nil,
DbName: dbName,
TimeStamp: 0,
Type: milvuspb.ShowType_All,
CollectionNames: nil,
})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
assert.Equal(t, 0, len(resp.CollectionNames))
})
cancel()
}
此差异已折叠。
......@@ -455,7 +455,7 @@ func TestTaskScheduler(t *testing.T) {
ctx := context.Background()
tsoAllocatorIns := newMockTsoAllocator()
idAllocatorIns := newMockIDAllocatorInterface()
factory := NewSimpleMsgStreamFactory()
factory := newSimpleMockMsgStreamFactory()
sched, err := newTaskScheduler(ctx, idAllocatorIns, tsoAllocatorIns, factory)
assert.NoError(t, err)
......
......@@ -103,10 +103,11 @@ func (nodeCtx *nodeCtx) Start(ctx context.Context, wg *sync.WaitGroup) {
}
func (nodeCtx *nodeCtx) Close() {
for _, channel := range nodeCtx.inputChannels {
close(channel)
log.Warn("close inputChannel")
}
// data race with nodeCtx.ReceiveMsg { nodeCtx.inputChannels[inputChanIdx] <- msg }
//for _, channel := range nodeCtx.inputChannels {
// close(channel)
// log.Warn("close inputChannel")
//}
nodeCtx.node.Close()
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册