diff --git a/internal/datanode/cache.go b/internal/datanode/cache.go index 5e61ba0bdf212cc5630feca0874a18912ca14e56..4bfd148f1cd9df60be1e2e13cadd93e43be39774 100644 --- a/internal/datanode/cache.go +++ b/internal/datanode/cache.go @@ -43,14 +43,14 @@ func (c *Cache) checkIfCached(key UniqueID) bool { return ok } -// Cache caches a specific segment ID into the cache -func (c *Cache) Cache(segID UniqueID) { - c.cacheMap.Store(segID, struct{}{}) +// Cache caches a specific ID into the cache +func (c *Cache) Cache(ID UniqueID) { + c.cacheMap.Store(ID, struct{}{}) } -// Remove removes a set of segment IDs from the cache -func (c *Cache) Remove(segIDs ...UniqueID) { - for _, id := range segIDs { +// Remove removes a set of IDs from the cache +func (c *Cache) Remove(IDs ...UniqueID) { + for _, id := range IDs { c.cacheMap.Delete(id) } } diff --git a/internal/datanode/compaction_executor.go b/internal/datanode/compaction_executor.go index 5a1358b8970528267680af7bf27ef494ee493199..3cfcbcef9ae3dc8a7ea9ea91ce4d1d85916fa1ea 100644 --- a/internal/datanode/compaction_executor.go +++ b/internal/datanode/compaction_executor.go @@ -19,6 +19,7 @@ package datanode import ( "context" "runtime" + "sync" "github.com/milvus-io/milvus/internal/log" "go.uber.org/zap" @@ -32,6 +33,7 @@ var maxParallelCompactionNum = calculeateParallel() type compactionExecutor struct { parallelCh chan struct{} + executing sync.Map // planID to compactor taskCh chan compactor } @@ -47,6 +49,7 @@ func calculeateParallel() int { func newCompactionExecutor() *compactionExecutor { return &compactionExecutor{ parallelCh: make(chan struct{}, maxParallelCompactionNum), + executing: sync.Map{}, taskCh: make(chan compactor, maxTaskNum), } } @@ -72,6 +75,7 @@ func (c *compactionExecutor) executeTask(task compactor) { <-c.parallelCh }() + c.executing.Store(task.getPlanID(), task) log.Info("start to execute compaction", zap.Int64("planID", task.getPlanID())) err := task.compact() @@ -82,5 +86,24 @@ func (c *compactionExecutor) executeTask(task compactor) { ) } + c.executing.Delete(task.getPlanID()) log.Info("end to execute compaction", zap.Int64("planID", task.getPlanID())) } + +func (c *compactionExecutor) stopTask(planID UniqueID) { + task, loaded := c.executing.LoadAndDelete(planID) + if loaded { + log.Warn("compaction executor stop task", zap.Int64("planID", planID)) + task.(compactor).stop() + } +} + +func (c *compactionExecutor) stopExecutingtaskByCollectionID(collID UniqueID) { + c.executing.Range(func(key interface{}, value interface{}) bool { + if value.(compactor).getCollection() == collID { + c.stopTask(key.(UniqueID)) + } + + return true + }) +} diff --git a/internal/datanode/compaction_executor_test.go b/internal/datanode/compaction_executor_test.go index e489526381a164e59208c87f22bb6d08c8abeb27..b729b5159bcf195651f688682d850f0a50f04730 100644 --- a/internal/datanode/compaction_executor_test.go +++ b/internal/datanode/compaction_executor_test.go @@ -28,6 +28,13 @@ func TestCompactionExecutor(t *testing.T) { ex.execute(newMockCompactor(true)) }) + t.Run("Test stopTask", func(t *testing.T) { + ex := newCompactionExecutor() + mc := newMockCompactor(true) + ex.executing.Store(UniqueID(1), mc) + ex.stopTask(UniqueID(1)) + }) + t.Run("Test start", func(t *testing.T) { ex := newCompactionExecutor() ctx, cancel := context.WithCancel(context.TODO()) @@ -59,16 +66,20 @@ func TestCompactionExecutor(t *testing.T) { } -func newMockCompactor(isvalid bool) compactor { - return &mockCompactor{isvalid} +func newMockCompactor(isvalid bool) 
*mockCompactor { + return &mockCompactor{isvalid: isvalid} } type mockCompactor struct { + ctx context.Context + cancel context.CancelFunc isvalid bool } +var _ compactor = (*mockCompactor)(nil) + func (mc *mockCompactor) compact() error { - if mc.isvalid { + if !mc.isvalid { return errStart } return nil @@ -77,3 +88,13 @@ func (mc *mockCompactor) compact() error { func (mc *mockCompactor) getPlanID() UniqueID { return 1 } + +func (mc *mockCompactor) stop() { + if mc.cancel != nil { + mc.cancel() + } +} + +func (mc *mockCompactor) getCollection() UniqueID { + return 1 +} diff --git a/internal/datanode/compactor.go b/internal/datanode/compactor.go index 3cf89b1b6c0181a45811c7d23aaa672129efc2af..67fd9bbfe98f3a3f6df43742942e0c71c4a303a7 100644 --- a/internal/datanode/compactor.go +++ b/internal/datanode/compactor.go @@ -49,7 +49,9 @@ type iterator = storage.Iterator type compactor interface { compact() error + stop() getPlanID() UniqueID + getCollection() UniqueID } // make sure compactionTask implements compactor interface @@ -65,12 +67,16 @@ type compactionTask struct { dc types.DataCoord plan *datapb.CompactionPlan + + ctx context.Context + cancel context.CancelFunc } // check if compactionTask implements compactor var _ compactor = (*compactionTask)(nil) func newCompactionTask( + ctx context.Context, dl downloader, ul uploader, replica Replica, @@ -78,7 +84,12 @@ func newCompactionTask( alloc allocatorInterface, dc types.DataCoord, plan *datapb.CompactionPlan) *compactionTask { + + ctx1, cancel := context.WithCancel(ctx) return &compactionTask{ + ctx: ctx1, + cancel: cancel, + downloader: dl, uploader: ul, Replica: replica, @@ -89,6 +100,10 @@ func newCompactionTask( } } +func (t *compactionTask) stop() { + t.cancel() +} + func (t *compactionTask) getPlanID() UniqueID { return t.plan.GetPlanID() } @@ -238,7 +253,7 @@ func (t *compactionTask) merge(mergeItr iterator, delta map[UniqueID]Timestamp, } func (t *compactionTask) compact() error { - ctxTimeout, cancelAll := context.WithTimeout(context.Background(), time.Duration(t.plan.GetTimeoutInSeconds())*time.Second) + ctxTimeout, cancelAll := context.WithTimeout(t.ctx, time.Duration(t.plan.GetTimeoutInSeconds())*time.Second) defer cancelAll() var targetSegID UniqueID @@ -590,3 +605,7 @@ func (t *compactionTask) getSegmentMeta(segID UniqueID) (UniqueID, UniqueID, *et } return collID, partID, meta, nil } + +func (t *compactionTask) getCollection() UniqueID { + return t.getCollectionID() +} diff --git a/internal/datanode/compactor_test.go b/internal/datanode/compactor_test.go index 7974bd50b0c3ef3e1be3657faf60c24da826cfd6..f8f9e252b6d8aa8fc567d98aee8e37ebc35b6009 100644 --- a/internal/datanode/compactor_test.go +++ b/internal/datanode/compactor_test.go @@ -55,6 +55,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) { assert.NotNil(t, meta) rc.setCollectionID(-2) + task.Replica.(*SegmentReplica).collSchema = nil _, _, _, err = task.getSegmentMeta(100) assert.Error(t, err) }) @@ -288,7 +289,11 @@ func TestCompactorInterfaceMethods(t *testing.T) { t.Run("Test compact invalid", func(t *testing.T) { invalidAlloc := NewAllocatorFactory(-1) - emptyTask := &compactionTask{} + ctx, cancel := context.WithCancel(context.TODO()) + emptyTask := &compactionTask{ + ctx: ctx, + cancel: cancel, + } emptySegmentBinlogs := []*datapb.CompactionSegmentBinlogs{} plan := &datapb.CompactionPlan{ @@ -314,6 +319,8 @@ func TestCompactorInterfaceMethods(t *testing.T) { plan.SegmentBinlogs = notEmptySegmentBinlogs err = emptyTask.compact() assert.Error(t, err) + + 
emptyTask.stop() }) t.Run("Test typeI compact valid", func(t *testing.T) { @@ -358,7 +365,13 @@ func TestCompactorInterfaceMethods(t *testing.T) { Channel: "channelname", } - task := newCompactionTask(mockbIO, mockbIO, replica, mockfm, alloc, dc, plan) + ctx, cancel := context.WithCancel(context.TODO()) + cancel() + canceledTask := newCompactionTask(ctx, mockbIO, mockbIO, replica, mockfm, alloc, dc, plan) + err = canceledTask.compact() + assert.Error(t, err) + + task := newCompactionTask(context.TODO(), mockbIO, mockbIO, replica, mockfm, alloc, dc, plan) err = task.compact() assert.NoError(t, err) @@ -366,6 +379,12 @@ func TestCompactorInterfaceMethods(t *testing.T) { assert.NoError(t, err) assert.Equal(t, int64(1), updates.GetNumRows()) + id := task.getCollection() + assert.Equal(t, UniqueID(1), id) + + planID := task.getPlanID() + assert.Equal(t, plan.GetPlanID(), planID) + // New test, remove all the binlogs in memkv // Deltas in timetravel range err = mockKv.RemoveWithPrefix("/") @@ -458,7 +477,7 @@ func TestCompactorInterfaceMethods(t *testing.T) { } alloc.random = false // generated ID = 19530 - task := newCompactionTask(mockbIO, mockbIO, replica, mockfm, alloc, dc, plan) + task := newCompactionTask(context.TODO(), mockbIO, mockbIO, replica, mockfm, alloc, dc, plan) err = task.compact() assert.NoError(t, err) @@ -525,7 +544,7 @@ type mockFlushManager struct { var _ flushManager = (*mockFlushManager)(nil) -func (mfm *mockFlushManager) flushBufferData(data *BufferData, segmentID UniqueID, flushed bool, pos *internalpb.MsgPosition) error { +func (mfm *mockFlushManager) flushBufferData(data *BufferData, segmentID UniqueID, flushed bool, dropped bool, pos *internalpb.MsgPosition) error { return nil } diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index e36db94ef9248af67337e311ba27aaffda945b25..5169b7f6ab7994a55a1559a1b9096078d4076aae 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -98,6 +98,7 @@ type DataNode struct { Role string State atomic.Value // internalpb.StateCode_Initializing + // TODO struct chanMut sync.RWMutex vchan2SyncService map[string]*dataSyncService // vchannel name vchan2FlushChs map[string]chan flushMsg // vchannel name to flush channels @@ -118,6 +119,11 @@ type DataNode struct { msFactory msgstream.Factory } +type plan struct { + channelName string + cancel context.CancelFunc +} + // NewDataNode will return a DataNode with abnormal state. 
func NewDataNode(ctx context.Context, factory msgstream.Factory) *DataNode { rand.Seed(time.Now().UnixNano()) @@ -327,7 +333,7 @@ func (node *DataNode) NewDataSyncService(vchan *datapb.VchannelInfo) error { flushCh := make(chan flushMsg, 100) - dataSyncService, err := newDataSyncService(node.ctx, flushCh, replica, alloc, node.msFactory, vchan, node.clearSignal, node.dataCoord, node.segmentCache) + dataSyncService, err := newDataSyncService(node.ctx, flushCh, replica, alloc, node.msFactory, vchan, node.clearSignal, node.dataCoord, node.segmentCache, node.blobKv) if err != nil { return err } @@ -351,6 +357,7 @@ func (node *DataNode) BackGroundGC(collIDCh <-chan UniqueID) { select { case collID := <-collIDCh: log.Info("GC collection", zap.Int64("ID", collID)) + node.stopCompactionOfCollection(collID) for _, vchanName := range node.getChannelNamesbyCollectionID(collID) { node.ReleaseDataSyncService(vchanName) } @@ -725,6 +732,12 @@ func (node *DataNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRe }, nil } +func (node *DataNode) stopCompactionOfCollection(collID UniqueID) { + log.Debug("Stop compaction of collection", zap.Int64("collection ID", collID)) + + node.compactionExecutor.stopExecutingtaskByCollectionID(collID) +} + func (node *DataNode) Compaction(ctx context.Context, req *datapb.CompactionPlan) (*commonpb.Status, error) { status := &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, @@ -739,6 +752,7 @@ func (node *DataNode) Compaction(ctx context.Context, req *datapb.CompactionPlan binlogIO := &binlogIO{node.blobKv, ds.idAllocator} task := newCompactionTask( + ctx, binlogIO, binlogIO, ds.replica, ds.flushManager, diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go index 802ac52f535c6b0ac7bbb209de31bf7220f2293f..5f028215ddb457c22a046f69d24221639632053f 100644 --- a/internal/datanode/data_sync_service.go +++ b/internal/datanode/data_sync_service.go @@ -21,7 +21,7 @@ import ( "errors" "fmt" - miniokv "github.com/milvus-io/milvus/internal/kv/minio" + "github.com/milvus-io/milvus/internal/kv" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/msgstream" "github.com/milvus-io/milvus/internal/proto/commonpb" @@ -47,6 +47,7 @@ type dataSyncService struct { flushingSegCache *Cache // a guarding cache stores currently flushing segment ids flushManager flushManager // flush manager handles flush process + blobKV kv.BaseKV } func newDataSyncService(ctx context.Context, @@ -58,6 +59,7 @@ func newDataSyncService(ctx context.Context, clearSignal chan<- UniqueID, dataCoord types.DataCoord, flushingSegCache *Cache, + blobKV kv.BaseKV, ) (*dataSyncService, error) { @@ -79,6 +81,7 @@ func newDataSyncService(ctx context.Context, dataCoord: dataCoord, clearSignal: clearSignal, flushingSegCache: flushingSegCache, + blobKV: blobKV, } if err := service.initNodes(vchan); err != nil { @@ -141,23 +144,8 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro return err } - // MinIO - option := &miniokv.Option{ - Address: Params.MinioAddress, - AccessKeyID: Params.MinioAccessKeyID, - SecretAccessKeyID: Params.MinioSecretAccessKey, - UseSSL: Params.MinioUseSSL, - CreateBucket: true, - BucketName: Params.MinioBucketName, - } - - minIOKV, err := miniokv.NewMinIOKV(dsService.ctx, option) - if err != nil { - return err - } - // initialize flush manager for DataSync Service - dsService.flushManager = NewRendezvousFlushManager(dsService.idAllocator, minIOKV, dsService.replica, func(pack 
*segmentFlushPack) error { + dsService.flushManager = NewRendezvousFlushManager(dsService.idAllocator, dsService.blobKV, dsService.replica, func(pack *segmentFlushPack) error { fieldInsert := []*datapb.FieldBinlog{} fieldStats := []*datapb.FieldBinlog{} deltaInfos := []*datapb.DeltaLogInfo{} @@ -205,8 +193,9 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro StartPositions: dsService.replica.listNewSegmentsStartPositions(), Flushed: pack.flushed, + Dropped: pack.dropped, } - rsp, err := dsService.dataCoord.SaveBinlogPaths(dsService.ctx, req) + rsp, err := dsService.dataCoord.SaveBinlogPaths(context.Background(), req) if err != nil { return fmt.Errorf(err.Error()) } @@ -280,7 +269,7 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro return err } - var ddNode Node = newDDNode(dsService.ctx, dsService.clearSignal, dsService.collectionID, vchanInfo, dsService.msFactory) + var ddNode Node = newDDNode(dsService.ctx, dsService.collectionID, vchanInfo, dsService.msFactory) var insertBufferNode Node insertBufferNode, err = newInsertBufferNode( dsService.ctx, @@ -294,7 +283,7 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro } var deleteNode Node - deleteNode, err = newDeleteNode(dsService.ctx, dsService.flushManager, c) + deleteNode, err = newDeleteNode(dsService.ctx, dsService.flushManager, dsService.clearSignal, c) if err != nil { return err } diff --git a/internal/datanode/data_sync_service_test.go b/internal/datanode/data_sync_service_test.go index cc2fb01014d482a242919b02a10b69b965f7d967..f2287d02c9864c747bf97ab847d7f8f782462bf4 100644 --- a/internal/datanode/data_sync_service_test.go +++ b/internal/datanode/data_sync_service_test.go @@ -28,6 +28,7 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus/internal/common" + memkv "github.com/milvus-io/milvus/internal/kv/mem" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/msgstream" "github.com/milvus-io/milvus/internal/proto/commonpb" @@ -146,6 +147,7 @@ func TestDataSyncService_newDataSyncService(te *testing.T) { make(chan UniqueID), df, newCache(), + memkv.NewMemoryKV(), ) if !test.isValidCase { @@ -222,7 +224,7 @@ func TestDataSyncService_Start(t *testing.T) { } signalCh := make(chan UniqueID, 100) - sync, err := newDataSyncService(ctx, flushChan, replica, allocFactory, msFactory, vchan, signalCh, &DataCoordFactory{}, newCache()) + sync, err := newDataSyncService(ctx, flushChan, replica, allocFactory, msFactory, vchan, signalCh, &DataCoordFactory{}, newCache(), memkv.NewMemoryKV()) assert.Nil(t, err) // sync.replica.addCollection(collMeta.ID, collMeta.Schema) diff --git a/internal/datanode/flow_graph_dd_node.go b/internal/datanode/flow_graph_dd_node.go index 838ecd26e66cda859e27f60e72faa626a731577d..016c13792fe85f7f355440334b563bc9cfd87981 100644 --- a/internal/datanode/flow_graph_dd_node.go +++ b/internal/datanode/flow_graph_dd_node.go @@ -19,6 +19,7 @@ package datanode import ( "context" "sync" + "sync/atomic" "go.uber.org/zap" @@ -53,13 +54,13 @@ var _ flowgraph.Node = (*ddNode)(nil) type ddNode struct { BaseNode - clearSignal chan<- UniqueID collectionID UniqueID segID2SegInfo sync.Map // segment ID to *SegmentInfo flushedSegments []*datapb.SegmentInfo deltaMsgStream msgstream.MsgStream + dropMode atomic.Value } // Name returns node name, implementing flowgraph.Node @@ -89,6 +90,11 @@ func (ddn *ddNode) Operate(in []Msg) []Msg { msg.SetTraceCtx(ctx) } + if load := ddn.dropMode.Load(); load != nil && 
load.(bool) { + log.Debug("ddNode in dropMode") + return []Msg{} + } + var fgMsg = flowGraphMsg{ insertMessages: make([]*msgstream.InsertMsg, 0), timeRange: TimeRange{ @@ -97,6 +103,7 @@ func (ddn *ddNode) Operate(in []Msg) []Msg { }, startPositions: make([]*internalpb.MsgPosition, 0), endPositions: make([]*internalpb.MsgPosition, 0), + dropCollection: false, } forwardMsgs := make([]msgstream.TsMsg, 0) @@ -104,9 +111,9 @@ func (ddn *ddNode) Operate(in []Msg) []Msg { switch msg.Type() { case commonpb.MsgType_DropCollection: if msg.(*msgstream.DropCollectionMsg).GetCollectionID() == ddn.collectionID { - log.Info("Destroying current flowgraph", zap.Any("collectionID", ddn.collectionID)) - ddn.clearSignal <- ddn.collectionID - return []Msg{} + log.Info("Receiving DropCollection msg", zap.Any("collectionID", ddn.collectionID)) + ddn.dropMode.Store(true) + fgMsg.dropCollection = true } case commonpb.MsgType_Insert: log.Debug("DDNode receive insert messages") @@ -233,7 +240,7 @@ func (ddn *ddNode) Close() { } } -func newDDNode(ctx context.Context, clearSignal chan<- UniqueID, collID UniqueID, vchanInfo *datapb.VchannelInfo, msFactory msgstream.Factory) *ddNode { +func newDDNode(ctx context.Context, collID UniqueID, vchanInfo *datapb.VchannelInfo, msFactory msgstream.Factory) *ddNode { baseNode := BaseNode{} baseNode.SetMaxQueueLength(Params.FlowGraphMaxQueueLength) baseNode.SetMaxParallelism(Params.FlowGraphMaxParallelism) @@ -247,6 +254,7 @@ func newDDNode(ctx context.Context, clearSignal chan<- UniqueID, collID UniqueID deltaStream, err := msFactory.NewMsgStream(ctx) if err != nil { + log.Error(err.Error()) return nil } pChannelName := rootcoord.ToPhysicalChannel(vchanInfo.ChannelName) @@ -255,6 +263,7 @@ func newDDNode(ctx context.Context, clearSignal chan<- UniqueID, collID UniqueID log.Error(err.Error()) return nil } + deltaStream.SetRepackFunc(msgstream.DefaultRepackFunc) deltaStream.AsProducer([]string{deltaChannelName}) log.Debug("datanode AsProducer", zap.String("DeltaChannelName", deltaChannelName)) @@ -263,12 +272,13 @@ func newDDNode(ctx context.Context, clearSignal chan<- UniqueID, collID UniqueID dd := &ddNode{ BaseNode: baseNode, - clearSignal: clearSignal, collectionID: collID, flushedSegments: fs, deltaMsgStream: deltaMsgStream, } + dd.dropMode.Store(false) + for _, us := range vchanInfo.GetUnflushedSegments() { dd.segID2SegInfo.Store(us.GetID(), us) } diff --git a/internal/datanode/flow_graph_dd_node_test.go b/internal/datanode/flow_graph_dd_node_test.go index 6399b25f45ddae172407b0eae580ef6a2278fa47..e8049253a21eeb7c8c6f8a5661a9a2f5934a5db5 100644 --- a/internal/datanode/flow_graph_dd_node_test.go +++ b/internal/datanode/flow_graph_dd_node_test.go @@ -21,6 +21,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/milvus-io/milvus/internal/msgstream" "github.com/milvus-io/milvus/internal/proto/commonpb" @@ -63,25 +64,28 @@ func TestFlowGraph_DDNode_newDDNode(te *testing.T) { di.DmlPosition = &internalpb.MsgPosition{Timestamp: test.inUnFlushedChannelTs} } - fi := []*datapb.SegmentInfo{} + var fi []*datapb.SegmentInfo for _, id := range test.inFlushedSegs { s := &datapb.SegmentInfo{ID: id} fi = append(fi, s) } + mmf := &mockMsgStreamFactory{ + true, true, + } ddNode := newDDNode( context.Background(), - make(chan UniqueID), test.inCollID, &datapb.VchannelInfo{ FlushedSegments: fi, UnflushedSegments: []*datapb.SegmentInfo{di}, ChannelName: "by-dev-rootcoord-dml-test", }, - msgstream.NewPmsFactory(), + mmf, ) + 
require.NotNil(t, ddNode) - flushedSegIDs := make([]int64, 0) + var flushedSegIDs []UniqueID for _, seg := range ddNode.flushedSegments { flushedSegIDs = append(flushedSegIDs, seg.ID) } @@ -123,17 +127,16 @@ func TestFlowGraph_DDNode_Operate(to *testing.T) { // valid inputs tests := []struct { - ddnClearSignal chan UniqueID - ddnCollID UniqueID + ddnCollID UniqueID msgCollID UniqueID expectedChlen int description string }{ - {make(chan UniqueID, 1), 1, 1, 1, + {1, 1, 1, "DropCollectionMsg collID == ddNode collID"}, - {make(chan UniqueID, 1), 1, 2, 0, + {1, 2, 0, "DropCollectionMsg collID != ddNode collID"}, } @@ -143,7 +146,6 @@ func TestFlowGraph_DDNode_Operate(to *testing.T) { deltaStream, err := factory.NewMsgStream(context.Background()) assert.Nil(t, err) ddn := ddNode{ - clearSignal: test.ddnClearSignal, collectionID: test.ddnCollID, deltaMsgStream: deltaStream, } @@ -158,10 +160,10 @@ func TestFlowGraph_DDNode_Operate(to *testing.T) { var msgStreamMsg Msg = flowgraph.GenerateMsgStreamMsg(tsMessages, 0, 0, nil, nil) rt := ddn.Operate([]Msg{msgStreamMsg}) - assert.Equal(t, test.expectedChlen, len(test.ddnClearSignal)) if test.ddnCollID == test.msgCollID { - assert.Empty(t, rt) + assert.NotEmpty(t, rt) + assert.True(t, rt[0].(*flowGraphMsg).dropCollection) } else { assert.NotEmpty(t, rt) } diff --git a/internal/datanode/flow_graph_delete_node.go b/internal/datanode/flow_graph_delete_node.go index 49e852476a3614ddc7db814547cd7b21b84963bd..4cadcf61545fd6d947b72e8a974ec0fd71684b09 100644 --- a/internal/datanode/flow_graph_delete_node.go +++ b/internal/datanode/flow_graph_delete_node.go @@ -44,6 +44,8 @@ type deleteNode struct { replica Replica idAllocator allocatorInterface flushManager flushManager + + clearSignal chan<- UniqueID } // DelDataBuf buffers insert data, monitoring buffer size and limit @@ -206,10 +208,14 @@ func (dn *deleteNode) Operate(in []Msg) []Msg { dn.delBuf.Delete(segmentToFlush) } } - } } + if fgMsg.dropCollection { + log.Debug("DeleteNode reveives dropCollection signal") + dn.clearSignal <- dn.replica.getCollectionID() + } + for _, sp := range spans { sp.Finish() } @@ -235,7 +241,7 @@ func (dn *deleteNode) filterSegmentByPK(partID UniqueID, pks []int64) map[int64] return result } -func newDeleteNode(ctx context.Context, fm flushManager, config *nodeConfig) (*deleteNode, error) { +func newDeleteNode(ctx context.Context, fm flushManager, sig chan<- UniqueID, config *nodeConfig) (*deleteNode, error) { baseNode := BaseNode{} baseNode.SetMaxQueueLength(config.maxQueueLength) baseNode.SetMaxParallelism(config.maxParallelism) @@ -248,5 +254,6 @@ func newDeleteNode(ctx context.Context, fm flushManager, config *nodeConfig) (*d idAllocator: config.allocator, channelName: config.vChannelName, flushManager: fm, + clearSignal: sig, }, nil } diff --git a/internal/datanode/flow_graph_delete_node_test.go b/internal/datanode/flow_graph_delete_node_test.go index 7284c18988dbf7ff8644f7513ecdbbb12a2cbbdb..1e4e3accdfff843ba4bfcffaefdaf861b5c6fe81 100644 --- a/internal/datanode/flow_graph_delete_node_test.go +++ b/internal/datanode/flow_graph_delete_node_test.go @@ -107,7 +107,7 @@ func TestFlowGraphDeleteNode_newDeleteNode(te *testing.T) { for _, test := range tests { te.Run(test.description, func(t *testing.T) { - dn, err := newDeleteNode(test.ctx, nil, test.config) + dn, err := newDeleteNode(test.ctx, nil, make(chan UniqueID, 1), test.config) assert.Nil(t, err) assert.NotNil(t, dn) @@ -215,7 +215,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) { vChannelName: chanName, } - dn, 
err := newDeleteNode(context.Background(), fm, c) + dn, err := newDeleteNode(context.Background(), fm, make(chan UniqueID, 1), c) assert.Nil(t, err) results := dn.filterSegmentByPK(0, pks) @@ -246,7 +246,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) { allocator: NewAllocatorFactory(), vChannelName: chanName, } - delNode, err := newDeleteNode(ctx, fm, c) + delNode, err := newDeleteNode(ctx, fm, make(chan UniqueID, 1), c) assert.Nil(te, err) msg := genFlowGraphDeleteMsg(pks, chanName) @@ -270,7 +270,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) { allocator: NewAllocatorFactory(), vChannelName: chanName, } - delNode, err := newDeleteNode(ctx, fm, c) + delNode, err := newDeleteNode(ctx, fm, make(chan UniqueID, 1), c) assert.Nil(te, err) msg := genFlowGraphDeleteMsg(pks, chanName) diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go index d7e1bd081f8a1c11a2e3ce814b1acbff5fef74d3..5c100b84513415d369237d34246ad8a5ba923013 100644 --- a/internal/datanode/flow_graph_insert_buffer_node.go +++ b/internal/datanode/flow_graph_insert_buffer_node.go @@ -21,7 +21,6 @@ import ( "context" "encoding/binary" "errors" - "fmt" "io" "strconv" "sync" @@ -40,7 +39,6 @@ import ( "github.com/milvus-io/milvus/internal/common" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" - "github.com/milvus-io/milvus/internal/proto/etcdpb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/schemapb" ) @@ -247,14 +245,45 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg { zap.Int64("buffer limit", bd.(*BufferData).limit)) } - segmentsToFlush := make([]UniqueID, 0, len(seg2Upload)+1) //auto flush number + possible manual flush - + // Flush type flushTask struct { buffer *BufferData segmentID UniqueID flushed bool + dropped bool + } + + var ( + flushTaskList []flushTask + segmentsToFlush []UniqueID + ) + + if fgMsg.dropCollection { + segmentsToFlush := ibNode.replica.listAllSegmentIDs() + log.Debug("Recive drop collection req and flushing all segments", + zap.Any("segments", segmentsToFlush)) + flushTaskList = make([]flushTask, 0, len(segmentsToFlush)) + + for _, seg2Flush := range segmentsToFlush { + var buf *BufferData + bd, ok := ibNode.insertBuffer.Load(seg2Flush) + if !ok { + buf = nil + } else { + buf = bd.(*BufferData) + } + flushTaskList = append(flushTaskList, flushTask{ + buffer: buf, + segmentID: seg2Flush, + flushed: false, + dropped: true, + }) + } + goto flush // Jump over the auto-flush and manual flush procedure } - flushTaskList := make([]flushTask, 0, len(seg2Upload)+1) + + segmentsToFlush = make([]UniqueID, 0, len(seg2Upload)+1) //auto flush number + possible manual flush + flushTaskList = make([]flushTask, 0, len(seg2Upload)+1) // Auto Flush for _, segToFlush := range seg2Upload { @@ -267,6 +296,7 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg { buffer: ibuffer, segmentID: segToFlush, flushed: false, + dropped: false, }) } } @@ -274,6 +304,7 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg { // Manual Flush select { case fmsg := <-ibNode.flushChan: + log.Debug(". 
Receiving flush message", zap.Int64("segmentID", fmsg.segmentID), zap.Int64("collectionID", fmsg.collectionID), @@ -299,13 +330,15 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg { buffer: buf, segmentID: currentSegID, flushed: fmsg.flushed, + dropped: false, }) } default: } +flush: for _, task := range flushTaskList { - err := ibNode.flushManager.flushBufferData(task.buffer, task.segmentID, task.flushed, endPositions[0]) + err := ibNode.flushManager.flushBufferData(task.buffer, task.segmentID, task.flushed, task.dropped, endPositions[0]) if err != nil { log.Warn("failed to invoke flushBufferData", zap.Error(err)) } else { @@ -327,6 +360,7 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg { startPositions: fgMsg.startPositions, endPositions: fgMsg.endPositions, segmentsToFlush: segmentsToFlush, + dropCollection: fgMsg.dropCollection, } for _, sp := range spans { @@ -719,24 +753,6 @@ func (ibNode *insertBufferNode) uploadMemStates2Coord(segIDs []UniqueID) error { return ibNode.segmentStatisticsStream.Produce(&msgPack) } -func (ibNode *insertBufferNode) getCollMetabySegID(segmentID UniqueID, ts Timestamp) (meta *etcdpb.CollectionMeta, err error) { - if !ibNode.replica.hasSegment(segmentID, true) { - return nil, fmt.Errorf("No such segment %d in the replica", segmentID) - } - - collID := ibNode.replica.getCollectionID() - sch, err := ibNode.replica.getCollectionSchema(collID, ts) - if err != nil { - return nil, err - } - - meta = &etcdpb.CollectionMeta{ - ID: collID, - Schema: sch, - } - return -} - func (ibNode *insertBufferNode) getCollectionandPartitionIDbySegID(segmentID UniqueID) (collID, partitionID UniqueID, err error) { return ibNode.replica.getCollectionAndPartitionID(segmentID) } diff --git a/internal/datanode/flow_graph_insert_buffer_node_test.go b/internal/datanode/flow_graph_insert_buffer_node_test.go index 38e7e455faee592704df8f3f260997b8cd84d174..c225d1dd7639cdb0f734f009f5daf24c2a5b66e6 100644 --- a/internal/datanode/flow_graph_insert_buffer_node_test.go +++ b/internal/datanode/flow_graph_insert_buffer_node_test.go @@ -622,67 +622,6 @@ func (m *CompactedRootCoord) DescribeCollection(ctx context.Context, in *milvusp return m.RootCoord.DescribeCollection(ctx, in) } -func TestInsertBufferNode_getCollMetaBySegID(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) - defer cancel() - - insertChannelName := "datanode-01-test-flowgraphinsertbuffernode-operate" - - testPath := "/test/datanode/root/meta" - err := clearEtcd(testPath) - require.NoError(t, err) - Params.MetaRootPath = testPath - - Factory := &MetaFactory{} - collMeta := Factory.GetCollectionMeta(UniqueID(0), "coll1") - - rcf := &RootCoordFactory{} - mockRootCoord := &CompactedRootCoord{ - RootCoord: rcf, - compactTs: 100, - } - - replica, err := newReplica(ctx, mockRootCoord, collMeta.ID) - assert.Nil(t, err) - - err = replica.addNewSegment(1, collMeta.ID, 0, insertChannelName, &internalpb.MsgPosition{}, &internalpb.MsgPosition{}) - require.NoError(t, err) - - msFactory := msgstream.NewPmsFactory() - m := map[string]interface{}{ - "receiveBufSize": 1024, - "pulsarAddress": Params.PulsarAddress, - "pulsarBufSize": 1024} - err = msFactory.SetParams(m) - assert.Nil(t, err) - - memkv := memkv.NewMemoryKV() - - fm := NewRendezvousFlushManager(&allocator{}, memkv, replica, func(*segmentFlushPack) error { - return nil - }) - - flushChan := make(chan flushMsg, 100) - c := &nodeConfig{ - replica: replica, - msFactory: msFactory, - allocator: NewAllocatorFactory(), - 
vChannelName: "string", - } - iBNode, err := newInsertBufferNode(ctx, flushChan, fm, newCache(), c) - require.NoError(t, err) - - meta, err := iBNode.getCollMetabySegID(1, 101) - assert.Nil(t, err) - assert.Equal(t, collMeta.ID, meta.ID) - - _, err = iBNode.getCollMetabySegID(2, 101) - assert.NotNil(t, err) - - meta, err = iBNode.getCollMetabySegID(1, 99) - assert.NotNil(t, err) -} - func TestInsertBufferNode_bufferInsertMsg(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() @@ -740,12 +679,6 @@ func TestInsertBufferNode_bufferInsertMsg(t *testing.T) { assert.Nil(t, err) } - for _, msg := range inMsg.insertMessages { - msg.EndTimestamp = 99 // ts invalid - err = iBNode.bufferInsertMsg(msg, &internalpb.MsgPosition{}) - assert.NotNil(t, err) - } - for _, msg := range inMsg.insertMessages { msg.EndTimestamp = 101 // ts valid msg.RowIDs = []int64{} //misaligned data diff --git a/internal/datanode/flow_graph_message.go b/internal/datanode/flow_graph_message.go index d9cae7724bbf543a67fd742adb165bd5d5eeabb5..5221c077a9a5902cbe51eaf127cf291c420d82f8 100644 --- a/internal/datanode/flow_graph_message.go +++ b/internal/datanode/flow_graph_message.go @@ -38,6 +38,7 @@ type flowGraphMsg struct { endPositions []*internalpb.MsgPosition //segmentsToFlush is the signal used by insertBufferNode to notify deleteNode to flush segmentsToFlush []UniqueID + dropCollection bool } func (fgMsg *flowGraphMsg) TimeTick() Timestamp { diff --git a/internal/datanode/flush_manager.go b/internal/datanode/flush_manager.go index f27262bb77ceda88b62aa6d86da944dad96c01a7..2a83106c6dcb07bc0c979cc83aebacbb04f9f7b4 100644 --- a/internal/datanode/flush_manager.go +++ b/internal/datanode/flush_manager.go @@ -33,7 +33,7 @@ import ( // flushManager defines a flush manager signature type flushManager interface { // notify flush manager insert buffer data - flushBufferData(data *BufferData, segmentID UniqueID, flushed bool, pos *internalpb.MsgPosition) error + flushBufferData(data *BufferData, segmentID UniqueID, flushed bool, dropped bool, pos *internalpb.MsgPosition) error // notify flush manager del buffer data flushDelData(data *DelDataBuf, segmentID UniqueID, pos *internalpb.MsgPosition) error // injectFlush injects compaction or other blocking task before flush sync @@ -48,6 +48,7 @@ type segmentFlushPack struct { deltaLogs []*DelDataBuf pos *internalpb.MsgPosition flushed bool + dropped bool } // notifyMetaFunc notify meta to persistent flush result @@ -139,8 +140,8 @@ func (q *orderFlushQueue) postTask(pack *segmentFlushPack, postInjection postInj } // enqueueInsertBuffer put insert buffer data into queue -func (q *orderFlushQueue) enqueueInsertFlush(task flushInsertTask, binlogs, statslogs map[UniqueID]string, flushed bool, pos *internalpb.MsgPosition) { - q.getFlushTaskRunner(pos).runFlushInsert(task, binlogs, statslogs, flushed, pos) +func (q *orderFlushQueue) enqueueInsertFlush(task flushInsertTask, binlogs, statslogs map[UniqueID]string, flushed bool, dropped bool, pos *internalpb.MsgPosition) { + q.getFlushTaskRunner(pos).runFlushInsert(task, binlogs, statslogs, flushed, dropped, pos) } // enqueueDelBuffer put delete buffer data into queue @@ -219,12 +220,12 @@ func (m *rendezvousFlushManager) getFlushQueue(segmentID UniqueID) *orderFlushQu // notify flush manager insert buffer data func (m *rendezvousFlushManager) flushBufferData(data *BufferData, segmentID UniqueID, flushed bool, - pos *internalpb.MsgPosition) error { + dropped bool, pos *internalpb.MsgPosition) 
error { // empty flush if data == nil || data.buffer == nil { m.getFlushQueue(segmentID).enqueueInsertFlush(&flushBufferInsertTask{}, - map[UniqueID]string{}, map[UniqueID]string{}, flushed, pos) + map[UniqueID]string{}, map[UniqueID]string{}, flushed, dropped, pos) return nil } @@ -292,7 +293,7 @@ func (m *rendezvousFlushManager) flushBufferData(data *BufferData, segmentID Uni m.getFlushQueue(segmentID).enqueueInsertFlush(&flushBufferInsertTask{ BaseKV: m.BaseKV, data: kvs, - }, field2Insert, field2Stats, flushed, pos) + }, field2Insert, field2Stats, flushed, dropped, pos) return nil } diff --git a/internal/datanode/flush_manager_test.go b/internal/datanode/flush_manager_test.go index 495667a16ec8db12a879fc93fd001678b57fe7b9..18c5f8a93bf6cfd2bd2446c942ac69062dc08ec3 100644 --- a/internal/datanode/flush_manager_test.go +++ b/internal/datanode/flush_manager_test.go @@ -68,7 +68,7 @@ func TestOrderFlushQueue_Execute(t *testing.T) { wg.Done() }(ids[i]) go func(id []byte) { - q.enqueueInsertFlush(&emptyFlushTask{}, map[UniqueID]string{}, map[UniqueID]string{}, false, &internalpb.MsgPosition{ + q.enqueueInsertFlush(&emptyFlushTask{}, map[UniqueID]string{}, map[UniqueID]string{}, false, false, &internalpb.MsgPosition{ MsgID: id, }) wg.Done() @@ -107,7 +107,7 @@ func TestOrderFlushQueue_Order(t *testing.T) { q.enqueueDelFlush(&emptyFlushTask{}, &DelDataBuf{}, &internalpb.MsgPosition{ MsgID: ids[i], }) - q.enqueueInsertFlush(&emptyFlushTask{}, map[UniqueID]string{}, map[UniqueID]string{}, false, &internalpb.MsgPosition{ + q.enqueueInsertFlush(&emptyFlushTask{}, map[UniqueID]string{}, map[UniqueID]string{}, false, false, &internalpb.MsgPosition{ MsgID: ids[i], }) wg.Done() @@ -149,7 +149,7 @@ func TestRendezvousFlushManager(t *testing.T) { m.flushDelData(nil, 1, &internalpb.MsgPosition{ MsgID: ids[i], }) - m.flushBufferData(nil, 1, true, &internalpb.MsgPosition{ + m.flushBufferData(nil, 1, true, false, &internalpb.MsgPosition{ MsgID: ids[i], }) wg.Done() @@ -199,7 +199,7 @@ func TestRendezvousFlushManager_Inject(t *testing.T) { m.flushDelData(nil, 1, &internalpb.MsgPosition{ MsgID: ids[i], }) - m.flushBufferData(nil, 1, true, &internalpb.MsgPosition{ + m.flushBufferData(nil, 1, true, false, &internalpb.MsgPosition{ MsgID: ids[i], }) wg.Done() @@ -212,7 +212,7 @@ func TestRendezvousFlushManager_Inject(t *testing.T) { finish.Add(1) id := make([]byte, 10) rand.Read(id) - m.flushBufferData(nil, 2, true, &internalpb.MsgPosition{ + m.flushBufferData(nil, 2, true, false, &internalpb.MsgPosition{ MsgID: id, }) @@ -238,7 +238,7 @@ func TestRendezvousFlushManager_Inject(t *testing.T) { finish.Add(1) rand.Read(id) - m.flushBufferData(nil, 2, false, &internalpb.MsgPosition{ + m.flushBufferData(nil, 2, false, false, &internalpb.MsgPosition{ MsgID: id, }) m.flushDelData(nil, 2, &internalpb.MsgPosition{ diff --git a/internal/datanode/flush_task.go b/internal/datanode/flush_task.go index 9ae36aeb5984c4dc1aea0a5e528750c9e306335f..b3bdceb9d9eb4f04ca378373396e3efbe27def48 100644 --- a/internal/datanode/flush_task.go +++ b/internal/datanode/flush_task.go @@ -58,6 +58,7 @@ type flushTaskRunner struct { deltaLogs []*DelDataBuf pos *internalpb.MsgPosition flushed bool + dropped bool } type taskInjection struct { @@ -76,12 +77,13 @@ func (t *flushTaskRunner) init(f notifyMetaFunc, postFunc taskPostFunc, signal < } // runFlushInsert executei flush insert task with once and retry -func (t *flushTaskRunner) runFlushInsert(task flushInsertTask, binlogs, statslogs map[UniqueID]string, flushed bool, pos *internalpb.MsgPosition) 
{ +func (t *flushTaskRunner) runFlushInsert(task flushInsertTask, binlogs, statslogs map[UniqueID]string, flushed bool, dropped bool, pos *internalpb.MsgPosition) { t.insertOnce.Do(func() { t.insertLogs = binlogs t.statsLogs = statslogs t.flushed = flushed t.pos = pos + t.dropped = dropped go func() { err := errStart for err != nil { @@ -150,6 +152,7 @@ func (t *flushTaskRunner) getFlushPack() *segmentFlushPack { pos: t.pos, deltaLogs: t.deltaLogs, flushed: t.flushed, + dropped: t.dropped, } return pack diff --git a/internal/datanode/flush_task_test.go b/internal/datanode/flush_task_test.go index 7f08017834fc0ee0504f7832a7e018e686b133a0..75adb855498a51c13d852ed3b4d33f699786e114 100644 --- a/internal/datanode/flush_task_test.go +++ b/internal/datanode/flush_task_test.go @@ -44,7 +44,7 @@ func TestFlushTaskRunner(t *testing.T) { assert.False(t, saveFlag) assert.False(t, nextFlag) - task.runFlushInsert(&emptyFlushTask{}, nil, nil, false, nil) + task.runFlushInsert(&emptyFlushTask{}, nil, nil, false, false, nil) task.runFlushDel(&emptyFlushTask{}, &DelDataBuf{}) assert.False(t, saveFlag) @@ -102,7 +102,7 @@ func TestFlushTaskRunner_Injection(t *testing.T) { assert.False(t, saveFlag) assert.False(t, nextFlag) - task.runFlushInsert(&emptyFlushTask{}, nil, nil, false, nil) + task.runFlushInsert(&emptyFlushTask{}, nil, nil, false, false, nil) task.runFlushDel(&emptyFlushTask{}, &DelDataBuf{}) assert.False(t, saveFlag) diff --git a/internal/datanode/mock_test.go b/internal/datanode/mock_test.go index 1dee6eb1f6febfbd355ddd51801c738a29be7b39..2fcf4bc5710bda05ea5134569fbfba57f704d25a 100644 --- a/internal/datanode/mock_test.go +++ b/internal/datanode/mock_test.go @@ -417,6 +417,7 @@ func (df *DataFactory) GenMsgStreamInsertMsg(idx int, chanName string) *msgstrea CollectionName: "col1", PartitionName: "default", SegmentID: 1, + CollectionID: UniqueID(0), ShardName: chanName, Timestamps: []Timestamp{Timestamp(idx + 1000)}, RowIDs: []UniqueID{UniqueID(idx)}, diff --git a/internal/datanode/segment_replica.go b/internal/datanode/segment_replica.go index 05dbadb3910d3fbb963ddb6c2e86103dc4bcefd3..c07b3ceb7af8009bdf250eef3b4f62de3c6ba6af 100644 --- a/internal/datanode/segment_replica.go +++ b/internal/datanode/segment_replica.go @@ -23,18 +23,19 @@ import ( "sync" "sync/atomic" + "github.com/bits-and-blooms/bloom/v3" "go.uber.org/zap" - "github.com/bits-and-blooms/bloom/v3" "github.com/milvus-io/milvus/internal/common" "github.com/milvus-io/milvus/internal/kv" miniokv "github.com/milvus-io/milvus/internal/kv/minio" "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/internal/types" + "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/schemapb" - "github.com/milvus-io/milvus/internal/storage" - "github.com/milvus-io/milvus/internal/types" ) const ( @@ -49,6 +50,7 @@ type Replica interface { getCollectionSchema(collectionID UniqueID, ts Timestamp) (*schemapb.CollectionSchema, error) getCollectionAndPartitionID(segID UniqueID) (collID, partitionID UniqueID, err error) + listAllSegmentIDs() []UniqueID addNewSegment(segID, collID, partitionID UniqueID, channelName string, startPos, endPos *internalpb.MsgPosition) error addNormalSegment(segID, collID, partitionID UniqueID, channelName string, numOfRows int64, statsBinlog []*datapb.FieldBinlog, cp *segmentCheckPoint) error filterSegments(channelName string, partitionID UniqueID) []*Segment @@ 
-618,13 +620,16 @@ func (replica *SegmentReplica) getCollectionSchema(collID UniqueID, ts Timestamp return nil, fmt.Errorf("Not supported collection %v", collID) } - sch, err := replica.metaService.getCollectionSchema(context.Background(), collID, ts) - if err != nil { - log.Error("Grpc error", zap.Error(err)) - return nil, err + if replica.collSchema == nil { + sch, err := replica.metaService.getCollectionSchema(context.Background(), collID, ts) + if err != nil { + log.Error("Grpc error", zap.Error(err)) + return nil, err + } + replica.collSchema = sch } - return sch, nil + return replica.collSchema, nil } func (replica *SegmentReplica) validCollection(collID UniqueID) bool { @@ -700,3 +705,24 @@ func (replica *SegmentReplica) addFlushedSegmentWithPKs(segID, collID, partID Un replica.flushedSegments[segID] = seg replica.segMu.Unlock() } + +func (replica *SegmentReplica) listAllSegmentIDs() []UniqueID { + replica.segMu.Lock() + defer replica.segMu.Unlock() + + var segIDs []UniqueID + + for _, seg := range replica.newSegments { + segIDs = append(segIDs, seg.segmentID) + } + + for _, seg := range replica.normalSegments { + segIDs = append(segIDs, seg.segmentID) + } + + for _, seg := range replica.flushedSegments { + segIDs = append(segIDs, seg.segmentID) + } + + return segIDs +} diff --git a/internal/datanode/segment_replica_test.go b/internal/datanode/segment_replica_test.go index c73d79be7ee4e44f1a08eef7cd5957621b8f2092..d5a4e2dbbb37b71bab090b1f3c5d68d9a6ded72d 100644 --- a/internal/datanode/segment_replica_test.go +++ b/internal/datanode/segment_replica_test.go @@ -571,6 +571,18 @@ func TestSegmentReplica_InterfaceMethod(t *testing.T) { }) + t.Run("Test listAllSegmentIDs", func(t *testing.T) { + sr := &SegmentReplica{ + newSegments: map[UniqueID]*Segment{1: {segmentID: 1}}, + normalSegments: map[UniqueID]*Segment{2: {segmentID: 2}}, + flushedSegments: map[UniqueID]*Segment{3: {segmentID: 3}}, + } + + ids := sr.listAllSegmentIDs() + assert.ElementsMatch(t, []UniqueID{1, 2, 3}, ids) + + }) + t.Run("Test_addSegmentMinIOLoadError", func(t *testing.T) { sr, err := newReplica(context.Background(), rc, 1) assert.Nil(t, err)
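For readers of this patch, a minimal standalone sketch (not part of the change set) of the cancellation pattern the new `compactionExecutor.executing` map enables: a running compactor is registered under its plan ID, and a DropCollection event walks the map and stops every task that belongs to the dropped collection through its cancellable context. `UniqueID`, the trimmed `compactor` interface, and `fakeCompactor` below are simplified stand-ins, not the real datanode definitions.

package main

import (
	"context"
	"fmt"
	"sync"
)

// Simplified stand-ins for the real internal/datanode types.
type UniqueID = int64

type compactor interface {
	compact() error
	stop()
	getPlanID() UniqueID
	getCollection() UniqueID
}

// executorSketch mirrors the planID -> compactor bookkeeping added by the patch.
type executorSketch struct {
	executing sync.Map // planID -> compactor
}

// stopTask removes a plan from the executing map and cancels it, if present.
func (e *executorSketch) stopTask(planID UniqueID) {
	if task, loaded := e.executing.LoadAndDelete(planID); loaded {
		task.(compactor).stop()
	}
}

// stopByCollection cancels every running plan of the given collection,
// the same walk the executor performs on a DropCollection signal.
func (e *executorSketch) stopByCollection(collID UniqueID) {
	e.executing.Range(func(key, value interface{}) bool {
		if value.(compactor).getCollection() == collID {
			e.stopTask(key.(UniqueID))
		}
		return true
	})
}

// fakeCompactor blocks until its context is cancelled, standing in for a long compaction.
type fakeCompactor struct {
	planID, collID UniqueID
	ctx            context.Context
	cancel         context.CancelFunc
}

func (f *fakeCompactor) compact() error          { <-f.ctx.Done(); return f.ctx.Err() }
func (f *fakeCompactor) stop()                   { f.cancel() }
func (f *fakeCompactor) getPlanID() UniqueID     { return f.planID }
func (f *fakeCompactor) getCollection() UniqueID { return f.collID }

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	task := &fakeCompactor{planID: 1, collID: 100, ctx: ctx, cancel: cancel}

	ex := &executorSketch{}
	// Registered up front to keep the demo deterministic; in the patch,
	// executeTask stores the task right before calling task.compact().
	ex.executing.Store(task.getPlanID(), task)

	done := make(chan struct{})
	go func() {
		_ = task.compact() // blocks until stop() cancels the context
		ex.executing.Delete(task.getPlanID())
		close(done)
	}()

	ex.stopByCollection(100)
	<-done
	fmt.Println("compaction for collection 100 stopped")
}

The sketch leans on the same design choice as the patch: newCompactionTask now derives a cancellable context from its caller, and compact() builds its timeout context from t.ctx instead of context.Background(), so stopping a task (or dropping its collection) cancels any in-flight compaction work.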