diff --git a/internal/distributed/masterservice/masterservice_test.go b/internal/distributed/masterservice/masterservice_test.go
index 12e4b567037d26ade375aa42b4e24029b89dfdca..5e060d1fe7d8cffe5934aa4be8cc5255bab9b9ad 100644
--- a/internal/distributed/masterservice/masterservice_test.go
+++ b/internal/distributed/masterservice/masterservice_test.go
@@ -41,7 +41,61 @@ import (
     "go.etcd.io/etcd/clientv3"
 )
 
+func GenSegInfoMsgPack(seg *datapb.SegmentInfo) *msgstream.MsgPack {
+    msgPack := msgstream.MsgPack{}
+    baseMsg := msgstream.BaseMsg{
+        BeginTimestamp: 0,
+        EndTimestamp:   0,
+        HashValues:     []uint32{0},
+    }
+    segMsg := &msgstream.SegmentInfoMsg{
+        BaseMsg: baseMsg,
+        SegmentMsg: datapb.SegmentMsg{
+            Base: &commonpb.MsgBase{
+                MsgType:   commonpb.MsgType_SegmentInfo,
+                MsgID:     0,
+                Timestamp: 0,
+                SourceID:  0,
+            },
+            Segment: seg,
+        },
+    }
+    msgPack.Msgs = append(msgPack.Msgs, segMsg)
+    return &msgPack
+}
+
+func GenFlushedSegMsgPack(segID typeutil.UniqueID) *msgstream.MsgPack {
+    msgPack := msgstream.MsgPack{}
+    baseMsg := msgstream.BaseMsg{
+        BeginTimestamp: 0,
+        EndTimestamp:   0,
+        HashValues:     []uint32{0},
+    }
+    segMsg := &msgstream.FlushCompletedMsg{
+        BaseMsg: baseMsg,
+        SegmentFlushCompletedMsg: internalpb.SegmentFlushCompletedMsg{
+            Base: &commonpb.MsgBase{
+                MsgType:   commonpb.MsgType_SegmentFlushDone,
+                MsgID:     0,
+                Timestamp: 0,
+                SourceID:  0,
+            },
+            SegmentID: segID,
+        },
+    }
+    msgPack.Msgs = append(msgPack.Msgs, segMsg)
+    return &msgPack
+}
+
 func TestGrpcService(t *testing.T) {
+    const (
+        dbName    = "testDB"
+        collName  = "testColl"
+        collName2 = "testColl-again"
+        partName  = "testPartition"
+        fieldName = "vector"
+        segID     = 1001
+    )
     rand.Seed(time.Now().UnixNano())
     randVal := rand.Int()
 
@@ -94,7 +148,10 @@ func TestGrpcService(t *testing.T) {
     assert.Nil(t, err)
 
     core.ProxyTimeTickChan = make(chan typeutil.Timestamp, 8)
-    core.DataNodeSegmentFlushCompletedChan = make(chan typeutil.UniqueID, 8)
+    FlushedSegmentChan := make(chan *msgstream.MsgPack, 8)
+    core.DataNodeFlushedSegmentChan = FlushedSegmentChan
+    SegmentInfoChan := make(chan *msgstream.MsgPack, 8)
+    core.DataServiceSegmentChan = SegmentInfoChan
 
     timeTickArray := make([]typeutil.Timestamp, 0, 16)
     core.SendTimeTick = func(ts typeutil.Timestamp) error {
@@ -130,8 +187,6 @@ func TestGrpcService(t *testing.T) {
         return nil
     }
 
-    core.DataServiceSegmentChan = make(chan *datapb.SegmentInfo, 1024)
-
     core.GetBinlogFilePathsFromDataServiceReq = func(segID typeutil.UniqueID, fieldID typeutil.UniqueID) ([]string, error) {
         return []string{"file1", "file2", "file3"}, nil
     }
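[Reviewer note] With the typed channels gone, a test drives the core purely through msgstream packs. A minimal sketch (not part of the patch) of the intended flow, using the two helpers above and the test-local channels just wired into core; the IDs here are made up for illustration:

    // Hypothetical IDs; in the real test they come from MetaTable lookups.
    seg := &datapb.SegmentInfo{ID: 1000, CollectionID: 1, PartitionID: 10}
    SegmentInfoChan <- GenSegInfoMsgPack(seg)          // master registers the segment
    FlushedSegmentChan <- GenFlushedSegMsgPack(seg.ID) // master reacts to the flush
    time.Sleep(time.Millisecond * 100)                 // both loops consume asynchronously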
@@ -185,28 +240,28 @@ func TestGrpcService(t *testing.T) {
         req := &internalpb.GetComponentStatesRequest{}
         rsp, err := svr.GetComponentStates(ctx, req)
         assert.Nil(t, err)
-        assert.Equal(t, rsp.Status.ErrorCode, commonpb.ErrorCode_Success)
+        assert.Equal(t, commonpb.ErrorCode_Success, rsp.Status.ErrorCode)
     })
 
     t.Run("get time tick channel", func(t *testing.T) {
         req := &internalpb.GetTimeTickChannelRequest{}
         rsp, err := svr.GetTimeTickChannel(ctx, req)
         assert.Nil(t, err)
-        assert.Equal(t, rsp.Status.ErrorCode, commonpb.ErrorCode_Success)
+        assert.Equal(t, commonpb.ErrorCode_Success, rsp.Status.ErrorCode)
     })
 
     t.Run("get statistics channel", func(t *testing.T) {
         req := &internalpb.GetStatisticsChannelRequest{}
         rsp, err := svr.GetStatisticsChannel(ctx, req)
         assert.Nil(t, err)
-        assert.Equal(t, rsp.Status.ErrorCode, commonpb.ErrorCode_Success)
+        assert.Equal(t, commonpb.ErrorCode_Success, rsp.Status.ErrorCode)
     })
 
     t.Run("get dd channel", func(t *testing.T) {
         req := &internalpb.GetDdChannelRequest{}
         rsp, err := svr.GetDdChannel(ctx, req)
         assert.Nil(t, err)
-        assert.Equal(t, rsp.Status.ErrorCode, commonpb.ErrorCode_Success)
+        assert.Equal(t, commonpb.ErrorCode_Success, rsp.Status.ErrorCode)
     })
 
     t.Run("alloc time stamp", func(t *testing.T) {
@@ -215,7 +270,7 @@ func TestGrpcService(t *testing.T) {
         }
         rsp, err := svr.AllocTimestamp(ctx, req)
         assert.Nil(t, err)
-        assert.Equal(t, rsp.Status.ErrorCode, commonpb.ErrorCode_Success)
+        assert.Equal(t, commonpb.ErrorCode_Success, rsp.Status.ErrorCode)
     })
 
     t.Run("alloc id", func(t *testing.T) {
@@ -224,20 +279,18 @@ func TestGrpcService(t *testing.T) {
         }
         rsp, err := svr.AllocID(ctx, req)
         assert.Nil(t, err)
-        assert.Equal(t, rsp.Status.ErrorCode, commonpb.ErrorCode_Success)
+        assert.Equal(t, commonpb.ErrorCode_Success, rsp.Status.ErrorCode)
     })
 
     t.Run("create collection", func(t *testing.T) {
         schema := schemapb.CollectionSchema{
-            Name:        "testColl",
-            Description: "testColl",
-            AutoID:      true,
+            Name:   collName,
+            AutoID: true,
             Fields: []*schemapb.FieldSchema{
                 {
                     FieldID:      100,
-                    Name:         "vector",
+                    Name:         fieldName,
                     IsPrimaryKey: false,
-                    Description:  "vector",
                     DataType:     schemapb.DataType_FloatVector,
                     TypeParams:   nil,
                     IndexParams: []*commonpb.KeyValuePair{
@@ -261,32 +314,32 @@ func TestGrpcService(t *testing.T) {
                 SourceID:  100,
             },
             DbName:         "testDb",
-            CollectionName: "testColl",
+            CollectionName: collName,
             Schema:         sbf,
         }
         status, err := cli.CreateCollection(ctx, req)
         assert.Nil(t, err)
-        assert.Equal(t, len(createCollectionArray), 1)
-        assert.Equal(t, status.ErrorCode, commonpb.ErrorCode_Success)
-        assert.Equal(t, createCollectionArray[0].Base.MsgType, commonpb.MsgType_CreateCollection)
-        assert.Equal(t, createCollectionArray[0].CollectionName, "testColl")
+        assert.Equal(t, 1, len(createCollectionArray))
+        assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
+        assert.Equal(t, commonpb.MsgType_CreateCollection, createCollectionArray[0].Base.MsgType)
+        assert.Equal(t, collName, createCollectionArray[0].CollectionName)
 
         req.Base.MsgID = 101
         req.Base.Timestamp = 101
         req.Base.SourceID = 101
         status, err = cli.CreateCollection(ctx, req)
         assert.Nil(t, err)
-        assert.Equal(t, status.ErrorCode, commonpb.ErrorCode_UnexpectedError)
+        assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.ErrorCode)
 
         req.Base.MsgID = 102
         req.Base.Timestamp = 102
         req.Base.SourceID = 102
-        req.CollectionName = "testColl-again"
+        req.CollectionName = collName2
         status, err = cli.CreateCollection(ctx, req)
         assert.Nil(t, err)
-        assert.Equal(t, status.ErrorCode, commonpb.ErrorCode_UnexpectedError)
+        assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.ErrorCode)
 
         schema.Name = req.CollectionName
         sbf, err = proto.Marshal(&schema)
@@ -297,10 +350,10 @@ func TestGrpcService(t *testing.T) {
         req.Base.SourceID = 103
         status, err = cli.CreateCollection(ctx, req)
         assert.Nil(t, err)
-        assert.Equal(t, status.ErrorCode, commonpb.ErrorCode_Success)
-        assert.Equal(t, len(createCollectionArray), 2)
-        assert.Equal(t, createCollectionArray[1].Base.MsgType, commonpb.MsgType_CreateCollection)
-        assert.Equal(t, createCollectionArray[1].CollectionName, "testColl-again")
+        assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
+        assert.Equal(t, 2, len(createCollectionArray))
+        assert.Equal(t, commonpb.MsgType_CreateCollection, createCollectionArray[1].Base.MsgType)
+        assert.Equal(t, collName2, createCollectionArray[1].CollectionName)
 
         //time stamp go back, master response to add the timestamp, so the time tick will never go back
         //schema.Name = "testColl-goback"
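[Reviewer note] The assertion churn throughout this file is purely argument order. testify's signature is assert.Equal(t, expected, actual), so putting the literal first makes a failure print "expected: Success, actual: UnexpectedError" rather than the reverse:

    // expected first, actual second: the failure message then reads correctly
    assert.Equal(t, commonpb.ErrorCode_Success, rsp.Status.ErrorCode)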
@@ -328,12 +381,12 @@ func TestGrpcService(t *testing.T) {
                 SourceID:  110,
             },
             DbName:         "testDb",
-            CollectionName: "testColl",
+            CollectionName: collName,
         }
         rsp, err := cli.HasCollection(ctx, req)
         assert.Nil(t, err)
-        assert.Equal(t, rsp.Status.ErrorCode, commonpb.ErrorCode_Success)
-        assert.Equal(t, rsp.Value, true)
+        assert.Equal(t, commonpb.ErrorCode_Success, rsp.Status.ErrorCode)
+        assert.Equal(t, true, rsp.Value)
 
         req = &milvuspb.HasCollectionRequest{
             Base: &commonpb.MsgBase{
@@ -347,8 +400,8 @@ func TestGrpcService(t *testing.T) {
         }
         rsp, err = cli.HasCollection(ctx, req)
         assert.Nil(t, err)
-        assert.Equal(t, rsp.Status.ErrorCode, commonpb.ErrorCode_Success)
-        assert.Equal(t, rsp.Value, false)
+        assert.Equal(t, commonpb.ErrorCode_Success, rsp.Status.ErrorCode)
+        assert.Equal(t, false, rsp.Value)
 
         // test time stamp go back
         req = &milvuspb.HasCollectionRequest{
@@ -363,12 +416,12 @@ func TestGrpcService(t *testing.T) {
         }
         rsp, err = cli.HasCollection(ctx, req)
         assert.Nil(t, err)
-        assert.Equal(t, rsp.Status.ErrorCode, commonpb.ErrorCode_Success)
-        assert.Equal(t, rsp.Value, false)
+        assert.Equal(t, commonpb.ErrorCode_Success, rsp.Status.ErrorCode)
+        assert.Equal(t, false, rsp.Value)
     })
 
     t.Run("describe collection", func(t *testing.T) {
-        collMeta, err := core.MetaTable.GetCollectionByName("testColl", 0)
+        collMeta, err := core.MetaTable.GetCollectionByName(collName, 0)
         assert.Nil(t, err)
         req := &milvuspb.DescribeCollectionRequest{
             Base: &commonpb.MsgBase{
@@ -378,13 +431,13 @@ func TestGrpcService(t *testing.T) {
                 SourceID:  120,
             },
             DbName:         "testDb",
-            CollectionName: "testColl",
+            CollectionName: collName,
         }
         rsp, err := cli.DescribeCollection(ctx, req)
         assert.Nil(t, err)
-        assert.Equal(t, rsp.Status.ErrorCode, commonpb.ErrorCode_Success)
-        assert.Equal(t, rsp.Schema.Name, "testColl")
-        assert.Equal(t, rsp.CollectionID, collMeta.ID)
+        assert.Equal(t, commonpb.ErrorCode_Success, rsp.Status.ErrorCode)
+        assert.Equal(t, collName, rsp.Schema.Name)
+        assert.Equal(t, collMeta.ID, rsp.CollectionID)
     })
 
     t.Run("show collection", func(t *testing.T) {
@@ -399,9 +452,9 @@ func TestGrpcService(t *testing.T) {
         }
         rsp, err := cli.ShowCollections(ctx, req)
         assert.Nil(t, err)
-        assert.Equal(t, rsp.Status.ErrorCode, commonpb.ErrorCode_Success)
-        assert.ElementsMatch(t, rsp.CollectionNames, []string{"testColl", "testColl-again"})
-        assert.Equal(t, len(rsp.CollectionNames), 2)
+        assert.Equal(t, commonpb.ErrorCode_Success, rsp.Status.ErrorCode)
+        assert.ElementsMatch(t, rsp.CollectionNames, []string{collName, collName2})
+        assert.Equal(t, 2, len(rsp.CollectionNames))
     })
 
     t.Run("create partition", func(t *testing.T) {
@@ -412,20 +465,19 @@ func TestGrpcService(t *testing.T) {
                 Timestamp: 140,
                 SourceID:  140,
             },
-            DbName:         "testDb",
-            CollectionName: "testColl",
-            PartitionName:  "testPartition",
+            DbName:         dbName,
+            CollectionName: collName,
+            PartitionName:  partName,
         }
         status, err := cli.CreatePartition(ctx, req)
         assert.Nil(t, err)
-        assert.Equal(t, status.ErrorCode, commonpb.ErrorCode_Success)
-        collMeta, err := core.MetaTable.GetCollectionByName("testColl", 0)
+        assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
+        collMeta, err := core.MetaTable.GetCollectionByName(collName, 0)
         assert.Nil(t, err)
-        assert.Equal(t, len(collMeta.PartitionIDs), 2)
+        assert.Equal(t, 2, len(collMeta.PartitionIDs))
         partMeta, err := core.MetaTable.GetPartitionByID(1, collMeta.PartitionIDs[1], 0)
         assert.Nil(t, err)
-        assert.Equal(t, partMeta.PartitionName, "testPartition")
-
+        assert.Equal(t, partName, partMeta.PartitionName)
         assert.Equal(t, 1, len(collectionMetaCache))
     })
@@ -437,18 +489,18 @@ func TestGrpcService(t *testing.T) {
                 Timestamp: 150,
                 SourceID:  150,
             },
-            DbName:         "testDb",
-            CollectionName: "testColl",
-            PartitionName:  "testPartition",
+            DbName:         dbName,
+            CollectionName: collName,
+            PartitionName:  partName,
         }
         rsp, err := cli.HasPartition(ctx, req)
         assert.Nil(t, err)
-        assert.Equal(t, rsp.Status.ErrorCode, commonpb.ErrorCode_Success)
-        assert.Equal(t, rsp.Value, true)
+        assert.Equal(t, commonpb.ErrorCode_Success, rsp.Status.ErrorCode)
+        assert.Equal(t, true, rsp.Value)
     })
 
     t.Run("show partition", func(t *testing.T) {
-        coll, err := core.MetaTable.GetCollectionByName("testColl", 0)
+        coll, err := core.MetaTable.GetCollectionByName(collName, 0)
         assert.Nil(t, err)
         req := &milvuspb.ShowPartitionsRequest{
             Base: &commonpb.MsgBase{
@@ -458,18 +510,18 @@ func TestGrpcService(t *testing.T) {
                 SourceID:  160,
             },
             DbName:         "testDb",
-            CollectionName: "testColl",
+            CollectionName: collName,
             CollectionID:   coll.ID,
         }
         rsp, err := cli.ShowPartitions(ctx, req)
         assert.Nil(t, err)
-        assert.Equal(t, rsp.Status.ErrorCode, commonpb.ErrorCode_Success)
-        assert.Equal(t, len(rsp.PartitionNames), 2)
-        assert.Equal(t, len(rsp.PartitionIDs), 2)
+        assert.Equal(t, commonpb.ErrorCode_Success, rsp.Status.ErrorCode)
+        assert.Equal(t, 2, len(rsp.PartitionNames))
+        assert.Equal(t, 2, len(rsp.PartitionIDs))
     })
 
     t.Run("show segment", func(t *testing.T) {
-        coll, err := core.MetaTable.GetCollectionByName("testColl", 0)
+        coll, err := core.MetaTable.GetCollectionByName(collName, 0)
         assert.Nil(t, err)
         partID := coll.PartitionIDs[1]
         part, err := core.MetaTable.GetPartitionByID(1, partID, 0)
@@ -480,11 +532,12 @@ func TestGrpcService(t *testing.T) {
             CollectionID: coll.ID,
             PartitionID:  part.PartitionID,
         }
-        core.DataServiceSegmentChan <- seg
+        segInfoMsgPack := GenSegInfoMsgPack(seg)
+        SegmentInfoChan <- segInfoMsgPack
         time.Sleep(time.Millisecond * 100)
         part, err = core.MetaTable.GetPartitionByID(1, partID, 0)
         assert.Nil(t, err)
-        assert.Equal(t, len(part.SegmentIDs), 1)
+        assert.Equal(t, 1, len(part.SegmentIDs))
 
         req := &milvuspb.ShowSegmentsRequest{
             Base: &commonpb.MsgBase{
@@ -498,9 +551,9 @@ func TestGrpcService(t *testing.T) {
         }
         rsp, err := cli.ShowSegments(ctx, req)
         assert.Nil(t, err)
-        assert.Equal(t, rsp.Status.ErrorCode, commonpb.ErrorCode_Success)
-        assert.Equal(t, rsp.SegmentIDs[0], int64(1000))
-        assert.Equal(t, len(rsp.SegmentIDs), 1)
+        assert.Equal(t, commonpb.ErrorCode_Success, rsp.Status.ErrorCode)
+        assert.Equal(t, int64(1000), rsp.SegmentIDs[0])
+        assert.Equal(t, 1, len(rsp.SegmentIDs))
     })
 
     t.Run("create index", func(t *testing.T) {
@@ -511,9 +564,9 @@ func TestGrpcService(t *testing.T) {
                 Timestamp: 180,
                 SourceID:  180,
             },
-            DbName:         "",
-            CollectionName: "testColl",
-            FieldName:      "vector",
+            DbName:         dbName,
+            CollectionName: collName,
+            FieldName:      fieldName,
             ExtraParams: []*commonpb.KeyValuePair{
                 {
                     Key:   "ik1",
@@ -521,15 +574,15 @@ func TestGrpcService(t *testing.T) {
                 },
             },
         }
-        collMeta, err := core.MetaTable.GetCollectionByName("testColl", 0)
+        collMeta, err := core.MetaTable.GetCollectionByName(collName, 0)
         assert.Nil(t, err)
-        assert.Equal(t, len(collMeta.FieldIndexes), 0)
+        assert.Zero(t, len(collMeta.FieldIndexes))
         rsp, err := cli.CreateIndex(ctx, req)
         assert.Nil(t, err)
-        assert.Equal(t, rsp.ErrorCode, commonpb.ErrorCode_Success)
-        collMeta, err = core.MetaTable.GetCollectionByName("testColl", 0)
+        assert.Equal(t, commonpb.ErrorCode_Success, rsp.ErrorCode)
+        collMeta, err = core.MetaTable.GetCollectionByName(collName, 0)
         assert.Nil(t, err)
-        assert.Equal(t, len(collMeta.FieldIndexes), 1)
+        assert.Equal(t, 1, len(collMeta.FieldIndexes))
 
         binlogLock.Lock()
         defer binlogLock.Unlock()
@@ -539,11 +592,11 @@ func TestGrpcService(t *testing.T) {
         req.FieldName = "no field"
         rsp, err = cli.CreateIndex(ctx, req)
         assert.Nil(t, err)
-        assert.NotEqual(t, rsp.ErrorCode, commonpb.ErrorCode_Success)
+        assert.NotEqual(t, commonpb.ErrorCode_Success, rsp.ErrorCode)
     })
 
     t.Run("describe segment", func(t *testing.T) {
-        coll, err := core.MetaTable.GetCollectionByName("testColl", 0)
+        coll, err := core.MetaTable.GetCollectionByName(collName, 0)
         assert.Nil(t, err)
 
         req := &milvuspb.DescribeSegmentRequest{
@@ -558,7 +611,7 @@ func TestGrpcService(t *testing.T) {
         }
         rsp, err := cli.DescribeSegment(ctx, req)
         assert.Nil(t, err)
-        assert.Equal(t, rsp.Status.ErrorCode, commonpb.ErrorCode_Success)
+        assert.Equal(t, commonpb.ErrorCode_Success, rsp.Status.ErrorCode)
         t.Logf("index id = %d", rsp.IndexID)
     })
 
@@ -570,36 +623,38 @@ func TestGrpcService(t *testing.T) {
                 Timestamp: 200,
                 SourceID:  200,
             },
-            DbName:         "",
-            CollectionName: "testColl",
-            FieldName:      "vector",
+            DbName:         dbName,
+            CollectionName: collName,
+            FieldName:      fieldName,
             IndexName:      "",
         }
         rsp, err := cli.DescribeIndex(ctx, req)
         assert.Nil(t, err)
-        assert.Equal(t, rsp.Status.ErrorCode, commonpb.ErrorCode_Success)
-        assert.Equal(t, len(rsp.IndexDescriptions), 1)
-        assert.Equal(t, rsp.IndexDescriptions[0].IndexName, cms.Params.DefaultIndexName)
+        assert.Equal(t, commonpb.ErrorCode_Success, rsp.Status.ErrorCode)
+        assert.Equal(t, 1, len(rsp.IndexDescriptions))
+        assert.Equal(t, cms.Params.DefaultIndexName, rsp.IndexDescriptions[0].IndexName)
     })
 
     t.Run("flush segment", func(t *testing.T) {
-        coll, err := core.MetaTable.GetCollectionByName("testColl", 0)
+        coll, err := core.MetaTable.GetCollectionByName(collName, 0)
         assert.Nil(t, err)
         partID := coll.PartitionIDs[1]
         part, err := core.MetaTable.GetPartitionByID(1, partID, 0)
         assert.Nil(t, err)
-        assert.Equal(t, len(part.SegmentIDs), 1)
+        assert.Equal(t, 1, len(part.SegmentIDs))
 
         seg := &datapb.SegmentInfo{
-            ID:           1001,
+            ID:           segID,
             CollectionID: coll.ID,
             PartitionID:  part.PartitionID,
         }
-        core.DataServiceSegmentChan <- seg
+        segInfoMsgPack := GenSegInfoMsgPack(seg)
+        SegmentInfoChan <- segInfoMsgPack
         time.Sleep(time.Millisecond * 100)
         part, err = core.MetaTable.GetPartitionByID(1, partID, 0)
         assert.Nil(t, err)
-        assert.Equal(t, len(part.SegmentIDs), 2)
-        core.DataNodeSegmentFlushCompletedChan <- 1001
+        assert.Equal(t, 2, len(part.SegmentIDs))
+        flushedSegMsgPack := GenFlushedSegMsgPack(segID)
+        FlushedSegmentChan <- flushedSegMsgPack
         time.Sleep(time.Millisecond * 100)
 
         req := &milvuspb.DescribeIndexRequest{
@@ -609,16 +664,16 @@ func TestGrpcService(t *testing.T) {
                 Timestamp: 210,
                 SourceID:  210,
             },
-            DbName:         "",
-            CollectionName: "testColl",
-            FieldName:      "vector",
+            DbName:         dbName,
+            CollectionName: collName,
+            FieldName:      fieldName,
             IndexName:      "",
         }
         rsp, err := cli.DescribeIndex(ctx, req)
         assert.Nil(t, err)
-        assert.Equal(t, rsp.Status.ErrorCode, commonpb.ErrorCode_Success)
-        assert.Equal(t, len(rsp.IndexDescriptions), 1)
-        assert.Equal(t, rsp.IndexDescriptions[0].IndexName, cms.Params.DefaultIndexName)
+        assert.Equal(t, commonpb.ErrorCode_Success, rsp.Status.ErrorCode)
+        assert.Equal(t, 1, len(rsp.IndexDescriptions))
+        assert.Equal(t, cms.Params.DefaultIndexName, rsp.IndexDescriptions[0].IndexName)
     })
FieldName: "vector", + DbName: dbName, + CollectionName: collName, + FieldName: fieldName, IndexName: cms.Params.DefaultIndexName, } - _, idx, err := core.MetaTable.GetIndexByName("testColl", cms.Params.DefaultIndexName) + _, idx, err := core.MetaTable.GetIndexByName(collName, cms.Params.DefaultIndexName) assert.Nil(t, err) assert.Equal(t, len(idx), 1) rsp, err := cli.DropIndex(ctx, req) assert.Nil(t, err) - assert.Equal(t, rsp.ErrorCode, commonpb.ErrorCode_Success) + assert.Equal(t, commonpb.ErrorCode_Success, rsp.ErrorCode) dropIDLock.Lock() - assert.Equal(t, len(dropID), 1) - assert.Equal(t, dropID[0], idx[0].IndexID) + assert.Equal(t, 1, len(dropID)) + assert.Equal(t, idx[0].IndexID, dropID[0]) dropIDLock.Unlock() - }) t.Run("drop partition", func(t *testing.T) { @@ -657,19 +711,19 @@ func TestGrpcService(t *testing.T) { Timestamp: 220, SourceID: 220, }, - DbName: "testDb", - CollectionName: "testColl", - PartitionName: "testPartition", + DbName: dbName, + CollectionName: collName, + PartitionName: partName, } status, err := cli.DropPartition(ctx, req) assert.Nil(t, err) - assert.Equal(t, status.ErrorCode, commonpb.ErrorCode_Success) - collMeta, err := core.MetaTable.GetCollectionByName("testColl", 0) + assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode) + collMeta, err := core.MetaTable.GetCollectionByName(collName, 0) assert.Nil(t, err) - assert.Equal(t, len(collMeta.PartitionIDs), 1) + assert.Equal(t, 1, len(collMeta.PartitionIDs)) partMeta, err := core.MetaTable.GetPartitionByID(1, collMeta.PartitionIDs[0], 0) assert.Nil(t, err) - assert.Equal(t, partMeta.PartitionName, cms.Params.DefaultPartitionName) + assert.Equal(t, cms.Params.DefaultPartitionName, partMeta.PartitionName) assert.Equal(t, 2, len(collectionMetaCache)) }) @@ -682,17 +736,17 @@ func TestGrpcService(t *testing.T) { SourceID: 230, }, DbName: "testDb", - CollectionName: "testColl", + CollectionName: collName, } status, err := cli.DropCollection(ctx, req) assert.Nil(t, err) - assert.Equal(t, len(dropCollectionArray), 1) - assert.Equal(t, dropCollectionArray[0].Base.MsgType, commonpb.MsgType_DropCollection) - assert.Equal(t, status.ErrorCode, commonpb.ErrorCode_Success) - assert.Equal(t, dropCollectionArray[0].CollectionName, "testColl") - assert.Equal(t, len(collectionMetaCache), 3) - assert.Equal(t, collectionMetaCache[0], "testColl") + assert.Equal(t, 1, len(dropCollectionArray)) + assert.Equal(t, commonpb.MsgType_DropCollection, dropCollectionArray[0].Base.MsgType) + assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode) + assert.Equal(t, collName, dropCollectionArray[0].CollectionName) + assert.Equal(t, 3, len(collectionMetaCache)) + assert.Equal(t, collName, collectionMetaCache[0]) req = &milvuspb.DropCollectionRequest{ Base: &commonpb.MsgBase{ @@ -702,12 +756,12 @@ func TestGrpcService(t *testing.T) { SourceID: 231, }, DbName: "testDb", - CollectionName: "testColl", + CollectionName: collName, } status, err = cli.DropCollection(ctx, req) assert.Nil(t, err) - assert.Equal(t, len(dropCollectionArray), 1) - assert.Equal(t, status.ErrorCode, commonpb.ErrorCode_UnexpectedError) + assert.Equal(t, 1, len(dropCollectionArray)) + assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.ErrorCode) }) err = cli.Stop() diff --git a/internal/masterservice/master_service.go b/internal/masterservice/master_service.go index f3755aba387d8119c517050c70187f68f2611cf0..41005d58d01211133c1de8745cb60143618af400 100644 --- a/internal/masterservice/master_service.go +++ b/internal/masterservice/master_service.go @@ 
diff --git a/internal/masterservice/master_service.go b/internal/masterservice/master_service.go
index f3755aba387d8119c517050c70187f68f2611cf0..41005d58d01211133c1de8745cb60143618af400 100644
--- a/internal/masterservice/master_service.go
+++ b/internal/masterservice/master_service.go
@@ -114,11 +114,11 @@ type Core struct {
     //setMsgStreams, send drop partition into dd channel
     SendDdDropPartitionReq func(ctx context.Context, req *internalpb.DropPartitionRequest) error
 
-    //setMsgStreams segment channel, receive segment info from data service, if master create segment
-    DataServiceSegmentChan chan *datapb.SegmentInfo
+    // when data service creates a segment, it puts a segment info msg into this channel
+    DataServiceSegmentChan <-chan *ms.MsgPack
 
-    //setMsgStreams ,if segment flush completed, data node would put segment id into msg stream
-    DataNodeSegmentFlushCompletedChan chan typeutil.UniqueID
+    // when a segment flush completes, data node puts a flush msg into this channel
+    DataNodeFlushedSegmentChan <-chan *ms.MsgPack
 
     //get binlog file path from data service,
     GetBinlogFilePathsFromDataServiceReq func(segID typeutil.UniqueID, fieldID typeutil.UniqueID) ([]string, error)
@@ -214,9 +214,6 @@ func (c *Core) checkInit() error {
     if c.SendDdDropPartitionReq == nil {
         return fmt.Errorf("SendDdDropPartitionReq is nil")
     }
-    if c.DataServiceSegmentChan == nil {
-        return fmt.Errorf("DataServiceSegmentChan is nil")
-    }
     if c.GetBinlogFilePathsFromDataServiceReq == nil {
         return fmt.Errorf("GetBinlogFilePathsFromDataServiceReq is nil")
     }
@@ -232,8 +229,11 @@ func (c *Core) checkInit() error {
     if c.InvalidateCollectionMetaCache == nil {
         return fmt.Errorf("InvalidateCollectionMetaCache is nil")
     }
-    if c.DataNodeSegmentFlushCompletedChan == nil {
-        return fmt.Errorf("DataNodeSegmentFlushCompletedChan is nil")
+    if c.DataServiceSegmentChan == nil {
+        return fmt.Errorf("DataServiceSegmentChan is nil")
+    }
+    if c.DataNodeFlushedSegmentChan == nil {
+        return fmt.Errorf("DataNodeFlushedSegmentChan is nil")
     }
     if c.ReleaseCollection == nil {
         return fmt.Errorf("ReleaseCollection is nil")
@@ -307,71 +307,127 @@ func (c *Core) startTimeTickLoop() {
     }
 }
 
-//data service send segment info to master when create segment
+// data service sends a segment info msg to master when it creates a segment
 func (c *Core) startDataServiceSegmentLoop() {
     for {
         select {
         case <-c.ctx.Done():
             log.Debug("close data service segment loop")
             return
-        case seg, ok := <-c.DataServiceSegmentChan:
+        case segMsg, ok := <-c.DataServiceSegmentChan:
             if !ok {
-                log.Debug("data service segment is closed, exit loop")
+                log.Debug("data service segment channel is closed, exit loop")
                 return
             }
-            if seg == nil {
-                log.Warn("segment from data service is nil")
-            } else if _, err := c.MetaTable.AddSegment(seg); err != nil {
-                //what if master add segment failed, but data service success?
-                log.Warn("add segment info meta table failed ", zap.String("error", err.Error()))
-            } else {
-                log.Debug("add segment", zap.Int64("collection id", seg.CollectionID), zap.Int64("partition id", seg.PartitionID), zap.Int64("segment id", seg.ID))
+            var segInfos []*datapb.SegmentInfo
+            for _, msg := range segMsg.Msgs {
+                if msg.Type() != commonpb.MsgType_SegmentInfo {
+                    continue
+                }
+                segInfoMsg := msg.(*ms.SegmentInfoMsg)
+                segInfos = append(segInfos, segInfoMsg.Segment)
+            }
+            if len(segInfos) > 0 {
+                startPosByte, err := json.Marshal(segMsg.StartPositions)
+                if err != nil {
+                    log.Error("json.Marshal fail", zap.String("err", err.Error()))
+                    continue
+                }
+                endPosByte, err := json.Marshal(segMsg.EndPositions)
+                if err != nil {
+                    log.Error("json.Marshal fail", zap.String("err", err.Error()))
+                    continue
+                }
+
+                if _, err := c.MetaTable.AddSegment(segInfos, string(startPosByte), string(endPosByte)); err != nil {
+                    // TODO: what if master fails to add the segment while data service has already succeeded?
+                    log.Debug("add segment info to meta table failed", zap.String("error", err.Error()))
+                    continue
+                }
             }
         }
     }
 }
 
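[Reviewer note] The loop above now batches all SegmentInfo messages from one pack into a single AddSegment call and stores the pack's start/end positions with it, presumably so a restarted master can resume the stream from a known checkpoint instead of re-consuming from the beginning. (Also worth flagging: the AddSegment failure path dropped from log.Warn to log.Debug.) Boiled down, with error handling elided, the pattern is:

    var segInfos []*datapb.SegmentInfo
    for _, msg := range segMsg.Msgs {
        if msg.Type() != commonpb.MsgType_SegmentInfo {
            continue // the topic also carries flush-done msgs; skip them here
        }
        segInfos = append(segInfos, msg.(*ms.SegmentInfoMsg).Segment)
    }
    startPos, _ := json.Marshal(segMsg.StartPositions) // checkpoint, saved with the meta
    endPos, _ := json.Marshal(segMsg.EndPositions)
    _, err := c.MetaTable.AddSegment(segInfos, string(startPos), string(endPos))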
+ log.Debug("add segment info meta table failed ", zap.String("error", err.Error())) + continue + } } } } } -func (c *Core) startSegmentFlushCompletedLoop() { +// data node will put msg in this channel when flush segment +func (c *Core) startDataNodeFlushedSegmentLoop() { for { select { case <-c.ctx.Done(): log.Debug("close segment flush completed loop") return - case segID, ok := <-c.DataNodeSegmentFlushCompletedChan: + case segMsg, ok := <-c.DataNodeFlushedSegmentChan: if !ok { log.Debug("data node segment flush completed chan has closed, exit loop") + return } - log.Debug("flush segment", zap.Int64("id", segID)) - coll, err := c.MetaTable.GetCollectionBySegmentID(segID) + + startPosByte, err := json.Marshal(segMsg.StartPositions) if err != nil { - log.Warn("GetCollectionBySegmentID error", zap.Error(err)) - break + log.Error("json.Marshal fail", zap.String("err", err.Error())) + continue } - err = c.MetaTable.AddFlushedSegment(segID) + endPosByte, err := json.Marshal(segMsg.EndPositions) if err != nil { - log.Warn("AddFlushedSegment error", zap.Error(err)) + log.Error("json.Marshal fail", zap.String("err", err.Error())) + continue } - for _, f := range coll.FieldIndexes { - idxInfo, err := c.MetaTable.GetIndexByID(f.IndexID) - if err != nil { - log.Warn("index not found", zap.Int64("index id", f.IndexID)) + + var segIdxInfos []*etcdpb.SegmentIndexInfo + for _, msg := range segMsg.Msgs { + // check msg type + if msg.Type() != commonpb.MsgType_SegmentFlushDone { continue } + flushMsg := msg.(*ms.FlushCompletedMsg) + segID := flushMsg.SegmentID + log.Debug("flush segment", zap.Int64("id", segID)) - fieldSch, err := GetFieldSchemaByID(coll, f.FiledID) + coll, err := c.MetaTable.GetCollectionBySegmentID(segID) + if err != nil { + log.Warn("GetCollectionBySegmentID error", zap.Error(err)) + continue + } + err = c.MetaTable.AddFlushedSegment(segID) if err != nil { - log.Warn("field schema not found", zap.Int64("field id", f.FiledID)) + log.Warn("AddFlushedSegment error", zap.Error(err)) continue } - if err = c.BuildIndex(segID, fieldSch, idxInfo, true); err != nil { - log.Error("build index fail", zap.String("error", err.Error())) - } else { - log.Debug("build index", zap.String("index name", idxInfo.IndexName), - zap.String("field name", fieldSch.Name), - zap.Int64("segment id", segID)) + for _, f := range coll.FieldIndexes { + fieldSch, err := GetFieldSchemaByID(coll, f.FiledID) + if err != nil { + log.Warn("field schema not found", zap.Int64("field id", f.FiledID)) + continue + } + + idxInfo, err := c.MetaTable.GetIndexByID(f.IndexID) + if err != nil { + log.Warn("index not found", zap.Int64("index id", f.IndexID)) + continue + } + + info := etcdpb.SegmentIndexInfo{ + SegmentID: segID, + FieldID: fieldSch.FieldID, + IndexID: idxInfo.IndexID, + EnableIndex: false, + } + info.BuildID, err = c.BuildIndex(segID, fieldSch, idxInfo, true) + if err == nil { + info.EnableIndex = true + } else { + log.Error("build index fail", zap.String("error", err.Error())) + } + + segIdxInfos = append(segIdxInfos, &info) } } + + _, err = c.MetaTable.AddIndex(segIdxInfos, string(startPosByte), string(endPosByte)) + if err != nil { + log.Error("AddIndex fail", zap.String("err", err.Error())) + } } } } @@ -583,45 +639,25 @@ func (c *Core) setMsgStreams() error { } }() - //segment channel, data service create segment,or data node flush segment will put msg in this channel if Params.DataServiceSegmentChannel == "" { return fmt.Errorf("DataServiceSegmentChannel is empty") } - dataServiceStream, _ := 
@@ -784,38 +820,31 @@ func (c *Core) SetQueryService(s types.QueryService) error {
 }
 
 // BuildIndex will check row num and call build index service
-func (c *Core) BuildIndex(segID typeutil.UniqueID, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo, isFlush bool) error {
+func (c *Core) BuildIndex(segID typeutil.UniqueID, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo, isFlush bool) (typeutil.UniqueID, error) {
     if c.MetaTable.IsSegmentIndexed(segID, field, idxInfo.IndexParams) {
-        return nil
+        return 0, nil
     }
     rows, err := c.GetNumRowsReq(segID, isFlush)
     if err != nil {
-        return err
+        return 0, err
     }
     var bldID typeutil.UniqueID
-    enableIdx := false
     if rows < Params.MinSegmentSizeToEnableIndex {
         log.Debug("num of rows is less than MinSegmentSizeToEnableIndex", zap.Int64("num rows", rows))
     } else {
         binlogs, err := c.GetBinlogFilePathsFromDataServiceReq(segID, field.FieldID)
         if err != nil {
-            return err
+            return 0, err
         }
         bldID, err = c.BuildIndexReq(c.ctx, binlogs, field, idxInfo)
         if err != nil {
-            return err
+            return 0, err
         }
-        enableIdx = true
-    }
-    seg := etcdpb.SegmentIndexInfo{
-        SegmentID:   segID,
-        FieldID:     field.FieldID,
-        IndexID:     idxInfo.IndexID,
-        BuildID:     bldID,
-        EnableIndex: enableIdx,
     }
-    _, err = c.MetaTable.AddIndex(&seg)
-    return err
+    log.Debug("build index", zap.String("index name", idxInfo.IndexName),
+        zap.String("field name", field.Name),
+        zap.Int64("segment id", segID))
+    return bldID, nil
 }
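[Reviewer note] BuildIndex no longer persists anything; it only returns the build ID, and each caller assembles SegmentIndexInfo records for one batched AddIndex. One subtlety: a segment below MinSegmentSizeToEnableIndex returns (0, nil), so the flushed-segment loop will mark it EnableIndex = true with a zero BuildID, unlike the old in-function logic that kept enableIdx false. The calling pattern, as used in the flush loop:

    info := etcdpb.SegmentIndexInfo{
        SegmentID:   segID,
        FieldID:     fieldSch.FieldID,
        IndexID:     idxInfo.IndexID,
        EnableIndex: false,
    }
    info.BuildID, err = c.BuildIndex(segID, fieldSch, idxInfo, true)
    if err == nil {
        info.EnableIndex = true // note: also true when rows < MinSegmentSizeToEnableIndex
    }
    segIdxInfos = append(segIdxInfos, &info)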
@@ -980,7 +1009,7 @@ func (c *Core) Start() error {
         go c.startDdScheduler()
         go c.startTimeTickLoop()
         go c.startDataServiceSegmentLoop()
-        go c.startSegmentFlushCompletedLoop()
+        go c.startDataNodeFlushedSegmentLoop()
         go c.tsLoop()
         go c.chanTimeTick.StartWatch()
         c.stateCode.Store(internalpb.StateCode_Healthy)
diff --git a/internal/masterservice/master_service_test.go b/internal/masterservice/master_service_test.go
index 56f737c858cc3620b6b761c377cb14a02dc047f1..c3bd5fa58b3179246fe37d0af645cabfabf6f4bb 100644
--- a/internal/masterservice/master_service_test.go
+++ b/internal/masterservice/master_service_test.go
@@ -1919,7 +1919,7 @@ func TestCheckInit(t *testing.T) {
     err = c.checkInit()
     assert.NotNil(t, err)
 
-    c.DataServiceSegmentChan = make(chan *datapb.SegmentInfo)
+    c.DataServiceSegmentChan = make(chan *msgstream.MsgPack)
     err = c.checkInit()
     assert.NotNil(t, err)
 
@@ -1953,7 +1953,7 @@ func TestCheckInit(t *testing.T) {
     err = c.checkInit()
     assert.NotNil(t, err)
 
-    c.DataNodeSegmentFlushCompletedChan = make(chan int64)
+    c.DataNodeFlushedSegmentChan = make(chan *msgstream.MsgPack)
     err = c.checkInit()
     assert.NotNil(t, err)
diff --git a/internal/masterservice/meta_table.go b/internal/masterservice/meta_table.go
index 4f47367d870e258b3b92664f5c3c8f8a9f1d8b5f..ae8df7e51cf115358861797cf057c8b9cbe0ec71 100644
--- a/internal/masterservice/meta_table.go
+++ b/internal/masterservice/meta_table.go
@@ -41,6 +41,9 @@ const (
 
     TimestampPrefix = ComponentPrefix + "/timestamp"
 
+    MsgStartPositionPrefix = ComponentPrefix + "/msg-start-position"
+    MsgEndPositionPrefix   = ComponentPrefix + "/msg-end-position"
+
     DDOperationPrefix = ComponentPrefix + "/dd-operation"
 
     DDMsgSendPrefix = ComponentPrefix + "/dd-msg-send"
 
@@ -689,45 +692,55 @@ func (mt *metaTable) GetPartitionByID(collID typeutil.UniqueID, partitionID typ
 
 }
 
-func (mt *metaTable) AddSegment(seg *datapb.SegmentInfo) (typeutil.Timestamp, error) {
+func (mt *metaTable) AddSegment(segInfos []*datapb.SegmentInfo, msgStartPos string, msgEndPos string) (typeutil.Timestamp, error) {
     mt.ddLock.Lock()
     defer mt.ddLock.Unlock()
-    collMeta, ok := mt.collID2Meta[seg.CollectionID]
-    if !ok {
-        return 0, fmt.Errorf("can't find collection id = %d", seg.CollectionID)
-    }
-    partMeta, ok := mt.partitionID2Meta[seg.PartitionID]
-    if !ok {
-        return 0, fmt.Errorf("can't find partition id = %d", seg.PartitionID)
-    }
-    exist := false
-    for _, partID := range collMeta.PartitionIDs {
-        if partID == seg.PartitionID {
-            exist = true
-            break
+
+    meta := make(map[string]string)
+    for _, segInfo := range segInfos {
+        collMeta, ok := mt.collID2Meta[segInfo.CollectionID]
+        if !ok {
+            return 0, fmt.Errorf("can't find collection id = %d", segInfo.CollectionID)
         }
-    }
-    if !exist {
-        return 0, fmt.Errorf("partition id = %d, not belong to collection id = %d", seg.PartitionID, seg.CollectionID)
-    }
-    exist = false
-    for _, segID := range partMeta.SegmentIDs {
-        if segID == seg.ID {
-            exist = true
+        partMeta, ok := mt.partitionID2Meta[segInfo.PartitionID]
+        if !ok {
+            return 0, fmt.Errorf("can't find partition id = %d", segInfo.PartitionID)
         }
+        exist := false
+        for _, partID := range collMeta.PartitionIDs {
+            if partID == segInfo.PartitionID {
+                exist = true
+                break
+            }
+        }
+        if !exist {
+            return 0, fmt.Errorf("partition id = %d, not belong to collection id = %d", segInfo.PartitionID, segInfo.CollectionID)
+        }
+        exist = false
+        for _, segID := range partMeta.SegmentIDs {
+            if segID == segInfo.ID {
+                exist = true
+            }
+        }
+        if exist {
+            return 0, fmt.Errorf("segment id = %d exist", segInfo.ID)
+        }
+        partMeta.SegmentIDs = append(partMeta.SegmentIDs, segInfo.ID)
+        mt.partitionID2Meta[segInfo.PartitionID] = partMeta
+        mt.segID2CollID[segInfo.ID] = segInfo.CollectionID
+        mt.segID2PartitionID[segInfo.ID] = segInfo.PartitionID
+
+        k := fmt.Sprintf("%s/%d/%d", PartitionMetaPrefix, segInfo.CollectionID, segInfo.PartitionID)
+        v := proto.MarshalTextString(&partMeta)
+        meta[k] = v
     }
-    if exist {
-        return 0, fmt.Errorf("segment id = %d exist", seg.ID)
-    }
-    partMeta.SegmentIDs = append(partMeta.SegmentIDs, seg.ID)
-    mt.partitionID2Meta[seg.PartitionID] = partMeta
-    mt.segID2CollID[seg.ID] = seg.CollectionID
-    mt.segID2PartitionID[seg.ID] = seg.PartitionID
-    k := fmt.Sprintf("%s/%d/%d", PartitionMetaPrefix, seg.CollectionID, seg.PartitionID)
-    v := proto.MarshalTextString(&partMeta)
 
-    ts, err := mt.client.Save(k, v)
+    if msgStartPos != "" && msgEndPos != "" {
+        meta[MsgStartPositionPrefix] = msgStartPos
+        meta[MsgEndPositionPrefix] = msgEndPos
+    }
+
+    ts, err := mt.client.MultiSave(meta, nil)
     if err != nil {
         _ = mt.reloadFromKV()
         return 0, err
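[Reviewer note] AddSegment (and AddIndex below) now collect every key/value into one map and persist it with a single MultiSave, so the partition meta and the stream-position checkpoint land in etcd in one transaction; with the old per-segment Save, a crash between writes could persist meta while leaving the checkpoint stale. The shape of the pattern, condensed from the function above:

    meta := make(map[string]string)
    for _, segInfo := range segInfos {
        k := fmt.Sprintf("%s/%d/%d", PartitionMetaPrefix, segInfo.CollectionID, segInfo.PartitionID)
        meta[k] = proto.MarshalTextString(&partMeta) // partMeta updated in the loop above
    }
    if msgStartPos != "" && msgEndPos != "" {
        meta[MsgStartPositionPrefix] = msgStartPos // checkpoint rides along atomically
        meta[MsgEndPositionPrefix] = msgEndPos
    }
    ts, err := mt.client.MultiSave(meta, nil) // one etcd txn for the whole batch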
@@ -735,62 +748,72 @@ func (mt *metaTable) AddSegment(seg *datapb.SegmentInfo) (typeutil.Timestamp, er
     return ts, nil
 }
 
-func (mt *metaTable) AddIndex(segIdxInfo *pb.SegmentIndexInfo) (typeutil.Timestamp, error) {
+func (mt *metaTable) AddIndex(segIdxInfos []*pb.SegmentIndexInfo, msgStartPos string, msgEndPos string) (typeutil.Timestamp, error) {
     mt.ddLock.Lock()
     defer mt.ddLock.Unlock()
 
-    collID, ok := mt.segID2CollID[segIdxInfo.SegmentID]
-    if !ok {
-        return 0, fmt.Errorf("segment id = %d not belong to any collection", segIdxInfo.SegmentID)
-    }
-    collMeta, ok := mt.collID2Meta[collID]
-    if !ok {
-        return 0, fmt.Errorf("collection id = %d not found", collID)
-    }
-    partID, ok := mt.segID2PartitionID[segIdxInfo.SegmentID]
-    if !ok {
-        return 0, fmt.Errorf("segment id = %d not belong to any partition", segIdxInfo.SegmentID)
-    }
-    exist := false
-    for _, fidx := range collMeta.FieldIndexes {
-        if fidx.IndexID == segIdxInfo.IndexID {
-            exist = true
-            break
+    meta := make(map[string]string)
+
+    for _, segIdxInfo := range segIdxInfos {
+        collID, ok := mt.segID2CollID[segIdxInfo.SegmentID]
+        if !ok {
+            return 0, fmt.Errorf("segment id = %d not belong to any collection", segIdxInfo.SegmentID)
+        }
+        collMeta, ok := mt.collID2Meta[collID]
+        if !ok {
+            return 0, fmt.Errorf("collection id = %d not found", collID)
+        }
+        partID, ok := mt.segID2PartitionID[segIdxInfo.SegmentID]
+        if !ok {
+            return 0, fmt.Errorf("segment id = %d not belong to any partition", segIdxInfo.SegmentID)
+        }
+        exist := false
+        for _, fidx := range collMeta.FieldIndexes {
+            if fidx.IndexID == segIdxInfo.IndexID {
+                exist = true
+                break
+            }
+        }
+        if !exist {
+            return 0, fmt.Errorf("index id = %d not found", segIdxInfo.IndexID)
         }
-    }
-    if !exist {
-        return 0, fmt.Errorf("index id = %d not found", segIdxInfo.IndexID)
-    }
 
-    segIdxMap, ok := mt.segID2IndexMeta[segIdxInfo.SegmentID]
-    if !ok {
-        idxMap := map[typeutil.UniqueID]pb.SegmentIndexInfo{segIdxInfo.IndexID: *segIdxInfo}
-        mt.segID2IndexMeta[segIdxInfo.SegmentID] = &idxMap
-    } else {
-        tmpInfo, ok := (*segIdxMap)[segIdxInfo.IndexID]
-        if ok {
-            if SegmentIndexInfoEqual(segIdxInfo, &tmpInfo) {
-                log.Debug("Identical SegmentIndexInfo already exist", zap.Int64("IndexID", segIdxInfo.IndexID))
-                return 0, nil
+        segIdxMap, ok := mt.segID2IndexMeta[segIdxInfo.SegmentID]
+        if !ok {
+            idxMap := map[typeutil.UniqueID]pb.SegmentIndexInfo{segIdxInfo.IndexID: *segIdxInfo}
+            mt.segID2IndexMeta[segIdxInfo.SegmentID] = &idxMap
+        } else {
+            tmpInfo, ok := (*segIdxMap)[segIdxInfo.IndexID]
+            if ok {
+                if SegmentIndexInfoEqual(segIdxInfo, &tmpInfo) {
+                    log.Debug("Identical SegmentIndexInfo already exist", zap.Int64("IndexID", segIdxInfo.IndexID))
+                    return 0, nil
+                }
+                return 0, fmt.Errorf("index id = %d exist", segIdxInfo.IndexID)
             }
-            return 0, fmt.Errorf("index id = %d exist", segIdxInfo.IndexID)
         }
+
+        if _, ok := mt.flushedSegID[segIdxInfo.SegmentID]; !ok {
+            mt.flushedSegID[segIdxInfo.SegmentID] = true
+        }
+
+        (*(mt.segID2IndexMeta[segIdxInfo.SegmentID]))[segIdxInfo.IndexID] = *segIdxInfo
+        k := fmt.Sprintf("%s/%d/%d/%d/%d", SegmentIndexMetaPrefix, collID, segIdxInfo.IndexID, partID, segIdxInfo.SegmentID)
+        v := proto.MarshalTextString(segIdxInfo)
+        meta[k] = v
     }
 
-    (*(mt.segID2IndexMeta[segIdxInfo.SegmentID]))[segIdxInfo.IndexID] = *segIdxInfo
-    k := fmt.Sprintf("%s/%d/%d/%d/%d", SegmentIndexMetaPrefix, collID, segIdxInfo.IndexID, partID, segIdxInfo.SegmentID)
-    v := proto.MarshalTextString(segIdxInfo)
+    if msgStartPos != "" && msgEndPos != "" {
+        meta[MsgStartPositionPrefix] = msgStartPos
+        meta[MsgEndPositionPrefix] = msgEndPos
+    }
 
-    ts, err := mt.client.Save(k, v)
+    ts, err := mt.client.MultiSave(meta, nil)
     if err != nil {
         _ = mt.reloadFromKV()
         return 0, err
     }
 
-    if _, ok := mt.flushedSegID[segIdxInfo.SegmentID]; !ok {
-        mt.flushedSegID[segIdxInfo.SegmentID] = true
-    }
-
     return ts, nil
 }
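[Reviewer note] The dedup behavior the tests below lean on: re-adding an identical SegmentIndexInfo short-circuits with (0, nil) rather than an error, so a replayed flush msg is harmless, while a conflicting entry for the same IndexID still fails. One caveat worth a second look: that early return now also abandons any later infos in the same batch (and the position checkpoint) without saving them. From the test expectations:

    _, err := mt.AddIndex([]*pb.SegmentIndexInfo{&segIdxInfo}, "", "")
    assert.Nil(t, err) // first add persists
    _, err = mt.AddIndex([]*pb.SegmentIndexInfo{&segIdxInfo}, "", "")
    assert.Nil(t, err) // identical re-add is a silent no-op
    segIdxInfo.BuildID = 202
    _, err = mt.AddIndex([]*pb.SegmentIndexInfo{&segIdxInfo}, "", "")
    assert.NotNil(t, err) // same IndexID, different payload: rejected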
diff --git a/internal/masterservice/meta_table_test.go b/internal/masterservice/meta_table_test.go
index 143a6a89842b4784d6b09c46a85b89724ecfe3c9..d632df635708a236cfe922a2667914d98c831f96 100644
--- a/internal/masterservice/meta_table_test.go
+++ b/internal/masterservice/meta_table_test.go
@@ -301,29 +301,29 @@ func TestMetaTable(t *testing.T) {
     })
 
     t.Run("add segment", func(t *testing.T) {
-        seg := &datapb.SegmentInfo{
+        segInfo := &datapb.SegmentInfo{
             ID:           segID,
             CollectionID: collID,
             PartitionID:  partID,
         }
-        _, err := mt.AddSegment(seg)
+        _, err := mt.AddSegment([]*datapb.SegmentInfo{segInfo}, "", "")
         assert.Nil(t, err)
 
-        _, err = mt.AddSegment(seg)
+        _, err = mt.AddSegment([]*datapb.SegmentInfo{segInfo}, "", "")
         assert.NotNil(t, err)
 
-        seg.ID = segID2
-        seg.CollectionID = collIDInvalid
-        _, err = mt.AddSegment(seg)
+        segInfo.ID = segID2
+        segInfo.CollectionID = collIDInvalid
+        _, err = mt.AddSegment([]*datapb.SegmentInfo{segInfo}, "", "")
         assert.NotNil(t, err)
 
-        seg.CollectionID = collID
-        seg.PartitionID = partIDInvalid
-        _, err = mt.AddSegment(seg)
+        segInfo.CollectionID = collID
+        segInfo.PartitionID = partIDInvalid
+        _, err = mt.AddSegment([]*datapb.SegmentInfo{segInfo}, "", "")
         assert.NotNil(t, err)
 
-        seg.PartitionID = partID
-        _, err = mt.AddSegment(seg)
+        segInfo.PartitionID = partID
+        _, err = mt.AddSegment([]*datapb.SegmentInfo{segInfo}, "", "")
         assert.Nil(t, err)
     })
 
@@ -334,15 +334,15 @@ func TestMetaTable(t *testing.T) {
             IndexID:   indexID,
             BuildID:   buildID,
         }
-        _, err := mt.AddIndex(&segIdxInfo)
+        _, err := mt.AddIndex([]*pb.SegmentIndexInfo{&segIdxInfo}, "", "")
         assert.Nil(t, err)
 
         // it's legal to add index twice
-        _, err = mt.AddIndex(&segIdxInfo)
+        _, err = mt.AddIndex([]*pb.SegmentIndexInfo{&segIdxInfo}, "", "")
         assert.Nil(t, err)
 
         segIdxInfo.BuildID = 202
-        _, err = mt.AddIndex(&segIdxInfo)
+        _, err = mt.AddIndex([]*pb.SegmentIndexInfo{&segIdxInfo}, "", "")
         assert.NotNil(t, err)
         assert.EqualError(t, err, fmt.Sprintf("index id = %d exist", segIdxInfo.IndexID))
     })
@@ -529,12 +529,12 @@ func TestMetaTable(t *testing.T) {
         _, err := mt.AddCollection(collInfo, partInfo, idxInfo, nil)
         assert.Nil(t, err)
 
-        seg := &datapb.SegmentInfo{
+        segInfo := &datapb.SegmentInfo{
             ID:           100,
             CollectionID: collID,
             PartitionID:  partID,
         }
-        _, err = mt.AddSegment(seg)
+        _, err = mt.AddSegment([]*datapb.SegmentInfo{segInfo}, "", "")
         assert.Nil(t, err)
 
         mt.collID2Meta = make(map[int64]pb.CollectionInfo)
@@ -542,14 +542,14 @@ func TestMetaTable(t *testing.T) {
         assert.NotNil(t, err)
         assert.EqualError(t, err, fmt.Sprintf("can't find collection: %s", collInfo.Schema.Name))
 
-        _, err = mt.GetCollectionBySegmentID(seg.ID)
+        _, err = mt.GetCollectionBySegmentID(segInfo.ID)
         assert.NotNil(t, err)
         assert.EqualError(t, err, fmt.Sprintf("can't find collection id: %d", collInfo.ID))
 
         mt.segID2CollID = make(map[int64]int64)
-        _, err = mt.GetCollectionBySegmentID(seg.ID)
+        _, err = mt.GetCollectionBySegmentID(segInfo.ID)
         assert.NotNil(t, err)
-        assert.EqualError(t, err, fmt.Sprintf("segment id %d not belong to any collection", seg.ID))
+        assert.EqualError(t, err, fmt.Sprintf("segment id %d not belong to any collection", segInfo.ID))
     })
 
     t.Run("add partition failed", func(t *testing.T) {
@@ -686,24 +686,24 @@ func TestMetaTable(t *testing.T) {
         }
         mt.partitionID2Meta[noPart.PartitionID] = noPart
 
-        seg := &datapb.SegmentInfo{
+        segInfo := &datapb.SegmentInfo{
             ID:           100,
             CollectionID: collInfo.ID,
             PartitionID:  noPart.PartitionID,
         }
-        _, err = mt.AddSegment(seg)
+        _, err = mt.AddSegment([]*datapb.SegmentInfo{segInfo}, "", "")
         assert.NotNil(t, err)
-        assert.EqualError(t, err, fmt.Sprintf("partition id = %d, not belong to collection id = %d", seg.PartitionID, seg.CollectionID))
+        assert.EqualError(t, err, fmt.Sprintf("partition id = %d, not belong to collection id = %d", segInfo.PartitionID, segInfo.CollectionID))
 
-        seg = &datapb.SegmentInfo{
+        segInfo = &datapb.SegmentInfo{
             ID:           11,
             CollectionID: collInfo.ID,
             PartitionID:  partInfo.PartitionID,
         }
-        mockKV.save = func(key, value string) (typeutil.Timestamp, error) {
+        mockKV.multiSave = func(kvs map[string]string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
             return 0, fmt.Errorf("save error")
         }
-        _, err = mt.AddSegment(seg)
+        _, err = mt.AddSegment([]*datapb.SegmentInfo{segInfo}, "", "")
         assert.NotNil(t, err)
         assert.EqualError(t, err, "save error")
     })
fmt.Sprintf("segment id = %d not belong to any collection", segIdxInfo.SegmentID)) @@ -764,14 +764,14 @@ func TestMetaTable(t *testing.T) { collInfo.PartitionIDs = nil _, err = mt.AddCollection(collInfo, partInfo, idxInfo, nil) assert.Nil(t, err) - _, err = mt.AddSegment(seg) + _, err = mt.AddSegment([]*datapb.SegmentInfo{segInfo}, "", "") assert.Nil(t, err) segIdxInfo.IndexID = indexID - mockKV.save = func(key, value string) (typeutil.Timestamp, error) { + mockKV.multiSave = func(kvs map[string]string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) { return 0, fmt.Errorf("save error") } - _, err = mt.AddIndex(segIdxInfo) + _, err = mt.AddIndex([]*pb.SegmentIndexInfo{&segIdxInfo}, "", "") assert.NotNil(t, err) assert.EqualError(t, err, "save error") }) @@ -872,27 +872,27 @@ func TestMetaTable(t *testing.T) { CollectionID: collID, PartitionID: partID, } - _, err = mt.AddSegment(segInfo) + _, err = mt.AddSegment([]*datapb.SegmentInfo{segInfo}, "", "") assert.Nil(t, err) - segIdx := &pb.SegmentIndexInfo{ + segIdxInfo := pb.SegmentIndexInfo{ SegmentID: segID, FieldID: fieldID, IndexID: indexID, BuildID: buildID, } - _, err = mt.AddIndex(segIdx) + _, err = mt.AddIndex([]*pb.SegmentIndexInfo{&segIdxInfo}, "", "") assert.Nil(t, err) - idx, err := mt.GetSegmentIndexInfoByID(segIdx.SegmentID, segIdx.FieldID, idxInfo[0].IndexName) + idx, err := mt.GetSegmentIndexInfoByID(segIdxInfo.SegmentID, segIdxInfo.FieldID, idxInfo[0].IndexName) assert.Nil(t, err) - assert.Equal(t, segIdx.IndexID, idx.IndexID) + assert.Equal(t, segIdxInfo.IndexID, idx.IndexID) - _, err = mt.GetSegmentIndexInfoByID(segIdx.SegmentID, segIdx.FieldID, "abc") + _, err = mt.GetSegmentIndexInfoByID(segIdxInfo.SegmentID, segIdxInfo.FieldID, "abc") assert.NotNil(t, err) - assert.EqualError(t, err, fmt.Sprintf("can't find index name = abc on segment = %d, with filed id = %d", segIdx.SegmentID, segIdx.FieldID)) + assert.EqualError(t, err, fmt.Sprintf("can't find index name = abc on segment = %d, with filed id = %d", segIdxInfo.SegmentID, segIdxInfo.FieldID)) - _, err = mt.GetSegmentIndexInfoByID(segIdx.SegmentID, 11, idxInfo[0].IndexName) + _, err = mt.GetSegmentIndexInfoByID(segIdxInfo.SegmentID, 11, idxInfo[0].IndexName) assert.NotNil(t, err) - assert.EqualError(t, err, fmt.Sprintf("can't find index name = %s on segment = %d, with filed id = 11", idxInfo[0].IndexName, segIdx.SegmentID)) + assert.EqualError(t, err, fmt.Sprintf("can't find index name = %s on segment = %d, with filed id = 11", idxInfo[0].IndexName, segIdxInfo.SegmentID)) }) t.Run("get field schema failed", func(t *testing.T) { diff --git a/internal/masterservice/task.go b/internal/masterservice/task.go index 0102a4b02238532e2c81e4ff8b8e75846fb712b3..be99ab1d349fa72111cdbd2e9bba6ab0dcefe63b 100644 --- a/internal/masterservice/task.go +++ b/internal/masterservice/task.go @@ -746,15 +746,24 @@ func (t *CreateIndexReqTask) Execute(ctx context.Context) error { if field.DataType != schemapb.DataType_FloatVector && field.DataType != schemapb.DataType_BinaryVector { return fmt.Errorf("field name = %s, data type = %s", t.Req.FieldName, schemapb.DataType_name[int32(field.DataType)]) } + + var segIdxInfos []*etcdpb.SegmentIndexInfo for _, segID := range segIDs { - if err := t.core.BuildIndex(segID, &field, idxInfo, false); err != nil { + info := etcdpb.SegmentIndexInfo{ + SegmentID: segID, + FieldID: field.FieldID, + IndexID: idxInfo.IndexID, + EnableIndex: false, + } + info.BuildID, err = t.core.BuildIndex(segID, &field, idxInfo, false) + 
diff --git a/internal/masterservice/task.go b/internal/masterservice/task.go
index 0102a4b02238532e2c81e4ff8b8e75846fb712b3..be99ab1d349fa72111cdbd2e9bba6ab0dcefe63b 100644
--- a/internal/masterservice/task.go
+++ b/internal/masterservice/task.go
@@ -746,15 +746,24 @@ func (t *CreateIndexReqTask) Execute(ctx context.Context) error {
     if field.DataType != schemapb.DataType_FloatVector && field.DataType != schemapb.DataType_BinaryVector {
         return fmt.Errorf("field name = %s, data type = %s", t.Req.FieldName, schemapb.DataType_name[int32(field.DataType)])
     }
+
+    var segIdxInfos []*etcdpb.SegmentIndexInfo
     for _, segID := range segIDs {
-        if err := t.core.BuildIndex(segID, &field, idxInfo, false); err != nil {
+        info := etcdpb.SegmentIndexInfo{
+            SegmentID:   segID,
+            FieldID:     field.FieldID,
+            IndexID:     idxInfo.IndexID,
+            EnableIndex: false,
+        }
+        info.BuildID, err = t.core.BuildIndex(segID, &field, idxInfo, false)
+        if err != nil {
             return err
         }
-        log.Debug("build index", zap.String("index name", idxInfo.IndexName),
-            zap.String("field name", field.Name),
-            zap.Int64("segment id", segID))
+        segIdxInfos = append(segIdxInfos, &info)
     }
-    return nil
+
+    _, err = t.core.MetaTable.AddIndex(segIdxInfos, "", "")
+    return err
 }
 
 type DescribeIndexReqTask struct {
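[Reviewer note] With all the pieces in place, CreateIndexReqTask.Execute builds the whole batch first and persists it once via AddIndex with empty positions (this path is request-driven, not stream-driven, so there is no checkpoint to record). Note that, unlike the flushed-segment loop, EnableIndex is never flipped to true here; if that asymmetry is intentional it may deserve a comment. The resulting flow, condensed:

    var err error
    var segIdxInfos []*etcdpb.SegmentIndexInfo
    for _, segID := range segIDs {
        info := etcdpb.SegmentIndexInfo{
            SegmentID: segID,
            FieldID:   field.FieldID,
            IndexID:   idxInfo.IndexID,
            // EnableIndex stays false on this path
        }
        info.BuildID, err = t.core.BuildIndex(segID, &field, idxInfo, false)
        if err != nil {
            return err // abort: nothing persisted yet for this batch
        }
        segIdxInfos = append(segIdxInfos, &info)
    }
    _, err = t.core.MetaTable.AddIndex(segIdxInfos, "", "") // "" positions: no checkpoint
    return err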