diff --git a/internal/masterservice/master_service.go b/internal/masterservice/master_service.go index dc68f76ba0e2041b375475ee9d3e086d5b28d717..14a71c81fcfe45a92e86fb2468a1e05126727c24 100644 --- a/internal/masterservice/master_service.go +++ b/internal/masterservice/master_service.go @@ -2,7 +2,6 @@ package masterservice import ( "context" - "errors" "fmt" "math/rand" "sync" @@ -147,67 +146,67 @@ func (c *Core) UpdateStateCode(code internalpb.StateCode) { func (c *Core) checkInit() error { if c.MetaTable == nil { - return errors.New("MetaTable is nil") + return fmt.Errorf("MetaTable is nil") } if c.idAllocator == nil { - return errors.New("idAllocator is nil") + return fmt.Errorf("idAllocator is nil") } if c.tsoAllocator == nil { - return errors.New("tsoAllocator is nil") + return fmt.Errorf("tsoAllocator is nil") } if c.etcdCli == nil { - return errors.New("etcdCli is nil") + return fmt.Errorf("etcdCli is nil") } if c.metaKV == nil { - return errors.New("metaKV is nil") + return fmt.Errorf("metaKV is nil") } if c.kvBase == nil { - return errors.New("kvBase is nil") + return fmt.Errorf("kvBase is nil") } if c.ProxyTimeTickChan == nil { - return errors.New("ProxyTimeTickChan is nil") + return fmt.Errorf("ProxyTimeTickChan is nil") } if c.ddReqQueue == nil { - return errors.New("ddReqQueue is nil") + return fmt.Errorf("ddReqQueue is nil") } if c.DdCreateCollectionReq == nil { - return errors.New("DdCreateCollectionReq is nil") + return fmt.Errorf("DdCreateCollectionReq is nil") } if c.DdDropCollectionReq == nil { - return errors.New("DdDropCollectionReq is nil") + return fmt.Errorf("DdDropCollectionReq is nil") } if c.DdCreatePartitionReq == nil { - return errors.New("DdCreatePartitionReq is nil") + return fmt.Errorf("DdCreatePartitionReq is nil") } if c.DdDropPartitionReq == nil { - return errors.New("DdDropPartitionReq is nil") + return fmt.Errorf("DdDropPartitionReq is nil") } if c.DataServiceSegmentChan == nil { - return errors.New("DataServiceSegmentChan is nil") + return fmt.Errorf("DataServiceSegmentChan is nil") } if c.GetBinlogFilePathsFromDataServiceReq == nil { - return errors.New("GetBinlogFilePathsFromDataServiceReq is nil") + return fmt.Errorf("GetBinlogFilePathsFromDataServiceReq is nil") } if c.GetNumRowsReq == nil { - return errors.New("GetNumRowsReq is nil") + return fmt.Errorf("GetNumRowsReq is nil") } if c.BuildIndexReq == nil { - return errors.New("BuildIndexReq is nil") + return fmt.Errorf("BuildIndexReq is nil") } if c.DropIndexReq == nil { - return errors.New("DropIndexReq is nil") + return fmt.Errorf("DropIndexReq is nil") } if c.InvalidateCollectionMetaCache == nil { - return errors.New("InvalidateCollectionMetaCache is nil") + return fmt.Errorf("InvalidateCollectionMetaCache is nil") } if c.indexTaskQueue == nil { - return errors.New("indexTaskQueue is nil") + return fmt.Errorf("indexTaskQueue is nil") } if c.DataNodeSegmentFlushCompletedChan == nil { - return errors.New("DataNodeSegmentFlushCompletedChan is nil") + return fmt.Errorf("DataNodeSegmentFlushCompletedChan is nil") } if c.ReleaseCollection == nil { - return errors.New("ReleaseCollection is nil") + return fmt.Errorf("ReleaseCollection is nil") } log.Debug("master", zap.Int64("node id", int64(Params.NodeID))) @@ -383,15 +382,15 @@ func (c *Core) tsLoop() { } func (c *Core) setMsgStreams() error { if Params.PulsarAddress == "" { - return errors.New("PulsarAddress is empty") + return fmt.Errorf("PulsarAddress is empty") } if Params.MsgChannelSubName == "" { - return errors.New("MsgChannelSubName is emptyr") + return fmt.Errorf("MsgChannelSubName is emptyr") } //proxy time tick stream, if Params.ProxyTimeTickChannel == "" { - return errors.New("ProxyTimeTickChannel is empty") + return fmt.Errorf("ProxyTimeTickChannel is empty") } var err error @@ -411,7 +410,7 @@ func (c *Core) setMsgStreams() error { // master time tick channel if Params.TimeTickChannel == "" { - return errors.New("TimeTickChannel is empty") + return fmt.Errorf("TimeTickChannel is empty") } timeTickStream, _ := c.msFactory.NewMsgStream(c.ctx) timeTickStream.AsProducer([]string{Params.TimeTickChannel}) @@ -419,7 +418,7 @@ func (c *Core) setMsgStreams() error { // master dd channel if Params.DdChannel == "" { - return errors.New("DdChannel is empty") + return fmt.Errorf("DdChannel is empty") } ddStream, _ := c.msFactory.NewMsgStream(c.ctx) ddStream.AsProducer([]string{Params.DdChannel}) @@ -557,7 +556,7 @@ func (c *Core) setMsgStreams() error { //segment channel, data service create segment,or data node flush segment will put msg in this channel if Params.DataServiceSegmentChannel == "" { - return errors.New("DataServiceSegmentChannel is empty") + return fmt.Errorf("DataServiceSegmentChannel is empty") } dataServiceStream, _ := c.msFactory.NewMsgStream(c.ctx) dataServiceStream.AsConsumer([]string{Params.DataServiceSegmentChannel}, Params.MsgChannelSubName) @@ -618,10 +617,10 @@ func (c *Core) SetProxyService(ctx context.Context, s types.ProxyService) error CollectionName: collectionName, }) if status == nil { - return errors.New("invalidate collection metacache resp is nil") + return fmt.Errorf("invalidate collection metacache resp is nil") } if status.ErrorCode != commonpb.ErrorCode_Success { - return errors.New(status.Reason) + return fmt.Errorf(status.Reason) } return nil } @@ -723,7 +722,7 @@ func (c *Core) SetIndexService(s types.IndexService) error { return err } if rsp.ErrorCode != commonpb.ErrorCode_Success { - return errors.New(rsp.Reason) + return fmt.Errorf(rsp.Reason) } return nil } @@ -883,7 +882,7 @@ func (c *Core) CreateCollection(ctx context.Context, in *milvuspb.CreateCollecti t := &CreateCollectionReqTask{ baseReqTask: baseReqTask{ ctx: ctx, - cv: make(chan error), + cv: make(chan error, 1), core: c, }, Req: in, @@ -916,7 +915,7 @@ func (c *Core) DropCollection(ctx context.Context, in *milvuspb.DropCollectionRe t := &DropCollectionReqTask{ baseReqTask: baseReqTask{ ctx: ctx, - cv: make(chan error), + cv: make(chan error, 1), core: c, }, Req: in, @@ -952,7 +951,7 @@ func (c *Core) HasCollection(ctx context.Context, in *milvuspb.HasCollectionRequ t := &HasCollectionReqTask{ baseReqTask: baseReqTask{ ctx: ctx, - cv: make(chan error), + cv: make(chan error, 1), core: c, }, Req: in, @@ -996,7 +995,7 @@ func (c *Core) DescribeCollection(ctx context.Context, in *milvuspb.DescribeColl t := &DescribeCollectionReqTask{ baseReqTask: baseReqTask{ ctx: ctx, - cv: make(chan error), + cv: make(chan error, 1), core: c, }, Req: in, @@ -1037,7 +1036,7 @@ func (c *Core) ShowCollections(ctx context.Context, in *milvuspb.ShowCollections t := &ShowCollectionReqTask{ baseReqTask: baseReqTask{ ctx: ctx, - cv: make(chan error), + cv: make(chan error, 1), core: c, }, Req: in, @@ -1077,7 +1076,7 @@ func (c *Core) CreatePartition(ctx context.Context, in *milvuspb.CreatePartition t := &CreatePartitionReqTask{ baseReqTask: baseReqTask{ ctx: ctx, - cv: make(chan error), + cv: make(chan error, 1), core: c, }, Req: in, @@ -1110,7 +1109,7 @@ func (c *Core) DropPartition(ctx context.Context, in *milvuspb.DropPartitionRequ t := &DropPartitionReqTask{ baseReqTask: baseReqTask{ ctx: ctx, - cv: make(chan error), + cv: make(chan error, 1), core: c, }, Req: in, @@ -1146,7 +1145,7 @@ func (c *Core) HasPartition(ctx context.Context, in *milvuspb.HasPartitionReques t := &HasPartitionReqTask{ baseReqTask: baseReqTask{ ctx: ctx, - cv: make(chan error), + cv: make(chan error, 1), core: c, }, Req: in, @@ -1193,7 +1192,7 @@ func (c *Core) ShowPartitions(ctx context.Context, in *milvuspb.ShowPartitionsRe t := &ShowPartitionReqTask{ baseReqTask: baseReqTask{ ctx: ctx, - cv: make(chan error), + cv: make(chan error, 1), core: c, }, Req: in, @@ -1236,7 +1235,7 @@ func (c *Core) CreateIndex(ctx context.Context, in *milvuspb.CreateIndexRequest) t := &CreateIndexReqTask{ baseReqTask: baseReqTask{ ctx: ctx, - cv: make(chan error), + cv: make(chan error, 1), core: c, }, Req: in, @@ -1272,7 +1271,7 @@ func (c *Core) DescribeIndex(ctx context.Context, in *milvuspb.DescribeIndexRequ t := &DescribeIndexReqTask{ baseReqTask: baseReqTask{ ctx: ctx, - cv: make(chan error), + cv: make(chan error, 1), core: c, }, Req: in, @@ -1324,7 +1323,7 @@ func (c *Core) DropIndex(ctx context.Context, in *milvuspb.DropIndexRequest) (*c t := &DropIndexReqTask{ baseReqTask: baseReqTask{ ctx: ctx, - cv: make(chan error), + cv: make(chan error, 1), core: c, }, Req: in, @@ -1360,7 +1359,7 @@ func (c *Core) DescribeSegment(ctx context.Context, in *milvuspb.DescribeSegment t := &DescribeSegmentReqTask{ baseReqTask: baseReqTask{ ctx: ctx, - cv: make(chan error), + cv: make(chan error, 1), core: c, }, Req: in, @@ -1404,7 +1403,7 @@ func (c *Core) ShowSegments(ctx context.Context, in *milvuspb.ShowSegmentsReques t := &ShowSegmentReqTask{ baseReqTask: baseReqTask{ ctx: ctx, - cv: make(chan error), + cv: make(chan error, 1), core: c, }, Req: in, diff --git a/internal/masterservice/master_service_test.go b/internal/masterservice/master_service_test.go index f026e3cac4be54a890858069e5584009e28fcb45..601d18cf755550cd26b6681af9e83c4ec4ae1332 100644 --- a/internal/masterservice/master_service_test.go +++ b/internal/masterservice/master_service_test.go @@ -15,6 +15,7 @@ import ( "github.com/zilliztech/milvus-distributed/internal/proto/datapb" "github.com/zilliztech/milvus-distributed/internal/proto/indexpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" + "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" "github.com/zilliztech/milvus-distributed/internal/proto/proxypb" "github.com/zilliztech/milvus-distributed/internal/proto/querypb" @@ -535,7 +536,7 @@ func TestMasterService(t *testing.T) { assert.Nil(t, err) req := &milvuspb.ShowPartitionsRequest{ Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_ShowCollections, + MsgType: commonpb.MsgType_ShowPartitions, MsgID: 160, Timestamp: 160, SourceID: 160, @@ -953,7 +954,525 @@ func TestMasterService(t *testing.T) { assert.Equal(t, collArray[2], "testColl") }) + t.Run("context_cancel", func(t *testing.T) { + ctx2, cancel2 := context.WithTimeout(ctx, time.Millisecond*100) + defer cancel2() + time.Sleep(time.Millisecond * 150) + st, err := core.CreateCollection(ctx2, &milvuspb.CreateCollectionRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_CreateCollection, + MsgID: 1000, + Timestamp: 1000, + SourceID: 1000, + }, + }) + assert.Nil(t, err) + assert.NotEqual(t, st.ErrorCode, commonpb.ErrorCode_Success) + + st, err = core.DropCollection(ctx2, &milvuspb.DropCollectionRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_DropCollection, + MsgID: 1001, + Timestamp: 1001, + SourceID: 1001, + }, + }) + assert.Nil(t, err) + assert.NotEqual(t, st.ErrorCode, commonpb.ErrorCode_Success) + + rsp1, err := core.HasCollection(ctx2, &milvuspb.HasCollectionRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_HasCollection, + MsgID: 1002, + Timestamp: 1002, + SourceID: 1002, + }, + }) + assert.Nil(t, err) + assert.NotEqual(t, rsp1.Status.ErrorCode, commonpb.ErrorCode_Success) + + rsp2, err := core.DescribeCollection(ctx2, &milvuspb.DescribeCollectionRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_DescribeCollection, + MsgID: 1003, + Timestamp: 1003, + SourceID: 1003, + }, + }) + assert.Nil(t, err) + assert.NotEqual(t, rsp2.Status.ErrorCode, commonpb.ErrorCode_Success) + + rsp3, err := core.ShowCollections(ctx2, &milvuspb.ShowCollectionsRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_ShowCollections, + MsgID: 1004, + Timestamp: 1004, + SourceID: 1004, + }, + }) + assert.Nil(t, err) + assert.NotEqual(t, rsp3.Status.ErrorCode, commonpb.ErrorCode_Success) + + st, err = core.CreatePartition(ctx2, &milvuspb.CreatePartitionRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_CreatePartition, + MsgID: 1005, + Timestamp: 1005, + SourceID: 1005, + }, + }) + assert.Nil(t, err) + assert.NotEqual(t, st.ErrorCode, commonpb.ErrorCode_Success) + + st, err = core.DropPartition(ctx2, &milvuspb.DropPartitionRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_DropPartition, + MsgID: 1006, + Timestamp: 1006, + SourceID: 1006, + }, + }) + assert.Nil(t, err) + assert.NotEqual(t, st.ErrorCode, commonpb.ErrorCode_Success) + + rsp4, err := core.HasPartition(ctx2, &milvuspb.HasPartitionRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_HasPartition, + MsgID: 1007, + Timestamp: 1007, + SourceID: 1007, + }, + }) + assert.Nil(t, err) + assert.NotEqual(t, rsp4.Status.ErrorCode, commonpb.ErrorCode_Success) + + rsp5, err := core.ShowPartitions(ctx2, &milvuspb.ShowPartitionsRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_ShowPartitions, + MsgID: 1008, + Timestamp: 1008, + SourceID: 1008, + }, + }) + assert.Nil(t, err) + assert.NotEqual(t, rsp5.Status.ErrorCode, commonpb.ErrorCode_Success) + + st, err = core.CreateIndex(ctx2, &milvuspb.CreateIndexRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_CreateIndex, + MsgID: 1009, + Timestamp: 1009, + SourceID: 1009, + }, + }) + assert.Nil(t, err) + assert.NotEqual(t, st.ErrorCode, commonpb.ErrorCode_Success) + + rsp6, err := core.DescribeIndex(ctx2, &milvuspb.DescribeIndexRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_DescribeIndex, + MsgID: 1010, + Timestamp: 1010, + SourceID: 1010, + }, + }) + assert.Nil(t, err) + assert.NotEqual(t, rsp6.Status.ErrorCode, commonpb.ErrorCode_Success) + + st, err = core.DropIndex(ctx2, &milvuspb.DropIndexRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_DropIndex, + MsgID: 1011, + Timestamp: 1011, + SourceID: 1011, + }, + }) + assert.Nil(t, err) + assert.NotEqual(t, st.ErrorCode, commonpb.ErrorCode_Success) + + rsp7, err := core.DescribeSegment(ctx2, &milvuspb.DescribeSegmentRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_DescribeSegment, + MsgID: 1012, + Timestamp: 1012, + SourceID: 1012, + }, + }) + assert.Nil(t, err) + assert.NotEqual(t, rsp7.Status.ErrorCode, commonpb.ErrorCode_Success) + + rsp8, err := core.ShowSegments(ctx2, &milvuspb.ShowSegmentsRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_ShowSegments, + MsgID: 1013, + Timestamp: 1013, + SourceID: 1013, + }, + }) + assert.Nil(t, err) + assert.NotEqual(t, rsp8.Status.ErrorCode, commonpb.ErrorCode_Success) + + }) + + t.Run("undefine req type", func(t *testing.T) { + st, err := core.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_Undefined, + MsgID: 2000, + Timestamp: 2000, + SourceID: 2000, + }, + }) + assert.Nil(t, err) + assert.NotEqual(t, st.ErrorCode, commonpb.ErrorCode_Success) + + st, err = core.DropCollection(ctx, &milvuspb.DropCollectionRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_Undefined, + MsgID: 2001, + Timestamp: 2001, + SourceID: 2001, + }, + }) + assert.Nil(t, err) + assert.NotEqual(t, st.ErrorCode, commonpb.ErrorCode_Success) + + rsp1, err := core.HasCollection(ctx, &milvuspb.HasCollectionRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_Undefined, + MsgID: 2002, + Timestamp: 2002, + SourceID: 2002, + }, + }) + assert.Nil(t, err) + assert.NotEqual(t, rsp1.Status.ErrorCode, commonpb.ErrorCode_Success) + + rsp2, err := core.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_Undefined, + MsgID: 2003, + Timestamp: 2003, + SourceID: 2003, + }, + }) + assert.Nil(t, err) + assert.NotEqual(t, rsp2.Status.ErrorCode, commonpb.ErrorCode_Success) + + rsp3, err := core.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_Undefined, + MsgID: 2004, + Timestamp: 2004, + SourceID: 2004, + }, + }) + assert.Nil(t, err) + assert.NotEqual(t, rsp3.Status.ErrorCode, commonpb.ErrorCode_Success) + + st, err = core.CreatePartition(ctx, &milvuspb.CreatePartitionRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_Undefined, + MsgID: 2005, + Timestamp: 2005, + SourceID: 2005, + }, + }) + assert.Nil(t, err) + assert.NotEqual(t, st.ErrorCode, commonpb.ErrorCode_Success) + + st, err = core.DropPartition(ctx, &milvuspb.DropPartitionRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_Undefined, + MsgID: 2006, + Timestamp: 2006, + SourceID: 2006, + }, + }) + assert.Nil(t, err) + assert.NotEqual(t, st.ErrorCode, commonpb.ErrorCode_Success) + + rsp4, err := core.HasPartition(ctx, &milvuspb.HasPartitionRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_Undefined, + MsgID: 2007, + Timestamp: 2007, + SourceID: 2007, + }, + }) + assert.Nil(t, err) + assert.NotEqual(t, rsp4.Status.ErrorCode, commonpb.ErrorCode_Success) + + rsp5, err := core.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_Undefined, + MsgID: 2008, + Timestamp: 2008, + SourceID: 2008, + }, + }) + assert.Nil(t, err) + assert.NotEqual(t, rsp5.Status.ErrorCode, commonpb.ErrorCode_Success) + + st, err = core.CreateIndex(ctx, &milvuspb.CreateIndexRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_Undefined, + MsgID: 2009, + Timestamp: 2009, + SourceID: 2009, + }, + }) + assert.Nil(t, err) + assert.NotEqual(t, st.ErrorCode, commonpb.ErrorCode_Success) + + rsp6, err := core.DescribeIndex(ctx, &milvuspb.DescribeIndexRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_Undefined, + MsgID: 2010, + Timestamp: 2010, + SourceID: 2010, + }, + }) + assert.Nil(t, err) + assert.NotEqual(t, rsp6.Status.ErrorCode, commonpb.ErrorCode_Success) + + st, err = core.DropIndex(ctx, &milvuspb.DropIndexRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_Undefined, + MsgID: 2011, + Timestamp: 2011, + SourceID: 2011, + }, + }) + assert.Nil(t, err) + assert.NotEqual(t, st.ErrorCode, commonpb.ErrorCode_Success) + + rsp7, err := core.DescribeSegment(ctx, &milvuspb.DescribeSegmentRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_Undefined, + MsgID: 2012, + Timestamp: 2012, + SourceID: 2012, + }, + }) + assert.Nil(t, err) + assert.NotEqual(t, rsp7.Status.ErrorCode, commonpb.ErrorCode_Success) + + rsp8, err := core.ShowSegments(ctx, &milvuspb.ShowSegmentsRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_Undefined, + MsgID: 2013, + Timestamp: 2013, + SourceID: 2013, + }, + }) + assert.Nil(t, err) + assert.NotEqual(t, rsp8.Status.ErrorCode, commonpb.ErrorCode_Success) + + }) + + t.Run("alloc time tick", func(t *testing.T) { + req := &masterpb.AllocTimestampRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_Undefined, + MsgID: 3000, + Timestamp: 3000, + SourceID: 3000, + }, + Count: 1, + } + rsp, err := core.AllocTimestamp(ctx, req) + assert.Nil(t, err) + assert.Equal(t, uint32(1), rsp.Count) + assert.NotZero(t, rsp.Timestamp) + }) + + t.Run("alloc id", func(t *testing.T) { + req := &masterpb.AllocIDRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_Undefined, + MsgID: 3001, + Timestamp: 3001, + SourceID: 3001, + }, + Count: 1, + } + rsp, err := core.AllocID(ctx, req) + assert.Nil(t, err) + assert.Equal(t, uint32(1), rsp.Count) + assert.NotZero(t, rsp.ID) + }) + + t.Run("get_channels", func(t *testing.T) { + _, err := core.GetTimeTickChannel(ctx) + assert.Nil(t, err) + _, err = core.GetDdChannel(ctx) + assert.Nil(t, err) + _, err = core.GetStatisticsChannel(ctx) + assert.Nil(t, err) + }) + err = core.Stop() assert.Nil(t, err) + st, err := core.GetComponentStates(ctx) + assert.Nil(t, err) + assert.Equal(t, st.Status.ErrorCode, commonpb.ErrorCode_Success) + assert.NotEqual(t, st.State.StateCode, internalpb.StateCode_Healthy) + t.Run("state_not_healthy", func(t *testing.T) { + st, err := core.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_CreateCollection, + MsgID: 4000, + Timestamp: 4000, + SourceID: 4000, + }, + }) + assert.Nil(t, err) + assert.NotEqual(t, st.ErrorCode, commonpb.ErrorCode_Success) + + st, err = core.DropCollection(ctx, &milvuspb.DropCollectionRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_DropCollection, + MsgID: 4001, + Timestamp: 4001, + SourceID: 4001, + }, + }) + assert.Nil(t, err) + assert.NotEqual(t, st.ErrorCode, commonpb.ErrorCode_Success) + + rsp1, err := core.HasCollection(ctx, &milvuspb.HasCollectionRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_HasCollection, + MsgID: 4002, + Timestamp: 4002, + SourceID: 4002, + }, + }) + assert.Nil(t, err) + assert.NotEqual(t, rsp1.Status.ErrorCode, commonpb.ErrorCode_Success) + + rsp2, err := core.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_DescribeCollection, + MsgID: 4003, + Timestamp: 4003, + SourceID: 4003, + }, + }) + assert.Nil(t, err) + assert.NotEqual(t, rsp2.Status.ErrorCode, commonpb.ErrorCode_Success) + + rsp3, err := core.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_ShowCollections, + MsgID: 4004, + Timestamp: 4004, + SourceID: 4004, + }, + }) + assert.Nil(t, err) + assert.NotEqual(t, rsp3.Status.ErrorCode, commonpb.ErrorCode_Success) + + st, err = core.CreatePartition(ctx, &milvuspb.CreatePartitionRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_CreatePartition, + MsgID: 4005, + Timestamp: 4005, + SourceID: 4005, + }, + }) + assert.Nil(t, err) + assert.NotEqual(t, st.ErrorCode, commonpb.ErrorCode_Success) + + st, err = core.DropPartition(ctx, &milvuspb.DropPartitionRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_DropPartition, + MsgID: 4006, + Timestamp: 4006, + SourceID: 4006, + }, + }) + assert.Nil(t, err) + assert.NotEqual(t, st.ErrorCode, commonpb.ErrorCode_Success) + + rsp4, err := core.HasPartition(ctx, &milvuspb.HasPartitionRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_HasPartition, + MsgID: 4007, + Timestamp: 4007, + SourceID: 4007, + }, + }) + assert.Nil(t, err) + assert.NotEqual(t, rsp4.Status.ErrorCode, commonpb.ErrorCode_Success) + + rsp5, err := core.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_ShowPartitions, + MsgID: 4008, + Timestamp: 4008, + SourceID: 4008, + }, + }) + assert.Nil(t, err) + assert.NotEqual(t, rsp5.Status.ErrorCode, commonpb.ErrorCode_Success) + + st, err = core.CreateIndex(ctx, &milvuspb.CreateIndexRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_CreateIndex, + MsgID: 4009, + Timestamp: 4009, + SourceID: 4009, + }, + }) + assert.Nil(t, err) + assert.NotEqual(t, st.ErrorCode, commonpb.ErrorCode_Success) + + rsp6, err := core.DescribeIndex(ctx, &milvuspb.DescribeIndexRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_DescribeIndex, + MsgID: 4010, + Timestamp: 4010, + SourceID: 4010, + }, + }) + assert.Nil(t, err) + assert.NotEqual(t, rsp6.Status.ErrorCode, commonpb.ErrorCode_Success) + + st, err = core.DropIndex(ctx, &milvuspb.DropIndexRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_DropIndex, + MsgID: 4011, + Timestamp: 4011, + SourceID: 4011, + }, + }) + assert.Nil(t, err) + assert.NotEqual(t, st.ErrorCode, commonpb.ErrorCode_Success) + + rsp7, err := core.DescribeSegment(ctx, &milvuspb.DescribeSegmentRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_DescribeSegment, + MsgID: 4012, + Timestamp: 4012, + SourceID: 4012, + }, + }) + assert.Nil(t, err) + assert.NotEqual(t, rsp7.Status.ErrorCode, commonpb.ErrorCode_Success) + + rsp8, err := core.ShowSegments(ctx, &milvuspb.ShowSegmentsRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_ShowSegments, + MsgID: 4013, + Timestamp: 4013, + SourceID: 4013, + }, + }) + assert.Nil(t, err) + assert.NotEqual(t, rsp8.Status.ErrorCode, commonpb.ErrorCode_Success) + + }) } diff --git a/internal/masterservice/task.go b/internal/masterservice/task.go index 9796f12d61c4f5b980f7d67779b31a4f46c02122..e11d818c85729f33b32ff50c37ebba84e8362a8e 100644 --- a/internal/masterservice/task.go +++ b/internal/masterservice/task.go @@ -2,7 +2,6 @@ package masterservice import ( "context" - "errors" "fmt" "github.com/golang/protobuf/proto" @@ -39,12 +38,12 @@ func (bt *baseReqTask) Notify(err error) { func (bt *baseReqTask) WaitToFinish() error { select { case <-bt.core.ctx.Done(): - return errors.New("context done") + return fmt.Errorf("core context done, %s", bt.core.ctx.Err().Error()) case <-bt.ctx.Done(): - return errors.New("grpc context done") + return fmt.Errorf("request context done, %s", bt.ctx.Err().Error()) case err, ok := <-bt.cv: if !ok { - return errors.New("notify chan closed") + return fmt.Errorf("notify chan closed") } return err } @@ -72,6 +71,9 @@ func (t *CreateCollectionReqTask) IgnoreTimeStamp() bool { } func (t *CreateCollectionReqTask) Execute(ctx context.Context) error { + if t.Type() != commonpb.MsgType_CreateCollection { + return fmt.Errorf("create collection, msg type = %s", commonpb.MsgType_name[int32(t.Type())]) + } var schema schemapb.CollectionSchema err := proto.Unmarshal(t.Req.Schema, &schema) if err != nil { @@ -217,6 +219,10 @@ func (t *DropCollectionReqTask) IgnoreTimeStamp() bool { } func (t *DropCollectionReqTask) Execute(ctx context.Context) error { + if t.Type() != commonpb.MsgType_DropCollection { + return fmt.Errorf("drop collection, msg type = %s", commonpb.MsgType_name[int32(t.Type())]) + } + collMeta, err := t.core.MetaTable.GetCollectionByName(t.Req.CollectionName) if err != nil { return err @@ -277,7 +283,10 @@ func (t *HasCollectionReqTask) IgnoreTimeStamp() bool { return true } -func (t *HasCollectionReqTask) Execute(context.Context) error { +func (t *HasCollectionReqTask) Execute(ctx context.Context) error { + if t.Type() != commonpb.MsgType_HasCollection { + return fmt.Errorf("has collection, msg type = %s", commonpb.MsgType_name[int32(t.Type())]) + } _, err := t.core.MetaTable.GetCollectionByName(t.Req.CollectionName) if err == nil { t.HasCollection = true @@ -310,6 +319,9 @@ func (t *DescribeCollectionReqTask) IgnoreTimeStamp() bool { } func (t *DescribeCollectionReqTask) Execute(ctx context.Context) error { + if t.Type() != commonpb.MsgType_DescribeCollection { + return fmt.Errorf("describe collection, msg type = %s", commonpb.MsgType_name[int32(t.Type())]) + } var coll *etcdpb.CollectionInfo var err error @@ -359,7 +371,10 @@ func (t *ShowCollectionReqTask) IgnoreTimeStamp() bool { return true } -func (t *ShowCollectionReqTask) Execute(context.Context) error { +func (t *ShowCollectionReqTask) Execute(ctx context.Context) error { + if t.Type() != commonpb.MsgType_ShowCollections { + return fmt.Errorf("show collection, msg type = %s", commonpb.MsgType_name[int32(t.Type())]) + } coll, err := t.core.MetaTable.ListCollections() if err != nil { return err @@ -390,6 +405,9 @@ func (t *CreatePartitionReqTask) IgnoreTimeStamp() bool { } func (t *CreatePartitionReqTask) Execute(ctx context.Context) error { + if t.Type() != commonpb.MsgType_CreatePartition { + return fmt.Errorf("create partition, msg type = %s", commonpb.MsgType_name[int32(t.Type())]) + } collMeta, err := t.core.MetaTable.GetCollectionByName(t.Req.CollectionName) if err != nil { return err @@ -446,6 +464,9 @@ func (t *DropPartitionReqTask) IgnoreTimeStamp() bool { } func (t *DropPartitionReqTask) Execute(ctx context.Context) error { + if t.Type() != commonpb.MsgType_DropPartition { + return fmt.Errorf("drop partition, msg type = %s", commonpb.MsgType_name[int32(t.Type())]) + } coll, err := t.core.MetaTable.GetCollectionByName(t.Req.CollectionName) if err != nil { return err @@ -497,7 +518,10 @@ func (t *HasPartitionReqTask) IgnoreTimeStamp() bool { return true } -func (t *HasPartitionReqTask) Execute(context.Context) error { +func (t *HasPartitionReqTask) Execute(ctx context.Context) error { + if t.Type() != commonpb.MsgType_HasPartition { + return fmt.Errorf("has partition, msg type = %s", commonpb.MsgType_name[int32(t.Type())]) + } coll, err := t.core.MetaTable.GetCollectionByName(t.Req.CollectionName) if err != nil { return err @@ -528,7 +552,10 @@ func (t *ShowPartitionReqTask) IgnoreTimeStamp() bool { return true } -func (t *ShowPartitionReqTask) Execute(context.Context) error { +func (t *ShowPartitionReqTask) Execute(ctx context.Context) error { + if t.Type() != commonpb.MsgType_ShowPartitions { + return fmt.Errorf("show partition, msg type = %s", commonpb.MsgType_name[int32(t.Type())]) + } var coll *etcdpb.CollectionInfo var err error if t.Req.CollectionName == "" { @@ -572,7 +599,10 @@ func (t *DescribeSegmentReqTask) IgnoreTimeStamp() bool { return true } -func (t *DescribeSegmentReqTask) Execute(context.Context) error { +func (t *DescribeSegmentReqTask) Execute(ctx context.Context) error { + if t.Type() != commonpb.MsgType_DescribeSegment { + return fmt.Errorf("describe segment, msg type = %s", commonpb.MsgType_name[int32(t.Type())]) + } coll, err := t.core.MetaTable.GetCollectionByID(t.Req.CollectionID) if err != nil { return err @@ -629,7 +659,10 @@ func (t *ShowSegmentReqTask) IgnoreTimeStamp() bool { return true } -func (t *ShowSegmentReqTask) Execute(context.Context) error { +func (t *ShowSegmentReqTask) Execute(ctx context.Context) error { + if t.Type() != commonpb.MsgType_ShowSegments { + return fmt.Errorf("show segments, msg type = %s", commonpb.MsgType_name[int32(t.Type())]) + } coll, err := t.core.MetaTable.GetCollectionByID(t.Req.CollectionID) if err != nil { return err @@ -674,6 +707,9 @@ func (t *CreateIndexReqTask) IgnoreTimeStamp() bool { } func (t *CreateIndexReqTask) Execute(ctx context.Context) error { + if t.Type() != commonpb.MsgType_CreateIndex { + return fmt.Errorf("create index, msg type = %s", commonpb.MsgType_name[int32(t.Type())]) + } indexName := Params.DefaultIndexName //TODO, get name from request indexID, err := t.core.idAllocator.AllocOne() if err != nil { @@ -730,7 +766,10 @@ func (t *DescribeIndexReqTask) IgnoreTimeStamp() bool { return true } -func (t *DescribeIndexReqTask) Execute(context.Context) error { +func (t *DescribeIndexReqTask) Execute(ctx context.Context) error { + if t.Type() != commonpb.MsgType_DescribeIndex { + return fmt.Errorf("describe index, msg type = %s", commonpb.MsgType_name[int32(t.Type())]) + } idx, err := t.core.MetaTable.GetIndexByName(t.Req.CollectionName, t.Req.FieldName, t.Req.IndexName) if err != nil { return err @@ -768,6 +807,9 @@ func (t *DropIndexReqTask) IgnoreTimeStamp() bool { } func (t *DropIndexReqTask) Execute(ctx context.Context) error { + if t.Type() != commonpb.MsgType_DropIndex { + return fmt.Errorf("drop index, msg type = %s", commonpb.MsgType_name[int32(t.Type())]) + } info, err := t.core.MetaTable.GetIndexByName(t.Req.CollectionName, t.Req.FieldName, t.Req.IndexName) if err != nil { log.Warn("GetIndexByName failed,", zap.String("collection name", t.Req.CollectionName), zap.String("field name", t.Req.FieldName), zap.String("index name", t.Req.IndexName), zap.Error(err)) @@ -815,16 +857,6 @@ func (t *CreateIndexTask) BuildIndex() error { if err != nil { return err } - - if len(t.indexParams) == 0 { - t.indexParams = make([]*commonpb.KeyValuePair, 0, len(t.fieldSchema.IndexParams)) - for _, p := range t.fieldSchema.IndexParams { - t.indexParams = append(t.indexParams, &commonpb.KeyValuePair{ - Key: p.Key, - Value: p.Value, - }) - } - } bldID, err = t.core.BuildIndexReq(t.ctx, binlogs, t.fieldSchema.TypeParams, t.indexParams, t.indexID, t.indexName) if err != nil { return err diff --git a/internal/masterservice/util_test.go b/internal/masterservice/util_test.go new file mode 100644 index 0000000000000000000000000000000000000000..914d4466c072a69c827de49df2008e558ab4de13 --- /dev/null +++ b/internal/masterservice/util_test.go @@ -0,0 +1,59 @@ +package masterservice + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" + "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb" + "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" +) + +func TestEqualKeyPairArray(t *testing.T) { + p1 := []*commonpb.KeyValuePair{ + { + Key: "k1", + Value: "v1", + }, + } + + p2 := []*commonpb.KeyValuePair{} + assert.False(t, EqualKeyPairArray(p1, p2)) + + p2 = append(p2, &commonpb.KeyValuePair{ + Key: "k2", + Value: "v2", + }) + assert.False(t, EqualKeyPairArray(p1, p2)) + p2 = []*commonpb.KeyValuePair{ + { + Key: "k1", + Value: "v2", + }, + } + assert.False(t, EqualKeyPairArray(p1, p2)) + + p2 = []*commonpb.KeyValuePair{ + { + Key: "k1", + Value: "v1", + }, + } + assert.True(t, EqualKeyPairArray(p1, p2)) +} + +func Test_GetFieldSchemaByID(t *testing.T) { + coll := &etcdpb.CollectionInfo{ + Schema: &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{ + { + FieldID: 1, + }, + }, + }, + } + _, err := GetFieldSchemaByID(coll, 1) + assert.Nil(t, err) + _, err = GetFieldSchemaByID(coll, 2) + assert.NotNil(t, err) +}