提交 a250eb37 编写于 作者: N neza2017 提交者: yefu.chen

Check request type and add unittest

Signed-off-by: Nneza2017 <yefu.chen@zilliz.com>
上级 6e70ce3f
......@@ -2,7 +2,6 @@ package masterservice
import (
"context"
"errors"
"fmt"
"math/rand"
"sync"
......@@ -147,67 +146,67 @@ func (c *Core) UpdateStateCode(code internalpb.StateCode) {
func (c *Core) checkInit() error {
if c.MetaTable == nil {
return errors.New("MetaTable is nil")
return fmt.Errorf("MetaTable is nil")
}
if c.idAllocator == nil {
return errors.New("idAllocator is nil")
return fmt.Errorf("idAllocator is nil")
}
if c.tsoAllocator == nil {
return errors.New("tsoAllocator is nil")
return fmt.Errorf("tsoAllocator is nil")
}
if c.etcdCli == nil {
return errors.New("etcdCli is nil")
return fmt.Errorf("etcdCli is nil")
}
if c.metaKV == nil {
return errors.New("metaKV is nil")
return fmt.Errorf("metaKV is nil")
}
if c.kvBase == nil {
return errors.New("kvBase is nil")
return fmt.Errorf("kvBase is nil")
}
if c.ProxyTimeTickChan == nil {
return errors.New("ProxyTimeTickChan is nil")
return fmt.Errorf("ProxyTimeTickChan is nil")
}
if c.ddReqQueue == nil {
return errors.New("ddReqQueue is nil")
return fmt.Errorf("ddReqQueue is nil")
}
if c.DdCreateCollectionReq == nil {
return errors.New("DdCreateCollectionReq is nil")
return fmt.Errorf("DdCreateCollectionReq is nil")
}
if c.DdDropCollectionReq == nil {
return errors.New("DdDropCollectionReq is nil")
return fmt.Errorf("DdDropCollectionReq is nil")
}
if c.DdCreatePartitionReq == nil {
return errors.New("DdCreatePartitionReq is nil")
return fmt.Errorf("DdCreatePartitionReq is nil")
}
if c.DdDropPartitionReq == nil {
return errors.New("DdDropPartitionReq is nil")
return fmt.Errorf("DdDropPartitionReq is nil")
}
if c.DataServiceSegmentChan == nil {
return errors.New("DataServiceSegmentChan is nil")
return fmt.Errorf("DataServiceSegmentChan is nil")
}
if c.GetBinlogFilePathsFromDataServiceReq == nil {
return errors.New("GetBinlogFilePathsFromDataServiceReq is nil")
return fmt.Errorf("GetBinlogFilePathsFromDataServiceReq is nil")
}
if c.GetNumRowsReq == nil {
return errors.New("GetNumRowsReq is nil")
return fmt.Errorf("GetNumRowsReq is nil")
}
if c.BuildIndexReq == nil {
return errors.New("BuildIndexReq is nil")
return fmt.Errorf("BuildIndexReq is nil")
}
if c.DropIndexReq == nil {
return errors.New("DropIndexReq is nil")
return fmt.Errorf("DropIndexReq is nil")
}
if c.InvalidateCollectionMetaCache == nil {
return errors.New("InvalidateCollectionMetaCache is nil")
return fmt.Errorf("InvalidateCollectionMetaCache is nil")
}
if c.indexTaskQueue == nil {
return errors.New("indexTaskQueue is nil")
return fmt.Errorf("indexTaskQueue is nil")
}
if c.DataNodeSegmentFlushCompletedChan == nil {
return errors.New("DataNodeSegmentFlushCompletedChan is nil")
return fmt.Errorf("DataNodeSegmentFlushCompletedChan is nil")
}
if c.ReleaseCollection == nil {
return errors.New("ReleaseCollection is nil")
return fmt.Errorf("ReleaseCollection is nil")
}
log.Debug("master", zap.Int64("node id", int64(Params.NodeID)))
......@@ -383,15 +382,15 @@ func (c *Core) tsLoop() {
}
func (c *Core) setMsgStreams() error {
if Params.PulsarAddress == "" {
return errors.New("PulsarAddress is empty")
return fmt.Errorf("PulsarAddress is empty")
}
if Params.MsgChannelSubName == "" {
return errors.New("MsgChannelSubName is emptyr")
return fmt.Errorf("MsgChannelSubName is emptyr")
}
//proxy time tick stream,
if Params.ProxyTimeTickChannel == "" {
return errors.New("ProxyTimeTickChannel is empty")
return fmt.Errorf("ProxyTimeTickChannel is empty")
}
var err error
......@@ -411,7 +410,7 @@ func (c *Core) setMsgStreams() error {
// master time tick channel
if Params.TimeTickChannel == "" {
return errors.New("TimeTickChannel is empty")
return fmt.Errorf("TimeTickChannel is empty")
}
timeTickStream, _ := c.msFactory.NewMsgStream(c.ctx)
timeTickStream.AsProducer([]string{Params.TimeTickChannel})
......@@ -419,7 +418,7 @@ func (c *Core) setMsgStreams() error {
// master dd channel
if Params.DdChannel == "" {
return errors.New("DdChannel is empty")
return fmt.Errorf("DdChannel is empty")
}
ddStream, _ := c.msFactory.NewMsgStream(c.ctx)
ddStream.AsProducer([]string{Params.DdChannel})
......@@ -557,7 +556,7 @@ func (c *Core) setMsgStreams() error {
//segment channel, data service create segment,or data node flush segment will put msg in this channel
if Params.DataServiceSegmentChannel == "" {
return errors.New("DataServiceSegmentChannel is empty")
return fmt.Errorf("DataServiceSegmentChannel is empty")
}
dataServiceStream, _ := c.msFactory.NewMsgStream(c.ctx)
dataServiceStream.AsConsumer([]string{Params.DataServiceSegmentChannel}, Params.MsgChannelSubName)
......@@ -618,10 +617,10 @@ func (c *Core) SetProxyService(ctx context.Context, s types.ProxyService) error
CollectionName: collectionName,
})
if status == nil {
return errors.New("invalidate collection metacache resp is nil")
return fmt.Errorf("invalidate collection metacache resp is nil")
}
if status.ErrorCode != commonpb.ErrorCode_Success {
return errors.New(status.Reason)
return fmt.Errorf(status.Reason)
}
return nil
}
......@@ -723,7 +722,7 @@ func (c *Core) SetIndexService(s types.IndexService) error {
return err
}
if rsp.ErrorCode != commonpb.ErrorCode_Success {
return errors.New(rsp.Reason)
return fmt.Errorf(rsp.Reason)
}
return nil
}
......@@ -883,7 +882,7 @@ func (c *Core) CreateCollection(ctx context.Context, in *milvuspb.CreateCollecti
t := &CreateCollectionReqTask{
baseReqTask: baseReqTask{
ctx: ctx,
cv: make(chan error),
cv: make(chan error, 1),
core: c,
},
Req: in,
......@@ -916,7 +915,7 @@ func (c *Core) DropCollection(ctx context.Context, in *milvuspb.DropCollectionRe
t := &DropCollectionReqTask{
baseReqTask: baseReqTask{
ctx: ctx,
cv: make(chan error),
cv: make(chan error, 1),
core: c,
},
Req: in,
......@@ -952,7 +951,7 @@ func (c *Core) HasCollection(ctx context.Context, in *milvuspb.HasCollectionRequ
t := &HasCollectionReqTask{
baseReqTask: baseReqTask{
ctx: ctx,
cv: make(chan error),
cv: make(chan error, 1),
core: c,
},
Req: in,
......@@ -996,7 +995,7 @@ func (c *Core) DescribeCollection(ctx context.Context, in *milvuspb.DescribeColl
t := &DescribeCollectionReqTask{
baseReqTask: baseReqTask{
ctx: ctx,
cv: make(chan error),
cv: make(chan error, 1),
core: c,
},
Req: in,
......@@ -1037,7 +1036,7 @@ func (c *Core) ShowCollections(ctx context.Context, in *milvuspb.ShowCollections
t := &ShowCollectionReqTask{
baseReqTask: baseReqTask{
ctx: ctx,
cv: make(chan error),
cv: make(chan error, 1),
core: c,
},
Req: in,
......@@ -1077,7 +1076,7 @@ func (c *Core) CreatePartition(ctx context.Context, in *milvuspb.CreatePartition
t := &CreatePartitionReqTask{
baseReqTask: baseReqTask{
ctx: ctx,
cv: make(chan error),
cv: make(chan error, 1),
core: c,
},
Req: in,
......@@ -1110,7 +1109,7 @@ func (c *Core) DropPartition(ctx context.Context, in *milvuspb.DropPartitionRequ
t := &DropPartitionReqTask{
baseReqTask: baseReqTask{
ctx: ctx,
cv: make(chan error),
cv: make(chan error, 1),
core: c,
},
Req: in,
......@@ -1146,7 +1145,7 @@ func (c *Core) HasPartition(ctx context.Context, in *milvuspb.HasPartitionReques
t := &HasPartitionReqTask{
baseReqTask: baseReqTask{
ctx: ctx,
cv: make(chan error),
cv: make(chan error, 1),
core: c,
},
Req: in,
......@@ -1193,7 +1192,7 @@ func (c *Core) ShowPartitions(ctx context.Context, in *milvuspb.ShowPartitionsRe
t := &ShowPartitionReqTask{
baseReqTask: baseReqTask{
ctx: ctx,
cv: make(chan error),
cv: make(chan error, 1),
core: c,
},
Req: in,
......@@ -1236,7 +1235,7 @@ func (c *Core) CreateIndex(ctx context.Context, in *milvuspb.CreateIndexRequest)
t := &CreateIndexReqTask{
baseReqTask: baseReqTask{
ctx: ctx,
cv: make(chan error),
cv: make(chan error, 1),
core: c,
},
Req: in,
......@@ -1272,7 +1271,7 @@ func (c *Core) DescribeIndex(ctx context.Context, in *milvuspb.DescribeIndexRequ
t := &DescribeIndexReqTask{
baseReqTask: baseReqTask{
ctx: ctx,
cv: make(chan error),
cv: make(chan error, 1),
core: c,
},
Req: in,
......@@ -1324,7 +1323,7 @@ func (c *Core) DropIndex(ctx context.Context, in *milvuspb.DropIndexRequest) (*c
t := &DropIndexReqTask{
baseReqTask: baseReqTask{
ctx: ctx,
cv: make(chan error),
cv: make(chan error, 1),
core: c,
},
Req: in,
......@@ -1360,7 +1359,7 @@ func (c *Core) DescribeSegment(ctx context.Context, in *milvuspb.DescribeSegment
t := &DescribeSegmentReqTask{
baseReqTask: baseReqTask{
ctx: ctx,
cv: make(chan error),
cv: make(chan error, 1),
core: c,
},
Req: in,
......@@ -1404,7 +1403,7 @@ func (c *Core) ShowSegments(ctx context.Context, in *milvuspb.ShowSegmentsReques
t := &ShowSegmentReqTask{
baseReqTask: baseReqTask{
ctx: ctx,
cv: make(chan error),
cv: make(chan error, 1),
core: c,
},
Req: in,
......
......@@ -15,6 +15,7 @@ import (
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
"github.com/zilliztech/milvus-distributed/internal/proto/indexpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
"github.com/zilliztech/milvus-distributed/internal/proto/proxypb"
"github.com/zilliztech/milvus-distributed/internal/proto/querypb"
......@@ -535,7 +536,7 @@ func TestMasterService(t *testing.T) {
assert.Nil(t, err)
req := &milvuspb.ShowPartitionsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_ShowCollections,
MsgType: commonpb.MsgType_ShowPartitions,
MsgID: 160,
Timestamp: 160,
SourceID: 160,
......@@ -953,7 +954,525 @@ func TestMasterService(t *testing.T) {
assert.Equal(t, collArray[2], "testColl")
})
t.Run("context_cancel", func(t *testing.T) {
ctx2, cancel2 := context.WithTimeout(ctx, time.Millisecond*100)
defer cancel2()
time.Sleep(time.Millisecond * 150)
st, err := core.CreateCollection(ctx2, &milvuspb.CreateCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_CreateCollection,
MsgID: 1000,
Timestamp: 1000,
SourceID: 1000,
},
})
assert.Nil(t, err)
assert.NotEqual(t, st.ErrorCode, commonpb.ErrorCode_Success)
st, err = core.DropCollection(ctx2, &milvuspb.DropCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_DropCollection,
MsgID: 1001,
Timestamp: 1001,
SourceID: 1001,
},
})
assert.Nil(t, err)
assert.NotEqual(t, st.ErrorCode, commonpb.ErrorCode_Success)
rsp1, err := core.HasCollection(ctx2, &milvuspb.HasCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_HasCollection,
MsgID: 1002,
Timestamp: 1002,
SourceID: 1002,
},
})
assert.Nil(t, err)
assert.NotEqual(t, rsp1.Status.ErrorCode, commonpb.ErrorCode_Success)
rsp2, err := core.DescribeCollection(ctx2, &milvuspb.DescribeCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_DescribeCollection,
MsgID: 1003,
Timestamp: 1003,
SourceID: 1003,
},
})
assert.Nil(t, err)
assert.NotEqual(t, rsp2.Status.ErrorCode, commonpb.ErrorCode_Success)
rsp3, err := core.ShowCollections(ctx2, &milvuspb.ShowCollectionsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_ShowCollections,
MsgID: 1004,
Timestamp: 1004,
SourceID: 1004,
},
})
assert.Nil(t, err)
assert.NotEqual(t, rsp3.Status.ErrorCode, commonpb.ErrorCode_Success)
st, err = core.CreatePartition(ctx2, &milvuspb.CreatePartitionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_CreatePartition,
MsgID: 1005,
Timestamp: 1005,
SourceID: 1005,
},
})
assert.Nil(t, err)
assert.NotEqual(t, st.ErrorCode, commonpb.ErrorCode_Success)
st, err = core.DropPartition(ctx2, &milvuspb.DropPartitionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_DropPartition,
MsgID: 1006,
Timestamp: 1006,
SourceID: 1006,
},
})
assert.Nil(t, err)
assert.NotEqual(t, st.ErrorCode, commonpb.ErrorCode_Success)
rsp4, err := core.HasPartition(ctx2, &milvuspb.HasPartitionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_HasPartition,
MsgID: 1007,
Timestamp: 1007,
SourceID: 1007,
},
})
assert.Nil(t, err)
assert.NotEqual(t, rsp4.Status.ErrorCode, commonpb.ErrorCode_Success)
rsp5, err := core.ShowPartitions(ctx2, &milvuspb.ShowPartitionsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_ShowPartitions,
MsgID: 1008,
Timestamp: 1008,
SourceID: 1008,
},
})
assert.Nil(t, err)
assert.NotEqual(t, rsp5.Status.ErrorCode, commonpb.ErrorCode_Success)
st, err = core.CreateIndex(ctx2, &milvuspb.CreateIndexRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_CreateIndex,
MsgID: 1009,
Timestamp: 1009,
SourceID: 1009,
},
})
assert.Nil(t, err)
assert.NotEqual(t, st.ErrorCode, commonpb.ErrorCode_Success)
rsp6, err := core.DescribeIndex(ctx2, &milvuspb.DescribeIndexRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_DescribeIndex,
MsgID: 1010,
Timestamp: 1010,
SourceID: 1010,
},
})
assert.Nil(t, err)
assert.NotEqual(t, rsp6.Status.ErrorCode, commonpb.ErrorCode_Success)
st, err = core.DropIndex(ctx2, &milvuspb.DropIndexRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_DropIndex,
MsgID: 1011,
Timestamp: 1011,
SourceID: 1011,
},
})
assert.Nil(t, err)
assert.NotEqual(t, st.ErrorCode, commonpb.ErrorCode_Success)
rsp7, err := core.DescribeSegment(ctx2, &milvuspb.DescribeSegmentRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_DescribeSegment,
MsgID: 1012,
Timestamp: 1012,
SourceID: 1012,
},
})
assert.Nil(t, err)
assert.NotEqual(t, rsp7.Status.ErrorCode, commonpb.ErrorCode_Success)
rsp8, err := core.ShowSegments(ctx2, &milvuspb.ShowSegmentsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_ShowSegments,
MsgID: 1013,
Timestamp: 1013,
SourceID: 1013,
},
})
assert.Nil(t, err)
assert.NotEqual(t, rsp8.Status.ErrorCode, commonpb.ErrorCode_Success)
})
t.Run("undefine req type", func(t *testing.T) {
st, err := core.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Undefined,
MsgID: 2000,
Timestamp: 2000,
SourceID: 2000,
},
})
assert.Nil(t, err)
assert.NotEqual(t, st.ErrorCode, commonpb.ErrorCode_Success)
st, err = core.DropCollection(ctx, &milvuspb.DropCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Undefined,
MsgID: 2001,
Timestamp: 2001,
SourceID: 2001,
},
})
assert.Nil(t, err)
assert.NotEqual(t, st.ErrorCode, commonpb.ErrorCode_Success)
rsp1, err := core.HasCollection(ctx, &milvuspb.HasCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Undefined,
MsgID: 2002,
Timestamp: 2002,
SourceID: 2002,
},
})
assert.Nil(t, err)
assert.NotEqual(t, rsp1.Status.ErrorCode, commonpb.ErrorCode_Success)
rsp2, err := core.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Undefined,
MsgID: 2003,
Timestamp: 2003,
SourceID: 2003,
},
})
assert.Nil(t, err)
assert.NotEqual(t, rsp2.Status.ErrorCode, commonpb.ErrorCode_Success)
rsp3, err := core.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Undefined,
MsgID: 2004,
Timestamp: 2004,
SourceID: 2004,
},
})
assert.Nil(t, err)
assert.NotEqual(t, rsp3.Status.ErrorCode, commonpb.ErrorCode_Success)
st, err = core.CreatePartition(ctx, &milvuspb.CreatePartitionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Undefined,
MsgID: 2005,
Timestamp: 2005,
SourceID: 2005,
},
})
assert.Nil(t, err)
assert.NotEqual(t, st.ErrorCode, commonpb.ErrorCode_Success)
st, err = core.DropPartition(ctx, &milvuspb.DropPartitionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Undefined,
MsgID: 2006,
Timestamp: 2006,
SourceID: 2006,
},
})
assert.Nil(t, err)
assert.NotEqual(t, st.ErrorCode, commonpb.ErrorCode_Success)
rsp4, err := core.HasPartition(ctx, &milvuspb.HasPartitionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Undefined,
MsgID: 2007,
Timestamp: 2007,
SourceID: 2007,
},
})
assert.Nil(t, err)
assert.NotEqual(t, rsp4.Status.ErrorCode, commonpb.ErrorCode_Success)
rsp5, err := core.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Undefined,
MsgID: 2008,
Timestamp: 2008,
SourceID: 2008,
},
})
assert.Nil(t, err)
assert.NotEqual(t, rsp5.Status.ErrorCode, commonpb.ErrorCode_Success)
st, err = core.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Undefined,
MsgID: 2009,
Timestamp: 2009,
SourceID: 2009,
},
})
assert.Nil(t, err)
assert.NotEqual(t, st.ErrorCode, commonpb.ErrorCode_Success)
rsp6, err := core.DescribeIndex(ctx, &milvuspb.DescribeIndexRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Undefined,
MsgID: 2010,
Timestamp: 2010,
SourceID: 2010,
},
})
assert.Nil(t, err)
assert.NotEqual(t, rsp6.Status.ErrorCode, commonpb.ErrorCode_Success)
st, err = core.DropIndex(ctx, &milvuspb.DropIndexRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Undefined,
MsgID: 2011,
Timestamp: 2011,
SourceID: 2011,
},
})
assert.Nil(t, err)
assert.NotEqual(t, st.ErrorCode, commonpb.ErrorCode_Success)
rsp7, err := core.DescribeSegment(ctx, &milvuspb.DescribeSegmentRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Undefined,
MsgID: 2012,
Timestamp: 2012,
SourceID: 2012,
},
})
assert.Nil(t, err)
assert.NotEqual(t, rsp7.Status.ErrorCode, commonpb.ErrorCode_Success)
rsp8, err := core.ShowSegments(ctx, &milvuspb.ShowSegmentsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Undefined,
MsgID: 2013,
Timestamp: 2013,
SourceID: 2013,
},
})
assert.Nil(t, err)
assert.NotEqual(t, rsp8.Status.ErrorCode, commonpb.ErrorCode_Success)
})
t.Run("alloc time tick", func(t *testing.T) {
req := &masterpb.AllocTimestampRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Undefined,
MsgID: 3000,
Timestamp: 3000,
SourceID: 3000,
},
Count: 1,
}
rsp, err := core.AllocTimestamp(ctx, req)
assert.Nil(t, err)
assert.Equal(t, uint32(1), rsp.Count)
assert.NotZero(t, rsp.Timestamp)
})
t.Run("alloc id", func(t *testing.T) {
req := &masterpb.AllocIDRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Undefined,
MsgID: 3001,
Timestamp: 3001,
SourceID: 3001,
},
Count: 1,
}
rsp, err := core.AllocID(ctx, req)
assert.Nil(t, err)
assert.Equal(t, uint32(1), rsp.Count)
assert.NotZero(t, rsp.ID)
})
t.Run("get_channels", func(t *testing.T) {
_, err := core.GetTimeTickChannel(ctx)
assert.Nil(t, err)
_, err = core.GetDdChannel(ctx)
assert.Nil(t, err)
_, err = core.GetStatisticsChannel(ctx)
assert.Nil(t, err)
})
err = core.Stop()
assert.Nil(t, err)
st, err := core.GetComponentStates(ctx)
assert.Nil(t, err)
assert.Equal(t, st.Status.ErrorCode, commonpb.ErrorCode_Success)
assert.NotEqual(t, st.State.StateCode, internalpb.StateCode_Healthy)
t.Run("state_not_healthy", func(t *testing.T) {
st, err := core.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_CreateCollection,
MsgID: 4000,
Timestamp: 4000,
SourceID: 4000,
},
})
assert.Nil(t, err)
assert.NotEqual(t, st.ErrorCode, commonpb.ErrorCode_Success)
st, err = core.DropCollection(ctx, &milvuspb.DropCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_DropCollection,
MsgID: 4001,
Timestamp: 4001,
SourceID: 4001,
},
})
assert.Nil(t, err)
assert.NotEqual(t, st.ErrorCode, commonpb.ErrorCode_Success)
rsp1, err := core.HasCollection(ctx, &milvuspb.HasCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_HasCollection,
MsgID: 4002,
Timestamp: 4002,
SourceID: 4002,
},
})
assert.Nil(t, err)
assert.NotEqual(t, rsp1.Status.ErrorCode, commonpb.ErrorCode_Success)
rsp2, err := core.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_DescribeCollection,
MsgID: 4003,
Timestamp: 4003,
SourceID: 4003,
},
})
assert.Nil(t, err)
assert.NotEqual(t, rsp2.Status.ErrorCode, commonpb.ErrorCode_Success)
rsp3, err := core.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_ShowCollections,
MsgID: 4004,
Timestamp: 4004,
SourceID: 4004,
},
})
assert.Nil(t, err)
assert.NotEqual(t, rsp3.Status.ErrorCode, commonpb.ErrorCode_Success)
st, err = core.CreatePartition(ctx, &milvuspb.CreatePartitionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_CreatePartition,
MsgID: 4005,
Timestamp: 4005,
SourceID: 4005,
},
})
assert.Nil(t, err)
assert.NotEqual(t, st.ErrorCode, commonpb.ErrorCode_Success)
st, err = core.DropPartition(ctx, &milvuspb.DropPartitionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_DropPartition,
MsgID: 4006,
Timestamp: 4006,
SourceID: 4006,
},
})
assert.Nil(t, err)
assert.NotEqual(t, st.ErrorCode, commonpb.ErrorCode_Success)
rsp4, err := core.HasPartition(ctx, &milvuspb.HasPartitionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_HasPartition,
MsgID: 4007,
Timestamp: 4007,
SourceID: 4007,
},
})
assert.Nil(t, err)
assert.NotEqual(t, rsp4.Status.ErrorCode, commonpb.ErrorCode_Success)
rsp5, err := core.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_ShowPartitions,
MsgID: 4008,
Timestamp: 4008,
SourceID: 4008,
},
})
assert.Nil(t, err)
assert.NotEqual(t, rsp5.Status.ErrorCode, commonpb.ErrorCode_Success)
st, err = core.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_CreateIndex,
MsgID: 4009,
Timestamp: 4009,
SourceID: 4009,
},
})
assert.Nil(t, err)
assert.NotEqual(t, st.ErrorCode, commonpb.ErrorCode_Success)
rsp6, err := core.DescribeIndex(ctx, &milvuspb.DescribeIndexRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_DescribeIndex,
MsgID: 4010,
Timestamp: 4010,
SourceID: 4010,
},
})
assert.Nil(t, err)
assert.NotEqual(t, rsp6.Status.ErrorCode, commonpb.ErrorCode_Success)
st, err = core.DropIndex(ctx, &milvuspb.DropIndexRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_DropIndex,
MsgID: 4011,
Timestamp: 4011,
SourceID: 4011,
},
})
assert.Nil(t, err)
assert.NotEqual(t, st.ErrorCode, commonpb.ErrorCode_Success)
rsp7, err := core.DescribeSegment(ctx, &milvuspb.DescribeSegmentRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_DescribeSegment,
MsgID: 4012,
Timestamp: 4012,
SourceID: 4012,
},
})
assert.Nil(t, err)
assert.NotEqual(t, rsp7.Status.ErrorCode, commonpb.ErrorCode_Success)
rsp8, err := core.ShowSegments(ctx, &milvuspb.ShowSegmentsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_ShowSegments,
MsgID: 4013,
Timestamp: 4013,
SourceID: 4013,
},
})
assert.Nil(t, err)
assert.NotEqual(t, rsp8.Status.ErrorCode, commonpb.ErrorCode_Success)
})
}
......@@ -2,7 +2,6 @@ package masterservice
import (
"context"
"errors"
"fmt"
"github.com/golang/protobuf/proto"
......@@ -39,12 +38,12 @@ func (bt *baseReqTask) Notify(err error) {
func (bt *baseReqTask) WaitToFinish() error {
select {
case <-bt.core.ctx.Done():
return errors.New("context done")
return fmt.Errorf("core context done, %s", bt.core.ctx.Err().Error())
case <-bt.ctx.Done():
return errors.New("grpc context done")
return fmt.Errorf("request context done, %s", bt.ctx.Err().Error())
case err, ok := <-bt.cv:
if !ok {
return errors.New("notify chan closed")
return fmt.Errorf("notify chan closed")
}
return err
}
......@@ -72,6 +71,9 @@ func (t *CreateCollectionReqTask) IgnoreTimeStamp() bool {
}
func (t *CreateCollectionReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_CreateCollection {
return fmt.Errorf("create collection, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
}
var schema schemapb.CollectionSchema
err := proto.Unmarshal(t.Req.Schema, &schema)
if err != nil {
......@@ -217,6 +219,10 @@ func (t *DropCollectionReqTask) IgnoreTimeStamp() bool {
}
func (t *DropCollectionReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_DropCollection {
return fmt.Errorf("drop collection, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
}
collMeta, err := t.core.MetaTable.GetCollectionByName(t.Req.CollectionName)
if err != nil {
return err
......@@ -277,7 +283,10 @@ func (t *HasCollectionReqTask) IgnoreTimeStamp() bool {
return true
}
func (t *HasCollectionReqTask) Execute(context.Context) error {
func (t *HasCollectionReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_HasCollection {
return fmt.Errorf("has collection, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
}
_, err := t.core.MetaTable.GetCollectionByName(t.Req.CollectionName)
if err == nil {
t.HasCollection = true
......@@ -310,6 +319,9 @@ func (t *DescribeCollectionReqTask) IgnoreTimeStamp() bool {
}
func (t *DescribeCollectionReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_DescribeCollection {
return fmt.Errorf("describe collection, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
}
var coll *etcdpb.CollectionInfo
var err error
......@@ -359,7 +371,10 @@ func (t *ShowCollectionReqTask) IgnoreTimeStamp() bool {
return true
}
func (t *ShowCollectionReqTask) Execute(context.Context) error {
func (t *ShowCollectionReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_ShowCollections {
return fmt.Errorf("show collection, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
}
coll, err := t.core.MetaTable.ListCollections()
if err != nil {
return err
......@@ -390,6 +405,9 @@ func (t *CreatePartitionReqTask) IgnoreTimeStamp() bool {
}
func (t *CreatePartitionReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_CreatePartition {
return fmt.Errorf("create partition, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
}
collMeta, err := t.core.MetaTable.GetCollectionByName(t.Req.CollectionName)
if err != nil {
return err
......@@ -446,6 +464,9 @@ func (t *DropPartitionReqTask) IgnoreTimeStamp() bool {
}
func (t *DropPartitionReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_DropPartition {
return fmt.Errorf("drop partition, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
}
coll, err := t.core.MetaTable.GetCollectionByName(t.Req.CollectionName)
if err != nil {
return err
......@@ -497,7 +518,10 @@ func (t *HasPartitionReqTask) IgnoreTimeStamp() bool {
return true
}
func (t *HasPartitionReqTask) Execute(context.Context) error {
func (t *HasPartitionReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_HasPartition {
return fmt.Errorf("has partition, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
}
coll, err := t.core.MetaTable.GetCollectionByName(t.Req.CollectionName)
if err != nil {
return err
......@@ -528,7 +552,10 @@ func (t *ShowPartitionReqTask) IgnoreTimeStamp() bool {
return true
}
func (t *ShowPartitionReqTask) Execute(context.Context) error {
func (t *ShowPartitionReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_ShowPartitions {
return fmt.Errorf("show partition, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
}
var coll *etcdpb.CollectionInfo
var err error
if t.Req.CollectionName == "" {
......@@ -572,7 +599,10 @@ func (t *DescribeSegmentReqTask) IgnoreTimeStamp() bool {
return true
}
func (t *DescribeSegmentReqTask) Execute(context.Context) error {
func (t *DescribeSegmentReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_DescribeSegment {
return fmt.Errorf("describe segment, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
}
coll, err := t.core.MetaTable.GetCollectionByID(t.Req.CollectionID)
if err != nil {
return err
......@@ -629,7 +659,10 @@ func (t *ShowSegmentReqTask) IgnoreTimeStamp() bool {
return true
}
func (t *ShowSegmentReqTask) Execute(context.Context) error {
func (t *ShowSegmentReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_ShowSegments {
return fmt.Errorf("show segments, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
}
coll, err := t.core.MetaTable.GetCollectionByID(t.Req.CollectionID)
if err != nil {
return err
......@@ -674,6 +707,9 @@ func (t *CreateIndexReqTask) IgnoreTimeStamp() bool {
}
func (t *CreateIndexReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_CreateIndex {
return fmt.Errorf("create index, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
}
indexName := Params.DefaultIndexName //TODO, get name from request
indexID, err := t.core.idAllocator.AllocOne()
if err != nil {
......@@ -730,7 +766,10 @@ func (t *DescribeIndexReqTask) IgnoreTimeStamp() bool {
return true
}
func (t *DescribeIndexReqTask) Execute(context.Context) error {
func (t *DescribeIndexReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_DescribeIndex {
return fmt.Errorf("describe index, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
}
idx, err := t.core.MetaTable.GetIndexByName(t.Req.CollectionName, t.Req.FieldName, t.Req.IndexName)
if err != nil {
return err
......@@ -768,6 +807,9 @@ func (t *DropIndexReqTask) IgnoreTimeStamp() bool {
}
func (t *DropIndexReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_DropIndex {
return fmt.Errorf("drop index, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
}
info, err := t.core.MetaTable.GetIndexByName(t.Req.CollectionName, t.Req.FieldName, t.Req.IndexName)
if err != nil {
log.Warn("GetIndexByName failed,", zap.String("collection name", t.Req.CollectionName), zap.String("field name", t.Req.FieldName), zap.String("index name", t.Req.IndexName), zap.Error(err))
......@@ -815,16 +857,6 @@ func (t *CreateIndexTask) BuildIndex() error {
if err != nil {
return err
}
if len(t.indexParams) == 0 {
t.indexParams = make([]*commonpb.KeyValuePair, 0, len(t.fieldSchema.IndexParams))
for _, p := range t.fieldSchema.IndexParams {
t.indexParams = append(t.indexParams, &commonpb.KeyValuePair{
Key: p.Key,
Value: p.Value,
})
}
}
bldID, err = t.core.BuildIndexReq(t.ctx, binlogs, t.fieldSchema.TypeParams, t.indexParams, t.indexID, t.indexName)
if err != nil {
return err
......
package masterservice
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
)
func TestEqualKeyPairArray(t *testing.T) {
p1 := []*commonpb.KeyValuePair{
{
Key: "k1",
Value: "v1",
},
}
p2 := []*commonpb.KeyValuePair{}
assert.False(t, EqualKeyPairArray(p1, p2))
p2 = append(p2, &commonpb.KeyValuePair{
Key: "k2",
Value: "v2",
})
assert.False(t, EqualKeyPairArray(p1, p2))
p2 = []*commonpb.KeyValuePair{
{
Key: "k1",
Value: "v2",
},
}
assert.False(t, EqualKeyPairArray(p1, p2))
p2 = []*commonpb.KeyValuePair{
{
Key: "k1",
Value: "v1",
},
}
assert.True(t, EqualKeyPairArray(p1, p2))
}
func Test_GetFieldSchemaByID(t *testing.T) {
coll := &etcdpb.CollectionInfo{
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: 1,
},
},
},
}
_, err := GetFieldSchemaByID(coll, 1)
assert.Nil(t, err)
_, err = GetFieldSchemaByID(coll, 2)
assert.NotNil(t, err)
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册